AnyEvent-RabbitMQ-RPC
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/RPC.pm view on Meta::CPAN
sub reply_queue {
my $self = shift;
my %args = @_;
$self->channel->declare_queue(
no_ack => 1,
durable => 0,
exclusive => 1,
on_success => sub {
$args{on_success}->(shift->method_frame->queue);
},
on_failure => $args{on_failure},
);
}
sub register {
my $self = shift;
my %args = (
name => undef,
run => sub {},
lib/AnyEvent/RabbitMQ/RPC.pm view on Meta::CPAN
# Ensure we have the queue
$self->rpc_queue(
queue => $args{name},
on_success => sub {
# And set up a listen on it
$self->channel->consume(
queue => $args{name},
no_ack => 0,
on_consume => sub {
my $frame = shift;
my $failed;
my $args = $frame->{body}->payload;
if ($self->{unserialize}) {
try {
$args = $self->{unserialize}->($args);
} catch {
$failed = 1;
$args{on_failure}->("Unserialization failed: $_");
};
return if $failed;
}
lib/AnyEvent/RabbitMQ/RPC.pm view on Meta::CPAN
my $return;
try {
$return = $args{run}->( $args );
} catch {
$failed = 1;
$args{on_failure}->("Call died: $_");
};
return if $failed;
# Send the response, if they asked for it
if (my $reply_to = $frame->{header}->reply_to) {
if ($self->{serialize}) {
try {
$return = $self->{serialize}->($return);
} catch {
$failed = 1;
$args{on_failure}->("Serialization failed: $_");
};
return if $failed;
}
lib/AnyEvent/RabbitMQ/RPC.pm view on Meta::CPAN
$self->rpc_queue(
queue => $args{name},
on_success => sub {
$self->reply_queue(
on_success => sub {
my $REPLIES = shift;
$self->channel->consume(
queue => $REPLIES,
no_ack => 1,
on_consume => sub {
my $frame = shift;
# We got a reply, tear down our reply queue
$self->channel->delete_queue(
queue => $REPLIES,
);
my $return = $frame->{body}->payload;
if ($self->{unserialize}) {
my $failed;
try {
$return = $self->{unserialize}->($return);
} catch {
$args{on_failure}->("Unserialization failed: $_");
$failed = 1;
};
return if $failed;
}
( run in 0.608 second using v1.01-cache-2.11-cpan-df04353d9ac )