AnyEvent-JSONRPC

 view release on metacpan or  search on metacpan

lib/AnyEvent/JSONRPC/TCP/Server.pm  view on Meta::CPAN

package AnyEvent::JSONRPC::TCP::Server;
use Moose;

extends 'AnyEvent::JSONRPC::Server';

use Carp;
use Scalar::Util 'weaken';

use AnyEvent::Handle;
use AnyEvent::Socket;

use AnyEvent::JSONRPC::InternalHandle;
use AnyEvent::JSONRPC::CondVar;
use JSON::RPC::Common::Procedure::Call;

has address => (
    is      => 'ro',
    isa     => 'Maybe[Str]',
    default => undef,
);

has port => (
    is      => 'ro',
    isa     => 'Int|Str',
    default => 4423,
);

has on_error => (
    is      => 'rw',
    isa     => 'CodeRef',
    lazy    => 1,
    default => sub {
        return sub {
            my ($handle, $fatal, $message) = @_;
            carp sprintf "Server got error: %s", $message;
        };
    },
);

has on_eof => (
    is      => 'rw',
    isa     => 'CodeRef',
    lazy    => 1,
    default => sub {
        return sub { };
    },
);

has handler_options => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
);

has _handlers => (
    is      => 'ro',
    isa     => 'ArrayRef',
    default => sub { [] },
);

has methods => (
    isa     => 'HashRef[CodeRef]',
    lazy    => 1,
    traits  => ['Hash'],
    handles => {
        reg_cb => 'set',
        method => 'get',
    },
    default => sub { {} },
);

no Moose;

sub BUILD {
    my $self = shift;

    tcp_server $self->address, $self->port, sub {
        my ($fh, $host, $port) = @_;
        my $indicator = "$host:$port";

        my $handle = AnyEvent::Handle->new(
            on_error => sub {
                my ($h, $fatal, $msg) = @_;
                $self->on_error->(@_);
                $h->destroy;
            },
            on_eof => sub {
                my ($h) = @_;
                # client disconnected
                $self->on_eof->(@_);
                $h->destroy;
            },
            json => $self->json,
            %{ $self->handler_options },
            fh => $fh,
        );
        $handle->on_read(sub {
            shift->unshift_read( json => sub {
                $self->_dispatch($indicator, @_);
            }),
        });

        $self->_handlers->[ fileno($fh) ] = $handle;
    };
    weaken $self;

    $self;
}

sub _dispatch {
    my ($self, $indicator, $handle, $request) = @_;

    return $self->_batch($handle, @$request) if ref $request eq "ARRAY";
    return unless $request and ref $request eq "HASH";

    my $call   = JSON::RPC::Common::Procedure::Call->inflate($request);
    my $target = $self->method( $call->method );

    my $cv = AnyEvent::JSONRPC::CondVar->new( call => $call );
    $cv->cb( sub {
        my $response = $cv->recv;

        $handle->push_write( json => $response->deflate ) if not $cv->is_notification;
    });

    $target ||= sub { shift->error(qq/No such method "$request->{method}" found/) };
    $target->( $cv, $call->params_list );
}

sub _batch {
    my ($self, $handle, @request) = @_;

    my @response;
    for my $request (@request) {
        my $internal = AnyEvent::JSONRPC::InternalHandle->new;

        $self->_dispatch(undef, $internal, $request);

        push @response, $internal;
    }
    
    $handle->push_write( json => [ map { $_->recv } @response ] );
}

__PACKAGE__->meta->make_immutable;

__END__

=for stopwords JSONRPC TCP TCP-based unix Str

=head1 NAME

AnyEvent::JSONRPC::TCP::Server - Simple TCP-based JSONRPC server

=head1 SYNOPSIS

    use AnyEvent::JSONRPC::TCP::Server;
    
    my $server = AnyEvent::JSONRPC::TCP::Server->new( port => 4423 );
    $server->reg_cb(
        echo => sub {
            my ($res_cv, @params) = @_;
            $res_cv->result(@params);
        },
        sum => sub {



( run in 1.262 second using v1.01-cache-2.11-cpan-39bf76dae61 )