AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
xt/04_anyevent.t view on Meta::CPAN
},
on_failure => failure_cb($done),
on_return => sub {
my $method_frame = shift->method_frame;
die "return: ", $method_frame->reply_code, $method_frame->reply_text
if $method_frame->reply_code;
},
on_close => sub {
my $method_frame = shift->method_frame;
Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
if $method_frame->reply_code;
},
);
$done->recv;
# File-level channel handle. It is assigned inside open_ch()'s on_success
# callback and used by every channel operation below.
my $ch;
# Fresh condvar for this step: open_ch() signals it from its on_success
# callback or from failure_cb, and recv blocks until one of them fires.
$done = AnyEvent->condvar;
open_ch($done);
$done->recv;
# Open a channel on the file-level $ar object and store it in the file-level
# $ch once the open succeeds.
#
# Arguments:
#   $cv - condvar to signal when the attempt has finished.  It is sent from
#         the success handler, or from the failure_cb($cv) handler on failure.
#
# Returns whatever $ar->open_channel returns; the visible callers ignore it.
sub open_ch {
    my $cv = shift;

    # Success: remember the new channel, check its class, release the waiter.
    my $on_success = sub {
        my ($channel) = @_;
        $ch = $channel;
        isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
        $cv->send;
    };

    # Close: only a frame carrying a true reply code is treated as fatal.
    my $on_close = sub {
        my ($frame) = @_;
        my $method_frame = $frame->method_frame;
        if ($method_frame->reply_code) {
            die $method_frame->reply_code, $method_frame->reply_text;
        }
    };

    $ar->open_channel(
        on_success => $on_success,
        on_failure => failure_cb($cv),
        on_close   => $on_close,
    );
}
# --- Broker topology setup ---------------------------------------------
# Every step below follows the same pattern: create a fresh condvar, start
# the asynchronous channel operation, then block on recv until on_success
# (which records a pass) or failure_cb (which records a failure) signals it.

# Declare the source exchange 'test_x'.
$done = AnyEvent->condvar;
$ch->declare_exchange(
exchange => 'test_x',
on_success => sub {
pass('declare exchange');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Declare the destination exchange 'test_x_dest'.
$done = AnyEvent->condvar;
$ch->declare_exchange(
exchange => 'test_x_dest',
on_success => sub {
pass('declare destination exchange');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Bind exchange to exchange: 'test_x' is the source, 'test_x_dest' the
# destination.
$done = AnyEvent->condvar;
$ch->bind_exchange(
source => 'test_x',
destination => 'test_x_dest',
on_success => sub {
pass('bind exchange -> dest');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Declare the queue 'test_q' used by the consume/get steps that follow.
$done = AnyEvent->condvar;
$ch->declare_queue(
queue => 'test_q',
on_success => sub {
pass('declare queue');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Bind 'test_q' to 'test_x' with routing key 'test_r'.
$done = AnyEvent->condvar;
$ch->bind_queue(
queue => 'test_q',
exchange => 'test_x',
routing_key => 'test_r',
on_success => sub {
pass('bound queue');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# --- Consume: subscribe to 'test_q', then publish one message -------------
$done = AnyEvent->condvar;
# Filled in by the consume on_success callback; needed later to cancel the
# subscription.
my $consumer_tag;
$ch->consume(
queue => 'test_q',
on_success => sub {
# Subscription confirmed: keep the consumer tag from the method frame.
# Note that this callback does not signal $done.
my $frame = shift;
$consumer_tag = $frame->method_frame->consumer_tag;
pass('consume');
},
on_consume => sub {
# A message arrived: the check passes when the body payload is a true
# value, then the waiter is released.
my $response = shift;
ok($response->{body}->payload, 'publish');
$done->send;
},
on_failure => failure_cb($done),
);
# publish() is defined later in this file; it also receives $done.
publish($ch, 'Hello RabbitMQ.', $done,);
$done->recv;
# --- Cancel the subscription using the tag captured above -----------------
$done = AnyEvent->condvar;
$ch->cancel(
consumer_tag => $consumer_tag,
on_success => sub {
pass('cancel');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# --- Get: publish a message, then fetch from 'test_q' with get -------------
$done = AnyEvent->condvar;
publish($ch, 'I love RabbitMQ.', $done,);
$ch->get(
queue => 'test_q',
on_success => sub {
# Passes when the response hash has a defined 'ok' entry.
my $response = shift;
ok(defined $response->{ok}, 'getok');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
$ch->get(
queue => 'test_q',
on_success => sub {
my $response = shift;
ok(defined $response->{empty}, 'empty');
xt/04_anyevent.t view on Meta::CPAN
$pub_done->recv;
$done->recv;
pass('reject');
# reopen because confirm is not compatible with transactions
$done = AnyEvent->condvar;
$ch->close(
on_success => sub {
pass('close2');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
undef $ch;
$done = AnyEvent->condvar;
open_ch($done);
$done->recv;
pass('open2');
};
# --- Transactions: select_tx -> publish -> rollback_tx -> publish -> commit_tx
# The three operations are chained through nested on_success callbacks, so
# each one starts only after the previous one has reported success. $done is
# signalled by the commit's on_success or by any failure_cb; it is also handed
# to publish() (defined later in this file).
# NOTE(review): presumably the first message is discarded by the rollback and
# only the second is committed; nothing here verifies that - confirm.
$done = AnyEvent->condvar;
$ch->select_tx(
on_success => sub {
pass('select tx');
publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
$ch->rollback_tx(
on_success => sub {
pass('rollback tx');
publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
$ch->commit_tx(
on_success => sub {
pass('commit tx');
$done->send;
},
on_failure => failure_cb($done),
);
},
on_failure => failure_cb($done),
);
},
on_failure => failure_cb($done),
);
$done->recv;
# --- Teardown --------------------------------------------------------------
# Each step uses a fresh condvar and blocks on recv until on_success (pass) or
# failure_cb (fail) signals it. The steps remove the queue 'test_q', its
# binding, the exchange-to-exchange binding and both exchanges.

# Purge the queue 'test_q'.
$done = AnyEvent->condvar;
$ch->purge_queue(
queue => 'test_q',
on_success => sub {
pass('purge queue');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Remove the 'test_q' <- 'test_x' binding for routing key 'test_r'.
$done = AnyEvent->condvar;
$ch->unbind_queue(
queue => 'test_q',
exchange => 'test_x',
routing_key => 'test_r',
on_success => sub {
pass('unbind queue');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Delete the queue 'test_q'.
$done = AnyEvent->condvar;
$ch->delete_queue(
queue => 'test_q',
on_success => sub {
pass('delete queue');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Remove the exchange-to-exchange binding 'test_x' -> 'test_x_dest'.
$done = AnyEvent->condvar;
$ch->unbind_exchange(
source => 'test_x',
destination => 'test_x_dest',
on_success => sub {
pass('unbind exchange');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Delete the source exchange 'test_x'.
$done = AnyEvent->condvar;
$ch->delete_exchange(
exchange => 'test_x',
on_success => sub {
pass('delete exchange');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# Delete the destination exchange 'test_x_dest'.
$done = AnyEvent->condvar;
$ch->delete_exchange(
exchange => 'test_x_dest',
on_success => sub {
pass('delete destination exchange');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
# --- Final step: close the object held in $ar (the one channels are opened
# on) and wait for the outcome.
# The pass label is deliberately different from the 'close2' label used by the
# channel-close step earlier in this file, so the two results can be told
# apart in the TAP output.
$done = AnyEvent->condvar;
$ar->close(
    on_success => sub {
        pass('close connection');
        $done->send;
    },
    # Any failure is recorded as a failed test and still releases the waiter.
    on_failure => failure_cb($done),
);
$done->recv;
# Build an on_failure handler bound to a condvar.
#
# Arguments:
#   $cv - condvar to signal once the failure has been reported.
#
# Returns a code ref.  When invoked, it records a failed test whose name is
# 'on_failure:' followed by the callback's arguments joined with single
# spaces, and then calls send on $cv so the waiting recv can return.
sub failure_cb {
    my $cv = shift;

    my $report_failure = sub {
        my $description = join ' ', 'on_failure:', @_;
        fail($description);
        $cv->send;
    };

    return $report_failure;
}
sub publish {
my ($ch, $message, $cv,) = @_;
$ch->publish(
exchange => 'test_x',
routing_key => 'test_r',
body => $message,
on_ack => sub { $cv->send },
on_return => sub {
my $response = shift;
fail('on_return: ' . Dumper($response));
$cv->send;
},
);
( run in 0.751 second using v1.01-cache-2.11-cpan-2398b32b56e )