AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

t/release-05_multi_channel.t  view on Meta::CPAN

#!/usr/bin/env perl

BEGIN {
  unless ($ENV{RELEASE_TESTING}) {
    print qq{1..0 # SKIP these tests are for release candidate testing\n};
    exit
  }
}


use strict;
use warnings;

use Test::More;
use Test::Exception;

my %conf = (
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

eval {
    use IO::Socket::INET;

    my $socket = IO::Socket::INET->new(
        Proto    => 'tcp',
        PeerAddr => $conf{host},
        PeerPort => $conf{port},
        Timeout  => 1,
    ) or die 'Error connecting to AMQP Server!';

    close $socket;
};

plan skip_all => 'Connection failure: '
               . $conf{host} . ':' . $conf{port} if $@;
plan tests => 3;

use AnyEvent::RabbitMQ::Fork;

my $ar = connect_ar();

my @queues = map {
    my $ch = open_channel($ar);
    my $queue = 'test_q' . $_;
    declare_queue($ch, $queue,);

    my $done = AnyEvent->condvar;
    my $cdone = AnyEvent->condvar;
    consume($ch, $queue, sub {
        my $response = shift;
        return if 'stop' ne $response->{body}->payload;
        $done->send();
    }, sub {
        $cdone->send();
    });
    {name => $queue, cv => $done, ccv => $cdone};
} (1..5);

pass('queue setup');

my $ch = open_channel($ar);
for my $queue (@queues) {
    publish($ch, $queue->{name}, 'hello');
    publish($ch, $queue->{name}, 'stop');
}

my $count = 0;
for my $queue (@queues) {
    $queue->{cv}->recv;
    $count++;
}

is($count, 5, 'consume count');

for my $queue (@queues) {
    delete_queue($ch, $queue->{name});
}

my $ccount = 0;
for my $queue (@queues) {
    $queue->{ccv}->recv;
    $ccount++;
}

is($ccount, 5, 'cancel count');

close_ar($ar);

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 1.038 second using v1.00-cache-2.02-grep-82fe00e-cpan-2c419f77a38b )