AnyEvent-JSONRPC
view release on metacpan or search on metacpan
lib/AnyEvent/JSONRPC/TCP/Client.pm view on Meta::CPAN
package AnyEvent::JSONRPC::TCP::Client;
use Any::Moose;
use Any::Moose '::Util::TypeConstraints';
extends 'AnyEvent::JSONRPC::Client';
use Carp;
use Scalar::Util 'weaken';
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use JSON::RPC::Common::Procedure::Call;
use JSON::RPC::Common::Procedure::Return;
# Remote host to connect to; passed to tcp_connect in BUILD.
has host => (
is => 'ro',
isa => 'Str',
required => 1,
);
# Remote port (number or service name string); passed to tcp_connect in BUILD.
has port => (
is => 'ro',
isa => 'Int|Str',
required => 1,
);
# The AnyEvent::Handle wrapping the connected socket. BUILD sets it once the
# connection succeeds; while it is unset, call/notify queue their requests
# in _request_pool instead of writing them.
has handler => (
is => 'rw',
isa => 'AnyEvent::Handle',
);
# Error callback, invoked as ($handle, $fatal, $message). BUILD calls it with
# (undef, 1, $message) when the connection attempt fails, and forwards the
# arguments of the AnyEvent::Handle on_error callback to it unchanged.
# The default croaks with the message.
has on_error => (
is => 'rw',
isa => 'CodeRef',
lazy => 1,
default => sub {
return sub {
my ($handle, $fatal, $message) = @_;
croak sprintf "Client got error: %s", $message;
};
},
);
# JSON-RPC protocol version placed in every request built by call/notify.
# Restricted to 1.0, 1.1 or 2.0; defaults to "2.0".
has version => (
is => 'rw',
isa => enum( [qw( 1.0 1.1 2.0 )] ),
default => "2.0",
);
# Extra arguments for AnyEvent::Handle->new. BUILD lists them after its own
# on_error and before fh, so an on_error key given here replaces the built-in
# one while fh cannot be overridden.
has handler_options => (
is => 'ro',
isa => 'HashRef',
default => sub { {} },
);
# Requests created before the connection exists. BUILD shifts them off and
# writes them to the handle as soon as the socket is connected.
has _request_pool => (
is => 'ro',
isa => 'ArrayRef',
lazy => 1,
default => sub { [] },
);
# Id generator: a closure over a private counter that returns 1, 2, 3, ...
# on successive calls. Used by call to give each request its id.
has _next_id => (
is => 'ro',
isa => 'CodeRef',
lazy => 1,
default => sub {
my $id = 0;
sub { ++$id };
},
);
# Pending calls: request id => AnyEvent condvar. Entries are added by call
# and removed by _handle_response when the matching response arrives.
has _callbacks => (
is => 'ro',
isa => 'HashRef',
lazy => 1,
default => sub { {} },
);
# Holds the value returned by tcp_connect in BUILD for the lifetime of the
# object.
has _connection_guard => (
is => 'rw',
isa => 'Object',
);
# Unimport the Any::Moose declarative keywords now that the class is declared.
no Any::Moose;
# Constructor hook: starts the TCP connection to host:port when the object
# is built.
#
# On success the socket is wrapped in an AnyEvent::Handle, every request
# waiting in _request_pool is written to it, and the handle is stored in the
# handler attribute. On failure on_error is called with (undef, 1, $message).
# The value returned by tcp_connect is kept in _connection_guard.
sub BUILD {
    my $self = shift;

    my $connect_guard = tcp_connect(
        $self->host,
        $self->port,
        sub {
            my ($socket) = @_;

            # An empty argument list is treated as a failed connection attempt.
            unless (@_) {
                return $self->on_error->(
                    undef, 1,
                    "Failed to connect $self->{host}:$self->{port}: $!",
                );
            }

            # Option order matters: handler_options may replace on_error,
            # but fh always wins because it comes last.
            my $stream = AnyEvent::Handle->new(
                on_error => sub {
                    my ($failed) = @_;
                    $self->on_error->(@_);
                    $failed->destroy;
                },
                %{ $self->handler_options },
                fh => $socket,
            );

            # Each time data arrives, queue a JSON read whose decoded value
            # is handed to _handle_response.
            $stream->on_read(sub {
                my ($reader) = @_;
                $reader->unshift_read(json => sub {
                    my (undef, $decoded) = @_;
                    $self->_handle_response($decoded);
                });
            });

            # Flush everything that was requested before the socket existed.
            while (my $queued = shift @{ $self->_request_pool }) {
                $stream->push_write(json => $queued->deflate);
            }

            $self->handler($stream);
        },
    );

    # The closures above captured this same $self variable, so weakening it
    # here makes their reference weak too -- presumably to avoid a cycle
    # between the object and the guard/handle it stores.
    weaken($self);
    $self->_connection_guard($connect_guard);
}
# Issue a JSON-RPC call and return an AnyEvent condvar for its result.
#
# Arguments:
#   $method - name of the remote procedure
#   @params - parameters, passed through $self->_params before being placed
#             in the request
#
# The request gets a fresh id from _next_id. If the connection handle exists
# it is written immediately, otherwise it is queued in _request_pool. The
# returned condvar is stored in _callbacks under the request id so that
# _handle_response can complete it.
sub call {
    my ($self, $method, @params) = @_;

    my $request = JSON::RPC::Common::Procedure::Call->inflate(
        version => $self->version,
        id      => $self->_next_id->(),
        method  => $method,
        params  => $self->_params(@params),
    );

    if (my $handle = $self->handler) {
        $handle->push_write(json => $request->deflate);
    }
    else {
        # Not connected yet: keep the request until BUILD's connect callback
        # flushes the pool.
        push @{ $self->_request_pool }, $request;
    }

    # Register the condvar only after the write/queue step succeeded.
    my $pending = AnyEvent->condvar;
    $self->_callbacks->{ $request->id } = $pending;
    return $pending;
}
# Dispatch one decoded JSON-RPC response to the condvar waiting for it.
#
# Arguments:
#   $json - decoded response data as delivered by the handle's json reader
#
# The condvar registered under the response id is removed from _callbacks.
# If none is found a warning is emitted and nothing else happens. Otherwise
# the condvar is croaked with the response's error, or sent the result.
sub _handle_response {
    my ($self, $json) = @_;

    my $response = JSON::RPC::Common::Procedure::Return->inflate($json);
    my $pending  = delete $self->_callbacks->{ $response->id };

    if (!$pending) {
        warn q/Invalid response from server/;
        return;
    }

    my $error = $response->error;
    return $pending->croak($error) if $error;

    return $pending->send($response->result);
}
# Send a JSON-RPC notification: a request built without an id, for which no
# condvar is registered in _callbacks.
#
# Arguments:
#   $method - name of the remote procedure
#   @params - parameters, passed through $self->_params before being placed
#             in the request
#
# If the connection handle exists the request is written immediately,
# otherwise it is queued in _request_pool until the connection is made.
sub notify {
    my ($self, $method, @params) = @_;

    # The request class must be JSON::RPC::Common::Procedure::Call -- the one
    # this file loads and that call() uses. The shorter name
    # JSON::RPC::Common::Call is never loaded here, so calling inflate on it
    # dies with "Can't locate object method".
    my $request = JSON::RPC::Common::Procedure::Call->inflate(
        version => $self->version,
        method  => $method,
        params  => $self->_params(@params),
    );

    if (my $handle = $self->handler) {
        $handle->push_write(json => $request->deflate);
    }
    else {
        push @{ $self->_request_pool }, $request;
    }
}
( run in 1.726 second using v1.01-cache-2.11-cpan-39bf76dae61 )