EV-Nats
view release on metacpan or search on metacpan
t/04_queue_group.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use IO::Socket::INET;
use EV;
use EV::Nats;
# Server location; either part can be overridden from the environment.
my $host = $ENV{TEST_NATS_HOST} || '127.0.0.1';
my $port = $ENV{TEST_NATS_PORT} || 4222;

# Probe with a plain TCP connect first: when nothing accepts the connection
# within the timeout, skip the whole file instead of reporting failures.
my $probe = IO::Socket::INET->new(
    PeerAddr => $host,
    PeerPort => $port,
    Timeout  => 1,
) or plan skip_all => "NATS server not available at $host:$port";
close $probe;

# Exactly three assertions are made by the final check stage.
plan tests => 3;
# Two subscribers in same queue group should split messages
#
# The scenario runs as three timer-driven stages:
#   1. wait for the connection, then register both subscribers;
#   2. publish the whole batch;
#   3. after a settle delay, check the counters and stop the loop.
my $nats = EV::Nats->new(
    host            => $host,
    port            => $port,
    connect_timeout => 5000,
    on_error        => sub { diag "error: @_"; EV::break },
);

my $total_expected = 20;
my %received       = (a => 0, b => 0);    # per-subscriber delivery counters

# Live EV watchers, keyed by stage. Deleting an entry drops the watcher,
# which is how each stage disarms its own timer.
my %watcher;

# Safety net: one-shot timer that fails the test and leaves the loop if the
# scenario has not finished by then.
$watcher{guard} = EV::timer(10, 0, sub { fail 'timeout'; EV::break });

# Stage 3: verify the counters, disconnect and leave the event loop.
my $verify = sub {
    delete $watcher{check};
    my ($count_a, $count_b) = @received{qw(a b)};
    my $total = $count_a + $count_b;
    is $total, $total_expected, "total messages received = $total_expected";
    ok $count_a > 0, "worker A got messages ($count_a)";
    ok $count_b > 0, "worker B got messages ($count_b)";
    $nats->disconnect;
    EV::break;
};

# Stage 2: publish the batch, then schedule verification after a short
# settle period so deliveries have time to arrive.
my $publish_batch = sub {
    delete $watcher{publish};
    $nats->publish('qtest.x', 'data') for 1 .. $total_expected;
    $watcher{check} = EV::timer(0.5, 0, $verify);
};

# Stage 1: invoked repeatedly until the client reports it is connected;
# then registers both subscribers under the same group name and schedules
# the publish stage.
my $start = sub {
    return unless $nats->is_connected;
    delete $watcher{ready};
    for my $worker (qw(a b)) {
        $nats->subscribe('qtest.>', sub { $received{$worker}++ }, 'grp');
    }
    $watcher{publish} = EV::timer(0.1, 0, $publish_batch);
};

$watcher{ready} = EV::timer(0.1, 0.1, $start);

EV::run;
( run in 1.529 second using v1.01-cache-2.11-cpan-71847e10f99 )