AnyEvent-ZeroMQ
view release on metacpan or search on metacpan
t/publish-subscribe.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use ok 'AnyEvent::ZeroMQ::Role::WithHandle';
use ok 'AnyEvent::ZeroMQ::Publish';
use ok 'AnyEvent::ZeroMQ::Subscribe';
my $ENDPOINT = 'inproc://#1';
my $c = ZeroMQ::Raw::Context->new( threads => 0 );
my $pub = AnyEvent::ZeroMQ::Publish->new(
context => $c,
bind => $ENDPOINT,
);
my $all = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
connect => $ENDPOINT,
);
my $foo = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
connect => $ENDPOINT,
topics => [qw/foo:/],
);
my $foobar = AnyEvent::ZeroMQ::Subscribe->new(
context => $c,
connect => $ENDPOINT,
);
$foobar->topics([qw/foo: bar:/]);
$pub->publish('foo: bar');
$pub->publish('bar: baz');
$pub->publish('baz: qux');
my $cv = AnyEvent->condvar;
$cv->begin for 1..6;
my $results = {};
$all ->on_read(sub { $cv->end; push @{$results->{all} }, $_[1] });
$foo ->on_read(sub { $cv->end; push @{$results->{foo} }, $_[1] });
$foobar->on_read(sub { $cv->end; push @{$results->{foobar}}, $_[1] });
$cv->recv;
is_deeply $results, { all => [ 'foo: bar', 'bar: baz', 'baz: qux' ],
foo => [ 'foo: bar' ],
foobar => [ 'foo: bar', 'bar: baz' ],
},
'got results';
$cv = AnyEvent->condvar;
$cv->begin for 1..3;
$results = {};
# change subscriptions
$all->topics([]);
$foo->topics([]);
$foobar->topics([qw/bar: qux: gorch:/]);
# use this opportunity to ensure that push_read works as on_read does
$foobar->push_read(sub { $cv->end; push @{$results->{foobar}}, lc $_[1] });
# send data
$pub->publish('foo: no');
$pub->publish('bar: YES');
( run in 2.216 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )