AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::Channel;

use strict;
use warnings;

use AnyEvent::RabbitMQ::LocalQueue;
use AnyEvent;
use Scalar::Util qw( looks_like_number weaken );
use Devel::GlobalDestruction;
use Carp qw(croak cluck);
use POSIX qw(ceil);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }

our $VERSION = '1.22'; # VERSION

use namespace::clean;

use constant {
    _ST_CLOSED => 0,
    _ST_OPENING => 1,
    _ST_OPEN => 2,
};

sub new {
    my $class = shift;

    my $self = bless {
        on_close       => sub {},
        @_,    # id, connection, on_return, on_close, on_inactive, on_active
        _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
        _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
    }, $class;
    weaken($self->{connection});
    return $self->_reset;
}

sub _reset {
    my $self = shift;

    my %a = (
        _state         => _ST_CLOSED,
        _is_active     => 0,
        _is_confirm    => 0,
        _publish_tag   => 0,
        _publish_cbs   => {},  # values: [on_ack, on_nack, on_return]
        _consumer_cbs  => {},  # values: [on_consume, on_cancel...]
    );
    @$self{keys %a} = values %a;

    return $self;
}

sub id {
    my $self = shift;
    return $self->{id};
}

sub is_open {
    my $self = shift;
    return $self->{_state} == _ST_OPEN;
}

sub is_active {
    my $self = shift;
    return $self->{_is_active};
}

sub is_confirm {

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    my $header_args = delete $args{header};
    my $body        = delete $args{body};
    my $ack_cb      = delete $args{on_ack};
    my $nack_cb     = delete $args{on_nack};
    my $return_cb   = delete $args{on_return};

    defined($header_args) or $header_args = {};
    defined($body) or $body = '';
    if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) {
        cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode"
            unless $self->{_is_confirm};
    }

    my $tag;
    if ($self->{_is_confirm}) {
        # yeah, delivery tags in acks are sequential.  see Java client
        $tag = ++$self->{_publish_tag};
        if ($return_cb) {
            $header_args = { %$header_args };
            $header_args->{headers}->{_ar_return} = $tag;  # just reuse the same value, why not
        }
        $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
    }

    $self->_publish(
        %args,
    )->_header(
        $header_args, $body,
    )->_body(
        $body,
    );

    return $self;
}

sub _publish {
    my $self = shift;
    my %args = @_;

    $self->{connection}->_push_write(
        Net::AMQP::Protocol::Basic::Publish->new(
            exchange  => '',
            mandatory => 0,
            immediate => 0,
            %args, # routing_key
            ticket    => 0,
        ),
        $self->{id},
    );

    return $self;
}

sub _header {
    my ($self, $args, $body) = @_;

    my $weight = delete $args->{weight} || 0;

    # user-provided message headers must be strings.  protect values that look like numbers.
    my $headers = $args->{headers} || {};
    my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers;
    if (@prot) {
        $headers = {
            %$headers,
            map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
        };
    }

    $self->{connection}->_push_write(
        Net::AMQP::Frame::Header->new(
            weight       => $weight,
            body_size    => length($body),
            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                content_type     => 'application/octet-stream',
                content_encoding => undef,
                delivery_mode    => 1,
                priority         => 1,
                correlation_id   => undef,
                expiration       => undef,
                message_id       => undef,
                timestamp        => time,
                type             => undef,
                user_id          => $self->{connection}->login_user,
                app_id           => undef,
                cluster_id       => undef,
                %$args,
                headers          => $headers,
            ),
        ),
        $self->{id},
    );

    return $self;
}

sub _body {
    my ($self, $body,) = @_;

    my $body_max = $self->{connection}->{_body_max} || length $body;

    # chunk up body into segments measured by $frame_max
    while (length $body) {
        $self->{connection}->_push_write(
            Net::AMQP::Frame::Body->new(
                payload => substr($body, 0, $body_max, '')),
            $self->{id}
        );
    }

    return $self;
}

sub consume {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    my $consumer_cb = delete $args{on_consume}  || sub {};
    my $cancel_cb   = delete $args{on_cancel}   || sub {};
    my $no_ack      = delete $args{no_ack}      // 1;



( run in 1.069 second using v1.01-cache-2.11-cpan-39bf76dae61 )