AnyEvent-Fork-Pool
view release on metacpan or search on metacpan
=item init => $initfunction (default: none)
The function to call in the child, once before handling requests.
=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
The serialiser to use.
=back
=back
=cut
# Create a pool of worker processes forked from $template. Each worker runs
# the RPC function $function via AnyEvent::Fork::RPC. Returns a code
# reference; calling it as $pool->(@args, $cb) queues the call and
# dispatches it to a worker when one has capacity.
#
# Options read from %arg: max, idle, load, start, stop, on_event,
# on_destroy, and the pass-through RPC options async, init, serialiser,
# on_error.
sub run {
    my ($template, $function, %arg) = @_;

    # Pool tuning parameters. These use ||, so any false value falls back
    # to the default shown.
    my $max        = $arg{max}        || 4;
    my $idle       = $arg{idle}       || 0;
    my $load       = $arg{load}       || 2;
    my $start      = $arg{start}      || 0.1;
    my $stop       = $arg{stop}       || 10;
    my $on_event   = $arg{on_event}   || sub { };
    my $on_destroy = $arg{on_destroy};

    # Options handed to AnyEvent::Fork::RPC::run for every worker.
    my @rpc = (
        async      =>        $arg{async},
        init       =>        $arg{init},
        serialiser => delete $arg{serialiser},
        on_error   =>        $arg{on_error},
    );

    # Pool state:
    #   @pool     - Array::Heap heap of worker records
    #               [load, heap index, rpc object]; $pool[0] has the
    #               lowest load
    #   @queue    - pending calls, each [@args, $result_cb]
    #   $nidle    - number of workers in @pool whose load is 0
    #   $start_w  - timer that gradually starts workers
    #   $stop_w   - timer that gradually stops idle workers
    #   $shutdown - set once the returned pool code ref is destroyed
    my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
    my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

    # Calls on_destroy when the last reference to this guard goes away.
    # The on_event closures and $start_worker keep it alive (see below).
    my $destroy_guard = Guard::guard {
        $on_destroy->()
            if $on_destroy;
    };

    # Load the RPC backend in the template process and define retire()
    # there. retire() sends a "quit" event tagged with $magic0/$magic1
    # (defined outside this sub) back to the parent.
    $template
        ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
        ->eval ('
my ($magic0, $magic1) = @_;
sub AnyEvent::Fork::Pool::retire() {
AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
}
', $magic0, $magic1)
    ;

    # Fork a new worker from the template and add it to the pool as idle.
    $start_worker = sub {
        my $proc = [0, 0, undef]; # load, index, rpc

        $proc->[2] = $template
            ->fork
            ->AnyEvent::Fork::RPC::run ($function,
                @rpc,
                on_event => sub {
                    # Intercept the tagged retire() event; forward all
                    # other events to the user's on_event.
                    if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                        $destroy_guard if 0; # keep it alive
                        $_[1] eq "quit" and $stop_worker->($proc);
                        return;
                    }

                    &$on_event; # forward the current @_ unchanged
                },
            )
        ;

        ++$nidle;
        Array::Heap::push_heap_idx @pool, $proc;

        # @pool owns the record. The on_event closure above only keeps a
        # weak reference, so it does not keep the record alive by itself.
        Scalar::Util::weaken $proc;
    };

    # Remove a worker from the pool and drop its RPC object.
    # Calling this again for an already-removed worker is a no-op. That
    # happens, for example, when several concurrent jobs call retire().
    # Without the guard, $nidle would be decremented a second time.
    $stop_worker = sub {
        my $proc = shift;

        # The heap index [1] is only defined while the record is in @pool.
        # $proc may also be undef if the weakened record was freed.
        return unless $proc && defined $proc->[1];

        $proc->[0]
            or --$nidle;

        Array::Heap::splice_heap_idx @pool, $proc->[1];

        @$proc = 0; # tell others to leave it be
    };

    # Cancel any pending stop timer. Then arm a repeating timer that starts
    # one worker every $start seconds while more workers are wanted and
    # @pool is below $max.
    $want_start = sub {
        undef $stop_w;

        $start_w ||= AE::timer $start, $start, sub {
            if (($nidle < $idle || @queue) && @pool < $max) {
                $start_worker->();
                $scheduler->();
            } else {
                undef $start_w;
            }
        };
    };

    # Arm a repeating timer that stops one idle worker every $stop seconds
    # until at most $idle idle workers remain.
    $want_stop = sub {
        $stop_w ||= AE::timer $stop, $stop, sub {
            $stop_worker->($pool[0])
                if $nidle;

            undef $stop_w
                if $nidle <= $idle;
        };
    };

    # Hand queued calls to the least-loaded worker while it is below $load.
    # Called when a call is queued, a call finishes, the start timer adds a
    # worker, or the pool handle is destroyed.
    $scheduler = sub {
        if (@queue) {
            while (@queue) {
                @pool or $start_worker->();

                my $proc = $pool[0];

                if ($proc->[0] < $load) {
                    # found free worker, increase load
                    unless ($proc->[0]++) {
                        # worker became busy
                        --$nidle
                            or undef $stop_w;

                        $want_start->()
                            if $nidle < $idle && @pool < $max;
                    }

                    Array::Heap::adjust_heap_idx @pool, 0;

                    my $job = shift @queue;
                    my $ocb = pop @$job;

                    $proc->[2]->(@$job, sub {
                        # Reduce load. When it reaches 0, count the worker
                        # as idle, and arm the stop timer if there are now
                        # more idle workers than $idle.
                        # A worker stopped while busy had its load reset to
                        # 0, so this goes negative and it is never counted
                        # as idle again.
                        --$proc->[0]                # worker still busy?
                            or ++$nidle > $idle     # not too many idle processes?
                            or $want_stop->();

                        Array::Heap::adjust_heap_idx @pool, $proc->[1]
                            if defined $proc->[1];

                        &$ocb; # pass the worker's results (current @_) on

                        $scheduler->();
                    });
                } else {
                    # Every worker is at $load; ask for another one.
                    $want_start->()
                        unless @pool >= $max;

                    last;
                }
            }
        } elsif ($shutdown) {
            # The pool handle is gone and nothing is queued. Drop all RPC
            # objects and stop the idle workers.
            undef $_->[2]
                for @pool;

            undef $start_w;
            undef $start_worker; # frees $destroy_guard reference

            # "&& @pool" guards against spinning forever should $nidle
            # ever disagree with the pool contents.
            $stop_worker->($pool[0])
                while $nidle && @pool;
        }
    };

    # Captured by the returned code ref, so it fires when the pool handle
    # is destroyed. That runs the shutdown branch of the scheduler.
    my $shutdown_guard = Guard::guard {
        $shutdown = 1;
        $scheduler->();
    };

    # Pre-start the requested number of idle workers.
    $start_worker->()
        while @pool < $idle;

    # The pool handle: queue the call (last argument is the result
    # callback) and try to dispatch it immediately.
    sub {
        $shutdown_guard if 0; # keep it alive
        $start_worker->()
            unless @pool;
        push @queue, [@_];
        $scheduler->();
    }
}
=item $pool->(..., $cb->(...))
Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.
If there is no free worker, the call will be queued until a worker becomes
available.
Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.
=cut
=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
nowadays) and execution units (C<$eus>, which include e.g. extra
hyperthreaded units). When C<$cpus> cannot be determined reliably,
C<$default_cpus> is returned for both values, or C<1> if it is missing.
For normal CPU bound uses, it is wise to have as many worker processes
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
hyperthreading is usually detrimental to performance, but in those rare
cases where that really helps it might be beneficial to use more workers
(C<$eus>).
Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
used for C<$cpus>.
Example: create a worker pool with as many workers as CPU cores, or C<2>,
if the actual number could not be determined.
$fork->AnyEvent::Fork::Pool::run ("myworker::function",
max => (scalar AnyEvent::Fork::Pool::ncpu 2),
);
=cut
BEGIN {
if ($^O eq "linux") {
*ncpu = sub(;$) {
my ($cpus, $eus);
if (open my $fh, "<", "/proc/cpuinfo") {
( run in 2.670 seconds using v1.01-cache-2.11-cpan-df04353d9ac )