AnyEvent-MPRPC
view release on metacpan or search on metacpan
lib/AnyEvent/MPRPC/Client.pm view on Meta::CPAN
package AnyEvent::MPRPC::Client;
use strict;
use warnings;
use Any::Moose;
use Carp;
use Scalar::Util 'weaken';
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::MessagePack;
use AnyEvent::MPRPC::Constant;
has host => (
is => 'ro',
isa => 'Str',
lib/AnyEvent/MPRPC/Client.pm view on Meta::CPAN
$self->on_connect
and $connect_timeout = $self->on_connect->($self, @_);
# For backward compatibility, if connect_timeout option isn't specifed
# use return value of on_connect callback as connect timeout seconds.
$self->connect_timeout
and $connect_timeout = $self->connect_timeout;
return $connect_timeout;
};
weaken $self;
$self->_connection_guard($guard);
}
sub call {
my ($self, $method) = (shift, shift);
my $param = (@_ == 1 && ref $_[0] eq "ARRAY") ? $_[0] : [@_];
my $msgid = $self->_next_id->();
lib/AnyEvent/MPRPC/Client.pm view on Meta::CPAN
push @{ $self->_request_pool }, $request;
}
# $msgid is stringified, but $request->{MP_RES_MSGID] is still IV
$self->_callbacks->{ $msgid } = AnyEvent->condvar;
}
sub _handle_response_cb {
my $self = shift;
weaken $self;
return sub {
$self || return;
my ($handle, $res) = @_;
my $d = delete $self->_callbacks->{ $res->[MP_RES_MSGID] };
if (my $error = $res->[MP_RES_ERROR]) {
if ($d) {
lib/AnyEvent/MPRPC/Server.pm view on Meta::CPAN
package AnyEvent::MPRPC::Server;
use strict;
use warnings;
use Any::Moose;
use Carp;
use Scalar::Util 'weaken';
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::MPRPC::CondVar;
use AnyEvent::MessagePack;
use AnyEvent::MPRPC::Constant;
has address => (
is => 'ro',
lib/AnyEvent/MPRPC/Server.pm view on Meta::CPAN
$h->destroy;
},
%{ $self->handler_options },
fh => $fh,
);
$handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));
$self->_handlers->[ fileno($fh) ] = $handle;
}) unless defined $self->server;
weaken $self;
$self;
}
sub reg_cb {
my ($self, %callbacks) = @_;
while (my ($method, $callback) = each %callbacks) {
$self->_callbacks->{ $method } = $callback;
}
}
sub _dispatch_cb {
my ($self, $indicator) = @_;
weaken $self;
return sub {
$self || return;
my ($handle, $request) = @_;
$self->on_dispatch->($indicator, $handle, $request);
return if $handle->destroyed;
$handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));
lib/AnyEvent/MPRPC/Server.pm view on Meta::CPAN
my $type = shift;
my $result = @_ > 1 ? \@_ : $_[0];
$handle->push_write( msgpack => [
MP_TYPE_RESPONSE,
int($id), # should be IV.
$type eq 'error' ? $result : undef,
$type eq 'result' ? $result : undef,
]) if $handle;
};
weaken $handle;
my $cv = AnyEvent::MPRPC::CondVar->new;
$cv->_cb(
sub { $res_cb->( result => $_[0]->recv ) },
sub { $res_cb->( error => $_[0]->recv ) },
);
$target ||= sub { shift->error(qq/No such method "@{[ $request->[MP_REQ_METHOD] ]}" found/) };
$target->( $cv, $request->[MP_REQ_PARAMS] );
};
( run in 0.591 second using v1.01-cache-2.11-cpan-65fba6d93b7 )