AnyEvent-RabbitMQ-PubSub
view release on metacpan or search on metacpan
xt/republish.t view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use FindBin qw($Bin);
use lib "$Bin/../lib";
use Data::Dumper;
use AnyEvent::RabbitMQ::PubSub;
use AnyEvent::RabbitMQ::PubSub::Publisher;
use AnyEvent::RabbitMQ::PubSub::Consumer;
use Test::More tests => 1;
my $rmq_connect_opts = {
host => $ENV{RMQ_HOST} // 'localhost',
port => 5672,
user => $ENV{RMQ_USER} // 'guest',
pass => $ENV{RMQ_PASS} // 'guest',
vhost => $ENV{RMQ_VHOST} // 'test',
};
my $exchange = {
exchange => 'rabbitmq_pubsub_test',
type => 'topic',
durable => 0,
auto_delete => 1,
};
my $queue = {
queue => 'rabbitmq_pubsub_test_queue',
auto_delete => 1,
};
my $routing_key = 'rk';
my ($ar, $channel) = AnyEvent::RabbitMQ::PubSub::connect(%$rmq_connect_opts);
my $cv = AnyEvent->condvar;
my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
channel => $channel,
exchange => $exchange,
queue => $queue,
routing_key => $routing_key,
);
$consumer->init();
my $publisher = AnyEvent::RabbitMQ::PubSub::Publisher->new(
channel => $channel,
exchange => $exchange,
routing_key => $routing_key,
);
$publisher->init();
my @consumed = ();
my @messages = ('hello world', 'republish', 'hello again');
$consumer->consume(
$cv,
sub {
my ($self, $msg) = @_;
push @consumed, $msg->{body}->payload;
if ($msg->{body}->payload eq 'republish') {
if ($msg->{header}{headers}{trials}) {
$consumer->ack($msg);
$cv->send();
( run in 0.814 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )