AnyEvent-RabbitMQ-PubSub

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::PubSub::Consumer;
use Moose;
use AnyEvent::RabbitMQ::PubSub;
use Data::Dumper;
use Time::HiRes qw(usleep);

use AnyEvent;
use Promises qw(deferred collect);

=head1 NAME

AnyEvent::RabbitMQ::PubSub::Consumer - rabbitmq consumer

=cut

has channel => (
    is => 'ro', isa => 'AnyEvent::RabbitMQ::Channel', required => 1
);
has exchange => (
    is => 'ro', isa => 'HashRef', required => 1
);
has queue => (
    is => 'ro', isa => 'HashRef', required => 1
);
has routing_key => (
    is => 'ro', isa => 'Str', default => '#'
);
has prefetch_count => (
    is => 'ro', isa => 'Int', default => 5,
);

=head1 METHODS

=head2 init()

set prefetch_count

declare exchange and queue

=cut

sub init {
    my ($self) = @_;

    $self->channel->qos(prefetch_count => $self->prefetch_count);

    my $cv = AnyEvent->condvar;

    $self->declare_exchange_and_queue()
        ->then( sub { $self->bind_queue() })
        ->then( sub { $cv->send() })
        ->catch(sub { $cv->croak(@_) });

    $cv->recv();
    return
}

=head2 consume($cv, $on_consume)

run consume C<$on_consume> code on channel

return L<Promise>

    my $cv = AnyEvent->condvar();



( run in 2.589 seconds using v1.01-cache-2.11-cpan-f56aa216473 )