AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
$self->_canceled($_, $frame) for @tags;
# Report close to on_close callback
{ local $@;
eval { $self->{on_close}->($frame) };
warn "Error in channel on_close callback, ignored:\n $@ " if $@; }
# Reset state (partly redundant)
$self->_reset;
return $self;
}
sub _close_frame {
my $self = shift;
my ($text,) = @_;
Net::AMQP::Frame::Method->new(
method_frame => Net::AMQP::Protocol::Channel::Close->new(
reply_text => $text,
),
);
}
sub _orphan {
my $self = shift;
if (my $connection = $self->{connection}) {
$connection->_delete_channel($self);
}
return $self;
}
sub declare_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Exchange::Declare',
{
type => 'direct',
passive => 0,
durable => 0,
auto_delete => 0,
internal => 0,
%args, # exchange
ticket => 0,
nowait => 0, # FIXME
},
'Exchange::DeclareOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub bind_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Exchange::Bind',
{
%args, # source, destination, routing_key
ticket => 0,
nowait => 0, # FIXME
},
'Exchange::BindOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub unbind_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Exchange::Unbind',
{
%args, # source, destination, routing_key
ticket => 0,
nowait => 0, # FIXME
},
'Exchange::UnbindOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub delete_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Exchange::Delete',
{
if_unused => 0,
%args, # exchange
ticket => 0,
nowait => 0, # FIXME
},
'Exchange::DeleteOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub declare_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Queue::Declare',
{
queue => '',
passive => 0,
durable => 0,
exclusive => 0,
auto_delete => 0,
no_ack => 1,
%args,
ticket => 0,
nowait => 0, # FIXME
},
'Queue::DeclareOk',
$cb,
$failure_cb,
$self->{id},
);
}
sub bind_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Queue::Bind',
{
%args, # queue, exchange, routing_key
ticket => 0,
nowait => 0, # FIXME
},
'Queue::BindOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub unbind_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Queue::Unbind',
{
%args, # queue, exchange, routing_key
ticket => 0,
},
'Queue::UnbindOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub purge_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Queue::Purge',
{
%args, # queue
ticket => 0,
nowait => 0, # FIXME
},
'Queue::PurgeOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub delete_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Queue::Delete',
{
if_unused => 0,
if_empty => 0,
%args, # queue
ticket => 0,
nowait => 0, # FIXME
},
'Queue::DeleteOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub publish {
my $self = shift;
my %args = @_;
# Docs should advise channel-level callback over this, but still, better to give user an out
unless ($self->{_is_active}) {
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
A channel is a light-weight virtual connection within a TCP connection to a
RabbitMQ broker.
=head1 ARGUMENTS FOR C<open_channel>
=over
=item on_close
Callback invoked when the channel closes. Callback will be passed the
incoming message that caused the close, if any.
=item on_return
Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.
=back
=head1 METHODS
=head2 declare_exchange (%args)
Declare an exchange (to publish messages to) on the server.
Arguments:
=over
=item on_success
=item on_failure
=item type
Default 'direct'
=item passive
Default 0
=item durable
Default 0
=item auto_delete
Default 0
=item internal
Default 0
=item exchange
The name of the exchange
=back
=head2 bind_exchange
Binds an exchange to another exchange, with a routing key.
Arguments:
=over
=item source
The name of the source exchange to bind
=item destination
The name of the destination exchange to bind
=item routing_key
The routing key to bind with
=back
=head2 unbind_exchange
=head2 delete_exchange
=head2 declare_queue
Declare a queue (create it if it doesn't exist yet) for publishing messages
to on the server.
my $done = AnyEvent->condvar;
$channel->declare_queue(
exchange => $queue_exchange,
queue => $queueName,
durable => 0,
auto_delete => 1,
passive => 0,
arguments => { 'x-expires' => 0, },
on_success => sub { $done->send; },
on_failure => sub {
say "Unable to create queue $queueName";
$done->send;
},
);
$done->recv;
Arguments:
=over
=item queue
Name of the queue to be declared. If the queue name is the empty string,
RabbitMQ will create a unique name for the queue. This is useful for
temporary/private reply queues.
=item on_success
Callback that is called when the queue was declared successfully. The argument
to the callback is of type L<Net::AMQP::Frame::Method>. To get the name of the
Queue (if you declared it with an empty name), you can say
on_success => sub {
my $method = shift;
my $name = $method->method_frame->queue;
};
=item on_failure
Callback that is called when the declaration of the queue has failed.
=item auto_delete
0 or 1, default 0
=item passive
0 or 1, default 0
=item durable
0 or 1, default 0
=item exclusive
0 or 1, default 0
=item no_ack
0 or 1, default 1
=item ticket
default 0
=for comment
XXX Is "exchange" a valid parameter?
=item arguments
C<arguments> is a hashref of additional parameters which RabbitMQ extensions
may use. This list is not complete and your RabbitMQ server configuration will
determine which arguments are valid and how they act.
=over
=item x-expires
The queue will automatically be removed after being idle for this many milliseconds.
Default of 0 disables automatic queue removal.
=back
=back
=head2 bind_queue
Binds a queue to an exchange, with a routing key.
Arguments:
=over
=item queue
The name of the queue to bind
=item exchange
The name of the exchange to bind
=item routing_key
The routing key to bind with
=back
=head2 unbind_queue
=head2 purge_queue
Flushes the contents of a queue.
=head2 delete_queue
Deletes a queue. The queue may not have any active consumers.
=head2 consume
Subscribe to consume messages from a queue.
Arguments:
=over
=item queue
The name of the queue to be consumed from.
=item on_consume
Callback called with an argument of the message which has been consumed.
The message is a hash reference, where the value to key C<header> is an object
of type L<Net::AMQP::Protocol::Basic::ContentHeader>, L<body> is a
L<Net::AMQP::Frame::Body>, and C<deliver> a L<Net::AMQP::Frame::Method>.
=item on_cancel
Callback called if consumption is cancelled. This may be at client request
or as a side effect of queue deletion. (Notification of queue deletion is a
RabbitMQ extension.)
=item consumer_tag
Identifies this consumer, will be auto-generated if you do not provide it, but you must
supply a value if you want to be able to later cancel the subscription.
=item on_success
Callback called if the subscription was successful (before the first message is consumed).
=item on_failure
Callback called if the subscription fails for any reason.
=item no_ack
Pass through the C<no_ack> flag. Defaults to C<1>. If set to C<1>, the server
will not expect messages to be acknowledged.
=back
=head2 publish
Publish a message to an exchange.
Arguments:
=over
=item exchange
The name of the exchange to send the message to.
=item routing_key
The routing key with which to publish the message.
=item header
Hash of AMQP message header info, including the confusingly similar element "headers",
which may contain arbitrary string key/value pairs.
=item body
The text body of the message to send.
=item mandatory
Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
bindings), it will be "returned." (see "on_return")
=item immediate
Boolean; if true, then if the message cannot be delivered directly to a consumer, it
will be "returned." (see "on_return")
=item on_ack
Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Ack>.
=item on_nack
Callback called with the frame that declines receipt (if the channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>.
=item on_return
In AMQP, a "returned" message is one that cannot be delivered in compliance with the
C<immediate> or C<mandatory> flags.
If in confirm mode, this callback will be called with the frame that reports message
return, typically L<Net::AMQP::Protocol::Basic::Return>. If confirm mode is off or
this callback is not provided, then the channel or connection objects' on_return
callbacks (if any), will be called instead.
NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or
not on_return is called first.
=back
=head2 cancel
Cancel a queue subscription.
Note that the cancellation B<will not> take place at once, and further messages may be
consumed before the subscription is cancelled. No further messages will be
consumed after the on_success callback has been called.
Arguments:
=over
=item consumer_tag
Identifies this consumer, needs to be the value supplied when the queue is initially
consumed from.
=item on_success
Callback called if the subscription was successfully cancelled.
=item on_failure
Callback called if the subscription could not be cancelled for any reason.
=back
=head2 get
( run in 0.674 second using v1.01-cache-2.11-cpan-2398b32b56e )