AnyEvent-JSONRPC-Lite

 view release on metacpan or  search on metacpan

lib/AnyEvent/JSONRPC/Lite/Client.pm  view on Meta::CPAN

package AnyEvent::JSONRPC::Lite::Client;
use Any::Moose;

use Carp;
use Scalar::Util 'weaken';

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;

has host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has port => (
    is       => 'ro',
    isa      => 'Int|Str',
    required => 1,
);

has handler => (
    is  => 'rw',
    isa => 'AnyEvent::Handle',
);

has on_error => (
    is      => 'rw',
    isa     => 'CodeRef',
    lazy    => 1,
    default => sub {
        return sub {
            my ($handle, $fatal, $message) = @_;
            croak sprintf "Client got error: %s", $message;
        };
    },
);

has handler_options => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
);

has _request_pool => (
    is      => 'ro',
    isa     => 'ArrayRef',
    lazy    => 1,
    default => sub { [] },
);

has _next_id => (
    is      => 'ro',
    isa     => 'CodeRef',
    lazy    => 1,
    default => sub {
        my $id = 0;
        sub { ++$id };
    },
);

has _callbacks => (
    is      => 'ro',
    isa     => 'HashRef',
    lazy    => 1,
    default => sub { {} },
);

has _connection_guard => (
    is  => 'rw',
    isa => 'Object',
);

no Any::Moose;

sub BUILD {
    my $self = shift;

    my $guard = tcp_connect $self->host, $self->port, sub {
        my ($fh) = @_
            or return
                $self->on_error->(
                    undef, 1,
                    "Failed to connect $self->{host}:$self->{port}: $!",
                );

        my $handle = AnyEvent::Handle->new(
            on_error => sub {
                my ($h, $fatal, $msg) = @_;
                $self->on_error->(@_);
                $h->destroy;
            },
            %{ $self->handler_options },
            fh => $fh,
        );

        $handle->on_read(sub {
            shift->unshift_read(json => sub {
                $self->_handle_response( $_[1] );
            });
        });

        while (my $pooled = shift @{ $self->_request_pool }) {
            $handle->push_write( json => $pooled );
        }

        $self->handler( $handle );
    };
    weaken $self;

    $self->_connection_guard($guard);
}

sub call {
    my ($self, $method, @params) = @_;

    my $request = {
        id     => $self->_next_id->(),
        method => $method,
        params => \@params,
    };

    if ($self->handler) {
        $self->handler->push_write( json => $request );
    }
    else {
        push @{ $self->_request_pool }, $request;
    }

    $self->_callbacks->{ $request->{id} } = AnyEvent->condvar;
}

sub _handle_response {
    my ($self, $res) = @_;

    my $d = delete $self->_callbacks->{ $res->{id} };
    unless ($d) {
        warn q/Invalid response from server/;
        return;
    }

    if (my $error = $res->{error}) {
        $d->croak($error);
    }
    else {
        $d->send($res->{result});
    }
}

sub notify {
    my ($self, $method, @params) = @_;

    my $request = {
        method => $method,
        params => \@params,
    };

    if ($self->handler) {
        $self->handler->push_write( json => $request );
    }
    else {
        push @{ $self->_request_pool }, $request;
    }
}

__PACKAGE__->meta->make_immutable;

__END__



( run in 0.598 second using v1.01-cache-2.11-cpan-39bf76dae61 )