AnyEvent-RabbitMQ-PubSub

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

Revision history for Perl extension AnyEvent-RabbitMQ-PubSub

3.2.1 2018-05-24T09:13:46Z
    - fix calling method_frame on an object that does not provide this method

3.1.2 2017-03-17T13:06:26Z
    - die on on_inactive error

3.1.1 2016-11-01T16:35:28Z
    - croak on declare/bind errors

3.1.0 2016-10-25T10:43:36Z
    - ack and reject_and_republish methods in consumer

3.0.2 2016-09-23T13:36:53Z
    - public release

3.0.1
    - add Moose requirements to cpanfile (FIX)
    - Publisher default_headers is Maybe[HashRef]

README.md  view on Meta::CPAN

    # Routing key used when binding the queue to the exchange.
    my $routing_key = 'my_rk';

    # Condition variable handed to consume() and signalled from the callback below.
    my $cv = AnyEvent->condvar;

    my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
        channel        => $channel,
        exchange       => $exchange,
        queue          => $queue,
        routing_key    => $routing_key,
    );
    $consumer->init(); # declares the exchange and queue, then binds the queue (blocks until done)
    $consumer->consume(
        $cv,
        sub {
            # The callback receives the consumer itself and the delivered message.
            my ($consumer, $msg) = @_;
            print 'received ', $msg->{body}->payload, "\n";
            # Acknowledge the message, then signal the condvar.
            $consumer->ack($msg);
            $cv->send();
        },
    );

lib/AnyEvent/RabbitMQ/PubSub.pm  view on Meta::CPAN

    # Routing key used when binding the queue to the exchange.
    my $routing_key = 'my_rk';

    # Condition variable handed to consume() and signalled from the callback below.
    my $cv = AnyEvent->condvar;

    my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
        channel        => $channel,
        exchange       => $exchange,
        queue          => $queue,
        routing_key    => $routing_key,
    );
    $consumer->init(); # declares the exchange and queue, then binds the queue (blocks until done)
    $consumer->consume(
        $cv,
        sub {
            # The callback receives the consumer itself and the delivered message.
            my ($consumer, $msg) = @_;
            print 'received ', $msg->{body}->payload, "\n";
            # Acknowledge the message, then signal the condvar.
            $consumer->ack($msg);
            $cv->send();
        },
    );

lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm  view on Meta::CPAN

=cut

sub init {
    my ($self) = @_;

    # Apply the configured prefetch count to the channel before any setup.
    $self->channel->qos(prefetch_count => $self->prefetch_count);

    # Condvar used to wait here until the asynchronous setup has finished.
    my $setup_done = AnyEvent->condvar;

    my $on_ready  = sub { $setup_done->send() };
    my $on_failed = sub { $setup_done->croak(@_) };

    # Declare first, bind second; a rejection at any step ends up in the
    # final catch handler, which makes the recv() below die.
    my $declared = $self->declare_exchange_and_queue();
    my $bound    = $declared->then(sub { $self->bind_queue() });
    $bound->then($on_ready)->catch($on_failed);

    $setup_done->recv();
    return;
}

=head2 consume($cv, $on_consume)

Start consuming on the channel, running the C<$on_consume> code for each received message.

lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm  view on Meta::CPAN


    my $d = deferred;
    $self->channel->declare_exchange(
        %{ $self->exchange },
        on_success => sub { $d->resolve() },
        on_failure => sub { $d->reject(@_) },
    );
    return $d->promise()
}

sub bind_queue {
    my ($self) = @_;

    # Deferred whose promise is settled by the channel callbacks below.
    my $deferred = deferred;

    # Arguments are kept as an ordered list so the channel receives them
    # exactly in this order.
    my @bind_args = (
        queue       => $self->queue->{queue},
        exchange    => $self->exchange->{exchange},
        routing_key => $self->routing_key,
        on_success  => sub { $deferred->resolve() },
        on_failure  => sub { $deferred->reject(@_) },
    );
    $self->channel->bind_queue(@bind_args);

    # Callers chain on the promise; it resolves with no value on success and
    # rejects with whatever the channel passed to on_failure.
    return $deferred->promise();
}

1



( run in 2.619 seconds using v1.01-cache-2.11-cpan-2398b32b56e )