AnyEvent-RabbitMQ-RPC
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/RPC.pm view on Meta::CPAN
$self->{unserialize} = \&YAML::Any::Load;
} elsif ($args{serialize} eq "JSON") {
require JSON::Any;
JSON::Any->import;
my $json = JSON::Any->new;
$self->{serialize} = sub { $json->objToJson( [@_] ) };
$self->{unserialize} = sub { (@{ $json->jsonToObj(@_) })[0] };
} elsif ($args{serialize} eq "Storable") {
require Storable;
$self->{serialize} = sub { Storable::nfreeze( [@_] )};
$self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
}
# If they have a callback waiting for them, bail now
return if $args{on_success};
# Otherwise, block on having set up the channel
return $cv->recv;
}
sub connection {
my $self = shift;
return $self->{connection};
}
sub channel {
my $self = shift;
return $self->{channel};
}
sub rpc_queue {
my $self = shift;
my %args = @_;
# These queues are durable -- as such, we should only need to check
# that they are there once per process.
return $args{on_success}->()
if $self->{queues}{$args{queue}};
$self->channel->declare_queue(
no_ack => 0,
durable => 1,
exclusive => 0,
%args,
on_success => sub {
$self->{queues}{$args{queue}}++;
$args{on_success}->();
},
);
}
sub reply_queue {
my $self = shift;
my %args = @_;
$self->channel->declare_queue(
no_ack => 1,
durable => 0,
exclusive => 1,
on_success => sub {
$args{on_success}->(shift->method_frame->queue);
},
on_failure => $args{on_failure},
);
}
sub register {
my $self = shift;
my %args = (
name => undef,
run => sub {},
on_failure => sub { warn "Failure: @_" },
@_
);
# Ensure we have the queue
$self->rpc_queue(
queue => $args{name},
on_success => sub {
# And set up a listen on it
$self->channel->consume(
queue => $args{name},
no_ack => 0,
on_consume => sub {
my $frame = shift;
my $failed;
my $args = $frame->{body}->payload;
if ($self->{unserialize}) {
try {
$args = $self->{unserialize}->($args);
} catch {
$failed = 1;
$args{on_failure}->("Unserialization failed: $_");
};
return if $failed;
}
# Call the sub
my $return;
try {
$return = $args{run}->( $args );
} catch {
$failed = 1;
$args{on_failure}->("Call died: $_");
};
return if $failed;
# Send the response, if they asked for it
if (my $reply_to = $frame->{header}->reply_to) {
if ($self->{serialize}) {
try {
$return = $self->{serialize}->($return);
} catch {
$failed = 1;
$args{on_failure}->("Serialization failed: $_");
};
return if $failed;
}
$return = "0E0" if not $return;
$self->channel->publish(
exchange => '',
routing_key => $reply_to,
body => $return,
);
}
# And finally mark the task as complete
$self->channel->ack;
},
on_failure => $args{on_failure},
);
},
on_failure => $args{on_failure},
);
}
sub call {
my $self = shift;
my %args = (
name => undef,
args => undef,
on_sent => undef,
on_failure => sub { warn "RPC Failure: @_" },
@_
);
my $finished;
if (defined wantarray and not $args{on_reply}) {
# We we're called in a not-void context, and without a reply
# callback, assume this is a syncronous call, and set up
# $finished to block on the reply
$args{on_reply} = $finished = AE::cv;
my $fail = $args{on_failure};
$args{on_failure} = sub {
$fail->(@_) if $fail;
$finished->send(undef);
}
}
my $sent_failure = $args{on_sent} ? sub {
$args{on_sent}->(0);
$args{on_failure}->(@_);
} : $args{on_failure};
my $send; $send = sub {
my $REPLIES = shift;
my $args = $args{args};
if ($self->{serialize}) {
my $failed;
try {
$args = $self->{serialize}->($args);
} catch {
$failed = 1;
$args{on_failure}->("Serialization failed: $_");
};
return if $failed;
}
$args = "0E0" if not $args;
$self->channel->publish(
exchange => '',
routing_key => $args{name},
body => $args,
header => {
($REPLIES ? (reply_to => $REPLIES) : ()),
delivery_mode => 2, # Persistent storage
},
);
$args{on_sent}->(1) if $args{on_sent};
};
unless ($args{on_reply}) {
# Fire and forget
$self->rpc_queue(
queue => $args{name},
on_success => sub { $send->(undef) },
on_failure => $sent_failure,
);
return;
}
# We need to set up an ephemeral reply queue
$self->rpc_queue(
queue => $args{name},
on_success => sub {
$self->reply_queue(
on_success => sub {
my $REPLIES = shift;
$self->channel->consume(
queue => $REPLIES,
no_ack => 1,
on_consume => sub {
my $frame = shift;
# We got a reply, tear down our reply queue
$self->channel->delete_queue(
queue => $REPLIES,
);
my $return = $frame->{body}->payload;
if ($self->{unserialize}) {
my $failed;
try {
$return = $self->{unserialize}->($return);
} catch {
$args{on_failure}->("Unserialization failed: $_");
$failed = 1;
};
return if $failed;
}
$args{on_reply}->($return);
},
on_success => sub { $send->($REPLIES) },
on_failure => $sent_failure,
);
},
on_failure => $sent_failure,
);
},
on_failure => $sent_failure,
);
return $finished->recv if $finished;
return 1;
}
1;
__END__
=head1 NAME
AnyEvent::RabbitMQ::RPC - RPC queues via RabbitMQ
=head1 SYNOPSIS
use AnyEvent::RabbitMQ::RPC;
my $rpc = AnyEvent::RabbitMQ::RPC->new(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
serialize => 'Storable',
);
print $rpc->call(
name => 'MethodName',
args => { some => "data" },
);
=head1 DESCRIPTION
C<AnyEvent::RabbitMQ::RPC> provides an AnyEvent-based reliable job queue
atop the RabbitMQ event server. This can be used as a replacement for
similar reliable job queue/RPC client-worker models, such as
L<TheSchwartz>.
RPC classes can L<register> calls that they can handle, and/or use
( run in 1.558 second using v1.01-cache-2.11-cpan-df04353d9ac )