AnyMQ-AMQP
view release on metacpan or search on metacpan
lib/AnyMQ/Trait/AMQP.pm view on Meta::CPAN
sub connect {
my $self = shift;
my $cv = shift;
my $rf = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
$self->_rf($rf);
# XXX: wrapped object with monadic method modifier
# my $channel = run_monad { $rf->connect(....)->open_channel()->return }
# my $queue = run_monad { $channel->declare_queue(....)->return }->method_frame->queue;
# run_monad { $channel->consume( ....) }
my $init = sub {
my $channel = shift;
$channel->declare_queue(
exclusive => 1,
on_success => sub {
my $method = shift;
my $queue = $method->method_frame->queue;
$self->_rf_queue($queue);
$channel->consume(queue => $queue,
no_ack => 1,
on_success => sub {
$cv->send('init');
},
on_consume => $self->on_consume,
on_failure => $cv,
);
},
lib/AnyMQ/Trait/AMQP.pm view on Meta::CPAN
# XXX: try to reconnect and reinstantiate all topics
warn "==> connection closed";
},
on_failure => $cv,
);
}
# Build the delivery callback that is handed to the channel's consume() call.
#
# Returns a code ref taking a single delivered frame (a hash ref with
# `body`, `header` and `deliver` entries).  The callback decodes the frame's
# JSON payload and republishes it on the local topic named by the frame's
# routing key.
#
# Frames whose `reply_to` header equals our own queue name are ignored
# (presumably these are messages this instance published itself -- confirm
# against the publishing side).
#
# Croaks, with the underlying error appended, if the republish fails for any
# reason (e.g. no local topic under that routing key, or a payload that is
# not valid JSON).
sub on_consume {
    my $self = shift;

    return sub {
        my $frame = shift;

        my $payload  = $frame->{body}->payload;
        my $reply_to = $frame->{header}->reply_to;

        # Drop anything addressed back to our own queue.
        return if $reply_to && $reply_to eq $self->_rf_queue;

        my $topic = $frame->{deliver}->method_frame->routing_key;

        try {
            # The fully-qualified method name forces AnyMQ::Topic's own
            # publish() regardless of what the topic object's class
            # overrides (presumably to avoid sending the message back out
            # over AMQP -- confirm against the topic trait).
            $self->topics->{$topic}
                ->AnyMQ::Topic::publish(JSON::from_json($payload));
        }
        catch {
            croak "failed to republish on $topic: $_";
        };
    };
}
sub new_topic {
my ($self, $opt) = @_;
$opt = { name => $opt } unless ref $opt;
AnyMQ::Topic->new_with_traits(
traits => ['AMQP'],
( run in 1.070 second using v1.01-cache-2.11-cpan-df04353d9ac )