AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

            #   |  |                                        (mail.error.#)
            #   |  |
            #   |   \-----------> info --------------->   info-logs
            #   |                   (topic:*.info.#)        (*.info.#)
            #   |
            #    \------------------------------------>   debug-queue


            # declare exchanges
            exchanges => [
                'logger' => {
                    durable => 0,
                    type => 'fanout',
                    internal => 0,
                    auto_delete => 1,
                },
                'stats' => {
                    durable => 0,
                    type => 'direct',
                    internal => 0,
                    auto_delete => 1,
                },
                'errors' => {
                    durable => 0,
                    type => 'topic',
                    internal => 0,
                    auto_delete => 1,
                },
                'info' => {
                    durable => 0,
                    type => 'topic',
                    internal => 0,
                    auto_delete => 1,
                },
            ],

            # declare queues
            queues => [
                'debug-queue' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'stats-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'ftp-error-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'mail-error-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
                'info-logs' => {
                    durable => 0,
                    auto_delete => 1,
                },
            ],

            # exchange to exchange bindings, with optional routing key
            bind_exchanges => [
                { 'stats'   =>   'logger'                 },
                { 'errors'  => [ 'logger', '*.error.#' ]  },
                { 'info'    => [ 'logger', '*.info.#'  ]  },
            ],


            # queue to exchange bindings, with optional routing key
            bind_queues => [
                { 'debug-queue'     =>   'logger'                   },
                { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
                { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
                { 'info-logs'       => [ 'info',   '*.info.#'     ] },
                { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
            ],

        );

        # publisher timer; kept in an outer lexical so the watcher created
        # inside the callback is still referenced after the callback returns
        my $t;

        # connect, then start consumers and the publisher once the channel arrives
        my $conn = $rmq->connect();
        $conn->cb(
            sub {
                my ($ready) = @_;

                print "waiting for channel..\n";
                my $channel = $ready->recv
                    or $loop->croak("Could not open channel");

                # one consumer per queue declared in the configuration above
                print "************* consuming\n";
                my @queue_names = qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs );
                for my $queue_name (@queue_names) {
                    consume($channel, $queue_name);
                }

                # fire immediately, then once per second
                print "************* starting publishing\n";
                $t = AE::timer 0, 1.0, sub {
                    publish($channel, "message prepared at ". scalar(localtime) );
                };
            }
        );

        # consumes from requested queue
        # Subscribes to $queue on $channel and acknowledges every delivery by hand.
        #
        # Arguments:
        #   $channel - object providing consume() and ack()
        #   $queue   - name of the queue to read from
        #
        # Subscription confirmation, each message body and a subscription
        # failure are reported with print. Returns whatever
        # $channel->consume() returns.
        sub consume {
            my ($channel, $queue) = @_;

            # set by the on_success handler below
            my $consumer_tag;

            my $subscribed = sub {
                my ($confirmation) = @_;
                $consumer_tag = $confirmation->method_frame->consumer_tag;
                print "************* consuming from $queue with $consumer_tag\n";
            };

            my $delivered = sub {
                my ($message) = @_;
                my $body = $message->{body}->payload;
                print "+++++++++++++ consumed($queue): $body\n";

                # no_ack is off, so each delivery is acknowledged explicitly
                $channel->ack(
                    delivery_tag => $message->{deliver}->method_frame->delivery_tag
                );
            };

            my $refused = sub {
                print "************* failed to consume($queue)\n";
            };

            return $channel->consume(
                queue      => $queue,
                no_ack     => 0,
                on_success => $subscribed,
                on_consume => $delivered,
                on_failure => $refused,
            );
        }

        # randomly generates routing key and message body
        sub publish {
            my ($channel, $msg) = @_;

README  view on Meta::CPAN

            ...
        );

    Optional name of exchange to declare with its default configuration
    options.

    See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for
    details.

   exchanges
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            exchanges => [
                'name_of_exchange' => {
                    durable => 1,
                    type => 'fanout',
                    ... # other exchange configuration parameters
                },
                ...
            ],
            ...
        );

    Optional list of exchanges to declare with their configuration options.

    See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for
    details.

   queue
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            queue => 'name_of_queue',
            ...
        );

    Optional name of queue to declare with its default configuration
    options.

    If no queues were declared, or an empty name was specified, a uniquely
    generated queue name will be available:

        my $gen_queue = $rmq->gen_queue;

    See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

   queues
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            queues => [
                'name_of_queue' => {
                    durable => 1,
                    no_ack => 0,
                    ... # other queue configuration parameters
                },
                ...
            ],
            ...
        );

    Optional list of queues to declare with their configuration options.

    See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

   bind_exchanges
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            bind_exchanges => [
                # without routing key
                { 'destination1' => 'source' },

                # with routing key
                { 'destination2'  => [ 'source', 'routing_key' ]  },
                ...
            ],
            ...
        );

    Optional list of exchange-to-exchange bindings.

    See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.

   bind_queues
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            bind_queues => [
                # without routing key
                { 'queue1' => 'exchange' },

                # with routing key
                { 'queue2'  => [ 'exchange', 'routing_key' ]  },
                ...
            ],
            ...
        );

    Optional list of queue-to-exchange bindings.

    See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.

   failure_cb
        my $rmq = AnyEvent::RabbitMQ::Simple->new(
            failure_cb => sub {
                my ($event, $details, $why) = @_;
                if ( ref $why ) {
                    my $method_frame = $why->method_frame;
                    $why = $method_frame->reply_text;
                }
                $loop->croak("[ERROR] $event($details): $why" );
            },
            ...
        );

    Required catch-all error handling callback. The value of $event is one
    of:

    ConnectOnFailure
    ConnectOnReadFailure
    ConnectOnReturn
    ConnectOnClose
    OpenChannelOnFailure
    OpenChannelOnReturn
    OpenChannelOnClose
    DeclareExchangeOnFailure
        Value of $details has the following format: "name:$name_of_exchange".

    BindExchangeOnFailure
        Value of $details has the following format:
        "source:$name_of_source_exchange,
        destination:$name_of_destination_exchange".

    DeclareQueueOnFailure
        Value of $details has the following format: "name:$name_of_queue".

    BindQueueOnFailure
        Value of $details has the following format: "queue:$name_of_queue,
        exchange:$name_of_exchange".

    ConfirmChannelOnFailure
    QosChannelOnFailure

  connect
        my $conn = $rmq->connect();
        $conn->cb(
            sub {
                my $channel = shift->recv or $loop->croak("Could not open channel");

                ...
            }
        );

    Returns an AnyEvent condvar that yields an AnyEvent::RabbitMQ::Channel
    object once all the configuration steps have completed successfully.

  disconnect
        $rmq->disconnect();

    Disconnects from RabbitMQ server.



( run in 0.698 second using v1.01-cache-2.11-cpan-2398b32b56e )