Parallel-Fork-BossWorkerAsync
view release on metacpan or search on metacpan
lib/Parallel/Fork/BossWorkerAsync.pm view on Meta::CPAN
$self->{workers}->{ $w1 } = { pid => $pid, socket => $w1 };
# Make nonblocking
$self->blocking( $w1, 0 );
} else {
# Worker (child)
close($self->{app_socket});
delete($self->{workers});
close($w1);
$self->{socket} = $w2;
open(STDIN, '/dev/null');
$self->worker_loop();
exit;
}
}
$self->log("start_workers: start workers complete\n");
};
if ($@) {
croak($@);
}
}
# -----------------------------------------------------------------
# Boss process; have an open socket to the app, and one to each worker.
# Loop select(), checking for read and write on app socket, and read
# on working children, and write on idle children.
# Keep track of idle vs. working children.
# When receive a shutdown order from the app, keep looping until the
# job queue is empty, and all results have been retrieved (all
# children will now be idle.) Then close the worker sockets.
# They'll be reading, and will notice this and exit.
# Don't deserialize any data. Just look for the delimiters to know
# we're processing whole records.
#
sub boss_loop {
my ($self)=@_;
$self->log("boss_loop: start\n");
eval {
# handy
my $workers = $self->{workers};
# All workers start out idle
for my $s (keys(%$workers)) {
$workers->{ $s }->{idle} = 1;
}
while ( 1 ) {
# When to exit loop?
# shutting_down = 1
# job_queue empty
# all workers idle, and no partial jobs
# result_queue empty
if ($self->{shutting_down} &&
! @{ $self->{job_queue} } &&
! @{ $self->{result_queue} } ) {
my $busy=0;
my $partials = 0;
for my $s (keys(%$workers)) {
if ( ! $workers->{ $s }->{idle}) {
$busy ++;
last;
} elsif (exists($workers->{ $s }->{partial_job})) {
$partials ++;
last;
}
}
if ( ! $busy && ! $partials) {
# Close all workers
for my $s (keys(%$workers)) {
close($workers->{ $s }->{socket});
}
close($self->{app_socket});
last;
}
}
# Set up selectors:
# Always check app for read, unless shutting down. App write only if
# there's something in @result_queue.
my (@rpids, @wpids);
my $rs = IO::Select->new();
if ( ! $self->{shutting_down}) {
$rs->add($self->{app_socket});
push(@rpids, "app");
}
my $ws = IO::Select->new();
if ( @{ $self->{result_queue} } ) {
$ws->add($self->{app_socket});
push(@wpids, "app");
}
# Check workers for read only if not idle
# Otherwise, IF job_queue isn't empty,
# check nonidle workers for write.
for my $s (keys(%$workers)) {
if ( $workers->{ $s }->{idle}) {
if ( @{ $self->{job_queue} } || exists($workers->{ $s }->{partial_job})) {
$ws->add($workers->{ $s }->{socket});
push(@wpids, $workers->{ $s }->{pid});
}
} else {
$rs->add($workers->{ $s }->{socket});
push(@rpids, $workers->{ $s }->{pid});
}
}
# Blocking
my @rdy = IO::Select->select($rs, $ws, undef);
if ( ! @rdy) {
if ($! == EINTR) {
# signal interrupt, continue waiting
next;
}
croak("select failed: $!");
}
my ($r, $w) = @rdy[0,1];
# Now we have zero or more reabable sockets, and
# zero or more writable sockets, but there's at
# least one socket among the two groups.
# Read first, as things read can be further handled
# by writables immediately afterwards.
for my $rh (@$r) {
my ($source, $queue, $rstream);
if ($rh != $self->{app_socket}) {
$source = $workers->{$rh}->{pid};
( run in 1.380 second using v1.01-cache-2.11-cpan-39bf76dae61 )