Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker.pm  view on Meta::CPAN

# Logging backend used by the log_* helpers below. It is invoked with the level
# constant as first argument, followed by the arguments given to the helper.
# NOTE(review): until it is replaced, this default handler hands the level to
# warn() together with the message parts - confirm that this is intended.
our $Logger = sub { warn(@_) }; # redefined later by __init_logger
# Current verbosity threshold, read and changed through log_level()
our $LogLevel = LOG_INFO;

# One helper per severity. Each of them forwards its arguments to $Logger only
# when $LogLevel is greater than or equal to the level constant of the helper,
# otherwise nothing is logged. log_warn and log_warning are synonyms.
sub log_fatal    (@) { $LogLevel >= LOG_FATAL  && $Logger->( LOG_FATAL,  @_ ) }
sub log_alert    (@) { $LogLevel >= LOG_ALERT  && $Logger->( LOG_ALERT,  @_ ) }
sub log_critical (@) { $LogLevel >= LOG_CRIT   && $Logger->( LOG_CRIT,   @_ ) }
sub log_error    (@) { $LogLevel >= LOG_ERROR  && $Logger->( LOG_ERROR,  @_ ) }
sub log_warn     (@) { $LogLevel >= LOG_WARN   && $Logger->( LOG_WARN,   @_ ) }
sub log_warning  (@) { $LogLevel >= LOG_WARN   && $Logger->( LOG_WARN,   @_ ) }
sub log_notice   (@) { $LogLevel >= LOG_NOTICE && $Logger->( LOG_NOTICE, @_ ) }
sub log_info     (@) { $LogLevel >= LOG_INFO   && $Logger->( LOG_INFO,   @_ ) }
sub log_debug    (@) { $LogLevel >= LOG_DEBUG  && $Logger->( LOG_DEBUG,  @_ ) }
sub log_trace    (@) { $LogLevel >= LOG_TRACE  && $Logger->( LOG_TRACE,  @_ ) }
# Sets the threshold when an argument is given; always returns the current threshold
sub log_level   (;$) { $LogLevel =  shift      if scalar @_; return $LogLevel }

# Alias these package variables to the scalars of the same name in
# Beekeeper::MQTT, so that both names refer to the same values
our $BUSY_SINCE; *BUSY_SINCE = \$Beekeeper::MQTT::BUSY_SINCE;
our $BUSY_TIME;  *BUSY_TIME  = \$Beekeeper::MQTT::BUSY_TIME;

# NOTE(review): presumably the number of seconds between two status reports;
# it is not used in this part of the file - confirm against the timer setup
our $REPORT_STATUS_PERIOD = 5;
# Delay, in seconds, of the timer that stop_accepting_calls uses before it
# removes the callbacks of an unsubscribed service
our $UNSUBSCRIBE_LINGER   = 2;

# Tokens keyed by authorization level name, filled by __init_auth_tokens
my %AUTH_TOKENS;
# Compress::Raw::Zlib::Deflate and JSON::XS instances, both created in new()
my $DEFLATE;
my $JSON;


# Constructor. Builds the worker state from the named arguments, prepares the
# shared JSON encoder and deflate stream, installs the graceful stop signal
# handlers and then runs the initialization steps (logger, broker connection,
# auth tokens, worker setup).
#
# Returns the new worker object. If any initialization step dies, the error is
# logged and the process exits with COMPILE_ERROR_EXIT_CODE.
sub new {
    my ($class, %args) = @_;

    # Parameters passed by WorkerPool->spawn_worker

    my %worker_state = (
        parent_pid      => $args{'parent_pid'},
        foreground      => $args{'foreground'},   # --foreground option
        debug           => $args{'debug'},        # --debug option
        bus_config      => $args{'bus_config'},   # content of bus.config.json
        pool_config     => $args{'pool_config'},  # content of pool.config.json
        pool_id         => $args{'pool_id'},
        bus_id          => $args{'bus_id'},
        config          => $args{'config'},
        hostname        => Sys::Hostname::hostname(),
        stop_cv         => undef,
        callbacks       => {},
        task_queue_high => [],
        task_queue_low  => [],
        queued_tasks    => 0,
        in_progress     => 0,
        last_report     => 0,
        call_count      => 0,
        notif_count     => 0,
        error_count     => 0,
        busy_time       => 0,
    );

    my $self = bless {
        _WORKER => \%worker_state,
        _CLIENT => undef,
        _BUS    => undef,
        _LOGGER => undef,
    }, $class;

    $JSON = JSON::XS->new
                    ->utf8               # encode result as utf8
                    ->allow_blessed      # encode blessed references as null
                    ->convert_blessed;   # use TO_JSON methods to serialize objects

    $DEFLATE = Compress::Raw::Zlib::Deflate->new( -AppendOutput => 1 );

    # Handlers are installed only over a signal disposition explicitly set
    # to 'DEFAULT', any other setting is left untouched
    my $graceful_stop = sub { $self->stop_working };

    # Stop working gracefully when TERM signal is received
    if (defined $SIG{TERM} && $SIG{TERM} eq 'DEFAULT') {
        $SIG{TERM} = $graceful_stop;
    }

    # In foreground mode also stop working gracefully when INT signal is received
    if ($args{'foreground'} && defined $SIG{INT} && $SIG{INT} eq 'DEFAULT') {
        $SIG{INT} = $graceful_stop;
    }

    eval {

        # Init logger as soon as possible
        $self->__init_logger;

        # Connect to broker
        $self->__init_client;

        # Pass broker connection to logger, but only to loggers which have a _BUS slot
        if (exists $self->{_LOGGER}->{_BUS}) {
            $self->{_LOGGER}->{_BUS} = $self->{_BUS};
        }

        $self->__init_auth_tokens;

        $self->__init_worker;
    };

    if (my $error = $@) {
        log_fatal "Worker died while initialization: $error";
        log_fatal "$class could not be started";
        CORE::exit( COMPILE_ERROR_EXIT_CODE );
    }

    return $self;
}

# Fills %AUTH_TOKENS with one token per authorization level. Each token is the
# base64 MD5 digest of the level name followed by the auth_salt of the client.
sub __init_auth_tokens {
    my $self = shift;

    # Hashing makes it harder to reach the wrong worker pool by mistake, but it
    # is not an effective access restriction: anyone holding the backend bus
    # credentials can easily inspect and clone auth data tokens

    my $salt = $self->{_CLIENT}->{auth_salt};

    foreach my $level (qw( BKPR_SYSTEM BKPR_ADMIN BKPR_ROUTER )) {
        $AUTH_TOKENS{$level} = md5_base64($level . $salt);
    }
}

sub __has_authorization_token {
    my ($self, $auth_level) = @_;

lib/Beekeeper/Worker.pm  view on Meta::CPAN



# Stop receiving the remote calls of one or more whole services.
#
# Each argument must have the form 'some.service.*'. For every service which
# has registered callbacks its shared request topic is unsubscribed. The
# callbacks themselves are removed later: after the unsubscription was
# acknowledged, the postponed task was run (NOTE(review): presumably by
# __drain_task_queue once the task queue is empty - confirm) and
# $UNSUBSCRIBE_LINGER seconds have elapsed.
#
# Dies, reporting the file and line of the caller, when no method is given,
# when a method name is malformed or when an individual method is named.
# A service without registered callbacks only causes a warning.
sub stop_accepting_calls {
    my ($self, @methods) = @_;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    die "No method specified $at" unless @methods;

    my $worker    = $self->{_WORKER};
    my $callbacks = $worker->{callbacks};

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        unless ($method eq '*') {
            # Known limitation. As all calls for an entire service class are received
            # through a single MQTT subscription (in order to load balance them), it is 
            # not possible to reject a single method. A workaround is to use a different
            # class for each method that need to be individually rejected.
            die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
        }

        # Select only the keys 'req.<service>.<method>' of exactly this service.
        # A loose prefix match would also pick the callbacks of services like
        # 'foo.bar' or 'foo-bar' when stopping 'foo', although those services
        # keep their own subscriptions.
        my @cb_keys = grep { m/^req\.\Q$service\E\.[^.]+\z/ } keys %$callbacks;

        unless (@cb_keys) {
            log_warn("Not previously accepting remote calls '$fq_meth' $at");
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already 
        # queued. We must wait for unsubscription completion, and then until the task queue 
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.
        my $postpone = sub {

            # Hold _work_forever during the linger period only when the worker
            # is shutting down. Every begin must be paired with an end, and
            # outside of a shutdown no end is issued: an unpaired begin would
            # prevent a later shutdown from ever completing.
            my $holding = $worker->{shutting_down} ? 1 : 0;
            $worker->{stop_cv}->begin if $holding;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    # The shutdown started while lingering, take the hold now
                    # to keep the begin and end calls paired
                    $worker->{stop_cv}->begin unless $holding;

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        # Poll once per second, giving up after $wait_time polls
                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;
                            }
                        });
                    }

                    # Tell _work_forever to stop
                    $worker->{stop_cv}->end;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic        => $topic,
            on_unsuback  => sub {
                my ($success) = @_;

                log_error("Could not unsubscribe from topic '$topic' $at") unless $success;

                # Queue the cleanup even after a failed unsubscription
                my $postponed = $worker->{postponed} ||= [];
                push @$postponed, $postpone;

                AnyEvent::postpone { $self->__drain_task_queue };
            }
        );
    }
}


# Main loop of the worker. Creates the stop condition variable, blocks on it,
# and after being released runs on_shutdown and reports the exit. An exception
# raised by any of these steps is logged and ends the process with exit code 255.
# Finally the bus connection is closed, if it is still open.
sub __work_forever {
    my ($self) = @_;

    # Called by WorkerPool->spawn_worker

    eval {

        my $stop_cv = AnyEvent->condvar;
        $self->{_WORKER}->{stop_cv} = $stop_cv;

        # Blocks here until stop_working is called
        $stop_cv->recv;

        $self->on_shutdown;

        $self->__report_exit;
    };

    if (my $error = $@) {
        log_fatal "Worker died: $error";
        CORE::exit(255);
    }

    $self->{_BUS}->disconnect if $self->{_BUS}->{is_connected};
}


# Begin a graceful shutdown of the worker. Only the first call has any effect.
#
# If the worker has not created its stop condition variable yet, the process
# exits at once with code 0. Otherwise every service with registered request
# callbacks (other than the '_sync' ones) is passed to stop_accepting_calls,
# or, when there is none, the stop condition variable is signalled directly.
sub stop_working {
    my ($self, %args) = @_;

    # This is the default handler for TERM signal

    my $worker = $self->{_WORKER};

    if ($worker->{shutting_down}) {
        return;
    }

    $worker->{shutting_down} = 1;

    if (!defined $worker->{stop_cv}) {
        # Worker has not completed its initialization yet
        CORE::exit(0);
    }

    # Collect the distinct service names out of the request callback keys
    my %seen;
    foreach my $cb_key (keys %{$worker->{callbacks}}) {
        if ($cb_key =~ m/^req\.(?!_sync)(.*)\./) {
            $seen{$1} = 1;
        }
    }

    my @services = keys %seen;

    if (!@services) {
        # Tell _work_forever to stop
        $worker->{stop_cv}->send;
    }
    else {
        # Cannot exit right now, as some requests could be in flight or already queued.
        # So tell the broker to stop sending requests, and exit after the task queue is empty
        $self->stop_accepting_calls( $_ . '.*' ) foreach @services;
    }
}


# Send the current statistics of this worker as a '_bkpr.supervisor.worker_status'
# notification. Rates are averaged over the time elapsed since the previous
# report (one second is assumed when there was none), and the counters are
# reset afterwards.
sub __report_status {
    my ($self) = @_;

    my ($worker, $client) = @{$self}{qw( _WORKER _CLIENT )};

    my $now     = Time::HiRes::time();
    my $elapsed = $now - ($worker->{last_report} || ($now - 1));

    $worker->{last_report} = $now;

    # Average calls, notifications and errors per second
    my %rate_of;
    foreach my $counter (qw( call_count notif_count error_count )) {
        $rate_of{$counter} = sprintf("%.2f", $worker->{$counter} / $elapsed);
        $worker->{$counter} = 0;
    }

    # Average load as percentage of wall clock busy time (not cpu usage)
    my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $elapsed * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Distinct services served, taken from the request callback keys
    my %queues = map  { $_ => 1 }
                 map  { m/^req\.(?!_sync)(.*)\./ ? $1 : () }
                 keys %{$worker->{callbacks}};

    # Send with the system token and without caller id, for this call only
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',
        params => {
            class => ref($self),
            host  => $worker->{hostname},
            pool  => $worker->{pool_id},
            pid   => $$,
            cps   => $rate_of{call_count},
            nps   => $rate_of{notif_count},
            err   => $rate_of{error_count},
            load  => $load,
            queue => [ keys %queues ],
        },
    );
}

# Send a '_bkpr.supervisor.worker_exit' notification identifying this worker
# by class, host, pool and pid. Does nothing when the bus is not connected.
sub __report_exit {
    my ($self) = @_;

    $self->{_BUS}->{is_connected} or return;

    my ($worker, $client) = @{$self}{qw( _WORKER _CLIENT )};

    my %exit_info = (
        class => ref($self),
        host  => $worker->{hostname},
        pool  => $worker->{pool_id},
        pid   => $$,
    );

    # Send with the system token and without caller id, for this call only
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    $self->fire_remote(
        method => '_bkpr.supervisor.worker_exit',
        params => \%exit_info,
    );
}

1;

__END__

=pod

=encoding utf8

=head1 NAME



( run in 3.030 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )