AnyEvent-RabbitMQ-PubSub
view release on metacpan or search on metacpan
Revision history for Perl extension AnyEvent-RabbitMQ-PubSub
3.2.1 2018-05-24T09:13:46Z
- fix calling method_frame on object without this function
3.1.2 2017-03-17T13:06:26Z
- die on on_inactive error
3.1.1 2016-11-01T16:35:28Z
- croak on declare/bind errors
3.1.0 2016-10-25T10:43:36Z
- ack and reject_and_republish methods in consumer
lib/AnyEvent/RabbitMQ/PubSub.pm view on Meta::CPAN
$ar->open_channel(
on_success => sub { my $channel = shift; $cv->send($ar, $channel); },
on_failure => sub { _report_error($cv, @_) },
on_close => sub { _report_error($cv, @_) },
)
}
sub _report_error {
my ($cv, $why) = @_;
if (ref($why) && $why->can('method_frame')) {
my $method_frame = $why->method_frame;
$cv->croak(longmess(
sprintf '%s: %s',
$method_frame->reply_code || 503,
$method_frame->reply_text || 'Something went wrong.',
));
}
else {
$cv->croak(longmess(Dumper($why)));
}
}
1;
__END__
lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm view on Meta::CPAN
reject (drop) message
=cut
sub reject {
my ($self, $msg) = @_;
warn "Message to reject not specified" if !defined $msg;
my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
$self->channel->reject(delivery_tag => $delivery_tag);
}
=head2 ack($msg)
ack C<$msg> same as
$consumer->channel->ack(delivery_tag => $msg->{deliver}{method_frame}{delivery_tag});
=cut
sub ack {
my ($self, $msg) = @_;
warn "Message to ack not specified" if !defined $msg;
my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
$self->channel->ack(delivery_tag => $delivery_tag);
}
sub declare_exchange_and_queue {
my ($self, $cv) = @_;
return collect(
$self->declare_exchange(),
$self->declare_queue(),
)->then(sub {
( run in 1.535 second using v1.01-cache-2.11-cpan-df04353d9ac )