AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

t/release-04_anyevent.t  view on Meta::CPAN


use Test::More;
use Test::Exception;
use Data::Dumper;

use FindBin;

my %server = (
    product => undef,
    version => undef,
);

my %conf = (
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => q{/},
    #verbose => 1,
);

eval {
    use IO::Socket::INET;

    my $socket = IO::Socket::INET->new(
        Proto    => 'tcp',
        PeerAddr => $conf{host},
        PeerPort => $conf{port},
        Timeout  => 1,
      )
      or die 'Error connecting to AMQP Server!';

    close $socket;
};

plan skip_all => 'Connection failure: ' . $conf{host} . q{:} . $conf{port} if $@;

use AnyEvent::RabbitMQ::Fork;

my $ar = AnyEvent::RabbitMQ::Fork->new(verbose => $conf{verbose});

lives_ok sub {
    $ar->load_xml_spec();
}, 'load xml spec';

my @nagle = ([], [nodelay => 0], [nodelay => 1]);

for my $opt (@nagle) {
    my $done = AnyEvent->condvar;
    my $z = AnyEvent::RabbitMQ::Fork->new(verbose => $conf{verbose})->load_xml_spec;
    $z->connect(
        (map { $_ => $conf{$_} } qw(host port user pass vhost)),
        timeout    => 1,
        on_success => sub {
            my $ar = shift;
            isa_ok($ar, 'AnyEvent::RabbitMQ::Fork');
            $done->send;
        },
        on_failure => failure_cb($done),
        on_return  => sub {
            my $method_frame = shift->method_frame;
            die "return: ", $method_frame->reply_code, $method_frame->reply_text
              if $method_frame->reply_code;
        },
        on_close => sub {
            my $method_frame = shift->method_frame;
            Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
              if $method_frame->reply_code;
        },
        @{$opt},
    );
    $done->recv;
}

my $done = AnyEvent->condvar;
$ar->connect(
    (map { $_ => $conf{$_} } qw(host port user pass vhost)),
    tune       => { frame_max => 2**17 },
    timeout    => 1,
    on_success => sub {
        my $ar = shift;
        isa_ok($ar, 'AnyEvent::RabbitMQ::Fork');
        $server{product} = $ar->server_properties->{product};
        $server{version} = version->parse($ar->server_properties->{version});
        $done->send;
    },
    on_failure => failure_cb($done),
    on_return  => sub {
        my $method_frame = shift->method_frame;
        die "return: ", $method_frame->reply_code, $method_frame->reply_text
          if $method_frame->reply_code;
    },
    on_close => sub {
        my $method_frame = shift->method_frame;
        Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
          if $method_frame->reply_code;
    },
);
$done->recv;

our $ch;
$done = AnyEvent->condvar;
open_ch($done);
$done->recv;

sub open_ch {
    my ($cv,) = @_;
    $ar->open_channel(
        on_success => sub {
            $ch = shift;
            isa_ok($ch, 'AnyEvent::RabbitMQ::Fork::Channel');
            $cv->send;
        },
        on_failure => failure_cb($cv),
        on_close   => sub {
            my $method_frame = shift->method_frame;
            die $method_frame->reply_code, $method_frame->reply_text
              if $method_frame->reply_code;
        },
    );
}

$done = AnyEvent->condvar;
$ch->declare_exchange(
    exchange   => 'test_x',
    on_success => sub {
        pass('declare exchange');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->declare_exchange(
    exchange   => 'test_x_dest',
    on_success => sub {
        pass('declare destination exchange');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->bind_exchange(
    source      => 'test_x',
    destination => 'test_x_dest',
    on_success  => sub {
        pass('bind exchange -> dest');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->declare_queue(
    queue      => 'test_q',
    on_success => sub {
        pass('declare queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->bind_queue(
    queue       => 'test_q',
    exchange    => 'test_x',
    routing_key => 'test_r',
    on_success  => sub {
        pass('bound queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->confirm(
    on_success => sub { $done->send },
    on_failure => failure_cb($done),
);
$done->recv;
pass('confirm');

$done = AnyEvent->condvar;
my $consumer_tag;
$ch->consume(
    queue      => 'test_q',
    on_success => sub {
        my $frame = shift;
        $consumer_tag = $frame->method_frame->consumer_tag;
        pass('consume');
    },
    on_consume => sub {
        my $response = shift;
        ok($response->{body}->payload, 'publish');
        $done->send;
    },
    on_failure => failure_cb($done),
);
publish($ch, 'Hello RabbitMQ.', $done,);
$done->recv;

$done = AnyEvent->condvar;
$ch->cancel(
    consumer_tag => $consumer_tag,
    on_success   => sub {
        pass('cancel');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
publish($ch, 'I love RabbitMQ.', $done,);
$done->recv;
$done = AnyEvent->condvar;
$ch->get(
    queue      => 'test_q',
    on_success => sub {
        my $response = shift;
        ok(defined $response->{ok}, 'getok');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->get(
    queue      => 'test_q',
    on_success => sub {
        my $response = shift;
        ok(defined $response->{empty}, 'empty');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

for my $size (10, 131_064, 10, 140_000) {
    send_large_size_message($ch, $size);
}

$done = AnyEvent->condvar;
publish($ch, 'NO RabbitMQ, NO LIFE.', $done,);
$done->recv;
$done = AnyEvent->condvar;
$ch->consume(
    queue      => 'test_q',
    no_ack     => 0,
    on_consume => sub {
        my $response = shift;
        $ch->ack(delivery_tag => $response->{deliver}->method_frame->delivery_tag);
        pass('ack deliver');

        $ch->cancel(
            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
            on_success   => sub {
                pass('cancel');
                $done->send;
            },
            on_failure => failure_cb($done),
        );
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
publish($ch, 'RabbitMQ is cool.', $done,);
$done->recv;
$done = AnyEvent->condvar;
$ch->get(
    queue      => 'test_q',
    no_ack     => 0,
    on_success => sub {
        my $response = shift;
        $ch->ack(delivery_tag => $response->{ok}->method_frame->delivery_tag);
        pass('ack get');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
my @responses;
$ch->qos(
    prefetch_count => 2,
    on_success     => sub {
        $ch->consume(
            queue      => 'test_q',
            no_ack     => 0,
            on_consume => sub {
                my $response = shift;
                push @responses, $response;
                return if 2 > @responses;
                $done->send;
            },
            on_failure => failure_cb($done),
        );
    },
    on_failure => failure_cb($done),
);
my $pub_done = AnyEvent->condvar;
$pub_done->begin;
$pub_done->begin;
$pub_done->begin;
publish($ch, 'RabbitMQ is excellent.', AnyEvent->condvar(sub { $pub_done->end }),);
publish($ch, 'RabbitMQ is fantastic.', AnyEvent->condvar(sub { $pub_done->end }),);
$pub_done->end;
$done->recv;
pass('qos');

for my $response (@responses) {
    $ch->ack(delivery_tag => $response->{deliver}->method_frame->delivery_tag,);
}

$done = AnyEvent->condvar;
$ch->cancel(
    consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
    on_success   => sub {
        $ch->qos(
            on_success => sub {
                $done->send;
            },
            on_failure => failure_cb($done),
        );
    },
    on_failure => failure_cb($done),
);
$done->recv;
pass('qos reset');

$done = AnyEvent->condvar;
publish($ch, 'RabbitMQ is powerful.', $done,);
$done->recv;
$done = AnyEvent->condvar;
my $recover_count = 0;
$ch->consume(
    queue      => 'test_q',
    no_ack     => 0,
    on_consume => sub {
        my $response = shift;

        if (5 > ++$recover_count) {
            $ch->recover();
            return;
        }

        $ch->ack(delivery_tag => $response->{deliver}->method_frame->delivery_tag);

        $ch->cancel(
            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
            on_success   => sub {
                $done->send;
            },
            on_failure => failure_cb($done),
        );
    },
    on_failure => failure_cb($done),
);
$done->recv;
pass('recover');

# This only works for RabbitMQ >= 2.0.0
my $can_reject = $server{product} eq 'RabbitMQ' && $server{version} >= version->parse('2.0.0');
SKIP: {
    skip 'We need RabbitMQ >= 2.0.0 for the confirm and reject test', 1 unless $can_reject;

    $done = AnyEvent->condvar;
    $ch->confirm(
        on_success => sub { $done->send },
        on_failure => failure_cb($done),
    );
    $done->recv;
    pass('confirm');

    $done = AnyEvent->condvar;
    my $reject_count = 0;
    $ch->consume(
        queue      => 'test_q',
        no_ack     => 0,
        on_consume => sub {
            my $response = shift;

            if (5 > ++$reject_count) {
                $ch->reject(
                    delivery_tag => $response->{deliver}->method_frame->delivery_tag,

                    # requeue! Else the server does not deliver the message again to this client.
                    requeue => 1,
                );
                return;
            }

            $ch->ack(delivery_tag => $response->{deliver}->method_frame->delivery_tag);

            $ch->cancel(
                consumer_tag => $response->{deliver}->method_frame->consumer_tag,
                on_success   => sub {
                    $done->send;
                },
                on_failure => failure_cb($done),
            );
        },
        on_failure => failure_cb($done),
    );
    my $pub_done = AnyEvent->condvar;
    publish($ch, 'RabbitMQ is powerful.', $pub_done,);
    $pub_done->recv;
    $done->recv;
    pass('reject');

    # reopen because confirm is not compatible with transactions
    $done = AnyEvent->condvar;
    $ch->close(
        on_success => sub {
            pass('close2');
            $done->send;
        },
        on_failure => failure_cb($done),
    );
    $done->recv;
    undef $ch;

    $done = AnyEvent->condvar;
    open_ch($done);
    $done->recv;
    pass('open2');

    $done = AnyEvent->condvar;
    $ch->confirm(
        on_success => sub { $done->send },
        on_failure => failure_cb($done),
    );
    $done->recv;
    pass('confirm');
}

{
    local $ch;
    $done = AnyEvent->condvar;
    open_ch($done);
    $done->recv;
    pass('open3');

    $done = AnyEvent->condvar;
    $ch->select_tx(
        on_success => sub {
            pass('select tx');

            $ch->publish(
                exchange    => 'test_x',
                routing_key => 'test_r',
                body        => 'RabbitMQ is highly reliable systems.',
            );

            $ch->rollback_tx(
                on_success => sub {



( run in 1.306 second using v1.01-cache-2.11-cpan-df04353d9ac )