AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
package AnyEvent::RabbitMQ;
use strict;
use warnings;
use Carp qw(confess croak);
use Scalar::Util qw(refaddr);
use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
use File::ShareDir;
use Readonly;
use Scalar::Util qw/ weaken /;
require Data::Dumper;
sub Dumper {
local $Data::Dumper::Terse = 1;
local $Data::Dumper::Indent = 1;
local $Data::Dumper::Useqq = 1;
local $Data::Dumper::Deparse = 1;
local $Data::Dumper::Quotekeys = 0;
local $Data::Dumper::Sortkeys = 1;
&Data::Dumper::Dumper
}
use AnyEvent::Handle;
use AnyEvent::Socket;
use Net::AMQP 0.06;
use Net::AMQP::Common qw(:all);
use AnyEvent::RabbitMQ::Channel;
use AnyEvent::RabbitMQ::LocalQueue;
use namespace::clean;
our $VERSION = '1.22'; # VERSION
use constant {
_ST_CLOSED => 0,
_ST_OPENING => 1,
_ST_OPEN => 2,
_ST_CLOSING => 3,
};
Readonly my $DEFAULT_AMQP_SPEC
=> File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';
Readonly my $DEFAULT_CHANNEL_MAX => 2**16-1;
sub new {
my $class = shift;
return bless {
verbose => 0,
@_,
_state => _ST_CLOSED,
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_last_chan_id => 0,
_channels => {},
_login_user => '',
_server_properties => {},
_frame_max => undef,
_body_max => undef,
_channel_max => undef,
}, $class;
}
sub verbose {
my $self = shift;
@_ ? ($self->{verbose} = shift) : $self->{verbose}
}
sub is_open {
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
}
sub channels {
my $self = shift;
return $self->{_channels};
}
sub _delete_channel {
my $self = shift;
my ($channel,) = @_;
my $c = $self->{_channels}->{$channel->id};
if (defined($c) && refaddr($c) == refaddr($channel)) {
delete $self->{_channels}->{$channel->id};
return 1;
}
return 0;
}
sub login_user {
my $self = shift;
return $self->{_login_user};
}
my $_loaded_spec;
sub load_xml_spec {
my $self = shift;
my ($spec) = @_;
$spec ||= $DEFAULT_AMQP_SPEC;
if ($_loaded_spec && $_loaded_spec ne $spec) {
croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
}
elsif (!$_loaded_spec) {
Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
}
return $self;
}
sub connect {
my $self = shift;
my %args = $self->_set_cbs(@_);
if ($self->{_state} != _ST_CLOSED) {
$args{on_failure}->('Connection has already been opened');
return $self;
}
$args{on_close} ||= sub {};
$args{on_read_failure} ||= sub {warn @_, "\n"};
$args{timeout} ||= 0;
for (qw/ host port /) {
$args{$_} or return $args{on_failure}->("No $_ passed to connect");
}
if ($self->{verbose}) {
warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
}
$self->{_state} = _ST_OPENING;
weaken(my $weak_self = $self);
my $conn; $conn = AnyEvent::Socket::tcp_connect(
$args{host},
$args{port},
sub {
undef $conn;
my $self = $weak_self or return;
my $fh = shift;
unless ($fh) {
$self->{_state} = _ST_CLOSED;
return $args{on_failure}->(
sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
);
}
my $close_cb = $args{on_close};
my $failure_cb = $args{on_failure};
$self->{_handle} = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
my ($handle, $fatal, $message) = @_;
my $self = $weak_self or return;
if ($self->is_open) {
$self->_server_closed($close_cb, $message);
}
else {
$failure_cb->(@_);
}
},
on_drain => sub {
my ($handle) = @_;
my $self = $weak_self or return;
$self->{drain_condvar}->send
if exists $self->{drain_condvar};
},
peername => $args{host},
$args{tls} ? (tls => 'connect') : (),
$args{tls_ctx} ? ( tls_ctx => $args{tls_ctx} ) : (),
$args{nodelay} ? ( nodelay => $args{nodelay} ) : (),
);
$self->_read_loop($args{on_close}, $args{on_read_failure});
$self->_start(%args,);
},
sub {
return $args{timeout};
},
);
return $self;
}
sub server_properties {
return shift->{_server_properties};
}
sub _read_loop {
my ($self, $close_cb, $failure_cb,) = @_;
return if !defined $self->{_handle}; # called on_error
weaken(my $weak_self = $self);
$self->{_handle}->push_read(chunk => 8, sub {
my $self = $weak_self or return;
my $data = $_[1];
my $stack = $_[1];
if (length($data) <= 7) {
$failure_cb->('Broken data was received');
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
}
my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
if (!defined $type_id || !defined $channel || !defined $length) {
$failure_cb->('Broken data was received');
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
}
$self->{_handle}->push_read(chunk => $length, sub {
my $self = $weak_self or return;
$stack .= $_[1];
my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
$self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};
if ($self->{verbose}) {
warn '[C] <-- [S] ', Dumper($frame),
'-----------', "\n";
}
my $id = $frame->channel;
if (0 == $id) {
if ($frame->type_id == 8) {
# Heartbeat, no action needs taking.
}
else {
return unless $self->_check_close_and_clean($frame, $close_cb,);
$self->{_queue}->push($frame);
}
} else {
my $channel = $self->{_channels}->{$id};
if (defined $channel) {
$channel->push_queue_or_consume($frame, $failure_cb);
} else {
$failure_cb->('Unknown channel id: ' . $frame->channel);
}
}
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
});
});
return $self;
}
sub _check_close_and_clean {
my $self = shift;
my ($frame, $close_cb,) = @_;
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
sub _start {
my $self = shift;
my %args = @_;
if ($self->{verbose}) {
warn 'post header', "\n";
}
$self->{_handle}->push_write(Net::AMQP::Protocol->header);
$self->_push_read_and_valid(
'Connection::Start',
sub {
my $frame = shift;
my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
if none {$_ eq 'AMQPLAIN'} @mechanisms;
my @locales = split /\s/, $frame->method_frame->locales;
return $args{on_failure}->('en_US is not found in locales')
if none {$_ eq 'en_US'} @locales;
$self->{_server_properties} = $frame->method_frame->server_properties;
$self->_push_write(
Net::AMQP::Protocol::Connection::StartOk->new(
client_properties => {
platform => 'Perl',
product => __PACKAGE__,
information => 'http://d.hatena.ne.jp/cooldaemon/',
version => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
capabilities => {
consumer_cancel_notify => Net::AMQP::Value::true,
exchange_exchange_bindings => Net::AMQP::Value::true,
},
%{ $args{client_properties} || {} },
},
mechanism => 'AMQPLAIN',
response => {
LOGIN => $args{user},
PASSWORD => $args{pass},
},
locale => 'en_US',
),
);
$self->_tune(%args,);
},
$args{on_failure},
);
return $self;
}
sub _tune {
my $self = shift;
my %args = @_;
weaken(my $weak_self = $self);
$self->_push_read_and_valid(
'Connection::Tune',
sub {
my $self = $weak_self or return;
my $frame = shift;
my %tune;
foreach (qw( channel_max frame_max heartbeat )) {
my $client = $args{tune}{$_} || 0;
my $server = $frame->method_frame->$_ || 0;
# negotiate with the server such that we cannot request a larger
# value set by the server, unless the server said unlimited
$tune{$_} = ($server == 0 or $client == 0)
? ($server > $client ? $server : $client) # max
: ($client > $server ? $server : $client); # min
}
if ($self->{_frame_max} = $tune{frame_max}) {
# calculate how big the body can actually be
$self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
}
$self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;
$self->_push_write(
Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
);
if ($tune{heartbeat} > 0) {
$self->_start_heartbeat($tune{heartbeat}, %args,);
}
$self->_open(%args,);
},
$args{on_failure},
);
return $self;
}
sub _start_heartbeat {
my ($self, $interval, %args,) = @_;
my $close_cb = $args{on_close};
my $failure_cb = $args{on_read_failure};
my $last_recv = 0;
my $idle_cycles = 0;
weaken(my $weak_self = $self);
my $timer_cb = sub {
my $self = $weak_self or return;
if ($self->{_heartbeat_recv} != $last_recv) {
$last_recv = $self->{_heartbeat_recv};
$idle_cycles = 0;
}
elsif (++$idle_cycles > 1) {
delete $self->{_heartbeat_timer};
$failure_cb->("Heartbeat lost");
$self->_server_closed($close_cb, "Heartbeat lost");
return;
}
$self->_push_write(Net::AMQP::Frame::Heartbeat->new());
};
$self->{_heartbeat_recv} = time;
$self->{_heartbeat_timer} = AnyEvent->timer(
after => $interval,
interval => $interval,
cb => $timer_cb,
);
return $self;
}
sub _open {
my $self = shift;
my %args = @_;
$self->_push_write_and_read(
'Connection::Open',
{
virtual_host => $args{vhost},
insist => 1,
},
'Connection::OpenOk',
sub {
$self->{_state} = _ST_OPEN;
$self->{_login_user} = $args{user};
$args{on_success}->($self);
},
$args{on_failure},
);
return $self;
}
sub close {
return if in_global_destruction;
my $self = shift;
my %args = $self->_set_cbs(@_);
if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->(@_);
return $self;
}
if ($self->{_state} != _ST_OPEN) {
$args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
return $self;
}
( run in 1.304 second using v1.01-cache-2.11-cpan-5b529ec07f3 )