AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::Channel;
use strict;
use warnings;
use AnyEvent::RabbitMQ::LocalQueue;
use AnyEvent;
use Scalar::Util qw( looks_like_number weaken );
use Devel::GlobalDestruction;
use Carp qw(croak cluck);
use POSIX qw(ceil);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
our $VERSION = '1.22'; # VERSION
use namespace::clean;
use constant {
_ST_CLOSED => 0,
_ST_OPENING => 1,
_ST_OPEN => 2,
};
sub new {
my $class = shift;
my $self = bless {
on_close => sub {},
@_, # id, connection, on_return, on_close, on_inactive, on_active
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
}, $class;
weaken($self->{connection});
return $self->_reset;
}
sub _reset {
my $self = shift;
my %a = (
_state => _ST_CLOSED,
_is_active => 0,
_is_confirm => 0,
_publish_tag => 0,
_publish_cbs => {}, # values: [on_ack, on_nack, on_return]
_consumer_cbs => {}, # values: [on_consume, on_cancel...]
);
@$self{keys %a} = values %a;
return $self;
}
sub id {
my $self = shift;
return $self->{id};
}
sub is_open {
my $self = shift;
return $self->{_state} == _ST_OPEN;
}
sub is_active {
my $self = shift;
return $self->{_is_active};
}
sub is_confirm {
my $self = shift;
return $self->{_is_confirm};
}
sub queue {
my $self = shift;
return $self->{_queue};
}
sub open {
my $self = shift;
my %args = @_;
if ($self->{_state} != _ST_CLOSED) {
$args{on_failure}->('Channel has already been opened');
return $self;
}
$self->{_state} = _ST_OPENING;
$self->{connection}->_push_write_and_read(
'Channel::Open', {}, 'Channel::OpenOk',
sub {
$self->{_state} = _ST_OPEN;
$self->{_is_active} = 1;
$args{on_success}->($self);
},
sub {
$self->{_state} = _ST_CLOSED;
$args{on_failure}->($self);
},
$self->{id},
);
return $self;
}
sub close {
my $self = shift;
my $connection = $self->{connection}
or return;
my %args = $connection->_set_cbs(@_);
# If open in in progess, wait for it; 1s arbitrary timing.
weaken(my $wself = $self);
my $t; $t = AE::timer 0, 1, sub {
(my $self = $wself) or undef $t, return;
return if $self->{_state} == _ST_OPENING;
# No more tests are required
undef $t;
# Double close is OK
if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->($self);
return;
}
$connection->_push_write(
$self->_close_frame,
$self->{id},
);
# The spec says that after a party sends Channel::Close, it MUST
# discard all frames for that channel. So this channel is dead
# immediately.
$self->_closed();
$connection->_push_read_and_valid(
'Channel::CloseOk',
sub {
$args{on_success}->($self);
$self->_orphan();
},
sub {
$args{on_failure}->(@_);
$self->_orphan();
},
$self->{id},
);
};
return $self;
}
sub _closed {
my $self = shift;
my ($frame,) = @_;
$frame ||= $self->_close_frame();
return if $self->{_state} == _ST_CLOSED;
$self->{_state} = _ST_CLOSED;
# Perform callbacks for all outstanding commands
$self->{_queue}->_flush($frame);
$self->{_content_queue}->_flush($frame);
# Fake nacks of all outstanding publishes
$_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };
# Report cancelation of all outstanding consumes
my @tags = keys %{ $self->{_consumer_cbs} };
$self->_canceled($_, $frame) for @tags;
# Report close to on_close callback
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
$self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
},
$failure_cb,
$self->{id},
);
return $self;
}
sub ack {
my $self = shift;
my %args = @_;
return $self if !$self->_check_open(sub {});
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Ack->new(
delivery_tag => 0,
multiple => (
defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
),
%args,
),
$self->{id},
);
return $self;
}
sub qos {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Basic::Qos',
{
prefetch_count => 1,
prefetch_size => 0,
global => 0,
%args,
},
'Basic::QosOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub confirm {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
return $self if !$self->_check_version(0, 9, $failure_cb);
weaken(my $wself = $self);
$self->{connection}->_push_write_and_read(
'Confirm::Select',
{
%args,
nowait => 0, # FIXME
},
'Confirm::SelectOk',
sub {
my $me = $wself or return;
$me->{_is_confirm} = 1;
$cb->();
},
$failure_cb,
$self->{id},
);
return $self;
}
sub recover {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open(sub {});
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Recover->new(
requeue => 1,
%args,
),
$self->{id},
);
if (!$args{nowait} && $self->_check_version(0, 9)) {
$self->{connection}->_push_read_and_valid(
'Basic::RecoverOk',
$cb,
$failure_cb,
$self->{id},
);
}
else {
$cb->();
}
return $self;
}
sub reject {
my $self = shift;
my %args = @_;
return $self if !$self->_check_open( sub { } );
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Reject->new(
delivery_tag => 0,
requeue => 0,
%args,
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
$self->{id},
);
return $self;
}
sub rollback_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Tx::Rollback', {}, 'Tx::RollbackOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub push_queue_or_consume {
my $self = shift;
my ($frame, $failure_cb,) = @_;
# Note: the spec says that after a party sends Channel::Close, it MUST
# discard all frames for that channel other than Close and CloseOk.
if ($frame->isa('Net::AMQP::Frame::Method')) {
my $method_frame = $frame->method_frame;
if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
$self->{connection}->_push_write(
Net::AMQP::Protocol::Channel::CloseOk->new(),
$self->{id},
);
$self->_closed($frame);
$self->_orphan();
return $self;
} elsif ($self->{_state} != _ST_OPEN) {
if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
$method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
$self->{_queue}->push($frame);
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
$method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
# CancelOk means we asked for a cancel.
# Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
if (!$self->_canceled($method_frame->consumer_tag, $frame)
&& $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
$failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
weaken(my $wself = $self);
my $cb = sub {
my $ret = shift;
my $me = $wself or return;
my $headers = $ret->{header}->headers || {};
my $onret_cb;
if (defined(my $tag = $headers->{_ar_return})) {
my $cbs = $me->{_publish_cbs}->{$tag};
$onret_cb = $cbs->[2] if $cbs;
}
$onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well
$onret_cb->($frame);
};
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
$method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
(my $resp = ref($method_frame)) =~ s/.*:://;
my $cbs;
if (!$self->{_is_confirm}) {
$failure_cb->("Received $resp when not in confirm mode");
}
else {
my @tags;
if ($method_frame->{multiple}) {
@tags = sort { $a <=> $b }
grep { $_ <= $method_frame->{delivery_tag} }
keys %{$self->{_publish_cbs}};
}
else {
@tags = ($method_frame->{delivery_tag});
}
my $cbi = ($resp eq 'Ack') ? 0 : 1;
for my $tag (@tags) {
my $cbs;
if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
$failure_cb->("Received $resp of unknown delivery tag $tag");
}
elsif ($cbs->[$cbi]) {
$cbs->[$cbi]->($frame);
}
}
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
$self->{_is_active} = $method_frame->active;
$self->{connection}->_push_write(
Net::AMQP::Protocol::Channel::FlowOk->new(
active => $method_frame->active,
),
$self->{id},
);
my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
$cb->($frame);
return $self;
}
$self->{_queue}->push($frame);
} else {
$self->{_content_queue}->push($frame);
}
return $self;
}
sub _push_read_header_and_body {
my $self = shift;
my ($type, $frame, $cb, $failure_cb,) = @_;
my $response = {$type => $frame};
my $body_size = 0;
my $body_payload = "";
weaken(my $wcontq = $self->{_content_queue});
my $w_body_frame;
my $body_frame = sub {
my $frame = shift;
return $failure_cb->('Received data is not body frame')
if !$frame->isa('Net::AMQP::Frame::Body');
$body_payload .= $frame->payload;
if (length($body_payload) < $body_size) {
# More to come
my $contq = $wcontq or return;
$contq->get($w_body_frame);
}
else {
$frame->payload($body_payload);
$response->{body} = $frame;
$cb->($response);
}
};
$w_body_frame = $body_frame;
weaken($w_body_frame);
$self->{_content_queue}->get(sub{
my $frame = shift;
return $failure_cb->('Received data is not header frame')
if !$frame->isa('Net::AMQP::Frame::Header');
my $header_frame = $frame->header_frame;
return $failure_cb->(
'Header is not Protocol::Basic::ContentHeader'
. 'Header was ' . ref $header_frame
) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
$response->{header} = $header_frame;
$body_size = $frame->body_size;
if ( $body_size ) {
my $contq = $wcontq or return;
$contq->get($body_frame);
} else {
$response->{body} = undef;
$cb->($response);
}
});
return $self;
}
sub _delete_cbs {
my $self = shift;
my %args = @_;
my $cb = delete $args{on_success} || sub {};
my $failure_cb = delete $args{on_failure} || sub {die @_};
return $cb, $failure_cb, %args;
}
sub _check_open {
my $self = shift;
my ($failure_cb) = @_;
return 1 if $self->is_open();
$failure_cb->('Channel has already been closed');
return 0;
}
sub _check_version {
my $self = shift;
my ($major, $minor, $failure_cb) = @_;
my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
$failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
return 0;
}
( run in 1.606 second using v1.01-cache-2.11-cpan-ceb78f64989 )