AnyMQ-AMQP
view release on metacpan or search on metacpan
lib/AnyMQ/Trait/AMQP.pm view on Meta::CPAN
my $msg = $_[0]->recv;
if ( $msg eq 'init' ) {
$self->_connected(1);
$self->on_ready->() if $self->on_ready;
}
else {
my $cv = AE::cv;
$cv->cb($cb);
$self->cv($cv);
carp "Connection failed, retrying in 5 seconds. Reason: ".$msg;
my $w; $w = AnyEvent->timer(after => 5,
cb => sub {
undef $w;
$self->connect($cv);
});
}
};
$cv->cb($cb);
if (!$self->on_ready) {
while ((my $msg = $self->cv->recv) ne 'init') {};
xt/author/basic.t view on Meta::CPAN
$q2->send([0, 'extra message received.']);
}
});
# messages coming to the exchange should be bound to our queue
$ext_channel->publish( routing_key => 'test_q',
exchange => 'foo',
body => JSON::to_json({ time => time() }),
) for 1..10;
my $w; $w = AnyEvent->timer( after => 5,
cb => sub {
$q->send([0, 'test timeout']);
} );
my ($ok, $msg) = @{$q->recv};
ok($ok, $msg);
$bus2->cv(AE::cv);
my $test2 = $bus2->topic({name => 'test_q', publisher_only => 0});
$bus2->cv->recv;
my $client2 = $bus2->new_listener; $client2->subscribe($test2);
$client2->poll( sub {
my $timestamp = shift->{time};
diag "client2 latency: ".(time() - $timestamp);
$c2->end;
});
$test->publish({ time => time() });
$w = AnyEvent->timer( after => 5,
cb => sub {
$q2->send([0, 'test timeout']);
} );
($ok, $msg) = @{$q2->recv};
ok($ok, $msg);
undef $bus; undef $bus2;
done_testing;
( run in 1.153 second using v1.01-cache-2.11-cpan-49f99fa48dc )