AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
return $self;
}
# Close this channel.
#
# Any arguments are handed to the connection's _set_cbs(); the on_success and
# on_failure entries of its result are the handlers invoked below.
#
# Returns nothing when the channel has no connection; otherwise returns the
# channel itself.  The close is carried out from a timer callback, so the
# handlers fire later, not before this method returns.
sub close {
    my $self = shift;

    my $connection = $self->{connection};
    return if !$connection;

    my %args = $connection->_set_cbs(@_);

    # Hold only a weak reference inside the timer callback, so the pending
    # timer does not keep the channel alive on its own.
    my $weak_self = $self;
    weaken($weak_self);

    # If an open is still in progress, wait for it to finish.  The timer
    # fires straight away and then once per second (an arbitrary interval).
    my $poll_timer;
    $poll_timer = AE::timer(0, 1, sub {
        my $channel = $weak_self;
        if (!$channel) {
            # The channel has gone away; nothing left to close.
            undef $poll_timer;
            return;
        }

        # Still opening: check again on the next tick.
        return if $channel->{_state} == _ST_OPENING;

        # The state is settled, so no further polling is needed.
        undef $poll_timer;

        # Closing an already-closed channel is reported as a success.
        if ($channel->{_state} == _ST_CLOSED) {
            $args{on_success}->($channel);
            return;
        }

        $connection->_push_write(
            $channel->_close_frame,
            $channel->{id},
        );

        # The spec says that once a party has sent Channel::Close it MUST
        # discard all further frames for that channel, so treat the channel
        # as dead from this moment on.
        $channel->_closed();

        # Whichever way the server's reply goes, detach the channel from its
        # connection after telling the caller.
        my $on_close_ok = sub {
            $args{on_success}->($channel);
            $channel->_orphan();
        };
        my $on_close_failed = sub {
            $args{on_failure}->(@_);
            $channel->_orphan();
        };

        $connection->_push_read_and_valid(
            'Channel::CloseOk',
            $on_close_ok,
            $on_close_failed,
            $channel->{id},
        );
    });

    return $self;
}
# Mark the channel as closed and notify everything that was waiting on it.
#
# $frame (optional) is the frame handed to every callback; when it is false a
# fresh frame from _close_frame() is used instead.
#
# Returns nothing if the channel was already closed, otherwise the channel.
sub _closed {
    my ($self, $frame) = @_;

    $frame = $self->_close_frame() unless $frame;

    # Already closed: nothing further to report.
    if ($self->{_state} == _ST_CLOSED) {
        return;
    }
    $self->{_state} = _ST_CLOSED;

    # Run the callbacks of every outstanding command.
    $self->{_queue}->_flush($frame);
    $self->{_content_queue}->_flush($frame);

    # Outstanding publishes are treated as nacked: call the second element
    # of each _publish_cbs entry, where one is defined.
    my @nack_cbs = grep { defined $_ }
                   map  { $_->[1] }
                   values %{ $self->{_publish_cbs} };
    for my $nack_cb (@nack_cbs) {
        $nack_cb->($frame);
    }

    # Report cancellation of every outstanding consume.  The tag list is
    # captured before the loop starts.
    my @consumer_tags = keys %{ $self->{_consumer_cbs} };
    for my $consumer_tag (@consumer_tags) {
        $self->_canceled($consumer_tag, $frame);
    }

    # Tell the on_close callback; an exception from it is warned about and
    # otherwise ignored.
    {
        local $@;
        eval { $self->{on_close}->($frame) };
        if ($@) {
            warn "Error in channel on_close callback, ignored:\n $@ ";
        }
    }

    # Reset state (partly redundant with the steps above).
    $self->_reset;

    return $self;
}
# Build a method frame that wraps a Channel::Close.
#
# $text (optional) becomes the reply_text of the Close method; it is passed
# through unchanged, so it is undef when the caller supplies nothing.
#
# Returns the new Net::AMQP::Frame::Method object.
sub _close_frame {
    my ($self, $text) = @_;

    my $close_method = Net::AMQP::Protocol::Channel::Close->new(
        reply_text => $text,
    );

    return Net::AMQP::Frame::Method->new(
        method_frame => $close_method,
    );
}
# Detach this channel from its connection, if it has one, by asking the
# connection to _delete_channel() it.
#
# Always returns the channel itself.
sub _orphan {
    my ($self) = @_;

    my $connection = $self->{connection};
    $connection->_delete_channel($self) if $connection;

    return $self;
}
sub declare_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Exchange::Declare',
{
type => 'direct',
passive => 0,
durable => 0,
auto_delete => 0,
internal => 0,
%args, # exchange
ticket => 0,
nowait => 0, # FIXME
},
'Exchange::DeclareOk',
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
=item no_ack
Pass through the C<no_ack> flag. Defaults to C<1>. If set to C<1>, the server
will not expect messages to be acknowledged.
=back
=head2 publish
Publish a message to an exchange.
Arguments:
=over
=item exchange
The name of the exchange to send the message to.
=item routing_key
The routing key with which to publish the message.
=item header
Hash of AMQP message header info, including the confusingly similar element "headers",
which may contain arbitrary string key/value pairs.
=item body
The text body of the message to send.
=item mandatory
Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
bindings), it will be "returned" (see C<on_return>).
=item immediate
Boolean; if true, then if the message cannot be delivered directly to a consumer, it
will be "returned" (see C<on_return>).
=item on_ack
Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Ack>.
=item on_nack
Callback called with the frame that declines receipt (if the channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>.
=item on_return
In AMQP, a "returned" message is one that cannot be delivered in compliance with the
C<immediate> or C<mandatory> flags.
If in confirm mode, this callback will be called with the frame that reports message
return, typically L<Net::AMQP::Protocol::Basic::Return>. If confirm mode is off or
this callback is not provided, then the channel or connection objects' on_return
callbacks (if any) will be called instead.
NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or
not on_return is called first.
=back
=head2 cancel
Cancel a queue subscription.
Note that the cancellation B<will not> take place at once, and further messages may be
consumed before the subscription is cancelled. No further messages will be
consumed after the on_success callback has been called.
Arguments:
=over
=item consumer_tag
Identifies this consumer, needs to be the value supplied when the queue is initially
consumed from.
=item on_success
Callback called if the subscription was successfully cancelled.
=item on_failure
Callback called if the subscription could not be cancelled for any reason.
=back
=head2 get
Try to get a single message from a queue.
Arguments:
=over
=item queue
Mandatory. Name of the queue to try to receive a message from.
=item on_success
Will be called with either a message or, if the queue is empty,
a notification that there was nothing to collect from the queue.
=item on_failure
This callback will be called if an error is signalled on this channel.
=item no_ack
0 or 1, default 1
=back
( run in 0.947 second using v1.01-cache-2.11-cpan-ceb78f64989 )