AnyEvent-RabbitMQ-Simple
view release on metacpan or search on metacpan
vhost => '/',
timeout => 1,
tls => 0,
verbose => 0,
confirm_publish => 1,
prefetch_count => 10,
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
# routing layout
# [========== exchanges ===================] [===== queues ==============]
# [ (type/routing key) ] [ (routing key) ]
# logger ----------> stats --------------> stats-logs
# |(fanout) (direct) (mail.stats)
# | |
# consumes from requested queue
#
# Subscribes to $queue on $channel with explicit acknowledgements
# (no_ack => 0). Prints a line once the subscription succeeds, prints and
# acks every delivered message, and prints a line if subscribing fails.
# Returns whatever $channel->consume returns.
sub consume {
    my ($channel, $queue) = @_;

    # filled in by the on_success callback
    my $consumer_tag;

    # subscription accepted: remember and report the consumer tag
    my $on_subscribed = sub {
        my ($consume_ok) = @_;
        $consumer_tag = $consume_ok->method_frame->consumer_tag;
        print "************* consuming from $queue with $consumer_tag\n";
    };

    # message delivered: report its payload, then acknowledge it
    my $on_message = sub {
        my ($message) = @_;
        my $body = $message->{body}->payload;
        print "+++++++++++++ consumed($queue): $body\n";
        my $deliver_frame = $message->{deliver}->method_frame;
        return $channel->ack(
            delivery_tag => $deliver_frame->delivery_tag
        );
    };

    # subscription rejected: report only, nothing is rethrown
    my $on_failed = sub {
        print "************* failed to consume($queue)\n";
    };

    return $channel->consume(
        queue      => $queue,
        no_ack     => 0,
        on_success => $on_subscribed,
        on_consume => $on_message,
        on_failure => $on_failed,
    );
}
# randomly generates routing key and message body
sub publish {
...
);
Password.
tune
my $rmq = AnyEvent::RabbitMQ::Simple->new(
tune => {
heartbeat => $connection_heartbeat,
channel_max => $max_channel_number,
frame_max => $max_frame_size
},
...
);
Optional connection tuning options.
timeout
my $rmq = AnyEvent::RabbitMQ::Simple->new(
timeout => 0, # default
...
Optional list of queue-to-exchange bindings.
See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.
failure_cb
my $rmq = AnyEvent::RabbitMQ::Simple->new(
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
...
);
Required catch-all error handling callback. The value of $event is one
of:
ConnectOnFailure
example/simplest.pl view on Meta::CPAN
use warnings;
use AnyEvent::RabbitMQ::Simple;
# create main loop
my $loop = AE::cv;
# Create the client. Only failure_cb is supplied here; no other constructor
# options are passed.
my $rmq = AnyEvent::RabbitMQ::Simple->new(
# Catch-all error handler, called with the failing event name, its details
# and the reason for the failure.
failure_cb => sub {
my ($event, $details, $why) = @_;
# The reason is either a plain string or a reference; a reference is treated as
# a frame object and replaced by the reply text of its method frame.
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
# Report the failure through the $loop condvar (per the AnyEvent docs, croak
# makes recv on that condvar die with this message).
$loop->croak("[ERROR] $event($details): $why" );
},
);
# publisher timer
my $t;
# connect and set up channel
my $conn = $rmq->connect();
example/simplest.pl view on Meta::CPAN
# consumes from requested queue
sub consume {
my ($channel, $queue) = @_;
my $consumer_tag;
$channel->consume(
queue => $queue,
on_success => sub {
my $frame = shift;
$consumer_tag = $frame->method_frame->consumer_tag;
print "************* consuming from $queue with $consumer_tag\n";
},
on_consume => sub {
my $res = shift;
my $body = $res->{body}->payload;
print "+++++++++++++ consumed($queue): $body\n";
},
on_failure => sub {
print "************* failed to consume($queue)\n";
}
example/synopsis.pl view on Meta::CPAN
vhost => '/',
timeout => 1,
tls => 0,
verbose => 0,
confirm_publish => 1,
prefetch_count => 10,
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
# routing layout
# [========== exchanges ===================] [===== queues ==============]
# [ (type/routing key) ] [ (routing key) ]
# logger ----------> stats --------------> stats-logs
# |(fanout) (direct) (mail.stats)
# | |
example/synopsis.pl view on Meta::CPAN
# consumes from requested queue
#
# Subscribes to $queue on $channel with explicit acknowledgements
# (no_ack => 0). Prints a line once the subscription succeeds, prints and
# acks every delivered message, and prints a line if subscribing fails.
# Returns whatever $channel->consume returns.
sub consume {
    my ($channel, $queue) = @_;

    # filled in by the on_success callback
    my $consumer_tag;

    # subscription accepted: remember and report the consumer tag
    my $on_subscribed = sub {
        my ($consume_ok) = @_;
        $consumer_tag = $consume_ok->method_frame->consumer_tag;
        print "************* consuming from $queue with $consumer_tag\n";
    };

    # message delivered: report its payload, then acknowledge it
    my $on_message = sub {
        my ($message) = @_;
        my $body = $message->{body}->payload;
        print "+++++++++++++ consumed($queue): $body\n";
        my $deliver_frame = $message->{deliver}->method_frame;
        return $channel->ack(
            delivery_tag => $deliver_frame->delivery_tag
        );
    };

    # subscription rejected: report only, nothing is rethrown
    my $on_failed = sub {
        print "************* failed to consume($queue)\n";
    };

    return $channel->consume(
        queue      => $queue,
        no_ack     => 0,
        on_success => $on_subscribed,
        on_consume => $on_message,
        on_failure => $on_failed,
    );
}
# randomly generates routing key and message body
sub publish {
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
my ($self, $cv, $name, %options) = @_;
$self->_guard->{flow}->begin;
$cv->begin;
$self->_guard->{channel}->declare_queue(
%options,
queue => $name || '',
on_success => sub {
my $method = shift;
if ( ! $name ) {
$self->gen_queue( $method->method_frame->queue );
}
$self->_guard->{flow}->end;
$cv->end;
},
on_failure => sub {
$self->_handle_error( 'DeclareQueueOnFailure', "queue:$name", @_ );
$cv->send;
},
);
}
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
vhost => '/',
timeout => 1,
tls => 0,
verbose => 0,
confirm_publish => 1,
prefetch_count => 10,
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
# routing layout
# [========== exchanges ===================] [===== queues ==============]
# [ (type/routing key) ] [ (routing key) ]
# logger ----------> stats --------------> stats-logs
# |(fanout) (direct) (mail.stats)
# | |
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
# consumes from requested queue
#
# Subscribes to $queue on $channel with explicit acknowledgements
# (no_ack => 0). Prints a line once the subscription succeeds, prints and
# acks every delivered message, and prints a line if subscribing fails.
# Returns whatever $channel->consume returns.
sub consume {
    my ($channel, $queue) = @_;

    # filled in by the on_success callback
    my $consumer_tag;

    # subscription accepted: remember and report the consumer tag
    my $on_subscribed = sub {
        my ($consume_ok) = @_;
        $consumer_tag = $consume_ok->method_frame->consumer_tag;
        print "************* consuming from $queue with $consumer_tag\n";
    };

    # message delivered: report its payload, then acknowledge it
    my $on_message = sub {
        my ($message) = @_;
        my $body = $message->{body}->payload;
        print "+++++++++++++ consumed($queue): $body\n";
        my $deliver_frame = $message->{deliver}->method_frame;
        return $channel->ack(
            delivery_tag => $deliver_frame->delivery_tag
        );
    };

    # subscription rejected: report only, nothing is rethrown
    my $on_failed = sub {
        print "************* failed to consume($queue)\n";
    };

    return $channel->consume(
        queue      => $queue,
        no_ack     => 0,
        on_success => $on_subscribed,
        on_consume => $on_message,
        on_failure => $on_failed,
    );
}
# randomly generates routing key and message body
sub publish {
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
);
Password.
=head3 tune
my $rmq = AnyEvent::RabbitMQ::Simple->new(
tune => {
heartbeat => $connection_heartbeat,
channel_max => $max_channel_number,
frame_max => $max_frame_size
},
...
);
Optional connection tuning options.
=head3 timeout
my $rmq = AnyEvent::RabbitMQ::Simple->new(
timeout => 0, # default
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
Optional list of queue-to-exchange bindings.
See L<AnyEvent::RabbitMQ::Channel/"bind_queue"> for details.
=head3 failure_cb
my $rmq = AnyEvent::RabbitMQ::Simple->new(
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
...
);
Required catch-all error handling callback. The value of C<$event> is one of:
=over 4
( run in 1.940 second using v1.01-cache-2.11-cpan-df04353d9ac )