AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    $self->_canceled($_, $frame) for @tags;

    # Report close to on_close callback
    { local $@;
      eval { $self->{on_close}->($frame) };
      warn "Error in channel on_close callback, ignored:\n  $@  " if $@; }

    # Reset state (partly redundant)
    $self->_reset;

    return $self;
}

sub _close_frame {
    my $self = shift;
    my ($text,) = @_;

    Net::AMQP::Frame::Method->new(
        method_frame => Net::AMQP::Protocol::Channel::Close->new(
            reply_text => $text,
        ),
    );
}

sub _orphan {
    my $self = shift;

    if (my $connection = $self->{connection}) {
        $connection->_delete_channel($self);
    }
    return $self;
}

sub declare_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Exchange::Declare',
        {
            type        => 'direct',
            passive     => 0,
            durable     => 0,
            auto_delete => 0,
            internal    => 0,
            %args, # exchange
            ticket      => 0,
            nowait      => 0, # FIXME
        },
        'Exchange::DeclareOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub bind_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Exchange::Bind',
        {
            %args, # source, destination, routing_key
            ticket      => 0,
            nowait      => 0, # FIXME
        },
        'Exchange::BindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub unbind_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Exchange::Unbind',
        {
            %args, # source, destination, routing_key
            ticket      => 0,
            nowait      => 0, # FIXME
        },
        'Exchange::UnbindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub delete_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Exchange::Delete',
        {
            if_unused => 0,
            %args, # exchange
            ticket    => 0,
            nowait    => 0, # FIXME
        },
        'Exchange::DeleteOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub declare_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Declare',
        {
            queue       => '',
            passive     => 0,
            durable     => 0,
            exclusive   => 0,
            auto_delete => 0,
            no_ack      => 1,
            %args,
            ticket      => 0,
            nowait      => 0, # FIXME
        },
        'Queue::DeclareOk',
        $cb,
        $failure_cb,
        $self->{id},
    );
}

sub bind_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Bind',
        {
            %args, # queue, exchange, routing_key
            ticket => 0,
            nowait => 0, # FIXME
        },
        'Queue::BindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub unbind_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Unbind',
        {
            %args, # queue, exchange, routing_key
            ticket => 0,
        },
        'Queue::UnbindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub purge_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Purge',
        {
            %args, # queue
            ticket => 0,
            nowait => 0, # FIXME
        },
        'Queue::PurgeOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub delete_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Delete',
        {
            if_unused => 0,
            if_empty  => 0,
            %args, # queue
            ticket    => 0,
            nowait    => 0, # FIXME
        },
        'Queue::DeleteOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub publish {
    my $self = shift;
    my %args = @_;

    # Docs should advise channel-level callback over this, but still, better to give user an out
    unless ($self->{_is_active}) {

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

A channel is a light-weight virtual connection within a TCP connection to a
RabbitMQ broker.

=head1 ARGUMENTS FOR C<open_channel>

=over

=item on_close

Callback invoked when the channel closes.  Callback will be passed the
incoming message that caused the close, if any.

=item on_return

Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.

=back

=head1 METHODS

=head2 declare_exchange (%args)

Declare an exchange (to publish messages to) on the server.

Arguments:

=over

=item on_success

=item on_failure

=item type

Default 'direct'

=item passive

Default 0

=item durable

Default 0

=item auto_delete

Default 0

=item internal

Default 0

=item exchange

The name of the exchange

=back

=head2 bind_exchange

Binds an exchange to another exchange, with a routing key.

Arguments:

=over

=item source

The name of the source exchange to bind

=item destination

The name of the destination exchange to bind

=item routing_key

The routing key to bind with

=back

=head2 unbind_exchange

=head2 delete_exchange

=head2 declare_queue

Declare a queue (create it if it doesn't exist yet) for publishing messages
to on the server.

  my $done    = AnyEvent->condvar;
  $channel->declare_queue(
     exchange    => $queue_exchange,
     queue       => $queueName,
     durable     => 0,
     auto_delete => 1,
     passive     => 0,
     arguments   => { 'x-expires' => 0, },
     on_success  => sub { $done->send; },
     on_failure  => sub {
         say "Unable to create queue $queueName";
         $done->send;
     },
  );
  $done->recv;

Arguments:

=over

=item queue

Name of the queue to be declared. If the queue name is the empty string,
RabbitMQ will create a unique name for the queue. This is useful for
temporary/private reply queues.

=item on_success

Callback that is called when the queue was declared successfully. The argument
to the callback is of type L<Net::AMQP::Frame::Method>. To get the name of the
Queue (if you declared it with an empty name), you can say

    on_success => sub {
        my $method = shift;
        my $name   = $method->method_frame->queue;
    };

=item on_failure

Callback that is called when the declaration of the queue has failed.

=item auto_delete

0 or 1, default 0

=item passive

0 or 1, default 0

=item durable

0 or 1, default 0

=item exclusive

0 or 1, default 0

=item no_ack

0 or 1, default 1

=item ticket

default 0

=for comment
XXX Is "exchange" a valid parameter?

=item arguments

C<arguments> is a hashref of additional parameters which RabbitMQ extensions
may use. This list is not complete and your RabbitMQ server configuration will
determine which arguments are valid and how they act.

=over

=item x-expires

The queue will automatically be removed after being idle for this many milliseconds.

Default of 0 disables automatic queue removal.

=back

=back

=head2 bind_queue

Binds a queue to an exchange, with a routing key.

Arguments:

=over

=item queue

The name of the queue to bind

=item exchange

The name of the exchange to bind

=item routing_key

The routing key to bind with

=back

=head2 unbind_queue

=head2 purge_queue

Flushes the contents of a queue.

=head2 delete_queue

Deletes a queue. The queue may not have any active consumers.

=head2 consume

Subscribe to consume messages from a queue.

Arguments:

=over

=item queue

The name of the queue to be consumed from.

=item on_consume

Callback called with an argument of the message which has been consumed.

The message is a hash reference, where the value to key C<header> is an object
of type L<Net::AMQP::Protocol::Basic::ContentHeader>, L<body> is a
L<Net::AMQP::Frame::Body>, and C<deliver> a L<Net::AMQP::Frame::Method>.

=item on_cancel

Callback called if consumption is cancelled.  This may be at client request
or as a side effect of queue deletion.  (Notification of queue deletion is a
RabbitMQ extension.)

=item consumer_tag

Identifies this consumer, will be auto-generated if you do not provide it, but you must
supply a value if you want to be able to later cancel the subscription.

=item on_success

Callback called if the subscription was successful (before the first message is consumed).

=item on_failure

Callback called if the subscription fails for any reason.

=item no_ack

Pass through the C<no_ack> flag. Defaults to C<1>. If set to C<1>, the server
will not expect messages to be acknowledged.

=back

=head2 publish

Publish a message to an exchange.

Arguments:

=over

=item exchange

The name of the exchange to send the message to.

=item routing_key

The routing key with which to publish the message.

=item header

Hash of AMQP message header info, including the confusingly similar element "headers",
which may contain arbitrary string key/value pairs.

=item body

The text body of the message to send.

=item mandatory

Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
bindings), it will be "returned."  (see "on_return")

=item immediate

Boolean; if true, then if the message cannot be delivered directly to a consumer, it
will be "returned."  (see "on_return")

=item on_ack

Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Ack>.

=item on_nack

Callback called with the frame that declines receipt (if the channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Nack> or L<Net::AMQP::Protocol::Channel::Close>.

=item on_return

In AMQP, a "returned" message is one that cannot be delivered in compliance with the
C<immediate> or C<mandatory> flags.

If in confirm mode, this callback will be called with the frame that reports message
return, typically L<Net::AMQP::Protocol::Basic::Return>.  If confirm mode is off or
this callback is not provided, then the channel or connection objects' on_return
callbacks (if any), will be called instead.

NOTE: If confirm mode is on, the on_ack or on_nack callback will be called whether or
not on_return is called first.

=back

=head2 cancel

Cancel a queue subscription.

Note that the cancellation B<will not> take place at once, and further messages may be
consumed before the subscription is cancelled. No further messages will be
consumed after the on_success callback has been called.

Arguments:

=over

=item consumer_tag

Identifies this consumer, needs to be the value supplied when the queue is initially
consumed from.

=item on_success

Callback called if the subscription was successfully cancelled.

=item on_failure

Callback called if the subscription could not be cancelled for any reason.

=back

=head2 get



( run in 0.674 second using v1.01-cache-2.11-cpan-2398b32b56e )