AnyEvent-JSONRPC
view release on metacpan or search on metacpan
lib/AnyEvent/JSONRPC/TCP/Server.pm view on Meta::CPAN
package AnyEvent::JSONRPC::TCP::Server;
use Moose;
extends 'AnyEvent::JSONRPC::Server';
use Carp;
use Scalar::Util 'weaken';
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::JSONRPC::InternalHandle;
use AnyEvent::JSONRPC::CondVar;
use JSON::RPC::Common::Procedure::Call;
# Address handed to tcp_server when the listener is created; undefined
# unless the caller supplies one.
has 'address' => (
    isa     => 'Maybe[Str]',
    is      => 'ro',
    default => undef,
);

# Port (number or string) handed to tcp_server; 4423 unless overridden.
has 'port' => (
    isa     => 'Int|Str',
    is      => 'ro',
    default => 4423,
);

# Callback run when a client handle reports an error. It receives the same
# arguments the handle's error callback got; the stock implementation only
# warns with the third argument (the message).
has 'on_error' => (
    isa     => 'CodeRef',
    is      => 'rw',
    lazy    => 1,
    default => sub {
        sub {
            my (undef, undef, $text) = @_;
            carp sprintf "Server got error: %s", $text;
        };
    },
);

# Callback run when a client handle reports end-of-file; a no-op by default.
has 'on_eof' => (
    isa     => 'CodeRef',
    is      => 'rw',
    lazy    => 1,
    default => sub {
        sub { };
    },
);

# Extra constructor arguments merged into every AnyEvent::Handle built for
# an accepted connection.
has 'handler_options' => (
    isa     => 'HashRef',
    is      => 'ro',
    default => sub { return {} },
);

# Per-connection handles, indexed by the file descriptor number of the
# client socket.
has '_handlers' => (
    isa     => 'ArrayRef',
    is      => 'ro',
    default => sub { return [] },
);

# Registry of RPC method name => code reference. There is no direct
# accessor; it is reached through the delegated reg_cb (set) and method
# (get) calls only.
has 'methods' => (
    traits  => ['Hash'],
    isa     => 'HashRef[CodeRef]',
    lazy    => 1,
    default => sub { return {} },
    handles => {
        reg_cb => 'set',
        method => 'get',
    },
);
no Moose;
# Moose BUILD hook: starts the TCP listener on address/port and attaches an
# AnyEvent::Handle to every accepted connection.
#
# Per connection:
#   * errors are forwarded to the on_error attribute and end-of-file to the
#     on_eof attribute; in both cases the handle is then destroyed and its
#     slot in _handlers is cleared so finished connections are not retained;
#   * every JSON value read from the client is passed to _dispatch together
#     with a "host:port" string naming the peer.
#
# All callbacks close over $self, which is weakened at the end of this sub.
# Each callback therefore checks that the server object still exists before
# calling a method on it.
#
# Returns the server object.
sub BUILD {
    my $self = shift;

    # The value returned by tcp_server is not kept.
    tcp_server($self->address, $self->port, sub {
        my ($fh, $host, $port) = @_;

        # Server object already gone: refuse the connection. Letting $fh go
        # out of scope drops the only reference to the client socket.
        return unless $self;

        my $indicator = "$host:$port";

        # Remember the descriptor number now; it is the index used in
        # _handlers and must stay known after the socket has been closed.
        my $fd = fileno($fh);

        # Shared teardown for error and EOF: forget the handle, then
        # destroy it. The slot is only cleared while it still refers to this
        # very handle, so a newer connection reusing the fd is never dropped.
        my $release = sub {
            my ($h) = @_;
            if ($self) {
                my $slots = $self->_handlers;
                undef $slots->[$fd]
                    if defined $slots->[$fd] && $slots->[$fd] == $h;
            }
            $h->destroy;
        };

        my $handle = AnyEvent::Handle->new(
            on_error => sub {
                my ($h, $fatal, $msg) = @_;
                $self->on_error->(@_) if $self;
                $release->($h);
            },
            on_eof => sub {
                my ($h) = @_;
                # client disconnected
                $self->on_eof->(@_) if $self;
                $release->($h);
            },
            json => $self->json,
            # User options may override the three entries above, but never
            # the file handle itself.
            %{ $self->handler_options },
            fh => $fh,
        );

        # For each readable event, queue a JSON read whose decoded value is
        # handed to the dispatcher.
        $handle->on_read(sub {
            my ($h) = @_;
            $h->unshift_read( json => sub {
                $self->_dispatch($indicator, @_) if $self;
            });
        });

        # Storing the handle here is what keeps it referenced once this
        # callback returns.
        $self->_handlers->[$fd] = $handle;
    });

    # The closures above share this variable; weakening it keeps them from
    # holding a strong reference to the server object.
    weaken($self);

    return $self;
}
# Route one decoded JSON value to the registered RPC method.
#
#   $indicator - "host:port" of the peer (undef for batch members); unused here
#   $handle    - object whose push_write receives the reply
#   $request   - decoded JSON: an array reference (batch) or a hash reference
#
# Array references are forwarded to _batch; anything that is not a hash
# reference is ignored. For a hash, the call object is inflated, the method
# is looked up (with a fallback that reports "No such method"), and the
# method is invoked with a result condvar followed by the call's parameters.
# The reply is written once the condvar fires, unless the condvar reports
# the call as a notification.
sub _dispatch {
    my ($self, $indicator, $handle, $request) = @_;

    my $shape = ref $request;
    if ($shape eq "ARRAY") {
        return $self->_batch($handle, @$request);
    }
    return unless $request and $shape eq "HASH";

    my $call = JSON::RPC::Common::Procedure::Call->inflate($request);

    # Unknown method names fall back to a stub that reports the error
    # through the condvar.
    my $code = $self->method( $call->method )
        || sub { shift->error(qq/No such method "$request->{method}" found/) };

    my $cv = AnyEvent::JSONRPC::CondVar->new( call => $call );
    $cv->cb( sub {
        my $reply = $cv->recv;
        $handle->push_write( json => $reply->deflate )
            unless $cv->is_notification;
    });

    return $code->( $cv, $call->params_list );
}
# Handle a batch: dispatch every entry, then answer with one JSON array.
#
#   $handle  - object whose push_write receives the combined reply
#   @request - the decoded batch members, in order
#
# Each member is dispatched against its own AnyEvent::JSONRPC::InternalHandle
# (with no peer indicator). Only after all members have been dispatched are
# the internal handles asked for their results via recv, in request order.
# NOTE(review): recv is presumably a blocking wait; confirm how it behaves
# for members that never produce a reply.
sub _batch {
    my ($self, $handle, @request) = @_;

    my @sinks;
    foreach my $entry (@request) {
        my $sink = AnyEvent::JSONRPC::InternalHandle->new;
        push @sinks, $sink;
        $self->_dispatch(undef, $sink, $entry);
    }

    my @replies;
    push @replies, $_->recv for @sinks;

    return $handle->push_write( json => \@replies );
}
# Finalize the class definition through its Moose metaclass.
__PACKAGE__->meta->make_immutable;

# End the module with an explicit true value so that loading it does not
# depend on whatever the statement above happens to return.
1;
__END__
=for stopwords JSONRPC TCP TCP-based unix Str

=head1 NAME

AnyEvent::JSONRPC::TCP::Server - Simple TCP-based JSONRPC server

=head1 SYNOPSIS

use AnyEvent::JSONRPC::TCP::Server;
my $server = AnyEvent::JSONRPC::TCP::Server->new( port => 4423 );
$server->reg_cb(
echo => sub {
my ($res_cv, @params) = @_;
$res_cv->result(@params);
},
sum => sub {
( run in 1.262 second using v1.01-cache-2.11-cpan-39bf76dae61 )