Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker.pm  view on Meta::CPAN

                my $method = $request->{method};

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    log_error "Received notification with invalid method '$method'";
                    return;
                }

                my $cb = $worker->{callbacks}->{"msg.$1.$2"} || 
                         $worker->{callbacks}->{"msg.$1.*"};

                local $client->{caller_id}   = $mqtt_properties->{'clid'};
                local $client->{caller_addr} = $mqtt_properties->{'addr'};
                local $client->{auth_data}   = $mqtt_properties->{'auth'};

                unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
                    log_error "Notification '$method' was not authorized";
                    return;
                }

                unless ($cb) {
                    log_error "No handler found for received notification '$method'";
                    return;
                }

                $cb->($self, $request->{params}, $request);
            };

            if ($@) {
                # Got an exception while processing message
                log_error $@;
                $worker->{error_count}++;
            }
        }

        if ($task = shift @{$worker->{task_queue_low}}) {

            ## RPC Call

            my ($payload_ref, $mqtt_properties) = @$task;

            $worker->{call_count}++;
            my ($request, $request_id, $result, $response);

            $result = eval {

                $request = decode_json($$payload_ref);

                unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
                    log_error "Received invalid JSON-RPC 2.0 request";
                    die Beekeeper::JSONRPC::Error->invalid_request;
                }

                $request_id = $request->{id};
                my $method  = $request->{method};

                bless $request, 'Beekeeper::JSONRPC::Request';
                $request->{_mqtt_properties} = $mqtt_properties;

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    log_error "Received request with invalid method '$method'";
                    die Beekeeper::JSONRPC::Error->method_not_found;
                }

                my $cb = $worker->{callbacks}->{"req.$1.$2"} || 
                         $worker->{callbacks}->{"req.$1.*"};

                local $client->{caller_id}   = $mqtt_properties->{'clid'};
                local $client->{caller_addr} = $mqtt_properties->{'addr'};
                local $client->{auth_data}   = $mqtt_properties->{'auth'};

                unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
                    log_error "Request '$method' was not authorized";
                    die Beekeeper::JSONRPC::Error->request_not_authorized;
                }

                unless ($cb) {
                    log_error "No handler found for received request '$method'";
                    die Beekeeper::JSONRPC::Error->method_not_found;
                }

                # Execute method handler
                $cb->($self, $request->{params}, $request);
            };

            # Turn the outcome of the handler call above into a JSON-RPC response.
            # Exactly one of these cases applies:
            #   - the eval died                        -> error response
            #   - the handler returned an error object -> that object is the response
            #   - the request is flagged as async      -> no response is built here
            #   - anything else                        -> success response wrapping $result
            # Every failed call increments $worker->{error_count} exactly once.
            if ($@) {
                # Got an exception while executing method handler.
                # Copy it immediately: $@ is a global, and any eval executed by
                # the calls below (log_error, error constructors) could reset it
                my $exception = $@;

                # Count the failure once, regardless of the kind of exception
                $worker->{error_count}++;

                if (blessed($exception) && $exception->isa('Beekeeper::JSONRPC::Error')) {
                    # Handled exception: the thrown error object is sent back as is
                    $response = $exception;
                }
                else {
                    # Unhandled exception: log it and answer with a generic server error
                    log_error $exception;
                    $response = Beekeeper::JSONRPC::Error->server_error;
                    # Sending exact error to caller is very handy, but it is also a security risk
                    $response->{error}->{data} = $exception if $worker->{debug};
                }
            }
            elsif (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
                # Explicit error response: the handler returned an error instead of dying
                $response = $result;
                $worker->{error_count}++;
            }
            elsif ($request->{_async_response}) {
                # Response was deferred and will be sent later.
                # $response stays undefined, so nothing is sent back from here
                $worker->{in_progress}++;
                $request->{_worker} = $self;
            }
            else {
                # Build a success response
                $response = {
                    jsonrpc => '2.0',
                    result  => $result,
                };
            }

            if (defined $request_id && defined $response) {

                # Send back response to caller

                $response->{id} = $request_id;

                my $json = eval { $JSON->encode( $response ) };

                if ($@) {
                    # Probably response contains blessed references 
                    log_error "Couldn't serialize response as JSON: $@";
                    $response = Beekeeper::JSONRPC::Error->server_error;
                    $response->{id} = $request_id;
                    $json = $JSON->encode( $response );
                }

                if ($request->{_deflate_response} && length($json) > $request->{_deflate_response}) {
                    my $compressed_json;
                    $DEFLATE->deflate(\$json, $compressed_json);



( run in 0.786 second using v1.01-cache-2.11-cpan-99c4e6809bf )