AnyEvent-RabbitMQ-Fork
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
=item B<on_success> Callback when the connection is successfully established.
=item B<on_failure> Called when a failure occurs over the lifetime of the connection.
=item B<on_read_failure> Called when there is a problem reading a response from the server.
=item B<on_return> Called if the server returns a published message.
=item B<on_close> Called when the connection is closed remotely.
=back
B<Returns: $self>
=item open_channel(%opts)
Open a logical channel; all further AMQP operations are performed on a channel.
Arguments:
=over
=item B<on_success> Called when the channel is open and ready for use.
=item B<on_failure> Called if there is a problem opening the channel.
=item B<on_close> Called when the channel is closed.
=back
=item close(%opts)
Close this connection.
=over
=item B<on_success> Called on successful shutdown.
=item B<on_failure> Called if the shutdown fails. Note: the connection is still
closed even when this callback fires.
=back
=back
=cut
# Generate the public connect/open_channel/close methods. Each one simply
# forwards its arguments to _delegate under its own name, with a literal 0 as
# the second argument.
for my $name (qw(connect open_channel close)) {
    my $forwarder = sub {
        my $self = shift;

        # @_ is passed through untouched so the callee sees the caller's
        # original (aliased) arguments.
        return $self->_delegate($name => 0, @_);
    };

    # Symbolic glob assignment installs the closure as a named method in the
    # current package; strict refs is relaxed only for this statement.
    no strict 'refs';
    *{$name} = $forwarder;
}
# Block until the drain condvar (_drain_cv) is signalled, optionally giving up
# after a timeout.
#
# Arguments:
#   $to - optional timeout passed to AE::timer as the "after" value; when it
#         is false (undef or 0) no timer is armed and the wait is unbounded.
#
# Returns nothing.
#
# Dies with "Timed out after $to" (raised through the condvar's croak) when
# the timer fires before the condvar is signalled. Any other exception thrown
# by recv is propagated unchanged as well.
#
# The condvar and the timer are released on BOTH the success and the failure
# path. Without that, a timeout would leave the croaked condvar in place and
# every later call would rethrow the stale timeout immediately.
# NOTE(review): this relies on _drain_cv handing out a fresh condvar after
# _clear_drain_cv, as the success path already assumes - confirm against the
# attribute definition.
sub drain_writes {
    my ($self, $to) = @_;

    my $timer;
    if ($to) {
        $timer = AE::timer(
            $to, 0,
            sub { $self->_drain_cv->croak("Timed out after $to") }
        );
    }

    # Trap the failure instead of letting it unwind straight away, so the
    # cleanup below is never skipped.
    my $ok  = eval { $self->_drain_cv->recv; 1 };
    my $err = $@;

    $self->_clear_drain_cv;
    undef $timer;

    # Rethrow the original error (message and location intact).
    die $err unless $ok;

    return;
}
# Dispatch table: short event-type code => name of the method that handles it.
my %event_handlers = (
    cb  => '_handle_callback',
    cbd => '_handle_callback_destroy',
    chd => '_handle_channel_destroy',
    cdw => '_handle_connection_drain_writes',
    i   => '_handle_info',
);

# Route one incoming event to its handler method.
#
# Arguments:
#   $type - event-type code; must be a key of %event_handlers.
#   @_    - remaining arguments, forwarded to the handler untouched.
#
# Returns nothing. Croaks with "Unknown event type: '...'" when the code has
# no entry in the dispatch table.
sub _on_event {    ## no critic (Subroutines::RequireArgUnpacking)
    my $self = shift;
    my $type = shift;

    my $handler = $event_handlers{$type};
    croak "Unknown event type: '$type'" unless $handler;

    # Forward the rest of @_ itself (not a copy) so the handler receives the
    # very same aliased arguments this sub was given.
    $self->$handler(@_);

    return;
}
sub _handle_callback { ## no critic (Subroutines::RequireArgUnpacking)
my $self = shift;
my $sig = shift;
my ($id, $event, $method, $pkg) = @$sig;
warn "_handle_callback $id $event $method $pkg\n" if $self->verbose;
if (my $cb = $self->cb_registry->{$id}) {
if (ref($_[0]) eq 'REF' and ref(${ $_[0] }) eq 'ARRAY') {
my ($class, @args) = @{ ${ $_[0] } };
if ($class eq 'AnyEvent::RabbitMQ') {
$_[0] = $self;
} elsif ($class eq 'AnyEvent::RabbitMQ::Channel') {
my $channel_id = shift @args;
$_[0] = $self->channels->{$channel_id}
||= $self->channel_class->new(
id => $channel_id,
connection => $self
);
} else {
croak "Unknown class type: '$class'";
}
}
goto &$cb;
} else {
croak "Unknown callback id: '$id'";
}
( run in 3.094 seconds using v1.01-cache-2.11-cpan-5735350b133 )