AnyEvent-RabbitMQ-Fork
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
t/release-05_multi_channel.t view on Meta::CPAN
#!/usr/bin/env perl
BEGIN {
    # Release-only test. When RELEASE_TESTING is not set, emit a TAP plan of
    # zero tests with a SKIP reason and stop. Doing this in BEGIN means the
    # script exits at compile time, before the rest of the file is processed.
    if (!$ENV{RELEASE_TESTING}) {
        print "1..0 # SKIP these tests are for release candidate testing\n";
        exit;
    }
}
use strict;
use warnings;
use Test::More;
use Test::Exception;
# Connection settings for the AMQP server this test talks to. Only host and
# port are read in this part of the file (by the reachability probe below).
my %conf = (
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

# Probe the server with a plain TCP connect before planning any tests, so a
# machine without a reachable server skips this file instead of failing it.
eval {
    # Loaded with require (run time) rather than use (compile time): a `use`
    # inside an eval block is executed while the file is being compiled, so
    # the eval would not actually guard the module load.
    require IO::Socket::INET;

    my $socket = IO::Socket::INET->new(
        Proto    => 'tcp',
        PeerAddr => $conf{host},
        PeerPort => $conf{port},
        Timeout  => 1,              # seconds; keep the probe quick
    ) or die 'Error connecting to AMQP Server!';
    close $socket;
};

# Any failure inside the probe (module load or connect) skips the whole file.
if ($@) {
    plan skip_all => 'Connection failure: ' . $conf{host} . ':' . $conf{port};
}

# Three assertions follow: 'queue setup', 'consume count' and 'cancel count'.
plan tests => 3;
use AnyEvent::RabbitMQ::Fork;
# Open the connection, then build five queues (test_q1 .. test_q5). Each queue
# gets its own channel and its own consumer. Every entry in @queues records
# the queue name plus two condvars:
#   cv  - signalled by the first consume() callback once a message whose
#         payload is 'stop' arrives
#   ccv - signalled by the second consume() callback
#         NOTE(review): presumably the consumer-cancel callback -- confirm
#         against consume().
my $ar = connect_ar();

my @queues;
for my $index (1 .. 5) {
    my $channel    = open_channel($ar);
    my $queue_name = "test_q$index";
    declare_queue($channel, $queue_name);

    my $stop_seen = AnyEvent->condvar;
    my $cancelled = AnyEvent->condvar;

    # Ignore every delivery except the 'stop' sentinel.
    my $on_message = sub {
        my ($delivery) = @_;
        return unless $delivery->{body}->payload eq 'stop';
        $stop_seen->send();
    };
    my $on_second_event = sub {
        $cancelled->send();
    };
    consume($channel, $queue_name, $on_message, $on_second_event);

    push @queues, {
        name => $queue_name,
        cv   => $stop_seen,
        ccv  => $cancelled,
    };
}

pass('queue setup');
# One extra channel, used here for publishing and later for queue deletion.
my $ch = open_channel($ar);

# Each queue receives a filler message followed by the 'stop' sentinel that
# its consumer is waiting for.
foreach my $entry (@queues) {
    my $target = $entry->{name};
    publish($ch, $target, 'hello');
    publish($ch, $target, 'stop');
}

# Block on every consumer's condvar in turn and count the ones that
# completed; all five must have seen 'stop'.
my $count = 0;
foreach my $entry (@queues) {
    my $stop_seen = $entry->{cv};
    $stop_seen->recv;
    $count += 1;
}
is($count, 5, 'consume count');
# Remove every test queue through the publishing channel.
foreach my $entry (@queues) {
    my $doomed = $entry->{name};
    delete_queue($ch, $doomed);
}

# Wait for each queue's second condvar (ccv) and count the completions.
# NOTE(review): presumably deleting a queue makes the broker cancel its
# consumer, which fires the callback that signals ccv -- confirm.
my $ccount = 0;
foreach my $entry (@queues) {
    my $cancelled = $entry->{ccv};
    $cancelled->recv;
    $ccount += 1;
}
is($ccount, 5, 'cancel count');

# Tear down the connection opened at the start of the test.
close_ar($ar);
view all matches for this distribution
view release on metacpan - search on metacpan
( run in 1.038 second using v1.00-cache-2.02-grep-82fe00e-cpan-2c419f77a38b )