AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
return $self;
}
# Arm a repeating timer that sends AMQP heartbeat frames and watches for a
# silent peer.
#
# Arguments:
#   $interval - seconds before the first tick and between later ticks
#   %args     - on_close:        handed to _server_closed when the peer is
#                                declared dead
#               on_read_failure: invoked with "Heartbeat lost" first
#
# On every tick the value of $self->{_heartbeat_recv} is compared with the
# value seen on the previous tick (presumably it is refreshed elsewhere
# whenever traffic arrives -- not visible in this sub). The second tick in a
# row without a change tears the timer down and reports the loss; any other
# tick writes a heartbeat frame.
#
# Returns $self.
sub _start_heartbeat {
    my $self     = shift;
    my $interval = shift;
    my %args     = @_;

    my ($on_close, $on_read_failure) = @args{qw(on_close on_read_failure)};
    my $seen_stamp   = 0;
    my $silent_ticks = 0;

    # The timer lives inside $self, so its callback must only hold a weak
    # reference back to the connection.
    my $conn_ref = $self;
    weaken($conn_ref);

    my $tick = sub {
        my $conn = $conn_ref;
        return unless $conn;

        if ($conn->{_heartbeat_recv} == $seen_stamp) {
            $silent_ticks++;
            if ($silent_ticks > 1) {
                delete $conn->{_heartbeat_timer};
                $on_read_failure->("Heartbeat lost");
                $conn->_server_closed($on_close, "Heartbeat lost");
                return;
            }
        }
        else {
            $seen_stamp   = $conn->{_heartbeat_recv};
            $silent_ticks = 0;
        }

        $conn->_push_write(Net::AMQP::Frame::Heartbeat->new());
    };

    $self->{_heartbeat_recv}  = time();
    $self->{_heartbeat_timer} = AnyEvent->timer(
        after    => $interval,
        interval => $interval,
        cb       => $tick,
    );

    return $self;
}
# Send Connection::Open for the requested virtual host and wait for
# Connection::OpenOk.
#
# Arguments (key/value pairs):
#   vhost      - virtual host to open
#   user       - login name, remembered in $self->{_login_user} on success
#   on_success - called with $self once OpenOk has arrived
#   on_failure - handed to _push_write_and_read as its failure callback
#
# Returns $self.
sub _open {
    my ($self, %args) = @_;

    # Runs when the broker confirms the open.
    my $on_open_ok = sub {
        $self->{_state}      = _ST_OPEN;
        $self->{_login_user} = $args{user};
        $args{on_success}->($self);
    };

    $self->_push_write_and_read(
        'Connection::Open',
        { virtual_host => $args{vhost}, insist => 1 },
        'Connection::OpenOk',
        $on_open_ok,
        $args{on_failure},
    );

    return $self;
}
# Close the connection, closing every open channel first.
#
# Arguments (key/value pairs, defaults filled in by _set_cbs):
#   on_success - called immediately if the connection is already closed;
#                otherwise forwarded to _finish_close
#   on_failure - called with "open already in progress" or
#                "close already in progress" when the connection is not in
#                the open state; otherwise forwarded to _finish_close
#
# Returns nothing during global destruction, $self otherwise.
sub close {
    return if in_global_destruction;
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    # Nothing to do: report success straight away.
    if ($self->{_state} == _ST_CLOSED) {
        $args{on_success}->(@_);
        return $self;
    }

    # Only an open connection can start closing.
    if ($self->{_state} != _ST_OPEN) {
        $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
        return $self;
    }

    $self->{_state} = _ST_CLOSING;

    # Fires once every begin() below has been matched by an end().
    my $cv = AE::cv {
        delete $self->{_closing};
        $self->_finish_close(%args);
    };

    # The outer begin/end pair keeps the condvar from firing before all
    # channel closes have been started.
    $cv->begin();

    # Snapshot the ids first: closing a channel may alter the hash.
    my @ids = keys %{$self->{_channels}};
    for my $id (@ids) {
        my $channel = $self->{_channels}->{$id};

        # A slot may be empty by the time we get to it (_finish_close guards
        # the same values with defined()); skip it rather than die half-way
        # through with the state stuck at closing.
        next unless defined($channel) && $channel->is_open;

        $cv->begin();
        $channel->close(
            on_success => sub { $cv->end() },
            on_failure => sub { $cv->end() },
        );
    }
    $cv->end();

    return $self;
}
sub _finish_close {
my $self = shift;
my %args = @_;
if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) {
$args{on_failure}->("BUG: closing with channel(s) open: @ch");
return;
}
$self->{_state} = _ST_CLOSED;
$self->_push_write_and_read(
'Connection::Close', {}, 'Connection::CloseOk',
sub {
# circular ref ok
$self->{_handle}->push_shutdown;
$args{on_success}->(@_);
},
sub {
# circular ref ok
$self->{_handle}->push_shutdown;
$args{on_failure}->(@_);
},
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
last;
}
}
if (!$id) {
$args{on_failure}->('Ran out of channel ids');
return $self;
}
$self->{_last_chan_id} = $id;
}
my $channel = AnyEvent::RabbitMQ::Channel->new(
id => $id,
connection => $self,
on_close => $args{on_close},
);
$self->{_channels}->{$id} = $channel;
$channel->open(
on_success => sub {
$args{on_success}->($channel);
},
on_failure => sub {
$self->_delete_channel($channel);
$args{on_failure}->(@_);
},
);
return $self;
}
# Write one AMQP method frame, then queue a reader for the reply.
#
# Arguments:
#   $method     - method name relative to Net::AMQP::Protocol::
#   $args       - hashref of constructor arguments for that method
#   $exp        - expected reply name(s), passed to _push_read_and_valid
#   $cb         - success callback, passed to _push_read_and_valid
#   $failure_cb - failure callback, passed to _push_read_and_valid
#   $id         - channel id (optional)
#
# Returns whatever _push_read_and_valid returns.
sub _push_write_and_read {
    my ($self, $method, $args, $exp, $cb, $failure_cb, $id) = @_;

    my $method_class = 'Net::AMQP::Protocol::' . $method;
    my $frame        = Net::AMQP::Frame::Method->new(
        method_frame => $method_class->new(%$args),
    );
    $self->_push_write($frame, $id);

    return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id);
}
# Queue a one-shot reader that validates the next frame against the
# expected method(s).
#
# Arguments:
#   $exp        - one method name, or an arrayref of names, each relative to
#                 Net::AMQP::Protocol::
#   $cb         - called with the frame when its method matches
#   $failure_cb - called with a message string otherwise
#   $id         - channel id; a false value selects the connection queue
#
# Returns the result of $queue->get, or nothing when no queue is available.
sub _push_read_and_valid {
    my ($self, $exp, $cb, $failure_cb, $id) = @_;

    my $expected = ref($exp) eq 'ARRAY' ? $exp : [$exp];

    # Pick the frame queue: the connection's own, or the channel's.
    my $queue;
    if ($id) {
        if (defined $self->{_channels}->{$id}) {
            $queue = $self->{_channels}->{$id}->queue;
        }
        else {
            $failure_cb->('Unknown channel id: ' . $id);
        }
    }
    else {
        $queue = $self->{_queue};
    }
    return unless $queue; # Can go away in global destruction..

    my $on_frame = sub {
        my ($frame) = @_;

        unless ($frame->isa('Net::AMQP::Frame::Method')) {
            return $failure_cb->('Received data is not method frame');
        }

        my $method_frame = $frame->method_frame;
        foreach my $wanted (@$expected) {
            next unless $method_frame->isa('Net::AMQP::Protocol::' . $wanted);
            return $cb->($frame);
        }

        # Nothing matched: a channel close gets its own message.
        my $reason = $method_frame->isa('Net::AMQP::Protocol::Channel::Close')
            ? 'Channel closed'
            : 'Expected ' . join(',', @$expected) . ' but got ' . ref($method_frame);
        return $failure_cb->($reason);
    };

    return $queue->get($on_frame);
}
# Serialise one frame and hand it to the connection handle.
#
# Arguments:
#   $output - a frame object, or a Net::AMQP::Protocol::Base method object
#             (which is wrapped into a frame via frame_wrap first)
#   $id     - channel number to stamp on the frame; defaults to 0
#
# When $self->{verbose} is set the frame is dumped through warn. Nothing is
# written if the handle is gone. Always returns nothing.
sub _push_write {
    my ($self, $output, $id) = @_;

    $output = $output->frame_wrap
        if $output->isa('Net::AMQP::Protocol::Base');
    $output->channel($id || 0);

    warn '[C] --> [S] ', Dumper($output)
        if $self->{verbose};

    # Careful - could have gone (global destruction)
    my $handle = $self->{_handle};
    return unless $handle;

    $handle->push_write($output->to_raw_frame());
    return;
}
# Fill in default callbacks for a key/value argument list.
#
#   on_success - defaults to a no-op
#   on_failure - defaults to a sub that dies with its arguments, except
#                during global destruction
#
# All other keys pass through untouched. Returns the argument hash as a list.
sub _set_cbs {
    my ($self, %args) = @_;

    unless ($args{on_success}) {
        $args{on_success} = sub { };
    }
    unless ($args{on_failure}) {
        $args{on_failure} = sub { die @_ unless in_global_destruction };
    }

    return %args;
}
# Report whether the connection is open.
#
# Arguments:
#   $failure_cb - called with 'Connection has already been closed' when
#                 $self->is_open is false
#
# Returns 1 when open, 0 otherwise.
sub _check_open {
    my ($self, $failure_cb) = @_;

    unless ($self->is_open) {
        $failure_cb->('Connection has already been closed');
        return 0;
    }

    return 1;
}
# Block until $self->{drain_condvar} is signalled, optionally giving up
# after a timeout.
#
# Arguments:
#   $timeout - optional number of seconds; when true, a one-shot timer makes
#              the wait croak with "Timed out after $timeout"
#
# The condvar is stored in $self->{drain_condvar}; presumably it is sent
# elsewhere once pending writes are flushed (not visible in this sub).
# On a timeout the exception from recv propagates to the caller.
sub drain_writes {
    # Unpack from @_: a bare `shift` would only ever fill $self and leave
    # $timeout undefined, silently disabling the timeout.
    my ($self, $timeout) = @_;

    my $drained = AnyEvent->condvar;
    $self->{drain_condvar} = $drained;

    if ($timeout) {
        # The callback closes over the condvar, not $self: $self owns the
        # timer, so capturing $self would form a reference cycle that is
        # never broken when recv croaks below.
        $self->{drain_timer} = AnyEvent->timer(
            after => $timeout,
            cb    => sub {
                $drained->croak("Timed out after $timeout");
            },
        );
    }

    $drained->recv;
    delete $self->{drain_timer};
}
# Destructor: close the connection when the object goes away, except
# during global destruction. Always returns nothing.
sub DESTROY {
    my ($self) = @_;

    return if in_global_destruction;

    $self->close();
    return;
}
1;
__END__
=head1 NAME
AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
=head1 SYNOPSIS
use AnyEvent::RabbitMQ;
my $cv = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0, # Or 1 if you'd like SSL
tls_ctx => $anyevent_tls, # or a hash of AnyEvent::TLS options.
tune => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever },
nodelay => 1, # Reduces latency by disabling Nagle's algorithm
on_success => sub {
my $ar = shift;
$ar->open_channel(
on_success => sub {
my $channel = shift;
$channel->declare_exchange(
exchange => 'test_exchange',
on_success => sub {
$cv->send('Declared exchange');
},
on_failure => $cv,
);
},
on_failure => $cv,
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
},
);
},
on_failure => $cv,
on_read_failure => sub { die @_ },
on_return => sub {
my $frame = shift;
die "Unable to deliver ", Dumper($frame);
},
on_close => sub {
my $why = shift;
if (ref($why)) {
my $method_frame = $why->method_frame;
die $method_frame->reply_code, ": ", $method_frame->reply_text;
}
else {
( run in 1.414 second using v1.01-cache-2.11-cpan-5b529ec07f3 )