AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::Channel;

use strict;
use warnings;

use AnyEvent::RabbitMQ::LocalQueue;
use AnyEvent;
use Scalar::Util qw( looks_like_number weaken );
use Devel::GlobalDestruction;
use Carp qw(croak cluck);
use POSIX qw(ceil);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }

our $VERSION = '1.22'; # VERSION

use namespace::clean;

# Channel lifecycle states, stored in $self->{_state}.
use constant {
    _ST_CLOSED => 0,    # initial state set by _reset
    _ST_OPENING => 1,   # Channel::Open sent, reply not yet received
    _ST_OPEN => 2,      # Channel::OpenOk received
};

# Constructor.
#
# Takes a flat key/value list (id, connection, on_return, on_close,
# on_inactive, on_active).  A no-op on_close is installed unless the caller
# supplies one.  The two internal frame queues are listed after the caller's
# options, so they always win over same-named keys in the argument list.
#
# Returns the new channel, with its state initialised by _reset.
sub new {
    my ($class, @options) = @_;

    my %fields = (
        on_close       => sub {},
        @options,
        _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
        _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
    );
    my $channel = bless \%fields, $class;

    # The channel must not keep its connection alive, so hold it weakly.
    weaken($channel->{connection});

    return $channel->_reset;
}

# Put the per-session state back to its initial values: closed, inactive,
# not in confirm mode, publish tag counter at zero, and fresh (empty)
# publish/consumer callback tables.  Returns $self.
sub _reset {
    my ($self) = @_;

    $self->{_state}        = _ST_CLOSED;
    $self->{_is_active}    = 0;
    $self->{_is_confirm}   = 0;
    $self->{_publish_tag}  = 0;
    $self->{_publish_cbs}  = {};    # values: [on_ack, on_nack, on_return]
    $self->{_consumer_cbs} = {};    # values: [on_consume, on_cancel...]

    return $self;
}

# Accessor: the channel id given to the constructor.
sub id {
    my ($self) = @_;
    return $self->{id};
}

# True only when the channel state is _ST_OPEN (not while opening or closed).
sub is_open {
    my ($self) = @_;
    return $self->{_state} == _ST_OPEN;
}

# Accessor: the current flow-control flag (set on open and updated by
# incoming Channel::Flow frames).
sub is_active {
    my ($self) = @_;
    return $self->{_is_active};
}

# Accessor: true once confirm mode has been enabled on this channel.
sub is_confirm {
    my ($self) = @_;
    return $self->{_is_confirm};
}

# Accessor: the channel's method-frame queue.
sub queue {
    my ($self) = @_;
    return $self->{_queue};
}

# Open the channel on the broker.
#
# Arguments (key/value list):
#   on_success - called with the channel once Channel::OpenOk arrives.
#   on_failure - called with an error message if the channel is not in the
#                closed state, or with the channel itself if the open
#                request fails.
#
# Both callbacks are optional; missing ones get the same defaults as every
# other method (see _delete_cbs) instead of crashing on an undefined code
# reference.
#
# Returns $self.
sub open {
    my $self = shift;
    my ($on_success, $on_failure) = $self->_delete_cbs(@_);

    if ($self->{_state} != _ST_CLOSED) {
        $on_failure->('Channel has already been opened');
        return $self;
    }

    $self->{_state} = _ST_OPENING;

    $self->{connection}->_push_write_and_read(
        'Channel::Open', {}, 'Channel::OpenOk',
        sub {
            $self->{_state}     = _ST_OPEN;
            $self->{_is_active} = 1;
            $on_success->($self);
        },
        sub {
            # Opening failed: fall back to closed so open() may be retried.
            $self->{_state} = _ST_CLOSED;
            # NOTE(review): the failure callback receives the channel, not
            # the error passed to this closure; kept as-is for compatibility.
            $on_failure->($self);
        },
        $self->{id},
    );

    return $self;
}

# Close the channel.
#
# Arguments are passed through the connection's _set_cbs, and the resulting
# on_success / on_failure entries are used as callbacks (presumably
# _set_cbs supplies defaults for missing ones -- confirm against the
# connection class).
#
# Returns nothing if the connection is already gone, otherwise $self.  The
# actual work happens asynchronously from a timer callback.
sub close {
    my $self = shift;
    my $connection = $self->{connection}
        or return;
    my %args = $connection->_set_cbs(@_);

    # If open is in progress, wait for it; 1s arbitrary timing.

    # The timer fires immediately and then every second until the channel
    # has left the opening state.  Only a weak reference to the channel is
    # captured, so a pending close does not keep the channel alive.
    weaken(my $wself = $self);
    my $t; $t = AE::timer 0, 1, sub {
	# Channel was destroyed in the meantime: cancel the timer and stop.
	(my $self = $wself) or undef $t, return;
	return if $self->{_state} == _ST_OPENING;

	# No more tests are required
	undef $t;

        # Double close is OK
	if ($self->{_state} == _ST_CLOSED) {
	    $args{on_success}->($self);
            return;
        }

        $connection->_push_write(
            $self->_close_frame,
            $self->{id},
        );

        # The spec says that after a party sends Channel::Close, it MUST
        # discard all frames for that channel.  So this channel is dead
        # immediately.
        $self->_closed();

        # Wait for the broker's CloseOk; in both outcomes the channel is
        # orphaned after the caller's callback has run.
        $connection->_push_read_and_valid(
            'Channel::CloseOk',
            sub {
                $args{on_success}->($self);
                $self->_orphan();
            },
            sub {
                $args{on_failure}->(@_);
                $self->_orphan();
            },
            $self->{id},
        );
    };

    return $self;
}

sub _closed {
    my $self = shift;
    my ($frame,) = @_;
    $frame ||= $self->_close_frame();

    return if $self->{_state} == _ST_CLOSED;
    $self->{_state} = _ST_CLOSED;

    # Perform callbacks for all outstanding commands
    $self->{_queue}->_flush($frame);
    $self->{_content_queue}->_flush($frame);

    # Fake nacks of all outstanding publishes
    $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };

    # Report cancelation of all outstanding consumes
    my @tags = keys %{ $self->{_consumer_cbs} };
    $self->_canceled($_, $frame) for @tags;

    # Report close to on_close callback

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

                if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
            $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
        },
        $failure_cb,
        $self->{id},
    );

    return $self;
}

# Send Basic::Ack for one or more deliveries.
#
# Arguments (key/value list) are passed straight to the Ack method frame and
# override the defaults.  Without a non-zero delivery_tag the defaults are
# delivery_tag 0 with multiple set; with one, multiple defaults to 0.
#
# Does nothing on a channel that is not open.  Returns $self.
sub ack {
    my ($self, %args) = @_;

    $self->_check_open(sub {}) or return $self;

    my $tag         = $args{delivery_tag};
    my $has_one_tag = defined $tag && $tag != 0;

    my $ack = Net::AMQP::Protocol::Basic::Ack->new(
        delivery_tag => 0,
        multiple     => ($has_one_tag ? 0 : 1),
        %args,
    );
    $self->{connection}->_push_write($ack, $self->{id});

    return $self;
}

# Send Basic::Qos and wait for Basic::QosOk.
#
# Arguments (key/value list): on_success, on_failure, plus any Qos fields.
# Field defaults are prefetch_count 1, prefetch_size 0, global 0.
#
# Reports through on_failure and sends nothing if the channel is not open.
# Returns $self.
sub qos {
    my $self = shift;
    my ($on_success, $on_failure, %overrides) = $self->_delete_cbs(@_);

    $self->_check_open($on_failure) or return $self;

    my %qos_fields = (
        prefetch_count => 1,
        prefetch_size  => 0,
        global         => 0,
        %overrides,
    );

    $self->{connection}->_push_write_and_read(
        'Basic::Qos', \%qos_fields, 'Basic::QosOk',
        $on_success, $on_failure, $self->{id},
    );

    return $self;
}

# Put the channel into publisher-confirm mode (Confirm::Select).
#
# Arguments (key/value list): on_success, on_failure, plus any Select
# fields.  nowait is always forced to 0.  Requires an open channel and
# AMQP 0-9 or later; otherwise on_failure is invoked and nothing is sent.
#
# Once Confirm::SelectOk arrives the channel is flagged as confirming and
# on_success is called with no arguments.  Returns $self.
sub confirm {
    my $self = shift;
    my ($on_success, $on_failure, %params) = $self->_delete_cbs(@_);

    $self->_check_open($on_failure)          or return $self;
    $self->_check_version(0, 9, $on_failure) or return $self;

    # The reply handler must not keep the channel alive.
    weaken(my $weak_self = $self);
    my $on_select_ok = sub {
        my $channel = $weak_self or return;
        $channel->{_is_confirm} = 1;
        $on_success->();
    };

    $self->{connection}->_push_write_and_read(
        'Confirm::Select',
        { %params, nowait => 0 },    # FIXME
        'Confirm::SelectOk',
        $on_select_ok,
        $on_failure,
        $self->{id},
    );

    return $self;
}

# Send Basic::Recover (requeue defaults to 1; other fields come from the
# arguments).
#
# Arguments (key/value list): on_success, on_failure, plus Recover fields.
# If nowait is false and the protocol is AMQP 0-9 or later, on_success /
# on_failure are attached to the Basic::RecoverOk reply; otherwise
# on_success is called immediately after the frame is queued.
#
# NOTE(review): on a channel that is not open this returns silently without
# calling on_failure, unlike qos/confirm/rollback_tx -- confirm whether that
# is intentional.
#
# Returns $self.
sub recover {
    my $self = shift;
    my ($on_success, $on_failure, %params) = $self->_delete_cbs(@_);

    $self->_check_open(sub {}) or return $self;

    my $recover = Net::AMQP::Protocol::Basic::Recover->new(
        requeue => 1,
        %params,
    );
    $self->{connection}->_push_write($recover, $self->{id});

    my $expects_reply = !$params{nowait} && $self->_check_version(0, 9);
    unless ($expects_reply) {
        $on_success->();
        return $self;
    }

    $self->{connection}->_push_read_and_valid(
        'Basic::RecoverOk', $on_success, $on_failure, $self->{id},
    );

    return $self;
}

sub reject {
    my $self = shift;
    my %args = @_;

    return $self if !$self->_check_open( sub { } );

    $self->{connection}->_push_write(
        Net::AMQP::Protocol::Basic::Reject->new(
            delivery_tag => 0,
            requeue      => 0,
            %args,

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        $self->{id},
    );

    return $self;
}

# Send Tx::Rollback and wait for Tx::RollbackOk.
#
# Arguments (key/value list): on_success, on_failure.  Reports through
# on_failure and sends nothing if the channel is not open.  Returns $self.
sub rollback_tx {
    my ($self, @cb_args) = @_;
    my ($on_success, $on_failure) = $self->_delete_cbs(@cb_args);

    $self->_check_open($on_failure) or return $self;

    my @exchange = ('Tx::Rollback', {}, 'Tx::RollbackOk');
    $self->{connection}->_push_write_and_read(
        @exchange, $on_success, $on_failure, $self->{id},
    );

    return $self;
}

# Dispatch one incoming frame for this channel.
#
# Arguments:
#   $frame      - the received frame.
#   $failure_cb - called with an error message on protocol violations.
#
# Method frames that arrive unsolicited (Channel::Close, Basic::Deliver,
# Basic::Cancel/CancelOk, Basic::Return, Basic::Ack/Nack, Channel::Flow) are
# handled here.  All other method frames are pushed onto the method queue,
# and non-method frames (headers and bodies) onto the content queue, for
# whichever reader is waiting on them.
#
# Returns $self.
sub push_queue_or_consume {
    my $self = shift;
    my ($frame, $failure_cb,) = @_;

    # Note: the spec says that after a party sends Channel::Close, it MUST
    # discard all frames for that channel other than Close and CloseOk.

    if ($frame->isa('Net::AMQP::Frame::Method')) {
        my $method_frame = $frame->method_frame;
        if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
            # Broker-initiated close: acknowledge, then tear the channel down.
            $self->{connection}->_push_write(
                Net::AMQP::Protocol::Channel::CloseOk->new(),
                $self->{id},
            );
            $self->_closed($frame);
            $self->_orphan();
            return $self;
        } elsif ($self->{_state} != _ST_OPEN) {
            # Not open: only the replies to our own open/close requests are
            # of interest; everything else is dropped.
            if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
                $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
                $self->{_queue}->push($frame);
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
            # Deliveries for unknown consumer tags are read and discarded.
            my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
            my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
            $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
                 $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
            # CancelOk means we asked for a cancel.
            # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
            if (!$self->_canceled($method_frame->consumer_tag, $frame)
                  && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
                $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
            weaken(my $wself = $self);
            my $cb = sub {
                my $ret = shift;
                my $me = $wself or return;
                my $headers = $ret->{header}->headers || {};
                my $onret_cb;
                # A publish that registered an on_return callback is
                # identified by the _ar_return header carrying its tag.
                if (defined(my $tag = $headers->{_ar_return})) {
                    my $cbs = $me->{_publish_cbs}->{$tag};
                    $onret_cb = $cbs->[2] if $cbs;
                }
                # Fall back to the channel-wide, then connection-wide handler.
                $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {};  # oh well
                $onret_cb->($frame);
            };
            $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
                 $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
            # Publisher confirms.  $resp is the short method name, "Ack" or "Nack".
            (my $resp = ref($method_frame)) =~ s/.*:://;
            if (!$self->{_is_confirm}) {
                $failure_cb->("Received $resp when not in confirm mode");
            }
            else {
                my $delivery_tag = $method_frame->{delivery_tag};
                my @tags;
                if ($method_frame->{multiple}) {
                    # "multiple" covers every outstanding tag up to and
                    # including the given one; per AMQP 0-9-1 a delivery tag
                    # of zero means all outstanding messages.
                    @tags = sort { $a <=> $b }
                              grep { !$delivery_tag || $_ <= $delivery_tag }
                                keys %{$self->{_publish_cbs}};
                }
                else {
                    @tags = ($delivery_tag);
                }
                # Index into the [on_ack, on_nack, on_return] callback triple.
                my $cbi = ($resp eq 'Ack') ? 0 : 1;
                for my $tag (@tags) {
                    my $cbs = delete $self->{_publish_cbs}->{$tag};
                    if (!$cbs) {
                        $failure_cb->("Received $resp of unknown delivery tag $tag");
                    }
                    elsif ($cbs->[$cbi]) {
                        $cbs->[$cbi]->($frame);
                    }
                }
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
            # Record the new flow state, acknowledge it, and notify the
            # channel-level (or else connection-level) handler.
            $self->{_is_active} = $method_frame->active;
            $self->{connection}->_push_write(
                Net::AMQP::Protocol::Channel::FlowOk->new(
                    active => $method_frame->active,
                ),
                $self->{id},
            );
            my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
            my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
            $cb->($frame);
            return $self;
        }
        # Any other method frame is a reply someone is waiting for.
        $self->{_queue}->push($frame);
    } else {
        $self->{_content_queue}->push($frame);
    }

    return $self;
}

# Collect the content header and body frames that follow a content-bearing
# method frame, then hand the assembled message to $cb.
#
# Arguments:
#   $type       - key under which $frame is stored in the response
#                 (callers in this file use 'ok', 'deliver' and 'return').
#   $frame      - the method frame that announced the content.
#   $cb         - called with { $type => $frame, header => ..., body => ... }.
#                 body is the last body frame with its payload replaced by
#                 the full concatenated payload, or undef for empty content.
#   $failure_cb - called with an error message on an unexpected frame.
#
# Returns $self; the callbacks fire later as frames reach the content queue.
sub _push_read_header_and_body {
    my $self = shift;
    my ($type, $frame, $cb, $failure_cb,) = @_;
    my $response = {$type => $frame};
    my $body_size = 0;
    my $body_payload = "";

    # The closures must not keep the content queue alive.
    weaken(my $wcontq = $self->{_content_queue});

    # The body reader re-registers itself for multi-frame bodies.  It does
    # so through a weak alias so the closure does not reference itself
    # strongly.
    my $w_body_frame;
    my $body_frame = sub {
        my $frame = shift;

        return $failure_cb->('Received data is not body frame')
            if !$frame->isa('Net::AMQP::Frame::Body');

        $body_payload .= $frame->payload;

        if (length($body_payload) < $body_size) {
            # More to come
            my $contq = $wcontq or return;
            $contq->get($w_body_frame);
        }
        else {
            $frame->payload($body_payload);
            $response->{body} = $frame;
            $cb->($response);
        }
    };
    $w_body_frame = $body_frame;
    weaken($w_body_frame);

    $self->{_content_queue}->get(sub{
        my $frame = shift;

        return $failure_cb->('Received data is not header frame')
            if !$frame->isa('Net::AMQP::Frame::Header');

        my $header_frame = $frame->header_frame;
        return $failure_cb->(
              'Header is not Protocol::Basic::ContentHeader. '
            . 'Header was ' . ref $header_frame
        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');

        $response->{header} = $header_frame;

        $body_size = $frame->body_size;
        if ( $body_size ) {
            my $contq = $wcontq or return;
            $contq->get($body_frame);
        } else {
            # Empty content: there are no body frames to wait for.
            $response->{body} = undef;
            $cb->($response);
        }
    });

    return $self;
}

# Split a key/value argument list into callbacks and everything else.
#
# Returns (on_success, on_failure, remaining key/value pairs).  A missing
# on_success becomes a no-op; a missing on_failure becomes a sub that dies
# with its arguments.
sub _delete_cbs {
    my ($self, %args) = @_;

    my $on_success = delete $args{on_success};
    my $on_failure = delete $args{on_failure};

    $on_success ||= sub {};
    $on_failure ||= sub {die @_};

    return ($on_success, $on_failure, %args);
}

# Guard used by channel operations: returns 1 if the channel is open,
# otherwise calls $failure_cb with an error message and returns 0.
sub _check_open {
    my ($self, $failure_cb) = @_;

    unless ($self->is_open()) {
        $failure_cb->('Channel has already been closed');
        return 0;
    }

    return 1;
}

# Check that the loaded AMQP protocol version is at least $major-$minor.
#
# Arguments:
#   $major, $minor - the minimum required protocol version.
#   $failure_cb    - optional; called with an error message when the
#                    protocol in use is older than required.
#
# Returns 1 if the protocol version is sufficient, 0 otherwise.
sub _check_version {
    my $self = shift;
    my ($major, $minor, $failure_cb) = @_;

    my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
    my $amin = $Net::AMQP::Protocol::VERSION_MINOR;

    # A strictly newer major always qualifies; with equal majors the minor
    # decides.  (Using >= on the major would accept e.g. 0-8 for 0-9.)
    return 1 if $amaj > $major || ($amaj == $major && $amin >= $minor);

    $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
    return 0;
}



( run in 1.606 second using v1.01-cache-2.11-cpan-ceb78f64989 )