IPC-QWorker

 view release on metacpan or  search on metacpan

lib/IPC/QWorker.pm  view on Meta::CPAN

				$self->{'_io_select'}->add( $worker->{'pipe'} );
    }
}

# Append one or more entries to the end of the pending work queue.
#
# Parameters: the entries to enqueue (any number, possibly none).
# Returns:    the new number of entries in the queue (the result of push).
sub push_queue {
    my ( $self, @entries ) = @_;

    return push @{ $self->{'_queue'} }, @entries;
}

# Collect READY notifications from the worker pipes.
#
# Waits on the IO::Select object stored in $self->{'_io_select'} and reads one
# line from every pipe that became readable. A line is expected to look like
# "12345 READY\n"; the worker registered under that pid in $self->{'_pids'} is
# flagged as ready and appended to $self->{'_ready_workers'}.
#
# Parameters:
#   $timeout - seconds to wait for a readable pipe; undef is passed through
#              to can_read() unchanged.
#
# A pipe that yields no line (EOF or read error, e.g. the worker went away)
# is removed from the select set so later calls do not return immediately on
# it again and again. Lines without a pid, or naming a pid that is not in
# $self->{'_pids'}, are ignored instead of creating a bogus worker entry.
sub _get_ready_workers {
    my ( $self, $timeout ) = @_;

    my @readable = $self->{'_io_select'}->can_read($timeout);
    if ($IPC::QWorker::DEBUG) {
        print STDERR "found " . scalar(@readable) . " ready workers!\n";
    }

    foreach my $pipe (@readable) {
        my $line = readline($pipe);

        if ( !defined($line) ) {
            # Nothing more will ever arrive on this pipe; keeping it in the
            # select set would make can_read() report it forever.
            if ($IPC::QWorker::DEBUG) {
                print STDERR "worker pipe closed, removing it from select\n";
            }
            $self->{'_io_select'}->remove($pipe);
            next;
        }

        # get pid from a msg like "12345 READY\n"
        my ($wpid) = split( ' ', $line );
        next if !defined($wpid);

        # exists() keeps an unknown pid from autovivifying a fake worker
        next if !exists( $self->{'_pids'}->{$wpid} );

        my $worker = $self->{'_pids'}->{$wpid};
        $worker->{'ready'} = 1;
        push( @{ $self->{'_ready_workers'} }, $worker );
    }

    return;
}

# Hand queued entries to ready workers.
#
# Parameters:
#   $timeout - optional. When defined, workers are polled once via
#              _get_ready_workers($timeout) and entries are handed out only
#              as long as BOTH a queued entry and a ready worker are
#              available; whatever is left over stays where it is for the
#              next call. When undefined, the call loops over the queue
#              until it is empty, polling for ready workers whenever none
#              is available.
#
# Entries are delivered with $worker->send_entry($entry).
sub process_queue {
    my ( $self, $timeout ) = @_;

    if ( defined($timeout) ) {
        # make sure both lists exist before they are tested for emptiness
        $self->{'_queue'}         ||= [];
        $self->{'_ready_workers'} ||= [];

        # if timeout is set wait for timeout till a worker is ready
        $self->_get_ready_workers($timeout);

        # Stop as soon as either side runs dry: a ready worker must never be
        # consumed (and sent an undefined entry) when nothing is queued.
        while ( @{ $self->{'_queue'} } && @{ $self->{'_ready_workers'} } ) {
            my $worker = shift( @{ $self->{'_ready_workers'} } );
            $worker->send_entry( shift( @{ $self->{'_queue'} } ) );
        }
        return;
    }

    # loop over the queue till it is empty
    # will block while waiting for ready workers
    # returns when the queue is empty
    while ( my $qentry = shift( @{ $self->{'_queue'} } ) ) {
        while ( !scalar( @{ $self->{'_ready_workers'} } ) ) {
            if ($IPC::QWorker::DEBUG) {
                print STDERR "no ready workers. wait for workers...\n";
            }
            $self->_get_ready_workers();
        }

        my $worker = shift( @{ $self->{'_ready_workers'} } );
        $worker->send_entry($qentry);
    }

    return;
}

# Return every worker in $self->{'_workers'} whose 'ready' flag is false,
# in the order the workers are stored.
sub _get_busy_workers {
    my ($self) = @_;
    my @busy;

    for my $candidate ( @{ $self->{'_workers'} } ) {
        next if $candidate->{'ready'};
        push @busy, $candidate;
    }

    return @busy;
}

# Block until all workers are finished.
#
# Repeatedly asks _get_busy_workers() for the workers that are not ready and,
# while there are any, calls _get_ready_workers() without a timeout to pick
# up further READY notifications. Returns once no busy worker is left.
sub flush_queue {
    my ($self) = @_;

    # a list assignment in boolean context is true while the list is non-empty
    while ( my @busy_workers = $self->_get_busy_workers() ) {
        if ($IPC::QWorker::DEBUG) {
            print STDERR "still " . scalar(@busy_workers) . " busy workers...\n";
        }
        $self->_get_ready_workers();
    }

    return;
}

# Ask every worker in $self->{'_workers'} to shut down by calling its
# exit_child() method, in the order the workers are stored.
sub stop_workers {
    my ($self) = @_;

    # signals might be an alternative way to stop the children
    for my $child ( @{ $self->{'_workers'} } ) {
        $child->exit_child();
    }
}

1;

# vim:ts=2:expandtab:syntax=perl:

__END__

=pod

=encoding UTF-8

=head1 NAME

IPC::QWorker - processing a queue in parallel

=head1 VERSION

version 0.08

=head1 SYNOPSIS

  my $qworker = IPC::QWorker->new();
  
  $qworker->create_workers(10,
    'dump' => sub {
      my $ctx = shift();
      print $$.": ".Dumper(@_)."\n";
      $ctx->{'count'}++;
    },
    '_init' => sub {
      my $ctx = shift();	
      $ctx->{'count'} = 0 ;
    },
    '_destroy' => sub {
      my $ctx = shift();
      print $$.": did ".$ctx->{'count'}." operations!\n";
    }
  );
          
  foreach my $i (1..120) {
    $qworker->push_queue(
      IPC::QWorker::WorkUnit->new(
        'cmd' => 'dump',
        'params' => $i,
      )
    );



( run in 1.955 second using v1.01-cache-2.11-cpan-39bf76dae61 )