AnyEvent-RabbitMQ-Simple
view release on metacpan or search on metacpan
example/simplest.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use AnyEvent::RabbitMQ::Simple;
# Main-loop condition variable: the script blocks on it at the very end, and
# the error paths croak on it to abort the run.
my $loop = AE::cv;

# Client object. Every failure it reports is turned into a fatal error on
# $loop.
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    failure_cb => sub {
        my ($event, $details, $why) = @_;

        # A reference is treated as a frame object; the human-readable
        # reason is the reply text of its method frame.
        $why = $why->method_frame->reply_text if ref $why;

        $loop->croak("[ERROR] $event($details): $why" );
    },
);
# Publisher timer. The watcher is kept in a file-scoped lexical so that it
# outlives the callback that creates it.
my $t;

# Connect and set up the channel. The value returned by connect() is used
# here as a condition variable: the callback runs once a result is
# available, and recv on it yields the channel.
my $conn = $rmq->connect();
$conn->cb(
    sub {
        my $ready = shift;
        print "waiting for channel..\n";

        my $channel = $ready->recv;
        unless ($channel) {
            # croak() only records the error on $loop; it does not unwind
            # this callback. Return explicitly so nothing below is run
            # against an undefined channel.
            $loop->croak("Could not open channel");
            return;
        }

        # generated queue name
        my $queue_name = $rmq->gen_queue;

        print "************* consuming\n";
        consume($channel, $queue_name);

        print "************* starting publishing\n";

        # Fire immediately, then once per second. The queue name doubles as
        # the routing key handed to publish().
        $t = AE::timer(
            0, 1.0,
            sub { publish($channel, $queue_name, "message prepared at ". scalar(localtime) ) },
        );
    }
);
# Start consuming from the requested queue, reporting progress on the
# currently selected output handle.
#   $channel - channel object providing a consume() method
#   $queue   - name of the queue to consume from
# Returns whatever $channel->consume() returns.
sub consume {
    my ($channel, $queue) = @_;

    # Called with the reply frame once the consumer has been registered.
    my $on_success = sub {
        my ($reply) = @_;
        my $consumer_tag = $reply->method_frame->consumer_tag;
        print "************* consuming from $queue with $consumer_tag\n";
    };

    # Called for every delivered message; the text is the payload of the
    # delivery's body.
    my $on_consume = sub {
        my ($delivery) = @_;
        my $body = $delivery->{body}->payload;
        print "+++++++++++++ consumed($queue): $body\n";
    };

    # Called when the consumer could not be set up.
    my $on_failure = sub {
        print "************* failed to consume($queue)\n";
    };

    return $channel->consume(
        queue      => $queue,
        on_success => $on_success,
        on_consume => $on_consume,
        on_failure => $on_failure,
    );
}
# Publish one message with an empty exchange name.
#   $channel     - channel object providing is_open() and publish()
#   $routing_key - routing key for the message; its upper-cased form is
#                  also prepended to the body in square brackets
#   $msg         - message text
# Warns and returns nothing if the channel is not open; otherwise returns
# whatever $channel->publish() returns.
sub publish {
    my ($channel, $routing_key, $msg) = @_;

    # Guard clause: nothing can be sent through a closed channel.
    if ( !$channel->is_open ) {
        warn "Cannot publish, channel closed";
        return;
    }

    # Tag the body with the routing key so the consumer output shows where
    # the message was sent.
    $msg = sprintf("[%s] %s", uc($routing_key), $msg);
    print "\n------- publishing: $msg\n";

    return $channel->publish(
        routing_key => $routing_key,
        exchange    => '',
        body        => $msg,
    );
}
# Block in the event loop until $loop is signalled. Nothing in the code shown
# here calls $loop->send, so in practice this runs until the process is
# interrupted or until one of the error paths calls $loop->croak(), in which
# case recv dies with that error.
my $done = $loop->recv;
( run in 1.445 second using v1.01-cache-2.11-cpan-02777c243ea )