Fsdb

view release on metacpan or search on metacpan

lib/Fsdb/Filter/dbmerge.pm  view on Meta::CPAN

	#
	# Go through this loop multiple times because closing the last depth
	# can actually allow work to start at the last+1 depth,
	# and in endgame mode we risk blocking (due to flow control)
	# if we don't start threads at all depths.
	#
	my $try_again = 1;
	while ($try_again) {
	    foreach my $depth (0..$#{$self->{_work}}) {
	        $self->segments_merge_one_depth($depth);
		$try_again = undef;
	        if ($#{$self->{_work}[$depth]} == -1 && $self->{_work_closed}[$depth]) {
		    # When one level is all in progress, we can close the next.
		    my $next_depth = $depth + 1;
		    if (!$self->{_work_closed}[$next_depth]) {
		        $self->{_work_closed}[$next_depth] = 1;
			$try_again = 1;
			$overall_progress++;
		        print "# segments_merge_all: closed work depth $next_depth\n" if ($self->{_debug});
		    };
		};
	    };
	};

	#
	# Next, handle Freds that have finished.
	# Reap as many as possible.
	#
	print "# segments_merge_all: reaping threads\n" if ($self->{_debug});
	for (;;) {
	    my $fred_or_code = Fsdb::Support::Freds::join_any();
	    last if (ref($fred_or_code) eq '');
	    $overall_progress++;
	    croak("dbmerge: merge thread failed\n")
		if ($fred_or_code->exit_code() != 0);
	    print "# segments_merge_all: merged fred " . $fred_or_code->info() . "\n" if ($self->{_debug});
	};

	#
	# Now start up more parallelism, if possible.
	#
	my $depth = 0;
	my $i = 0;
	while ($self->{_parallelism_available} > 0) {
	    my $work_ref = $self->{_work}[$depth][$i];
	    if (defined($work_ref) && $work_ref->[0] == -1) {
		# start it (it will decrement parallelism)
		&{$work_ref->[2]}($work_ref);
		$overall_progress++;
	    };
	    # walk the whole work queue
	    if (++$i > $#{$self->{_work}[$depth]}) {
		last if (++$depth > $#{$self->{_work}});
		$i = 0;
	    };
	};

	#
	# Handle xargs, if any.
	#
	# Unfortunately, we busy-loop here,
	# because we need to alternate reaping finished processes
	# and xargs.
	#
	# Fortunately, this terminates when xargs are complete.
       	#
	if ($self->{_xargs_ipc_status}) {
	    my(@ready) = $xargs_select->can_read($progress_backoff);
	    foreach my $fh (@ready) {
		my ($fn) = $fh->getline;
		if (defined($fn)) {
		    chomp($fn);
                    $self->{_files_cleanup}{$fn} = 'unlink'
                        if ($self->{_remove_inputs});
                    $self->enqueue_work(0, [2, $fn, undef]);
                    $self->{_xargs_ipc_count}++;
                    print "# xargs receive $fn, file " . $self->{_xargs_ipc_count} . "\n" if ($self->{_debug});
		} else {
		    # eof, so no more xargs
		    $self->{_work_closed}[0] = 1;
                    # We could check for special cases of 0 or 1 file.
                    # But we don't.  Just pass them through to the parent who will handle it.
                    # if ($self->{_xargs_ipc_count} == 0) {
                    #	croak($self->{_prog} . ": xargs, but no files for input\n");
                    # } elsif ($self->{_xargs_ipc_count} == 1) {
                    	# carp($self->{_prog} . ": xargs, but one file for input\n");
                        # Pass through and we will catch this case
                        # in the main segements_merge_all loop.
                    # };
		};
		$overall_progress++;
            };
	};
	#
	# Avoid spinlooping.
	#
	if ($overall_progress <= $overall_progress_last_reset) {
	    # No progress, so stall.
	    print "# segments_merge_all: stalling for $progress_backoff\n" if ($self->{_debug});
	    sleep($progress_backoff);
	    $progress_backoff *= $PROGRESS_MULTIPLIER;  # exponential backoff
	    $progress_backoff = $PROGRESS_MAX if ($progress_backoff > $PROGRESS_MAX);
	} else {
	    # Woo-hoo, did something.  Rush right back and try again.
	    $overall_progress_last_reset = $overall_progress;
	    $progress_backoff = $PROGRESS_START;
	};
    };

    # reap endgame zombies
    while (my $zombie_work_ref = shift(@{$self->{_zombie_work}})) {
	next if ($zombie_work_ref->[0] == 2);
	print "# waiting on zombie " . $zombie_work_ref->[2]->info() . "\n" if ($self->{_debug});
	$zombie_work_ref->[2]->join();
	croak("internal error: zombie didn't reset status\n") if ($zombie_work_ref->[0] != 2);
    };

    # reap xargs (if it didn't already get picked up)
    if ($self->{_xargs_fred}) {
	print "# waiting on xargs fred\n" if ($self->{_debug});
	$self->{_xargs_fred}->join();



(run in 1.232 seconds using v1.01-cache-2.11-cpan-5a3173703d6)