Forks-Super
view release on metacpan or search on metacpan
lib/Forks/Super/Job/Ipc.pm view on Meta::CPAN
if (! -e $ipc_file) {
$deleted{$ipc_file} = delete $IPC_FILES{$ipc_file};
} else {
local $! = undef;
if ($DEBUG) {
print STDERR "Deleting $ipc_file ... ";
}
my $z = unlink $ipc_file;
if ($z && ! -e $ipc_file) {
if ($DEBUG) {
print STDERR "Delete $ipc_file ok\n";
}
$deleted{$ipc_file} = delete $IPC_FILES{$ipc_file};
} else {
if ($DEBUG) {
print STDERR "Delete $ipc_file failed: $!\n";
}
warn 'Forks::Super::END_cleanup: ',
"error disposing of ipc file $ipc_file: $z/$!\n";
}
}
}
return %deleted;
}
sub _END_background_cleanup2 {
# best efforts to cleanup the IPC files
my %G = @_;
my $z = rmdir($_IPC_DIR) || 0;
if (!$z) {
unlink glob("$_IPC_DIR/*");
CORE::sleep 5;
$z = rmdir($_IPC_DIR) || 0;
}
if (!$z
&& -d $_IPC_DIR
&& glob("$_IPC_DIR/.nfs*")) {
# Observed these files on Linux running from NSF mounted filesystem
# .nfsXXX files are usually temporary (~30s) but hard to kill
for my $i (1..10) {
CORE::sleep 5;
last unless glob("$_IPC_DIR/.nfs*");
}
$z = rmdir($_IPC_DIR) || 0;
}
if (!$z && -d $_IPC_DIR) {
warn "Forks::Super::END_cleanup: rmdir $_IPC_DIR failed. $!\n";
opendir(my $_Z, $_IPC_DIR);
my @g = grep { !/^\.nfs/ } readdir($_Z);
closedir $_Z;
}
return;
}
# if we have created temporary files for IPC, clean them up.
# clean them up even if the children are still alive -- these files
# are exclusively for IPC, and IPC isn't needed after the parent
# process is done.
sub END_cleanup {
if ($$ != ($Forks::Super::MAIN_PID || $MAIN_PID)) {
return;
}
return if _END_foreground_cleanup();
return if CORE::fork();
exit 0 if CORE::fork();
my %G = _END_background_cleanup1();
return if !defined $IPC_DIR_DEDICATED;
return if 0 < scalar keys %IPC_FILES;
my $zz = rmdir($_IPC_DIR) || 0;
return if $zz;
CORE::sleep 2;
exit 0 if CORE::fork();
# long sleep here for maximum portability.
CORE::sleep 10;
_END_background_cleanup2();
return;
}
sub END_cleanup_MSWin32 {
return if $$ != ($Forks::Super::MAIN_PID || $MAIN_PID);
return if $_CLEANUP++;
$0 = "Forks::Super:cleanup:$0";
# Use brute force to close all open handles. Leave STDERR open for warns.
# XXX - is this ok? what if perl script is communicating with a socket?
use POSIX ();
for (0,1,3..999) {
no warnings;
POSIX::close($_);
}
Forks::Super::Job::dispose(@Forks::Super::ALL_JOBS);
my @G = grep { -e $_ } keys %IPC_FILES;
FILE_TRY: for my $try (1 .. 3) {
if (@G == 0) {
last FILE_TRY;
}
foreach my $G (@G) {
local $! = undef;
if (!unlink $G) {
undef $!;
$G =~ s!/!\\!;
my $c1 = system("CMD /C DEL /Q \"$G\" 2> NUL");
}
}
} continue {
CORE::sleep 1;
lib/Forks/Super/Job/Ipc.pm view on Meta::CPAN
my ($job, $fh, $expire, $blocking_desired) = @_;
my @lines;
while (@lines == 0) {
@lines = readline($fh);
if (@lines > 0) {
return @lines;
}
if (!_check_if_job_is_complete_and_close_io($job, $fh)) {
seek $fh, 0, 1;
if ($blocking_desired) {
if ($expire > 0 && Time::HiRes::time() >= $expire) {
$blocking_desired = 0;
} else {
Forks::Super::Util::pause(
1 * $Forks::Super::Util::DEFAULT_PAUSE_IO);
}
}
} else {
return;
}
if (!$blocking_desired) {
return;
}
}
return @lines;
}
sub _readline_scalar {
my ($job, $fh, $expire, $blocking_desired) = @_;
my $line;
while (!defined $line) {
$line = readline($fh);
if (defined $line) {
return $line;
}
last if _check_if_job_is_complete_and_close_io($job, $fh);
seek $fh, 0, 1;
if ($blocking_desired) {
if ($expire > 0 && Time::HiRes::time() >= $expire) {
$blocking_desired = 0;
} else {
Forks::Super::Util::pause(
$Forks::Super::Util::DEFAULT_PAUSE_IO);
}
}
if (!$blocking_desired) {
if (!$job->{is_child}) {
# in the parent, we can tell the difference between this input
# stream being empty because the child process is finished
# (see _check_if_job_is_complete_and_close_io() call, above),
# and the stream being empty because the child isn't producing
# enough output to keep it full.
# We can and do distinguish between these two cases by
# returning <undef> when the child is finished and will
# not produce any more input, and "" (empty string) when
# the child is still alive and it could potentially
# produce more input.
return '';
} else {
# in the child, we don't make this distinction.
return;
}
}
}
return;
}
sub init_child {
$IPC_DIR_DEDICATED = 0;
%IPC_FILES = @IPC_FILES = ();
@SAFEOPENED = ();
%SIG_OLD = ();
return;
}
sub _child_share {
my $job = shift;
return if !defined $job->{share};
return if !defined $job->{share_ipc};
if (open my $fh, '>', $job->{share_ipc}) {
print $fh Data::Dumper::Dumper( $job->{share} );
close $fh;
if ($job->{debug}) {
debug("shared data written to $job->{share_ipc}");
}
} else {
carp 'Forks::Super::deinit_child: could not open ',
"share ipc file $job->{share_ipc}: $!";
}
}
sub deinit_child {
use Data::Dumper;
my $job = Forks::Super::Job->this;
if ($job->{is_emulation}) {
local *STDERR = *$Forks::Super::Debug::DEBUG_FH;
Carp::cluck("FSJ::Ipc: deinit_child called for emulated job!");
}
_child_share($job);
if (@IPC_FILES > 0) {
Carp::cluck("Child $$ had temp files! @IPC_FILES\n")
if $Forks::Super::CHILD_FORK_OK < 0; # stackoverflow/q/15230850
unlink @IPC_FILES;
@IPC_FILES = ();
}
_close_child_fh($job);
return;
}
sub _close_child_fh {
my $job = shift;
my %closed = ();
( run in 2.533 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )