view release on metacpan or search on metacpan
1.13 Thu May 2 16:48:58 PDT 2013
- Require Net::AMQP 0.06 to:
+ Get consume cancel notifications (e.g. queue deletion)
+ Properly encode user-provided header strings that look like numbers
- Fix race between server-sent and client-sent cancellation.
- Expect server to send heartbeats as promised. If it doesn't, go President
Madagasgar on its ass and SHUT DOWN EVERYTHING.
- Rearrange many things and weaken many references to eliminate bad circular
references. Some circular refs are actually good, though; leave those.
- Allow customized client_properties on connection.
- Make test output clearer.
1.12 Thu Apr 11 20:45:00 2013
- Allow AMQP client to adjust tuning, e.g. heartbeat
(Chip Salzenberg)
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
package AnyEvent::RabbitMQ;
use strict;
use warnings;
use Carp qw(confess croak);
use Scalar::Util qw(refaddr);
use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
use File::ShareDir;
use Readonly;
use Scalar::Util qw/ weaken /;
require Data::Dumper;
sub Dumper {
local $Data::Dumper::Terse = 1;
local $Data::Dumper::Indent = 1;
local $Data::Dumper::Useqq = 1;
local $Data::Dumper::Deparse = 1;
local $Data::Dumper::Quotekeys = 0;
local $Data::Dumper::Sortkeys = 1;
&Data::Dumper::Dumper
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
for (qw/ host port /) {
$args{$_} or return $args{on_failure}->("No $_ passed to connect");
}
if ($self->{verbose}) {
warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
}
$self->{_state} = _ST_OPENING;
weaken(my $weak_self = $self);
my $conn; $conn = AnyEvent::Socket::tcp_connect(
$args{host},
$args{port},
sub {
undef $conn;
my $self = $weak_self or return;
my $fh = shift;
unless ($fh) {
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
sub server_properties {
return shift->{_server_properties};
}
sub _read_loop {
my ($self, $close_cb, $failure_cb,) = @_;
return if !defined $self->{_handle}; # called on_error
weaken(my $weak_self = $self);
$self->{_handle}->push_read(chunk => 8, sub {
my $self = $weak_self or return;
my $data = $_[1];
my $stack = $_[1];
if (length($data) <= 7) {
$failure_cb->('Broken data was received');
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
}
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
$args{on_failure},
);
return $self;
}
sub _tune {
my $self = shift;
my %args = @_;
weaken(my $weak_self = $self);
$self->_push_read_and_valid(
'Connection::Tune',
sub {
my $self = $weak_self or return;
my $frame = shift;
my %tune;
foreach (qw( channel_max frame_max heartbeat )) {
my $client = $args{tune}{$_} || 0;
my $server = $frame->method_frame->$_ || 0;
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
return $self;
}
sub _start_heartbeat {
my ($self, $interval, %args,) = @_;
my $close_cb = $args{on_close};
my $failure_cb = $args{on_read_failure};
my $last_recv = 0;
my $idle_cycles = 0;
weaken(my $weak_self = $self);
my $timer_cb = sub {
my $self = $weak_self or return;
if ($self->{_heartbeat_recv} != $last_recv) {
$last_recv = $self->{_heartbeat_recv};
$idle_cycles = 0;
}
elsif (++$idle_cycles > 1) {
delete $self->{_heartbeat_timer};
$failure_cb->("Heartbeat lost");
$self->_server_closed($close_cb, "Heartbeat lost");
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::Channel;
use strict;
use warnings;
use AnyEvent::RabbitMQ::LocalQueue;
use AnyEvent;
use Scalar::Util qw( looks_like_number weaken );
use Devel::GlobalDestruction;
use Carp qw(croak cluck);
use POSIX qw(ceil);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
our $VERSION = '1.22'; # VERSION
use namespace::clean;
use constant {
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
sub new {
my $class = shift;
my $self = bless {
on_close => sub {},
@_, # id, connection, on_return, on_close, on_inactive, on_active
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
}, $class;
weaken($self->{connection});
return $self->_reset;
}
sub _reset {
my $self = shift;
my %a = (
_state => _ST_CLOSED,
_is_active => 0,
_is_confirm => 0,
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
}
sub close {
my $self = shift;
my $connection = $self->{connection}
or return;
my %args = $connection->_set_cbs(@_);
# If open in in progess, wait for it; 1s arbitrary timing.
weaken(my $wself = $self);
my $t; $t = AE::timer 0, 1, sub {
(my $self = $wself) or undef $t, return;
return if $self->{_state} == _ST_OPENING;
# No more tests are required
undef $t;
# Double close is OK
if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->($self);
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
return $self;
}
sub confirm {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
return $self if !$self->_check_version(0, 9, $failure_cb);
weaken(my $wself = $self);
$self->{connection}->_push_write_and_read(
'Confirm::Select',
{
%args,
nowait => 0, # FIXME
},
'Confirm::SelectOk',
sub {
my $me = $wself or return;
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
$method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
# CancelOk means we asked for a cancel.
# Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
if (!$self->_canceled($method_frame->consumer_tag, $frame)
&& $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
$failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
weaken(my $wself = $self);
my $cb = sub {
my $ret = shift;
my $me = $wself or return;
my $headers = $ret->{header}->headers || {};
my $onret_cb;
if (defined(my $tag = $headers->{_ar_return})) {
my $cbs = $me->{_publish_cbs}->{$tag};
$onret_cb = $cbs->[2] if $cbs;
}
$onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
return $self;
}
sub _push_read_header_and_body {
my $self = shift;
my ($type, $frame, $cb, $failure_cb,) = @_;
my $response = {$type => $frame};
my $body_size = 0;
my $body_payload = "";
weaken(my $wcontq = $self->{_content_queue});
my $w_body_frame;
my $body_frame = sub {
my $frame = shift;
return $failure_cb->('Received data is not body frame')
if !$frame->isa('Net::AMQP::Frame::Body');
$body_payload .= $frame->payload;
if (length($body_payload) < $body_size) {
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
my $contq = $wcontq or return;
$contq->get($w_body_frame);
}
else {
$frame->payload($body_payload);
$response->{body} = $frame;
$cb->($response);
}
};
$w_body_frame = $body_frame;
weaken($w_body_frame);
$self->{_content_queue}->get(sub{
my $frame = shift;
return $failure_cb->('Received data is not header frame')
if !$frame->isa('Net::AMQP::Frame::Header');
my $header_frame = $frame->header_frame;
return $failure_cb->(
'Header is not Protocol::Basic::ContentHeader'