AnyEvent-ZeroMQ

 view release on metacpan or  search on metacpan

t/publish-subscribe.t  view on Meta::CPAN

);

my $all = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    connect => $ENDPOINT,
);

my $foo = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    connect => $ENDPOINT,
    topics  => [qw/foo:/],
);

my $foobar = AnyEvent::ZeroMQ::Subscribe->new(
    context => $c,
    connect => $ENDPOINT,
);

$foobar->topics([qw/foo: bar:/]);

$pub->publish('foo: bar');
$pub->publish('bar: baz');
$pub->publish('baz: qux');

my $cv = AnyEvent->condvar;
$cv->begin for 1..6;

my $results = {};

$all   ->on_read(sub { $cv->end; push @{$results->{all}   }, $_[1] });
$foo   ->on_read(sub { $cv->end; push @{$results->{foo}   }, $_[1] });
$foobar->on_read(sub { $cv->end; push @{$results->{foobar}}, $_[1] });

$cv->recv;

is_deeply $results, { all    => [ 'foo: bar', 'bar: baz', 'baz: qux' ],
                      foo    => [ 'foo: bar' ],
                      foobar => [ 'foo: bar', 'bar: baz' ],
                    },
    'got results';

$cv = AnyEvent->condvar;
$cv->begin for 1..3;
$results = {};

# change subscriptions
$all->topics([]);
$foo->topics([]);
$foobar->topics([qw/bar: qux: gorch:/]);

# use this opportunity to ensure that push_read works as on_read does
$foobar->push_read(sub { $cv->end; push @{$results->{foobar}}, lc $_[1] });

# send data
$pub->publish('foo: no');
$pub->publish('bar: YES');
$pub->publish('qux: yes');
$pub->publish('gorch: yes');
$cv->recv;

# subscriptions updated and push_read works?
is_deeply $results, { foobar => [ map { "$_: yes" } qw/bar qux gorch/] },
    'subscriptions mirror $topics attribute';

done_testing;



( run in 0.954 second using v1.01-cache-2.11-cpan-524268b4103 )