AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

example/synopsis.pl  view on Meta::CPAN

    #   |  |                                        (mail.error.#)
    #   |  |
    #   |   \-----------> info --------------->   info-logs
    #   |                   (topic/*.info.#)        (*.info.#)
    #   |
    #    \------------------------------------>   debug-queue


    # declare exchanges
    exchanges => [
        'logger' => {
            durable => 0,
            type => 'fanout',
            internal => 0,
            auto_delete => 1,
        },
        'stats' => {
            durable => 0,
            type => 'direct',
            internal => 0,
            auto_delete => 1,
        },
        'errors' => {
            durable => 0,
            type => 'topic',
            internal => 0,
            auto_delete => 1,
        },
        'info' => {
            durable => 0,
            type => 'topic',
            internal => 0,
            auto_delete => 1,
        },
    ],

    # declare queues
    queues => [
        'debug-queue' => {
            durable => 0,
            auto_delete => 1,
        },
        'stats-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'ftp-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'mail-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'info-logs' => {
            durable => 0,
            auto_delete => 1,
        },
    ],

    # exchange to exchange bindings, with optional routing key
    bind_exchanges => [
        { 'stats'   =>   'logger'                 },
        { 'errors'  => [ 'logger', '*.error.#' ]  },
        { 'info'    => [ 'logger', '*.info.#'  ]  },
    ],


    # queue to exchange bindings, with optional routing key
    bind_queues => [
        { 'debug-queue'     =>   'logger'                   },
        { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
        { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
        { 'info-logs'       => [ 'info',   'info.#'       ] },
        { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
    ],

);

# publisher timer
my $t;

# connect and set up channel
my $conn = $rmq->connect();
$conn->cb(
    sub {
        print "waiting for channel..\n";
        my $channel = shift->recv or $loop->croak("Could not open channel");

        print "************* consuming\n";
        for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
            consume($channel, $q);
        }

        print "************* starting publishing\n";
        $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
    }
);

# consumes from requested queue
sub consume {
    my ($channel, $queue) = @_;

    my $consumer_tag;

    $channel->consume(
        queue => $queue,
        no_ack => 0,
        on_success => sub {
            my $frame = shift;
            $consumer_tag = $frame->method_frame->consumer_tag;
            print "************* consuming from $queue with $consumer_tag\n";
        },
        on_consume => sub {
            my $res = shift;
            my $body = $res->{body}->payload;
            print "+++++++++++++ consumed($queue): $body\n";
            $channel->ack(
                delivery_tag => $res->{deliver}->method_frame->delivery_tag
            );
        },
        on_failure => sub {
            print "************* failed to consume($queue)\n";
        }
    );
}

# randomly generates routing key and message body
sub publish {
    my ($channel, $msg) = @_;



( run in 1.520 second using v1.01-cache-2.11-cpan-2398b32b56e )