view release on metacpan or search on metacpan
1.19 Sat Mar 21 16:49:24 GMT 2015
- Add 'no_ack' as an optional argument to the ->consume method
(Dave Mueller).
- Fill in some missing documentation (Moritz Lenz).
1.18 Mon Sep 29 19:36:00 PDT 2014
- Added the bind_exchange and unbind_exchange methods
for exchange-exchange bindings.
1.17 Fri Jul 25 14:02:00 PDT 2014
- Add support for chunking large bodies into multiple AMQP frames,
allowing the sending of large messages.
1.16 Sat Apr 12 14:42:00 BST 2014
- Doc fixes (Mark Ellis)
- Fix leak when calling ->close + tests (Peter Haworth)
1.15 Mon Jul 1 12:35:00 BST 2013
- Fix paper-bag bug in connection close - calling nonexistent method.
1.14 Fri Jun 7 08:54:00 BST 2013
1.05 Tue Jul 22 16:55:55 2011
- Fixed a compiling error.
1.04 Tue Jul 19 17:04:24 2011
- Bug fix for consuming large messages.
1.03 Thu Apr 7 02:55:12 2011
- Separate AnyEvent::RabbitMQ from Net::RabbitFoot.
- Avoid (additional) issues when in global destruction.
- Do not set reply_to to an empty string in the header frame.
- Implement basic.reject (requires RabbitMQ >= 2.0.0).
- Store server properties in the object for easy server product
and server version access.
- Shutdown the AnyEvent handle using push_shutdown.
- Be more careful in DESTROY blocks.
1.02 Wed Jun 30 11:35:32 2010
- Fix errors in global destruction due to destruction order being
random.
- Fix bug if you call ->close on a Net::RabbitFoot instance which
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
my $class = shift;
return bless {
verbose => 0,
@_,
_state => _ST_CLOSED,
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_last_chan_id => 0,
_channels => {},
_login_user => '',
_server_properties => {},
_frame_max => undef,
_body_max => undef,
_channel_max => undef,
}, $class;
}
# Combined getter/setter for the "verbose" flag.
# Called with an argument, stores it and returns the stored value;
# called without one, returns the current value.
sub verbose {
    my ($self, @new_value) = @_;

    return $self->{verbose} unless @new_value;
    return $self->{verbose} = $new_value[0];
}
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
if (!defined $type_id || !defined $channel || !defined $length) {
$failure_cb->('Broken data was received');
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
}
$self->{_handle}->push_read(chunk => $length, sub {
my $self = $weak_self or return;
$stack .= $_[1];
my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
$self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};
if ($self->{verbose}) {
warn '[C] <-- [S] ', Dumper($frame),
'-----------', "\n";
}
my $id = $frame->channel;
if (0 == $id) {
if ($frame->type_id == 8) {
# Heartbeat, no action needs taking.
}
else {
return unless $self->_check_close_and_clean($frame, $close_cb,);
$self->{_queue}->push($frame);
}
} else {
my $channel = $self->{_channels}->{$id};
if (defined $channel) {
$channel->push_queue_or_consume($frame, $failure_cb);
} else {
$failure_cb->('Unknown channel id: ' . $frame->channel);
}
}
@_ = ($self, $close_cb, $failure_cb,);
goto &_read_loop;
});
});
return $self;
}
# Inspect a frame received on channel 0 and decide whether the caller
# should go on processing it.
#
# Returns a true value when the frame should be queued normally.
# When the connection is already closed, only a Connection::CloseOk
# method frame yields a true value.  When the server sends
# Connection::Close, the heartbeat timer is dropped, CloseOk is sent
# back, the connection is torn down via _server_closed, and nothing
# (false) is returned.
sub _check_close_and_clean {
    my ($self, $frame, $close_cb) = @_;

    my $method_frame;
    $method_frame = $frame->method_frame
        if $frame->isa('Net::AMQP::Frame::Method');

    if ($self->{_state} == _ST_CLOSED) {
        return $method_frame
            && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
    }

    # Anything other than a server-initiated Close is passed through.
    return 1
        unless $method_frame
            && $method_frame->isa('Net::AMQP::Protocol::Connection::Close');

    delete $self->{_heartbeat_timer};
    $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
    $self->_server_closed($close_cb, $frame);
    return;
}
# Tear the connection down after the server closed it (or after a
# failure described by $why).
#
# $why is either a frame (a reference) or a plain reason string.  Every
# open channel is notified with the frame itself, or with a close frame
# built from the reason string.  The handle is then shut down, the state
# becomes closed, and $close_cb is invoked with $why.
sub _server_closed {
    my ($self, $close_cb, $why) = @_;

    $self->{_state} = _ST_CLOSING;

    foreach my $chan (values %{ $self->{_channels} }) {
        my $close_frame = ref($why) ? $why : $chan->_close_frame($why);
        $chan->_closed($close_frame);
    }
    $self->{_channels} = {};

    $self->{_handle}->push_shutdown;
    $self->{_state} = _ST_CLOSED;

    $close_cb->($why);
    return;
}
sub _start {
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
if ($self->{verbose}) {
warn 'post header', "\n";
}
$self->{_handle}->push_write(Net::AMQP::Protocol->header);
$self->_push_read_and_valid(
'Connection::Start',
sub {
my $frame = shift;
my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
if none {$_ eq 'AMQPLAIN'} @mechanisms;
my @locales = split /\s/, $frame->method_frame->locales;
return $args{on_failure}->('en_US is not found in locales')
if none {$_ eq 'en_US'} @locales;
$self->{_server_properties} = $frame->method_frame->server_properties;
$self->_push_write(
Net::AMQP::Protocol::Connection::StartOk->new(
client_properties => {
platform => 'Perl',
product => __PACKAGE__,
information => 'http://d.hatena.ne.jp/cooldaemon/',
version => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
capabilities => {
consumer_cancel_notify => Net::AMQP::Value::true,
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
sub _tune {
my $self = shift;
my %args = @_;
weaken(my $weak_self = $self);
$self->_push_read_and_valid(
'Connection::Tune',
sub {
my $self = $weak_self or return;
my $frame = shift;
my %tune;
foreach (qw( channel_max frame_max heartbeat )) {
my $client = $args{tune}{$_} || 0;
my $server = $frame->method_frame->$_ || 0;
# negotiate with the server such that we cannot request a larger
# value set by the server, unless the server said unlimited
$tune{$_} = ($server == 0 or $client == 0)
? ($server > $client ? $server : $client) # max
: ($client > $server ? $server : $client); # min
}
if ($self->{_frame_max} = $tune{frame_max}) {
# calculate how big the body can actually be
$self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
}
$self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;
$self->_push_write(
Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
);
if ($tune{heartbeat} > 0) {
$self->_start_heartbeat($tune{heartbeat}, %args,);
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
return $self;
}
# Send a method frame and register a reader for the expected reply.
#
#   $method     - method class name relative to Net::AMQP::Protocol::
#   $args       - hashref of constructor arguments for that method
#   $exp        - expected reply, handed to _push_read_and_valid as is
#   $cb         - success callback
#   $failure_cb - failure callback
#   $id         - channel id (may be undef)
#
# Returns whatever _push_read_and_valid returns.
sub _push_write_and_read {
    my ($self, $method, $args, $exp, $cb, $failure_cb, $id) = @_;

    my $method_class = "Net::AMQP::Protocol::$method";
    my $request      = Net::AMQP::Frame::Method->new(
        method_frame => $method_class->new(%{$args}),
    );
    $self->_push_write($request, $id);

    return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id);
}
sub _push_read_and_valid {
my $self = shift;
my ($exp, $cb, $failure_cb, $id,) = @_;
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
if (!$id) {
$queue = $self->{_queue};
} elsif (defined $self->{_channels}->{$id}) {
$queue = $self->{_channels}->{$id}->queue;
} else {
$failure_cb->('Unknown channel id: ' . $id);
}
return unless $queue; # Can go away in global destruction..
$queue->get(sub {
my $frame = shift;
return $failure_cb->('Received data is not method frame')
if !$frame->isa('Net::AMQP::Frame::Method');
my $method_frame = $frame->method_frame;
for my $exp_elem (@$exp) {
return $cb->($frame)
if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
}
$failure_cb->(
$method_frame->isa('Net::AMQP::Protocol::Channel::Close')
? 'Channel closed'
: 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame)
);
});
}
# Serialise one frame and write it to the connection handle.
#
#   $output - a frame, or a Net::AMQP::Protocol::Base method object that
#             is wrapped into a frame first
#   $id     - channel id; a false value means channel 0
#
# Nothing is written when the handle is already gone.  Returns nothing.
sub _push_write {
    my ($self, $output, $id) = @_;

    $output = $output->frame_wrap
        if $output->isa('Net::AMQP::Protocol::Base');
    $output->channel($id || 0);

    warn '[C] --> [S] ', Dumper($output)
        if $self->{verbose};

    # Careful - the handle could have gone (global destruction)
    if ($self->{_handle}) {
        $self->{_handle}->push_write($output->to_raw_frame());
    }

    return;
}
sub _set_cbs {
my $self = shift;
my %args = @_;
$args{on_success} ||= sub {};
$args{on_failure} ||= sub { die @_ unless in_global_destruction };
lib/AnyEvent/RabbitMQ.pm view on Meta::CPAN
my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0, # Or 1 if you'd like SSL
tls_ctx => $anyevent_tls, # or a hash of AnyEvent::TLS options.
tune => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever },
nodelay => 1, # Reduces latency by disabling Nagle's algorithm
on_success => sub {
my $ar = shift;
$ar->open_channel(
on_success => sub {
my $channel = shift;
$channel->declare_exchange(
exchange => 'test_exchange',
on_success => sub {
$cv->send('Declared exchange');
},
on_failure => $cv,
);
},
on_failure => $cv,
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
},
);
},
on_failure => $cv,
on_read_failure => sub { die @_ },
on_return => sub {
my $frame = shift;
die "Unable to deliver ", Dumper($frame);
},
on_close => sub {
my $why = shift;
if (ref($why)) {
my $method_frame = $why->method_frame;
die $method_frame->reply_code, ": ", $method_frame->reply_text;
}
else {
die $why;
}
},
);
print $cv->recv, "\n";
=head1 DESCRIPTION
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
# No more tests are required
undef $t;
# Double close is OK
if ($self->{_state} == _ST_CLOSED) {
$args{on_success}->($self);
return;
}
$connection->_push_write(
$self->_close_frame,
$self->{id},
);
# The spec says that after a party sends Channel::Close, it MUST
# discard all frames for that channel. So this channel is dead
# immediately.
$self->_closed();
$connection->_push_read_and_valid(
'Channel::CloseOk',
sub {
$args{on_success}->($self);
$self->_orphan();
},
sub {
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
},
$self->{id},
);
};
return $self;
}
# Mark the channel closed and notify everything still waiting on it.
#
# $frame is the frame that caused the close; when omitted, a close frame
# is built with _close_frame.  Does nothing further if the channel is
# already closed.  Otherwise, in this order: pending command and content
# callbacks are flushed, outstanding publishes get their nack callback,
# outstanding consumers are reported as cancelled, the on_close callback
# runs (its errors are only warned about), and the state is reset.
# Returns $self when the close was processed.
sub _closed {
    my ($self, $frame) = @_;
    $frame ||= $self->_close_frame();

    return if $self->{_state} == _ST_CLOSED;
    $self->{_state} = _ST_CLOSED;

    # Perform callbacks for all outstanding commands
    for my $queue_name (qw(_queue _content_queue)) {
        $self->{$queue_name}->_flush($frame);
    }

    # Fake nacks of all outstanding publishes
    my @nack_cbs = grep { defined }
                   map  { $_->[1] }
                   values %{ $self->{_publish_cbs} };
    for my $nack_cb (@nack_cbs) {
        $nack_cb->($frame);
    }

    # Report cancellation of all outstanding consumes; the tag list is
    # snapshotted first because _canceled is called once per tag.
    my @consumer_tags = keys %{ $self->{_consumer_cbs} };
    for my $consumer_tag (@consumer_tags) {
        $self->_canceled($consumer_tag, $frame);
    }

    # Report close to on_close callback
    {
        local $@;
        eval { $self->{on_close}->($frame) };
        if ($@) {
            warn "Error in channel on_close callback, ignored:\n $@ ";
        }
    }

    # Reset state (partly redundant)
    $self->_reset;

    return $self;
}
# Build a method frame wrapping a Channel::Close whose reply_text is
# $text ($text may be undef).  Returns the new frame.
sub _close_frame {
    my ($self, $text) = @_;

    my %frame_args = (
        method_frame => Net::AMQP::Protocol::Channel::Close->new(
            reply_text => $text,
        ),
    );

    return Net::AMQP::Frame::Method->new(%frame_args);
}
sub _orphan {
my $self = shift;
if (my $connection = $self->{connection}) {
$connection->_delete_channel($self);
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
$headers = {
%$headers,
map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
};
}
$self->{connection}->_push_write(
Net::AMQP::Frame::Header->new(
weight => $weight,
body_size => length($body),
header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
content_type => 'application/octet-stream',
content_encoding => undef,
delivery_mode => 1,
priority => 1,
correlation_id => undef,
expiration => undef,
message_id => undef,
timestamp => time,
type => undef,
user_id => $self->{connection}->login_user,
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
);
return $self;
}
# Send $body on this channel as one or more body frames.
#
# The payload is cut into pieces of at most the connection's _body_max
# bytes; when no limit is set, the whole body goes out in one frame.
# An empty body sends nothing.  Returns $self.
sub _body {
    my ($self, $body) = @_;

    my $chunk_size = $self->{connection}->{_body_max} || length $body;

    # Peel chunks off the front of $body until nothing is left.
    while (length $body) {
        my $chunk = substr($body, 0, $chunk_size, '');
        $self->{connection}->_push_write(
            Net::AMQP::Frame::Body->new(payload => $chunk),
            $self->{id},
        );
    }

    return $self;
}
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
no_local => 0,
no_ack => $no_ack,
exclusive => 0,
%args, # queue
ticket => 0,
nowait => 0, # FIXME
},
'Basic::ConsumeOk',
sub {
my $frame = shift;
my $tag = $frame->method_frame->consumer_tag;
$self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
$cb->($frame);
},
$failure_cb,
$self->{id},
);
return $self;
}
sub cancel {
my $self = shift;
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
nowait => 0,
),
$self->{id},
);
return $self;
}
# Forget the consumer registered under $tag and notify it.
#
# The consumer's callback list is removed from _consumer_cbs, its first
# entry (the delivery callback) is discarded, and the remaining
# callbacks are invoked with $frame in reverse order.
# Returns 1 if the tag was known, 0 otherwise.
sub _canceled {
    my ($self, $tag, $frame) = @_;

    my $callbacks = delete $self->{_consumer_cbs}->{$tag};
    return 0 unless $callbacks;

    shift @{$callbacks};    # no more deliveries
    $_->($frame) for reverse @{$callbacks};

    return 1;
}
sub get {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
my $no_ack = delete $args{no_ack} // 1;
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
$self->{connection}->_push_write_and_read(
'Basic::Get',
{
no_ack => $no_ack,
%args, # queue
ticket => 0,
},
[qw(Basic::GetOk Basic::GetEmpty)],
sub {
my $frame = shift;
return $cb->({empty => $frame})
if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
$self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
},
$failure_cb,
$self->{id},
);
return $self;
}
sub ack {
my $self = shift;
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
# Route one incoming frame that arrived for this channel.
#
#   $frame      - the received frame
#   $failure_cb - called with an error string on protocol anomalies
#
# Method frames are dispatched by method class (Channel::Close,
# Basic::Deliver, Basic::Cancel/CancelOk, Basic::Return, Basic::Ack/Nack,
# Channel::Flow); any other method frame is pushed onto $self->{_queue}.
# Non-method frames are pushed onto $self->{_content_queue}.
# Always returns $self.
sub push_queue_or_consume {
my $self = shift;
my ($frame, $failure_cb,) = @_;
# Note: the spec says that after a party sends Channel::Close, it MUST
# discard all frames for that channel other than Close and CloseOk.
if ($frame->isa('Net::AMQP::Frame::Method')) {
my $method_frame = $frame->method_frame;
if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
# Server-initiated close: acknowledge it, notify listeners, then detach.
$self->{connection}->_push_write(
Net::AMQP::Protocol::Channel::CloseOk->new(),
$self->{id},
);
$self->_closed($frame);
$self->_orphan();
return $self;
} elsif ($self->{_state} != _ST_OPEN) {
# Channel is not open: only OpenOk and CloseOk are queued, every other
# method frame is silently dropped.
if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
$method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
$self->{_queue}->push($frame);
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
# Look up the delivery callback (slot 0) for this consumer tag; deliveries
# for unknown tags still have their content read, but go to a no-op.
my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
$method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
# CancelOk means we asked for a cancel.
# Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
# An unknown tag is only reported as a failure for CancelOk.
if (!$self->_canceled($method_frame->consumer_tag, $frame)
&& $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
$failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
# The callback below holds only a weak reference to the channel.
weaken(my $wself = $self);
my $cb = sub {
my $ret = shift;
my $me = $wself or return;
my $headers = $ret->{header}->headers || {};
my $onret_cb;
# A publish-specific return callback (slot 2 of _publish_cbs) is selected
# through the _ar_return message header, when present.
if (defined(my $tag = $headers->{_ar_return})) {
my $cbs = $me->{_publish_cbs}->{$tag};
$onret_cb = $cbs->[2] if $cbs;
}
# Fall back to the channel's, then the connection's, on_return handler.
$onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well
# NOTE(review): the handler receives the Basic::Return method frame
# ($frame), not the assembled $ret with header and body - confirm intent.
$onret_cb->($frame);
};
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
$method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
# $resp becomes the last component of the method class name.
(my $resp = ref($method_frame)) =~ s/.*:://;
# NOTE(review): this outer $cbs is never used; the loop below declares its own.
my $cbs;
if (!$self->{_is_confirm}) {
$failure_cb->("Received $resp when not in confirm mode");
}
else {
my @tags;
if ($method_frame->{multiple}) {
# "multiple" covers every outstanding publish whose tag is not greater
# than delivery_tag, handled in ascending tag order.
@tags = sort { $a <=> $b }
grep { $_ <= $method_frame->{delivery_tag} }
keys %{$self->{_publish_cbs}};
}
else {
@tags = ($method_frame->{delivery_tag});
}
# Slot 0 of a _publish_cbs entry is the ack callback, slot 1 the nack callback.
my $cbi = ($resp eq 'Ack') ? 0 : 1;
for my $tag (@tags) {
my $cbs;
if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
$failure_cb->("Received $resp of unknown delivery tag $tag");
}
elsif ($cbs->[$cbi]) {
$cbs->[$cbi]->($frame);
}
}
}
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
# Record the requested flow state, confirm it with FlowOk, then notify
# on_active / on_inactive (channel handler first, connection handler second).
$self->{_is_active} = $method_frame->active;
$self->{connection}->_push_write(
Net::AMQP::Protocol::Channel::FlowOk->new(
active => $method_frame->active,
),
$self->{id},
);
my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
$cb->($frame);
return $self;
}
# Any other method frame is a reply to a pending command.
$self->{_queue}->push($frame);
} else {
# Not a method frame: queue it as message content.
$self->{_content_queue}->push($frame);
}
return $self;
}
# Collect the content header and body frames that follow a
# content-carrying method frame, then hand the assembled message to $cb.
#
#   $type       - key under which $frame is stored in the response
#   $frame      - the method frame that announced the content
#   $cb         - called with { $type => $frame, header => ..., body => ... }
#   $failure_cb - called with an error string on an unexpected frame
#
# "header" is the ContentHeader object.  "body" is the last body frame
# with its payload replaced by the concatenation of all body payloads,
# or undef when the header announces a body size of zero.
# Returns $self.
sub _push_read_header_and_body {
    my $self = shift;
    my ($type, $frame, $cb, $failure_cb,) = @_;

    my $response     = {$type => $frame};
    my $body_size    = 0;
    my $body_payload = "";

    # The callbacks refer to the content queue only weakly; if it has
    # gone away by the time they run, they simply stop.
    weaken(my $wcontq = $self->{_content_queue});

    my $w_body_frame;
    my $body_frame = sub {
        my $frame = shift;

        return $failure_cb->('Received data is not body frame')
            if !$frame->isa('Net::AMQP::Frame::Body');

        $body_payload .= $frame->payload;

        if (length($body_payload) < $body_size) {
            # More to come
            my $contq = $wcontq or return;
            $contq->get($w_body_frame);
        }
        else {
            $frame->payload($body_payload);
            $response->{body} = $frame;
            $cb->($response);
        }
    };

    # The body handler re-registers itself through a weakened copy, so
    # the closure does not hold a strong reference to itself.
    $w_body_frame = $body_frame;
    weaken($w_body_frame);

    $self->{_content_queue}->get(sub {
        my $frame = shift;

        return $failure_cb->('Received data is not header frame')
            if !$frame->isa('Net::AMQP::Frame::Header');

        my $header_frame = $frame->header_frame;

        # The two sentences used to be concatenated without a separator
        # ("...ContentHeaderHeader was ...").
        return $failure_cb->(
            'Header is not Protocol::Basic::ContentHeader. '
            . 'Header was ' . ref $header_frame
        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');

        $response->{header} = $header_frame;
        $body_size = $frame->body_size;

        if ($body_size) {
            my $contq = $wcontq or return;
            $contq->get($body_frame);
        }
        else {
            $response->{body} = undef;
            $cb->($response);
        }
    });

    return $self;
}
sub _delete_cbs {
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
=item on_close
Callback invoked when the channel closes. Callback will be passed the
incoming message that caused the close, if any.
=item on_return
Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.
=back
=head1 METHODS
=head2 declare_exchange (%args)
Declare an exchange (to publish messages to) on the server.
Arguments:
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
temporary/private reply queues.
=item on_success
Callback that is called when the queue was declared successfully. The argument
to the callback is of type L<Net::AMQP::Frame::Method>. To get the name of the
Queue (if you declared it with an empty name), you can say
on_success => sub {
my $method = shift;
my $name = $method->method_frame->queue;
};
=item on_failure
Callback that is called when the declaration of the queue has failed.
=item auto_delete
0 or 1, default 0
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
bindings), it will be "returned." (see "on_return")
=item immediate
Boolean; if true, then if the message cannot be delivered directly to a consumer, it
will be "returned." (see "on_return")
=item on_ack
Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Ack>.
=item on_nack
Callback called with the frame that declines receipt (if the channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>.
=item on_return
In AMQP, a "returned" message is one that cannot be delivered in compliance with the
C<immediate> or C<mandatory> flags.
If in confirm mode, this callback will be called with the frame that reports message
return, typically L<Net::AMQP::Protocol::Basic::Return>. If confirm mode is off or
this callback is not provided, then the channel or connection objects' on_return
callbacks (if any), will be called instead.
NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or
not on_return is called first.
=back
=head2 cancel
lib/AnyEvent/RabbitMQ/LocalQueue.pm view on Meta::CPAN
for (1 .. $count) {
&{shift @{$self->{_drain_code_queue}}}(
shift @{$self->{_message_queue}}
);
}
return $self;
}
# Drain the queue, then call every callback still waiting with $frame.
# Exceptions thrown by those callbacks are discarded, and $@ is left
# untouched for the caller.
sub _flush {
    my $self  = shift;
    my ($frame) = @_;

    $self->_drain_queue;

    while (1) {
        my $waiting_cb = shift @{ $self->{_drain_code_queue} }
            or last;

        # Flush frames immediately, throwing away errors for on-close
        local $@;
        eval { $waiting_cb->($frame) };
    }
}
1;
share/fixed_amqp0-8.xml view on Meta::CPAN
-->
<amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80">
AMQ Protocol 0.80
<!--
======================================================
== CONSTANTS
======================================================
-->
<constant name="frame method" value="1"/>
<constant name="frame header" value="2"/>
<constant name="frame body" value="3"/>
<constant name="frame oob method" value="4"/>
<constant name="frame oob header" value="5"/>
<constant name="frame oob body" value="6"/>
<constant name="frame trace" value="7"/>
<constant name="frame heartbeat" value="8"/>
<constant name="frame min size" value="4096"/>
<constant name="frame end" value="206"/>
<constant name="reply success" value="200">
Indicates that the method completed successfully. This reply code is
reserved for future use - the current protocol design does not use
positive confirmation and reply codes are sent only in case of an
error.
</constant>
<constant name="not delivered" value="310" class="soft error">
The client asked for a specific message that is no longer available.
The message was delivered to another client, or was purged from the
queue for some other reason.
share/fixed_amqp0-8.xml view on Meta::CPAN
The client attempted to work with a server entity to which it has
no access due to security settings.
</constant>
<constant name="not found" value="404" class="soft error">
The client attempted to work with a server entity that does not exist.
</constant>
<constant name="resource locked" value="405" class="soft error">
The client attempted to work with a server entity to which it has
no access because another client is working with it.
</constant>
<constant name="frame error" value="501" class="hard error">
The client sent a malformed frame that the server could not decode.
This strongly implies a programming error in the client.
</constant>
<constant name="syntax error" value="502" class="hard error">
The client sent a frame that contained illegal values for one or more
fields. This strongly implies a programming error in the client.
</constant>
<constant name="command invalid" value="503" class="hard error">
The client sent an invalid sequence of frames, attempting to perform
an operation that was considered invalid by the server. This usually
implies a programming error in the client.
</constant>
<constant name="channel error" value="504" class="hard error">
The client attempted to work with a channel that had not been
correctly opened. This most likely indicates a fault in the client
layer.
</constant>
<constant name="resource error" value="506" class="hard error">
The server could not complete the method because it lacked sufficient
share/fixed_amqp0-8.xml view on Meta::CPAN
<response name="tune-ok"/>
<field name="channel max" type="short">
proposed maximum channels
<doc>
The maximum total number of channels that the server allows
per connection. Zero means that the server does not impose a
fixed limit, but the number of allowed channels may be limited
by available server resources.
</doc>
</field>
<field name="frame max" type="long">
proposed maximum frame size
<doc>
The largest frame size that the server proposes for the
connection. The client can negotiate a lower value. Zero means
that the server does not impose any specific limit but may reject
very large frames if it cannot allocate resources for them.
</doc>
<rule implement="MUST">
Until the frame-max has been negotiated, both peers MUST accept
frames of up to 4096 octets large. The minimum non-zero value for
the frame-max field is 4096.
</rule>
</field>
<field name="heartbeat" type="short">
desired heartbeat delay
<doc>
The delay, in seconds, of the connection heartbeat that the server
wants. Zero means the server does not want a heartbeat.
</doc>
</field>
</method>
share/fixed_amqp0-8.xml view on Meta::CPAN
per connection. May not be higher than the value specified by
the server.
</doc>
<rule implement="MAY">
The server MAY ignore the channel-max value or MAY use it for
tuning its resource allocation.
</rule>
<assert check="notnull"/>
<assert check="le" method="tune" field="channel max"/>
</field>
<field name="frame max" type="long">
negotiated maximum frame size
<doc>
The largest frame size that the client and server will use for
the connection. Zero means that the client does not impose any
specific limit but may reject very large frames if it cannot
allocate resources for them. Note that the frame-max limit
applies principally to content frames, where large contents
can be broken into frames of arbitrary size.
</doc>
<rule implement="MUST">
Until the frame-max has been negotiated, both peers must accept
frames of up to 4096 octets large. The minimum non-zero value for
the frame-max field is 4096.
</rule>
</field>
<field name="heartbeat" type="short">
desired heartbeat delay
<doc>
The delay, in seconds, of the connection heartbeat that the client
wants. Zero means the client does not want a heartbeat.
</doc>
</field>
</method>
share/fixed_amqp0-8.xml view on Meta::CPAN
intended for window control. The peer that receives a request to
stop sending content should finish sending the current content, if
any, and then wait until it receives a Flow restart method.
</doc>
<rule implement="MAY">
When a new channel is opened, it is active. Some applications
assume that channels are inactive until started. To emulate this
behaviour a client MAY open the channel, then pause it.
</rule>
<rule implement="SHOULD">
When sending content data in multiple frames, a peer SHOULD monitor
the channel for incoming methods and respond to a Channel.Flow as
rapidly as possible.
</rule>
<rule implement="MAY">
A peer MAY use the Channel.Flow method to throttle incoming content
data for internal reasons, for example, when exchanging data over a
slower connection.
</rule>
<rule implement="MAY">
The peer that requests a Channel.Flow method MAY disconnect and/or
ban a peer that does not respect the request.
</rule>
<chassis name="server" implement="MUST"/>
<chassis name="client" implement="MUST"/>
<response name="flow-ok"/>
<field name="active" type="bit">
start/stop content frames
<doc>
If 1, the peer starts sending content frames. If 0, the peer
stops sending content frames.
</doc>
</field>
</method>
<method name="flow-ok" index="21">
confirm a flow method
<doc>
Confirms to the peer that a flow command was received and processed.
</doc>
<chassis name="server" implement="MUST"/>
<chassis name="client" implement="MUST"/>
<field name="active" type="bit">
current flow setting
<doc>
Confirms the setting of the processed flow method: 1 means the
peer will start sending or continue to send content frames; 0
means it will not.
</doc>
</field>
</method>
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
<method name="alert" index="30">
send a non-fatal warning message
<doc>
This method allows the server to send a non-fatal warning to the
client. This is used for methods that are normally asynchronous
share/fixed_amqp0-8.xml view on Meta::CPAN
reject an incoming message
<doc>
This method allows a client to reject a message. It can be used to
interrupt and cancel large incoming messages, or return untreatable
messages to their original queue.
</doc>
<doc name = "rule" test = "amq_basic_21">
The server SHOULD be capable of accepting and process the Reject
method while sending message content with a Deliver or Get-Ok
method. I.e. the server should read and process incoming methods
while sending output frames. To cancel a partially-send content,
the server sends a content body frame of size 1 (i.e. with no data
except the frame-end octet).
</doc>
<doc name = "rule" test = "amq_basic_22">
The server SHOULD interpret this method as meaning that the client
is unable to process the message at this time.
</doc>
<doc name = "rule">
A client MUST NOT use this method as a means of selecting messages
to process. A rejected message MAY be discarded or dead-lettered,
not necessarily passed to another client.
</doc>
share/fixed_amqp0-8.xml view on Meta::CPAN
<class name="tunnel" handler="tunnel" index="110">
<!--
======================================================
== TUNNEL
======================================================
-->
methods for protocol tunneling.
<doc>
The tunnel methods are used to send blocks of binary data - which
can be serialised AMQP methods or other protocol frames - between
AMQP peers.
</doc>
<doc name="grammar">
tunnel = C:REQUEST
/ S:REQUEST
</doc>
<chassis name="server" implement="MAY"/>
<chassis name="client" implement="MAY"/>
<field name="headers" type="table">
Message header field table
share/fixed_amqp0-8.xml view on Meta::CPAN
======================================================
== TEST - CHECK FUNCTIONAL CAPABILITIES OF AN IMPLEMENTATION
======================================================
-->
test functional primitives of the implementation
<doc>
The test class provides methods for a peer to test the basic
operational correctness of another peer. The test methods are
intended to ensure that all peers respect at least the basic
elements of the protocol, such as frame and content organisation
and field types. We assume that a specially-designed peer, a
"monitor client" would perform such tests.
</doc>
<doc name="grammar">
test = C:INTEGER S:INTEGER-OK
/ S:INTEGER C:INTEGER-OK
/ C:STRING S:STRING-OK
/ S:STRING C:STRING-OK
/ C:TABLE S:TABLE-OK
/ S:TABLE C:TABLE-OK
share/fixed_amqp0-9-1.xml view on Meta::CPAN
-->
<amqp major = "0" minor = "9" revision = "1"
port = "5672" comment = "AMQ Protocol version 0-9-1">
<!--
======================================================
== CONSTANTS
======================================================
-->
<!-- Frame types -->
<constant name = "frame-method" value = "1" />
<constant name = "frame-header" value = "2" />
<constant name = "frame-body" value = "3" />
<constant name = "frame-heartbeat" value = "8" />
<!-- Protocol constants -->
<constant name = "frame-min-size" value = "4096" />
<constant name = "frame-end" value = "206" />
<!-- Reply codes -->
<constant name = "reply-success" value = "200">
<doc>
Indicates that the method completed successfully. This reply code is
reserved for future use - the current protocol design does not use positive
confirmation and reply codes are sent only in case of an error.
</doc>
</constant>
share/fixed_amqp0-9-1.xml view on Meta::CPAN
</doc>
</constant>
<constant name = "precondition-failed" value = "406" class = "soft-error">
<doc>
The client requested a method that was not allowed because some precondition
failed.
</doc>
</constant>
<constant name = "frame-error" value = "501" class = "hard-error">
<doc>
The sender sent a malformed frame that the recipient could not decode.
This strongly implies a programming error in the sending peer.
</doc>
</constant>
<constant name = "syntax-error" value = "502" class = "hard-error">
<doc>
The sender sent a frame that contained illegal values for one or more
fields. This strongly implies a programming error in the sending peer.
</doc>
</constant>
<constant name = "command-invalid" value = "503" class = "hard-error">
<doc>
The client sent an invalid sequence of frames, attempting to perform an
operation that was considered invalid by the server. This usually implies
a programming error in the client.
</doc>
</constant>
<constant name = "channel-error" value = "504" class = "hard-error">
<doc>
The client attempted to work with a channel that had not been correctly
opened. This most likely indicates a fault in the client layer.
</doc>
</constant>
<constant name = "unexpected-frame" value = "505" class = "hard-error">
<doc>
The peer sent a frame that was not expected, usually in the context of
a content header and body. This strongly indicates a fault in the peer's
content processing.
</doc>
</constant>
<constant name = "resource-error" value = "506" class = "hard-error">
<doc>
The server could not complete the method because it lacked sufficient
resources. This may be due to the client creating too many of some type
of entity.
share/fixed_amqp0-9-1.xml view on Meta::CPAN
<response name = "tune-ok" />
<field name = "channel-max" domain = "short" label = "proposed maximum channels">
<doc>
Specifies highest channel number that the server permits. Usable channel numbers
are in the range 1..channel-max. Zero indicates no specified limit.
</doc>
</field>
<field name = "frame-max" domain = "long" label = "proposed maximum frame size">
<doc>
The largest frame size that the server proposes for the connection, including
frame header and end-byte. The client can negotiate a lower value. Zero means
that the server does not impose any specific limit but may reject very large
frames if it cannot allocate resources for them.
</doc>
<rule name = "minimum">
<doc>
Until the frame-max has been negotiated, both peers MUST accept frames of up
to frame-min-size octets large, and the minimum negotiated value for frame-max
is also frame-min-size.
</doc>
<doc type = "scenario">
Client connects to server and sends a large properties field, creating a frame
of frame-min-size octets. The server must accept this frame.
</doc>
</rule>
</field>
<field name = "heartbeat" domain = "short" label = "desired heartbeat delay">
<doc>
The delay, in seconds, of the connection heartbeat that the server wants.
Zero means the server does not want a heartbeat.
</doc>
</field>
share/fixed_amqp0-9-1.xml view on Meta::CPAN
If the client specifies a channel max that is higher than the value provided
by the server, the server MUST close the connection without attempting a
negotiated close. The server may report the error in some fashion to assist
implementors.
</doc>
</rule>
<assert check = "notnull" />
<assert check = "le" method = "tune" field = "channel-max" />
</field>
<field name = "frame-max" domain = "long" label = "negotiated maximum frame size">
<doc>
The largest frame size that the client and server will use for the connection.
Zero means that the client does not impose any specific limit but may reject
very large frames if it cannot allocate resources for them. Note that the
frame-max limit applies principally to content frames, where large contents can
be broken into frames of arbitrary size.
</doc>
<rule name = "minimum">
<doc>
Until the frame-max has been negotiated, both peers MUST accept frames of up
to frame-min-size octets large, and the minimum negotiated value for frame-max
is also frame-min-size.
</doc>
</rule>
<rule name = "upper-limit">
<doc>
If the client specifies a frame max that is higher than the value provided
by the server, the server MUST close the connection without attempting a
negotiated close. The server may report the error in some fashion to assist
implementors.
</doc>
</rule>
</field>
<field name = "heartbeat" domain = "short" label = "desired heartbeat delay">
<doc>
The delay, in seconds, of the connection heartbeat that the client wants. Zero
share/fixed_amqp0-9-1.xml view on Meta::CPAN
<rule name = "initial-state">
<doc>
When a new channel is opened, it is active (flow is active). Some applications
assume that channels are inactive until started. To emulate this behaviour a
client MAY open the channel, then pause it.
</doc>
</rule>
<rule name = "bidirectional">
<doc>
When sending content frames, a peer SHOULD monitor the channel for incoming
methods and respond to a Channel.Flow as rapidly as possible.
</doc>
</rule>
<rule name = "throttling">
<doc>
A peer MAY use the Channel.Flow method to throttle incoming content data for
internal reasons, for example, when exchanging data over a slower connection.
</doc>
</rule>
share/fixed_amqp0-9-1.xml view on Meta::CPAN
that does not respect the request. This is to prevent badly-behaved clients
from overwhelming a server.
</doc>
</rule>
<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "MUST" />
<response name = "flow-ok" />
<field name = "active" domain = "bit" label = "start/stop content frames">
<doc>
If 1, the peer starts sending content frames. If 0, the peer stops sending
content frames.
</doc>
</field>
</method>
<method name = "flow-ok" index = "21" label = "confirm a flow method">
<doc>
Confirms to the peer that a flow command was received and processed.
</doc>
<chassis name = "server" implement = "MUST" />
<chassis name = "client" implement = "MUST" />
<field name = "active" domain = "bit" label = "current flow setting">
<doc>
Confirms the setting of the processed flow method: 1 means the peer will start
sending or continue to send content frames; 0 means it will not.
</doc>
</field>
</method>
<!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
<method name = "close" synchronous = "1" index = "40" label = "request a channel close">
<doc>
This method indicates that the sender wants to close the channel. This may be due to
internal conditions (e.g. a forced shut-down) or due to an error handling a specific
share/fixed_amqp0-9-1.xml view on Meta::CPAN
<doc>
This method allows a client to reject a message. It can be used to interrupt and
cancel large incoming messages, or return untreatable messages to their original
queue.
</doc>
<rule name = "01">
<doc>
The server SHOULD be capable of accepting and processing the Reject method while
sending message content with a Deliver or Get-Ok method. I.e. the server should
read and process incoming methods while sending output frames. To cancel a
partially-sent content, the server sends a content body frame of size 1 (i.e.
with no data except the frame-end octet).
</doc>
</rule>
<rule name = "02">
<doc>
The server SHOULD interpret this method as meaning that the client is unable to
process the message at this time.
</doc>
<doc type = "scenario">
TODO.
share/fixed_amqp0-9-1.xml view on Meta::CPAN
This method is also used by the server to inform publishers on channels in
confirm mode of unhandled messages. If a publisher receives this method, it
probably needs to republish the offending messages.
</doc>
<rule name = "01">
<doc>
The server SHOULD be capable of accepting and processing the Nack method while
sending message content with a Deliver or Get-Ok method. I.e. the server should
read and process incoming methods while sending output frames. To cancel a
partially-sent content, the server sends a content body frame of size 1 (i.e.
with no data except the frame-end octet).
</doc>
</rule>
<rule name = "02">
<doc>
The server SHOULD interpret this method as meaning that the client is unable to
process the message at this time.
</doc>
<doc type = "scenario">
TODO.
xt/04_anyevent.t view on Meta::CPAN
$z->connect(
(map {$_ => $conf{$_}} qw(host port user pass vhost)),
timeout => 1,
on_success => sub {
my $ar = shift;
isa_ok($ar, 'AnyEvent::RabbitMQ');
$done->send;
},
on_failure => failure_cb($done),
on_return => sub {
my $method_frame = shift->method_frame;
die "return: ", $method_frame->reply_code, $method_frame->reply_text
if $method_frame->reply_code;
},
on_close => sub {
my $method_frame = shift->method_frame;
Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
if $method_frame->reply_code;
},
@{ $opt },
);
$done->recv;
}
# Connect $ar using the host/port/user/pass/vhost entries of %conf, passing a
# frame_max of 2**17 in the tune options, then block on the condvar until one
# of the callbacks signals it.
my $done = AnyEvent->condvar;
$ar->connect(
(map {$_ => $conf{$_}} qw(host port user pass vhost)),
tune => { frame_max => 2**17 },
timeout => 1,
on_success => sub {
my $ar = shift;
isa_ok($ar, 'AnyEvent::RabbitMQ');
# Record the product name and parsed version reported in the server properties.
$server{product} = $ar->server_properties->{product};
$server{version} = version->parse($ar->server_properties->{version});
$done->send;
},
on_failure => failure_cb($done),
# on_return / on_close only raise when the method frame carries a true reply_code.
on_return => sub {
my $method_frame = shift->method_frame;
die "return: ", $method_frame->reply_code, $method_frame->reply_text
if $method_frame->reply_code;
},
on_close => sub {
my $method_frame = shift->method_frame;
Carp::confess "close: ", $method_frame->reply_code, $method_frame->reply_text
if $method_frame->reply_code;
},
);
$done->recv;
# $ch is assigned by open_ch(); wait on a fresh condvar until the channel is open.
my $ch;
$done = AnyEvent->condvar;
open_ch($done);
$done->recv;
# Open a channel on the file-level $ar connection.
#
# On success the new channel is stored in the file-level $ch, its class is
# checked with isa_ok, and the supplied condvar ($cv) is signalled. Failures
# are routed through failure_cb($cv). A channel close whose method frame
# carries a true reply_code dies with the code followed by the reply text.
sub open_ch {
    my $cv = shift;

    my $on_open = sub {
        $ch = shift;
        isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
        $cv->send;
    };

    my $on_close = sub {
        my $close = shift->method_frame;
        die $close->reply_code, $close->reply_text
            if $close->reply_code;
    };

    $ar->open_channel(
        on_success => $on_open,
        on_failure => failure_cb($cv),
        on_close   => $on_close,
    );
}
$done = AnyEvent->condvar;
$ch->declare_exchange(
exchange => 'test_x',
on_success => sub {
pass('declare exchange');
$done->send;
xt/04_anyevent.t view on Meta::CPAN
},
on_failure => failure_cb($done),
);
$done->recv;
# Start a consumer on 'test_q' and publish one message to it.
# on_success records the consumer tag from the returned method frame;
# on_consume checks that the delivered body has a true payload and then
# signals the condvar.
$done = AnyEvent->condvar;
my $consumer_tag;
$ch->consume(
queue => 'test_q',
on_success => sub {
my $frame = shift;
$consumer_tag = $frame->method_frame->consumer_tag;
pass('consume');
},
on_consume => sub {
my $response = shift;
ok($response->{body}->payload, 'publish');
$done->send;
},
on_failure => failure_cb($done),
);
publish($ch, 'Hello RabbitMQ.', $done,);
xt/04_anyevent.t view on Meta::CPAN
send_large_size_message($ch, $size);
}
# Consume from 'test_q' with no_ack => 0, so each delivery is acknowledged
# explicitly. On a delivery: ack it by its delivery_tag, then cancel the
# consumer by its consumer_tag; the condvar is signalled once the cancel
# succeeds.
$done = AnyEvent->condvar;
$ch->consume(
queue => 'test_q',
no_ack => 0,
on_consume => sub {
my $response = shift;
$ch->ack(
delivery_tag => $response->{deliver}->method_frame->delivery_tag
);
pass('ack deliver');
$ch->cancel(
consumer_tag => $response->{deliver}->method_frame->consumer_tag,
on_success => sub {
pass('cancel');
$done->send;
},
on_failure => failure_cb($done),
);
},
on_failure => failure_cb($done),
);
publish($ch, 'NO RabbitMQ, NO LIFE.', $done,);
$done->recv;
# Publish one message, then fetch from 'test_q' with get (no_ack => 0) and
# acknowledge it. For get, the delivery_tag is read from the method frame
# stored under the response's {ok} key.
$done = AnyEvent->condvar;
publish($ch, 'RabbitMQ is cool.', $done,);
$ch->get(
queue => 'test_q',
no_ack => 0,
on_success => sub {
my $response = shift;
$ch->ack(
delivery_tag => $response->{ok}->method_frame->delivery_tag
);
pass('ack get');
$done->send;
},
on_failure => failure_cb($done),
);
$done->recv;
$done = AnyEvent->condvar;
my @responses;
xt/04_anyevent.t view on Meta::CPAN
},
on_failure => failure_cb($done),
);
# Publish two messages and wait for the condvar before recording the 'qos' pass.
# NOTE(review): @responses is presumably filled by a consumer set up just above
# this excerpt - confirm against the full test file.
publish($ch, 'RabbitMQ is excellent.', $done,);
publish($ch, 'RabbitMQ is fantastic.', $done,);
$done->recv;
pass('qos');
# Acknowledge every collected delivery by its delivery_tag.
for my $response (@responses) {
$ch->ack(
delivery_tag => $response->{deliver}->method_frame->delivery_tag,
);
}
# Cancel the consumer identified by the first collected delivery; once that
# succeeds, call qos with no explicit limits (presumably restoring the
# defaults - TODO confirm) and signal the condvar when qos succeeds.
$done = AnyEvent->condvar;
$ch->cancel(
consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
on_success => sub {
$ch->qos(
on_success => sub {
$done->send;
},
on_failure => failure_cb($done),
);
},
on_failure => failure_cb($done),
);
xt/04_anyevent.t view on Meta::CPAN
no_ack => 0,
on_consume => sub {
my $response = shift;
if (5 > ++$recover_count) {
$ch->recover();
return;
}
$ch->ack(
delivery_tag => $response->{deliver}->method_frame->delivery_tag
);
$ch->cancel(
consumer_tag => $response->{deliver}->method_frame->consumer_tag,
on_success => sub {
$done->send;
},
on_failure => failure_cb($done),
);
},
on_failure => failure_cb($done),
);
publish($ch, 'RabbitMQ is powerful.', $done,);
$done->recv;
xt/04_anyevent.t view on Meta::CPAN
# Reject test: consume from 'test_q' with explicit acks. The first four
# deliveries (while the incremented counter is below 5) are rejected with
# requeue => 1; the fifth is acked, after which the consumer is cancelled and
# the condvar is signalled on cancel success.
$done = AnyEvent->condvar;
my $reject_count = 0;
$ch->consume(
queue => 'test_q',
no_ack => 0,
on_consume => sub {
my $response = shift;
if ( 5 > ++$reject_count ) {
$ch->reject(
delivery_tag => $response->{deliver}->method_frame->delivery_tag,
# requeue! Else the server does not deliver the message again to this client.
requeue => 1,
);
return;
}
$ch->ack( delivery_tag => $response->{deliver}->method_frame->delivery_tag );
$ch->cancel(
consumer_tag => $response->{deliver}->method_frame->consumer_tag,
on_success => sub {
$done->send;
},
on_failure => failure_cb($done),
);
},
on_failure => failure_cb($done),
);
# The publish is handed its own condvar ($pub_done), separate from $done.
my $pub_done = AnyEvent->condvar;
publish($ch, 'RabbitMQ is powerful.', $pub_done,);
xt/05_multi_channel.t view on Meta::CPAN
$ch->publish(
routing_key => $queue,
body => $message,
mandatory => 1,
);
return;
}
# on_close callback: takes a frame object, looks at its method frame and dies
# with the reply code immediately followed by the reply text when the reply
# code is true. A false reply code is ignored.
sub handle_close {
    my ($frame) = @_;
    my $close = $frame->method_frame;

    my $code = $close->reply_code;
    die $code, $close->reply_text if $code;
}
xt/06_close.t view on Meta::CPAN
on_return => sub {die 'Receive return'},
on_close => \&handle_close,
);
my $ch = $done->recv;
die 'Open channel failure' if !$ch;
return $ch;
}
# Close handler passed as on_close. Reads the method frame of the given frame
# object; a true reply_code makes it die with the code and the reply text
# concatenated, while a false reply_code is ignored.
sub handle_close {
    my $close_frame = $_[0]->method_frame;
    my $reply_code  = $close_frame->reply_code;

    $reply_code and die $reply_code, $close_frame->reply_text;
}