AnyEvent-RabbitMQ-Simple

 view release on metacpan or  search on metacpan

example/synopsis.pl  view on Meta::CPAN

            internal => 0,
            auto_delete => 1,
        },
        'errors' => {
            durable => 0,
            type => 'topic',
            internal => 0,
            auto_delete => 1,
        },
        'info' => {
            durable => 0,
            type => 'topic',
            internal => 0,
            auto_delete => 1,
        },
    ],

    # declare queues
    queues => [
        'debug-queue' => {
            durable => 0,
            auto_delete => 1,
        },
        'stats-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'ftp-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'mail-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'info-logs' => {
            durable => 0,
            auto_delete => 1,
        },
    ],

    # exchange to exchange bindings, with optional routing key
    bind_exchanges => [
        { 'stats'   =>   'logger'                 },
        { 'errors'  => [ 'logger', '*.error.#' ]  },
        { 'info'    => [ 'logger', '*.info.#'  ]  },
    ],


    # queue to exchange bindings, with optional routing key
    bind_queues => [
        { 'debug-queue'     =>   'logger'                   },
        { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
        { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
        { 'info-logs'       => [ 'info',   'info.#'       ] },
        { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
    ],

);

# publisher timer
my $t;

# connect and set up channel
my $conn = $rmq->connect();
$conn->cb(
    sub {
        print "waiting for channel..\n";
        my $channel = shift->recv or $loop->croak("Could not open channel");

        print "************* consuming\n";
        for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
            consume($channel, $q);
        }

        print "************* starting publishing\n";
        $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
    }
);

# consumes from requested queue
sub consume {
    my ($channel, $queue) = @_;

    my $consumer_tag;

    $channel->consume(
        queue => $queue,
        no_ack => 0,
        on_success => sub {
            my $frame = shift;
            $consumer_tag = $frame->method_frame->consumer_tag;
            print "************* consuming from $queue with $consumer_tag\n";
        },
        on_consume => sub {
            my $res = shift;
            my $body = $res->{body}->payload;
            print "+++++++++++++ consumed($queue): $body\n";
            $channel->ack(
                delivery_tag => $res->{deliver}->method_frame->delivery_tag
            );
        },
        on_failure => sub {
            print "************* failed to consume($queue)\n";
        }
    );
}

# randomly generates routing key and message body
sub publish {
    my ($channel, $msg) = @_;

    unless ( $channel->is_open ) {
        warn "Cannot publish, channel closed";
        return;
    }

    my @system = qw( mail ftp web );
    my @levels = qw( debug info error stats );

    my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ];

    $msg = sprintf("[%s] %s", uc($routing_key), $msg);
    print "\n------- publishing: $msg\n";
    $channel->publish(
        routing_key => $routing_key,
        exchange => 'logger',
        body => $msg,
        on_ack => sub {
            print "------- published: $msg\n";
        },
        on_return => sub {
            print "************* failed to publish: $msg\n";
        }
    );
}



( run in 1.630 second using v1.01-cache-2.11-cpan-02777c243ea )