Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker.pm  view on Meta::CPAN

sub stop_working {
    my ($self, %args) = @_;

    # Default handler for the TERM signal: start shutting down this worker

    my $worker = $self->{_WORKER};

    # Only the first call has any effect
    if ($worker->{shutting_down}) {
        return;
    }
    $worker->{shutting_down} = 1;

    # Without a stop_cv the worker has not completed its initialization yet,
    # so just exit right away
    CORE::exit(0) if !defined $worker->{stop_cv};

    # Collect the distinct service names out of the registered 'req.*'
    # callbacks, leaving out the 'req._sync*' ones
    my %seen;
    for my $callback_key (keys %{ $worker->{callbacks} }) {
        if ($callback_key =~ m/^req\.(?!_sync)(.*)\./) {
            $seen{$1} = 1;
        }
    }

    # Not serving any request: tell _work_forever to stop
    return $worker->{stop_cv}->send unless %seen;

    # Cannot exit right now, as some requests could be in flight or already queued.
    # So tell the broker to stop sending requests, and exit after the task queue is empty
    for my $name (keys %seen) {
        $self->stop_accepting_calls( $name . '.*' );
    }
}


# Send the activity stats of this worker with fire_remote to the method
# '_bkpr.supervisor.worker_status'.
#
# The stats are per second averages of calls, notifications and errors, plus
# the percentage of wall clock time spent busy, all of them measured over the
# period elapsed since the previous report. Counters are reset on every report.
sub __report_status {
    my $self = shift;

    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};

    # On the first report there is no previous timestamp, so assume that
    # one second has elapsed
    my $now = Time::HiRes::time;
    my $period = $now - ($worker->{last_report} || ($now - 1));

    # Time::HiRes::time is wall clock time, so the period comes out as zero
    # when two reports share a timestamp, or negative when the system clock
    # was stepped backwards. Fall back to the same one second period used for
    # the first report, instead of dying with a division by zero or sending
    # negative rates
    $period = 1 if $period <= 0;

    $worker->{last_report} = $now;

    # Average calls per second
    my $cps = sprintf("%.2f", $worker->{call_count} / $period);
    $worker->{call_count} = 0;

    # Average notifications per second
    my $nps = sprintf("%.2f", $worker->{notif_count} / $period);
    $worker->{notif_count} = 0;

    # Average errors per second
    my $err = sprintf("%.2f", $worker->{error_count} / $period);
    $worker->{error_count} = 0;

    # Average load as percentage of wall clock busy time (not cpu usage).
    # Only the busy time accumulated since the previous report is counted
    my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Queues: distinct names taken from the registered 'req.*' callbacks,
    # leaving out the 'req._sync*' ones
    my %queues;
    foreach my $queue (keys %{$worker->{callbacks}}) {
        next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
        $queues{$1} = 1;
    }

    # Send the report with the BKPR_SYSTEM auth token and without caller id.
    # Both client fields are restored when this sub returns
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',
        params => {
            class => ref($self),
            host  => $worker->{hostname},
            pool  => $worker->{pool_id},
            pid   => $$,
            cps   => $cps,
            nps   => $nps,
            err   => $err,
            load  => $load,
            queue => [ keys %queues ],
        },
    );
}

# Send a '_bkpr.supervisor.worker_exit' message with fire_remote, carrying
# the class, host, pool and pid of this worker.
# Does nothing unless the bus is flagged as connected.
sub __report_exit {
    my $self = shift;

    if (!$self->{_BUS}{is_connected}) {
        return;
    }

    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};

    # Data identifying the worker which is exiting
    my %exit_info = (
        class => ref($self),
        host  => $worker->{hostname},
        pool  => $worker->{pool_id},
        pid   => $$,
    );

    # Send the message with the BKPR_SYSTEM auth token and without caller id.
    # Both client fields are restored when this sub returns
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    $self->fire_remote(
        method => '_bkpr.supervisor.worker_exit',
        params => \%exit_info,
    );
}

1;

__END__

=pod

=encoding utf8



( run in 3.069 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )