Thread-Pool
view release on metacpan or search on metacpan
lib/Thread/Pool.pm view on Meta::CPAN
#---------------------------------------------------------------------------
# IN: 1 instantiated object
sub DESTROY {
    # Destructor: automatically shuts the pool down when the object goes
    # away, provided the autoshutdown flag is set.
    # IN: 1 instantiated object

    # Ignore "rogue" calls on anything that is not one of our objects
    return if !UNIVERSAL::isa( $_[0],__PACKAGE__ ); #HACK

    my $self = shift;

    # Only act when the clone level saved in the object (by new) matches the
    # current one; presumably a mismatch means this is a thread-cloned copy
    # that must not shut the pool down (NOTE(review): confirm).
    return if $self->{'cloned'} != $cloned;

    if ($self->{'autoshutdown'}) {
        $self->shutdown;
    }
} #DESTROY
#---------------------------------------------------------------------------
# class methods
#---------------------------------------------------------------------------
# IN: 1 class with which to create
# 2 reference to hash with parameters
# 3..N parameters to be passed to "pre" routine
# OUT: 1 instantiated object
sub new {
# Obtain the class
# Obtain the hash with parameters and bless it
# Save the clone level (so we can check later if we've been cloned)
# Die now if there is no subroutine to execute
my $class = shift;
my $self = bless shift,$class;
$self->{'cloned'} = $cloned;
die "Must have a subroutine to perform jobs" unless exists $self->{'do'};
# Save the number of workers that were specified now (is changed later)
# Set the maximum number of jobs if not set already
# Set the minimum number of jobs if not set already
# Set the default optimization if none specified
my $add = $self->{'workers'};
$self->{'maxjobs'} = 5 * ($add || 1) unless defined( $self->{'maxjobs'} );
$self->{'minjobs'} ||= $self->{'maxjobs'} >> 1;
$self->{'optimize'} ||= $OPTIMIZE;
# If we're supposed to monitor
# Die now if also attempting to stream
if (exists $self->{'monitor'}) {
die "Cannot stream and monitor at the same time"
if exists $self->{ 'stream'};
# Make sure we have a real coderef for the pre and the monitoring routine
# Create a monitored belt
# Set the streaming routine that sends to the monitor
$self->_makecoderef( caller().'::',qw(pre monitor post checkpoint) );
$self->{'monitor_belt'} = Thread::Conveyor::Monitored->new(
{
optimize => $self->{'optimize'},
pre => $self->{'pre'},
monitor => $self->{'monitor'},
post => $self->{'post'},
exit => $self->{'exit'},
checkpoint => $self->{'checkpoint'},
frequency => $self->{'checkpoint'} ?
$self->{'frequency'} || $FREQUENCY : undef,
maxboxes => $self->{'maxjobs'},
minboxes => $self->{'minjobs'},
},
@_
);
$self->{'stream'} = \&_have_monitored;
}
# Create a belt for it
$self->{'belt'} = Thread::Conveyor->new(
{
optimize => $self->{'optimize'}
}
);
# Set the auto-shutdown flag unless it is specified already
# Set the dispatcher to be used if none specified yet
# Make sure all subroutines are code references
$self->{'autoshutdown'} = 1 unless exists $self->{'autoshutdown'};
$self->{'dispatcher'} ||= $self->{'stream'} ? \&_stream : \&_random;
$self->_makecoderef( caller().'::',qw(pre do post stream dispatcher) );
# If the Thread::Tie module is loaded
# Initialize the workers threads list as tied and save the locking semaphore
# Make sure references exist in the object
# if (defined( $Thread::Tie::VERSION )) {
$self->{'lock_workers'} = (tie my @workers, 'Thread::Tie')->semaphore;
@$self{qw(workers)} = (\@workers);
# Else (use standard shared implemenation)
# Initialize the workers threads list as shared
# Make sure references exist in the object
# } else {
# my @workers : shared;
# @$self{qw(workers)} = (\@workers);
# }
# Initialize the jobid counter as shared
# Initialize the streamid counter as shared
# Initialize the running flag
# Initialize the removed hash as shared
# Initialize the result hash as shared
# Make sure references exist in the object
lib/Thread/Pool.pm view on Meta::CPAN
#---------------------------------------------------------------------------
# IN: 1 class (ignored) or instantiated object
# 2 new default checkpoint frequency (if called as class method only)
# OUT: 1 default frequency
sub frequency {
    # Checkpoint frequency accessor.
    #
    # As an object method: returns the frequency used by that pool (its own
    # "frequency" field, falling back to the class default), or nothing
    # (undef in scalar context) when the pool has no checkpoint routine.
    # As a class method: if a 2nd argument is given it becomes the new
    # default frequency; the current default is returned.
    #
    # IN: 1 class (ignored) or instantiated object
    #     2 new default checkpoint frequency (class method only)
    # OUT: 1 frequency

    my $invocant = shift;

    if (ref $invocant) {
        return $invocant->{'checkpoint'}
            ? ( $invocant->{'frequency'} || $FREQUENCY )
            : ();
    }

    if (@_) {
        $FREQUENCY = shift;
    }
    return $FREQUENCY;
} #frequency
#---------------------------------------------------------------------------
# IN: 1 class (ignored)
# 2 new default optimization type
# OUT: 1 current default optimization type
sub optimize {
    # Class method: get (and optionally set) the default optimization type
    # used by new pools that do not specify "optimize" themselves.
    # IN: 1 class (ignored)
    #     2 new default optimization type (optional)
    # OUT: 1 current default optimization type

    my (undef, @setting) = @_;
    $OPTIMIZE = $setting[0] if @setting;
    return $OPTIMIZE;
} #optimize
#---------------------------------------------------------------------------
# instance methods
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# 2..N parameters to be passed for this job
# OUT: 1 jobid
sub job {
    # Submit a job to the pool; all parameters after the object are passed
    # to the "do" routine.
    #
    # - Dies if the pool has been shut down.
    # - Streaming pool: the job always gets a jobid, but the caller may not
    #   ask for one (dies when called in non-void context).
    # - Otherwise: a jobid is only allocated (and returned) when the caller
    #   uses the return value; in void context the job is queued without one.
    #
    # IN: 1 instantiated object
    #     2..N parameters to be passed for this job
    # OUT: 1 jobid (only when not streaming and not called in void context)

    my $self = shift;
    die "Cannot submit jobs on a pool that has been shut down"
        if $self->{'shutdown'};

    my $belt       = $self->{'belt'};
    my $wants_id   = defined( wantarray );

    if ($self->{'stream'}) {
        die "Cannot return individual results when streaming" if $wants_id;
        $belt->put( $self->_jobid, \@_ );
        return;
    }

    # Caller ignores the result: queue the parameters without a jobid
    if (!$wants_id) {
        $belt->put( \@_ );
        return;
    }

    my $new_id = $self->_jobid;
    $belt->put( $new_id, \@_ );
    return $new_id;
} #job
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# 2..N parameters to be passed for this job
# OUT: 1..N parameters returned from the job
sub waitfor {
    # Submit a job and block until its result is available.
    # IN: 1 instantiated object
    #     2..N parameters to be passed for this job
    # OUT: 1..N values returned by the job (via the result method)

    my $self = shift;

    # List assignment keeps job() in list (non-void) context, so it hands
    # back a jobid
    my ($jobid) = $self->job( @_ );

    return $self->result( $jobid );
} #waitfor
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# OUT: 1 number of jobs to be done still
sub todo {
    # Number of jobs still to be done, as reported by the job belt's
    # onbelt method.
    my $self = shift;
    my $belt = $self->{'belt'};
    return $belt->onbelt;
} #todo
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# 2..N tids of removed worker (default: all removed workers)
# OUT: 1 number of jobs done
sub done {
    # Total number of jobs performed by removed worker threads.
    # IN: 1 instantiated object
    #     2..N tids of removed workers (default: every removed worker)
    # OUT: 1 number of jobs done (missing/false counts contribute 0)

    my ($self, @tids) = @_;
    my $done_by_tid = $self->{'removed'};
    @tids = keys %{$done_by_tid} if !@tids;

    my $total = 0;
    for my $tid (@tids) {
        $total += $done_by_tid->{$tid} || 0;
    }
    return $total;
} #done
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# 2 jobid for which to obtain result
# OUT: 1..N parameters returned from the job
sub result {
# Obtain the object
# Obtain the jobid
lib/Thread/Pool.pm view on Meta::CPAN
# Set minimum number of boxes on the monitoring belt if any
# Return current value
my $self = shift;
if (@_) {
my $minjobs = $self->{'minjobs'} = shift;
$self->{'belt'}->minboxes( $minjobs );
$self->{'monitor_belt'}->minboxes($minjobs) if $self->{'monitor_belt'};
}
$self->{'minjobs'};
} #minjobs
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# 2 new setting of autoshutdown flag
# OUT: 1 current/new setting of autoshutdown
sub autoshutdown {
    # Accessor for the autoshutdown flag (checked by DESTROY).
    # IN: 1 instantiated object
    #     2 new setting of the flag (optional)
    # OUT: 1 current/new setting

    my ($self, @setting) = @_;
    if (@setting) {
        $self->{'autoshutdown'} = $setting[0];
    }
    return $self->{'autoshutdown'};
} #autoshutdown
#---------------------------------------------------------------------------
# IN: 1 instantiated object
sub shutdown {
    # Shut the pool down: stop and join all workers, stream any results that
    # have not been streamed yet, shut down the job belt and, if present, the
    # monitoring belt.  Must be called from the originating thread; calling
    # it on an already shut down pool does nothing.
    #
    # IN: 1 instantiated object
    # Dies when a result to be streamed is missing, or when real jobs are
    # still on the belt after all workers have been joined.

    my $self = shift;
    $self->_check_originating_thread( 'shutdown' );
    return if $self->{'shutdown'};

    # Have all workers stop after the queued jobs, mark the object as shut
    # down (in case anything below dies), then join every worker
    $self->workers( 0 );
    $self->{'shutdown'} = 1;
    $self->join( @{$self->{'workers'}} );

    # Remove leftover "finishing" boxes (the number of workers to remove was
    # too high); stop at the first box that carries a real jobid
    my $belt = $self->{'belt'};
    while (my ($jobid) = $belt->peek_dontwait) {
        last if $jobid;
        $belt->take;
    }

    # When streaming, hand every result that still needs to be streamed to
    # the stream routine, in jobid order, while holding the result lock
    if (my $stream = $self->{'stream'}) {
        my ($streamid, $result) = @$self{qw(streamid result)};

        # With a monitoring belt the stream routine also receives the pool
        my @extra = exists $self->{'monitor_belt'} ? ($self) : ();
        lock( $result );
        my $last = $self->_first_todo_jobid;

        foreach my $i ($$streamid .. $last - 1) {
            die "Cannot find result for streaming job $i"
                unless exists( $result->{$i} );
            $stream->( @extra, Thread::Serialize::thaw( $result->{$i} ) );
            delete( $result->{$i} );
        }

        # Remember where streaming continues later
        $$streamid = $last;
    }

    # All workers have been joined, so any real job left would never be done
    die "Shutting down pool while there are still jobs to be done"
        if $belt->onbelt;
    $belt->shutdown;

    # Tell the monitoring thread (if any) to stop and forget its belt
    if (my $mbelt = $self->{'monitor_belt'}) {
        $mbelt->shutdown;
        delete( $self->{'monitor_belt'} );
    }
} #shutdown
#---------------------------------------------------------------------------
# IN: 1 instantiated object
sub abort {
    # Abort the pool: make workers stop as soon as possible (rather than
    # after all queued jobs), then collect the worker threads.
    # Must be called from the originating thread.
    # IN: 1 instantiated object

    my $self = shift;
    $self->_check_originating_thread( 'abort' );

    # Clear the running flag so workers stop picking up jobs
    ${ $self->{'running'} } = 0;

    # Wait for the workers to go away; when there are no jobs left the
    # workers won't see the flag, so explicitly ask for 0 workers instead
    while ($self->workers) {
        $self->workers( 0 ) if !$self->todo;
        threads->yield;
    }

    # Restore the flag in case workers are added later, then join threads
    ${ $self->{'running'} } = 1;
    $self->join;
} #abort
#---------------------------------------------------------------------------
# IN: 1 instantiated object
# OUT: 1 number of threads that have not done any jobs
sub notused {
# Obtain local copy of stuff we need
# Initialize counter
lib/Thread/Pool.pm view on Meta::CPAN
subroutine or as a reference to a (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N any additional parameters that were passed with the call to L<new>.
You can determine whether the "pre" routine is called for a new worker thread
or for a monitoring thread by checking the L<self> or L<monitor> class method
inside the "pre" routine.
=item post
post => 'cleanup_after_worker', # assume caller's namespace
or:
post => 'Package::cleanup_after_worker',
or:
post => \&SomeOther::cleanup_after_worker,
or:
post => sub {print "anonymous sub cleaning up after the worker removed\n"},
The "post" field specifies the subroutine to be executed B<each> time a worker
thread is B<removed> (either when being specifically L<remove>d, or when the
pool is L<shutdown> explicitly or implicitly when the Thread::Pool object
is destroyed). It must be specified as either the name of a subroutine or as
a reference to an (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N any additional parameters that were passed with the call to L<new>.
Any values that are returned by this subroutine after closing down the thread
are accessible with the L<result> method, but only if the thread was
L<removed> and a job ID was requested.
You can determine whether the "post" routine is called for a worker thread
or for a monitoring thread by checking the L<self> or L<monitor> class method
inside the "post" routine.
=item stream
stream => 'in_order_of_submit', # assume caller's namespace
or:
stream => 'Package::in_order_of_submit',
or:
stream => \&SomeOther::in_order_of_submit,
or:
stream => sub {print "anonymous sub called in order of submit\n"},
The "stream" field specifies the subroutine to be executed for streaming the
results of the "do" routine. If specified, the "stream" routine is called
once for the result of each "do" subroutine, but in the order in which the
L<job>s were submitted rather than in the order in which the results were
obtained (which is, by the very nature of threads, indeterminate).
The specified subroutine should expect the following parameters to be passed:
1 the Thread::Pool object to which the worker thread belongs.
2..N the values that were returned by the "do" subroutine
The "stream" routine is executed in B<any> of the threads that are created
for the Thread::Pool object. The system attempts to call the "stream"
routine in the same thread from which the values are obtained, but when
things get out of sync, other threads may stream the result of a job. If
you want B<only one> thread to stream all results, use the "monitor" routine.
=item monitor
monitor => 'in_order_of_submit', # assume caller's namespace
or:
monitor => 'Package::in_order_of_submit',
or:
monitor => \&SomeOther::in_order_of_submit,
or:
monitor => sub {print "anonymous sub called in order of submit\n"},
The "monitor" field specifies the subroutine to be executed for monitoring the
results of the "do" routine. If specified, the "monitor" routine is called
once for the result of each "do" subroutine, but in the order in which the
L<job>s were submitted rather than in the order in which the results were
obtained (which is, by the very nature of threads, indeterminate).
The specified subroutine should expect the following parameters to be passed:
1..N the values that were returned by the "do" subroutine
The "monitor" routine is executed in its own thread. This means that all
results have to be passed between threads, and therefore be frozen and thawed
with L<Storable>. If you can handle the streaming from different threads,
it is probably wiser to use the "stream" routine feature.
=item pre_post_monitor_only
pre_post_monitor_only => 1, # default 0
The "pre_post_monitor_only" field only makes sense if a "monitor" routine
is specified. If specified with a true value, it indicates that the "pre" and
"post" routines (if specified) should be called for the "monitor"
routine only and B<not> for the "do" routine. Otherwise, the same "pre" and
"post" routines will be called for both the "do" and the "monitor"
routine.
When the "pre" and "post" routine are called for the "do" subroutine, the
L<self> class method returns the Thread::Pool object (which it doesn't do
when called in the "monitor" routine).
=item checkpoint
checkpoint => 'checkpointing', # assume caller's namespace
or:
checkpoint => 'Package::checkpointing',
or:
checkpoint => \&SomeOther::checkpointing,
or:
checkpoint => sub {print "anonymous sub to do checkpointing\n"},
The "checkpoint" field specifies the subroutine to be executed every time a
checkpoint should be made by a monitoring routine (e.g. for saving or updating
status). It must be specified as either the name of a subroutine or as a
reference to an (anonymous) subroutine.
It only makes sense to specify a checkpoint routine if there is also a
monitoring routine specified. No checkpointing will occur by default if a
monitoring routine B<is> specified. The frequency of checkpointing can
be specified with the "frequency" field.
The specified subroutine should not expect any parameters to be passed. Any
values returned by the checkpointing routine will be lost.
=item frequency
frequency => 100, # default = 1000
The "frequency" field specifies the number of jobs that should have been
monitored before the "checkpoint" routine is called. If a checkpoint routine
is specified but no frequency field is specified, then a frequency of B<1000>
will be assumed.
This field has no meaning if no checkpoint routine is specified with the
"checkpoint" field. The default frequency can be changed with the L<frequency>
method.
=item autoshutdown
lib/Thread/Pool.pm view on Meta::CPAN
=head2 done
$done = $pool->done;
The "done" method returns the number of L<job>s that have been performed by
the L<removed> worker threads of the pool.
The "done" method is typically called after the L<shutdown> method
has been called.
=head2 notused
$notused = $pool->notused;
The "notused" method returns the number of removed threads that have not
performed any jobs. It provides a heuristic to determine how many
L<workers> you actually need for a specific application: a value > 0
indicates that you have specified too many worker threads for this
application.
The "notused" method is typically called after the L<shutdown> method
has been called.
=head1 INSIDE JOB METHODS
The following methods only make sense inside the "pre", "do", "post",
"stream" and "monitor" routines.
=head2 self
$self = Thread::Pool->self;
The class method "self" returns the object to which this thread belongs.
It is available within the "pre", "do", "post", "stream" and "monitor"
subroutines only.
=head2 monitor
$monitor = Thread::Pool->monitor;
The class method "monitor" returns the Thread::Conveyor::Monitored object
that is associated with the pool. It is available only if the "monitor"
field was specified in L<new>, and then only within the "pre", "do", "post",
"stream" and "monitor" subroutines.
=head2 remove_me
Thread::Pool->remove_me;
The "remove_me" class method only makes sense within the "do" subroutine.
It indicates to the job dispatcher that this worker thread should be removed
from the pool. After the "do" subroutine returns, the worker thread will
be removed.
=head2 jobid
$jobid = Thread::Pool->jobid;
The "jobid" class method only makes sense within the "do" subroutine in
streaming mode. It returns the job ID value of the current job. This can
be used in connection with the L<dont_set_result> and the L<set_result> methods
to have another thread set the result of the current job.
=head2 dont_set_result
Thread::Pool->dont_set_result;
The "dont_set_result" class method only makes sense within the "do" subroutine.
It indicates to the job dispatcher that the result of this job should B<not>
be saved. This is for cases where the result of this job will be placed in
the result hash at some time in the future by another thread using the
L<set_result> method.
=head2 set_result
Thread::Pool->self->set_result( $jobid,@param );
The "set_result" object method only makes sense within the "do" subroutine.
It allows you to set the result of B<other> jobs than the one currently being
performed.
This method is only needed in B<very> special situations. Normally, just
returning values from the "do" subroutine is enough to have the result saved.
This method is exposed to the outside world in those cases where a specific
thread becomes responsible for setting the result of other threads (which
used the L<dont_set_result> method to defer saving their result).
The first input parameter specifies the job ID of the job for which to set
the result. The rest of the input parameters are considered to be the result
to be saved. Whatever is specified in the rest of the input parameters will
be returned with the L<result> or L<result_dontwait> methods.
=head1 REQUIRED MODULES
Thread::Conveyor (0.15)
Thread::Conveyor::Monitored (0.11)
=head1 OPTIMIZATIONS
This module uses L<load> to reduce memory and CPU usage. This causes
subroutines only to be compiled in a thread when they are actually needed at
the expense of more CPU when they need to be compiled. Simple benchmarks
however revealed that the overhead of compiling single routines is not
much more (and sometimes a lot less) than the overhead of cloning a Perl
interpreter with a lot of subroutines pre-loaded.
=head1 CAVEATS
Passing unshared values between threads is accomplished by serializing the
specified values using L<Thread::Serialize>. Please see the CAVEATS section
there for an up-to-date status of what can be passed around between threads.
=head1 EXAMPLES
There are currently two examples.
=head2 simple asynchronous log file resolving filter
This is an example of a very simple asynchronous log file resolver filter.
( run in 0.866 second using v1.01-cache-2.11-cpan-39bf76dae61 )