AnyEvent-RabbitMQ-PubSub
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::PubSub::Consumer;
use Moose;
use AnyEvent::RabbitMQ::PubSub;
use Data::Dumper;
use Time::HiRes qw(usleep);
use AnyEvent;
use Promises qw(deferred collect);
=head1 NAME
AnyEvent::RabbitMQ::PubSub::Consumer - rabbitmq consumer
=cut
has channel => (
is => 'ro', isa => 'AnyEvent::RabbitMQ::Channel', required => 1
);
has exchange => (
is => 'ro', isa => 'HashRef', required => 1
);
has queue => (
is => 'ro', isa => 'HashRef', required => 1
);
has routing_key => (
is => 'ro', isa => 'Str', default => '#'
);
has prefetch_count => (
is => 'ro', isa => 'Int', default => 5,
);
=head1 METHODS
=head2 init()
set prefetch_count
declare exchange and queue
=cut
sub init {
my ($self) = @_;
$self->channel->qos(prefetch_count => $self->prefetch_count);
my $cv = AnyEvent->condvar;
$self->declare_exchange_and_queue()
->then( sub { $self->bind_queue() })
->then( sub { $cv->send() })
->catch(sub { $cv->croak(@_) });
$cv->recv();
return
}
=head2 consume($cv, $on_consume)
run consume C<$on_consume> code on channel
return L<Promise>
my $cv = AnyEvent->condvar();
$self->consume(
$cv,
sub {
my ($consumer, $msg) = @_;
...
}
)->then(sub {
say 'Consumer was started...';
});
=cut
sub consume {
my ($self, $cv, $on_consume) = @_;
my $d = deferred();
view all matches for this distributionview release on metacpan - search on metacpan
( run in 1.558 second using v1.00-cache-2.02-grep-82fe00e-cpan-f5108d614456 )