AnyEvent-JSONRPC

 view release on metacpan or  search on metacpan

lib/AnyEvent/JSONRPC/TCP/Client.pm  view on Meta::CPAN

package AnyEvent::JSONRPC::TCP::Client;
use Any::Moose;
use Any::Moose '::Util::TypeConstraints';

extends 'AnyEvent::JSONRPC::Client';

use Carp;
use Scalar::Util 'weaken';

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;

use JSON::RPC::Common::Procedure::Call;
use JSON::RPC::Common::Procedure::Return;

# --- Public attributes ------------------------------------------------------

# Remote host to connect to (mandatory constructor argument).
has host => ( is => 'ro', isa => 'Str', required => 1 );

# Remote port (mandatory). The Int|Str union also admits non-numeric values.
has port => ( is => 'ro', isa => 'Int|Str', required => 1 );

# The AnyEvent::Handle wrapping the socket; BUILD stores it here once the
# connection has been established, so it is unset until then.
has handler => ( is => 'rw', isa => 'AnyEvent::Handle' );

# Error callback, invoked as ($handle, $fatal, $message).
# The lazily-built fallback simply raises the message as an exception.
has on_error => (
    isa     => 'CodeRef',
    is      => 'rw',
    lazy    => 1,
    default => sub {
        sub {
            my $message = $_[2];
            croak sprintf "Client got error: %s", $message;
        };
    },
);

# JSON-RPC protocol version used when building outgoing requests.
has version => (
    isa     => enum( [ '1.0', '1.1', '2.0' ] ),
    is      => 'rw',
    default => '2.0',
);

# Extra constructor options merged into AnyEvent::Handle->new by BUILD.
has handler_options => (
    isa     => 'HashRef',
    is      => 'ro',
    default => sub { return {} },
);

# --- Private state ----------------------------------------------------------

# Requests issued before the connection exists; BUILD flushes them once the
# handle is available.
has _request_pool => (
    isa     => 'ArrayRef',
    is      => 'ro',
    lazy    => 1,
    default => sub { return [] },
);

# Closure yielding a fresh request id (1, 2, 3, ...) on every invocation.
has _next_id => (
    isa     => 'CodeRef',
    is      => 'ro',
    lazy    => 1,
    default => sub {
        my $counter = 0;
        return sub { return ++$counter };
    },
);

# Pending condvars keyed by request id; resolved by _handle_response.
has _callbacks => (
    isa     => 'HashRef',
    is      => 'ro',
    lazy    => 1,
    default => sub { return {} },
);

# Guard object returned by tcp_connect, stored on the object by BUILD.
has _connection_guard => ( is => 'rw', isa => 'Object' );

no Any::Moose;

# Constructor hook: kicks off the asynchronous TCP connection.
#
# Once connected, wraps the socket in an AnyEvent::Handle, installs the JSON
# read loop, writes out every request queued in _request_pool, and stores the
# handle in the 'handler' attribute. The guard returned by tcp_connect is kept
# in _connection_guard.
sub BUILD {
    my $self = shift;

    my $on_connect = sub {
        # An empty argument list is treated as a failed connection attempt.
        unless (@_) {
            return $self->on_error->(
                undef, 1,
                "Failed to connect $self->{host}:$self->{port}: $!",
            );
        }
        my ($fh) = @_;

        # User-supplied handler_options come after our on_error so they may
        # override it; 'fh' comes last so it always wins.
        my $handle = AnyEvent::Handle->new(
            on_error => sub {
                my ($failed_handle) = @_;
                $self->on_error->(@_);
                $failed_handle->destroy;
            },
            %{ $self->handler_options },
            fh => $fh,
        );

        # Every time data arrives, queue a JSON reader at the front of the
        # read queue and hand the decoded structure to _handle_response.
        $handle->on_read(
            sub {
                my ($reader) = @_;
                $reader->unshift_read(
                    json => sub {
                        my $decoded = $_[1];
                        $self->_handle_response($decoded);
                    }
                );
            }
        );

        # Flush requests that were issued before the connection was ready.
        my $pending = $self->_request_pool;
        while ( my $queued = shift @{$pending} ) {
            $handle->push_write( json => $queued->deflate );
        }

        $self->handler($handle);
    };

    my $guard = tcp_connect $self->host, $self->port, $on_connect;

    # The closures above capture this very lexical, so weakening it here
    # keeps them from holding a strong reference to the client object.
    weaken $self;

    $self->_connection_guard($guard);
}

# Issue a JSON-RPC request and return an AnyEvent condvar for its reply.
#
#   $method - remote procedure name
#   @params - arguments, normalised through $self->_params
#
# If the connection handle already exists the request is written right away;
# otherwise it is queued in _request_pool until BUILD's connect callback
# flushes it. The returned condvar is stored in _callbacks under the
# request id so that _handle_response can resolve it.
sub call {
    my ($self, $method, @params) = @_;

    my %spec = (
        version => $self->version,
        id      => $self->_next_id->(),
        method  => $method,
        params  => $self->_params( @params ),
    );
    my $request = JSON::RPC::Common::Procedure::Call->inflate(%spec);

    my $handle = $self->handler;
    if ($handle) {
        $handle->push_write( json => $request->deflate );
    }
    else {
        push @{ $self->_request_pool }, $request;
    }

    # Register the condvar only after the request was sent or queued.
    my $cv = AnyEvent->condvar;
    $self->_callbacks->{ $request->id } = $cv;

    return $cv;
}

# Dispatch one decoded JSON-RPC response to the condvar waiting for it.
#
#   $json - decoded response structure as delivered by the JSON reader
#
# The matching condvar is removed from _callbacks. A response whose id has no
# pending condvar only produces a warning. Otherwise the condvar receives
# either the error (via croak) or the result (via send).
sub _handle_response {
    my ($self, $json) = @_;

    my $response = JSON::RPC::Common::Procedure::Return->inflate( $json );

    my $waiter = delete $self->_callbacks->{ $response->id };
    if ( !$waiter ) {
        warn q/Invalid response from server/;
        return;
    }

    my $error = $response->error;
    return $waiter->croak($error) if $error;

    return $waiter->send( $response->result );
}

# Send a JSON-RPC notification, i.e. a call for which no reply is awaited.
#
#   $method - remote procedure name
#   @params - arguments, normalised through $self->_params
#
# No id is supplied and no condvar is registered in _callbacks. As with
# call(), the message is written immediately when the connection handle
# exists and queued in _request_pool otherwise.
sub notify {
    my ($self, $method, @params) = @_;

    # Must be JSON::RPC::Common::Procedure::Call -- the class this file loads
    # and that call() uses. The former JSON::RPC::Common::Call is never
    # loaded here, so every notify() died with "Can't locate object method".
    my $request = JSON::RPC::Common::Procedure::Call->inflate(
        version => $self->version,
        method  => $method,
        params  => $self->_params( @params ),
    );

    if ($self->handler) {
        $self->handler->push_write( json => $request->deflate );
    }
    else {
        push @{ $self->_request_pool }, $request;
    }
}



( run in 1.726 second using v1.01-cache-2.11-cpan-39bf76dae61 )