AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

# ABSTRACT: Easy to use asynchronous AMQP client
use strict;
use warnings;
package AnyEvent::RabbitMQ::Simple;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::RabbitMQ::Simple::VERSION = '0.02';
use AnyEvent;
use AnyEvent::RabbitMQ;
use Moo;

has 'host' => (
    is => 'ro',
    default => '127.0.0.1',
);

has 'port' => (
    is => 'ro',
    default => 5672,
);

has 'vhost' => (
    is => 'ro',
    default => '/',
);

has 'user' => (
    is => 'ro',
    default => 'guest'
);

has 'pass' => (
    is => 'ro',
    default => '',
);

has 'failure_cb' => (
    is => 'ro',
    required => 1,
);

has [qw( tls tune )] => (
    is => 'ro',
);

has $_ => (
    is => 'ro',
    predicate => "_has_$_",
) for qw(exchange exchanges queue queues bind_exchanges bind_queues);

has 'timeout' => (
    is => 'ro',
    default => 0,
);

has 'prefetch_count' => (
    is => 'ro',
    default => 0,
);

has 'confirm_publish' => (
    is => 'ro',

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN


    $cv->begin( sub { shift->send(1) } );

    if ( $self->_has_bind_queues ) {
        my @pairs;
        my $bind_queues = $self->bind_queues;
        if ( ref $bind_queues eq 'ARRAY' ) {
            for my $pair ( @{ $bind_queues || [] } ) {
                push @pairs, _make_pair($pair);
            }
        } elsif ( ref $bind_queues eq 'HASH' ) {
            push @pairs, _make_pair($bind_queues);
        }

        for ( my $i = 0; $i < scalar @pairs; $i += 2 ) {
            my $queue = $pairs[$i];
            my ($exchange, $routing_key) = @{ $pairs[$i+1] };
            my %opts;
            if ( $routing_key ) {
                $opts{routing_key} = $routing_key;
            }

            $self->_bind_queue($cv, $queue, $exchange, %opts);
        }
    }
    $cv->end;
}

sub _bind_queue {
    my ($self, $cv, $queue, $exchange, %options) = @_;

    $self->_guard->{flow}->begin;
    $cv->begin;
    $self->_guard->{channel}->bind_queue(
        %options,
        queue       => $queue,
        exchange    => $exchange,
        on_success  => sub {
            $self->_guard->{flow}->end;
            $cv->end;
        },
        on_failure => sub {
            $self->_handle_error( 'BindQueueOnFailure', "queue:$queue, exchange:$exchange", @_ );
            $cv->send;
        },
    );
}



1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client

=head1 VERSION

version 0.02

=head1 SYNOPSIS

    use strict;
    use warnings;
    use AnyEvent::RabbitMQ::Simple;

    # create main loop
    my $loop = AE::cv;

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        host       => '127.0.0.1',
        port       => 5672,
        user       => 'username',
        pass       => 'password',
        vhost      => '/',
        timeout    => 1,
        tls        => 0,
        verbose    => 0,
        confirm_publish => 1,
        prefetch_count => 10,

        failure_cb => sub {
            my ($event, $details, $why) = @_;
            if ( ref $why ) {
                my $method_frame = $why->method_frame;
                $why = $method_frame->reply_text;
            }
            $loop->croak("[ERROR] $event($details): $why" );
        },

        # routing layout
        # [========== exchanges ===================] [===== queues ==============]
        # [             (type/routing key)         ] [        (routing key) ]
        #  logger ----------> stats -------------->   stats-logs
        #   |(fanout)           (direct)                (mail.stats)
        #   |  |
        #   |  | \----------> errors ------------->   ftp-error-logs
        #   |  |              | (topic:*.error.#)       (ftp.error.#)
        #   |  |              |
        #   |  |              \------------------->   mail-error-logs
        #   |  |                                        (mail.error.#)
        #   |  |
        #   |   \-----------> info --------------->   info-logs
        #   |                   (topic:*.info.#)        (*.info.#)
        #   |
        #    \------------------------------------>   debug-queue


        # declare exchanges
        exchanges => [
            'logger' => {
                durable => 0,
                type => 'fanout',
                internal => 0,
                auto_delete => 1,



( run in 2.942 seconds using v1.01-cache-2.11-cpan-98e64b0badf )