Fsdb
view release on metacpan or search on metacpan
lib/Fsdb/Filter/dbmerge.pm view on Meta::CPAN
#
# Go through this loop multiple times because closing the last depth
# can actually allow work to start at the last+1 depth,
# and in endgame mode we risk blocking (due to flow control)
# if we don't start threads at all depths.
#
my $try_again = 1;
while ($try_again) {
    # Clear the flag before sweeping.  If there are no work depths at
    # all, the foreach body below never runs, and without this reset the
    # enclosing while loop would spin forever.
    $try_again = undef;
    foreach my $depth (0..$#{$self->{_work}}) {
        $self->segments_merge_one_depth($depth);
        # The flag is reset at every depth, so only a close performed
        # while handling the final depth of this sweep requests another
        # pass; a close performed at an earlier depth is already visible
        # to the later iterations of this same sweep.
        $try_again = undef;
        if ($#{$self->{_work}[$depth]} == -1 && $self->{_work_closed}[$depth]) {
            # This depth is closed and its queue is empty,
            # so the next depth can be closed as well.
            my $next_depth = $depth + 1;
            if (!$self->{_work_closed}[$next_depth]) {
                $self->{_work_closed}[$next_depth] = 1;
                $try_again = 1;
                $overall_progress++;
                print "# segments_merge_all: closed work depth $next_depth\n" if ($self->{_debug});
            };
        };
    };
};
#
# Next, handle Freds that have finished.
# Reap as many as possible.
#
if ($self->{_debug}) {
    print "# segments_merge_all: reaping threads\n";
};
while (1) {
    my $joined = Fsdb::Support::Freds::join_any();
    # A non-reference result ends the reaping pass
    # (presumably a status code meaning nothing is ready -- confirm
    # against Fsdb::Support::Freds::join_any).
    if (ref($joined) eq '') {
        last;
    };
    # Each joined Fred counts as progress for the stall detector.
    $overall_progress++;
    if ($joined->exit_code() != 0) {
        croak("dbmerge: merge thread failed\n");
    };
    if ($self->{_debug}) {
        print "# segments_merge_all: merged fred " . $joined->info() . "\n";
    };
};
#
# Now start up more parallelism, if possible.
#
my $depth = 0;
my $i = 0;
while ($self->{_parallelism_available} > 0) {
    my $candidate = $self->{_work}[$depth][$i];
    # Entries whose first field is -1 get their stored code reference
    # (third field) invoked on themselves; per the original note, that
    # call is what decrements the available parallelism.
    if (defined($candidate) && $candidate->[0] == -1) {
        $candidate->[2]->($candidate);
        $overall_progress++;
    };
    # Step to the next entry at this depth, if there is one.
    $i++;
    next if ($i <= $#{$self->{_work}[$depth]});
    # This depth is exhausted: move down, or stop after the last depth.
    $depth++;
    last if ($depth > $#{$self->{_work}});
    $i = 0;
};
#
# Handle xargs, if any.
#
# Unfortunately, we busy-loop here,
# because we need to alternate reaping finished processes
# and xargs.
#
# Fortunately, this terminates when xargs are complete.
#
if ($self->{_xargs_ipc_status}) {
    # Wait up to the current backoff interval for readable xargs handles.
    my(@readable) = $xargs_select->can_read($progress_backoff);
    foreach my $fh (@readable) {
        my ($fn) = $fh->getline;
        if (!defined($fn)) {
            # eof, so no more xargs: depth 0 will receive no further input.
            $self->{_work_closed}[0] = 1;
            # We could check for special cases of 0 or 1 file.
            # But we don't.  Just pass them through to the parent who will handle it.
            # if ($self->{_xargs_ipc_count} == 0) {
            #     croak($self->{_prog} . ": xargs, but no files for input\n");
            # } elsif ($self->{_xargs_ipc_count} == 1) {
            #     carp($self->{_prog} . ": xargs, but one file for input\n");
            #     Pass through and we will catch this case
            #     in the main segments_merge_all loop.
            # };
        } else {
            # Got one more filename: queue it as depth-0 work.
            chomp($fn);
            if ($self->{_remove_inputs}) {
                $self->{_files_cleanup}{$fn} = 'unlink';
            };
            $self->enqueue_work(0, [2, $fn, undef]);
            $self->{_xargs_ipc_count}++;
            if ($self->{_debug}) {
                print "# xargs receive $fn, file " . $self->{_xargs_ipc_count} . "\n";
            };
        };
        # Either a new file or eof counts as progress.
        $overall_progress++;
    };
};
#
# Avoid spinlooping.
#
if ($overall_progress > $overall_progress_last_reset) {
    # Something happened this pass: remember the new high-water mark
    # and drop back to the shortest wait.
    $overall_progress_last_reset = $overall_progress;
    $progress_backoff = $PROGRESS_START;
} else {
    # Nothing happened: sleep, then lengthen the next wait
    # (exponential backoff, capped at $PROGRESS_MAX).
    if ($self->{_debug}) {
        print "# segments_merge_all: stalling for $progress_backoff\n";
    };
    sleep($progress_backoff);
    $progress_backoff *= $PROGRESS_MULTIPLIER;
    if ($progress_backoff > $PROGRESS_MAX) {
        $progress_backoff = $PROGRESS_MAX;
    };
};
};
# reap endgame zombies
while (my $zombie = shift(@{$self->{_zombie_work}})) {
    # Entries already at status 2 need no join.
    if ($zombie->[0] != 2) {
        if ($self->{_debug}) {
            print "# waiting on zombie " . $zombie->[2]->info() . "\n";
        };
        $zombie->[2]->join();
        # The join is expected to have moved the entry to status 2.
        if ($zombie->[0] != 2) {
            croak("internal error: zombie didn't reset status\n");
        };
    };
};
# reap xargs (if it didn't already get picked up)
if ($self->{_xargs_fred}) {
print "# waiting on xargs fred\n" if ($self->{_debug});
$self->{_xargs_fred}->join();
( run in 1.232 second using v1.01-cache-2.11-cpan-5a3173703d6 )