AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN


1.13  Thu May  2 16:48:58 PDT 2013
        - Require Net::AMQP 0.06 to:
           + Get consume cancel notifications (e.g. queue deletion)
           + Properly encode user-provided header strings that look like numbers
        - Fix race between server-sent and client-sent cancellation.

        - Expect server to send heartbeats as promised.  If it doesn't, go President
            Madagascar on its ass and SHUT DOWN EVERYTHING.

        - Rearrange many things and weaken many references to eliminate bad circular
            references.  Some circular refs are actually good, though; leave those.

        - Allow customized client_properties on connection.

        - Make test output clearer.

1.12  Thu Apr 11 20:45:00 2013
        - Allow AMQP client to adjust tuning, e.g. heartbeat
          (Chip Salzenberg)

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ;

use strict;
use warnings;
use Carp qw(confess croak);
use Scalar::Util qw(refaddr);
use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
use File::ShareDir;
use Readonly;
use Scalar::Util qw/ weaken /;

require Data::Dumper;
sub Dumper {
    local $Data::Dumper::Terse = 1;
    local $Data::Dumper::Indent = 1;
    local $Data::Dumper::Useqq = 1;
    local $Data::Dumper::Deparse = 1;
    local $Data::Dumper::Quotekeys = 0;
    local $Data::Dumper::Sortkeys = 1;
    &Data::Dumper::Dumper

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    for (qw/ host port /) {
        $args{$_} or return $args{on_failure}->("No $_ passed to connect");
    }

    if ($self->{verbose}) {
        warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
    }

    $self->{_state} = _ST_OPENING;

    weaken(my $weak_self = $self);
    my $conn; $conn = AnyEvent::Socket::tcp_connect(
        $args{host},
        $args{port},
        sub {
            undef $conn;
            my $self = $weak_self or return;

            my $fh = shift;

            unless ($fh) {

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


# Accessor: returns whatever is stored in the object's
# _server_properties slot (undef if nothing has been stored there).
sub server_properties {
    my ($self) = @_;

    return $self->{_server_properties};
}

sub _read_loop {
    my ($self, $close_cb, $failure_cb,) = @_;

    return if !defined $self->{_handle}; # called on_error

    weaken(my $weak_self = $self);
    $self->{_handle}->push_read(chunk => 8, sub {
        my $self = $weak_self or return;
        my $data = $_[1];
        my $stack = $_[1];

        if (length($data) <= 7) {
            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

        $args{on_failure},
    );

    return $self;
}

sub _tune {
    my $self = shift;
    my %args = @_;

    weaken(my $weak_self = $self);
    $self->_push_read_and_valid(
        'Connection::Tune',
        sub {
            my $self = $weak_self or return;
            my $frame = shift;

            my %tune;
            foreach (qw( channel_max frame_max heartbeat )) {
                my $client = $args{tune}{$_} || 0;
                my $server = $frame->method_frame->$_ || 0;

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    return $self;
}

sub _start_heartbeat {
    my ($self, $interval, %args,) = @_;

    my $close_cb   = $args{on_close};
    my $failure_cb = $args{on_read_failure};
    my $last_recv = 0;
    my $idle_cycles = 0;
    weaken(my $weak_self = $self);
    my $timer_cb = sub {
        my $self = $weak_self or return;
        if ($self->{_heartbeat_recv} != $last_recv) {
            $last_recv = $self->{_heartbeat_recv};
            $idle_cycles = 0;
        }
        elsif (++$idle_cycles > 1) {
            delete $self->{_heartbeat_timer};
            $failure_cb->("Heartbeat lost");
            $self->_server_closed($close_cb, "Heartbeat lost");

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::Channel;

use strict;
use warnings;

use AnyEvent::RabbitMQ::LocalQueue;
use AnyEvent;
use Scalar::Util qw( looks_like_number weaken );
use Devel::GlobalDestruction;
use Carp qw(croak cluck);
use POSIX qw(ceil);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }

our $VERSION = '1.22'; # VERSION

use namespace::clean;

use constant {

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN


# Constructor.
#
# Takes a flat key/value list (id, connection, on_return, on_close,
# on_inactive, on_active).  Caller-supplied pairs override the no-op
# on_close default, while the two internal LocalQueue slots are always
# freshly created and cannot be overridden because they come last in the
# hash constructor.
#
# Returns the result of $self->_reset.
sub new {
    my ($class, @params) = @_;

    my $self = bless(
        {
            on_close       => sub {},
            @params,    # id, connection, on_return, on_close, on_inactive, on_active
            _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
            _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
        },
        $class,
    );

    # Hold the connection only weakly, so this object's reference does not
    # by itself keep the connection alive.
    weaken($self->{connection});

    return $self->_reset;
}

sub _reset {
    my $self = shift;

    my %a = (
        _state         => _ST_CLOSED,
        _is_active     => 0,
        _is_confirm    => 0,

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

}

sub close {
    my $self = shift;
    my $connection = $self->{connection}
        or return;
    my %args = $connection->_set_cbs(@_);

    # If open is in progress, wait for it; the 1s polling interval is arbitrary.

    weaken(my $wself = $self);
    my $t; $t = AE::timer 0, 1, sub {
	(my $self = $wself) or undef $t, return;
	return if $self->{_state} == _ST_OPENING;

	# No more tests are required
	undef $t;

        # Double close is OK
	if ($self->{_state} == _ST_CLOSED) {
	    $args{on_success}->($self);

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    return $self;
}

sub confirm {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);
    return $self if !$self->_check_version(0, 9, $failure_cb);

    weaken(my $wself = $self);

    $self->{connection}->_push_write_and_read(
        'Confirm::Select',
        {
            %args,
            nowait       => 0, # FIXME
        },
        'Confirm::SelectOk',
        sub {
            my $me = $wself or return;

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
                 $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
            # CancelOk means we asked for a cancel.
            # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
            if (!$self->_canceled($method_frame->consumer_tag, $frame)
                  && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
                $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
            weaken(my $wself = $self);
            my $cb = sub {
                my $ret = shift;
                my $me = $wself or return;
                my $headers = $ret->{header}->headers || {};
                my $onret_cb;
                if (defined(my $tag = $headers->{_ar_return})) {
                    my $cbs = $me->{_publish_cbs}->{$tag};
                    $onret_cb = $cbs->[2] if $cbs;
                }
                $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {};  # oh well

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    return $self;
}

sub _push_read_header_and_body {
    my $self = shift;
    my ($type, $frame, $cb, $failure_cb,) = @_;
    my $response = {$type => $frame};
    my $body_size = 0;
    my $body_payload = "";

    weaken(my $wcontq = $self->{_content_queue});
    my $w_body_frame;
    my $body_frame = sub {
        my $frame = shift;

        return $failure_cb->('Received data is not body frame')
            if !$frame->isa('Net::AMQP::Frame::Body');

        $body_payload .= $frame->payload;

        if (length($body_payload) < $body_size) {

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

            my $contq = $wcontq or return;
            $contq->get($w_body_frame);
        }
        else {
            $frame->payload($body_payload);
            $response->{body} = $frame;
            $cb->($response);
        }
    };
    $w_body_frame = $body_frame;
    weaken($w_body_frame);

    $self->{_content_queue}->get(sub{
        my $frame = shift;

        return $failure_cb->('Received data is not header frame')
            if !$frame->isa('Net::AMQP::Frame::Header');

        my $header_frame = $frame->header_frame;
        return $failure_cb->(
              'Header is not Protocol::Basic::ContentHeader'



( run in 0.426 second using v1.01-cache-2.11-cpan-65fba6d93b7 )