Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Client.pm  view on Meta::CPAN

        payload => \$json,
        qos     => 1,
        %send_args,
    );

    if ($FIRE_FORGET) {
         # Nothing else to do
         return;
    }
    elsif ($SYNCHRONOUS) {

        $req->{_raise_error} = (defined $raise_error) ? $raise_error : 1;

        # Wait until a response is received in the reply queue
        $req->{_waiting_response} = AnyEvent->condvar;
        $req->{_waiting_response}->begin;
    }
    else {

        $req->{_on_success_cb} = $args{'on_success'};
        $req->{_on_error_cb}   = $args{'on_error'};

        if ($raise_error && !$req->{_on_error_cb}) {
            $req->{_on_error_cb} = sub {
                my $errmsg = $_[0]->code . " " . $_[0]->message;
                croak "Call to '$service.$method' failed: $errmsg";
            };
        }

        # Use shared cv for all requests
        if (!$client->{async_cv} || $client->{async_cv}->ready) {
            $client->{async_cv} = AnyEvent->condvar;
        }

        $req->{_waiting_response} = $client->{async_cv};
        $req->{_waiting_response}->begin;
    }

    $client->{in_progress}->{$req_id} = $req;

    # Ensure that timeout is set properly when the event loop was blocked
    if ($__now != CORE::time) { $__now = CORE::time; AnyEvent->now_update }

    # Request timeout timer
    my $timeout = $args{'timeout'} || REQ_TIMEOUT;
    $req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
        my $req = delete $client->{in_progress}->{$req_id};
        $req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
        $req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
        $req->{_waiting_response}->end;
    });

    bless $req, 'Beekeeper::JSONRPC::Request';
    return $req;
}

sub __create_response_topic {
    my $self = shift;
    my $client = $self->{_CLIENT};

    my ($file, $line) = (caller(2))[1,2];
    my $at = "at $file line $line\n";

    # Subscribe to an exclusive topic for receiving RPC responses

    my $response_topic = 'priv/' . $self->{_BUS}->{client_id};
    $client->{response_topic} = $response_topic;

    $self->{_BUS}->subscribe(
        topic       => $response_topic,
        maximum_qos => 0,
        on_publish  => sub {
            my ($payload_ref, $mqtt_properties) = @_;

            my $resp;
            local $@;
            eval {

                if (substr($$payload_ref,0,1) eq "\x78") {
                    my $decompressed_json;
                    $INFLATE->inflate($payload_ref, $decompressed_json);
                    $INFLATE->inflateReset();
                    $resp = decode_json($decompressed_json);
                }
                else {
                    $resp = decode_json($$payload_ref);
                }
            };

            unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
                warn "Received invalid JSON-RPC 2.0 message $at";
                return;
            }

            if (exists $resp->{'id'}) {

                # Response of an RPC request

                my $req_id = $resp->{'id'};
                my $req = delete $client->{in_progress}->{$req_id};

                # Ignore unexpected responses
                return unless $req;

                # Cancel request timeout
                delete $req->{_timeout};

                if (exists $resp->{'result'}) {
                    # Success response
                    $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Response';
                    $req->{_on_success_cb}->($resp) if $req->{_on_success_cb};
                }
                else {
                    # Error response
                    $req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Error';
                    $req->{_on_error_cb}->($resp) if $req->{_on_error_cb};
                }
        
                $req->{_waiting_response}->end;
            }
            else {



( run in 1.931 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )