AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

        - Allow AnyEvent::TLS options to be passed (Nicolas R).
        - Correct check when in confirm mode (Ruslan Zakirov).
        - Fix @INC breaking perls >= 5.26 (Nicolas R).
        - Minor test fixes

1.19  Sat Mar 21 16:49:24 GMT 2015
        - Add 'no_ack' as an optional argument to the ->consume method
          (Dave Mueller).
        - Fill in some missing documentation (Moritz Lenz).
1.18  Mon Sep 29 19:36:00 PDT 2014
        - Added the bind_exchange and unbind_exchange methods
          for exchange-exchange bindings.

1.17  Fri Jul 25 14:02:00 PDT 2014
        - Add support for chunking large bodies into multiple AMQP frames,
          allowing the sending of large messages.

1.16  Sat Apr 12 14:42:00 BST 2014
        - Doc fixes (Mark Ellis)
        - Fix leak when calling ->close + tests (Peter Haworth)

1.15  Mon Jul 1 12:35:00 BST 2013

README  view on Meta::CPAN

This is Perl module AnyEvent::RabbitMQ.

AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.

You can use AnyEvent::RabbitMQ to -

  * Declare and delete exchanges
  * Declare, delete, bind and unbind queues
  * Set QoS
  * Publish, consume, get, ack, recover and reject messages
  * Select, commit and rollback transactions

AnyEvent::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and version 0-8 of the AMQP specification.

INSTALLATION

Download it, unpack it, then build it as per the usual:

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


            $self->_push_write(
                Net::AMQP::Protocol::Connection::StartOk->new(
                    client_properties => {
                        platform     => 'Perl',
                        product      => __PACKAGE__,
                        information  => 'http://d.hatena.ne.jp/cooldaemon/',
                        version      => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
                        capabilities => {
                            consumer_cancel_notify     => Net::AMQP::Value::true,
                            exchange_exchange_bindings => Net::AMQP::Value::true,
                        },
                        %{ $args{client_properties} || {} },
                    },
                    mechanism => 'AMQPLAIN',
                    response => {
                        LOGIN    => $args{user},
                        PASSWORD => $args{pass},
                    },
                    locale => 'en_US',
                ),

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


  print $cv->recv, "\n";

=head1 DESCRIPTION

AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.

You can use AnyEvent::RabbitMQ to -

  * Declare and delete exchanges
  * Declare, delete, bind and unbind queues
  * Set QoS and confirm mode
  * Publish, consume, get, ack, recover and reject messages
  * Select, commit and rollback transactions

Most of these actions can be done through L<AnyEvent::RabbitMQ::Channel>.
Please see the documentation there for more details.

AnyEvent::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and versions 0-8 and 0-9-1 of the AMQP specification.

This client is the non-blocking version, for a blocking version with a similar API, see L<Net::RabbitFoot>.

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        },
        'Exchange::DeclareOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub bind_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Exchange::Bind',
        {
            %args, # source, destination, routing_key
            ticket      => 0,

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        },
        'Exchange::BindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub unbind_exchange {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Exchange::Unbind',
        {
            %args, # source, destination, routing_key
            ticket      => 0,
            nowait      => 0, # FIXME
        },
        'Exchange::UnbindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub delete_exchange {
    my $self = shift;

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

            ticket      => 0,
            nowait      => 0, # FIXME
        },
        'Queue::DeclareOk',
        $cb,
        $failure_cb,
        $self->{id},
    );
}

sub bind_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Bind',
        {
            %args, # queue, exchange, routing_key
            ticket => 0,

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

        },
        'Queue::BindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub unbind_queue {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Queue::Unbind',
        {
            %args, # queue, exchange, routing_key
            ticket => 0,
        },
        'Queue::UnbindOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub purge_queue {
    my $self = shift;

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

=item internal

Default 0

=item exchange

The name of the exchange

=back

=head2 bind_exchange

Binds an exchange to another exchange, with a routing key.

Arguments:

=over

=item source

The name of the source exchange to bind

=item destination

The name of the destination exchange to bind

=item routing_key

The routing key to bind with

=back

=head2 unbind_exchange

=head2 delete_exchange

=head2 declare_queue

Declare a queue (create it if it doesn't exist yet) for publishing messages
to on the server.

  my $done    = AnyEvent->condvar;
  $channel->declare_queue(

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

=item x-expires

The queue will automatically be removed after being idle for this many milliseconds.

Default of 0 disables automatic queue removal.

=back

=back

=head2 bind_queue

Binds a queue to an exchange, with a routing key.

Arguments:

=over

=item queue

The name of the queue to bind

=item exchange

The name of the exchange to bind

=item routing_key

The routing key to bind with

=back

=head2 unbind_queue

=head2 purge_queue

Flushes the contents of a queue.

=head2 delete_queue

Deletes a queue. The queue may not have any active consumers.

=head2 consume

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

Hash of AMQP message header info, including the confusingly similar element "headers",
which may contain arbitrary string key/value pairs.

=item body

The text body of the message to send.

=item mandatory

Boolean; if true, then if the message doesn't land in a queue (e.g. the exchange has no
bindings), it will be "returned."  (see "on_return")

=item immediate

Boolean; if true, then if the message cannot be delivered directly to a consumer, it
will be "returned."  (see "on_return")

=item on_ack

Callback called with the frame that acknowledges receipt (if channel is in confirm mode),
typically L<Net::AMQP::Protocol::Basic::Ack>.

share/fixed_amqp0-8.xml  view on Meta::CPAN

                        / C:DELETE   S:DELETE-OK
</doc>
    <chassis name="server" implement="MUST"/>
    <chassis name="client" implement="MUST"/>
    <rule implement="MUST">
      <test>amq_exchange_19</test>
  The server MUST implement the direct and fanout exchange types, and
  predeclare the corresponding exchanges named amq.direct and amq.fanout
  in each virtual host. The server MUST also predeclare a direct
  exchange to act as the default exchange for content Publish methods
  and for default queue bindings.
</rule>
    <rule implement="SHOULD">
      <test>amq_exchange_20</test>
  The server SHOULD implement the topic exchange type, and predeclare
  the corresponding exchange named amq.topic in each virtual host.
</rule>
    <rule implement="MAY">
      <test>amq_exchange_21</test>
  The server MAY implement the system exchange type, and predeclare the
  corresponding exchanges named amq.system in each virtual host. If the
  client attempts to bind a queue to the system exchange, the server
  MUST raise a connection exception with reply code 507 (not allowed).
</rule>
    <rule implement="MUST">
      <test>amq_exchange_22</test>
  The default exchange MUST be defined as internal, and be inaccessible
  to the client except by specifying an empty exchange name in a content
  Publish method. That is, the server MUST NOT let clients make explicit
  bindings to this exchange.
</rule>
    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="declare" synchronous="1" index="10">
  declare exchange, create if needed
  <doc>
    This method creates an exchange if it does not already exist, and if the
    exchange exists, verifies that it is of the correct and expected class.
  </doc>
      <rule implement="SHOULD">
        <test>amq_exchange_23</test>

share/fixed_amqp0-8.xml  view on Meta::CPAN

    auto-delete when unused
    <doc>
      If set, the exchange is deleted when all queues have finished
      using it.
    </doc>
        <rule implement="SHOULD">
          <test>amq_exchange_02</test>
      The server SHOULD allow for a reasonable delay between the point
      when it determines that an exchange is not being used (or no longer
      used), and the point when it deletes the exchange.  At the least it
      must allow a client to create an exchange and then bind a queue to
      it, with a small but non-zero delay between these two actions.
    </rule>
        <rule implement="MUST">
          <test>amq_exchange_25</test>
      The server MUST ignore the auto-delete field if the exchange already
      exists.
    </rule>
      </field>
      <field name="internal" type="bit">
    create internal exchange

share/fixed_amqp0-8.xml  view on Meta::CPAN

    This method confirms a Declare method and confirms the name of the
    exchange, essential for automatically-named exchanges.
  </doc>
      <chassis name="client" implement="MUST"/>
    </method>
    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="delete" synchronous="1" index="20">
  delete an exchange
  <doc>
    This method deletes an exchange.  When an exchange is deleted all queue
    bindings on the exchange are cancelled.
  </doc>
      <chassis name="server" implement="MUST"/>
      <response name="delete-ok"/>
      <field name="ticket" domain="access ticket">
        <rule implement="MUST">
      The client MUST provide a valid access ticket giving "active"
      access rights to the exchange's access realm.
    </rule>
      </field>
      <field name="exchange" domain="exchange name">

share/fixed_amqp0-8.xml  view on Meta::CPAN

          <test>amq_exchange_11</test>
      The exchange MUST exist. Attempting to delete a non-existing exchange
      causes a channel exception.
    </rule>
        <assert check="notnull"/>
      </field>
      <field name="if unused" type="bit">
    delete only if unused
    <doc>
      If set, the server will only delete the exchange if it has no queue
      bindings. If the exchange has queue bindings the server does not
      delete it but raises a channel exception instead.
    </doc>
        <rule implement="SHOULD">
          <test>amq_exchange_12</test>
      If set, the server SHOULD delete the exchange but only if it has
      no queue bindings.
    </rule>
        <rule implement="SHOULD">
          <test>amq_exchange_13</test>
      If set, the server SHOULD raise a channel exception if the exchange is in
      use.
    </rule>
      </field>

  <field name = "nowait" type = "bit">
    do not send a reply method

share/fixed_amqp0-8.xml  view on Meta::CPAN

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="declare" synchronous="1" index="10">
  declare queue, create if needed
  <doc>
    This method creates or checks a queue.  When creating a new queue
    the client can specify various properties that control the durability
    of the queue and its contents, and the level of sharing for the queue.
  </doc>
      <rule implement="MUST">
        <test>amq_queue_34</test>
    The server MUST create a default binding for a newly-created queue
    to the default exchange, which is an exchange of type 'direct'.
  </rule>
      <rule implement="SHOULD">
        <test>amq_queue_35</test>
    The server SHOULD support a minimum of 256 queues per virtual host
    and ideally, impose no limit except as defined by available resources.
  </rule>
      <chassis name="server" implement="MUST"/>
      <response name="declare-ok"/>
      <field name="ticket" domain="access ticket">

share/fixed_amqp0-8.xml  view on Meta::CPAN

      <field name="consumer count" type="long">
    number of consumers
    <doc>
      Reports the number of active consumers for the queue. Note that
      consumers can suspend activity (Channel.Flow) in which case they
      do not appear in this count.
    </doc>
      </field>
    </method>
    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="bind" synchronous="1" index="20">
  bind queue to an exchange
  <doc>
    This method binds a queue to an exchange.  Until a queue is
    bound it will not receive any messages.  In a classic messaging
    model, store-and-forward queues are bound to a dest exchange
    and subscription queues are bound to a dest_wild exchange.
  </doc>
      <rule implement="MUST">
        <test>amq_queue_25</test>
    A server MUST allow ignore duplicate bindings - that is, two or
    more bind methods for a specific queue, with identical arguments
    - without treating these as an error.
  </rule>
      <rule implement="MUST">
        <test>amq_queue_39</test>
    If a bind fails, the server MUST raise a connection exception.
  </rule>
      <rule implement="MUST">
        <test>amq_queue_12</test>
    The server MUST NOT allow a durable queue to bind to a transient
    exchange. If the client attempts this the server MUST raise a
    channel exception.
  </rule>
      <rule implement="SHOULD">
        <test>amq_queue_13</test>
    Bindings for durable queues are automatically durable and the
    server SHOULD restore such bindings after a server restart.
  </rule>
      <rule implement="MUST">
        <test>amq_queue_17</test>
    If the client attempts to an exchange that was declared as internal,
    the server MUST raise a connection exception with reply code 530
    (not allowed).
  </rule>
      <rule implement="SHOULD">
        <test>amq_queue_40</test>
    The server SHOULD support at least 4 bindings per queue, and
    ideally, impose no limit except as defined by available resources.
  </rule>
      <chassis name="server" implement="MUST"/>
      <response name="bind-ok"/>
      <field name="ticket" domain="access ticket">
        <doc>
      The client provides a valid access ticket giving "active"
      access rights to the queue's access realm.
    </doc>
      </field>

  <field name = "queue" domain = "queue name">
    <doc>
      Specifies the name of the queue to bind.  If the queue name is
      empty, refers to the current queue for the channel, which is
      the last declared queue.
    </doc>
    <doc name = "rule">
      If the client did not previously declare a queue, and the queue
      name in this method is empty, the server MUST raise a connection
      exception with reply code 530 (not allowed).
    </doc>
    <doc name = "rule" test = "amq_queue_26">
      If the queue does not exist the server MUST raise a channel exception
      with reply code 404 (not found).
    </doc>
  </field>

  <field name="exchange" domain="exchange name">
          The name of the exchange to bind to.
          <rule implement="MUST">
          <test>amq_queue_14</test>
      If the exchange does not exist the server MUST raise a channel
      exception with reply code 404 (not found).
    </rule>
      </field>
      <field name="routing key" type="shortstr">
     message routing key
    <doc>
      Specifies the routing key for the binding.  The routing key is
      used for routing messages depending on the exchange configuration.
      Not all exchanges use a routing key - refer to the specific
      exchange documentation.  If the routing key is empty and the queue
      name is empty, the routing key will be the current queue for the
      channel, which is the last declared queue.
    </doc>
  </field>

  <field name = "nowait" type = "bit">
    do not send a reply method
    <doc>
    If set, the server will not respond to the method. The client should
    not wait for a reply method.  If the server could not complete the
    method it will raise a channel or connection exception.
    </doc>
  </field>

  <field name="arguments" type="table">
    arguments for binding
    <doc>
      A set of arguments for the binding.  The syntax and semantics of
      these arguments depends on the exchange class.
    </doc>
      </field>
    </method>
    <method name="bind-ok" synchronous="1" index="21">
  confirm bind successful
  <doc>
    This method confirms that the bind was successful.
  </doc>
      <chassis name="client" implement="MUST"/>
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="unbind" synchronous="1" index="50">
      unbind a queue from an exchange
      <doc>This method unbinds a queue from an exchange.</doc>
      <rule implement="MUST">
        If a unbind fails, the server MUST raise a connection exception.
      </rule>
      <chassis name="server" implement="MUST"/>
      <response name="unbind-ok"/>

      <field name="ticket" domain="access ticket">
        <doc>
          The client provides a valid access ticket giving "active"
          access rights to the queue's access realm.
        </doc>
      </field>

      <field name="queue" domain="queue name">
        <doc>Specifies the name of the queue to unbind.</doc>
      </field>

      <field name="exchange" domain="exchange name">
        <doc>The name of the exchange to unbind from.</doc>
      </field>

      <field name="routing key" type="shortstr">
        routing key of binding
        <doc>Specifies the routing key of the binding to unbind.</doc>
      </field>

      <field name="arguments" type="table">
        arguments of binding
        <doc>Specifies the arguments of the binding to unbind.</doc>
      </field>
    </method>

    <method name="unbind-ok" synchronous="1" index="51">
      confirm unbind successful
      <doc>This method confirms that the unbind was successful.</doc>
      <chassis name="client" implement="MUST"/>
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
    <method name="purge" synchronous="1" index="30">
  purge a queue
  <doc>
    This method removes all messages from a queue.  It does not cancel
    consumers.  Purged messages are deleted without any formal "undo"
    mechanism.

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

<?xml version = "1.0"?>

<!--
     WARNING: Modified from the official 0-9-1 specification XML by
     the addition of:
     confirm.select and confirm.select-ok,
     exchange.bind and exchange.bind-ok,
     exchange.unbind and exchange.unbind-ok,
     basic.nack,
     the ability for the Server to send basic.ack, basic.nack and
      basic.cancel to the client, and
     the un-deprecation of exchange.declare{auto-delete} and exchange.declare{internal}
-->

<!--
    Copyright Notice
    ================
    Copyright (c) 2006-2008 Cisco Systems, Credit Suisse, Deutsche Boerse

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

        The server MUST, in each virtual host, pre-declare an exchange instance
        for each standard exchange type that it implements, where the name of the
        exchange instance, if defined, is "amq." followed by the exchange type name.
      </doc>
      <doc>
        The server MUST, in each virtual host, pre-declare at least two direct
        exchange instances: one named "amq.direct", the other with no public name
        that serves as a default  exchange for Publish methods.
      </doc>
      <doc type = "scenario">
        Client declares a temporary queue and attempts to bind to each required
        exchange instance ("amq.fanout", "amq.direct", "amq.topic", and "amq.headers"
        if those types are defined).
      </doc>
    </rule>
    <rule name = "default-exchange">
      <doc>
        The server MUST pre-declare a direct exchange with no public name to act as
        the default exchange for content Publish methods and for default queue bindings.
      </doc>
      <doc type = "scenario">
        Client checks that the default exchange is active by specifying a queue
        binding with no exchange name, and publishing a message with a suitable
        routing key but without specifying the exchange name, then ensuring that
        the message arrives in the queue correctly.
      </doc>
    </rule>
    <rule name = "default-access">
      <doc>
        The server MUST NOT allow clients to access the default exchange except
        by specifying an empty exchange name in the Queue.Bind and content Publish
        methods.
      </doc>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

        <doc>
          If set, the exchange is deleted when all queues have
          finished using it.
        </doc>
        <rule name = "amq_exchange_02">
          <doc>
            The server SHOULD allow for a reasonable delay between the
            point when it determines that an exchange is not being
            used (or no longer used), and the point when it deletes
            the exchange.  At the least it must allow a client to
            create an exchange and then bind a queue to it, with a
            small but non-zero delay between these two actions.
          </doc>
        </rule>
        <rule name = "amq_exchange_25">
          <doc>
            The server MUST ignore the auto-delete field if the
            exchange already exists.
          </doc>
        </rule>
      </field>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

        This method confirms a Declare method and confirms the name of the exchange,
        essential for automatically-named exchanges.
      </doc>
      <chassis name = "client" implement = "MUST" />
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "delete" synchronous = "1" index = "20" label = "delete an exchange">
      <doc>
        This method deletes an exchange. When an exchange is deleted all queue bindings on
        the exchange are cancelled.
      </doc>

      <chassis name = "server" implement = "MUST" />
      <response name = "delete-ok" />

      <!-- Deprecated: "ticket", must be zero -->
      <field name = "reserved-1" type = "short" reserved = "1" />

      <field name = "exchange" domain = "exchange-name">
        <rule name = "exists" on-failure = "not-found">
          <doc>
            The client MUST NOT attempt to delete an exchange that does not exist.
          </doc>
        </rule>
        <assert check = "notnull" />
      </field>

      <field name = "if-unused" domain = "bit" label = "delete only if unused">
        <doc>
          If set, the server will only delete the exchange if it has no queue bindings. If
          the exchange has queue bindings the server does not delete it but raises a
          channel exception instead.
        </doc>
        <rule name = "in-use" on-failure = "precondition-failed">
          <doc>
            The server MUST NOT delete an exchange that has bindings on it, if the if-unused
            field is true.
          </doc>
          <doc type = "scenario">
            The client declares an exchange, binds a queue to it, then tries to delete it
            setting if-unused to true.
          </doc>
        </rule>
      </field>

      <field name = "no-wait" domain = "no-wait" />
    </method>

    <method name = "delete-ok" synchronous = "1" index = "21"
      label = "confirm deletion of an exchange">
      <doc>This method confirms the deletion of an exchange.</doc>
      <chassis name = "client" implement = "MUST" />
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "bind" synchronous = "1" index = "30"
            label = "bind exchange to an exchange">

      <doc>This method binds an exchange to an exchange.</doc>

      <rule name = "duplicates">
        <doc>
          A server MUST allow and ignore duplicate bindings - that is,
          two or more bind methods for a specific exchanges, with
          identical arguments - without treating these as an error.
        </doc>
        <doc type = "scenario">
          A client binds an exchange to an exchange. The client then
          repeats the bind (with identical arguments).
        </doc>
      </rule>

      <rule name = "cyclical">
        <doc>
          A server MUST allow cycles of exchange bindings to be
          created including allowing an exchange to be bound to
          itself.
        </doc>
        <doc type = "scenario">
          A client declares an exchange and binds it to itself.
        </doc>
      </rule>

      <rule name = "unique">
        <doc>
          A server MUST not deliver the same message more than once to
          a destination exchange, even if the topology of exchanges
          and bindings results in multiple (even infinite) routes to
          that exchange.
        </doc>
        <doc type = "scenario">
          A client declares an exchange and binds it using multiple
          bindings to the amq.topic exchange. The client then
          publishes a message to the amq.topic exchange that matches
          all the bindings.
        </doc>
      </rule>

      <chassis name = "server" implement = "MUST"/>

      <response name = "bind-ok"/>

      <!-- Deprecated: "ticket", must be zero -->
      <field name = "reserved-1" type = "short" reserved = "1"/>

      <field name = "destination" domain = "exchange-name"
             label = "name of the destination exchange to bind to">
        <doc>Specifies the name of the destination exchange to bind.</doc>
        <rule name = "exchange-existence" on-failure = "not-found">
          <doc>
            A client MUST NOT be allowed to bind a non-existent
            destination exchange.
          </doc>
          <doc type = "scenario">
            A client attempts to bind an undeclared exchange to an
            exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the
            default exchange.
          </doc>
          <doc type = "scenario">
            The client declares an exchange and binds a blank exchange
            name to it.
          </doc>
        </rule>
      </field>

      <field name = "source" domain = "exchange-name"
             label = "name of the source exchange to bind to">
        <doc>Specifies the name of the source exchange to bind.</doc>
        <rule name = "exchange-existence" on-failure = "not-found">
          <doc>
            A client MUST NOT be allowed to bind a non-existent source
            exchange.
          </doc>
          <doc type = "scenario">
            A client attempts to bind an exchange to an undeclared
            exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the
            default exchange.
          </doc>
          <doc type = "scenario">
            The client declares an exchange and binds it to a blank
            exchange name.
          </doc>
        </rule>
      </field>

      <field name = "routing-key" domain = "shortstr"
             label = "message routing key">
        <doc>
          Specifies the routing key for the binding. The routing key
          is used for routing messages depending on the exchange
          configuration. Not all exchanges use a routing key - refer
          to the specific exchange documentation.
        </doc>
      </field>

      <field name = "no-wait" domain = "no-wait"/>

      <field name = "arguments" domain = "table"
             label = "arguments for binding">
        <doc>
          A set of arguments for the binding. The syntax and semantics
          of these arguments depends on the exchange class.
        </doc>
      </field>
    </method>

    <method name="bind-ok" synchronous="1" index="31"
            label = "confirm bind successful">
      <doc>This method confirms that the bind was successful.</doc>

      <chassis name="client" implement="MUST"/>
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "unbind" synchronous = "1" index = "40"
            label = "unbind an exchange from an exchange">
      <doc>This method unbinds an exchange from an exchange.</doc>
      <rule name = "01">
        <doc>If a unbind fails, the server MUST raise a connection exception.</doc>
      </rule>
      <chassis name = "server" implement = "MUST"/>
      <response name = "unbind-ok"/>

      <!-- Deprecated: "ticket", must be zero -->
      <field name = "reserved-1" type = "short" reserved = "1"/>

      <field name = "destination" domain = "exchange-name">
        <doc>Specifies the name of the destination exchange to unbind.</doc>
        <rule name = "must-exist" on-failure = "not-found">
          <doc>
            The client MUST NOT attempt to unbind an exchange that
            does not exist from an exchange.
          </doc>
          <doc type = "scenario">
            The client attempts to unbind a non-existent exchange from
            an exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the
            default exchange.
          </doc>
          <doc type = "scenario">
            The client declares an exchange, binds a blank exchange
            name to it, and then unbinds a blank exchange name from
            it.
          </doc>
        </rule>
      </field>

      <field name = "source" domain = "exchange-name">
        <doc>Specifies the name of the source exchange to unbind.</doc>
        <rule name = "must-exist" on-failure = "not-found">
          <doc>
            The client MUST NOT attempt to unbind an exchange from an
            exchange that does not exist.
          </doc>
          <doc type = "scenario">
            The client attempts to unbind an exchange from a
            non-existent exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the
            default exchange.
          </doc>
          <doc type = "scenario">
            The client declares an exchange, binds an exchange to a
            blank exchange name, and then unbinds an exchange from a
            black exchange name.
          </doc>
        </rule>
      </field>

      <field name = "routing-key" domain = "shortstr"
             label = "routing key of binding">
        <doc>Specifies the routing key of the binding to unbind.</doc>
      </field>

      <field name = "no-wait" domain = "no-wait"/>

      <field name = "arguments" domain = "table"
             label = "arguments of binding">
        <doc>Specifies the arguments of the binding to unbind.</doc>
      </field>
    </method>

    <method name = "unbind-ok" synchronous = "1" index = "51"
            label = "confirm unbind successful">
      <doc>This method confirms that the unbind was successful.</doc>
      <chassis name = "client" implement = "MUST"/>
    </method>

  </class>

  <!-- ==  QUEUE  ============================================================ -->

  <class name = "queue" handler = "channel" index = "50" label = "work with queues">
    <doc>
      Queues store and forward messages. Queues can be configured in the server or created at

share/fixed_amqp0-9-1.xml  view on Meta::CPAN


    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "declare" synchronous = "1" index = "10" label = "declare queue, create if needed">
      <doc>
        This method creates or checks a queue. When creating a new queue the client can
        specify various properties that control the durability of the queue and its
        contents, and the level of sharing for the queue.
      </doc>

      <rule name = "default-binding">
        <doc>
          The server MUST create a default binding for a newly-declared queue to the
          default exchange, which is an exchange of type 'direct' and use the queue
          name as the routing key.
        </doc>
        <doc type = "scenario">
          Client declares a new queue, and then without explicitly binding it to an
          exchange, attempts to send a message through the default exchange binding,
          i.e. publish a message to the empty exchange, with the queue name as routing
          key.
        </doc>
      </rule>

      <rule name = "minimum-queues">
        <doc>
          The server SHOULD support a minimum of 256 queues per virtual host and ideally,
          impose no limit except as defined by available resources.
        </doc>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

          </doc>
        </rule>

        <rule name = "exclusive" on-failure = "resource-locked">
          <doc>
            The client MAY NOT attempt to use a queue that was declared as exclusive
            by another still-open connection.
          </doc>
          <doc type = "scenario">
            One client declares an exclusive queue. A second client on a different
            connection attempts to declare, bind, consume, purge, delete, or declare
            a queue of the same name.
          </doc>
        </rule>
      </field>

      <field name = "auto-delete" domain = "bit" label = "auto-delete queue when unused">
        <doc>
          If set, the queue is deleted when all consumers have finished using it.  The last
          consumer can be cancelled either explicitly or because its channel is closed. If
          there was no consumer ever on the queue, it won't be deleted.  Applications can

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

      <field name = "consumer-count" domain = "long" label = "number of consumers">
        <doc>
          Reports the number of active consumers for the queue. Note that consumers can
          suspend activity (Channel.Flow) in which case they do not appear in this count.
        </doc>
      </field>
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "bind" synchronous = "1" index = "20" label = "bind queue to an exchange">
      <doc>
        This method binds a queue to an exchange. Until a queue is bound it will not
        receive any messages. In a classic messaging model, store-and-forward queues
        are bound to a direct exchange and subscription queues are bound to a topic
        exchange.
      </doc>

      <rule name = "duplicates">
        <doc>
          A server MUST allow ignore duplicate bindings - that is, two or more bind
          methods for a specific queue, with identical arguments - without treating these
          as an error.
        </doc>
        <doc type = "scenario">
          A client binds a named queue to an exchange. The client then repeats the bind
          (with identical arguments).
        </doc>
      </rule>

      <rule name = "unique">
        <doc>
          A server MUST not deliver the same message more than once to a queue, even if
          the queue has multiple bindings that match the message.
        </doc>
        <doc type = "scenario">
          A client declares a named queue and binds it using multiple bindings to the
          amq.topic exchange. The client then publishes a message that matches all its
          bindings.
        </doc>
      </rule>

      <rule name = "transient-exchange">
        <doc>
          The server MUST allow a durable queue to bind to a transient exchange.
        </doc>
        <doc type = "scenario">
          A client declares a transient exchange. The client then declares a named durable
          queue and then attempts to bind the transient exchange to the durable queue.
        </doc>
      </rule>

      <rule name = "durable-exchange">
        <doc>
          Bindings of durable queues to durable exchanges are automatically durable
          and the server MUST restore such bindings after a server restart.
        </doc>
        <doc type = "scenario">
          A server declares a named durable queue and binds it to a durable exchange. The
          server is restarted. The client then attempts to use the queue/exchange combination.
        </doc>
      </rule>

      <rule name = "binding-count">
        <doc>
          The server SHOULD support at least 4 bindings per queue, and ideally, impose no
          limit except as defined by available resources.
        </doc>
        <doc type = "scenario">
          A client declares a named queue and attempts to bind it to 4 different
          exchanges.
        </doc>
      </rule>

      <chassis name = "server" implement = "MUST" />

      <response name = "bind-ok" />

      <!-- Deprecated: "ticket", must be zero -->
      <field name = "reserved-1" type = "short" reserved = "1" />

      <field name = "queue" domain = "queue-name">
        <doc>Specifies the name of the queue to bind.</doc>
        <rule name = "queue-known" on-failure = "not-found">
          <doc>
            The client MUST either specify a queue name or have previously declared a
            queue on the same channel
          </doc>
          <doc type = "scenario">
            The client opens a channel and attempts to bind an unnamed queue.
          </doc>
        </rule>
        <rule name = "must-exist" on-failure = "not-found">
          <doc>
            The client MUST NOT attempt to bind a queue that does not exist.
          </doc>
          <doc type = "scenario">
            The client attempts to bind a non-existent queue.
          </doc>
        </rule>
      </field>

      <field name = "exchange" domain = "exchange-name" label = "name of the exchange to bind to">
        <rule name = "exchange-existence" on-failure = "not-found">
          <doc>
            A client MUST NOT be allowed to bind a queue to a non-existent exchange.
          </doc>
          <doc type = "scenario">
            A client attempts to bind an named queue to a undeclared exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the default exchange.
          </doc>
          <doc type = "scenario">
            The client declares a queue and binds it to a blank exchange name.
          </doc>
        </rule>
      </field>

      <field name = "routing-key" domain = "shortstr" label = "message routing key">
        <doc>
          Specifies the routing key for the binding. The routing key is used for routing
          messages depending on the exchange configuration. Not all exchanges use a
          routing key - refer to the specific exchange documentation.  If the queue name
          is empty, the server uses the last queue declared on the channel.  If the
          routing key is also empty, the server uses this queue name for the routing
          key as well.  If the queue name is provided but the routing key is empty, the
          server does the binding with that empty routing key.  The meaning of empty
          routing keys depends on the exchange implementation.
        </doc>
        <rule name = "direct-exchange-key-matching">
          <doc>
            If a message queue binds to a direct exchange using routing key K and a
            publisher sends the exchange a message with routing key R, then the message
            MUST be passed to the message queue if K = R.
          </doc>
        </rule>
      </field>

      <field name = "no-wait" domain = "no-wait" />

      <field name = "arguments" domain = "table" label = "arguments for binding">
        <doc>
          A set of arguments for the binding. The syntax and semantics of these arguments
          depends on the exchange class.
        </doc>
      </field>
    </method>

    <method name = "bind-ok" synchronous = "1" index = "21" label = "confirm bind successful">
      <doc>This method confirms that the bind was successful.</doc>

      <chassis name = "client" implement = "MUST" />
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "unbind" synchronous = "1" index = "50" label = "unbind a queue from an exchange">
      <doc>This method unbinds a queue from an exchange.</doc>
      <rule name = "01">
        <doc>If a unbind fails, the server MUST raise a connection exception.</doc>
      </rule>
      <chassis name="server" implement="MUST"/>
      <response name="unbind-ok"/>

      <!-- Deprecated: "ticket", must be zero -->
      <field name = "reserved-1" type = "short" reserved = "1" />

      <field name = "queue" domain = "queue-name">
        <doc>Specifies the name of the queue to unbind.</doc>
        <rule name = "queue-known" on-failure = "not-found">
          <doc>
            The client MUST either specify a queue name or have previously declared a
            queue on the same channel
          </doc>
          <doc type = "scenario">
            The client opens a channel and attempts to unbind an unnamed queue.
          </doc>
        </rule>
        <rule name = "must-exist" on-failure = "not-found">
          <doc>
            The client MUST NOT attempt to unbind a queue that does not exist.
          </doc>
          <doc type = "scenario">
            The client attempts to unbind a non-existent queue.
          </doc>
        </rule>
      </field>

      <field name = "exchange" domain = "exchange-name">
        <doc>The name of the exchange to unbind from.</doc>
        <rule name = "must-exist" on-failure = "not-found">
          <doc>
            The client MUST NOT attempt to unbind a queue from an exchange that
            does not exist.
          </doc>
          <doc type = "scenario">
            The client attempts to unbind a queue from a non-existent exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the default exchange.
          </doc>
          <doc type = "scenario">
            The client declares a queue and binds it to a blank exchange name.
          </doc>
        </rule>
      </field>

      <field name = "routing-key" domain = "shortstr" label = "routing key of binding">
        <doc>Specifies the routing key of the binding to unbind.</doc>
      </field>

      <field name = "arguments" domain = "table" label = "arguments of binding">
        <doc>Specifies the arguments of the binding to unbind.</doc>
      </field>
    </method>

    <method name = "unbind-ok" synchronous = "1" index = "51" label = "confirm unbind successful">
      <doc>This method confirms that the unbind was successful.</doc>
      <chassis name = "client" implement = "MUST"/>
    </method>

    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->

    <method name = "purge" synchronous = "1" index = "30" label = "purge a queue">
      <doc>
        This method removes all messages from a queue which are not awaiting
        acknowledgment.
      </doc>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

      <field name = "if-empty" domain = "bit" label = "delete only if empty">
        <doc>
          If set, the server will only delete the queue if it has no messages.
        </doc>
        <rule name = "not-empty" on-failure = "precondition-failed">
          <doc>
            The server MUST NOT delete a queue that has messages on it, if the
            if-empty field is true.
          </doc>
          <doc type = "scenario">
            The client declares a queue, binds it and publishes some messages into it,
            then tries to delete it setting if-empty to true.
          </doc>
        </rule>
      </field>

      <field name = "no-wait" domain = "no-wait" />
    </method>

    <method name = "delete-ok" synchronous = "1" index = "41" label = "confirm deletion of a queue">
      <doc>This method confirms the deletion of a queue.</doc>

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

          </doc>
          <doc type = "scenario">
            The client attempts to publish a content to a non-existent exchange.
          </doc>
        </rule>
        <rule name = "default-exchange">
          <doc>
            The server MUST accept a blank exchange name to mean the default exchange.
          </doc>
          <doc type = "scenario">
            The client declares a queue and binds it to a blank exchange name.
          </doc>
        </rule>
        <rule name = "02">
          <doc>
            If the exchange was declared as an internal exchange, the server MUST raise
            a channel exception with a reply code 403 (access refused).
          </doc>
          <doc type = "scenario">
            TODO.
          </doc>

xt/04_anyevent.t  view on Meta::CPAN

    exchange   => 'test_x_dest',
    on_success => sub {
        pass('declare destination exchange');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->bind_exchange(
    source      => 'test_x',
    destination => 'test_x_dest',
    on_success => sub {
        pass('bind exchange -> dest');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->declare_queue(
    queue      => 'test_q',
    on_success => sub {
        pass('declare queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->bind_queue(
    queue       => 'test_q',
    exchange    => 'test_x',
    routing_key => 'test_r',
    on_success  => sub {
        pass('bound queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

xt/04_anyevent.t  view on Meta::CPAN

    queue      => 'test_q',
    on_success => sub {
        pass('purge queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->unbind_queue(
    queue       => 'test_q',
    exchange    => 'test_x',
    routing_key => 'test_r',
    on_success  => sub {
        pass('unbind queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->delete_queue(
    queue      => 'test_q',
    on_success => sub {
        pass('delete queue');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->unbind_exchange(
    source      => 'test_x',
    destination => 'test_x_dest',
    on_success => sub {
        pass('unbind exchange');
        $done->send;
    },
    on_failure => failure_cb($done),
);
$done->recv;

$done = AnyEvent->condvar;
$ch->delete_exchange(
    exchange   => 'test_x',
    on_success => sub {



( run in 0.737 second using v1.01-cache-2.11-cpan-2398b32b56e )