AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

# ABSTRACT: Easy to use asynchronous AMQP client
use strict;
use warnings;
package AnyEvent::RabbitMQ::Simple;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::RabbitMQ::Simple::VERSION = '0.02';
use AnyEvent;
use AnyEvent::RabbitMQ;
use Moo;

has 'host' => (
    is => 'ro',
    default => '127.0.0.1',
);

has 'port' => (
    is => 'ro',
    default => 5672,
);

has 'vhost' => (
    is => 'ro',
    default => '/',
);

has 'user' => (
    is => 'ro',
    default => 'guest'
);

has 'pass' => (
    is => 'ro',
    default => '',
);

has 'failure_cb' => (
    is => 'ro',
    required => 1,
);

has [qw( tls tune )] => (
    is => 'ro',
);

has $_ => (
    is => 'ro',
    predicate => "_has_$_",
) for qw(exchange exchanges queue queues bind_exchanges bind_queues);

has 'timeout' => (
    is => 'ro',
    default => 0,
);

has 'prefetch_count' => (
    is => 'ro',
    default => 0,
);

has 'confirm_publish' => (
    is => 'ro',
    default => 0,
);

has 'gen_queue' => (
    is => 'rw',
);

has 'verbose' => (
    is => 'ro',
    default => 0,
);

has '_guard' => (
    is => 'rw',
    default => sub { +{} },
);

sub _handle_error {
    my $self = shift;

    my $guard = $self->_guard;

    # cancel pending actions
    delete $guard->{flows};

    # exec failure callback
    $self->failure_cb->( @_ );

    # send undef
    $guard->{flow}->send();
}

sub connect {
    my ($self) = @_;
    my $guard = $self->_guard;
    my $cv = $guard->{cv} = AE::cv;
    my $flow = $guard->{flow} = AE::cv;
    $flow->begin(
        sub {
            $cv->send($guard->{channel});
        }
    );
    $guard->{conn} = AnyEvent::RabbitMQ->new(verbose=>$self->verbose)->load_xml_spec()->connect(
        host       => $self->host,
        port       => $self->port,
        user       => $self->user,
        pass       => $self->pass,
        vhost      => $self->vhost,
        timeout    => $self->timeout,
        tls        => $self->tls,
        tune       => $self->tune,
        on_success => sub {
            my $conn = shift;
            $self->_open_channel($conn);
            $flow->end;
        },
        on_failure => sub { $self->_handle_error( 'ConnectOnFailure', '', @_ ) },
        on_read_failure => sub { $self->_handle_error( 'ConnectOnReadFailure', '', @_ ) },
        on_return => sub { $self->_handle_error( 'ConnectOnReturn', '', @_ ) },
        on_close => sub { $self->_handle_error( 'ConnectOnClose', '', @_ ) },
    );

    return $cv;
}

sub disconnect {
    my ($self) = @_;

    delete $self->_guard->{conn};
}

sub _open_channel {
    my ($self, $conn) = @_;
    $self->_guard->{flow}->begin;
    $conn->open_channel(
        on_success => sub {
            my $channel = shift;
            $self->_guard->{channel} = $channel;

            my $cv_dec_ex = $self->_guard->{flows}->{cv_dec_ex} = AE::cv;
            my $cv_dec_q = $self->_guard->{flows}->{cv_dec_q} = AE::cv;
            my $cv_bind_ex = $self->_guard->{flows}->{cv_bind_ex} = AE::cv;
            my $cv_bind_q = $self->_guard->{flows}->{cv_bind_q} = AE::cv;
            my $cv_confirm_channel = $self->_guard->{flows}->{cv_confirm_channel} = AE::cv;
            my $cv_qos_channel = $self->_guard->{flows}->{cv_qos_channel} = AE::cv;

            $cv_dec_ex->cb(
                sub {
                    my $done = shift->recv;
                    $self->_declare_queues($cv_dec_q) if $done;
                }
            );
            $cv_dec_q->cb(
                sub {
                    my $done = shift->recv;
                    $self->_bind_exchanges($cv_bind_ex) if $done;
                }
            );
            $cv_bind_ex->cb(
                sub {
                    my $done = shift->recv;
                    $self->_bind_queues($cv_bind_q) if $done;
                }
            );
            $cv_bind_q->cb(
                sub {
                    my $done = shift->recv;
                    $self->_confirm_channel( $cv_confirm_channel ) if $done;
                }
            );
            $cv_confirm_channel->cb(
                sub {
                    my $done = shift->recv;
                    $self->_qos_channel( $cv_qos_channel ) if $done;
                }
            );
            $cv_qos_channel->cb(
                sub {
                    my $done = shift->recv;
                    $self->_guard->{flow}->end;
                }
            );

            $self->_declare_exchanges($cv_dec_ex);

        },
        on_failure => sub { $self->_handle_error( 'OpenChannelOnFailure', '', @_ ) },
        on_return  => sub { $self->_handle_error( 'OpenChannelOnReturn', '', @_ ) },
        on_close  => sub { $self->_handle_error( 'OpenChannelOnClose', '', @_ ) },
    );
}

sub _confirm_channel {
    my ($self, $cv) = @_;
    if ( $self->confirm_publish ) {
        $self->_guard->{flow}->begin;
        $self->_guard->{channel}->confirm(
            on_success => sub {
                $self->_guard->{flow}->end;
                $cv->send(1);
            },
            on_failure => sub {
                $self->_handle_error( 'ConfirmChannelOnFailure', '', @_ );
                $cv->send;
            },
        );
    } else {
        $cv->send(1);
    }
}

sub _qos_channel {
    my ($self, $cv) = @_;
    if ( $self->prefetch_count ) {
        $self->_guard->{flow}->begin;
        $self->_guard->{channel}->qos(
            prefetch_count => $self->prefetch_count,
            on_success => sub {
                $self->_guard->{flow}->end;
                $cv->send(1);
            },
            on_failure => sub {
                $self->_handle_error( 'QosChannelOnFailure', '', @_ );
                $cv->send;
            },

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN


    $cv->begin( sub { shift->send(1) } );

    if ( $self->_has_queue ) {
        $self->_declare_queue($cv, $self->queue);
    }
    if ( $self->_has_queues ) {
        my @queues = @{ $self->queues || [] };
        for ( my $i = 0; $i < scalar @queues; $i += 2 ) {
            my $name = $queues[$i];
            my $opts = $queues[$i+1];

            # another name
            if ( defined $opts && ref $opts ne 'HASH' ) {
                $self->_declare_queue($cv, $name);
                $self->_declare_queue($cv, $opts);
            } else {
                $self->_declare_queue($cv, $name, %{ $opts || {} });
            }
        }
    } else {
        $self->_declare_queue($cv, '');
    }
    $cv->end;
}

sub _declare_queue {
    my ($self, $cv, $name, %options) = @_;

    $self->_guard->{flow}->begin;
    $cv->begin;
    $self->_guard->{channel}->declare_queue(
        %options,
        queue       => $name || '',
        on_success  => sub {
            my $method = shift;
            if ( ! $name ) {
                $self->gen_queue( $method->method_frame->queue );
            }
            $self->_guard->{flow}->end;
            $cv->end;
        },
        on_failure => sub {
            $self->_handle_error( 'DeclareQueueOnFailure', "queue:$name", @_ );
            $cv->send;
        },
    );
}

sub _make_pair {
    my ($pairs) = @_;

    my @list;
    while (my ($l,$r) = each %{ $pairs || {} }) {
        push @list, $l, ref $r eq 'ARRAY' ? $r : [ $r, undef ];
    }

    return @list;
}

sub _bind_exchanges {
    my ($self, $cv) = @_;

    $cv->begin( sub { shift->send(1) } );

    if ( $self->_has_bind_exchanges ) {
        my @pairs;
        my $bind_exchanges = $self->bind_exchanges;
        if ( ref $bind_exchanges eq 'ARRAY' ) {
            for my $pair ( @{ $bind_exchanges || [] } ) {
                push @pairs, _make_pair($pair);
            }
        } elsif ( ref $bind_exchanges eq 'HASH' ) {
            push @pairs, _make_pair($bind_exchanges);
        }

        for ( my $i = 0; $i < scalar @pairs; $i += 2 ) {
            my $destination = $pairs[$i];
            my ($source, $routing_key) = @{ $pairs[$i+1] };
            my %opts;
            if ( $routing_key ) {
                $opts{routing_key} = $routing_key;
            }

            $self->_bind_exchange($cv, $source, $destination, %opts);
        }
    }
    $cv->end;
}

sub _bind_exchange {
    my ($self, $cv, $source, $destination, %options ) = @_;

    $self->_guard->{flow}->begin;
    $cv->begin;
    $self->_guard->{channel}->bind_exchange(
        %options,
        source      => $source,
        destination => $destination,
        on_success  => sub {
            $self->_guard->{flow}->end;
            $cv->end;
        },
        on_failure => sub {
            $self->_handle_error( 'BindExchangeOnFailure', "source:$source, destination:$destination", @_ );
            $cv->send;
        },
    );
}

sub _bind_queues {
    my ($self, $cv) = @_;

    $cv->begin( sub { shift->send(1) } );

    if ( $self->_has_bind_queues ) {
        my @pairs;
        my $bind_queues = $self->bind_queues;
        if ( ref $bind_queues eq 'ARRAY' ) {
            for my $pair ( @{ $bind_queues || [] } ) {
                push @pairs, _make_pair($pair);
            }
        } elsif ( ref $bind_queues eq 'HASH' ) {
            push @pairs, _make_pair($bind_queues);
        }

        for ( my $i = 0; $i < scalar @pairs; $i += 2 ) {
            my $queue = $pairs[$i];
            my ($exchange, $routing_key) = @{ $pairs[$i+1] };
            my %opts;
            if ( $routing_key ) {
                $opts{routing_key} = $routing_key;
            }

            $self->_bind_queue($cv, $queue, $exchange, %opts);
        }
    }
    $cv->end;
}

sub _bind_queue {
    my ($self, $cv, $queue, $exchange, %options) = @_;

    $self->_guard->{flow}->begin;
    $cv->begin;
    $self->_guard->{channel}->bind_queue(
        %options,
        queue       => $queue,
        exchange    => $exchange,
        on_success  => sub {
            $self->_guard->{flow}->end;
            $cv->end;
        },
        on_failure => sub {
            $self->_handle_error( 'BindQueueOnFailure', "queue:$queue, exchange:$exchange", @_ );
            $cv->send;
        },
    );
}



1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client

=head1 VERSION

version 0.02

=head1 SYNOPSIS

    use strict;
    use warnings;
    use AnyEvent::RabbitMQ::Simple;

    # create main loop
    my $loop = AE::cv;

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        host       => '127.0.0.1',
        port       => 5672,
        user       => 'username',
        pass       => 'password',
        vhost      => '/',
        timeout    => 1,
        tls        => 0,
        verbose    => 0,
        confirm_publish => 1,
        prefetch_count => 10,

        failure_cb => sub {
            my ($event, $details, $why) = @_;
            if ( ref $why ) {
                my $method_frame = $why->method_frame;
                $why = $method_frame->reply_text;
            }
            $loop->croak("[ERROR] $event($details): $why" );

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

        #   |  |                                        (mail.error.#)
        #   |  |
        #   |   \-----------> info --------------->   info-logs
        #   |                   (topic:*.info.#)        (*.info.#)
        #   |
        #    \------------------------------------>   debug-queue


        # declare exchanges
        exchanges => [
            'logger' => {
                durable => 0,
                type => 'fanout',
                internal => 0,
                auto_delete => 1,
            },
            'stats' => {
                durable => 0,
                type => 'direct',
                internal => 0,
                auto_delete => 1,
            },
            'errors' => {
                durable => 0,
                type => 'topic',
                internal => 0,
                auto_delete => 1,
            },
            'info' => {
                durable => 0,
                type => 'topic',
                internal => 0,
                auto_delete => 1,
            },
        ],

        # declare queues
        queues => [
            'debug-queue' => {
                durable => 0,
                auto_delete => 1,
            },
            'stats-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'ftp-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'mail-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'info-logs' => {
                durable => 0,
                auto_delete => 1,
            },
        ],

        # exchange to exchange bindings, with optional routing key
        bind_exchanges => [
            { 'stats'   =>   'logger'                 },
            { 'errors'  => [ 'logger', '*.error.#' ]  },
            { 'info'    => [ 'logger', '*.info.#'  ]  },
        ],


        # queue to exchange bindings, with optional routing key
        bind_queues => [
            { 'debug-queue'     =>   'logger'                   },
            { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
            { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
            { 'info-logs'       => [ 'info',   'info.#'       ] },
            { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
        ],

    );

    # publisher timer
    my $t;

    # connect and set up channel
    my $conn = $rmq->connect();
    $conn->cb(
        sub {
            print "waiting for channel..\n";
            my $channel = shift->recv or $loop->croak("Could not open channel");

            print "************* consuming\n";
            for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
                consume($channel, $q);
            }

            print "************* starting publishing\n";
            $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
        }
    );

    # consumes from requested queue
    sub consume {
        my ($channel, $queue) = @_;

        my $consumer_tag;

        $channel->consume(
            queue => $queue,
            no_ack => 0,
            on_success => sub {
                my $frame = shift;
                $consumer_tag = $frame->method_frame->consumer_tag;
                print "************* consuming from $queue with $consumer_tag\n";
            },
            on_consume => sub {
                my $res = shift;
                my $body = $res->{body}->payload;
                print "+++++++++++++ consumed($queue): $body\n";
                $channel->ack(
                    delivery_tag => $res->{deliver}->method_frame->delivery_tag
                );
            },
            on_failure => sub {
                print "************* failed to consume($queue)\n";
            }
        );
    }

    # randomly generates routing key and message body
    sub publish {
        my ($channel, $msg) = @_;

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

        exchange => 'name_of_exchange',
        ...
    );

Optional name of exchange to declare with its default configuration options.

See L<AnyEvent::RabbitMQ::Channel/"declare_exchange (%args)"> for details.

=head3 exchanges

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        exchanges => [
            'name_of_exchange' => {
                durable => 1,
                type => 'fanout',
                ... # other exchange configuration parameters
            },
            ...
        ],
        ...
    );

Optional list of exchanges to declare with their configuration options.

See L<AnyEvent::RabbitMQ::Channel/"declare_exchange (%args)"> for details.

=head3 queue

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        queue => 'name_of_queue',
        ...
    );

Optional name of queue to declare with its default configuration options.

If no queues were declared or empty name has been specified a unique
generated queue name will be available:

    my $gen_queue = $rmq->gen_queue;

See L<AnyEvent::RabbitMQ::Channel/"declare_queue"> for details.

=head3 queues

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        queues => [
            'name_of_queue' => {
                durable => 1,
                no_ack => 0,
                ... # other queue configuration parameters
            },
            ...
        ],
        ...
    );

Optional list of queues to declare with their configuration options.

See L<AnyEvent::RabbitMQ::Channel/"declare_queue"> for details.

=head3 bind_exchanges

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        bind_exchanges => [
            # without routing key
            { 'destination1' => 'source' },

            # with routing key
            { 'destination2'  => [ 'source', 'routing_key' ]  },
            ...
        ],
        ...
    );

Optional list of exchange-to-exchange bindings.

See L<AnyEvent::RabbitMQ::Channel/"bind_exchange"> for details.

=head3 bind_queues

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        bind_queues => [
            # without routing key
            { 'queue1' => 'exchange' },

            # with routing key
            { 'queue2'  => [ 'exchange', 'routing_key' ]  },
            ...
        ],
        ...
    );

Optional list of queue-to-exchange bindings.

See L<AnyEvent::RabbitMQ::Channel/"bind_queue"> for details.

=head3 failure_cb

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        failure_cb => sub {
            my ($event, $details, $why) = @_;
            if ( ref $why ) {
                my $method_frame = $why->method_frame;
                $why = $method_frame->reply_text;
            }
            $loop->croak("[ERROR] $event($details): $why" );
        },
        ...
    );

Required catch-all error handling callback. The value of C<$event> is one of:

=over 4

=item ConnectOnFailure

=item ConnectOnReadFailure

=item ConnectOnReturn

=item ConnectOnClose

=item OpenChannelOnFailure

=item OpenChannelOnReturn

=item OpenChannelOnClose

=item DeclareExchangeOnFailure

Value of C<$details> has following format: C<name:$name_of_exchange>.

=item BindExchangeOnFailure

Value of C<$details> has following format:
C<source:$name_of_source_exchange, destination:$name_of_destination_exchange>.

=item DeclareQueueOnFailure

Value of C<$details> has following format: C<name:$name_of_queue>.

=item BindQueueOnFailure

Value of C<$details> has following format:
C<queue:$name_of_queue, exchange:$name_of_exchange>.

=item ConfirmChannelOnFailure

=item QosChannelOnFailure

=back

=head2 connect

    my $conn = $rmq->connect();



( run in 0.865 second using v1.01-cache-2.11-cpan-2398b32b56e )