AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Simple.pm  view on Meta::CPAN

                internal => 0,
                auto_delete => 1,
            },
            'errors' => {
                durable => 0,
                type => 'topic',
                internal => 0,
                auto_delete => 1,
            },
            'info' => {
                durable => 0,
                type => 'topic',
                internal => 0,
                auto_delete => 1,
            },
        ],

        # declare queues
        queues => [
            'debug-queue' => {
                durable => 0,
                auto_delete => 1,
            },
            'stats-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'ftp-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'mail-error-logs' => {
                durable => 0,
                auto_delete => 1,
            },
            'info-logs' => {
                durable => 0,
                auto_delete => 1,
            },
        ],

        # exchange to exchange bindings, with optional routing key
        bind_exchanges => [
            { 'stats'   =>   'logger'                 },
            { 'errors'  => [ 'logger', '*.error.#' ]  },
            { 'info'    => [ 'logger', '*.info.#'  ]  },
        ],


        # queue to exchange bindings, with optional routing key
        bind_queues => [
            { 'debug-queue'     =>   'logger'                   },
            { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
            { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
            { 'info-logs'       => [ 'info',   'info.#'       ] },
            { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
        ],

    );

    # publisher timer
    my $t;

    # connect and set up channel
    my $conn = $rmq->connect();
    $conn->cb(
        sub {
            print "waiting for channel..\n";
            my $channel = shift->recv or $loop->croak("Could not open channel");

            print "************* consuming\n";
            for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
                consume($channel, $q);
            }

            print "************* starting publishing\n";
            $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
        }
    );

    # consumes from requested queue
    sub consume {
        my ($channel, $queue) = @_;

        my $consumer_tag;

        $channel->consume(
            queue => $queue,
            no_ack => 0,
            on_success => sub {
                my $frame = shift;
                $consumer_tag = $frame->method_frame->consumer_tag;
                print "************* consuming from $queue with $consumer_tag\n";
            },
            on_consume => sub {
                my $res = shift;
                my $body = $res->{body}->payload;
                print "+++++++++++++ consumed($queue): $body\n";
                $channel->ack(
                    delivery_tag => $res->{deliver}->method_frame->delivery_tag
                );
            },
            on_failure => sub {
                print "************* failed to consume($queue)\n";
            }
        );
    }

    # randomly generates routing key and message body
    sub publish {
        my ($channel, $msg) = @_;

        unless ( $channel->is_open ) {
            warn "Cannot publish, channel closed";
            return;
        }

        my @system = qw( mail ftp web );
        my @levels = qw( debug info error stats );

        my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ];

        $msg = sprintf("[%s] %s", uc($routing_key), $msg);
        print "\n------- publishing: $msg\n";
        $channel->publish(
            routing_key => $routing_key,
            exchange => 'logger',
            body => $msg,
            on_ack => sub {
                print "------- published: $msg\n";
            },
            on_return => sub {
                print "************* failed to publish: $msg\n";
            }
        );
    }



( run in 0.541 second using v1.01-cache-2.11-cpan-02777c243ea )