IPC-QWorker
view release on metacpan or search on metacpan
lib/IPC/QWorker.pm view on Meta::CPAN
$self->{'_io_select'}->add( $worker->{'pipe'} );
}
}
sub push_queue {
my $self = shift;
push( @{ $self->{'_queue'} }, @_ );
}
sub _get_ready_workers {
my $self = shift();
my $timeout = shift();
my @can_read_pipes;
my $i;
my $wpid;
# if we have no ready workers find some
@can_read_pipes = $self->{'_io_select'}->can_read($timeout);
if ($IPC::QWorker::DEBUG) {
print STDERR "found " . scalar(@can_read_pipes) . " ready workers!\n";
}
foreach $i (@can_read_pipes) {
# get pid from a msg like "12345 READY\n"
($wpid) = split(' ', readline($i));
$self->{'_pids'}->{$wpid}->{'ready'} = 1;
push(@{$self->{'_ready_workers'}}, $self->{'_pids'}->{$wpid});
}
}
sub process_queue {
my $self = shift;
my $timeout = shift;
my $qentry;
my $worker;
if(defined($timeout)) {
# if timeout is set wait for timeout till a worker is ready
$self->_get_ready_workers($timeout);
while($worker = shift(@{$self->{'_ready_workers'}})) {
$worker->send_entry(shift(@{ $self->{'_queue'}}));
}
} else {
# loop over the Q till its empty
# will block while waiting for ready workers
# returns when the queue is empty
while($qentry = shift(@{ $self->{'_queue'}})) {
while(!scalar(@{$self->{'_ready_workers'}})) {
if ($IPC::QWorker::DEBUG) {
print STDERR "no ready workers. wait for workers...\n";
}
$self->_get_ready_workers();
}
$worker = shift(@{$self->{'_ready_workers'}});
$worker->send_entry($qentry);
}
}
}
sub _get_busy_workers {
my $self = shift();
my @result;
my $worker;
foreach $worker (@{$self->{'_workers'}}) {
if(!$worker->{'ready'}) {
push(@result, $worker);
}
}
return(@result);
}
# will block till all workers are finished
sub flush_queue {
my $self = shift();
my @busy_workers;
my $select = IO::Select->new();
while(scalar(@busy_workers = $self->_get_busy_workers())) {
if ($IPC::QWorker::DEBUG) {
print STDERR "still " . scalar(@busy_workers) . " busy workers...\n";
}
$self->_get_ready_workers();
}
}
sub stop_workers {
my $self = shift;
my $worker;
# may be we could also use signals here
foreach $worker ( @{ $self->{'_workers'} } ) {
$worker->exit_child();
}
}
1;
# vim:ts=2:expandtab:syntax=perl:
__END__
=pod
=encoding UTF-8
=head1 NAME
IPC::QWorker - processing a queue in parallel
=head1 VERSION
version 0.08
=head1 SYNOPSIS
my $qworker = IPC::QWorker->new();
$qworker->create_workers(10,
'dump' => sub {
my $ctx = shift();
print $$.": ".Dumper(@_)."\n";
$ctx->{'count'}++;
},
'_init' => sub {
my $ctx = shift();
$ctx->{'count'} = 0 ;
},
'_destroy' => sub {
my $ctx = shift();
print $$.": did ".$ctx->{'count'}." operations!\n";
}
);
foreach $i (1..120) {
$qworker->push_queue(
IPC::QWorker::WorkUnit->new(
'cmd' => 'dump',
'params' => $i,
)
);
( run in 1.955 second using v1.01-cache-2.11-cpan-39bf76dae61 )