AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
delete $self->{_heartbeat_timer};
$self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
$self->_server_closed($close_cb, $frame);
return;
}
return 1;
}
# Finish a connection shutdown: notify every known channel, drop them,
# shut the underlying handle down and finally report to the caller.
#
# Arguments:
#   $close_cb - code ref, invoked last with $why as its only argument
#   $why      - close reason. A reference is handed to each channel as-is;
#               anything else is first passed through that channel's
#               _close_frame().
#
# Returns nothing.
sub _server_closed {
    my ($self, $close_cb, $why) = @_;

    $self->{_state} = _ST_CLOSING;

    # Tell each channel why it is going away before forgetting about it.
    foreach my $open_channel (values %{ $self->{_channels} }) {
        if (ref($why)) {
            $open_channel->_closed($why);
        }
        else {
            $open_channel->_closed($open_channel->_close_frame($why));
        }
    }
    $self->{_channels} = {};

    $self->{_handle}->push_shutdown;
    $self->{_state} = _ST_CLOSED;

    # The callback runs only after the state has been set to closed.
    $close_cb->($why);

    return;
}
# Start the connection handshake: write the protocol header, wait for
# Connection::Start, answer with Connection::StartOk and hand over to _tune().
#
# Named arguments read here:
#   user, pass         - credentials sent in the AMQPLAIN response
#   client_properties  - optional hash ref; its entries are merged last, so
#                        they override the built-in client properties
#   on_failure         - code ref called with a message when the server does
#                        not offer AMQPLAIN or en_US; also passed on to
#                        _push_read_and_valid()
# The complete argument list is forwarded to _tune().
#
# Returns $self.
sub _start {
    my ($self, %args) = @_;

    warn 'post header', "\n" if $self->{verbose};

    $self->{_handle}->push_write(Net::AMQP::Protocol->header);

    my $on_start = sub {
        my ($start_frame) = @_;
        my $start = $start_frame->method_frame;

        # Only the AMQPLAIN mechanism is supported by this client.
        my @offered_mechanisms = split /\s/, $start->mechanisms;
        if (none {$_ eq 'AMQPLAIN'} @offered_mechanisms) {
            return $args{on_failure}->('AMQPLAIN is not found in mechanisms');
        }

        # Likewise, only the en_US locale is ever requested.
        my @offered_locales = split /\s/, $start->locales;
        if (none {$_ eq 'en_US'} @offered_locales) {
            return $args{on_failure}->('en_US is not found in locales');
        }

        $self->{_server_properties} = $start->server_properties;

        # Built-in properties first; caller-supplied ones come last and win.
        my %client_properties = (
            platform     => 'Perl',
            product      => __PACKAGE__,
            information  => 'http://d.hatena.ne.jp/cooldaemon/',
            version      => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
            capabilities => {
                consumer_cancel_notify     => Net::AMQP::Value::true,
                exchange_exchange_bindings => Net::AMQP::Value::true,
            },
            %{ $args{client_properties} || {} },
        );

        my $start_ok = Net::AMQP::Protocol::Connection::StartOk->new(
            client_properties => \%client_properties,
            mechanism         => 'AMQPLAIN',
            response          => {
                LOGIN    => $args{user},
                PASSWORD => $args{pass},
            },
            locale            => 'en_US',
        );
        $self->_push_write($start_ok);

        $self->_tune(%args,);
    };

    $self->_push_read_and_valid('Connection::Start', $on_start, $args{on_failure});

    return $self;
}
sub _tune {
my $self = shift;
my %args = @_;
weaken(my $weak_self = $self);
$self->_push_read_and_valid(
'Connection::Tune',
sub {
my $self = $weak_self or return;
my $frame = shift;
my %tune;
foreach (qw( channel_max frame_max heartbeat )) {
my $client = $args{tune}{$_} || 0;
my $server = $frame->method_frame->$_ || 0;
# negotiate with the server such that we cannot request a larger
# value set by the server, unless the server said unlimited
$tune{$_} = ($server == 0 or $client == 0)
? ($server > $client ? $server : $client) # max
: ($client > $server ? $server : $client); # min
}
if ($self->{_frame_max} = $tune{frame_max}) {
# calculate how big the body can actually be
$self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
}
$self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;
$self->_push_write(
Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
);
if ($tune{heartbeat} > 0) {
$self->_start_heartbeat($tune{heartbeat}, %args,);
}
$self->_open(%args,);
},
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
my $cv = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0, # Or 1 if you'd like SSL
tls_ctx => $anyevent_tls, # or a hash of AnyEvent::TLS options.
tune => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever },
nodelay => 1, # Reduces latency by disabling Nagle's algorithm
on_success => sub {
my $ar = shift;
$ar->open_channel(
on_success => sub {
my $channel = shift;
$channel->declare_exchange(
exchange => 'test_exchange',
on_success => sub {
$cv->send('Declared exchange');
},
on_failure => $cv,
);
},
on_failure => $cv,
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
},
);
},
on_failure => $cv,
on_read_failure => sub { die @_ },
on_return => sub {
my $frame = shift;
die "Unable to deliver ", Dumper($frame);
},
on_close => sub {
my $why = shift;
if (ref($why)) {
my $method_frame = $why->method_frame;
die $method_frame->reply_code, ": ", $method_frame->reply_text;
}
else {
die $why;
}
},
);
print $cv->recv, "\n";
=head1 DESCRIPTION
AnyEvent::RabbitMQ is an AMQP (Advanced Message Queuing Protocol) client library intended to let you interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
You can use AnyEvent::RabbitMQ to:
* Declare and delete exchanges
* Declare, delete, bind and unbind queues
* Set QoS and confirm mode
* Publish, consume, get, ack, recover and reject messages
* Select, commit and rollback transactions
Most of these actions can be done through L<AnyEvent::RabbitMQ::Channel>.
Please see the documentation there for more details.
AnyEvent::RabbitMQ is known to work with RabbitMQ version 2.5.1 and with versions 0-8 and 0-9-1 of the AMQP specification.
This client is the non-blocking version; for a blocking version with a similar API, see L<Net::RabbitFoot>.
=head1 AUTHOR
Masahito Ikuta E<lt>cooldaemon@gmail.comE<gt>
=head1 MAINTAINER
Currently maintained by C<< <bobtfish@bobtfish.net> >> due to the original
author being missing in action.
=head1 COPYRIGHT
Copyright (c) 2010, the above named author(s).
=head1 SEE ALSO
=head1 LICENSE
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut
( run in 1.446 second using v1.01-cache-2.11-cpan-97f6503c9c8 )