AnyEvent-RabbitMQ-Simple
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
# ABSTRACT: Easy to use asynchronous AMQP client
use strict;
use warnings;
package AnyEvent::RabbitMQ::Simple;
our $AUTHORITY = 'cpan:AJGB';
$AnyEvent::RabbitMQ::Simple::VERSION = '0.02';
use AnyEvent;
use AnyEvent::RabbitMQ;
use Moo;
# --- Connection attributes (all read-only) ---------------------------------
# Each one falls back to the default shown when not passed to the constructor.
has host  => ( is => 'ro', default => '127.0.0.1' );
has port  => ( is => 'ro', default => 5672 );
has vhost => ( is => 'ro', default => '/' );
has user  => ( is => 'ro', default => 'guest' );
has pass  => ( is => 'ro', default => '' );

# Mandatory error callback; construction fails without it.  The SYNOPSIS
# shows it being written to accept ($event, $details, $why).
has failure_cb => ( is => 'ro', required => 1 );

# Optional settings with no default and no predicate.
has [qw( tls tune )] => ( is => 'ro' );

# Optional layout attributes.  Each gets a private "_has_<name>" predicate
# so other code can tell "not supplied" apart from "supplied but false".
foreach my $layout_attr (
    qw(exchange exchanges queue queues bind_exchanges bind_queues)
) {
    has $layout_attr => (
        is        => 'ro',
        predicate => "_has_$layout_attr",
    );
}

# Numeric settings that default to 0.
has [qw( timeout prefetch_count )] => ( is => 'ro', default => 0 );
has 'confirm_publish' => (
is => 'ro',
lib/AnyEvent/RabbitMQ/Simple.pm view on Meta::CPAN
$cv->begin( sub { shift->send(1) } );
if ( $self->_has_bind_queues ) {
my @pairs;
my $bind_queues = $self->bind_queues;
if ( ref $bind_queues eq 'ARRAY' ) {
for my $pair ( @{ $bind_queues || [] } ) {
push @pairs, _make_pair($pair);
}
} elsif ( ref $bind_queues eq 'HASH' ) {
push @pairs, _make_pair($bind_queues);
}
for ( my $i = 0; $i < scalar @pairs; $i += 2 ) {
my $queue = $pairs[$i];
my ($exchange, $routing_key) = @{ $pairs[$i+1] };
my %opts;
if ( $routing_key ) {
$opts{routing_key} = $routing_key;
}
$self->_bind_queue($cv, $queue, $exchange, %opts);
}
}
$cv->end;
}
# Issue a bind_queue request for $queue / $exchange on the guarded channel.
#
# Arguments: ( $self, $cv, $queue, $exchange, %extra )
#   $cv    - object whose begin/end/send methods are driven from here
#   %extra - additional key/value pairs forwarded to bind_queue; they are
#            placed ahead of queue, exchange and the two callbacks in the
#            argument list
#
# Before the request is sent, both the guard's "flow" object and $cv get a
# begin() call.  The success callback ends both.  The failure callback hands
# the error to _handle_error and calls $cv->send with no arguments; it does
# not call end() on either object.
#
# Returns whatever the channel's bind_queue call returns.
sub _bind_queue {
    my $self     = shift;
    my $cv       = shift;
    my $queue    = shift;
    my $exchange = shift;
    my %extra    = @_;

    # Register the pending request on both counters before issuing it.
    $self->_guard->{flow}->begin;
    $cv->begin;

    my $on_bound = sub {
        $self->_guard->{flow}->end;
        $cv->end;
    };

    my $on_refused = sub {
        # @_ here is the callback's own argument list, passed through as-is.
        $self->_handle_error(
            'BindQueueOnFailure',
            "queue:$queue, exchange:$exchange",
            @_,
        );
        $cv->send;
    };

    return $self->_guard->{channel}->bind_queue(
        %extra,
        queue      => $queue,
        exchange   => $exchange,
        on_success => $on_bound,
        on_failure => $on_refused,
    );
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client
=head1 VERSION
version 0.02
=head1 SYNOPSIS
use strict;
use warnings;
use AnyEvent::RabbitMQ::Simple;
# create main loop
my $loop = AE::cv;
my $rmq = AnyEvent::RabbitMQ::Simple->new(
host => '127.0.0.1',
port => 5672,
user => 'username',
pass => 'password',
vhost => '/',
timeout => 1,
tls => 0,
verbose => 0,
confirm_publish => 1,
prefetch_count => 10,
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
# routing layout
# [========== exchanges ===================] [===== queues ==============]
# [ (type/routing key) ] [ (routing key) ]
# logger ----------> stats --------------> stats-logs
# |(fanout) (direct) (mail.stats)
# | |
# | | \----------> errors -------------> ftp-error-logs
# | | | (topic:*.error.#) (ftp.error.#)
# | | |
# | | \-------------------> mail-error-logs
# | | (mail.error.#)
# | |
# | \-----------> info ---------------> info-logs
# | (topic:*.info.#) (*.info.#)
# |
# \------------------------------------> debug-queue
# declare exchanges
exchanges => [
'logger' => {
durable => 0,
type => 'fanout',
internal => 0,
auto_delete => 1,
( run in 2.942 seconds using v1.01-cache-2.11-cpan-98e64b0badf )