AnyMQ-ZeroMQ
view release on metacpan or search on metacpan
t/10-pubsub.t view on Meta::CPAN
unless $ENV{ZMQ_PUBSUB_TESTS};
run_tests();
}
sub run_tests {
test_separate_clients();
test_single_client();
}
sub test_separate_clients {
my $pub_bus = AnyMQ->new_with_traits(
traits => [ 'ZeroMQ' ],
publish_address => 'tcp://localhost:4000',
);
my $sub_bus = AnyMQ->new_with_traits(
traits => [ 'ZeroMQ' ],
subscribe_address => 'tcp://localhost:4001',
);
test_mq($pub_bus, $sub_bus);
}
sub test_single_client {
my $bus = AnyMQ->new_with_traits(
traits => [ 'ZeroMQ' ],
publish_address => 'tcp://localhost:4000',
subscribe_address => 'tcp://localhost:4001',
);
test_mq($bus, $bus);
}
sub test_mq {
my ($pub_bus, $sub_bus) = @_;
my $pub_topic = $pub_bus->topic('ping');
my $sub_topic_1 = $sub_bus->topic('ping');
# make sure we can have subscribers to different topics without breakage
my $sub_topic_2 = $sub_bus->topic('blargle');
my $ping_count = 0;
my $cv = AE::cv;
$cv->begin;
$cv->begin;
# subscribe
my $listener = $sub_bus->new_listener($sub_topic_1);
$listener->poll(sub { $ping_count++; $cv->end });
# publish
$pub_topic->publish({ type => 'pong' });
$pub_topic->publish({ type => 'ping', params => { bleep => 'bloop' } });
$pub_topic->publish({ type => 'ping', params => { bleep => 'bloop' } });
$cv->recv;
$cv = AE::cv;
my $t = AnyEvent->timer(
after => 1.5,
cb => sub {
$cv->send;
},
);
$cv->recv;
is($ping_count, 2, "Got pings");
}
( run in 2.155 seconds using v1.01-cache-2.11-cpan-5837b0d9d2c )