Parallel-Fork-BossWorkerAsync

 view release on metacpan or  search on metacpan

lib/Parallel/Fork/BossWorkerAsync.pm  view on Meta::CPAN

        $self->{workers}->{ $w1 } = { pid => $pid, socket => $w1 };

        # Make nonblocking
        $self->blocking( $w1, 0 );
        
      } else {
        # Worker (child)
        close($self->{app_socket});
        delete($self->{workers});
        close($w1);
        $self->{socket} = $w2;
        open(STDIN, '/dev/null');
      
        $self->worker_loop();
        exit;
      }
    }

    $self->log("start_workers: start workers complete\n");
  };
  if ($@) {
    croak($@);
  }
}

# -----------------------------------------------------------------
# Boss process; have an open socket to the app, and one to each worker.
# Loop select(), checking for read and write on app socket, and read
# on working children, and write on idle children.
# Keep track of idle vs. working children.
# When receive a shutdown order from the app, keep looping until the
# job queue is empty, and all results have been retrieved (all
# children will now be idle.)  Then close the worker sockets.
# They'll be reading, and will notice this and exit.
# Don't deserialize any data.  Just look for the delimiters to know
# we're processing whole records.
#

sub boss_loop {
  my ($self)=@_;

  $self->log("boss_loop: start\n");
  eval {
    # handy
    my $workers = $self->{workers};
    
    # All workers start out idle
    for my $s (keys(%$workers)) {
      $workers->{ $s }->{idle} = 1;
    }
    
    while ( 1 ) {
      # When to exit loop?
      #   shutting_down = 1
      #   job_queue empty
      #   all workers idle, and no partial jobs
      #   result_queue empty
      if ($self->{shutting_down}  &&
          ! @{ $self->{job_queue} }  &&
          ! @{ $self->{result_queue} } ) {
        my $busy=0;
        my $partials = 0;
        for my $s (keys(%$workers)) {
          if ( ! $workers->{ $s }->{idle}) {
            $busy ++;
            last;
          } elsif (exists($workers->{ $s }->{partial_job})) {
            $partials ++;
            last;
          }
        }
        if ( ! $busy  &&  ! $partials) {
          # Close all workers
          for my $s (keys(%$workers)) {
            close($workers->{ $s }->{socket});
          }
          close($self->{app_socket});
          last;
        }
      }
      
      # Set up selectors:
      # Always check app for read, unless shutting down.  App write only if
      # there's something in @result_queue.
      my (@rpids, @wpids);
      my $rs = IO::Select->new();
      if ( ! $self->{shutting_down}) {
        $rs->add($self->{app_socket});
        push(@rpids, "app");
      }
      my $ws = IO::Select->new();
      if ( @{ $self->{result_queue} } ) {
        $ws->add($self->{app_socket});
        push(@wpids, "app");
      }
      
      # Check workers for read only if not idle
      # Otherwise, IF job_queue isn't empty,
      # check nonidle workers for write.
      for my $s (keys(%$workers)) {
        if ( $workers->{ $s }->{idle}) {
          if ( @{ $self->{job_queue} }  ||  exists($workers->{ $s }->{partial_job})) {
            $ws->add($workers->{ $s }->{socket});
            push(@wpids, $workers->{ $s }->{pid});
          }
        } else {
          $rs->add($workers->{ $s }->{socket});
          push(@rpids, $workers->{ $s }->{pid});
        }
      }
      
      # Blocking
      my @rdy = IO::Select->select($rs, $ws, undef);
      if ( ! @rdy) {
        if ($! == EINTR) {
          # signal interrupt, continue waiting
          next;
        }
        croak("select failed: $!");
      }
      my ($r, $w) = @rdy[0,1];
      
      # Now we have zero or more reabable sockets, and
      # zero or more writable sockets, but there's at
      # least one socket among the two groups.
      # Read first, as things read can be further handled
      # by writables immediately afterwards.
      
      for my $rh (@$r) {
        my ($source, $queue, $rstream);
        if ($rh != $self->{app_socket}) {
          $source = $workers->{$rh}->{pid};



( run in 1.380 second using v1.01-cache-2.11-cpan-39bf76dae61 )