AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

            $failure_cb->('Broken data was received');
            @_ = ($self, $close_cb, $failure_cb,);
            goto &_read_loop;
        }

        $self->{_handle}->push_read(chunk => $length, sub {
            my $self = $weak_self or return;
            $stack .= $_[1];
            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);

            $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};

            if ($self->{verbose}) {
                warn '[C] <-- [S] ', Dumper($frame),
                     '-----------', "\n";
            }

            my $id = $frame->channel;
            if (0 == $id) {
                if ($frame->type_id == 8) {
                    # Heartbeat, no action needs taking.

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    my $self = shift;
    my ($frame, $close_cb,) = @_;

    my $method_frame = $frame->isa('Net::AMQP::Frame::Method') ? $frame->method_frame : undef;

    if ($self->{_state} == _ST_CLOSED) {
        return $method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
    }

    if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
        delete $self->{_heartbeat_timer};
        $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
        $self->_server_closed($close_cb, $frame);
        return;
    }

    return 1;
}

sub _server_closed {
    my $self = shift;

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

}

sub _start_heartbeat {
    my ($self, $interval, %args,) = @_;

    my $close_cb   = $args{on_close};
    my $failure_cb = $args{on_read_failure};
    my $last_recv = 0;
    my $idle_cycles = 0;
    weaken(my $weak_self = $self);
    my $timer_cb = sub {
        my $self = $weak_self or return;
        if ($self->{_heartbeat_recv} != $last_recv) {
            $last_recv = $self->{_heartbeat_recv};
            $idle_cycles = 0;
        }
        elsif (++$idle_cycles > 1) {
            delete $self->{_heartbeat_timer};
            $failure_cb->("Heartbeat lost");
            $self->_server_closed($close_cb, "Heartbeat lost");
            return;
        }
        $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
    };

    $self->{_heartbeat_recv} = time;
    $self->{_heartbeat_timer} = AnyEvent->timer(
        after    => $interval,
        interval => $interval,
        cb       => $timer_cb,
    );

    return $self;
}

sub _open {
    my $self = shift;
    my %args = @_;

    $self->_push_write_and_read(

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    return 1 if $self->is_open;

    $failure_cb->('Connection has already been closed');
    return 0;
}

sub drain_writes {
    my ($self, $timeout) = shift;
    $self->{drain_condvar} = AnyEvent->condvar;
    if ($timeout) {
        $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
            $self->{drain_condvar}->croak("Timed out after $timeout");
        });
    }
    $self->{drain_condvar}->recv;
    delete $self->{drain_timer};
}

sub DESTROY {
    my $self = shift;
    $self->close() unless in_global_destruction;
    return;
}

1;
__END__

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN


sub close {
    my $self = shift;
    my $connection = $self->{connection}
        or return;
    my %args = $connection->_set_cbs(@_);

    # If open in in progess, wait for it; 1s arbitrary timing.

    weaken(my $wself = $self);
    my $t; $t = AE::timer 0, 1, sub {
	(my $self = $wself) or undef $t, return;
	return if $self->{_state} == _ST_OPENING;

	# No more tests are required
	undef $t;

        # Double close is OK
	if ($self->{_state} == _ST_CLOSED) {
	    $args{on_success}->($self);
            return;



( run in 1.107 second using v1.01-cache-2.11-cpan-49f99fa48dc )