AnyEvent-Fork-Pool
view release on metacpan or search on metacpan
sub run {
my ($template, $function, %arg) = @_;
my $max = $arg{max} || 4;
my $idle = $arg{idle} || 0,
my $load = $arg{load} || 2,
my $start = $arg{start} || 0.1,
my $stop = $arg{stop} || 10,
my $on_event = $arg{on_event} || sub { },
my $on_destroy = $arg{on_destroy};
my @rpc = (
async => $arg{async},
init => $arg{init},
serialiser => delete $arg{serialiser},
on_error => $arg{on_error},
);
my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
my $destroy_guard = Guard::guard {
$on_destroy->()
if $on_destroy;
};
$template
->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
->eval ('
my ($magic0, $magic1) = @_;
sub AnyEvent::Fork::Pool::retire() {
AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
}
', $magic0, $magic1)
;
$start_worker = sub {
my $proc = [0, 0, undef]; # load, index, rpc
$proc->[2] = $template
->fork
->AnyEvent::Fork::RPC::run ($function,
@rpc,
on_event => sub {
if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
$destroy_guard if 0; # keep it alive
$_[1] eq "quit" and $stop_worker->($proc);
return;
}
&$on_event;
},
)
;
++$nidle;
Array::Heap::push_heap_idx @pool, $proc;
Scalar::Util::weaken $proc;
};
$stop_worker = sub {
my $proc = shift;
$proc->[0]
or --$nidle;
Array::Heap::splice_heap_idx @pool, $proc->[1]
if defined $proc->[1];
@$proc = 0; # tell others to leave it be
};
$want_start = sub {
undef $stop_w;
$start_w ||= AE::timer $start, $start, sub {
if (($nidle < $idle || @queue) && @pool < $max) {
$start_worker->();
$scheduler->();
} else {
undef $start_w;
}
};
};
$want_stop = sub {
$stop_w ||= AE::timer $stop, $stop, sub {
$stop_worker->($pool[0])
if $nidle;
undef $stop_w
if $nidle <= $idle;
};
};
$scheduler = sub {
if (@queue) {
while (@queue) {
@pool or $start_worker->();
my $proc = $pool[0];
if ($proc->[0] < $load) {
# found free worker, increase load
unless ($proc->[0]++) {
# worker became busy
--$nidle
or undef $stop_w;
$want_start->()
if $nidle < $idle && @pool < $max;
}
Array::Heap::adjust_heap_idx @pool, 0;
my $job = shift @queue;
my $ocb = pop @$job;
( run in 1.761 second using v1.01-cache-2.11-cpan-39bf76dae61 )