AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN


1.19  Sat Mar 21 16:49:24 GMT 2015
        - Add 'no_ack' as an optional argument to the ->consume method
          (Dave Mueller).
        - Fill in some missing documentation (Moritz Lenz).
1.18  Mon Sep 29 19:36:00 PDT 2014
        - Added the bind_exchange and unbind_exchange methods
          for exchange-exchange bindings.

1.17  Fri Jul 25 14:02:00 PDT 2014
        - Add support for chunking large bodies into multiple AMQP frames,
          allowing the sending of large messages.

1.16  Sat Apr 12 14:42:00 BST 2014
        - Doc fixes (Mark Ellis)
        - Fix leak when calling ->close + tests (Peter Haworth)

1.15  Mon Jul 1 12:35:00 BST 2013
        - Fix paper-bag bug in connection close - calling nonexistent method.

1.14  Fri Jun  7 08:54:00 BST 2013

Changes  view on Meta::CPAN


1.05    Tue Jul 22 16:55:55 2011
        - Fixed a compiling error.

1.04    Tue Jul 19 17:04:24 2011
        - Bug fix for consuming large messages.

1.03    Thu Apr  7 02:55:12 2011
        - Separate AnyEvent::RabbitMQ from Net::RabbitFoot.
        - Avoid (additional) issues when in global destruction.
        - Do not set reply_to to an empty string in the header frame.
        - Implement basic.reject (requires RabbitMQ >= 2.0.0).
        - Store server properties in the object for easy server product
          and server version access.
        - Shutdown the AnyEvent handle using push_shutdown.
        - Be more careful in DESTROY blocks.

1.02    Wed Jun 30 11:35:32 2010
        - Fix errors in global destruction due to destruction order being
          random.
        - Fix bug if you call ->close on a Net::RabbitFoot instance which

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    my $class = shift;
    return bless {
        verbose            => 0,
        @_,
        _state             => _ST_CLOSED,
        _queue             => AnyEvent::RabbitMQ::LocalQueue->new,
        _last_chan_id      => 0,
        _channels          => {},
        _login_user        => '',
        _server_properties => {},
        _frame_max         => undef,
        _body_max          => undef,
        _channel_max       => undef,
    }, $class;
}

# Combined getter/setter for the 'verbose' flag.
# Called with an argument, stores it and returns the stored value;
# called without one, returns the current value.
sub verbose {
    my ($self, @args) = @_;

    return $self->{verbose} unless @args;
    return $self->{verbose} = $args[0];
}

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

        my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
        if (!defined $type_id || !defined $channel || !defined $length) {
            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

        $self->{_handle}->push_read(chunk => $length, sub {
            my $self = $weak_self or return;
            $stack .= $_[1];
            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);

            $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};

            if ($self->{verbose}) {
                warn '[C] <-- [S] ', Dumper($frame),
                     '-----------', "\n";
            }

            my $id = $frame->channel;
            if (0 == $id) {
                if ($frame->type_id == 8) {
                    # Heartbeat, no action needs taking.
                }
                else {
                    return unless $self->_check_close_and_clean($frame, $close_cb,);
                    $self->{_queue}->push($frame);
                }
            } else {
                my $channel = $self->{_channels}->{$id};
                if (defined $channel) {
                    $channel->push_queue_or_consume($frame, $failure_cb);
                } else {
                    $failure_cb->('Unknown channel id: ' . $frame->channel);
                }
            }

            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        });
    });

    return $self;
}

# Inspect a frame and decide whether the caller should go on processing it.
#
# Returns a true value when the frame should be processed further.  Returns
# a false value when it must be dropped: either the connection is already
# closed and the frame is not Connection::CloseOk, or the frame is a
# Connection::Close from the server, which is answered and handled here.
sub _check_close_and_clean {
    my ($self, $frame, $close_cb) = @_;

    # Only method frames carry a protocol method we can inspect.
    my $method;
    $method = $frame->method_frame
        if $frame->isa('Net::AMQP::Frame::Method');

    # Once closed, CloseOk is the only frame that is still let through.
    return $method && $method->isa('Net::AMQP::Protocol::Connection::CloseOk')
        if $self->{_state} == _ST_CLOSED;

    # Anything other than a server-initiated Close is business as usual.
    return 1
        unless $method && $method->isa('Net::AMQP::Protocol::Connection::Close');

    # Server is closing the connection: stop heartbeats, acknowledge the
    # close, then tear down local state and notify via $close_cb.
    delete $self->{_heartbeat_timer};
    $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
    $self->_server_closed($close_cb, $frame);
    return;
}

# Tear down local connection state after the connection has gone away.
#
#   $close_cb - invoked last, with $why as its only argument
#   $why      - either a frame (a reference) or a plain text reason
#
# Every open channel is told it is closed, the channel table is emptied,
# the handle is asked to shut down, and the state ends up as closed.
sub _server_closed {
    my ($self, $close_cb, $why) = @_;

    $self->{_state} = _ST_CLOSING;

    foreach my $channel (values %{ $self->{_channels} }) {
        # A plain-text reason is wrapped in a close frame built by the
        # channel; a frame is passed through unchanged.
        my $close_frame = ref($why) ? $why : $channel->_close_frame($why);
        $channel->_closed($close_frame);
    }

    $self->{_channels} = {};
    $self->{_handle}->push_shutdown;
    $self->{_state} = _ST_CLOSED;

    $close_cb->($why);
    return;
}

sub _start {

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


    if ($self->{verbose}) {
        warn 'post header', "\n";
    }

    $self->{_handle}->push_write(Net::AMQP::Protocol->header);

    $self->_push_read_and_valid(
        'Connection::Start',
        sub {
            my $frame = shift;

            my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
            return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
                if none {$_ eq 'AMQPLAIN'} @mechanisms;

            my @locales = split /\s/, $frame->method_frame->locales;
            return $args{on_failure}->('en_US is not found in locales')
                if none {$_ eq 'en_US'} @locales;

            $self->{_server_properties} = $frame->method_frame->server_properties;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::StartOk->new(
                    client_properties => {
                        platform     => 'Perl',
                        product      => __PACKAGE__,
                        information  => 'http://d.hatena.ne.jp/cooldaemon/',
                        version      => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
                        capabilities => {
                            consumer_cancel_notify     => Net::AMQP::Value::true,

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


sub _tune {
    my $self = shift;
    my %args = @_;

    weaken(my $weak_self = $self);
    $self->_push_read_and_valid(
        'Connection::Tune',
        sub {
            my $self = $weak_self or return;
            my $frame = shift;

            my %tune;
            foreach (qw( channel_max frame_max heartbeat )) {
                my $client = $args{tune}{$_} || 0;
                my $server = $frame->method_frame->$_ || 0;

                # negotiate with the server such that we cannot request a larger
                # value set by the server, unless the server said unlimited
                $tune{$_} = ($server == 0 or $client == 0)
                    ? ($server > $client ? $server : $client)   # max
                    : ($client > $server ? $server : $client);  # min
            }

            if ($self->{_frame_max} = $tune{frame_max}) {
                # calculate how big the body can actually be
                $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
            }

            $self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
            );

            if ($tune{heartbeat} > 0) {
                $self->_start_heartbeat($tune{heartbeat}, %args,);

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    return $self;
}

# Send a method frame and register a reader for the expected reply.
#
#   $method     - method class name relative to Net::AMQP::Protocol::
#   $args       - hashref of constructor arguments for that method
#   $exp        - expected reply, passed through to _push_read_and_valid
#   $cb         - success callback, passed through
#   $failure_cb - failure callback, passed through
#   $id         - channel id, passed to both the write and the read
#
# Returns whatever _push_read_and_valid returns.
sub _push_write_and_read {
    my ($self, $method, $args, $exp, $cb, $failure_cb, $id) = @_;

    my $protocol_class = 'Net::AMQP::Protocol::' . $method;
    my $request = Net::AMQP::Frame::Method->new(
        method_frame => $protocol_class->new(%$args),
    );

    $self->_push_write($request, $id);

    return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id);
}

sub _push_read_and_valid {
    my $self = shift;
    my ($exp, $cb, $failure_cb, $id,) = @_;

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    if (!$id) {
        $queue = $self->{_queue};
    } elsif (defined $self->{_channels}->{$id}) {
        $queue = $self->{_channels}->{$id}->queue;
    } else {
        $failure_cb->('Unknown channel id: ' . $id);
    }

    return unless $queue; # Can go away in global destruction..
    $queue->get(sub {
        my $frame = shift;

        return $failure_cb->('Received data is not method frame')
            if !$frame->isa('Net::AMQP::Frame::Method');

        my $method_frame = $frame->method_frame;
        for my $exp_elem (@$exp) {
            return $cb->($frame)
                if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
        }

        $failure_cb->(
            $method_frame->isa('Net::AMQP::Protocol::Channel::Close')
              ? 'Channel closed'
              : 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame)
        );
    });
}

# Serialise a frame and queue it on the connection handle.
#
#   $output - a frame, or a protocol method object (anything derived from
#             Net::AMQP::Protocol::Base), which is wrapped in a frame first
#   $id     - channel id; a false value means channel 0
#
# Always returns nothing.
sub _push_write {
    my ($self, $output, $id) = @_;

    $output = $output->frame_wrap
        if $output->isa('Net::AMQP::Protocol::Base');
    $output->channel($id || 0);

    warn '[C] --> [S] ', Dumper($output)
        if $self->{verbose};

    # Careful - the handle could have gone already (global destruction).
    if ($self->{_handle}) {
        $self->{_handle}->push_write($output->to_raw_frame());
    }

    return;
}

sub _set_cbs {
    my $self = shift;
    my %args = @_;

    $args{on_success} ||= sub {};
    $args{on_failure} ||= sub { die @_ unless in_global_destruction };

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


  my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
      host       => 'localhost',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0, # Or 1 if you'd like SSL
      tls_ctx    => $anyevent_tls, # or a hash of AnyEvent::TLS options.
      tune       => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever },
      nodelay    => 1, # Reduces latency by disabling Nagle's algorithm
      on_success => sub {
          my $ar = shift;
          $ar->open_channel(
              on_success => sub {
                  my $channel = shift;
                  $channel->declare_exchange(
                      exchange   => 'test_exchange',
                      on_success => sub {
                          $cv->send('Declared exchange');
                      },
                      on_failure => $cv,
                  );
              },
              on_failure => $cv,
              on_close   => sub {
                  my $method_frame = shift->method_frame;
                  die $method_frame->reply_code, $method_frame->reply_text;
              },
          );
      },
      on_failure => $cv,
      on_read_failure => sub { die @_ },
      on_return  => sub {
          my $frame = shift;
          die "Unable to deliver ", Dumper($frame);
      },
      on_close   => sub {
          my $why = shift;
          if (ref($why)) {
              my $method_frame = $why->method_frame;
              die $method_frame->reply_code, ": ", $method_frame->reply_text;
          }
          else {
              die $why;
          }
      },
  );

  print $cv->recv, "\n";

=head1 DESCRIPTION

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

	# No more tests are required
	undef $t;

        # Double close is OK
	if ($self->{_state} == _ST_CLOSED) {
	    $args{on_success}->($self);
            return;
        }

        $connection->_push_write(
            $self->_close_frame,
            $self->{id},
        );

        # The spec says that after a party sends Channel::Close, it MUST
        # discard all frames for that channel.  So this channel is dead
        # immediately.
        $self->_closed();

        $connection->_push_read_and_valid(
            'Channel::CloseOk',
            sub {
                $args{on_success}->($self);
                $self->_orphan();
            },
            sub {

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

            },
            $self->{id},
        );
    };

    return $self;
}

# Mark this channel as closed and notify everything still waiting on it.
#
#   $frame - the frame reported to every callback as the reason for the
#            close; defaults to a locally built Channel::Close frame
#            (see _close_frame).
#
# Does nothing (and returns nothing) if the channel is already closed;
# otherwise returns $self.  The order of the steps below is deliberate:
# pending commands, publishes and consumers are notified before on_close.
sub _closed {
    my $self = shift;
    my ($frame,) = @_;
    $frame ||= $self->_close_frame();

    # Guard against double close: callbacks must fire at most once.
    return if $self->{_state} == _ST_CLOSED;
    $self->{_state} = _ST_CLOSED;

    # Perform callbacks for all outstanding commands
    $self->{_queue}->_flush($frame);
    $self->{_content_queue}->_flush($frame);

    # Fake nacks of all outstanding publishes
    # (index 1 of each _publish_cbs entry is the nack callback; entries
    # without one are skipped)
    $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };

    # Report cancelation of all outstanding consumes
    # (tags are snapshotted first because _canceled deletes from the hash)
    my @tags = keys %{ $self->{_consumer_cbs} };
    $self->_canceled($_, $frame) for @tags;

    # Report close to on_close callback
    # (errors thrown by the callback are downgraded to a warning, and $@ is
    # localized so the caller's value is not clobbered)
    { local $@;
      eval { $self->{on_close}->($frame) };
      warn "Error in channel on_close callback, ignored:\n  $@  " if $@; }

    # Reset state (partly redundant)
    # NOTE(review): _reset is defined outside this view; presumably it
    # clears the per-channel bookkeeping -- confirm.
    $self->_reset;

    return $self;
}

# Build a method frame wrapping a Channel::Close whose reply_text is $text
# ($text may be undef).  Returns the new frame.
sub _close_frame {
    my ($self, $text) = @_;

    return Net::AMQP::Frame::Method->new(
        method_frame => Net::AMQP::Protocol::Channel::Close->new(reply_text => $text),
    );
}

sub _orphan {
    my $self = shift;

    if (my $connection = $self->{connection}) {
        $connection->_delete_channel($self);

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        $headers = {
            %$headers,
            map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
        };
    }

    $self->{connection}->_push_write(
        Net::AMQP::Frame::Header->new(
            weight       => $weight,
            body_size    => length($body),
            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                content_type     => 'application/octet-stream',
                content_encoding => undef,
                delivery_mode    => 1,
                priority         => 1,
                correlation_id   => undef,
                expiration       => undef,
                message_id       => undef,
                timestamp        => time,
                type             => undef,
                user_id          => $self->{connection}->login_user,

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    );

    return $self;
}

# Send $body on this channel as one or more body frames.
#
# Each frame carries at most the connection's _body_max bytes; when no
# limit is set, the whole body goes out in a single frame.  An empty body
# produces no frames at all.  Returns $self.
sub _body {
    my ($self, $body) = @_;

    my $chunk_size = $self->{connection}->{_body_max} || length $body;

    # Peel chunks off the front of $body until nothing is left.
    while (length $body) {
        my $frame = Net::AMQP::Frame::Body->new(
            payload => substr($body, 0, $chunk_size, ''),
        );
        $self->{connection}->_push_write($frame, $self->{id});
    }

    return $self;
}

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

            no_local     => 0,
            no_ack       => $no_ack,
            exclusive    => 0,

            %args, # queue
            ticket       => 0,
            nowait       => 0, # FIXME
        },
        'Basic::ConsumeOk',
        sub {
            my $frame = shift;
            my $tag = $frame->method_frame->consumer_tag;
            $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
            $cb->($frame);
        },
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub cancel {
    my $self = shift;

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

            nowait => 0,
        ),
        $self->{id},
    );

    return $self;
}

# Forget the consumer registered under $tag and report its cancellation.
#
# Returns 0 when no consumer is registered under $tag.  Otherwise removes
# the registration, drops its first (delivery) callback, calls every
# remaining callback with $frame -- last one first -- and returns 1.
sub _canceled {
    my ($self, $tag, $frame) = @_;

    my $callbacks = delete $self->{_consumer_cbs}->{$tag};
    return 0 unless $callbacks;

    # The first entry handles deliveries; there will be no more of those.
    shift @{$callbacks};

    foreach my $notify (reverse @{$callbacks}) {
        $notify->($frame);
    }

    return 1;
}

sub get {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    my $no_ack = delete $args{no_ack} // 1;

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN


    $self->{connection}->_push_write_and_read(
        'Basic::Get',
        {
            no_ack => $no_ack,
            %args, # queue
            ticket => 0,
        },
        [qw(Basic::GetOk Basic::GetEmpty)],
        sub {
            my $frame = shift;
            return $cb->({empty => $frame})
                if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
            $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
        },
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub ack {
    my $self = shift;

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

# Dispatch one incoming frame for this channel.
#
#   $frame      - the frame received for this channel
#   $failure_cb - called with an error string on protocol-level surprises
#
# Method frames the channel must react to on its own (Channel::Close,
# Basic::Deliver, Basic::Cancel/CancelOk, Basic::Return, Basic::Ack/Nack,
# Channel::Flow) are handled here.  Every other method frame is pushed onto
# the method queue, and non-method frames are pushed onto the content queue.
# Always returns $self.
sub push_queue_or_consume {
    my $self = shift;
    my ($frame, $failure_cb,) = @_;

    # Note: the spec says that after a party sends Channel::Close, it MUST
    # discard all frames for that channel other than Close and CloseOk.

    if ($frame->isa('Net::AMQP::Frame::Method')) {
        my $method_frame = $frame->method_frame;
        if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
            # Server closed the channel: acknowledge, notify everything
            # waiting on it, then detach it from the connection.
            $self->{connection}->_push_write(
                Net::AMQP::Protocol::Channel::CloseOk->new(),
                $self->{id},
            );
            $self->_closed($frame);
            $self->_orphan();
            return $self;
        } elsif ($self->{_state} != _ST_OPEN) {
            # Channel is not open: only the replies that complete an open
            # or a close are queued; everything else is dropped.
            if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
                $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
                $self->{_queue}->push($frame);
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
            # Deliveries for an unknown consumer tag are still read off the
            # content queue, but handed to a no-op callback.
            my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
            my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
            $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
                 $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
            # CancelOk means we asked for a cancel.
            # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
            # Only an unmatched CancelOk is reported as a failure; an
            # unmatched Cancel is silently ignored.
            if (!$self->_canceled($method_frame->consumer_tag, $frame)
                  && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
                $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
            # Weak reference so the pending callback does not keep the
            # channel alive.
            weaken(my $wself = $self);
            my $cb = sub {
                # $ret is the assembled response hash (return/header/body).
                my $ret = shift;
                my $me = $wself or return;
                my $headers = $ret->{header}->headers || {};
                my $onret_cb;
                # A per-publish return callback is looked up through the
                # _ar_return header (index 2 of the _publish_cbs entry).
                if (defined(my $tag = $headers->{_ar_return})) {
                    my $cbs = $me->{_publish_cbs}->{$tag};
                    $onret_cb = $cbs->[2] if $cbs;
                }
                # Fall back to the channel's, then the connection's on_return.
                $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {};  # oh well
                # NOTE(review): the callback receives the Basic::Return
                # method frame ($frame), not the assembled $ret -- confirm
                # this matches the on_return documentation.
                $onret_cb->($frame);
            };
            $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
                 $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
            # $resp becomes the short class name: 'Ack' or 'Nack'.
            (my $resp = ref($method_frame)) =~ s/.*:://;
            # NOTE(review): this outer $cbs is never used; the loop below
            # declares its own.
            my $cbs;
            if (!$self->{_is_confirm}) {
                $failure_cb->("Received $resp when not in confirm mode");
            }
            else {
                my @tags;
                if ($method_frame->{multiple}) {
                    # 'multiple' covers every outstanding publish up to and
                    # including delivery_tag, handled in ascending order.
                    @tags = sort { $a <=> $b }
                              grep { $_ <= $method_frame->{delivery_tag} }
                                keys %{$self->{_publish_cbs}};
                }
                else {
                    @tags = ($method_frame->{delivery_tag});
                }
                # Index into the _publish_cbs entry: 0 = on ack, 1 = on nack.
                my $cbi = ($resp eq 'Ack') ? 0 : 1;
                for my $tag (@tags) {
                    my $cbs;
                    if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
                        $failure_cb->("Received $resp of unknown delivery tag $tag");
                    }
                    elsif ($cbs->[$cbi]) {
                        $cbs->[$cbi]->($frame);
                    }
                }
            }
            return $self;
        } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
            # Record the requested flow state, confirm it with FlowOk, then
            # notify on_active / on_inactive (channel first, then connection).
            $self->{_is_active} = $method_frame->active;
            $self->{connection}->_push_write(
                Net::AMQP::Protocol::Channel::FlowOk->new(
                    active => $method_frame->active,
                ),
                $self->{id},
            );
            my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
            my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
            $cb->($frame);
            return $self;
        }
        # Any other method frame is a reply some caller is waiting for.
        $self->{_queue}->push($frame);
    } else {
        # Non-method frames go to the content queue.
        $self->{_content_queue}->push($frame);
    }

    return $self;
}

# Assemble one complete content message (a header frame followed by zero or
# more body frames) from the channel's content queue and hand it to $cb.
#
#   $type       - key under which $frame is stored in the response
#                 (callers in this file pass 'ok', 'deliver' or 'return')
#   $frame      - the method frame that announced the content
#   $cb         - called with a hashref { $type => $frame, header => ...,
#                 body => ... }; body is undef when the header announces a
#                 body size of 0, otherwise it is the last body frame with
#                 its payload replaced by the complete concatenated payload
#   $failure_cb - called with an error string when an unexpected frame shows
#                 up where a header or body frame was required
#
# Returns $self.
sub _push_read_header_and_body {
    my $self = shift;
    my ($type, $frame, $cb, $failure_cb,) = @_;
    my $response = {$type => $frame};
    my $body_size = 0;
    my $body_payload = "";

    # Weak reference: pending callbacks must not keep the queue alive.
    weaken(my $wcontq = $self->{_content_queue});

    # The body handler re-registers itself until the whole body has arrived.
    # It does so through a weakened copy of its own reference so that the
    # closure does not hold a strong reference to itself.
    my $w_body_frame;
    my $body_frame = sub {
        my $frame = shift;

        return $failure_cb->('Received data is not body frame')
            if !$frame->isa('Net::AMQP::Frame::Body');

        $body_payload .= $frame->payload;

        if (length($body_payload) < $body_size) {
            # More to come
            my $contq = $wcontq or return;
            $contq->get($w_body_frame);
        }
        else {
            # Complete: deliver the accumulated payload in the last frame.
            $frame->payload($body_payload);
            $response->{body} = $frame;
            $cb->($response);
        }
    };
    $w_body_frame = $body_frame;
    weaken($w_body_frame);

    $self->{_content_queue}->get(sub{
        my $frame = shift;

        return $failure_cb->('Received data is not header frame')
            if !$frame->isa('Net::AMQP::Frame::Header');

        my $header_frame = $frame->header_frame;
        # The two sentences used to be concatenated without a separator,
        # producing "...ContentHeaderHeader was ...".
        return $failure_cb->(
              'Header is not Protocol::Basic::ContentHeader. '
            . 'Header was ' . ref $header_frame
        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');

        $response->{header} = $header_frame;

        $body_size = $frame->body_size;
        if ( $body_size ) {
            my $contq = $wcontq or return;
            $contq->get($body_frame);
        } else {
            # Empty message: no body frames will follow.
            $response->{body} = undef;
            $cb->($response);
        }
    });

    return $self;
}

sub _delete_cbs {

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN


=item on_close

Callback invoked when the channel closes.  Callback will be passed the
incoming message that caused the close, if any.

=item on_return

Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.

=back

=head1 METHODS

=head2 declare_exchange (%args)

Declare an exchange (to publish messages to) on the server.

Arguments:

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

temporary/private reply queues.

=item on_success

Callback that is called when the queue was declared successfully. The argument
to the callback is of type L<Net::AMQP::Frame::Method>. To get the name of the
Queue (if you declared it with an empty name), you can say

    on_success => sub {
        my $method = shift;
        my $name   = $method->method_frame->queue;
    };

=item on_failure

Callback that is called when the declaration of the queue has failed.

=item auto_delete

0 or 1, default 0

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
bindings), it will be "returned."  (see "on_return")

=item immediate

Boolean; if true, then if the message cannot be delivered directly to a consumer, it
will be "returned."  (see "on_return")

=item on_ack

Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Ack>.

=item on_nack

Callback called with the frame that declines receipt (if the channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>.

=item on_return

In AMQP, a "returned" message is one that cannot be delivered in compliance with the
C<immediate> or C<mandatory> flags.

If in confirm mode, this callback will be called with the frame that reports message
return, typically L<Net::AMQP::Protocol::Basic::Return>.  If confirm mode is off or
this callback is not provided, then the channel or connection objects' on_return
callbacks (if any), will be called instead.

NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or
not on_return is called first.

=back

=head2 cancel

lib/AnyEvent/RabbitMQ/LocalQueue.pm  view on Meta::CPAN

    for (1 .. $count) {
        &{shift @{$self->{_drain_code_queue}}}(
            shift @{$self->{_message_queue}}
        );
    }

    return $self;
}

# Empty the queue on shutdown: first let _drain_queue do its normal work,
# then call every callback that is still waiting, passing it $frame.
sub _flush {
    my $self  = shift;
    my $frame = shift;

    $self->_drain_queue;

    while (my $waiting = shift @{ $self->{_drain_code_queue} }) {
        # Flush frames immediately, throwing away errors for on-close;
        # $@ is localized so the caller's value is not clobbered.
        local $@;
        eval { $waiting->($frame) };
    }
}

1;

share/fixed_amqp0-8.xml  view on Meta::CPAN


-->

<amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80">
    AMQ Protocol 0.80
<!--
======================================================
==       CONSTANTS
======================================================
-->
  <constant name="frame method" value="1"/>
  <constant name="frame header" value="2"/>
  <constant name="frame body" value="3"/>
  <constant name="frame oob method" value="4"/>
  <constant name="frame oob header" value="5"/>
  <constant name="frame oob body" value="6"/>
  <constant name="frame trace" value="7"/>
  <constant name="frame heartbeat" value="8"/>
  <constant name="frame min size" value="4096"/>
  <constant name="frame end" value="206"/>
  <constant name="reply success" value="200">
  Indicates that the method completed successfully. This reply code is
  reserved for future use - the current protocol design does not use
  positive confirmation and reply codes are sent only in case of an
  error.
</constant>
  <constant name="not delivered" value="310" class="soft error">
  The client asked for a specific message that is no longer available.
  The message was delivered to another client, or was purged from the
  queue for some other reason.

share/fixed_amqp0-8.xml  view on Meta::CPAN

  The client attempted to work with a server entity to which it has
  no access due to security settings.
</constant>
  <constant name="not found" value="404" class="soft error">
  The client attempted to work with a server entity that does not exist.
</constant>
  <constant name="resource locked" value="405" class="soft error">
  The client attempted to work with a server entity to which it has
  no access because another client is working with it.
</constant>
  <constant name="frame error" value="501" class="hard error">
  The client sent a malformed frame that the server could not decode.
  This strongly implies a programming error in the client.
</constant>
  <constant name="syntax error" value="502" class="hard error">
  The client sent a frame that contained illegal values for one or more
  fields.  This strongly implies a programming error in the client.
</constant>
  <constant name="command invalid" value="503" class="hard error">
  The client sent an invalid sequence of frames, attempting to perform
  an operation that was considered invalid by the server. This usually
  implies a programming error in the client.
</constant>
  <constant name="channel error" value="504" class="hard error">
  The client attempted to work with a channel that had not been
  correctly opened.  This most likely indicates a fault in the client
  layer.
</constant>
  <constant name="resource error" value="506" class="hard error">
  The server could not complete the method because it lacked sufficient

share/fixed_amqp0-8.xml  view on Meta::CPAN

      <response name="tune-ok"/>
      <field name="channel max" type="short">
    proposed maximum channels
    <doc>
      The maximum total number of channels that the server allows
      per connection. Zero means that the server does not impose a
      fixed limit, but the number of allowed channels may be limited
      by available server resources.
    </doc>
      </field>
      <field name="frame max" type="long">
    proposed maximum frame size
    <doc>
      The largest frame size that the server proposes for the
      connection. The client can negotiate a lower value.  Zero means
      that the server does not impose any specific limit but may reject
      very large frames if it cannot allocate resources for them.
    </doc>
        <rule implement="MUST">
      Until the frame-max has been negotiated, both peers MUST accept
      frames of up to 4096 octets large. The minimum non-zero value for
      the frame-max field is 4096.
    </rule>
      </field>
      <field name="heartbeat" type="short">
    desired heartbeat delay
    <doc>
      The delay, in seconds, of the connection heartbeat that the server
      wants.  Zero means the server does not want a heartbeat.
    </doc>
      </field>
    </method>

share/fixed_amqp0-8.xml  view on Meta::CPAN

      per connection.  May not be higher than the value specified by
      the server.
    </doc>
        <rule implement="MAY">
      The server MAY ignore the channel-max value or MAY use it for
      tuning its resource allocation.
    </rule>
        <assert check="notnull"/>
        <assert check="le" method="tune" field="channel max"/>
      </field>
      <field name="frame max" type="long">
    negotiated maximum frame size
    <doc>
      The largest frame size that the client and server will use for
      the connection.  Zero means that the client does not impose any
      specific limit but may reject very large frames if it cannot
      allocate resources for them.  Note that the frame-max limit
      applies principally to content frames, where large contents
      can be broken into frames of arbitrary size.
    </doc>
        <rule implement="MUST">
      Until the frame-max has been negotiated, both peers must accept
      frames of up to 4096 octets large. The minimum non-zero value for
      the frame-max field is 4096.
    </rule>
      </field>
      <field name="heartbeat" type="short">
    desired heartbeat delay
    <doc>
      The delay, in seconds, of the connection heartbeat that the client
      wants. Zero means the client does not want a heartbeat.
    </doc>
      </field>
    </method>

share/fixed_amqp0-8.xml  view on Meta::CPAN

    intended for window control.  The peer that receives a request to
    stop sending content should finish sending the current content, if
    any, and then wait until it receives a Flow restart method.
  </doc>
      <rule implement="MAY">
    When a new channel is opened, it is active.  Some applications
    assume that channels are inactive until started.  To emulate this
    behaviour a client MAY open the channel, then pause it.
  </rule>
      <rule implement="SHOULD">
    When sending content data in multiple frames, a peer SHOULD monitor
    the channel for incoming methods and respond to a Channel.Flow as
    rapidly as possible.
  </rule>
      <rule implement="MAY">
    A peer MAY use the Channel.Flow method to throttle incoming content
    data for internal reasons, for example, when exchanging data over a
    slower connection.
  </rule>
      <rule implement="MAY">
    The peer that requests a Channel.Flow method MAY disconnect and/or
    ban a peer that does not respect the request.
  </rule>
      <chassis name="server" implement="MUST"/>
      <chassis name="client" implement="MUST"/>
      <response name="flow-ok"/>
      <field name="active" type="bit">
    start/stop content frames
    <doc>
      If 1, the peer starts sending content frames.  If 0, the peer
      stops sending content frames.
    </doc>
      </field>
    </method>
    <method name="flow-ok" index="21">
  confirm a flow method
  <doc>
    Confirms to the peer that a flow command was received and processed.
  </doc>
      <chassis name="server" implement="MUST"/>
      <chassis name="client" implement="MUST"/>
      <field name="active" type="bit">
    current flow setting
    <doc>
      Confirms the setting of the processed flow method: 1 means the
      peer will start sending or continue to send content frames; 0
      means it will not.
    </doc>
      </field>
    </method>
    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="alert" index="30">
  send a non-fatal warning message
  <doc>
    This method allows the server to send a non-fatal warning to the
    client.  This is used for methods that are normally asynchronous

share/fixed_amqp0-8.xml  view on Meta::CPAN

  reject an incoming message
  <doc>
    This method allows a client to reject a message.  It can be used to
    interrupt and cancel large incoming messages, or return untreatable
    messages to their original queue.
  </doc>
  <doc name = "rule" test = "amq_basic_21">
    The server SHOULD be capable of accepting and processing the Reject
    method while sending message content with a Deliver or Get-Ok
    method.  I.e. the server should read and process incoming methods
    while sending output frames.  To cancel a partially-sent content,
    the server sends a content body frame of size 1 (i.e. with no data
    except the frame-end octet).
  </doc>
  <doc name = "rule" test = "amq_basic_22">
    The server SHOULD interpret this method as meaning that the client
    is unable to process the message at this time.
  </doc>
  <doc name = "rule">
    A client MUST NOT use this method as a means of selecting messages
    to process.  A rejected message MAY be discarded or dead-lettered,
    not necessarily passed to another client.
  </doc>      

share/fixed_amqp0-8.xml  view on Meta::CPAN

  <class name="tunnel" handler="tunnel" index="110">
    <!--
======================================================
==       TUNNEL
======================================================
-->
  methods for protocol tunneling.

<doc>
  The tunnel methods are used to send blocks of binary data - which
  can be serialised AMQP methods or other protocol frames - between
  AMQP peers.
</doc>
    <doc name="grammar">
    tunnel              = C:REQUEST
                        / S:REQUEST
</doc>
    <chassis name="server" implement="MAY"/>
    <chassis name="client" implement="MAY"/>
    <field name="headers" type="table">
    Message header field table

share/fixed_amqp0-8.xml  view on Meta::CPAN

======================================================
==       TEST - CHECK FUNCTIONAL CAPABILITIES OF AN IMPLEMENTATION
======================================================
-->
  test functional primitives of the implementation

<doc>
  The test class provides methods for a peer to test the basic
  operational correctness of another peer. The test methods are
  intended to ensure that all peers respect at least the basic
  elements of the protocol, such as frame and content organisation
  and field types. We assume that a specially-designed peer, a
  "monitor client" would perform such tests.
</doc>
    <doc name="grammar">
    test                = C:INTEGER S:INTEGER-OK
                        / S:INTEGER C:INTEGER-OK
                        / C:STRING S:STRING-OK
                        / S:STRING C:STRING-OK
                        / C:TABLE S:TABLE-OK
                        / S:TABLE C:TABLE-OK

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

 -->

<amqp major = "0" minor = "9" revision = "1"
    port = "5672" comment = "AMQ Protocol version 0-9-1">
  <!--
      ======================================================
      ==       CONSTANTS
      ======================================================
  -->
  <!-- Frame types -->
  <constant name = "frame-method"     value = "1" />
  <constant name = "frame-header"     value = "2" />
  <constant name = "frame-body"       value = "3" />
  <constant name = "frame-heartbeat"  value = "8" />

  <!-- Protocol constants -->
  <constant name = "frame-min-size"   value = "4096" />
  <constant name = "frame-end"        value = "206" />

  <!-- Reply codes -->
  <constant name = "reply-success" value = "200">
    <doc>
      Indicates that the method completed successfully. This reply code is
      reserved for future use - the current protocol design does not use positive
      confirmation and reply codes are sent only in case of an error.
    </doc>
  </constant>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

    </doc>
  </constant>

  <constant name = "precondition-failed" value = "406" class = "soft-error">
    <doc>
      The client requested a method that was not allowed because some precondition
      failed.
    </doc>
  </constant>

  <constant name = "frame-error" value = "501" class = "hard-error">
    <doc>
      The sender sent a malformed frame that the recipient could not decode.
      This strongly implies a programming error in the sending peer.
    </doc>
  </constant>

  <constant name = "syntax-error" value = "502" class = "hard-error">
    <doc>
      The sender sent a frame that contained illegal values for one or more
      fields. This strongly implies a programming error in the sending peer.
    </doc>
  </constant>

  <constant name = "command-invalid" value = "503" class = "hard-error">
    <doc>
      The client sent an invalid sequence of frames, attempting to perform an
      operation that was considered invalid by the server. This usually implies
      a programming error in the client.
    </doc>
  </constant>

  <constant name = "channel-error" value = "504" class = "hard-error">
    <doc>
      The client attempted to work with a channel that had not been correctly
      opened. This most likely indicates a fault in the client layer.
    </doc>
  </constant>

  <constant name = "unexpected-frame" value = "505" class = "hard-error">
    <doc>
      The peer sent a frame that was not expected, usually in the context of
      a content header and body.  This strongly indicates a fault in the peer's
      content processing.
    </doc>
  </constant>

  <constant name = "resource-error" value = "506" class = "hard-error">
    <doc>
      The server could not complete the method because it lacked sufficient
      resources. This may be due to the client creating too many of some type
      of entity.

share/fixed_amqp0-9-1.xml  view on Meta::CPAN


      <response name = "tune-ok" />

      <field name = "channel-max" domain = "short" label = "proposed maximum channels">
        <doc>
          Specifies highest channel number that the server permits.  Usable channel numbers
          are in the range 1..channel-max.  Zero indicates no specified limit.
        </doc>
      </field>

      <field name = "frame-max" domain = "long" label = "proposed maximum frame size">
        <doc>
          The largest frame size that the server proposes for the connection, including
          frame header and end-byte.  The client can negotiate a lower value. Zero means
          that the server does not impose any specific limit but may reject very large
          frames if it cannot allocate resources for them.
        </doc>
        <rule name = "minimum">
          <doc>
            Until the frame-max has been negotiated, both peers MUST accept frames of up
            to frame-min-size octets large, and the minimum negotiated value for frame-max
            is also frame-min-size.
          </doc>
          <doc type = "scenario">
            Client connects to server and sends a large properties field, creating a frame
            of frame-min-size octets.  The server must accept this frame.
          </doc>
        </rule>
      </field>

      <field name = "heartbeat" domain = "short" label = "desired heartbeat delay">
        <doc>
          The delay, in seconds, of the connection heartbeat that the server wants.
          Zero means the server does not want a heartbeat.
        </doc>
      </field>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

            If the client specifies a channel max that is higher than the value provided
            by the server, the server MUST close the connection without attempting a
            negotiated close.  The server may report the error in some fashion to assist
            implementors.
          </doc>
        </rule>
        <assert check = "notnull" />
        <assert check = "le" method = "tune" field = "channel-max" />
      </field>

      <field name = "frame-max" domain = "long" label = "negotiated maximum frame size">
        <doc>
          The largest frame size that the client and server will use for the connection.
          Zero means that the client does not impose any specific limit but may reject
          very large frames if it cannot allocate resources for them. Note that the
          frame-max limit applies principally to content frames, where large contents can
          be broken into frames of arbitrary size.
        </doc>
        <rule name = "minimum">
          <doc>
            Until the frame-max has been negotiated, both peers MUST accept frames of up
            to frame-min-size octets large, and the minimum negotiated value for frame-max
            is also frame-min-size.
          </doc>
        </rule>
        <rule name = "upper-limit">
          <doc>
            If the client specifies a frame max that is higher than the value provided
            by the server, the server MUST close the connection without attempting a
            negotiated close. The server may report the error in some fashion to assist
            implementors.
          </doc>
        </rule>
      </field>

      <field name = "heartbeat" domain = "short" label = "desired heartbeat delay">
        <doc>
          The delay, in seconds, of the connection heartbeat that the client wants. Zero

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

      <rule name = "initial-state">
        <doc>
          When a new channel is opened, it is active (flow is active). Some applications
          assume that channels are inactive until started. To emulate this behaviour a
          client MAY open the channel, then pause it.
        </doc>
      </rule>

      <rule name = "bidirectional">
        <doc>
          When sending content frames, a peer SHOULD monitor the channel for incoming
          methods and respond to a Channel.Flow as rapidly as possible.
        </doc>
      </rule>

      <rule name = "throttling">
        <doc>
          A peer MAY use the Channel.Flow method to throttle incoming content data for
          internal reasons, for example, when exchanging data over a slower connection.
        </doc>
      </rule>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

          that does not respect the request.  This is to prevent badly-behaved clients
          from overwhelming a server.
        </doc>
      </rule>

      <chassis name = "server" implement = "MUST" />
      <chassis name = "client" implement = "MUST" />

      <response name = "flow-ok" />

      <field name = "active" domain = "bit" label = "start/stop content frames">
        <doc>
          If 1, the peer starts sending content frames. If 0, the peer stops sending
          content frames.
        </doc>
      </field>
    </method>

    <method name = "flow-ok" index = "21" label = "confirm a flow method">
      <doc>
        Confirms to the peer that a flow command was received and processed.
      </doc>
      <chassis name = "server" implement = "MUST" />
      <chassis name = "client" implement = "MUST" />
      <field name = "active" domain = "bit" label = "current flow setting">
        <doc>
          Confirms the setting of the processed flow method: 1 means the peer will start
          sending or continue to send content frames; 0 means it will not.
        </doc>
      </field>
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "close" synchronous = "1" index = "40" label = "request a channel close">
      <doc>
        This method indicates that the sender wants to close the channel. This may be due to
        internal conditions (e.g. a forced shut-down) or due to an error handling a specific

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

      <doc>
        This method allows a client to reject a message. It can be used to interrupt and
        cancel large incoming messages, or return untreatable messages to their original
        queue.
      </doc>

      <rule name = "01">
        <doc>
          The server SHOULD be capable of accepting and processing the Reject method while
          sending message content with a Deliver or Get-Ok method. I.e. the server should
          read and process incoming methods while sending output frames. To cancel a
          partially-sent content, the server sends a content body frame of size 1 (i.e.
          with no data except the frame-end octet).
        </doc>
      </rule>

      <rule name = "02">
        <doc>
          The server SHOULD interpret this method as meaning that the client is unable to
          process the message at this time.
        </doc>
        <doc type = "scenario">
          TODO.

share/fixed_amqp0-9-1.xml  view on Meta::CPAN


        This method is also used by the server to inform publishers on channels in
        confirm mode of unhandled messages.  If a publisher receives this method, it
        probably needs to republish the offending messages.
      </doc>

      <rule name = "01">
        <doc>
          The server SHOULD be capable of accepting and processing the Nack method while
          sending message content with a Deliver or Get-Ok method. I.e. the server should
          read and process incoming methods while sending output frames. To cancel a
          partially-sent content, the server sends a content body frame of size 1 (i.e.
          with no data except the frame-end octet).
        </doc>
      </rule>

      <rule name = "02">
        <doc>
          The server SHOULD interpret this method as meaning that the client is unable to
          process the message at this time.
        </doc>
        <doc type = "scenario">
          TODO.

xt/04_anyevent.t  view on Meta::CPAN

    $z->connect(
        (map {$_ => $conf{$_}} qw(host port user pass vhost)),
        timeout    => 1,
        on_success => sub {
            my $ar = shift;
            isa_ok($ar, 'AnyEvent::RabbitMQ');
            $done->send;
        },
        on_failure => failure_cb($done),
        on_return  => sub {
            my $method_frame = shift->method_frame;
            die "return: ", $method_frame->reply_code, $method_frame->reply_text
              if $method_frame->reply_code;
        },
        on_close   => sub {
            my $method_frame = shift->method_frame;
            Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
              if $method_frame->reply_code;
        },
        @{ $opt },
    );
    $done->recv;
}

# Connect $ar to the broker described by %conf, then open the channel used by
# the remaining tests.  Each step creates a fresh condvar in $done, hands it
# to the callbacks, and blocks on ->recv until one of them calls ->send.
my $done = AnyEvent->condvar;
$ar->connect(
    # Copy the connection settings out of %conf as key => value pairs.
    (map {$_ => $conf{$_}} qw(host port user pass vhost)),
    # Pass a tune option asking for frame_max = 2**17 (131072).
    # NOTE(review): presumably chosen so the large-message tests span
    # several frames -- confirm against the library's tune handling.
    tune       => { frame_max => 2**17 },
    timeout    => 1,
    on_success => sub {
        # This lexical $ar shadows the file-level $ar inside the callback.
        my $ar = shift;
        isa_ok($ar, 'AnyEvent::RabbitMQ');
        # Record the server's product name and parsed version in %server.
        $server{product} = $ar->server_properties->{product};
        $server{version} = version->parse($ar->server_properties->{version});
        $done->send;
    },
    # failure_cb is defined outside this excerpt; presumably it reports the
    # failure and releases $done -- TODO confirm.
    on_failure => failure_cb($done),
    on_return  => sub {
        # Die only when the frame carries a true reply code.
        my $method_frame = shift->method_frame;
        die "return: ", $method_frame->reply_code, $method_frame->reply_text
          if $method_frame->reply_code;
    },
    on_close   => sub {
        # Same check as on_return, but with a stack trace via Carp::confess.
        my $method_frame = shift->method_frame;
        Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
          if $method_frame->reply_code;
    },
);
$done->recv;

# File-level channel handle; open_ch() assigns it from its on_success callback.
my $ch;
$done = AnyEvent->condvar;
open_ch($done);
$done->recv;

# Open a channel on the file-level connection $ar.
#
# Takes the condvar to signal once the channel is open.  On success the new
# channel is stored in the file-level $ch and checked with isa_ok before the
# condvar is signalled.  Failures go to failure_cb($cv), and a close frame
# carrying a true reply code is fatal.
sub open_ch {
    my $cv = shift;

    my $when_opened = sub {
        $ch = shift;
        isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
        $cv->send;
    };

    my $when_closed = sub {
        my $close = shift->method_frame;
        $close->reply_code
          and die $close->reply_code, $close->reply_text;
    };

    $ar->open_channel(
        on_success => $when_opened,
        on_failure => failure_cb($cv),
        on_close   => $when_closed,
    );
}

$done = AnyEvent->condvar;
$ch->declare_exchange(
    exchange   => 'test_x',
    on_success => sub {
        pass('declare exchange');
        $done->send;

xt/04_anyevent.t  view on Meta::CPAN

    },
    on_failure => failure_cb($done),
);
$done->recv;

# Consume test: subscribe to test_q, then publish one message and expect it
# to arrive through on_consume.
$done = AnyEvent->condvar;
my $consumer_tag;
$ch->consume(
    queue      => 'test_q',
    on_success => sub {
        # Keep the consumer tag from the reply frame in the file-level
        # $consumer_tag.
        my $frame = shift;
        $consumer_tag = $frame->method_frame->consumer_tag;
        pass('consume');
    },
    on_consume => sub {
        # The test passes when the delivered body has a true payload; the
        # condvar is then released.
        my $response = shift;
        ok($response->{body}->payload, 'publish');
        $done->send;
    },
    on_failure => failure_cb($done),
);
# publish() is a helper defined outside this excerpt.
publish($ch, 'Hello RabbitMQ.', $done,);

xt/04_anyevent.t  view on Meta::CPAN

    send_large_size_message($ch, $size);
}

# Ack test for deliveries: consume with no_ack => 0, acknowledge the first
# delivery, then cancel the consumer.
$done = AnyEvent->condvar;
$ch->consume(
    queue      => 'test_q',
    # NOTE(review): no_ack => 0 presumably requests explicit acknowledgement,
    # which is why the callback acks the delivery -- confirm.
    no_ack     => 0,
    on_consume => sub {
        my $response = shift;
        # Acknowledge using the delivery tag from the deliver frame.
        $ch->ack(
            delivery_tag => $response->{deliver}->method_frame->delivery_tag
        );
        pass('ack deliver');

        # Cancel the consumer named by the same deliver frame; $done is
        # released only after the cancel succeeds.
        $ch->cancel(
            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
            on_success   => sub {
                pass('cancel');
                $done->send;
            },
            on_failure   => failure_cb($done),
        );
    },
    on_failure => failure_cb($done),
);
publish($ch, 'NO RabbitMQ, NO LIFE.', $done,);
$done->recv;

# Ack test for get: publish one message, fetch it with get (no_ack => 0),
# and acknowledge it.
$done = AnyEvent->condvar;
publish($ch, 'RabbitMQ is cool.', $done,);
$ch->get(
    queue      => 'test_q',
    no_ack     => 0,
    on_success => sub {
        my $response = shift;
        # For get, the delivery tag is read from the {ok} frame rather than
        # a {deliver} frame.
        $ch->ack(
            delivery_tag => $response->{ok}->method_frame->delivery_tag
        );
        pass('ack get');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
my @responses;

xt/04_anyevent.t  view on Meta::CPAN

    },
    on_failure => failure_cb($done),
);
publish($ch, 'RabbitMQ is excellent.', $done,);
publish($ch, 'RabbitMQ is fantastic.', $done,);
$done->recv;
pass('qos');

# Acknowledge every delivery collected in @responses.
for my $response (@responses) {
    $ch->ack(
        delivery_tag => $response->{deliver}->method_frame->delivery_tag,
    );
}

# Cancel the consumer identified by the first collected delivery, then call
# qos with no explicit limits before releasing $done.
# NOTE(review): the argument-less qos call presumably restores the default
# prefetch settings -- confirm against the channel's qos implementation.
$done = AnyEvent->condvar;
$ch->cancel(
    consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
    on_success   => sub {
        $ch->qos(
            on_success => sub {
                $done->send;
            },
            on_failure => failure_cb($done),
        );
    },
    on_failure   => failure_cb($done),
);

xt/04_anyevent.t  view on Meta::CPAN

    no_ack     => 0,
    on_consume => sub {
        my $response = shift;

        if (5 > ++$recover_count) {
            $ch->recover();
            return;
        }

        $ch->ack(
            delivery_tag => $response->{deliver}->method_frame->delivery_tag
        );

        $ch->cancel(
            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
            on_success   => sub {
                $done->send;
            },
            on_failure   => failure_cb($done),
        );
    },
    on_failure => failure_cb($done),
);
publish($ch, 'RabbitMQ is powerful.', $done,);
$done->recv;

xt/04_anyevent.t  view on Meta::CPAN

    $done = AnyEvent->condvar;
    my $reject_count = 0;
    $ch->consume(
        queue      => 'test_q',
        no_ack     => 0,
        on_consume => sub {
            my $response = shift;

            if ( 5 > ++$reject_count ) {
                $ch->reject(
                    delivery_tag => $response->{deliver}->method_frame->delivery_tag,

                    # requeue! Else the server does not deliver the message again to this client.
                    requeue => 1,
                );
                return;
            }

            $ch->ack( delivery_tag => $response->{deliver}->method_frame->delivery_tag );

            $ch->cancel(
                consumer_tag => $response->{deliver}->method_frame->consumer_tag,
                on_success   => sub {
                    $done->send;
                },
                on_failure => failure_cb($done),
            );
        },
        on_failure => failure_cb($done),
    );
    my $pub_done = AnyEvent->condvar;
    publish($ch, 'RabbitMQ is powerful.', $pub_done,);

xt/05_multi_channel.t  view on Meta::CPAN

    $ch->publish(
        routing_key => $queue,
        body        => $message,
        mandatory   => 1,
    );

    return;
}

# Close handler: takes a frame object, looks at its method frame, and dies
# with the reply code followed by the reply text when the reply code is true.
# A false reply code is ignored.
sub handle_close {
    my ($frame) = @_;
    my $close = $frame->method_frame;

    if ($close->reply_code) {
        die $close->reply_code, $close->reply_text;
    }
}

xt/06_close.t  view on Meta::CPAN

        on_return  => sub {die 'Receive return'},
        on_close   => \&handle_close,
    );
    my $ch = $done->recv;
    die 'Open channel failure' if !$ch;

    return $ch;
}

# on_close callback: a close frame whose method frame has a true reply code
# aborts the test, with the code and text as the die message.  Any other
# close frame is accepted silently.
sub handle_close {
    my $close_frame = $_[0]->method_frame;

    $close_frame->reply_code
      and die $close_frame->reply_code, $close_frame->reply_text;
}



( run in 2.949 seconds using v1.01-cache-2.11-cpan-df04353d9ac )