AnyEvent-MPRPC

 view release on metacpan or  search on metacpan

lib/AnyEvent/MPRPC/Client.pm  view on Meta::CPAN

package AnyEvent::MPRPC::Client;
use strict;
use warnings;
use Any::Moose;

use Carp;
use Scalar::Util 'weaken';

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::MessagePack;
use AnyEvent::MPRPC::Constant;

has host => (
    is       => 'ro',
    isa      => 'Str',

lib/AnyEvent/MPRPC/Client.pm  view on Meta::CPAN

        $self->on_connect
            and $connect_timeout = $self->on_connect->($self, @_);

        # For backward compatibility, if the connect_timeout option isn't
        # specified, use the return value of the on_connect callback as the
        # connect timeout in seconds.
        $self->connect_timeout
            and $connect_timeout = $self->connect_timeout;

        return $connect_timeout;
    };
    weaken $self;

    $self->_connection_guard($guard);
}

sub call {
    my ($self, $method) = (shift, shift);
    my $param = (@_ == 1 && ref $_[0] eq "ARRAY") ? $_[0] : [@_];

    my $msgid = $self->_next_id->();

lib/AnyEvent/MPRPC/Client.pm  view on Meta::CPAN

        push @{ $self->_request_pool }, $request;
    }

    # $msgid is stringified, but $request->[MP_RES_MSGID] is still IV
    $self->_callbacks->{ $msgid } = AnyEvent->condvar;
}

sub _handle_response_cb {
    my $self = shift;

    weaken $self;

    return sub {
        $self || return;

        my ($handle, $res) = @_;

        my $d = delete $self->_callbacks->{ $res->[MP_RES_MSGID] };

        if (my $error = $res->[MP_RES_ERROR]) {
            if ($d) {

lib/AnyEvent/MPRPC/Server.pm  view on Meta::CPAN

package AnyEvent::MPRPC::Server;
use strict;
use warnings;
use Any::Moose;

use Carp;
use Scalar::Util 'weaken';

use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::MPRPC::CondVar;
use AnyEvent::MessagePack;

use AnyEvent::MPRPC::Constant;

has address => (
    is      => 'ro',

lib/AnyEvent/MPRPC/Server.pm  view on Meta::CPAN

                $h->destroy;
            },
            %{ $self->handler_options },
            fh => $fh,
        );

        $handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));

        $self->_handlers->[ fileno($fh) ] = $handle;
    }) unless defined $self->server;
    weaken $self;

    $self;
}

# Store callbacks in this object's _callbacks table, keyed by method name.
#
# Arguments: a flat list of (method name => callback) pairs following the
# invocant. Each pair is written into the hash returned by
# $self->_callbacks; a name that is passed again replaces whatever was
# stored under it before.
sub reg_cb {
    my $self        = shift;
    my %handler_for = @_;

    # The accessor is called per entry (not cached) so that an empty
    # argument list never touches _callbacks at all.
    for my $name (keys %handler_for) {
        $self->_callbacks->{$name} = $handler_for{$name};
    }
}

sub _dispatch_cb {
    my ($self, $indicator) = @_;

    weaken $self;

    return sub {
        $self || return;

        my ($handle, $request) = @_;
        $self->on_dispatch->($indicator, $handle, $request);
        return if $handle->destroyed;

        $handle->unshift_read(msgpack => $self->_dispatch_cb($indicator));

lib/AnyEvent/MPRPC/Server.pm  view on Meta::CPAN

            my $type   = shift;
            my $result = @_ > 1 ? \@_ : $_[0];

            $handle->push_write( msgpack => [
                MP_TYPE_RESPONSE,
                int($id), # should be IV.
                $type eq 'error'  ? $result : undef,
                $type eq 'result' ? $result : undef,
            ]) if $handle;
        };
        weaken $handle;

        my $cv = AnyEvent::MPRPC::CondVar->new;
        $cv->_cb(
            sub { $res_cb->( result => $_[0]->recv ) },
            sub { $res_cb->( error  => $_[0]->recv ) },
        );

        $target ||= sub { shift->error(qq/No such method "@{[ $request->[MP_REQ_METHOD] ]}" found/) };
        $target->( $cv, $request->[MP_REQ_PARAMS] );
    };



( run in 0.239 second using v1.01-cache-2.11-cpan-65fba6d93b7 )