AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

                'mail-error-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'info-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
            ],

            # exchange to exchange bindings, with optional routing key
            bind_exchanges => [
                { 'stats'   =>   'logger'                 },
                { 'errors'  => [ 'logger', '*.error.#' ]  },
                { 'info'    => [ 'logger', '*.info.#'  ]  },
            ],


            # queue to exchange bindings, with optional routing key
            bind_queues => [
                { 'debug-queue'     =>   'logger'                   },
                { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
                { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
                { 'info-logs'       => [ 'info',   'info.#'       ] },
                { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
            ],

        );

        # publisher timer

README  view on Meta::CPAN

                },
                ...
            ],
            ...
        );

    Optional list of queues to declare with their configuration options.

    See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

   bind_exchanges
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            bind_exchanges => [
                # without routing key
                { 'destination1' => 'source' },

                # with routing key
                { 'destination2'  => [ 'source', 'routing_key' ]  },
                ...
            ],
            ...
        );

    Optional list of exchange-to-exchange bindings.

    See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.

   bind_queues
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            bind_queues => [
                # without routing key
                { 'queue1' => 'exchange' },

                # with routing key
                { 'queue2'  => [ 'exchange', 'routing_key' ]  },
                ...
            ],
            ...
        );

    Optional list of queue-to-exchange bindings.

    See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.

   failure_cb
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            failure_cb => sub {
                my ($event, $details, $why) = @_;
                if ( ref $why ) {
                    my $method_frame = $why->method_frame;
                    $why = $method_frame->reply_text;
                }
                $loop->croak("[ERROR] $event($details): $why" );

example/synopsis.pl  view on Meta::CPAN

        'mail-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'info-logs' => {
            durable => 0,
            auto_delete => 1,
        },
    ],

    # exchange to exchange bindings, with optional routing key
    bind_exchanges => [
        { 'stats'   =>   'logger'                 },
        { 'errors'  => [ 'logger', '*.error.#' ]  },
        { 'info'    => [ 'logger', '*.info.#'  ]  },
    ],


    # queue to exchange bindings, with optional routing key
    bind_queues => [
        { 'debug-queue'     =>   'logger'                   },
        { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
        { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
        { 'info-logs'       => [ 'info',   'info.#'       ] },
        { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
    ],

);

# publisher timer

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

    required => 1,
);

has [qw( tls tune )] => (
    is => 'ro',
);

has $_ => (
    is => 'ro',
    predicate => "_has_$_",
) for qw(exchange exchanges queue queues bind_exchanges bind_queues);

has 'timeout' => (
    is => 'ro',
    default => 0,
);

has 'prefetch_count' => (
    is => 'ro',
    default => 0,
);

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

sub _open_channel {
    my ($self, $conn) = @_;
    $self->_guard->{flow}->begin;
    $conn->open_channel(
        on_success => sub {
            my $channel = shift;
            $self->_guard->{channel} = $channel;

            my $cv_dec_ex = $self->_guard->{flows}->{cv_dec_ex} = AE::cv;
            my $cv_dec_q = $self->_guard->{flows}->{cv_dec_q} = AE::cv;
            my $cv_bind_ex = $self->_guard->{flows}->{cv_bind_ex} = AE::cv;
            my $cv_bind_q = $self->_guard->{flows}->{cv_bind_q} = AE::cv;
            my $cv_confirm_channel = $self->_guard->{flows}->{cv_confirm_channel} = AE::cv;
            my $cv_qos_channel = $self->_guard->{flows}->{cv_qos_channel} = AE::cv;

            $cv_dec_ex->cb(
                sub {
                    my $done = shift->recv;
                    $self->_declare_queues($cv_dec_q) if $done;
                }
            );
            $cv_dec_q->cb(
                sub {
                    my $done = shift->recv;
                    $self->_bind_exchanges($cv_bind_ex) if $done;
                }
            );
            $cv_bind_ex->cb(
                sub {
                    my $done = shift->recv;
                    $self->_bind_queues($cv_bind_q) if $done;
                }
            );
            $cv_bind_q->cb(
                sub {
                    my $done = shift->recv;
                    $self->_confirm_channel( $cv_confirm_channel ) if $done;
                }
            );
            $cv_confirm_channel->cb(
                sub {
                    my $done = shift->recv;
                    $self->_qos_channel( $cv_qos_channel ) if $done;
                }

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

    my ($pairs) = @_;

    my @list;
    while (my ($l,$r) = each %{ $pairs || {} }) {
        push @list, $l, ref $r eq 'ARRAY' ? $r : [ $r, undef ];
    }

    return @list;
}

sub _bind_exchanges {
    my ($self, $cv) = @_;

    $cv->begin( sub { shift->send(1) } );

    if ( $self->_has_bind_exchanges ) {
        my @pairs;
        my $bind_exchanges = $self->bind_exchanges;
        if ( ref $bind_exchanges eq 'ARRAY' ) {
            for my $pair ( @{ $bind_exchanges || [] } ) {
                push @pairs, _make_pair($pair);
            }
        } elsif ( ref $bind_exchanges eq 'HASH' ) {
            push @pairs, _make_pair($bind_exchanges);
        }

        for ( my $i = 0; $i < scalar @pairs; $i += 2 ) {
            my $destination = $pairs[$i];
            my ($source, $routing_key) = @{ $pairs[$i+1] };
            my %opts;
            if ( $routing_key ) {
                $opts{routing_key} = $routing_key;
            }

            $self->_bind_exchange($cv, $source, $destination, %opts);
        }
    }
    $cv->end;
}

sub _bind_exchange {
    my ($self, $cv, $source, $destination, %options ) = @_;

    $self->_guard->{flow}->begin;
    $cv->begin;
    $self->_guard->{channel}->bind_exchange(
        %options,
        source      => $source,
        destination => $destination,
        on_success  => sub {
            $self->_guard->{flow}->end;
            $cv->end;
        },
        on_failure => sub {
            $self->_handle_error( 'BindExchangeOnFailure', "source:$source, destination:$destination", @_ );
            $cv->send;
        },
    );
}

sub _bind_queues {
    my ($self, $cv) = @_;

    $cv->begin( sub { shift->send(1) } );

    if ( $self->_has_bind_queues ) {
        my @pairs;
        my $bind_queues = $self->bind_queues;
        if ( ref $bind_queues eq 'ARRAY' ) {
            for my $pair ( @{ $bind_queues || [] } ) {
                push @pairs, _make_pair($pair);
            }
        } elsif ( ref $bind_queues eq 'HASH' ) {
            push @pairs, _make_pair($bind_queues);
        }

        for ( my $i = 0; $i < scalar @pairs; $i += 2 ) {
            my $queue = $pairs[$i];
            my ($exchange, $routing_key) = @{ $pairs[$i+1] };
            my %opts;
            if ( $routing_key ) {
                $opts{routing_key} = $routing_key;
            }

            $self->_bind_queue($cv, $queue, $exchange, %opts);
        }
    }
    $cv->end;
}

sub _bind_queue {
    my ($self, $cv, $queue, $exchange, %options) = @_;

    $self->_guard->{flow}->begin;
    $cv->begin;
    $self->_guard->{channel}->bind_queue(
        %options,
        queue       => $queue,
        exchange    => $exchange,
        on_success  => sub {
            $self->_guard->{flow}->end;
            $cv->end;
        },
        on_failure => sub {
            $self->_handle_error( 'BindQueueOnFailure', "queue:$queue, exchange:$exchange", @_ );
            $cv->send;

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

            'mail-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'info-logs' => {
                durable => 0,
                auto_delete => 1,
            },
        ],

        # exchange to exchange bindings, with optional routing key
        bind_exchanges => [
            { 'stats'   =>   'logger'                 },
            { 'errors'  => [ 'logger', '*.error.#' ]  },
            { 'info'    => [ 'logger', '*.info.#'  ]  },
        ],


        # queue to exchange bindings, with optional routing key
        bind_queues => [
            { 'debug-queue'     =>   'logger'                   },
            { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
            { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
            { 'info-logs'       => [ 'info',   'info.#'       ] },
            { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
        ],

    );

    # publisher timer

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

            },
            ...
        ],
        ...
    );

Optional list of queues to declare with their configuration options.

See L<AnyEvent::RabbitMQ::Channel/"declare_queue"> for details.

=head3 bind_exchanges

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        bind_exchanges => [
            # without routing key
            { 'destination1' => 'source' },

            # with routing key
            { 'destination2'  => [ 'source', 'routing_key' ]  },
            ...
        ],
        ...
    );

Optional list of exchange-to-exchange bindings.

See L<AnyEvent::RabbitMQ::Channel/"bind_exchange"> for details.

=head3 bind_queues

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        bind_queues => [
            # without routing key
            { 'queue1' => 'exchange' },

            # with routing key
            { 'queue2'  => [ 'exchange', 'routing_key' ]  },
            ...
        ],
        ...
    );

Optional list of queue-to-exchange bindings.

See L<AnyEvent::RabbitMQ::Channel/"bind_queue"> for details.

=head3 failure_cb

    my $rmq = AnyEvent::RabbitMQ::Simple->new(
        failure_cb => sub {
            my ($event, $details, $why) = @_;
            if ( ref $why ) {
                my $method_frame = $why->method_frame;
                $why = $method_frame->reply_text;
            }



( run in 1.402 second using v1.01-cache-2.11-cpan-2398b32b56e )