Cantella-Worker

 view release on metacpan or  search on metacpan

lib/Cantella/Worker/Manager/Prefork.pm  view on Meta::CPAN

  is => 'rw',
  isa => 'Bool',
  required => 1,
  default => sub { 0 },
);

# Read-write integer, 0 unless overridden.
# NOTE(review): presumably the scheduling priority given to spawned workers;
# confirm against the spawning code, which is not part of this section.
has worker_priority => (
  isa      => 'Int',
  is       => 'rw',
  default  => 0,
  required => 1,
);

# Process ID => wheel object. Entries are dropped again by the close and
# SIGCHLD handlers. The reference itself is read-only, its contents are not.
has pid_to_wheel_map => (
  isa      => 'HashRef',
  is       => 'ro',
  default  => sub { return {} },
  required => 1,
);

# Wheel ID => wheel object; kept in step with pid_to_wheel_map.
has wid_to_wheel_map => (
  isa      => 'HashRef',
  is       => 'ro',
  default  => sub { return {} },
  required => 1,
);

# Log level used when relaying a worker's STDOUT lines to the logger.
has worker_stdout_log_level => (
  isa      => 'Str',
  is       => 'rw',
  default  => 'info',
  required => 1,
);

# Log level used when relaying a worker's STDERR lines to the logger.
has worker_stderr_log_level => (
  isa      => 'Str',
  is       => 'rw',
  default  => 'info',
  required => 1,
);


# Moose construction hook: creates the POE session that drives the manager.
#
# Most session events are handled by methods on this object; the keep-alive
# timer and the four signal forwarders are small inline closures.
sub BUILD {
  my ($self, $args) = @_;

  # Event name => name of the method on $self that handles it. The first
  # group maps to itself, the second to the same name with a leading "_".
  my %method_for = (
    ( map { ($_ => $_) } qw(_start _pause _resume) ),
    ( map { ($_ => "_$_") } qw(
        shutdown
        spawn_workers
        retire_worker
        worker_process_error
        worker_process_close
        worker_process_stdout
        worker_process_stderr
        worker_process_sig_chld
    ) ),
  );

  # Signal-handler event => manager event it forwards to.
  my %forward_to = (
    sig_term => 'shutdown',
    sig_int  => 'shutdown',
    sig_usr1 => '_pause',
    sig_usr2 => '_resume',
  );

  my %inline_states = (
    # Re-arms itself so the session always has a pending timer.
    _keep_alive => sub {
      $_[KERNEL]->delay(_keep_alive => 1000);
    },
  );
  for my $sig_event (keys %forward_to) {
    my $target_event = $forward_to{$sig_event};
    $inline_states{$sig_event} = sub {
      my $kernel = $_[KERNEL];
      $kernel->yield($target_event);
      $kernel->sig_handled;
    };
  }

  POE::Session->create(
    object_states => [ $self, \%method_for ],
    inline_states => \%inline_states,
  );
}

# Returns the number of entries in pid_to_wheel_map, i.e. how many worker
# processes the manager is currently tracking.
sub current_worker_count {
  my ($self) = @_;
  my $wheel_for_pid = $self->pid_to_wheel_map;
  return scalar keys %$wheel_for_pid;
}

# Returns the wheel objects stored in pid_to_wheel_map (in hash order).
sub worker_wheels {
  my ($self) = @_;
  my $wheel_for_pid = $self->pid_to_wheel_map;
  return values %$wheel_for_pid;
}

# Sends $signal to every tracked worker via the wheel's kill() method.
# A worker whose kill() returns false is reported through the logger at
# warning level; the remaining workers are still signalled.
sub signal_workers {
  my ($self, $signal) = @_;
  WHEEL:
  foreach my $wheel ( $self->worker_wheels ) {
    next WHEEL if $wheel->kill($signal);
    $self->logger->warning("Failed to signal process ".$wheel->PID);
  }
}

# Runs the POE kernel. When a program name was supplied, the process title
# ($0) is first set to "<program_name>-pm".
sub start {
  my ($self) = @_;
  $0 = join('-', $self->program_name, 'pm') if $self->has_program_name;
  return $poe_kernel->run;
}

# Public entry point for pausing: dispatches the '_pause' event to the
# manager session, addressed by its alias, through the kernel's call().
sub pause {
  my $self = shift;
  my $session_alias = $self->alias;
  return $poe_kernel->call($session_alias, '_pause');
}

sub resume {
  my ($self) = @_;

lib/Cantella/Worker/Manager/Prefork.pm  view on Meta::CPAN

  if( exists $self->wid_to_wheel_map->{$wheel_id} ){
    $self->logger->debug("retiring wheel ${wheel_id} due to old age");
    $self->wid_to_wheel_map->{$wheel_id}->kill('TERM');
  }
}

# Wheel close event; ARG0 is the ID of the wheel that closed.
# Forgets the wheel in both lookup maps (by wheel ID and by process ID).
sub _worker_process_close {
  my $self     = $_[OBJECT];
  my $wheel_id = $_[ARG0];
  $self->logger->debug("close for wheel ".$wheel_id);

  my $wheel = delete $self->wid_to_wheel_map->{$wheel_id};
  delete $self->pid_to_wheel_map->{ $wheel->PID } if defined $wheel;
  return;
}

# SIGCHLD handler for a worker process; ARG1 is the child's pid and ARG2 its
# status. Forgets the worker's wheel in both lookup maps and, while spawning
# is enabled on the heap, queues a 'spawn_workers' event.
sub _worker_process_sig_chld {
  my $self = $_[OBJECT];
  my ($kernel, $heap)      = @_[KERNEL, HEAP];
  my ($pid, $status_code)  = @_[ARG1, ARG2];
  $self->logger->debug("SIGCHLD for pid ${pid} with exit code: ${status_code}");

  my $wheel = delete $self->pid_to_wheel_map->{$pid};
  delete $self->wid_to_wheel_map->{ $wheel->ID } if defined $wheel;

  # The flag is cleared by _pause and wiped together with the heap on shutdown.
  $kernel->yield('spawn_workers') if $heap->{spawn_workers};
  return;
}

# Wheel error event. ARG0..ARG3 are the failed operation, the numeric error,
# the error string and the wheel ID. The failure is logged at error level.
sub _worker_process_error {
  my $self = $_[OBJECT];
  my ($operation, $errnum, $errstr, $wid) = @_[ARG0, ARG1, ARG2, ARG3];

  # A "read" failure with a zero error number is reported as the remote end
  # having closed, rather than with whatever error string was passed in.
  if ( $operation eq "read" && !$errnum ) {
    $errstr = "remote end closed";
  }
  $self->logger->error("wheel $wid generated $operation error $errnum: '$errstr'");
}

# Wheel STDERR event. ARG0 is one line of output, ARG1 the ID of the wheel
# that produced it. The line is relayed to the logger at the level given by
# worker_stderr_log_level.
sub _worker_process_stderr {
  my($self, $line, $wid) = @_[OBJECT, ARG0, ARG1];

  # The close and SIGCHLD handlers remove wheels from the map, and the map
  # itself is reachable through a public accessor, so the lookup may come
  # back empty. Still log the line instead of dying on an undefined value.
  my $wheel = $self->wid_to_wheel_map->{$wid};
  my $pid = defined $wheel ? $wheel->PID : 'unknown';

  my $message =  "worker $pid STDERR: '${line}'";
  $self->logger->log(level => $self->worker_stderr_log_level, message => $message);
}

# Wheel STDOUT event. ARG0 is one line of output, ARG1 the ID of the wheel
# that produced it. The line is relayed to the logger at the level given by
# worker_stdout_log_level.
sub _worker_process_stdout {
  my($self, $line, $wid) = @_[OBJECT, ARG0, ARG1];

  # The close and SIGCHLD handlers remove wheels from the map, and the map
  # itself is reachable through a public accessor, so the lookup may come
  # back empty. Still log the line instead of dying on an undefined value.
  my $wheel = $self->wid_to_wheel_map->{$wid};
  my $pid = defined $wheel ? $wheel->PID : 'unknown';

  my $message =  "worker $pid STDOUT: '${line}'";
  $self->logger->log(level => $self->worker_stdout_log_level, message => $message);
}

#--------#---------#---------#---------#---------#---------#---------#--------#

# Session start-up: registers the session alias and the signal watchers,
# enables spawning, arms the keep-alive timer and queues the first
# 'spawn_workers' event.
sub _start {
  my $self = $_[OBJECT];
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  $self->logger->info("starting worker manager");
  $kernel->alias_set($self->alias);

  # Signal name => inline event that handles it (see BUILD).
  my @watchers = (
    [ INT  => 'sig_int'  ],
    [ TERM => 'sig_term' ],
    [ USR1 => 'sig_usr1' ],
    [ USR2 => 'sig_usr2' ],
  );
  for my $watcher (@watchers) {
    $kernel->sig(@$watcher);
  }

  $heap->{spawn_workers} = 1;
  $kernel->delay(_keep_alive => 1000);
  $kernel->yield('spawn_workers');
}

# Pause event: turns off respawning (the SIGCHLD handler checks this heap
# flag) and sends USR1 to every tracked worker.
sub _pause {
  my ($self, $heap) = @_[OBJECT, HEAP];
  $self->logger->info("pausing worker manager");
  $heap->{spawn_workers} = 0;
  return $self->signal_workers('USR1');
}

# Resume event: re-enables respawning, queues a 'spawn_workers' event and
# sends USR2 to every tracked worker.
sub _resume {
  my $self = $_[OBJECT];
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  $self->logger->info("resuming worker manager");

  # yield() only queues the event, so the flag is in place before
  # 'spawn_workers' is dispatched regardless of which line comes first.
  $heap->{spawn_workers} = 1;
  $kernel->yield('spawn_workers');

  return $self->signal_workers('USR2');
}

# Shutdown event: sets the worker target to zero, sends TERM to every
# tracked worker, then releases what the session holds on to (alarms, heap
# contents and its alias).
sub _shutdown {
  my $self = $_[OBJECT];
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  $self->logger->info("shutting down worker manager");

  $self->workers(0);
  $self->signal_workers('TERM');

  # Clean up alarms, heap and alias so no lingering references remain.
  $kernel->alarm_remove_all();
  %{$heap} = ();
  $kernel->alias_remove($self->alias);
}

#--------#---------#---------#---------#---------#---------#---------#--------#

# Finalize the class through its metaclass now that it is fully declared.
__PACKAGE__->meta->make_immutable();

# A module file must end with a true value.
1;

__END__;

=head1 NAME

Cantella::Worker::Manager::Prefork - Preforking POE worker-pool manager

=head1 SYNOPSIS

    my $manager = Cantella::Worker::Manager::Prefork->new(
      logger => $logger,
      workers => 5,
      worker_class => 'TestWorkerClass',
      max_worker_age => 3600,
      worker_args => {
        interval => 5,
        logger => $logger,
      }
    );

    $manager->start;

=head1 MANAGER-WORKER COMMUNICATION

=head2 Signals

lib/Cantella/Worker/Manager/Prefork.pm  view on Meta::CPAN

shut down gracefully to avoid unsupervised processes.

=head1 ATTRIBUTES

=head2 logger

=over 4

=item B<logger> - reader

=back

Read-only L<Log::Dispatch> instance. Defaults to a device that logs
to Null. This attribute can coerce from a hash or array reference, see
L<MooseX::Types::Log::Dispatch> for details.

=head2 program_name

=over 4

=item B<program_name> - reader

=item B<has_program_name> - predicate

=back

Read-only non-blank string. If C<program_name> is set, the manager process will
be renamed to "${program_name}-pm" and the children to "${program_name}".
Please see C<$0> in L<perlvar> for more information.

=head2 worker_class

=over 4

=item B<worker_class> - reader

=back

Required read-only class name of the worker class, which must consume the role
L<Cantella::Worker::Role::Worker>.

=head2 worker_args

=over 4

=item B<worker_args> - reader

=back

A read-only hash ref of arguments to be passed to instantiate the worker class.
Defaults to an empty hash ref.

=head2 workers

=over 4

=item B<workers> - accessor

=back

Read-write integer. The maximum number of subprocesses to have alive at any one
time. Defaults to 5.

=head2 max_worker_age

=over 4

=item B<max_worker_age> - accessor

=item B<has_max_worker_age> - predicate

=item B<clear_max_worker_age> - clearer

=back

Optional read-write integer. If this value is set, the manager will retire
workers when they reach a certain age and spawn a new worker. You may have seen
this behavior in mailscanner. It is useful for times when children may, in
rare cases, require large amounts of memory and you want to periodically
replace them to free it again.

=head2 alias

=over 4

=item B<alias> - accessor

=back

Read-only string. This is the alias of the session managing the workers and will
default to a UUID string.

=head2 close_on_call

=over 4

=item B<close_on_call> - accessor

=back

Read-write boolean value option for declaring whether file descriptors should
be closed in the forked process. Defaults to true.

=head2 worker_detaches

=over 4

=item B<worker_detaches> - accessor

=back

Read-write boolean value option for declaring whether worker processes should
C<setsid()> to detach themselves from the manager's session. Defaults to true.

=head2 worker_sets_process_group

=over 4

=item B<worker_sets_process_group> - accessor

=back

lib/Cantella/Worker/Manager/Prefork.pm  view on Meta::CPAN

=item B<pid_to_wheel_map> - accessor

=back

A read-only HashRef used to map process IDs to L<POE::Wheel::Run> objects

=head2 wid_to_wheel_map

=over 4

=item B<wid_to_wheel_map> - accessor

=back

A read-only HashRef used to map Wheel IDs to L<POE::Wheel::Run> objects

=head2 worker_stderr_log_level

=over 4

=item B<worker_stderr_log_level> - accessor

=back

Read-write string. Log level to use when logging stderr output that comes from
the child processes. Defaults to C<info>.

=head2 worker_stdout_log_level

=over 4

=item B<worker_stdout_log_level> - accessor

=back

Read-write string. Log level to use when logging stdout output that comes from
the child processes. Defaults to C<info>.

=head1 METHODS

=head2 BUILD

=over 4

=item B<arguments:> C<\%args>

=item B<return value:> none

=back

Sets up the manager session and its event handlers.

=head2 current_worker_count

=over 4

=item B<arguments:> none

=item B<return value:> C<$number_of_live_children>

=back

Returns the number of children workers that are still alive.

=head2 worker_wheels

=over 4

=item B<arguments:> none

=item B<return value:> C<@worker_poe_wheel_run_objects>

=back

Returns all of the wheel objects for currently living children workers

=head2 signal_workers

=over 4

=item B<arguments:> C<$signal>

=item B<return value:> none

=back

Sends a signal to all worker processes.

=head2 start

=over 4

=item B<arguments:> none

=item B<return value:> none

=back

Start the manager and begin spawning workers.

=head2 pause

=over 4

=item B<arguments:> none

=item B<return value:> none

=back

Pause the manager and stop it from spawning any new workers.

=head2 resume

=over 4

=item B<arguments:> C<$when>

=item B<return value:> none

=back

Resume the manager and resume spawning workers.

lib/Cantella/Worker/Manager/Prefork.pm  view on Meta::CPAN


=over 4

=item B<arguments:> none

=item B<return value:> none

=back

Stop the manager from spawning any new workers and end the session after all
workers have finished working.

=head1 EVENT HANDLERS

The following methods are L<POE> event handlers. They are not meant to be called
directly and will not work if you do. When applicable, arguments will be passed
into the methods in ARG0, ARG1, etc.

=head2 _spawn_workers

=over 4

=item B<handles event:> C<spawn_workers>

=item B<arguments:> none

=item B<return value:> none

=back

Spawn workers if C<current_worker_count> is less than C<workers>. This is also
the place where an alarm to retire a worker will be set if workers have a
maximum age.

The following events are set up to communicate with children processes:

=over 4

=item C<worker_process_stdout> - StdoutEvent

=item C<worker_process_stderr> - StderrEvent

=item C<worker_process_close> - CloseEvent

=item C<worker_process_sig_chld> - SIGCHLD

=back

=head2 _retire_worker

=over 4

=item B<handles event:> C<retire_worker>

=item B<arguments:> C<$wheel_id>

=item B<return value:> none

=back

If the worker process working under C<$wheel_id> is still alive, send it a
C<TERM> signal.

=head2 _worker_process_close

=over 4

=item B<handles event:> C<worker_process_close>

=item B<arguments:> C<$wheel_id>

=item B<return value:> none

=back

See L<POE::Wheel::Run>

=head2 _worker_process_sig_chld

=over 4

=item B<handles event:> C<worker_process_sig_chld>

=item B<arguments:> C<$pid, $exit_status_code>

=item B<return value:> none

=back

Once a sig CHLD comes back from a worker, this event will schedule a
C<spawn_workers> event to make sure there's enough workers alive.

=head2 _worker_process_stdout

=over 4

=item B<handles event:> C<worker_process_stdout>

=item B<arguments:> C<$line, $wheel_id>

=item B<return value:> none

=back

This event will be triggered for every line that the child process outputs
to STDOUT. By default, these will be logged to the log-level selected with
C<worker_stdout_log_level>. See L<POE::Wheel::Run> for more.

=head2 _worker_process_stderr

=over 4

=item B<handles event:> C<worker_process_stderr>

=item B<arguments:> C<$line, $wheel_id>

=item B<return value:> none

=back

This event will be triggered for every line that the child process outputs
to STDERR. By default, these will be logged to the log-level selected with
C<worker_stderr_log_level>. See L<POE::Wheel::Run> for more.

=head2 _start

=over 4

=item B<handles event:> C<_start>

=item B<arguments:> none

=item B<return value:> none

=back

Start the manager session.

=head2 _pause

=over 4

=item B<handles event:> C<_pause>

=item B<arguments:> none

=item B<return value:> none

=back

Signal workers to stop polling until notified and suspend the spawning of
new workers. The pause event can be triggered by sending signal C<USR1>

=head2 _resume

=over 4

=item B<handles event:> C<_resume>

=item B<arguments:> none

=item B<return value:> none

=back

Signal workers to resume polling and resume the spawning of new workers.
The resume event can be triggered by sending signal C<USR2>.

=head2 _shutdown

=over 4

=item B<handles event:> C<shutdown>

=item B<arguments:> none

=item B<return value:> none

=back

Remove all alarms, signal the workers to shut down and wait for the session to
die. The shutdown event can be triggered by sending signal C<INT> or C<TERM>.

=head1 OTHER EVENTS

=over 4

=item B<_keep_alive> - Used to keep the session alive while paused. Does nothing
other than schedule the next keep-alive 1000 seconds away.

=item B<sig_int> - mark sig INT as handled and yield to C<shutdown>

=item B<sig_term> - mark sig TERM as handled and yield to C<shutdown>

=item B<sig_usr1> - mark sig USR1 as handled and yield to C<_pause>

=item B<sig_usr2> - mark sig USR2 as handled and yield to C<_resume>

=back

=head1 SEE ALSO

L<Cantella::Worker::Role::Worker>, L<Cantella::Worker::Role::Beanstalk>

=head1 AUTHOR

Guillermo Roditi (groditi) E<lt>groditi@cpan.orgE<gt>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2009-2010 by Guillermo Roditi.
This library is free software, you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut



( run in 1.584 second using v1.01-cache-2.11-cpan-df04353d9ac )