AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN

=item B<on_success> Callback when the connection is successfully established.

=item B<on_failure> Called when a failure occurs over the lifetime of the connection.

=item B<on_read_failure> Called when there is a problem reading response from the server.

=item B<on_return> Called if the server returns a published message.

=item B<on_close> Called when the connection is closed remotely.

=back

B<Returns: $self>

=item open_channel(%opts)

Open a logical channel which is where all the AMQP fun is.

Arguments:

=over

=item B<on_success> Called when the channel is open and ready for use.

=item B<on_failure> Called if there is a problem opening the channel.

=item B<on_close> Called when the channel is closed.

=back

=item close(%opts)

Close this connection.

=over

=item B<on_success> Called on successful shutdown.

=item B<on_failure> Called on failed shutdown. Note: the connection is still
closed after this

=back

=back

=cut

foreach my $method (qw(connect open_channel close)) {
    no strict 'refs';
    *$method = sub {
        my $self = shift;
        return $self->_delegate($method => 0, @_);
    };
}

sub drain_writes {
    my ($self, $to) = @_;

    my $w;
    if ($to) {
        $w = AE::timer $to, 0,
          sub { $self->_drain_cv->croak("Timed out after $to") };
    }

    $self->_drain_cv->recv;
    $self->_clear_drain_cv;
    undef $w;

    return;
}

my %event_handlers = (
    cb  => '_handle_callback',
    cbd => '_handle_callback_destroy',
    chd => '_handle_channel_destroy',
    cdw => '_handle_connection_drain_writes',
    i   => '_handle_info',
);

sub _on_event {
    my $self = shift;
    my $type = shift;

    if (my $handler = $event_handlers{$type}) {
        $self->$handler(@_);
    } else {
        croak "Unknown event type: '$type'";
    }

    return;
}

sub _handle_callback {    ## no critic (Subroutines::RequireArgUnpacking)
    my $self = shift;
    my $sig  = shift;
    my ($id, $event, $method, $pkg) = @$sig;

    warn "_handle_callback $id $event $method $pkg\n" if $self->verbose;

    if (my $cb = $self->cb_registry->{$id}) {
        if (ref($_[0]) eq 'REF' and ref(${ $_[0] }) eq 'ARRAY') {
            my ($class, @args) = @{ ${ $_[0] } };

            if ($class eq 'AnyEvent::RabbitMQ') {
                $_[0] = $self;
            } elsif ($class eq 'AnyEvent::RabbitMQ::Channel') {
                my $channel_id = shift @args;
                $_[0] = $self->channels->{$channel_id}
                  ||= $self->channel_class->new(
                    id         => $channel_id,
                    connection => $self
                  );
            } else {
                croak "Unknown class type: '$class'";
            }
        }

        goto &$cb;
    } else {
        croak "Unknown callback id: '$id'";
    }



( run in 3.094 seconds using v1.01-cache-2.11-cpan-5735350b133 )