AnyEvent-RabbitMQ-PubSub

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/PubSub.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::PubSub;
use 5.010;
use strict;
use warnings;
use AnyEvent;
use AnyEvent::RabbitMQ;
use Data::Dumper;
use Carp qw(longmess);

our $VERSION = "3.2.1";

# Connect to a RabbitMQ server and open a first channel, blocking until the
# outcome is known.
#
# Arguments: a flat key/value list of connection options. They are handed to
# the AnyEvent::RabbitMQ connect() call unchanged, followed by this module's
# own callbacks (so a caller-supplied callback of the same name is listed
# before ours).
#
# Returns whatever the condition variable's recv() yields; on success the
# condvar is sent the connection object followed by the opened channel.
# Every failure-type callback routes to _report_error(), which croaks the
# condvar.
sub connect {
    my %connection_opts = @_;

    my $outcome = AnyEvent->condvar;

    # All error-ish events are reported the same way.
    my $on_error = sub { _report_error($outcome, @_) };

    my $client = AnyEvent::RabbitMQ->new->load_xml_spec();

    # NOTE(review): $connection is never read, but it presumably keeps the
    # client object referenced while recv() below is blocking - confirm
    # against AnyEvent::RabbitMQ before removing it.
    my $connection = $client->connect(
        %connection_opts,
        on_success      => sub { _open_channel_given_condvar($outcome, @_) },
        on_failure      => $on_error,
        on_read_failure => $on_error,
        on_return       => $on_error,
        on_close        => $on_error,
    );

    return $outcome->recv();
}

# Open an additional channel on an existing connection, blocking until the
# channel is available.
#
# Arguments:
#   $connection - connection object offering open_channel() (as handed back
#                 by connect() in this module)
#
# Returns the newly opened channel. Failures are routed through
# _report_error(), which croaks the condvar being waited on.
sub open_channel {
    my ($connection) = @_;

    my $opened = AnyEvent->condvar;
    _open_channel_given_condvar($opened, $connection);

    # The condvar is sent (connection, channel); only the second value is
    # of interest to the caller.
    my $channel = ($opened->recv())[1];

    return $channel;
}

# Ask a connection to open a channel and report the outcome through a
# condition variable.
#
# Arguments:
#   $condvar    - object with send() (and croak(), used via _report_error)
#   $connection - object offering open_channel(on_success/on_failure/on_close)
#
# On success the condvar is sent the connection followed by the new channel;
# failure and close events are forwarded to _report_error().
# Returns whatever the connection's open_channel() call returns.
sub _open_channel_given_condvar {
    my ($condvar, $connection) = @_;

    my $on_opened = sub {
        my ($channel) = @_;
        $condvar->send($connection, $channel);
    };
    my $on_error = sub { _report_error($condvar, @_) };

    return $connection->open_channel(
        on_success => $on_opened,
        on_failure => $on_error,
        on_close   => $on_error,
    );
}

# Fail a condition variable with a readable description of an error.
#
# Arguments:
#   $cv  - object with a croak() method (an AnyEvent condvar in this file)
#   $why - the failure reason handed to the callback. May be an object
#          exposing method_frame() (whose result offers reply_code() and
#          reply_text()), or anything else: a plain string, an unblessed
#          reference, or undef.
#
# The message passed to $cv->croak() is "<reply_code>: <reply_text>" when a
# method frame is available (defaulting to 503 / 'Something went wrong.'),
# otherwise a Data::Dumper dump of $why. Either way a stack trace from
# Carp::longmess is appended.
#
# Returns whatever $cv->croak() returns.
sub _report_error {
    my ($cv, $why) = @_;

    # Loaded locally so this sub does not depend on the file's import list.
    require Scalar::Util;

    # can() may only be invoked on blessed references; calling it on a plain
    # hash/array ref would make the error reporter itself die and leave the
    # condvar unresolved.
    my $method_frame;
    if (Scalar::Util::blessed($why) && $why->can('method_frame')) {
        $method_frame = $why->method_frame;
    }

    # A frame-like object without a method frame falls through to the dump
    # below instead of dying on an undefined invocant.
    if (Scalar::Util::blessed($method_frame)) {
        my $message = sprintf '%s: %s',
            $method_frame->reply_code || 503,
            $method_frame->reply_text || 'Something went wrong.';
        return $cv->croak(longmess($message));
    }

    return $cv->croak(longmess(Dumper($why)));
}


1;
__END__

=encoding utf-8

=head1 NAME

AnyEvent::RabbitMQ::PubSub - Publish and consume RabbitMQ messages.

=head1 SYNOPSIS

    # print 'received Hello World' and exit

    use AnyEvent;
    use AnyEvent::RabbitMQ::PubSub;
    use AnyEvent::RabbitMQ::PubSub::Publisher;
    use AnyEvent::RabbitMQ::PubSub::Consumer;

    my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
        host  => 'localhost',
        port  => 5672,
        user  => 'guest',
        pass  => 'guest',
        vhost => '/',
    );

    my $exchange = {
        exchange    => 'my_test_exchange',
        type        => 'topic',
        durable     => 0,
        auto_delete => 1,
    };

    my $queue = {
        queue       => 'my_test_queue',
        auto_delete => 1,
    };

    my $routing_key = 'my_rk';

    my $cv = AnyEvent->condvar;

    my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
        channel        => $channel,
        exchange       => $exchange,
        queue          => $queue,
        routing_key    => $routing_key,
    );
    $consumer->init(); # declares exchange, queue and binding
    $consumer->consume(
        $cv,
        sub {



( run in 1.586 second using v1.01-cache-2.11-cpan-df04353d9ac )