Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker.pm  view on Meta::CPAN

    log_notice
    log_info
    log_debug
    log_trace
    log_level
);

our %EXPORT_TAGS = ('log' => [ @EXPORT_OK, @EXPORT ]);

our $Logger = sub { warn(@_) }; # redefined later by __init_logger
our $LogLevel = LOG_INFO;

sub log_fatal    (@) { $LogLevel >= LOG_FATAL  && $Logger->( LOG_FATAL,  @_ ) }
sub log_alert    (@) { $LogLevel >= LOG_ALERT  && $Logger->( LOG_ALERT,  @_ ) }
sub log_critical (@) { $LogLevel >= LOG_CRIT   && $Logger->( LOG_CRIT,   @_ ) }
sub log_error    (@) { $LogLevel >= LOG_ERROR  && $Logger->( LOG_ERROR,  @_ ) }
sub log_warn     (@) { $LogLevel >= LOG_WARN   && $Logger->( LOG_WARN,   @_ ) }
sub log_warning  (@) { $LogLevel >= LOG_WARN   && $Logger->( LOG_WARN,   @_ ) }
sub log_notice   (@) { $LogLevel >= LOG_NOTICE && $Logger->( LOG_NOTICE, @_ ) }
sub log_info     (@) { $LogLevel >= LOG_INFO   && $Logger->( LOG_INFO,   @_ ) }
sub log_debug    (@) { $LogLevel >= LOG_DEBUG  && $Logger->( LOG_DEBUG,  @_ ) }
sub log_trace    (@) { $LogLevel >= LOG_TRACE  && $Logger->( LOG_TRACE,  @_ ) }
sub log_level   (;$) { $LogLevel =  shift      if scalar @_; return $LogLevel }

our $BUSY_SINCE; *BUSY_SINCE = \$Beekeeper::MQTT::BUSY_SINCE;
our $BUSY_TIME;  *BUSY_TIME  = \$Beekeeper::MQTT::BUSY_TIME;

our $REPORT_STATUS_PERIOD = 5;
our $UNSUBSCRIBE_LINGER   = 2;

my %AUTH_TOKENS;
my $DEFLATE;
my $JSON;


sub new {
    my ($class, %args) = @_;

    # Parameters passed by WorkerPool->spawn_worker
    
    my $self = {
        _WORKER => undef,
        _CLIENT => undef,
        _BUS    => undef,
        _LOGGER => undef,
    };

    bless $self, $class;

    $self->{_WORKER} = {
        parent_pid      => $args{'parent_pid'},
        foreground      => $args{'foreground'},   # --foreground option
        debug           => $args{'debug'},        # --debug option
        bus_config      => $args{'bus_config'},   # content of bus.config.json
        pool_config     => $args{'pool_config'},  # content of pool.config.json
        pool_id         => $args{'pool_id'},
        bus_id          => $args{'bus_id'},
        config          => $args{'config'},
        hostname        => Sys::Hostname::hostname(),
        stop_cv         => undef,
        callbacks       => {},
        task_queue_high => [],
        task_queue_low  => [],
        queued_tasks    => 0,
        in_progress     => 0,
        last_report     => 0,
        call_count      => 0,
        notif_count     => 0,
        error_count     => 0,
        busy_time       => 0,
    };

    $JSON = JSON::XS->new;
    $JSON->utf8;             # encode result as utf8
    $JSON->allow_blessed;    # encode blessed references as null
    $JSON->convert_blessed;  # use TO_JSON methods to serialize objects

    $DEFLATE = Compress::Raw::Zlib::Deflate->new( -AppendOutput => 1 );

    if (defined $SIG{TERM} && $SIG{TERM} eq 'DEFAULT') {
        # Stop working gracefully when TERM signal is received
        $SIG{TERM} = sub { $self->stop_working };
    }

    if (defined $SIG{INT} && $SIG{INT} eq 'DEFAULT' && $args{'foreground'}) {
        # In foreground mode also stop working gracefully when INT signal is received
        $SIG{INT} = sub { $self->stop_working };
    }

    eval {

        # Init logger as soon as possible
        $self->__init_logger;

        # Connect to broker
        $self->__init_client;

        # Pass broker connection to logger
        $self->{_LOGGER}->{_BUS} = $self->{_BUS} if (exists $self->{_LOGGER}->{_BUS});

        $self->__init_auth_tokens;

        $self->__init_worker;
    };

    if ($@) {
        log_fatal "Worker died while initialization: $@";
        log_fatal "$class could not be started";
        CORE::exit( COMPILE_ERROR_EXIT_CODE );
    }

    return $self;
}

sub __init_auth_tokens {
    my ($self) = @_;

    # Using a hashing function makes harder to access the wrong worker pool by mistake,
    # but it is not an effective access restriction: anyone with access to the backend
    # bus credentials can easily inspect and clone auth data tokens

lib/Beekeeper/Worker.pm  view on Meta::CPAN

    my $self = shift;

    my $bus_id = $self->{_WORKER}->{bus_id};
    my $config = $self->{_WORKER}->{bus_config}->{$bus_id};

    my $client = Beekeeper::Client->new(
        %$config,
        timeout  => 0,  # retry forever
        on_error => sub { 
            my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
            log_fatal "Connection to $bus_id failed: $errmsg";
            $self->stop_working;
        },
    );

    $self->{_CLIENT} = $client->{_CLIENT};
    $self->{_BUS}    = $client->{_BUS};

    $Beekeeper::Client::singleton = $self;
}

sub __init_worker {
    my $self = shift;

    $self->on_startup;

    $self->__report_status;

    AnyEvent->now_update;

    $self->{_WORKER}->{report_status_timer} = AnyEvent->timer(
        after    => rand( $REPORT_STATUS_PERIOD ), 
        interval => $REPORT_STATUS_PERIOD,
        cb       => sub { $self->__report_status },
    );
}


sub on_startup {
    # Placeholder, intended to be overrided
    my $class = ref $_[0];
    log_fatal "Worker class $class doesn't define on_startup() method";
}

sub on_shutdown {
    # Placeholder, can be overrided
}

sub authorize_request {
    # Placeholder, must to be overrided
    my $class = ref $_[0];
    log_fatal "Worker class $class doesn't define authorize_request() method";
    return undef; # do not authorize
}


sub accept_notifications {
    my ($self, %args) = @_;

    my $worker    = $self->{_WORKER};
    my $callbacks = $worker->{callbacks};

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid notification method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});

        die "Already accepting notifications '$fq_meth' $at" if exists $callbacks->{"msg.$fq_meth"};
        $callbacks->{"msg.$fq_meth"} = $callback;

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic      => $topic,
            on_publish => sub {
                # ($payload_ref, $properties) = @_;

                # Enqueue notification
                push @{$worker->{task_queue_high}}, [ @_ ];

                unless ($worker->{queued_tasks}) {
                    $worker->{queued_tasks} = 1;
                    AnyEvent::postpone { $self->__drain_task_queue };
                }
            },
            on_suback => sub {
                my ($success, $prop) = @_;
                die "Could not subscribe to topic '$topic' $at" unless $success;
            }
        );
    }
}

sub __get_cb_coderef {
    my ($self, $method, $callback) = @_;

    if (ref $callback eq 'CODE') {
        # Already a coderef
        return $callback;
    }
    elsif (!ref($callback) && $self->can($callback)) {
        # Return a reference to given method
        no strict 'refs';
        my $class = ref $self;
        return \&{"${class}::${callback}"};
    }
    else {
        my ($file, $line) = (caller(1))[1,2];
        my $at = "at $file line $line\n";
        die "Invalid handler '$callback' for '$method' $at";
    }
}


sub accept_remote_calls {
    my ($self, %args) = @_;

    my $worker = $self->{_WORKER};
    my $callbacks = $worker->{callbacks};
    my %subscribed_to;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});

        die "Already accepting remote calls '$fq_meth' $at" if exists $callbacks->{"req.$fq_meth"};
        $callbacks->{"req.$fq_meth"} = $callback;

        next if $subscribed_to{$service};
        $subscribed_to{$service} = 1;

        if (keys %subscribed_to == 2) {
            log_warn "Running multiple services within a single worker hurts load balancing $at";
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic       => $topic,
            maximum_qos => 1,
            on_publish  => sub {
                # ($payload_ref, $mqtt_properties) = @_;

                # Enqueue request
                push @{$worker->{task_queue_low}}, [ @_ ];

                unless ($worker->{queued_tasks}) {
                    $worker->{queued_tasks} = 1;
                    AnyEvent::postpone { $self->__drain_task_queue };
                }
            },
            on_suback => sub {
                my ($success, $prop) = @_;
                die "Could not subscribe to topic '$topic' $at" unless $success;
            }
        );
    }
}

my $_TASK_QUEUE_DEPTH = 0;

sub __drain_task_queue {
    my $self = shift;

    # Ensure that draining does not recurse
    Carp::confess "Unexpected task queue processing recursion" if $_TASK_QUEUE_DEPTH;
    $_TASK_QUEUE_DEPTH++;

    my $timing_tasks;

    unless (defined $BUSY_SINCE) {
        # Measure time elapsed while processing requests
        $BUSY_SINCE = Time::HiRes::time;
        $timing_tasks = 1; 
    }

    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};
    my $task;

    # When requests or notifications are received these are not executed immediately
    # because that could happen in the middle of the process of another request,
    # so these tasks get queued until the worker is ready to process the next one.
    #
    # Callbacks are executed here, exception handling is done here, responses are
    # sent back here. This is one of the most important methods of the framework.
    #
    # Notifications have higher priority and are processed first.

    DRAIN: {

        while ($task = shift @{$worker->{task_queue_high}}) {

            ## Notification

            my ($payload_ref, $mqtt_properties) = @$task;

            $worker->{notif_count}++;

            eval {

                my $request = decode_json($$payload_ref);

                unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
                    log_error "Received invalid JSON-RPC 2.0 notification";
                    return;
                }

                bless $request, 'Beekeeper::JSONRPC::Notification';
                $request->{_mqtt_properties} = $mqtt_properties;

                my $method = $request->{method};

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    log_error "Received notification with invalid method '$method'";
                    return;
                }

                my $cb = $worker->{callbacks}->{"msg.$1.$2"} || 
                         $worker->{callbacks}->{"msg.$1.*"};

                local $client->{caller_id}   = $mqtt_properties->{'clid'};
                local $client->{caller_addr} = $mqtt_properties->{'addr'};
                local $client->{auth_data}   = $mqtt_properties->{'auth'};

                unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
                    log_error "Notification '$method' was not authorized";
                    return;
                }

                unless ($cb) {
                    log_error "No handler found for received notification '$method'";
                    return;
                }

                $cb->($self, $request->{params}, $request);
            };

            if ($@) {
                # Got an exception while processing message
                log_error $@;
                $worker->{error_count}++;
            }
        }

        if ($task = shift @{$worker->{task_queue_low}}) {

            ## RPC Call

            my ($payload_ref, $mqtt_properties) = @$task;

            $worker->{call_count}++;
            my ($request, $request_id, $result, $response);

            $result = eval {

                $request = decode_json($$payload_ref);

                unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
                    log_error "Received invalid JSON-RPC 2.0 request";
                    die Beekeeper::JSONRPC::Error->invalid_request;
                }

                $request_id = $request->{id};
                my $method  = $request->{method};

                bless $request, 'Beekeeper::JSONRPC::Request';
                $request->{_mqtt_properties} = $mqtt_properties;

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    log_error "Received request with invalid method '$method'";
                    die Beekeeper::JSONRPC::Error->method_not_found;
                }

                my $cb = $worker->{callbacks}->{"req.$1.$2"} || 
                         $worker->{callbacks}->{"req.$1.*"};

                local $client->{caller_id}   = $mqtt_properties->{'clid'};
                local $client->{caller_addr} = $mqtt_properties->{'addr'};
                local $client->{auth_data}   = $mqtt_properties->{'auth'};

                unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
                    log_error "Request '$method' was not authorized";
                    die Beekeeper::JSONRPC::Error->request_not_authorized;
                }

                unless ($cb) {
                    log_error "No handler found for received request '$method'";
                    die Beekeeper::JSONRPC::Error->method_not_found;
                }

                # Execute method handler
                $cb->($self, $request->{params}, $request);
            };

            if ($@) {
                # Got an exception while executing method handler
                if (blessed($@) && $@->isa('Beekeeper::JSONRPC::Error')) {
                    # Handled exception
                    $response = $@;
                    $worker->{error_count}++;
                }
                else {
                    # Unhandled exception
                    log_error $@;
                    $worker->{error_count}++;
                    $response = Beekeeper::JSONRPC::Error->server_error;
                    # Sending exact error to caller is very handy, but it is also a security risk
                    $response->{error}->{data} = $@ if $worker->{debug};
                    $worker->{error_count}++;
                }
            }
            elsif (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
                # Explicit error response
                $response = $result;
                $worker->{error_count}++;
            }
            elsif ($request->{_async_response}) {
                # Response was deferred and will be sent later
                $worker->{in_progress}++;
                $request->{_worker} = $self;
            }
            else {
                # Build a success response
                $response = {
                    jsonrpc => '2.0',
                    result  => $result,
                };
            }

            if (defined $request_id && defined $response) {

                # Send back response to caller

                $response->{id} = $request_id;

lib/Beekeeper/Worker.pm  view on Meta::CPAN

    else {
        # Build a success response
        $response = {
            jsonrpc => '2.0',
            result  => $result,
        };
    }

    $response->{id} = $request->{id};

    local $@;
    my $json = eval { $JSON->encode( $response ) };

    if ($@) {
        # Probably response contains blessed references 
        log_error "Couldn't serialize response as JSON: $@";
        $response = Beekeeper::JSONRPC::Error->server_error;
        $response->{id} = $request->{id};
        $json = $JSON->encode( $response );
        $self->{_WORKER}->{error_count}++;
    }

    if ($request->{_deflate_response} && length($json) > $request->{_deflate_response}) {
        my $compressed_json;
        $DEFLATE->deflate(\$json, $compressed_json);
        $DEFLATE->flush(\$compressed_json);
        $DEFLATE->deflateReset;
        $json = $compressed_json;
    }

    $self->{_BUS}->publish(
        topic    => $request->{_mqtt_properties}->{'response_topic'},
        addr     => $request->{_mqtt_properties}->{'addr'},
        payload  => \$json,
    );

    if (defined $timing_tasks) {
        $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
        undef $BUSY_SINCE;
    }
}


sub stop_accepting_notifications {
    my ($self, @methods) = @_;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    die "No method specified $at" unless @methods;

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        my $worker = $self->{_WORKER};

        unless (defined $worker->{callbacks}->{"msg.$fq_meth"}) {
            log_warn "Not previously accepting notifications '$fq_meth' $at";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new notifications could be in flight or be 
        # already queued. We must wait for unsubscription completion, and then until the 
        # notification queue is empty to ensure that all received ones were processed. And 
        # even then wait a bit more, as some brokers may send messages *after* unsubscription.
        my $postpone = sub {

           my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{"msg.$fq_meth"};
                    undef $unsub_tmr;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;

                log_error "Could not unsubscribe from topic '$topic' $at" unless $success; 

                my $postponed = $worker->{postponed} ||= [];
                push @$postponed, $postpone;

                AnyEvent::postpone { $self->__drain_task_queue };
            }
        );
    }
}


sub stop_accepting_calls {
    my ($self, @methods) = @_;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    die "No method specified $at" unless @methods;

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        unless ($method eq '*') {
            # Known limitation. As all calls for an entire service class are received
            # through a single MQTT subscription (in order to load balance them), it is 
            # not possible to reject a single method. A workaround is to use a different
            # class for each method that need to be individually rejected.
            die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
        }

        my $worker    = $self->{_WORKER};
        my $callbacks = $worker->{callbacks};

        my @cb_keys = grep { $_ =~ m/^req.\Q$service\E\b/ } keys %$callbacks;

        unless (@cb_keys) {
            log_warn "Not previously accepting remote calls '$fq_meth' $at";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already 
        # queued. We must wait for unsubscription completion, and then until the task queue 
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.
        my $postpone = sub {

            $worker->{stop_cv}->begin;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;
                            }
                        });
                    }

                    # Tell _work_forever to stop
                    $worker->{stop_cv}->end;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic        => $topic,
            on_unsuback  => sub {
                my ($success, $prop) = @_;

                log_error "Could not unsubscribe from topic '$topic' $at" unless $success; 

                my $postponed = $worker->{postponed} ||= [];
                push @$postponed, $postpone;

                AnyEvent::postpone { $self->__drain_task_queue };
            }
        );
    }
}


sub __work_forever {
    my $self = shift;

    # Called by WorkerPool->spawn_worker

    eval {

        my $worker = $self->{_WORKER};

        $worker->{stop_cv} = AnyEvent->condvar;

        # Blocks here until stop_working is called
        $worker->{stop_cv}->recv;

        $self->on_shutdown;

        $self->__report_exit;
    };

    if ($@) {
        log_fatal "Worker died: $@";
        CORE::exit(255);
    }

    if ($self->{_BUS}->{is_connected}) {
        $self->{_BUS}->disconnect;
    }
}


sub stop_working {
    my ($self, %args) = @_;

    my $worker = $self->{_WORKER};

    # This is the default handler for TERM signal

    return if $worker->{shutting_down};
    $worker->{shutting_down} = 1;

    unless (defined $worker->{stop_cv}) {
        # Worker did not completed initialization yet
        CORE::exit(0);
    }

    my %services;
    foreach my $fq_meth (keys %{$worker->{callbacks}}) {
        next unless $fq_meth =~ m/^req\.(?!_sync)(.*)\./;
        $services{$1} = 1;
    }

    if (keys %services) {

        # Cannot exit right now, as some requests could be in flight or already queued.
        # So tell the broker to stop sending requests, and exit after the task queue is empty
        foreach my $service (keys %services) {

            $self->stop_accepting_calls( $service . '.*' );
        }
    }
    else {
        # Tell _work_forever to stop
        $worker->{stop_cv}->send;
    }
}


sub __report_status {
    my $self = shift;

    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};

    my $now = Time::HiRes::time;
    my $period = $now - ($worker->{last_report} || ($now - 1));

    $worker->{last_report} = $now;

    # Average calls per second
    my $cps = sprintf("%.2f", $worker->{call_count} / $period);
    $worker->{call_count} = 0;

    # Average notifications per second
    my $nps = sprintf("%.2f", $worker->{notif_count} / $period);
    $worker->{notif_count} = 0;

    # Average errors per second
    my $err = sprintf("%.2f", $worker->{error_count} / $period);
    $worker->{error_count} = 0;

    # Average load as percentage of wall clock busy time (not cpu usage)
    my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Queues
    my %queues;
    foreach my $queue (keys %{$worker->{callbacks}}) {
        next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
        $queues{$1} = 1;
    }

    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',
        params => {
            class => ref($self),
            host  => $worker->{hostname},
            pool  => $worker->{pool_id},
            pid   => $$,
            cps   => $cps,
            nps   => $nps,
            err   => $err,
            load  => $load,
            queue => [ keys %queues ],
        },
    );
}

sub __report_exit {
    my $self = shift;

    return unless $self->{_BUS}->{is_connected};

    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};

    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    $self->fire_remote(
        method => '_bkpr.supervisor.worker_exit',
        params => {
            class => ref($self),
            host  => $worker->{hostname},
            pool  => $worker->{pool_id},
            pid   => $$,
        },
    );
}

1;

__END__

=pod

=encoding utf8

=head1 NAME

Beekeeper::Worker - Base class for creating services

=head1 VERSION



( run in 0.896 second using v1.01-cache-2.11-cpan-39bf76dae61 )