Thread-Pool
use a "stream" routine to fill that monitored queue.
Changed the functionality of the "pre" routine to not save any
of the returned values. It makes much more sense for the "pre"
routine to set either lexical or global variables that would be
automatically accessible to the "do" and "post" routine by virtue
of the fact that they have their own memory space to play with.
Adapted the test-suite accordingly.
Added support for "monitor" keyword in "new" method. Allows you to
specify a streaming mode in which the stream is handled by a single
monitoring thread rather than by each thread itself. Courtesy
of the new Thread::Queue::Any::Monitored module.
Fixed problem with warning being issued when starting the object.
Caused by the originating thread id value not being set for the
originating thread itself.
0.12 16 July 2002
Added method "waitfor" as a shortcut for using "job" and "result".
Added method "abort" to have all the worker threads stop after they're
finished with their current job. Adapted new, _random and _stream
for this new feature.
0.11 15 July 2002
Bumped up version to 0.11 to be higher than Malcolm Beattie's original
Thread::Pool module on CPAN. No other changes were made.
0.04 15 July 2002
Hopefully fixed a streaming problem caused by some stupid thread
inertia: the dispatcher was assigning the jobid rather than the
"job" method, so things could get out of sync before the dispatcher
was reached.
Added "join" method to allow intermediate cleanup of removed worker
threads. Fixed up "shutdown" to call "join".
Internally changed dispatcher system: there are now different
dispatchers for streaming and random access mode.
Method "done" now only counts jobs by removed threads. Job counts
are now kept in local thread space and only made shared when the
worker thread is removed.
Gone back to not detaching threads. Detached threads cannot be
waited for, which is a major PITA.
0.03 14 July 2002
Made sure that once the pool is shut down, calling method "shutdown"
again will not do anything. At least not until method "add" is
called. This should fix the situation where the pool is specifically
shut down, and again shut down when the object is destroyed.
Added method "notused" which returns the number of threads that
were removed before they got a chance to do anything. Can be used
as a heuristic for how many threads you actually need for a
specific application.
Added functionality for streaming results. This allows you to
specify a "stream" subroutine that allows you to handle the result
of the asynchronously executed "do" subroutines in the order in
which the jobs were submitted (rather than in the order in which the
results were obtained).
Added "remove_me" functionality, which allows a "do" routine to tell
its dispatcher that the worker thread should be removed.
Added Perl version requirement to README and pod, per suggestion of
mrbbking. Added -use threads- to Makefile.PL to cause breakage of
Check all the signalling going on and see whether it still makes sense with
what I now know about cond_signal and cond_wait.
Think of other way to do streaming, possibly without having to lock the
result hash, as that seems to hold up stuff severely when there are more
than X threads at the same time trying to give their results.
Investigate whether Data::Dumper could serve as an alternative for Storable.
Add still more examples.
Add support for Thread::Needs?
Find out why Thread::Pool sometimes just dies in long running processes.
lib/Thread/Pool.pm view on Meta::CPAN
# If we're supposed to monitor
# Die now if also attempting to stream
if (exists $self->{'monitor'}) {
die "Cannot stream and monitor at the same time"
if exists $self->{'stream'};
# Make sure we have a real coderef for the pre and the monitoring routine
# Create a monitored belt
# Set the streaming routine that sends to the monitor
$self->_makecoderef( caller().'::',qw(pre monitor post checkpoint) );
$self->{'monitor_belt'} = Thread::Conveyor::Monitored->new(
{
optimize => $self->{'optimize'},
pre => $self->{'pre'},
monitor => $self->{'monitor'},
post => $self->{'post'},
exit => $self->{'exit'},
checkpoint => $self->{'checkpoint'},
lib/Thread/Pool.pm view on Meta::CPAN
# Obtain the object
# Die now if the pool was shut down
# Obtain local copy of the job belt
my $self = shift;
die "Cannot submit jobs on a pool that has been shut down"
if $self->{'shutdown'};
my $belt = $self->{'belt'};
# If we're streaming
# Die now if an individual jobid requested
# Put in box with a jobid obtained on the fly
if ($self->{'stream'}) {
die "Cannot return individual results when streaming"
if defined( wantarray );
$belt->put( $self->_jobid, \@_ );
# Elseif we want a jobid
# Obtain a jobid
# Put in box with that jobid
# And return with that jobid now
} elsif (defined( wantarray )) {
my $jobid = $self->_jobid;
$belt->put( $jobid, \@_ );
return $jobid;
# Else (not streaming and not interested in the result)
# Put in box without a jobid
} else {
$belt->put( \@_ )
}
} #job
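The submission logic above can be sketched from the caller's side. This is a minimal usage sketch, assuming the "new", "job" and "result" methods documented for this module; the doubling "do" routine and the worker count are illustrative:

```perl
use Thread::Pool;

# Minimal sketch: a result is only kept when a jobid was requested.
my $pool = Thread::Pool->new( {
    workers => 4,                      # illustrative worker count
    do      => sub { return $_[0] * 2 },
} );

$pool->job( 10 );                      # void context: no jobid, result discarded
my $jobid   = $pool->job( 21 );        # scalar context: returns a jobid
my ($value) = $pool->result( $jobid ); # blocks until that job has finished
$pool->shutdown;
```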
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# 2..N parameters to be passed for this job
lib/Thread/Pool.pm view on Meta::CPAN
# While there are boxes on the belt (without taking them off)
# Outloop if there is a real job in the box
# Remove the finishing job (amount of workers to remove was too high)
my $belt = $self->{'belt'};
while (my ($jobid) = $belt->peek_dontwait) {
last if $jobid;
$belt->take;
}
# If we were streaming
# Obtain local copy of stuff we need
if (my $stream = $self->{'stream'}) {
my ($streamid,$jobid,$result) = @$self{qw(streamid jobid result)};
# Set the extra parameters to be passed to streamer if monitoring
# Make sure we're the only one handling results
# Obtain last ID to loop through
my @extra = exists $self->{'monitor_belt'} ? ($self) : ();
lock( $result );
my $last = $self->_first_todo_jobid;
# For all the results that still need to be streamed
# Die if there is no result (_should_ be there by now)
# Call the "stream" routine with this result
# Delete the result from the hash
# Set the stream ID for any further streaming later
for (my $i = $$streamid; $i < $last; $i++) {
die "Cannot find result for streaming job $i"
unless exists( $result->{$i} );
$stream->( @extra,Thread::Serialize::thaw( $result->{$i} ) );
delete( $result->{$i} );
}
$$streamid = $last;
}
# Die now if there are still any jobs to be done
# And shut the belt down
lib/Thread/Pool.pm view on Meta::CPAN
stream => 'Package::in_order_of_submit',
or:
stream => \&SomeOther::in_order_of_submit,
or:
stream => sub {print "anonymous sub called in order of submit\n"},
The "stream" field specifies the subroutine to be executed for streaming the
results of the "do" routine. If specified, the "stream" routine is called
once for the result of each "do" subroutine, but in the order in which the
L<job>s were submitted rather than in the order in which the results were
obtained (which is, by the very nature of threads, indeterminate).
The specified subroutine should expect the following parameters to be passed:
1 the Thread::Pool object to which the worker thread belongs.
2..N the values that were returned by the "do" subroutine
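A minimal sketch of a streaming pool, assuming the fields documented above; the upper-casing "do" routine is illustrative:

```perl
use Thread::Pool;

my $pool = Thread::Pool->new( {
    workers => 4,
    do      => sub { return uc $_[0] },
    # Called once per result, in submit order; first parameter is the pool.
    stream  => sub {
        my ($pool, $result) = @_;
        print "$result\n";
    },
} );

$pool->job( $_ ) foreach qw(alpha beta gamma);
$pool->shutdown;    # flushes any results still waiting to be streamed
```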
lib/Thread/Pool.pm view on Meta::CPAN
once for the result of each "do" subroutine, but in the order in which the
L<job>s were submitted rather than in the order in which the results were
obtained (which is, by the very nature of threads, indeterminate).
The specified subroutine should expect the following parameters to be passed:
1..N the values that were returned by the "do" subroutine
The "monitor" routine is executed in its own thread. This means that all
results have to be passed between threads, and therefore be frozen and thawed
with L<Storable>. If you can handle the streaming from different threads,
it is probably wiser to use the "stream" routine feature.
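A minimal sketch of a monitored pool, assuming the fields documented above; the "do" routine is illustrative. Note that the "monitor" routine receives only the returned values, not the pool object:

```perl
use Thread::Pool;

my $pool = Thread::Pool->new( {
    workers => 4,
    do      => sub { return length $_[0] },
    # Runs in its own thread; results cross thread boundaries and are
    # therefore frozen and thawed behind the scenes.
    monitor => sub {
        my ($length) = @_;
        print "length: $length\n";
    },
} );

$pool->job( $_ ) foreach qw(one three seven);
$pool->shutdown;
```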
=item pre_post_monitor_only
pre_post_monitor_only => 1, # default 0
The "pre_post_monitor_only" field only makes sense if a "monitor" routine
is specified. If specified with a true value, it indicates that the "pre" and
"post" routines (if specified) should be called for the "monitor"
routine only and B<not> for the "do" routine. Otherwise, the same "pre" and
lib/Thread/Pool.pm view on Meta::CPAN
The "remove_me" class method only makes sense within the "do" subroutine.
It indicates to the job dispatcher that this worker thread should be removed
from the pool. After the "do" subroutine returns, the worker thread will
be removed.
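As a sketch of a "do" routine that retires its own worker thread; the process() helper and the 'quit' poison-pill convention are hypothetical, only the "remove_me" call is from this module:

```perl
# Hypothetical "do" routine: a worker removes itself when it sees
# a poison-pill value; the thread is taken out of the pool after
# the routine returns.
sub do_job {
    my ($value) = @_;
    if ($value eq 'quit') {
        Thread::Pool->remove_me;   # ask the dispatcher to remove this worker
        return;
    }
    return process( $value );      # process() is a hypothetical helper
}
```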
=head2 jobid
$jobid = Thread::Pool->jobid;
The "jobid" class method only makes sense within the "do" subroutine in
streaming mode. It returns the job ID value of the current job. This can
be used in connection with the L<dont_set_result> and L<set_result> methods
to have another thread set the result of the current job.
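A minimal sketch of handing a job off to another thread, assuming the "jobid", "dont_set_result" and "set_result" methods documented here; the hand-off queue is a hypothetical addition:

```perl
use Thread::Pool;
use Thread::Queue;

my $handoff = Thread::Queue->new;    # hypothetical hand-off channel

my $pool = Thread::Pool->new( {
    stream => sub { my ($pool, $value) = @_; print "result: $value\n" },
    do     => sub {
        my $jobid = Thread::Pool->jobid;  # only meaningful when streaming
        Thread::Pool->dont_set_result;    # result will be set elsewhere
        $handoff->enqueue( $jobid, @_ );  # another thread finishes the work
        return;                           # this return value is ignored
    },
} );

# Elsewhere, the consuming thread would call something like:
#   $pool->set_result( $jobid, $computed_value );
```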
=head2 dont_set_result
Thread::Pool->dont_set_result;
The "dont_set_result" class method only makes sense within the "do" subroutine.
It indicates to the job dispatcher that the result of this job should B<not>
be saved. This is for cases where the result of this job will be placed in
BEGIN {our $tests = 1 + (2*2*4*21)}
use Test::More tests => $tests;
use strict;
use warnings;
$SIG{__DIE__} = sub { require Carp; Carp::confess() };
$SIG{__WARN__} = sub { require Carp; Carp::confess() };
diag( "Test streaming to memory" );
BEGIN { use_ok('Thread::Pool') }
my $t0 = () = threads->list; # remember number of threads now
my $check;
my $format = '%5d';
my @list : shared;
# [int(2+rand(8)),int(1+rand(1000))],