AnyMQ-AMQP

 view release on metacpan or  search on metacpan

lib/AnyMQ/Trait/AMQP.pm  view on Meta::CPAN

package AnyMQ::Trait::AMQP;
use Moose::Role;
use File::ShareDir;

use AnyEvent;
use AnyEvent::RabbitMQ;
use JSON;
use Try::Tiny;
use Carp qw(croak carp);

# Broker connection parameters. host/port/user/pass/vhost are handed to
# AnyEvent::RabbitMQ's connect() by name from this role's connect().
has host => (is => 'ro', isa => 'Str');
has port => (is => 'ro', isa => 'Int');

# exchange is optional: connect() only declares an exchange when it is set.
has $_ => (is => 'ro', isa => 'Str') for qw(user pass vhost exchange);

# Defaults to 'exchange'; how the value is interpreted is decided elsewhere.
has bind_mode => (is => 'ro', isa => 'Str', default => 'exchange');

# Internal handles filled in by connect(): the AnyEvent::RabbitMQ object,
# the opened channel, and the name of the declared queue.
has $_ => (is => 'rw') for qw(_rf _rf_channel _rf_queue);

# Condvar for the connection attempt currently in flight.
has cv => (is => 'rw', isa => 'AnyEvent::CondVar');

# Optional callback, invoked with no arguments once the connection is up.
has on_ready => (is => 'rw', isa => 'CodeRef');

# Set to true after a connection attempt reports 'init'.
has _connected => (is => 'rw', isa => 'Bool');

# Returns the path of the fixed_amqp0-8.xml spec file inside the
# AnyEvent-RabbitMQ dist share directory. Per the original author's note,
# resolving the path here "is to avoid loading coro".
sub default_amqp_spec {
    my $share_dir = File::ShareDir::dist_dir("AnyEvent-RabbitMQ");
    return join '/', $share_dir, 'fixed_amqp0-8.xml';
}

# Executed once, when this package is loaded: hand the spec file to
# AnyEvent::RabbitMQ before any connection is attempted.
AnyEvent::RabbitMQ->load_xml_spec( default_amqp_spec() );

# A role cannot rely on the consuming class defining BUILD, so an empty one
# is declared here and the connection logic is attached with 'after'.
#
# On construction this starts a connection attempt and installs a handler
# that either marks the object connected (and fires on_ready) or schedules a
# retry five seconds later. When no on_ready callback was supplied, BUILD
# blocks until a connection attempt reports 'init'.
sub BUILD {}; after 'BUILD' => sub {
    my $self = shift;

    require Scalar::Util;

    # ->cv always holds the condvar of the attempt currently in flight.
    my $cv = $self->cv(AE::cv);

    $self->connect($cv);

    # The handler must re-install itself on a fresh condvar after a failure,
    # so it needs a reference to itself. That self-reference is kept weak:
    # a strong one would form a cycle through the closure's own variable and
    # keep the closure (and the $self it captures) alive forever.
    my $weak_cb;
    my $cb = $weak_cb = sub {
        my $msg = $_[0]->recv;

        # A failure may arrive without a reason; treat undef as "not init"
        # without tripping an uninitialized-value warning.
        if ( defined $msg && $msg eq 'init' ) {
            $self->_connected(1);
            $self->on_ready->() if $self->on_ready;
        }
        else {
            # Fresh condvar for the next attempt; copying the weak reference
            # into it gives the new condvar its own strong reference.
            my $retry_cv = AE::cv;
            $retry_cv->cb($weak_cb);
            $self->cv($retry_cv);
            carp "Connection failed, retrying in 5 seconds.  Reason: "
                . ( defined $msg ? $msg : 'unknown' );

            # The timer keeps itself alive through $w until it fires, then
            # releases itself before reconnecting.
            my $w; $w = AnyEvent->timer(
                after => 5,
                cb    => sub {
                    undef $w;
                    $self->connect($retry_cv);
                },
            );
        }
    };
    Scalar::Util::weaken($weak_cb);
    $cv->cb($cb);

    # Synchronous mode: without an on_ready callback, wait here. ->cv is
    # re-read on every pass because a failed attempt replaces it.
    if (!$self->on_ready) {
        while (1) {
            my $msg = $self->cv->recv;
            last if defined $msg && $msg eq 'init';
        }
    }
};

sub connect {
    my $self = shift;
    my $cv = shift;

    my $rf = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
    $self->_rf($rf);

    # XXX: wrapped object with monadic method modifier
    # my $channel = run_monad { $rf->connect(....)->open_channel()->return }
    # my $queue = run_monad { $channel->declare_queue(....)->return }->method_frame->queue;
    # run_monad { $channel->consume( ....) }

    my $init = sub {
        my $channel = shift;
        $channel->declare_queue(
            exclusive => 1,
            on_success => sub {
                my $method = shift;
                my $queue = $method->method_frame->queue;
                $self->_rf_queue($queue);
                $channel->consume(queue => $queue,
                                  no_ack => 1,
                                  on_success => sub {
                                      $cv->send('init');
                                  },
                                  on_consume => $self->on_consume,
                                  on_failure => $cv,
                              );
            },
            on_failure => $cv,
        )
    };

    $rf->connect(
        (map { $_ => $self->$_ }
             qw(host port user pass vhost)),
        on_success => sub {
            $rf->open_channel(
                on_success => sub {
                    my $channel = shift;
                    $self->_rf_channel($channel);
                    $channel->qos();
                    return $init->($channel)
                        unless $self->exchange;

                    $channel->declare_exchange(
                        type => 'topic',



( run in 1.516 second using v1.01-cache-2.11-cpan-39bf76dae61 )