AnyEvent-RabbitMQ-PubSub

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/PubSub.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::PubSub;
use 5.010;
use strict;
use warnings;
use AnyEvent;
use AnyEvent::RabbitMQ;
use Data::Dumper;
use Carp qw(longmess);

our $VERSION = "3.2.1";

# Connect to a RabbitMQ broker, open a channel on the new connection, and
# block until that has either succeeded or failed.
#
# Arguments: a flat key/value list that is forwarded to
#            AnyEvent::RabbitMQ's connect(). The callbacks defined here are
#            placed after it in the argument list.
# Returns:   whatever the condition variable's recv() yields. On success the
#            condvar is sent the connection object followed by the channel
#            (see _open_channel_given_condvar).
# Errors:    every failure-type callback is routed to _report_error, which
#            croaks on the condition variable.
sub connect {
    my %connection_opts = @_;

    my $done = AnyEvent->condvar;

    # One shared handler: all failure-like events are reported the same way.
    my $fail = sub { _report_error($done, @_) };

    my @callbacks = (
        on_success      => sub { _open_channel_given_condvar($done, @_) },
        on_failure      => $fail,
        on_read_failure => $fail,
        on_return       => $fail,
        on_close        => $fail,
    );

    # NOTE(review): this lexical is never read, but it presumably keeps the
    # client object referenced while recv() below is blocking -- confirm
    # before removing it.
    my $connection = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
        %connection_opts,
        @callbacks,
    );

    return $done->recv();
}

# Open an additional channel on an existing connection and block until it is
# available.
#
# Arguments: $ar - the connection object to open the channel on.
# Returns:   the second value delivered to the condition variable, which
#            _open_channel_given_condvar sends as ($ar, $channel).
sub open_channel {
    my ($ar) = @_;

    my $done = AnyEvent->condvar;
    _open_channel_given_condvar($done, $ar);

    # recv() must be called in list context so both sent values come back;
    # the first one (the connection) is not needed here.
    my @sent = $done->recv();

    return $sent[1];
}

# Ask connection $ar to open a channel and report the outcome through $cv.
#
# Arguments: $cv - condition variable to notify.
#            $ar - connection object providing open_channel().
# On success $cv is sent ($ar, $channel); failure and close events are passed
# to _report_error together with $cv.
# Returns:   the value returned by $ar->open_channel().
sub _open_channel_given_condvar {
    my ($cv, $ar) = @_;

    # Failure and close are handled identically, so they share one closure.
    my $on_error = sub { _report_error($cv, @_) };

    my $on_open = sub {
        my ($channel) = @_;
        $cv->send($ar, $channel);
    };

    return $ar->open_channel(
        on_success => $on_open,
        on_failure => $on_error,
        on_close   => $on_error,
    );
}

# Fail the condition variable $cv with a descriptive message that includes a
# stack trace (Carp::longmess).
#
# Arguments: $cv  - object whose croak() method receives the message.
#            $why - the failure reason. If it is an object providing
#                   method_frame(), the frame's reply_code and reply_text are
#                   reported as "CODE: TEXT" (defaulting to 503 and
#                   'Something went wrong.' when false). Any other value,
#                   including plain strings and unblessed references, is
#                   serialized with Data::Dumper.
sub _report_error {
    my ($cv, $why) = @_;

    # Only blessed references may be asked ->can(); calling it on a plain
    # hash/array/code reference dies with "Can't call method ... on unblessed
    # reference", which would prevent the error from ever reaching $cv.
    require Scalar::Util;

    if (Scalar::Util::blessed($why) && $why->can('method_frame')) {
        my $method_frame = $why->method_frame;
        $cv->croak(longmess(
            sprintf '%s: %s',
            $method_frame->reply_code || 503,
            $method_frame->reply_text || 'Something went wrong.',
        ));
    }
    else {
        $cv->croak(longmess(Dumper($why)));
    }
}


1;
__END__



( run in 1.474 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )