Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/Worker.pm  view on Meta::CPAN

        stop_cv         => undef,
        callbacks       => {},
        task_queue_high => [],
        task_queue_low  => [],
        queued_tasks    => 0,
        in_progress     => 0,
        last_report     => 0,
        call_count      => 0,
        notif_count     => 0,
        error_count     => 0,
        busy_time       => 0,
    };

    $JSON = JSON::XS->new;
    $JSON->utf8;             # encode result as utf8
    $JSON->allow_blessed;    # encode blessed references as null
    $JSON->convert_blessed;  # use TO_JSON methods to serialize objects

    $DEFLATE = Compress::Raw::Zlib::Deflate->new( -AppendOutput => 1 );

    if (defined $SIG{TERM} && $SIG{TERM} eq 'DEFAULT') {
        # Stop working gracefully when TERM signal is received
        $SIG{TERM} = sub { $self->stop_working };
    }

    if (defined $SIG{INT} && $SIG{INT} eq 'DEFAULT' && $args{'foreground'}) {
        # In foreground mode also stop working gracefully when INT signal is received
        $SIG{INT} = sub { $self->stop_working };
    }

    eval {

        # Init logger as soon as possible
        $self->__init_logger;

        # Connect to broker
        $self->__init_client;

        # Pass broker connection to logger
        $self->{_LOGGER}->{_BUS} = $self->{_BUS} if (exists $self->{_LOGGER}->{_BUS});

        $self->__init_auth_tokens;

        $self->__init_worker;
    };

    if ($@) {
        log_fatal "Worker died while initialization: $@";
        log_fatal "$class could not be started";
        CORE::exit( COMPILE_ERROR_EXIT_CODE );
    }

    return $self;
}

sub __init_auth_tokens {
    my ($self) = @_;

    # Using a hashing function makes harder to access the wrong worker pool by mistake,
    # but it is not an effective access restriction: anyone with access to the backend
    # bus credentials can easily inspect and clone auth data tokens

    my $salt = $self->{_CLIENT}->{auth_salt};

    $AUTH_TOKENS{'BKPR_SYSTEM'} = md5_base64('BKPR_SYSTEM'. $salt);
    $AUTH_TOKENS{'BKPR_ADMIN'}  = md5_base64('BKPR_ADMIN' . $salt);
    $AUTH_TOKENS{'BKPR_ROUTER'} = md5_base64('BKPR_ROUTER'. $salt);
}

sub __has_authorization_token {
    my ($self, $auth_level) = @_;

    my $auth_data = $self->{_CLIENT}->{auth_data};

    return 0 unless $auth_data && $auth_level;
    return 0 unless exists $AUTH_TOKENS{$auth_level};
    return 0 unless $AUTH_TOKENS{$auth_level} eq $auth_data;

    return 1;
}

sub __init_logger {
    my $self = shift;

    # Honor --debug command line option and 'debug' config option from pool.config.json
    $LogLevel = LOG_DEBUG if $self->{_WORKER}->{debug} || $self->{_WORKER}->{config}->{debug};

    my $log_handler  = $self->log_handler;
    $self->{_LOGGER} = $log_handler;

    $Logger = sub {
        # ($level, @messages) = @_
        $log_handler->log(@_);
    };

    $SIG{__WARN__} = sub { $Logger->( LOG_WARN,  @_ ) };
}

sub log_handler {
    my $self = shift;

    Beekeeper::Logger->new(
        worker_class => ref $self,
        foreground   => $self->{_WORKER}->{foreground},
        log_file     => $self->{_WORKER}->{config}->{log_file},
        host         => $self->{_WORKER}->{hostname},
        pool         => $self->{_WORKER}->{pool_id},
        _BUS         => $self->{_BUS},
        @_
    );
}

sub __init_client {
    my $self = shift;

    my $bus_id = $self->{_WORKER}->{bus_id};
    my $config = $self->{_WORKER}->{bus_config}->{$bus_id};

    my $client = Beekeeper::Client->new(
        %$config,
        timeout  => 0,  # retry forever



( run in 1.140 second using v1.01-cache-2.11-cpan-39bf76dae61 )