AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
to RabbitMQ, as long as they all use the same spec file.
1.05 Tue Jul 22 16:55:55 2011
- Fixed a compiling error.
1.04 Tue Jul 19 17:04:24 2011
- Bug fix for consuming large messages.
1.03 Thu Apr 7 02:55:12 2011
- Separate AnyEvent::RabbitMQ from Net::RabbitFoot.
- Avoid (additional) issues when in global destruction.
- Do not set reply_to to an empty string in the header frame.
- Implement basic.reject (requires RabbitMQ >= 2.0.0).
- Store server properties in the object for easy server product
and server version access.
- Shutdown the AnyEvent handle using push_shutdown.
- Be more careful in DESTROY blocks.
1.02 Wed Jun 30 11:35:32 2010
- Fix errors in global destruction due to destruction order being
random.
- Fix bug if you call ->close on a Net::RabbitFoot instance which
is not already connected. Previously this would never return.
1.01 Sun Mar 18 07:21:58 2010
- fix bugs.
- support channel.flow.
1.00 Fri Mar 5 11:30:00 2010
- fix module name.
"version" : "1.22",
"x_contributors" : [
"Tom Doran <bobtfish@cpan.org>",
"Nicolas R <atoomic@cpan.org>",
"Dave Lambley <dlambley@cpan.org>",
"Ruslan Zakirov <ruz@bestpractical.com>",
"Masahito Ikuta <cooldaemon@gmail.com>",
"Rod Taylor <rod.taylor@gmail.com>",
"Carl H\u00f6rberg <carl@cloudamqp.com>",
"Julio Polo <julio@hawaii.edu>",
"A.J. Ragusa <aragusa@globalnoc.iu.edu>",
"Jos\u00e9 Mic\u00f3",
"Scott O'Neil <scott@cpanel.net>"
],
"x_serialization_backend" : "Cpanel::JSON::XS version 3.0239"
}
version: '1.22'
x_contributors:
- 'Tom Doran <bobtfish@cpan.org>'
- 'Nicolas R <atoomic@cpan.org>'
- 'Dave Lambley <dlambley@cpan.org>'
- 'Ruslan Zakirov <ruz@bestpractical.com>'
- 'Masahito Ikuta <cooldaemon@gmail.com>'
- 'Rod Taylor <rod.taylor@gmail.com>'
- 'Carl Hörberg <carl@cloudamqp.com>'
- 'Julio Polo <julio@hawaii.edu>'
- 'A.J. Ragusa <aragusa@globalnoc.iu.edu>'
- 'José Micó'
- "Scott O'Neil <scott@cpanel.net>"
x_serialization_backend: 'YAML::Tiny version 1.70'
[Metadata]
x_contributors = Tom Doran <bobtfish@cpan.org>
x_contributors = Nicolas R <atoomic@cpan.org>
x_contributors = Dave Lambley <dlambley@cpan.org>
x_contributors = Ruslan Zakirov <ruz@bestpractical.com>
x_contributors = Masahito Ikuta <cooldaemon@gmail.com>
x_contributors = Rod Taylor <rod.taylor@gmail.com>
x_contributors = Carl Hörberg <carl@cloudamqp.com>
x_contributors = Julio Polo <julio@hawaii.edu>
x_contributors = A.J. Ragusa <aragusa@globalnoc.iu.edu>
x_contributors = José Micó
x_contributors = Scott O'Neil <scott@cpanel.net>
[InstallGuide]
[CPANFile]
[MetaJSON]
[OurPkgVersion]
underscore_eval_version = 1
no_critic = 1
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
$self->{_login_user} = $args{user};
$args{on_success}->($self);
},
$args{on_failure},
);
return $self;
}
sub close {
return if in_global_destruction;
my $self = shift;
my %args = $self->_set_cbs(@_);
if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->(@_);
return $self;
}
if ($self->{_state} != _ST_OPEN) {
$args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
return $self;
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
my $queue;
if (!$id) {
$queue = $self->{_queue};
} elsif (defined $self->{_channels}->{$id}) {
$queue = $self->{_channels}->{$id}->queue;
} else {
$failure_cb->('Unknown channel id: ' . $id);
}
return unless $queue; # Can go away in global destruction..
$queue->get(sub {
my $frame = shift;
return $failure_cb->('Received data is not method frame')
if !$frame->isa('Net::AMQP::Frame::Method');
my $method_frame = $frame->method_frame;
for my $exp_elem (@$exp) {
return $cb->($frame)
if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
if ($output->isa('Net::AMQP::Protocol::Base')) {
$output = $output->frame_wrap;
}
$output->channel($id || 0);
if ($self->{verbose}) {
warn '[C] --> [S] ', Dumper($output);
}
$self->{_handle}->push_write($output->to_raw_frame())
if $self->{_handle}; # Careful - could have gone (global destruction)
return;
}
# Normalise a list of named arguments so both callbacks are always present.
#
# Takes the invocant followed by key/value pairs and returns the same pairs
# as a flat list, with 'on_success' and 'on_failure' filled in whenever the
# caller left them out (or passed a false value).
sub _set_cbs {
    my ($self, %opts) = @_;

    # Success is silently ignored unless the caller asked to be notified.
    $opts{on_success} = sub {} unless $opts{on_success};

    # An unhandled failure is raised as an exception, except while the
    # interpreter is in global destruction.
    unless ($opts{on_failure}) {
        $opts{on_failure} = sub {
            die @_ unless in_global_destruction();
        };
    }

    return %opts;
}
sub _check_open {
my $self = shift;
my ($failure_cb) = @_;
return 1 if $self->is_open;
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
$self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
$self->{drain_condvar}->croak("Timed out after $timeout");
});
}
$self->{drain_condvar}->recv;
delete $self->{drain_timer};
}
# Destructor: calls close() on the object when it is destroyed.
# Nothing is attempted once the interpreter is in global destruction.
sub DESTROY {
    my ($self) = @_;

    return if in_global_destruction();

    $self->close();
    return;
}
1;
__END__
=head1 NAME
AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Basic::Qos',
{
prefetch_count => 1,
prefetch_size => 0,
global => 0,
%args,
},
'Basic::QosOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
$failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
return 0;
}
# Destructor: calls close() if the object still reports itself as open.
# Skipped entirely during global destruction; is_open() is only consulted
# outside of it.
sub DESTROY {
    my ($self) = @_;

    return if in_global_destruction();

    $self->close() if $self->is_open();
    return;
}
1;
__END__
=head1 NAME
AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.
share/fixed_amqp0-8.xml view on Meta::CPAN
a message will only be sent in advance if both prefetch windows
(and those at the channel and connection level) allow it.
The prefetch-count is ignored if the no-ack option is set.
</doc>
<doc name = "rule" test = "amq_basic_18">
The server MAY send less data in advance than allowed by the
client's specified prefetch windows but it MUST NOT send more.
</doc>
</field>
<field name = "global" type = "bit">
apply to entire connection
<doc>
By default the QoS settings apply to the current channel only. If
this field is set, they are applied to the entire connection.
</doc>
</field>
</method>
<method name = "qos-ok" synchronous = "1" index = "11">
confirm the requested qos
share/fixed_amqp0-8.xml view on Meta::CPAN
message will only be sent in advance if both prefetch windows
(and those at the channel and connection level) allow it.
The prefetch-count is ignored if the no-ack option is set.
</doc>
<doc name = "rule">
The server MAY send less data in advance than allowed by the
client's specified prefetch windows but it MUST NOT send more.
</doc>
</field>
<field name = "global" type = "bit">
apply to entire connection
<doc>
By default the QoS settings apply to the current channel only. If
this field is set, they are applied to the entire connection.
</doc>
</field>
</method>
<method name = "qos-ok" synchronous = "1" index = "11">
confirm the requested qos
share/fixed_amqp0-8.xml view on Meta::CPAN
possible.
</doc>
<doc name = "rule">
The server MAY ignore the prefetch values and consume rates,
depending on the type of stream and the ability of the server
to queue and/or reply it. The server MAY drop low-priority
messages in favour of high-priority messages.
</doc>
</field>
<field name = "global" type = "bit">
apply to entire connection
<doc>
By default the QoS settings apply to the current channel only. If
this field is set, they are applied to the entire connection.
</doc>
</field>
</method>
<method name = "qos-ok" synchronous = "1" index = "11">
confirm the requested qos
share/fixed_amqp0-9-1.xml view on Meta::CPAN
specified prefetch windows but it MUST NOT send more.
</doc>
<doc type = "scenario">
Define a QoS prefetch-size limit and a prefetch-count limit greater than
one. Send multiple messages that exceed the prefetch size. Verify that
no more than one message arrives at once.
</doc>
</rule>
</field>
<field name = "global" domain = "bit" label = "apply to entire connection">
<doc>
By default the QoS settings apply to the current channel only. If this field is
set, they are applied to the entire connection.
</doc>
</field>
</method>
<method name = "qos-ok" synchronous = "1" index = "11" label = "confirm the requested qos">
<doc>
This method tells the client that the requested QoS levels could be handled by the
( run in 1.083 second using v1.01-cache-2.11-cpan-49f99fa48dc )