AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::Channel;
use strict;
use warnings;
use AnyEvent::RabbitMQ::LocalQueue;
use AnyEvent;
use Scalar::Util qw( looks_like_number weaken );
use Devel::GlobalDestruction;
use Carp qw(croak cluck);
use POSIX qw(ceil);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
our $VERSION = '1.22'; # VERSION
use namespace::clean;
# Channel lifecycle states. The current state is kept in $self->{_state}.
use constant {
_ST_CLOSED => 0, # state installed by _reset()
_ST_OPENING => 1, # NOTE(review): presumably set while an open request is in flight -- confirm against open()
_ST_OPEN => 2, # the only state for which is_open() returns true
};
# Constructor.
#
# Accepts a flat list of key/value options (id, connection, on_return,
# on_close, on_inactive, on_active). A no-op on_close handler is installed
# unless the caller supplies one. The two internal queues are always created
# fresh and cannot be overridden by the caller.
#
# Returns whatever _reset() returns for the newly blessed object.
sub new {
    my ($class, @options) = @_;

    my $noop_on_close = sub {};
    my $queue         = AnyEvent::RabbitMQ::LocalQueue->new;
    my $content_queue = AnyEvent::RabbitMQ::LocalQueue->new;

    my $self = bless {
        on_close       => $noop_on_close,
        @options,
        _queue         => $queue,
        _content_queue => $content_queue,
    }, $class;

    # Hold the connection only weakly so the channel does not keep it alive.
    weaken($self->{connection});

    return $self->_reset;
}
# Put the per-session bookkeeping back to its initial values: closed state,
# inactive, not in confirm mode, publish tag counter at zero and empty
# callback tables. All other fields of the object are left untouched.
#
# Returns $self.
sub _reset {
    my ($self) = @_;

    $self->{_state}        = _ST_CLOSED;
    $self->{_is_active}    = 0;
    $self->{_is_confirm}   = 0;
    $self->{_publish_tag}  = 0;
    $self->{_publish_cbs}  = {};    # values: [on_ack, on_nack, on_return]
    $self->{_consumer_cbs} = {};    # values: [on_consume, on_cancel...]

    return $self;
}
# Accessor: returns the value stored under the object's "id" key.
sub id {
    my ($self) = @_;
    return $self->{id};
}
# True only when the channel's _state equals _ST_OPEN.
sub is_open {
    my ($self) = @_;
    return $self->{_state} == _ST_OPEN;
}
# Accessor: returns the channel's _is_active flag as stored.
sub is_active {
    my ($self) = @_;
    return $self->{_is_active};
}
sub is_confirm {
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
my $header_args = delete $args{header};
my $body = delete $args{body};
my $ack_cb = delete $args{on_ack};
my $nack_cb = delete $args{on_nack};
my $return_cb = delete $args{on_return};
defined($header_args) or $header_args = {};
defined($body) or $body = '';
if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) {
cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode"
unless $self->{_is_confirm};
}
my $tag;
if ($self->{_is_confirm}) {
# yeah, delivery tags in acks are sequential. see Java client
$tag = ++$self->{_publish_tag};
if ($return_cb) {
$header_args = { %$header_args };
$header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not
}
$self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
}
$self->_publish(
%args,
)->_header(
$header_args, $body,
)->_body(
$body,
);
return $self;
}
# Queue a Basic.Publish method frame on this channel.
#
# Arguments: a flat list of key/value pairs for the method (e.g. routing_key).
# exchange, mandatory and immediate have defaults that the caller may
# override; ticket is always forced to 0.
#
# Returns $self so calls can be chained.
sub _publish {
    my ($self, %args) = @_;

    my $method = Net::AMQP::Protocol::Basic::Publish->new(
        exchange  => '',
        mandatory => 0,
        immediate => 0,
        %args,          # caller-supplied values win over the defaults above
        ticket    => 0, # placed last so it cannot be overridden
    );

    $self->{connection}->_push_write($method, $self->{id});

    return $self;
}
# Queue the content header frame for a message on this channel.
#
# Arguments:
#   $args - hashref of content properties; may also carry "weight" (frame
#           weight, default 0) and "headers" (hashref of message headers).
#           The hash is NOT modified: the method works on a shallow copy, so
#           callers may safely reuse one properties hash for many publishes.
#   $body - message body; only its length is used here (body_size).
#
# Returns $self so calls can be chained.
sub _header {
    my ($self, $args, $body) = @_;

    # Shallow copy: removing "weight" below must not leak into the caller's hash.
    my %props  = %{ $args || {} };
    my $weight = delete $props{weight} || 0;

    # User-provided message headers must be strings. Protect values that look
    # like numbers by wrapping them, again without touching the caller's hash.
    my $headers = $props{headers} || {};
    my @numeric_keys = grep {
        my $value = $headers->{$_};
        !ref($value) && looks_like_number($value);
    } keys %$headers;
    if (@numeric_keys) {
        $headers = {
            %$headers,
            map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @numeric_keys
        };
    }

    $self->{connection}->_push_write(
        Net::AMQP::Frame::Header->new(
            weight       => $weight,
            body_size    => length($body),
            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                # Defaults; any of these may be overridden through %props.
                content_type     => 'application/octet-stream',
                content_encoding => undef,
                delivery_mode    => 1,
                priority         => 1,
                correlation_id   => undef,
                expiration       => undef,
                message_id       => undef,
                timestamp        => time,
                type             => undef,
                user_id          => $self->{connection}->login_user,
                app_id           => undef,
                cluster_id       => undef,
                %props,
                # Placed last so the protected copy always wins.
                headers          => $headers,
            ),
        ),
        $self->{id},
    );

    return $self;
}
# Queue the message body on this channel as one or more body frames.
#
# The body is cut into pieces of at most the connection's _body_max
# characters; when _body_max is unset (or 0) the whole body goes out in a
# single frame. An empty body produces no frames at all.
#
# Returns $self so calls can be chained.
sub _body {
    my ($self, $body) = @_;

    my $chunk_size = $self->{connection}->{_body_max} || length($body);

    # $body is our own copy, so it is safe to consume it from the front.
    while (length $body) {
        my $chunk = substr($body, 0, $chunk_size, '');
        my $frame = Net::AMQP::Frame::Body->new(payload => $chunk);
        $self->{connection}->_push_write($frame, $self->{id});
    }

    return $self;
}
sub consume {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
my $consumer_cb = delete $args{on_consume} || sub {};
my $cancel_cb = delete $args{on_cancel} || sub {};
my $no_ack = delete $args{no_ack} // 1;
( run in 1.069 second using v1.01-cache-2.11-cpan-39bf76dae61 )