Beekeeper

lib/Beekeeper/Service/Sinkhole/Worker.pm


sub log_handler {
    my $self = shift;

    # Use pool's logfile
    $self->SUPER::log_handler( foreground => 1 );
}

sub on_unserviced_queues {
    my ($self, $params) = @_;

    my $queues = $params->{queues};
 
    foreach my $queue (@$queues) {

        # Nothing to do if already draining $queue
        next if $self->{Draining}->{$queue};

        # No one is processing these requests, so respond to them with errors
        $self->{Draining}->{$queue} = 1;

        my $local_bus = $self->{_BUS}->{bus_role};
        log_error "Draining unserviced req/$local_bus/$queue";

        $self->accept_remote_calls( "$queue.*" => 'reject_request' );
    }
}

sub on_worker_status {
    my ($self, $status) = @_;

    return unless ($status->{queue});

    return if ($status->{class} eq 'Beekeeper::Service::Sinkhole::Worker');

    foreach my $queue (@{$status->{queue}}) {

        # Nothing to do if not draining queue
        next unless $self->{Draining}->{$queue};

        # A worker servicing a previously unserviced queue has just come
        # online, so stop responding with errors
        delete $self->{Draining}->{$queue};

        my $local_bus = $self->{_BUS}->{bus_role};
        log_warn "Stopped draining req/$local_bus/$queue";

        $self->stop_accepting_calls( "$queue.*" );
    }
}

sub reject_request {
    my ($self, $params, $req) = @_;

    # Just return a JSONRPC error response

    if ($req->mqtt_properties->{'auth'}) {
        # When the client provided some kind of authentication, tell it the
        # truth about the service being down. Otherwise whoever is trying to
        # fix the issue may be misled into looking for auth/permissions problems
        return Beekeeper::JSONRPC::Error->method_not_available;
    }
    else {
        return Beekeeper::JSONRPC::Error->request_not_authorized;
    }
}

1;

__END__

=pod

=encoding utf8

=head1 NAME
 
Beekeeper::Service::Sinkhole::Worker - Handle unserviced call topics

=head1 VERSION
 
Version 0.09

=head1 DESCRIPTION

When all workers of a given service are down, every request sent to that
service will time out, as no one is serving it. This may cause a serious
disruption in the application, as any other service depending on the broken
one will also stall for the duration of the timeout.

To mitigate this situation, the Supervisor notifies all Sinkhole workers
when unserviced topics are detected, and the Sinkhole workers then respond
immediately to all requests on those topics with an error response. Callers
thus quickly receive an error instead of timing out.
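
From the caller's side the effect could look roughly like the following. This
is a minimal sketch, not part of this module: it assumes a connected
L<Beekeeper::Client>, a running pool with Sinkhole workers, and a hypothetical
method name C<myapp.test.echo> whose workers are all down. Passing
C<raise_error =E<gt> 0> is assumed to make C<call_remote> return the error
response instead of dying:

  use strict;
  use warnings;
  use Beekeeper::Client;

  my $client = Beekeeper::Client->instance;

  # 'myapp.test.echo' is a hypothetical method of a service with no live workers
  my $resp = $client->call_remote(
      method      => 'myapp.test.echo',
      params      => { msg => 'hello' },
      raise_error => 0,    # assumed: return the error response instead of dying
  );

  if ($resp->success) {
      print "Call succeeded\n";
  }
  else {
      # Expected to arrive promptly from a Sinkhole worker,
      # rather than only after the call timeout has elapsed
      warn "Call rejected: " . $resp->message . "\n";
  }

As C<reject_request> shows, only callers that provided authentication are told
that the method is not available; other callers receive a not-authorized error.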

As soon as a worker of the downed service comes online again, the Sinkhole
workers stop rejecting requests.

A single Sinkhole worker can handle around 4000 req/s.

Sinkhole workers are not created automatically. To add Sinkhole workers
to a pool, they must be declared in the config file C<pool.config.json>:

  [
      {
          "pool_id" : "myapp",
          "bus_id"  : "backend",
          "workers" : {
              "Beekeeper::Service::Sinkhole::Worker" : { "worker_count": 2 },
               ...
          },
      },
  ]

This worker class does not expose public RPC methods.

=head1 AUTHOR

José Micó, C<jose.mico@gmail.com>

=head1 COPYRIGHT AND LICENSE

Copyright 2015-2023 José Micó.


