AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ;

use strict;
use warnings;
use Carp qw(confess croak);
use Scalar::Util qw(refaddr);
use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
use File::ShareDir;
use Readonly;
use Scalar::Util qw/ weaken /;

require Data::Dumper;
# Debug helper: serialise the arguments with Data::Dumper using this module's
# preferred output settings (terse, lightly indented, sorted keys, code refs
# deparsed). Every setting is localised, so the caller's own Data::Dumper
# configuration is left untouched once this sub returns.
sub Dumper {
    local (
        $Data::Dumper::Terse,
        $Data::Dumper::Indent,
        $Data::Dumper::Useqq,
        $Data::Dumper::Deparse,
        $Data::Dumper::Quotekeys,
        $Data::Dumper::Sortkeys,
    ) = (1, 1, 1, 1, 0, 1);

    # Hand the argument list straight through; the caller's context is
    # propagated to Data::Dumper by the return.
    return Data::Dumper::Dumper(@_);
}

use AnyEvent::Handle;
use AnyEvent::Socket;

use Net::AMQP 0.06;
use Net::AMQP::Common qw(:all);

use AnyEvent::RabbitMQ::Channel;
use AnyEvent::RabbitMQ::LocalQueue;

use namespace::clean;

our $VERSION = '1.22'; # VERSION

# Connection lifecycle states kept in $self->{_state}.
use constant _ST_CLOSED  => 0;  # initial state; connect() is only allowed here
use constant _ST_OPENING => 1;  # set by connect() while the handshake runs
use constant _ST_OPEN    => 2;  # set once Connection::OpenOk has arrived
use constant _ST_CLOSING => 3;  # presumably set while close() is in progress

# Path of the AMQP 0-9-1 XML spec found in this distribution's share
# directory; load_xml_spec falls back to it when no spec is passed.
Readonly my $DEFAULT_AMQP_SPEC => do {
    my $share_dir = File::ShareDir::dist_dir('AnyEvent-RabbitMQ');
    $share_dir . '/fixed_amqp0-9-1.xml';
};

# Channel limit applied when tuning leaves channel_max at 0
# (2**16 - 1, the largest 16-bit value).
Readonly my $DEFAULT_CHANNEL_MAX => 65_535;

# Constructor. Takes an optional flat list of key/value options.
#
# Only 'verbose' has a public default (0) that the caller may override; the
# underscore-prefixed internal fields come after the caller's list and so
# always win over anything passed in under the same key.
#
# Returns the new, not yet connected, object.
sub new {
    my $class = shift;

    my $self = {
        # Public defaults, overridable by the caller.
        verbose            => 0,
        @_,

        # Internal state, always reset.
        _state             => _ST_CLOSED,
        _queue             => AnyEvent::RabbitMQ::LocalQueue->new,
        _last_chan_id      => 0,
        _channels          => {},
        _login_user        => '',
        _server_properties => {},
        _frame_max         => undef,
        _body_max          => undef,
        _channel_max       => undef,
    };

    return bless $self, $class;
}

# Combined getter/setter for the verbose flag.
# With an argument: stores it and returns the stored value.
# Without: returns the current value.
sub verbose {
    my ($self, @new_value) = @_;

    return $self->{verbose} unless @new_value;
    return $self->{verbose} = $new_value[0];
}

sub is_open {

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

}

# Accessor: returns the hash reference of channels, keyed by channel id.
# This is the live internal structure, not a copy.
sub channels {
    my ($self) = @_;
    return $self->{_channels};
}

# Remove a channel from the channel table, but only if the entry registered
# under the channel's id is this very object (compared by reference address).
#
# Returns 1 when the entry was removed, 0 when nothing was registered under
# that id or a different object holds the slot.
sub _delete_channel {
    my ($self, $channel) = @_;

    my $id         = $channel->id;
    my $registered = $self->{_channels}->{$id};

    return 0 unless defined $registered;
    return 0 unless refaddr($registered) == refaddr($channel);

    delete $self->{_channels}->{$id};
    return 1;
}

# Accessor: returns the stored login user name. It is the empty string on a
# fresh object and is filled in once the connection has been opened.
sub login_user {
    my ($self) = @_;
    return $self->{_login_user};
}

# Remembers which spec file has been loaded; shared by every instance in the
# process because it is a file-scoped lexical.
my $_loaded_spec;

# Load the AMQP XML spec into Net::AMQP::Protocol.
#
# $spec is optional and defaults to the spec bundled with this distribution.
# The spec is loaded at most once: asking again for the same file is a no-op,
# asking for a different file croaks.
#
# Returns $self.
sub load_xml_spec {
    my ($self, $spec) = @_;
    $spec ||= $DEFAULT_AMQP_SPEC;

    if (!$_loaded_spec) {
        # Record the choice before loading, then load it.
        $_loaded_spec = $spec;
        Net::AMQP::Protocol->load_xml_spec($spec);
    }
    elsif ($_loaded_spec ne $spec) {
        croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
    }

    return $self;
}

# Open a TCP (optionally TLS) connection to the broker and start the AMQP
# handshake.
#
# Arguments are a flat key/value list, first passed through _set_cbs
# (defined elsewhere in this file; presumably it normalises the success and
# failure callbacks). Keys read directly here:
#   host, port            - required
#   timeout               - connect timeout returned to tcp_connect (default 0)
#   tls, tls_ctx, nodelay - forwarded to AnyEvent::Handle when true
#   on_failure            - called with an error message when the connection
#                           cannot be started or fails before it is open
#   on_close              - used once the connection is open and the handle
#                           reports an error (default: no-op)
#   on_read_failure       - given to the read loop (default: warn)
# The complete argument list is forwarded to _start for the handshake.
#
# Always returns $self, including when the arguments are rejected, so calls
# can be chained safely.
sub connect {
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    if ($self->{_state} != _ST_CLOSED) {
        $args{on_failure}->('Connection has already been opened');
        return $self;
    }

    $args{on_close}        ||= sub {};
    $args{on_read_failure} ||= sub {warn @_, "\n"};
    $args{timeout}         ||= 0;

    # Report a missing mandatory argument through on_failure, but still
    # return $self like every other path of this method.
    for my $required (qw/ host port /) {
        next if $args{$required};
        $args{on_failure}->("No $required passed to connect");
        return $self;
    }

    if ($self->{verbose}) {
        warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
    }

    $self->{_state} = _ST_OPENING;

    # The callbacks below only hold a weak reference, so a pending connect
    # does not keep the object alive by itself.
    weaken(my $weak_self = $self);
    my $conn; $conn = AnyEvent::Socket::tcp_connect(
        $args{host},
        $args{port},
        sub {
            # The connect guard is no longer needed once this callback runs.
            undef $conn;
            my $self = $weak_self or return;

            my $fh = shift;

            # No filehandle means the TCP connect failed; $! holds the reason.
            unless ($fh) {
                $self->{_state} = _ST_CLOSED;
                return $args{on_failure}->(
                    sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
                );
            }

            my $close_cb = $args{on_close};
            my $failure_cb = $args{on_failure};
            $self->{_handle} = AnyEvent::Handle->new(
                fh       => $fh,
                on_error => sub {
                    my ($handle, $fatal, $message) = @_;
                    my $self = $weak_self or return;

                    # An error on an open connection is treated as the server
                    # closing it; before that it is a connect failure, and
                    # on_failure receives the raw (handle, fatal, message).
                    if ($self->is_open) {
                        $self->_server_closed($close_cb, $message);
                    }
                    else {
                        $failure_cb->(@_);
                    }
                },
                on_drain => sub {
                    my $self = $weak_self or return;

                    # Wake up anyone waiting for the write buffer to empty.
                    $self->{drain_condvar}->send
                        if exists $self->{drain_condvar};
                },
                peername => $args{host},
                $args{tls} ? (tls => 'connect') : (),
                $args{tls_ctx} ? ( tls_ctx => $args{tls_ctx} ) : (),
                $args{nodelay} ? ( nodelay => $args{nodelay} ) : (),
            );
            $self->_read_loop($args{on_close}, $args{on_read_failure});
            $self->_start(%args,);
        },
        sub {
            # tcp_connect "prepare" callback: its return value is the
            # connect timeout.
            return $args{timeout};
        },
    );

    return $self;
}

# Accessor: returns the server properties hash reference. It is an empty hash
# on a fresh object and is replaced by the server_properties field of the
# Connection::Start frame during the handshake.
sub server_properties {
    my ($self) = @_;
    return $self->{_server_properties};
}

# Read one AMQP frame from the handle, dispatch it, then re-arm itself for
# the next frame.
#
# Parameters:
#   $close_cb   - passed on to _check_close_and_clean for channel-0 frames
#   $failure_cb - called with a message when broken data or a frame for an
#                 unknown channel is received; also handed to the channel
#
# Each frame is read in two steps: first 8 bytes, then $length further bytes,
# where $length comes from the 'CnN' header (1-byte type, 2-byte channel,
# 4-byte length). Re-arming is done with "goto &_read_loop", which replaces
# the current call frame instead of nesting a new one.
#
# Returns $self, or nothing when there is no handle.
sub _read_loop {
    my ($self, $close_cb, $failure_cb,) = @_;

    return if !defined $self->{_handle}; # called on_error

    # Callbacks hold only a weak reference and stop silently once the object
    # is gone.
    weaken(my $weak_self = $self);
    $self->{_handle}->push_read(chunk => 8, sub {
        my $self = $weak_self or return;
        # Two copies of the same 8 bytes: $data is consumed by the header
        # unpack below, $stack stays intact and is later extended with the
        # rest of the frame for parsing.
        my $data = $_[1];
        my $stack = $_[1];

        if (length($data) <= 7) {
            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

        # 4-arg substr removes the 7 header bytes from $data while returning
        # them to unpack.
        my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
        if (!defined $type_id || !defined $channel || !defined $length) {
            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

        # Second step: fetch the remaining $length bytes of this frame.
        $self->{_handle}->push_read(chunk => $length, sub {
            my $self = $weak_self or return;
            $stack .= $_[1];
            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);

            # Any received frame counts as a sign of life while the heartbeat
            # timer is running.
            $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};

            if ($self->{verbose}) {
                warn '[C] <-- [S] ', Dumper($frame),
                     '-----------', "\n";
            }

            # Channel 0 frames belong to the connection itself; all others
            # are routed to the channel object registered under that id.
            my $id = $frame->channel;
            if (0 == $id) {
                if ($frame->type_id == 8) {
                    # Heartbeat, no action needs taking.
                }
                else {
                    # A false result ends the read loop here (no re-arm).
                    return unless $self->_check_close_and_clean($frame, $close_cb,);
                    $self->{_queue}->push($frame);
                }
            } else {
                my $channel = $self->{_channels}->{$id};
                if (defined $channel) {
                    $channel->push_queue_or_consume($frame, $failure_cb);
                } else {
                    $failure_cb->('Unknown channel id: ' . $frame->channel);
                }
            }

            # Re-arm for the next frame.
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        });
    });

    return $self;
}

sub _check_close_and_clean {
    my $self = shift;
    my ($frame, $close_cb,) = @_;

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


# First handshake step: send the AMQP protocol header, wait for
# Connection::Start, answer with Connection::StartOk and continue with _tune.
#
# Takes the connect() arguments as a flat key/value list. Keys read here:
#   user, pass         - credentials sent in the AMQPLAIN response
#   client_properties  - optional hash ref merged over the default client
#                        properties (caller's entries win)
#   on_failure         - called with a message when the server offers neither
#                        the AMQPLAIN mechanism nor the en_US locale; also
#                        passed to _push_read_and_valid
#
# Returns $self.
sub _start {
    my $self = shift;
    my %args = @_;

    if ($self->{verbose}) {
        warn 'post header', "\n";
    }

    $self->{_handle}->push_write(Net::AMQP::Protocol->header);

    # Like _tune, the pending callback only gets a weak reference, so it
    # cannot keep the connection object alive on its own.
    weaken(my $weak_self = $self);
    $self->_push_read_and_valid(
        'Connection::Start',
        sub {
            my $self = $weak_self or return;
            my $frame = shift;

            my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
            return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
                if none {$_ eq 'AMQPLAIN'} @mechanisms;

            my @locales = split /\s/, $frame->method_frame->locales;
            return $args{on_failure}->('en_US is not found in locales')
                if none {$_ eq 'en_US'} @locales;

            $self->{_server_properties} = $frame->method_frame->server_properties;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::StartOk->new(
                    client_properties => {
                        platform     => 'Perl',
                        product      => __PACKAGE__,
                        information  => 'http://d.hatena.ne.jp/cooldaemon/',
                        version      => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
                        capabilities => {
                            consumer_cancel_notify     => Net::AMQP::Value::true,
                            exchange_exchange_bindings => Net::AMQP::Value::true,
                        },
                        # Listed last so caller-supplied properties override
                        # the defaults above.
                        %{ $args{client_properties} || {} },
                    },
                    mechanism => 'AMQPLAIN',
                    response => {
                        LOGIN    => $args{user},
                        PASSWORD => $args{pass},
                    },
                    locale => 'en_US',
                ),
            );

            $self->_tune(%args,);
        },
        $args{on_failure},
    );

    return $self;
}

# Second handshake step: wait for Connection::Tune, negotiate channel_max,
# frame_max and heartbeat against the caller's wishes ($args{tune}), reply
# with Connection::TuneOk, start the heartbeat timer when a heartbeat was
# agreed, and continue with _open.
#
# Takes the connect() arguments as a flat key/value list; returns $self.
sub _tune {
    my ($self, %args) = @_;

    weaken(my $weak_self = $self);

    my $on_tune = sub {
        my $self = $weak_self or return;
        my $frame = shift;

        my %negotiated;
        for my $field (qw( channel_max frame_max heartbeat )) {
            my $client = $args{tune}{$field} || 0;
            my $server = $frame->method_frame->$field || 0;

            # A value of 0 means "no limit". If either side has no limit the
            # larger value is taken, otherwise the smaller one, so we never
            # ask for more than the server allows.
            my $unlimited    = ($server == 0 or $client == 0);
            my $take_server  = $unlimited ? ($server > $client)
                                          : ($client > $server);
            $negotiated{$field} = $take_server ? $server : $client;
        }

        $self->{_frame_max} = $negotiated{frame_max};
        if ($self->{_frame_max}) {
            # Largest body that fits once frame header and footer are
            # accounted for.
            $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
        }

        $self->{_channel_max} = $negotiated{channel_max} || $DEFAULT_CHANNEL_MAX;

        $self->_push_write(
            Net::AMQP::Protocol::Connection::TuneOk->new(%negotiated,)
        );

        $self->_start_heartbeat($negotiated{heartbeat}, %args,)
            if $negotiated{heartbeat} > 0;

        $self->_open(%args,);
    };

    $self->_push_read_and_valid('Connection::Tune', $on_tune, $args{on_failure},);

    return $self;
}

# Start the periodic heartbeat timer.
#
# Parameters:
#   $interval - seconds between ticks (also the delay before the first tick)
#   %args     - connect() arguments; on_close and on_read_failure are used
#
# On every tick a heartbeat frame is sent. If the receive timestamp
# ($self->{_heartbeat_recv}) has not changed for two consecutive ticks, the
# timer is removed, on_read_failure is called with "Heartbeat lost" and the
# connection is treated as closed by the server.
#
# Returns $self.
sub _start_heartbeat {
    my $self     = shift;
    my $interval = shift;
    my %args     = @_;

    my ($on_close, $on_lost) = @args{qw(on_close on_read_failure)};

    # Timestamp seen on the previous tick, and the number of consecutive
    # ticks on which it did not move.
    my ($seen_at, $silent_ticks) = (0, 0);

    weaken(my $weak_self = $self);
    my $on_tick = sub {
        my $self = $weak_self or return;

        if ($self->{_heartbeat_recv} == $seen_at) {
            if (++$silent_ticks > 1) {
                delete $self->{_heartbeat_timer};
                $on_lost->("Heartbeat lost");
                $self->_server_closed($on_close, "Heartbeat lost");
                return;
            }
        }
        else {
            $seen_at      = $self->{_heartbeat_recv};
            $silent_ticks = 0;
        }

        $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
    };

    $self->{_heartbeat_recv}  = time;
    $self->{_heartbeat_timer} = AnyEvent->timer(
        after    => $interval,
        interval => $interval,
        cb       => $on_tick,
    );

    return $self;
}

# Final handshake step: send Connection::Open for the requested virtual host
# and, once Connection::OpenOk arrives, mark the connection open, remember
# the login user and call on_success with the connection object.
#
# Takes the connect() arguments as a flat key/value list. Keys read here:
#   vhost       - virtual host to open
#   user        - stored as the login user on success
#   on_success  - called with $self once the connection is open
#   on_failure  - passed to _push_write_and_read
#
# Returns $self.
sub _open {
    my $self = shift;
    my %args = @_;

    # Like _tune, the pending callback only gets a weak reference, so it
    # cannot keep the connection object alive on its own.
    weaken(my $weak_self = $self);
    $self->_push_write_and_read(
        'Connection::Open',
        {
            virtual_host => $args{vhost},
            insist       => 1,
        },
        'Connection::OpenOk',
        sub {
            my $self = $weak_self or return;

            $self->{_state} = _ST_OPEN;
            $self->{_login_user} = $args{user};
            $args{on_success}->($self);
        },
        $args{on_failure},
    );

    return $self;
}

sub close {
    return if in_global_destruction;
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    if ($self->{_state} == _ST_CLOSED) {
        $args{on_success}->(@_);
        return $self;
    }
    if ($self->{_state} != _ST_OPEN) {
        $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
        return $self;
    }



( run in 1.304 second using v1.01-cache-2.11-cpan-5b529ec07f3 )