Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Client.pm  view on Meta::CPAN

    # Keep a global reference to $self
    $singleton = $self;

    return $self;
}


# Publish a JSON-RPC 2.0 notification (a request without id, so no response
# is expected).
#
# Named arguments:
#   method    - required, fully qualified name 'service.method'; it may carry
#               a destination suffix '@bus' or '@bus.address'
#   address   - optional, appended to 'method' after an '@' before parsing
#   params    - optional, serialized as the 'params' member of the message
#   buffer_id - optional, handed unchanged to the bus publish call
#
# Croaks when the method is missing or malformed. Returns whatever the
# publish call of the underlying bus returns.
sub send_notification {
    my ($self, %args) = @_;

    my $fq_meth = $args{'method'} or croak "Method was not specified";

    $fq_meth .= '@' . $args{'address'} if (defined $args{'address'});

    # The address suffix is captured as a whole by a single group. A quantified
    # capture group such as (\.[\w-]+)* retains only its last repetition, which
    # would silently drop all but the final component of 'bus.a.b'
    $fq_meth =~ m/^     ( [\w-]+ (?:\.[\w-]+)* )
                     \. ( [\w-]+ )
                 (?: \@ ( [\w-]+ ) ( (?:\.[\w-]+)* ) )? $/x or croak "Invalid method '$fq_meth'";

    my ($service, $method, $remote_bus, $addr) = ($1, $2, $3, $4);

    my $json = encode_json({
        jsonrpc => '2.0',
        method  => "$service.$method",
        params  => $args{'params'},
    });

    my %send_args;

    my $local_bus = $self->{_BUS}->{bus_role};

    # Without an explicit '@bus' fall back to the configured default, if any
    $remote_bus = $self->{_CLIENT}->{forward_to} unless (defined $remote_bus);

    if (defined $remote_bus) {

        # Publish to a randomly picked lane (numbered from 1 to QUEUE_LANES) of
        # the remote bus, carrying the final destination in 'fwd_to'
        $send_args{'topic'}  = "msg/$remote_bus-" . int( rand(QUEUE_LANES) + 1 );
        $send_args{'topic'} =~ tr|.|/|;

        $send_args{'fwd_to'} = "msg/$remote_bus/$service/$method";

        # $addr is undef when no '@' was given and empty for a bare '@bus'
        if (defined $addr && length $addr) {
            $addr =~ s/^\.//;
            $send_args{'fwd_to'} .= "\@$addr";
        }

        # NOTE(review): dots of a multi-part address are also turned into
        # slashes here, as the original statement order implies - confirm
        $send_args{'fwd_to'} =~ tr|.|/|;
    }
    else {
        $send_args{'topic'} = "msg/$local_bus/$service/$method";
        $send_args{'topic'} =~ tr|.|/|;
    }

    $send_args{'auth'} = $self->{_CLIENT}->{auth_data} if defined $self->{_CLIENT}->{auth_data};
    $send_args{'clid'} = $self->{_CLIENT}->{caller_id} if defined $self->{_CLIENT}->{caller_id};

    # Pass buffer_id through even when its value is undef
    if (exists $args{'buffer_id'}) {
        $send_args{'buffer_id'} = $args{'buffer_id'};
    }

    return $self->{_BUS}->publish( payload => \$json, %send_args );
}


# Start accepting notifications: register a callback for each given method
# and subscribe to the matching topic of the local bus.
#
# Arguments are pairs of 'service.method' => coderef. The method part may be
# '*' in order to accept every notification of a service. Each callback is
# invoked with the params of the notification and the notification itself.
#
# Croaks on a malformed method name, on a callback which is not a code
# reference, or on a method which is already being accepted. All arguments
# are checked before anything is registered, so a failed call leaves neither
# callbacks nor subscriptions behind.
sub accept_notifications {
    my ($self, %args) = @_;

    # Report asynchronous problems at the place where this method was called
    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    my $callbacks = $self->{_CLIENT}->{callbacks};

    # First pass: validation only, no side effects
    my @accepted;

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";

        my ($service, $method) = ($1, $2);

        my $callback = $args{$fq_meth};

        unless (ref $callback eq 'CODE') {
            croak "Invalid callback for '$fq_meth'";
        }

        croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};

        push @accepted, [ $fq_meth, $service, $method, $callback ];
    }

    #TODO: Allow to accept private notifications without subscribing

    my $local_bus = $self->{_BUS}->{bus_role};

    # Second pass: register the callbacks and subscribe
    foreach my $entry (@accepted) {

        my ($fq_meth, $service, $method, $callback) = @$entry;

        $callbacks->{"msg.$fq_meth"} = $callback;

        # Dots become topic level separators, '*' becomes the '#' wildcard
        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic      => $topic,
            on_publish => sub {
                my ($payload_ref, $mqtt_properties) = @_;

                # A payload which fails to decode leaves $request undefined
                local $@;
                my $request = eval { decode_json($$payload_ref) };

                unless (ref $request eq 'HASH' && defined $request->{jsonrpc} && $request->{jsonrpc} eq '2.0') {
                    warn "Received invalid JSON-RPC 2.0 notification $at";
                    return;
                }

                bless $request, 'Beekeeper::JSONRPC::Notification';
                $request->{_mqtt_properties} = $mqtt_properties;

                my $method = $request->{method};

                # Keep the captures in lexicals instead of relying on $1 and $2
                my ($prefix, $name);
                ($prefix, $name) = $method =~ m/^([\.\w-]+)\.([\w-]+)$/ if defined $method;

                unless (defined $name) {
                    my $shown = defined $method ? $method : '';
                    warn "Received notification with invalid method '$shown' $at";
                    return;
                }

                # An exact match takes precedence over the wildcard callback
                my $cb = $callbacks->{"msg.$prefix.$name"} || 
                         $callbacks->{"msg.$prefix.*"};

                unless ($cb) {
                    warn "No callback found for received notification '$method' $at";
                    return;
                }

                $cb->($request->{params}, $request);
            },
            on_suback => sub {
                my ($success, $prop) = @_;
                die "Could not subscribe to topic '$topic' $at" unless $success;
            }
        );
    }
}


# Stop accepting the given notifications: unsubscribe from the topic of each
# method and, once the unsubscription has been acknowledged, forget its
# callback.
#
# Takes a list of 'service.method' names, where the method part may be '*'.
# Croaks when the list is empty or a name is malformed. Names which are not
# currently being accepted only cause a warning and are skipped.
sub stop_accepting_notifications {
    my ($self, @methods) = @_;

    # Report asynchronous failures at the place where this method was called
    my (undef, $file, $line) = caller;
    my $at = "at $file line $line\n";

    @methods or croak "No method specified";

    for my $fq_meth (@methods) {

        # A list assignment yields the number of captures, zero on mismatch
        my ($service, $method) = $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                                               \. ( [\w-]+ | \* ) $/x
            or croak "Invalid method '$fq_meth'";

        if (!defined $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"}) {
            carp "Not previously accepting notifications '$fq_meth'";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        # Dots become topic level separators, '*' becomes the '#' wildcard
        (my $topic = "msg/$local_bus/$service/$method") =~ tr|.*|/#|;

        # The callback is kept until the unsubscription has been acknowledged
        my $forget_callback = sub {
            my ($success) = @_;

            $success or die "Could not unsubscribe from topic '$topic' $at";

            delete $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"};
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => $forget_callback,
        );
    }
}


# True while a blocking call_remote is waiting for its response. It is set
# with 'local' for the duration of the wait and checked before blocking, so
# that a second blocking call made meanwhile is reported instead of attempted.
our $AE_WAITING;

sub call_remote {
    my $self = shift;

    my $req = $self->__do_rpc_request( @_, req_type => 'SYNCHRONOUS' );

    # Make AnyEvent allow one level of recursive condvar blocking, as we may
    # block both in $worker->__work_forever and in $client->__do_rpc_request
    $AE_WAITING && Carp::confess "Recursive blocking call attempted: "                  .
        "trying to make a call_remote while another call_remote is still in progress, " .
        "but it is not possible to make two blocking calls simultaneously "             .
        "(tip: one of the two calls must be made with call_remote_async)";

    local $AE_WAITING = 1;
    local $AnyEvent::CondVar::Base::WAITING = 0;

    # Block until a response is received or request timed out
    $req->{_waiting_response}->recv;

    my $resp = $req->{_response};

    if (!exists $resp->{result} && $req->{_raise_error}) {
        my $errmsg = $resp->code . " " . $resp->message;
        croak "Call to '$req->{method}' failed: $errmsg";



( run in 0.698 second using v1.01-cache-2.11-cpan-5511b514fd6 )