AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
$failure_cb->('Broken data was received');
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
}
$self->{_handle}->push_read(chunk => $length, sub {
my $self = $weak_self or return;
$stack .= $_[1];
my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
$self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};
if ($self->{verbose}) {
warn '[C] <-- [S] ', Dumper($frame),
'-----------', "\n";
}
my $id = $frame->channel;
if (0 == $id) {
if ($frame->type_id == 8) {
# Heartbeat, no action needs taking.
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
my $self = shift;
my ($frame, $close_cb,) = @_;
my $method_frame = $frame->isa('Net::AMQP::Frame::Method') ? $frame->method_frame : undef;
if ($self->{_state} == _ST_CLOSED) {
return $method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
}
if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
delete $self->{_heartbeat_timer};
$self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
$self->_server_closed($close_cb, $frame);
return;
}
return 1;
}
sub _server_closed {
my $self = shift;
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
}
# Arm the periodic heartbeat for this connection.
#
# Arguments:
#   $interval - seconds between timer ticks (used for both the initial delay
#               and the repeat interval).
#   %args     - on_close:        passed on to _server_closed when the peer
#                                is declared lost.
#               on_read_failure: invoked with "Heartbeat lost" in that case.
#
# On every tick the callback compares $self->{_heartbeat_recv} with the value
# it saw last time. Any change resets the idle counter; two consecutive ticks
# without a change are treated as a lost heartbeat. Otherwise a Heartbeat
# frame is written.
#
# Returns $self.
sub _start_heartbeat {
    my $self     = shift;
    my $interval = shift;
    my %opts     = @_;

    my $on_close   = $opts{on_close};
    my $on_failure = $opts{on_read_failure};

    # State private to the timer closure.
    my $seen_stamp   = 0;
    my $silent_ticks = 0;

    # The timer lives inside $self, so the closure must only hold a weak
    # reference back to it.
    weaken(my $weak_self = $self);

    my $on_tick = sub {
        my $conn = $weak_self or return;

        my $stamp = $conn->{_heartbeat_recv};
        if ($stamp != $seen_stamp) {
            # Something arrived since the previous tick.
            $seen_stamp   = $stamp;
            $silent_ticks = 0;
        }
        else {
            $silent_ticks++;
            if ($silent_ticks > 1) {
                # Second silent tick in a row: give up on the peer.
                delete $conn->{_heartbeat_timer};
                $on_failure->("Heartbeat lost");
                $conn->_server_closed($on_close, "Heartbeat lost");
                return;
            }
        }

        $conn->_push_write(Net::AMQP::Frame::Heartbeat->new());
    };

    $self->{_heartbeat_recv}  = time;
    $self->{_heartbeat_timer} = AnyEvent->timer(
        after    => $interval,
        interval => $interval,
        cb       => $on_tick,
    );

    return $self;
}
sub _open {
my $self = shift;
my %args = @_;
$self->_push_write_and_read(
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
return 1 if $self->is_open;
$failure_cb->('Connection has already been closed');
return 0;
}
# Block the caller until the drain condvar is signalled, optionally giving up
# after $timeout seconds.
#
# Arguments:
#   $timeout - optional; when true, a one-shot timer croaks the condvar with
#              "Timed out after $timeout", which makes the recv below die.
#
# A fresh condvar is stored in $self->{drain_condvar}; whatever signals it
# lives outside this sub. $self->{drain_timer} is removed once the wait
# completes normally.
#
# Returns nothing.
sub drain_writes {
    # Unpack from @_ (a bare `shift` here would leave $timeout undefined).
    my ($self, $timeout) = @_;

    my $cv = $self->{drain_condvar} = AnyEvent->condvar;

    if ($timeout) {
        # The closure captures the condvar rather than $self, so the timer
        # stored on $self never forms a reference cycle, even if it is left
        # behind after a timeout.
        $self->{drain_timer} = AnyEvent->timer(
            after => $timeout,
            cb    => sub {
                $cv->croak("Timed out after $timeout");
            },
        );
    }

    $cv->recv;
    delete $self->{drain_timer};

    return;
}
# Destructor: close the connection when the object goes away, except during
# global destruction, where the close is skipped.
sub DESTROY {
    my ($self) = @_;

    return if in_global_destruction;

    $self->close();
    return;
}
1;
__END__
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
sub close {
my $self = shift;
my $connection = $self->{connection}
or return;
my %args = $connection->_set_cbs(@_);
# If open is in progress, wait for it; the 1s interval is arbitrary.
weaken(my $wself = $self);
my $t; $t = AE::timer 0, 1, sub {
(my $self = $wself) or undef $t, return;
return if $self->{_state} == _ST_OPENING;
# No more tests are required
undef $t;
# Double close is OK
if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->($self);
return;
( run in 1.107 second using v1.01-cache-2.11-cpan-49f99fa48dc )