Parallel-MPM-Prefork

 view release on metacpan or  search on metacpan

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN


our $VERSION = '0.14';

our (@EXPORT_OK, @EXPORT_TAGS) = ();
our @EXPORT =
  qw(
      pf_init
      pf_done
      pf_whip_kids
      pf_kid_new
      pf_kid_busy
      pf_kid_yell
      pf_kid_idle
      pf_kid_exit
  );

our $error;

my $pgid;
my $done;
my $debug;

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

my $child_data_fd;

my $child_fds;

my $child_data_hook;
my $child_sigh;

my $dhook_in_main;
my $dhook_pid;

my $num_busy;
my $num_idle;
my %busy;
my %idle;

my $sigset_bak = POSIX::SigSet->new();
my $sigset_all = POSIX::SigSet->new();
$sigset_all->fillset();

#
# Public interface
#

sub pf_init {
  my %opts = @_;

  eval {
    setpgrp();
    $pgid = getpgrp();

    $timeout = $am_parent = 1;
    $dhook_pid = $done = $num_busy = $num_idle = 0;
    $child_fds = $child_stat_fd = $child_data_fd = $error = '';

    undef %busy;
    undef %idle;

    $debug = $opts{debug};

    # Just like Apache, we allow start_servers to be larger than
    # max_spare_servers to accommodate for high initial load.
    $max_servers = int($opts{max_servers} // MAX_SERVERS);
    $max_spare_servers = int($opts{max_spare_servers} // MAX_SPARE);
    $min_spare_servers = int($opts{min_spare_servers} // MIN_SPARE);
    $start_servers = int($opts{start_servers} // START_SERVERS);

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

      _kill_idlers($plus);
    }

    _log_child_status() if $debug;
    _read_child_drool();
  }

  return -1;
}

sub pf_kid_busy {
  syswrite $parent_stat_fh, "R$$\n" if ! $am_parent;
}

sub pf_kid_idle {
  syswrite $parent_stat_fh, "S$$\n" if ! $am_parent;
}

sub pf_kid_yell($;$$) {
  my ($data, $thaw, $exitcode) = @_;

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

  while (1) {
    select my $rfds = $child_fds, undef, undef, undef;
    _read_child_data();
  }
}

sub _spawn {
  my $code = shift;
  my $args = shift;

  if ($num_idle + $num_busy >= $max_servers) {
    warn "Server seems busy, consider increasing max_servers.\n";
    _log_child_status();
    return -1;
  }

  # Temporarily block signal delivery until child has installed all handlers
  # and knows for sure it's not the parent.
  sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak);

  my $cpid = fork();

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

}

sub _wait_for_children {
  my $ct;
  while ((my $pid = waitpid -$pgid, WNOHANG) > 0) {
    if ($pid == $dhook_pid) {
      warn "ERROR: data_hook_helper exited, forking new one.\n";
      $dhook_pid = _fork_data_hook_helper($parent_data_fh, $child_data_fh);
    }
    else {
      delete $busy{$pid} and $num_busy--;
      delete $idle{$pid} and $num_idle--;
      warn "PID $pid exited.\n" if $debug;
    }
    $ct++;
  }
  $ct;
}

sub _read_child_drool {
  my $status_changed;

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

    $timeout = 1;
  } until _wait_for_children() || $status_changed;
}

# An in-memory scoreboard would surely be nicer ...
sub _read_child_status {
  sigprocmask(SIG_BLOCK, $sigset_all, $sigset_bak);
  while (<$child_stat_fh>) {
    my ($status, $pid) = unpack 'aA*';
    # Ignore delayed status messages from no longer existing children
    next unless $busy{$pid} or $idle{$pid};
    if ($status eq 'R') {
      delete $idle{$pid} and $num_idle--;
      $busy{$pid}++ or $num_busy++;
    }
    elsif ($status eq 'S') {
      delete $busy{$pid} and $num_busy--;
      $idle{$pid}++ or $num_idle++;
    }
    elsif ($status ne '0') { # 0 = Jeffries tube. cg use only!
      warn "ERROR: Dubious status: $_";
    }
  }
  sigprocmask(SIG_SETMASK, $sigset_bak);
}

sub _read_child_data {

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

  kill 'TERM', my @idlers = (keys %idle)[0 .. --$plus];
  delete @idle{@idlers};
}

sub _log_child_action {
  my ($what, $count, @more) = @_;
  warn "$what $count child", $count == 1 ? ".\n" : "ren.\n", @more;
}

sub _log_child_status {
  warn "busy:$num_busy idle:$num_idle\n";
}

1;

__END__

=pod

=encoding utf8

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

  }

  END {
    pf_done();
  }

  # A simple echo server.
  sub echo_server {
    my $sock = shift;
    CONN: while (accept my $conn, $sock) {
      pf_kid_busy(); # tell parent we're busy
      /^quit/ ? last CONN : syswrite $conn, $_ while <$conn>;
      pf_kid_yell({ foo => 'bar' }, 1);  # send data to parent
      pf_kid_idle(); # tell parent we're idle again
    }
  }

  sub mksock {
    socket my $SOCK, AF_INET, SOCK_STREAM, 0;
    setsockopt($SOCK, SOL_SOCKET, SO_REUSEADDR, 1);
    bind $SOCK, pack_sockaddr_in(20116, inet_aton('127.0.0.1'));

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

returns undef if a fork() failed or 0 if pf_done() has already been called;

Typical code:

  $SIG{TERM} = $SIG{INT} = sub { pf_done(0) };
  1 while pf_whip_kids(\&echo_server, [$SOCK]);

=head3 $code

Code reference to be called in the child processes. Must make sure it calls
pf_kid_busy() and pf_kid_idle() as needed. If it returns, the child will exit
via C<exit(0)>.

=head3 $args (optional)

Array reference holding arguments to be passed when $code is called (C<<
$code->(@$args) >>).

=head2 pf_kid_new ()

Forks a new child process if too few are idle (E<lt>
min_spare_servers). Blocks otherwise and kills child processes if too many are
idle (E<gt> max_spare_servers).

If a new child process was forked, returns the child pid to the parent, 0 to
the child, undef if fork() failed.

As a special case it always returns -1 immediately if pf_done() has already
been called.

The newly created child is considered idle by the parent. It should call
pf_kid_busy() as soon as it starts working and pf_kid_idle() when it is
available again so that the parent can arrange for enough available child
processes.

Typical code:

  $SIG{TERM} = $SIG{INT} = sub { pf_done(0) };
  while (1) {
    my $pid = pf_kid_new() // die "Could not fork: $!";
    last if $pid < 0;  # pf_done()
    next if $pid;  # parent
    # child:
    pf_kid_busy();
    # do some rocket science
    pf_kid_idle();
    pf_kid_exit();
  }

  END {
    pf_done();
  }

=head2 pf_kid_busy ()

To be called by a child process to tell the main process it is busy.

=head2 pf_kid_idle ()

To be called by a child process to tell the main process it is idle.

=head2 pf_kid_exit ( $exitcode, $data, $thaw )

Calls C<pf_kid_yell($data, $thaw)> and then exits from the child via
C<exit($exitcode)>. C<$exitcode> will be tuncated to an 8-bit unsigned
integer, defaults to 0 if omitted. C<$data> and C<$thaw> are optional (see

lib/Parallel/MPM/Prefork.pm  view on Meta::CPAN

waitpid($pid, ...) in the main process.

system(LIST) can be replaced by system('setsid', LIST);

However, Parallel::MPM::Prefork will still catch SIGCHLD (see previous note).

=head2 Difference to Parallel::ForkManager

With Parallel::ForkManager, the main process decides in advance how much work
there is to do, how to split it up and how many child processes will work in
parallel. A child is always considered busy.

With Parallel::MPM::Prefork, the child processes take on work automatically as
it arrives. A child may be busy or idle. The main process only makes sure
there are always enough child processes available without too many idling
around.

Keep in mind that these are completely different use cases.

=head1 SEE ALSO

=head2 Net::Server::Prefork

Similar to Parallel::MPM::Prefork but limited to serving network



( run in 0.509 second using v1.01-cache-2.11-cpan-87723dcf8b7 )