Forks-Super

 view release on metacpan or  search on metacpan

lib/Forks/Super/Job/Ipc.pm  view on Meta::CPAN

	if (! -e $ipc_file) {
	    $deleted{$ipc_file} = delete $IPC_FILES{$ipc_file};
	} else {
	    local $! = undef;
	    if ($DEBUG) {
		print STDERR "Deleting $ipc_file ... ";
	    }
	    my $z = unlink $ipc_file;
	    if ($z && ! -e $ipc_file) {
		if ($DEBUG) {
		    print STDERR "Delete $ipc_file ok\n";
		}
		$deleted{$ipc_file} = delete $IPC_FILES{$ipc_file};
	    } else {
		if ($DEBUG) {
		    print STDERR "Delete $ipc_file failed: $!\n";
		}
		warn 'Forks::Super::END_cleanup: ',
		    "error disposing of ipc file $ipc_file: $z/$!\n";
	    }
	}
    }
    return %deleted;
}

sub _END_background_cleanup2 {
    # best efforts to cleanup the IPC files
    my %G = @_;
    my $z = rmdir($_IPC_DIR) || 0;
    if (!$z) {
	unlink glob("$_IPC_DIR/*");
	CORE::sleep 5;
	$z = rmdir($_IPC_DIR) || 0;
    }

    if (!$z
	&& -d $_IPC_DIR
	&& glob("$_IPC_DIR/.nfs*")) {

	# Observed these files on Linux running from NSF mounted filesystem
	# .nfsXXX files are usually temporary (~30s) but hard to kill
	for my $i (1..10) {
	    CORE::sleep 5;
	    last unless glob("$_IPC_DIR/.nfs*");
	}
	$z = rmdir($_IPC_DIR) || 0;
    }

    if (!$z && -d $_IPC_DIR) {

	warn "Forks::Super::END_cleanup: rmdir $_IPC_DIR failed. $!\n";

	opendir(my $_Z, $_IPC_DIR);
	my @g = grep { !/^\.nfs/ } readdir($_Z);
	closedir $_Z;
    }
    return;
}

# if we have created temporary files for IPC, clean them up.
# clean them up even if the children are still alive -- these files
# are exclusively for IPC, and IPC isn't needed after the parent
# process is done.
sub END_cleanup {

    if ($$ != ($Forks::Super::MAIN_PID || $MAIN_PID)) {
	return;
    }

    return if _END_foreground_cleanup();

    return if CORE::fork();
    exit 0 if CORE::fork();

    my %G = _END_background_cleanup1();

    return if !defined $IPC_DIR_DEDICATED;
    return if 0 < scalar keys %IPC_FILES;

    my $zz = rmdir($_IPC_DIR) || 0;
    return if $zz;

    CORE::sleep 2;
    exit 0 if CORE::fork();

    # long sleep here for maximum portability.
    CORE::sleep 10;
    _END_background_cleanup2();
    return;
}

sub END_cleanup_MSWin32 {
    return if $$ != ($Forks::Super::MAIN_PID || $MAIN_PID);
    return if $_CLEANUP++;
    $0 = "Forks::Super:cleanup:$0";

    # Use brute force to close all open handles. Leave STDERR open for warns.
    # XXX - is this ok? what if perl script is communicating with a socket?
    use POSIX ();
    for (0,1,3..999) {
        no warnings;
	POSIX::close($_);
    }

    Forks::Super::Job::dispose(@Forks::Super::ALL_JOBS);

    my @G = grep { -e $_ } keys %IPC_FILES;
  FILE_TRY: for my $try (1 .. 3) {
        if (@G == 0) {
	    last FILE_TRY;
	}
	foreach my $G (@G) {
	    local $! = undef;
	    if (!unlink $G) {
		undef $!;
		$G =~ s!/!\\!;
		my $c1 = system("CMD /C DEL /Q \"$G\" 2> NUL");
	    }
	}
    } continue {
	CORE::sleep 1;

lib/Forks/Super/Job/Ipc.pm  view on Meta::CPAN

    my ($job, $fh, $expire, $blocking_desired) = @_;
    my @lines;
    while (@lines == 0) {
	@lines = readline($fh);
	if (@lines > 0) {
	    return @lines;
	}

	if (!_check_if_job_is_complete_and_close_io($job, $fh)) {
	    seek $fh, 0, 1;
	    if ($blocking_desired) {
		if ($expire > 0 && Time::HiRes::time() >= $expire) {
		    $blocking_desired = 0;
		} else {
		    Forks::Super::Util::pause(
			1 * $Forks::Super::Util::DEFAULT_PAUSE_IO);
		}
	    }
	} else {
	    return;
	}
	if (!$blocking_desired) {
	    return;
	}
    }
    return @lines;
}

sub _readline_scalar {
    my ($job, $fh, $expire, $blocking_desired) = @_;
    my $line;
    while (!defined $line) {
	$line = readline($fh);

	if (defined $line) {
	    return $line;
	}

	last if _check_if_job_is_complete_and_close_io($job, $fh);
	seek $fh, 0, 1;
	if ($blocking_desired) {
	    if ($expire > 0 && Time::HiRes::time() >= $expire) {
		$blocking_desired = 0;
	    } else {
		Forks::Super::Util::pause(
		    $Forks::Super::Util::DEFAULT_PAUSE_IO);
	    }
	}
	if (!$blocking_desired) {
	    if (!$job->{is_child}) {
		# in the parent, we can tell the difference between this input

		# stream being empty because the child process is finished
		# (see _check_if_job_is_complete_and_close_io() call, above),
		# and the stream being empty because the child isn't producing
		# enough output to keep it full.

		# We can and do distinguish between these two cases by
		# returning <undef> when the child is finished and will
		# not produce any more input, and  ""  (empty string) when
		# the child is still alive and it could potentially
		# produce more input.

		return '';
	    } else {
		# in the child, we don't make this distinction.
		return;
	    }
	}
    }
    return;
}



sub init_child {
    $IPC_DIR_DEDICATED = 0;
    %IPC_FILES = @IPC_FILES = ();
    @SAFEOPENED = ();
    %SIG_OLD = ();
    return;
}

sub _child_share {
    my $job = shift;
    return if !defined $job->{share};
    return if !defined $job->{share_ipc};
    if (open my $fh, '>', $job->{share_ipc}) {
        print $fh Data::Dumper::Dumper( $job->{share} );
        close $fh;
        if ($job->{debug}) {
            debug("shared data written to $job->{share_ipc}");
        }
    } else {
        carp 'Forks::Super::deinit_child: could not open ',
            "share ipc file $job->{share_ipc}: $!";
    }
}

sub deinit_child {
    use Data::Dumper;
    my $job = Forks::Super::Job->this;
    if ($job->{is_emulation}) {
        local *STDERR = *$Forks::Super::Debug::DEBUG_FH;
        Carp::cluck("FSJ::Ipc: deinit_child called for emulated job!");
    }
    _child_share($job);

    if (@IPC_FILES > 0) {
        Carp::cluck("Child $$ had temp files! @IPC_FILES\n")
	    if $Forks::Super::CHILD_FORK_OK < 0; # stackoverflow/q/15230850
	unlink @IPC_FILES;
	@IPC_FILES = ();
    }
    _close_child_fh($job);
    return;
}

sub _close_child_fh {
    my $job = shift;
    my %closed = ();



( run in 2.533 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )