AnyEvent-RabbitMQ-PubSub
view release on metacpan or search on metacpan
Revision history for Perl extension AnyEvent-RabbitMQ-PubSub
3.2.1 2018-05-24T09:13:46Z
- fix calling method_frame on object without this function
3.1.2 2017-03-17T13:06:26Z
- die on on_inactive error
3.1.1 2016-11-01T16:35:28Z
- croak on declare/bind errors
3.1.0 2016-10-25T10:43:36Z
- ack and reject_and_republish methods in consumer
3.0.2 2016-09-23T13:36:53Z
- public release
3.0.1
- add Moose requirements to cpanfile (FIX)
- Publisher default_headers is Maybe[HashRef]
# Example: consume one message and then stop waiting.
# $channel, $exchange and $queue come from earlier in the example (not shown
# here); the consumer reads $exchange->{exchange} and $queue->{queue}, so the
# latter two are hash references.
my $routing_key = 'my_rk';
# Condition variable handed to consume(); the callback below signals it.
my $cv = AnyEvent->condvar;
my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
channel => $channel,
exchange => $exchange,
queue => $queue,
routing_key => $routing_key,
);
$consumer->init(); # sets qos, declares exchange and queue, binds the queue; blocks until done and croaks on failure
$consumer->consume(
$cv,
sub {
# The callback is called with the consumer object and the message.
my ($consumer, $msg) = @_;
print 'received ', $msg->{body}->payload, "\n";
# Acknowledge the message explicitly.
$consumer->ack($msg);
# Signal the condition variable created above.
$cv->send();
},
);
lib/AnyEvent/RabbitMQ/PubSub.pm view on Meta::CPAN
# Example: consume one message and then stop waiting.
# $channel, $exchange and $queue come from earlier in the example (not shown
# here); the consumer reads $exchange->{exchange} and $queue->{queue}, so the
# latter two are hash references.
my $routing_key = 'my_rk';
# Condition variable handed to consume(); the callback below signals it.
my $cv = AnyEvent->condvar;
my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
channel => $channel,
exchange => $exchange,
queue => $queue,
routing_key => $routing_key,
);
$consumer->init(); # sets qos, declares exchange and queue, binds the queue; blocks until done and croaks on failure
$consumer->consume(
$cv,
sub {
# The callback is called with the consumer object and the message.
my ($consumer, $msg) = @_;
print 'received ', $msg->{body}->payload, "\n";
# Acknowledge the message explicitly.
$consumer->ack($msg);
# Signal the condition variable created above.
$cv->send();
},
);
lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm view on Meta::CPAN
=cut
sub init {
    my $self = shift;

    # Apply the configured prefetch count to the channel.
    $self->channel->qos(prefetch_count => $self->prefetch_count);

    # Condition variable used to turn the asynchronous setup chain below
    # into a blocking call.
    my $setup_done = AnyEvent->condvar;

    # Setup chain: declare exchange and queue, then bind the queue, then
    # signal completion. A rejection anywhere in the chain is forwarded to
    # the condition variable as an error.
    my $declared = $self->declare_exchange_and_queue();
    my $bound    = $declared->then(sub { return $self->bind_queue() });
    my $finished = $bound->then(sub { return $setup_done->send() });
    $finished->catch(sub { return $setup_done->croak(@_) });

    # Wait here until the chain has signalled success or failure.
    $setup_done->recv();

    return;
}
=head2 consume($cv, $on_consume)
Start consuming on the channel and run the C<$on_consume> code for received messages; it is called with the consumer object and the message.
lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm view on Meta::CPAN
my $d = deferred;
$self->channel->declare_exchange(
%{ $self->exchange },
on_success => sub { $d->resolve() },
on_failure => sub { $d->reject(@_) },
);
return $d->promise()
}
# Bind this consumer's queue to its exchange using the configured routing
# key. Returns a promise that is resolved from the channel's on_success
# callback and rejected (with the callback's arguments) from on_failure.
sub bind_queue {
    my $self = shift;

    my $pending = deferred;

    # Channel callbacks, translated into promise resolution/rejection.
    my $on_bound  = sub { $pending->resolve() };
    my $on_failed = sub { $pending->reject(@_) };

    $self->channel->bind_queue(
        queue       => $self->queue->{queue},
        exchange    => $self->exchange->{exchange},
        routing_key => $self->routing_key,
        on_success  => $on_bound,
        on_failure  => $on_failed,
    );

    return $pending->promise();
}
1
( run in 2.619 seconds using v1.01-cache-2.11-cpan-2398b32b56e )