AnyEvent-RabbitMQ-PubSub
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/PubSub.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::PubSub;
use 5.010;
use strict;
use warnings;
use AnyEvent;
use AnyEvent::RabbitMQ;
use Carp qw(longmess);
use Data::Dumper;
use Scalar::Util qw(blessed);
our $VERSION = "3.2.1";
# Connect to RabbitMQ, open a channel, and block until that has happened.
#
# Arguments: a flat key/value list forwarded to AnyEvent::RabbitMQ's
# connect(). The on_success, on_failure, on_read_failure, on_return and
# on_close entries are always supplied here and override any passed in.
#
# Returns whatever the condvar's recv() yields; on the success path that is
# the ($ar, $channel) pair sent by _open_channel_given_condvar. Every
# failure-type callback is routed to _report_error, which croaks on the
# condvar.
sub connect {
    my %connection_opts = @_;

    my $cv         = AnyEvent->condvar;
    my $on_problem = sub { _report_error($cv, @_) };

    # NOTE(review): $ar is never read again, but it is the only reference to
    # the client object visible in this sub while recv() blocks. Presumably
    # it keeps the connection object alive; confirm before removing it.
    my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
        %connection_opts,
        on_success      => sub { _open_channel_given_condvar($cv, @_) },
        on_failure      => $on_problem,
        on_read_failure => $on_problem,
        on_return       => $on_problem,
        on_close        => $on_problem,
    );

    return $cv->recv;
}
# Open a channel on the connection object $ar and block until it is ready.
#
# Arguments: $ar - the connection object whose open_channel() is invoked
#                  (via _open_channel_given_condvar).
# Returns:   the second value received from the condvar, i.e. the channel
#            that the success callback sent alongside $ar.
sub open_channel {
    my ($ar) = @_;

    my $cv = AnyEvent->condvar;
    _open_channel_given_condvar($cv, $ar);

    # recv() is called in list context; the first element (the connection
    # itself) is discarded and only the channel is kept.
    my $channel = ($cv->recv)[1];
    return $channel;
}
# Ask $ar to open a channel and wire the outcome into the condvar $cv.
#
# Arguments: $cv - condvar-like object; receives send($ar, $channel) when the
#                  channel opens.
#            $ar - object providing open_channel(on_success => ...,
#                  on_failure => ..., on_close => ...).
# Returns:   whatever $ar->open_channel(...) returns.
#
# Both on_failure and on_close are forwarded to _report_error together with
# the condvar.
sub _open_channel_given_condvar {
    my ($cv, $ar) = @_;

    my $on_problem = sub { _report_error($cv, @_) };
    my $on_opened  = sub {
        my ($channel) = @_;
        $cv->send($ar, $channel);
    };

    return $ar->open_channel(
        on_success => $on_opened,
        on_failure => $on_problem,
        on_close   => $on_problem,
    );
}
# Fail the condvar $cv with a stack-traced description of $why.
#
# Arguments: $cv  - object whose croak() method receives the message.
#            $why - failure reason handed over by a callback. If it is an
#                   object that offers method_frame(), the message is
#                   "<reply_code>: <reply_text>", falling back to 503 and
#                   'Something went wrong.' when either value is false.
#                   Anything else (plain string, undef, unblessed reference)
#                   is rendered with Data::Dumper.
# Returns:   whatever $cv->croak(...) returns.
sub _report_error {
    my ($cv, $why) = @_;

    # blessed() rather than ref(): calling can() on an unblessed reference
    # (hash, array, code ...) would die here instead of reporting the error
    # through the condvar.
    if (blessed($why) && $why->can('method_frame')) {
        my $method_frame = $why->method_frame;
        $cv->croak(longmess(
            sprintf '%s: %s',
            $method_frame->reply_code || 503,
            $method_frame->reply_text || 'Something went wrong.',
        ));
    }
    else {
        $cv->croak(longmess(Dumper($why)));
    }
}
1;
__END__
( run in 1.474 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )