App-MHFS
view release on metacpan or search on metacpan
lib/MHFS/Process.pm view on Meta::CPAN
package MHFS::Process v0.7.0;
use 5.014;
use strict; use warnings;
use feature 'say';
use Symbol 'gensym';
use Time::HiRes qw( usleep clock_gettime CLOCK_REALTIME CLOCK_MONOTONIC);
use POSIX ":sys_wait_h";
use IO::Socket::INET;
use IO::Poll qw(POLLIN POLLOUT POLLHUP);
use Errno qw(EINTR EIO :POSIX);
use Fcntl qw(:seek :mode);
use File::stat;
use IPC::Open3;
use Scalar::Util qw(looks_like_number weaken);
use Data::Dumper;
use Devel::Peek;
use MHFS::FD::Reader;
use MHFS::FD::Writer;
use MHFS::EventLoop::Poll;
use Carp;
#my %CHILDREN;
#$SIG{CHLD} = sub {
# while((my $child = waitpid(-1, WNOHANG)) > 0) {
# my ($wstatus, $exitcode) = ($?, $?>> 8);
# if(defined $CHILDREN{$child}) {
# say "PID $child reaped (func) $exitcode";
# $CHILDREN{$child}->($exitcode);
# # remove file handles here?
# $CHILDREN{$child} = undef;
# }
# else {
# say "PID $child reaped (No func) $exitcode";
# }
# }
#};
# Wire the parent-side stdio handles of a freshly started child into the
# event loop.
#
# $self           - process hash (blessed or not) holding 'pid' and 'evp'
# $in, $out, $err - parent-side handles for the child's stdin, stdout, stderr
# $fddispatch     - hashref of optional callbacks keyed by 'STDIN', 'STDOUT',
#                   'STDERR' and 'SIGCHLD'
# $handlesettings - hashref; a true 'O_NONBLOCK' switches all three handles
#                   to non-blocking mode
#
# Dies if a handle cannot be switched to non-blocking mode.
# Always returns $self (previously $self was only returned when O_NONBLOCK
# was requested).
sub _setup_handlers {
    my ($self, $in, $out, $err, $fddispatch, $handlesettings) = @_;
    my $pid = $self->{'pid'};
    my $evp = $self->{'evp'};

    if($fddispatch->{'SIGCHLD'}) {
        say "PID $pid custom SIGCHLD handler";
        $evp->register_child($pid, $fddispatch->{'SIGCHLD'});
    }

    # dispatch key, slot in $self->{'fd'}, handle, wrapper class, poll events
    my @channels = (
        ['STDIN',  'stdin',  $in,  'MHFS::FD::Writer', POLLOUT],
        ['STDOUT', 'stdout', $out, 'MHFS::FD::Reader', POLLIN],
        ['STDERR', 'stderr', $err, 'MHFS::FD::Reader', POLLIN],
    );
    foreach my $channel (@channels) {
        my ($dispatchkey, $slot, $fh, $wrapper, $events) = @{$channel};
        if($fddispatch->{$dispatchkey}) {
            # a callback was supplied: wrap the handle and let the event loop watch it
            $self->{'fd'}{$slot} = $wrapper->new($self, $fh, $fddispatch->{$dispatchkey});
            $evp->set($fh, $self->{'fd'}{$slot}, $events | MHFS::EventLoop::Poll->ALWAYSMASK);
        }
        else {
            # no callback: only remember the handle so it can still be found later
            $self->{'fd'}{$slot}{'fd'} = $fh;
        }
    }

    if($handlesettings->{'O_NONBLOCK'}) {
        # stderr, then stdout
        foreach my $fh ($err, $out) {
            # fcntl returns "0 but true" for a zero result, so "or die" only fires on failure
            my $flags = fcntl($fh, Fcntl::F_GETFL(), 0) or die "$!";
            fcntl($fh, Fcntl::F_SETFL(), $flags | Fcntl::O_NONBLOCK()) or die "$!";
        }
        # stdin
        defined($in->blocking(0)) or die($!);
    }
    return $self;
}
# Send SIGKILL to the child process.
# When $cb is given it is first stored in the event loop's 'children' table
# under this process's pid, replacing whatever was stored there.
# Returns the result of kill().
sub sigkill {
    my ($self, $cb) = @_;
    my $pid = $self->{'pid'};
    $self->{'evp'}{'children'}{$pid} = $cb if($cb);
    return kill('KILL', $pid);
}
# Re-register the stdout handle with the event loop using only ALWAYSMASK,
# i.e. without POLLIN. Returns whatever the event loop's set() returns.
sub stopSTDOUT {
    my ($self) = @_;
    my $mask = MHFS::EventLoop::Poll->ALWAYSMASK;
    return $self->{'evp'}->set(
        $self->{'fd'}{'stdout'}{'fd'},
        $self->{'fd'}{'stdout'},
        $mask,
    );
}
# Re-register the stdout handle with the event loop using POLLIN plus
# ALWAYSMASK. Returns whatever the event loop's set() returns.
sub resumeSTDOUT {
    my ($self) = @_;
    my $mask = POLLIN | MHFS::EventLoop::Poll->ALWAYSMASK;
    return $self->{'evp'}->set(
        $self->{'fd'}{'stdout'}{'fd'},
        $self->{'fd'}{'stdout'},
        $mask,
    );
}
# Start an external command with open3 and attach its stdio to the event loop.
#
# $class          - class to bless the new process object into
# $torun          - arrayref: the command and its arguments
# $evp            - event loop object, stored as 'evp'
# $fddispatch     - hashref of handler callbacks, passed to _setup_handlers
# $handlesettings - hashref of handle options, passed to _setup_handlers
# $env            - optional hashref of environment variables that are set
#                   only while the command is being started
#
# Returns the new object, or undef when open3 fails.
# The caller's %ENV is put back on both paths; variables that did not exist
# before the call are deleted again rather than left behind.
sub new {
    my ($class, $torun, $evp, $fddispatch, $handlesettings, $env) = @_;
    my %self = ('time' => clock_gettime(CLOCK_MONOTONIC), 'evp' => $evp);
    my %oldenvvars;
    if($env) {
        foreach my $key(keys %{$env}) {
            # save current value (undef when the variable does not exist)
            $oldenvvars{$key} = $ENV{$key};
            # set new value
            $ENV{$key} = $env->{$key};
            my $oldval = $oldenvvars{$key} // '{undef}';
            my $newval = $env->{$key} // '{undef}';
            say "Changed \$ENV{$key} from $oldval to $newval";
        }
    }
    # undo the overrides made above; a no-op when $env was not given
    my $restore_env = sub {
        foreach my $key(keys %oldenvvars) {
            if(defined $oldenvvars{$key}) {
                $ENV{$key} = $oldenvvars{$key};
            }
            else {
                # the variable did not exist before, so remove it again
                delete $ENV{$key};
            }
            my $oldval = $env->{$key} // '{undef}';
            my $newval = $oldenvvars{$key} // '{undef}';
            say "Restored \$ENV{$key} from $oldval to $newval";
        }
    };
    my ($pid, $in, $out, $err);
    # the trailing 1 lets success be detected without relying on $@
    my $spawned = eval { $pid = open3($in, $out, $err = gensym, @$torun); 1; };
    if(! $spawned) {
        say "BAD process";
        # do not leak the temporary environment into the caller on failure
        $restore_env->();
        return undef;
    }
    $self{'pid'} = $pid;
    say 'PID '. $pid . ' NEW PROCESS: ' . $torun->[0];
    $restore_env->();
    _setup_handlers(\%self, $in, $out, $err, $fddispatch, $handlesettings);
    return bless \%self, $class;
}
# Build the standard handler set for a child process and launch it.
#
# $make_process      - coderef, called as
#                      $make_process->($make_process_args, $prochandlers, {'O_NONBLOCK' => 1});
#                      its return value is returned to the caller
# $make_process_args - passed through to $make_process untouched
# $context           - hashref shared with the handlers.
#                      Written here: 'stdout' and 'stderr' (accumulated output),
#                      'exit_status', 'curbuf'.
#                      Read here: 'at_exit' (called with $context from the
#                      SIGCHLD handler), optional 'on_stdout_data' (called with
#                      $context after stdout was read), optional 'input'
#                      (coderef called with $context, returns the next chunk for
#                      the child's stdin or undef when there is none left); the
#                      STDIN handler is only installed when 'input' is set.
#
# Return values of the installed handlers: STDOUT and STDERR return 1; STDIN
# returns 1 after EAGAIN or a partial write, -1 on any other write error and
# 0 once 'input' returned undef.
sub _new_ex {
    my ($make_process, $make_process_args, $context) = @_;
    my $process;
    $context->{'stdout'} = '';
    $context->{'stderr'} = '';

    # Append whatever is still readable on the process's $name handle
    # ('stdout' or 'stderr') to $context->{$name}. The slot is skipped when it
    # no longer holds a handle; remove() sets slots to undef.
    my $drain = sub {
        my ($name) = @_;
        my $entry = $process->{'fd'}{$name};
        my $handle = $entry ? $entry->{'fd'} : undef;
        return if(! defined $handle);
        my $buf;
        while(read($handle, $buf, 100000)) {
            $context->{$name} .= $buf;
            say "$name sigchld read";
        }
    };

    my $prochandlers = {
    'STDOUT' => sub {
        my ($handle) = @_;
        my $buf;
        while(read($handle, $buf, 4096)) {
            $context->{'stdout'} .= $buf;
        }
        if($context->{'on_stdout_data'}) {
            $context->{'on_stdout_data'}->($context);
        }
        return 1;
    },
    'STDERR' => sub {
        my ($handle) = @_;
        my $buf;
        while(read($handle, $buf, 4096)) {
            $context->{'stderr'} .= $buf;
        }
        return 1;
    },
    'SIGCHLD' => sub {
        $context->{exit_status} = $_[0];
        # pick up output that was not consumed by the STDOUT/STDERR handlers
        $drain->('stdout');
        $drain->('stderr');
        if($context->{'on_stdout_data'}) {
            $context->{'on_stdout_data'}->($context);
        }
        $context->{'at_exit'}->($context);
    },
    };

    if($context->{'input'}) {
        $prochandlers->{'STDIN'} = sub {
            my ($fh) = @_;
            while(1) {
                my $curbuf = $context->{'curbuf'};
                # test definedness and length, not truth: the buffer "0" is valid data
                if(defined($curbuf) && length($curbuf)) {
                    my $rv = syswrite($fh, $curbuf, length($curbuf));
                    if(!defined($rv)) {
                        if(! $!{EAGAIN}) {
                            say "Critical write error";
                            return -1;
                        }
                        return 1;
                    }
                    elsif($rv != length($curbuf)) {
                        # partial write: keep the unwritten tail for the next call
                        substr($context->{'curbuf'}, 0, $rv, '');
                        return 1;
                    }
                    else {
                        say "wrote all";
                    }
                }
                # current buffer is done, ask for the next chunk
                $context->{'curbuf'} = $context->{'input'}->($context);
                if(! defined $context->{'curbuf'}) {
                    return 0;
                }
            }
        };
    }

    $process = $make_process->($make_process_args, $prochandlers, {'O_NONBLOCK' => 1});
    return $process;
}
# Process factory for _new_ex: start an external command with poll handlers.
# $mpa carries 'class', 'cmd' and 'evp'; the handlers and handle settings are
# forwarded unchanged to that class's new().
sub _new_cmd {
    my ($mpa, $prochandlers, $handlesettings) = @_;
    my ($class, $cmd, $evp) = @{$mpa}{'class', 'cmd', 'evp'};
    return $class->new($cmd, $evp, $prochandlers, $handlesettings);
}
# Launch an external command as a process driven by $context.
# $cmd is the command arrayref; $context is the handler context hashref
# consumed by _new_ex. Returns whatever _new_ex returns.
sub new_cmd_process {
    my ($class, $evp, $cmd, $context) = @_;
    my %make_args = (
        'class' => $class,
        'evp'   => $evp,
        'cmd'   => $cmd,
    );
    return _new_ex(\&_new_cmd, \%make_args, $context);
}
# Reduced form of a command process for callers that only need the output:
# $handler is called with the collected stdout and stderr from the
# 'at_exit' callback.
sub new_output_process {
    my ($class, $evp, $cmd, $handler) = @_;
    my $at_exit = sub {
        my ($context) = @_;
        say 'run handler';
        $handler->($context->{'stdout'}, $context->{'stderr'});
    };
    return new_cmd_process($class, $evp, $cmd, { 'at_exit' => $at_exit });
}
# Launch a command process that can also be fed data on stdin.
# $handler is called with the collected stdout and stderr from the
# 'at_exit' callback. When $inputdata is defined it becomes the initial
# 'curbuf', together with an 'input' callback that logs "all written" and
# returns undef (no further chunks).
sub new_io_process {
    my ($class, $evp, $cmd, $handler, $inputdata) = @_;
    my %ctx = (
        'at_exit' => sub {
            my ($context) = @_;
            say 'run handler';
            $handler->($context->{'stdout'}, $context->{'stderr'});
        },
    );
    if(defined $inputdata) {
        $ctx{'curbuf'} = $inputdata;
        $ctx{'input'} = sub {
            say "all written";
            return undef;
        };
    }
    return new_cmd_process($class, $evp, $cmd, \%ctx);
}
# Process factory for _new_ex: fork a child that runs $mpa->{'func'} in this
# same program (no exec) and attach poll handlers to its pipes.
# $mpa carries 'class', 'evp' and 'func'. Returns the blessed process object,
# or undef when fork fails. Dies when a pipe cannot be created.
sub _new_child {
    my ($mpa, $prochandlers, $handlesettings) = @_;
    my %self = ('time' => clock_gettime(CLOCK_MONOTONIC), 'evp' => $mpa->{'evp'});

    # in:  parent -> child data channel
    # out: child -> parent data channel
    # err: child -> parent log channel
    pipe(my $inreader, my $inwriter) or die("pipe failed $!");
    pipe(my $outreader, my $outwriter) or die("pipe failed $!");
    pipe(my $errreader, my $errwriter) or die("pipe failed $!");

    # the child's stderr will be UTF-8 text
    binmode($errreader, ':encoding(UTF-8)');

    my $pid = fork();
    if(! defined $pid) {
        say "failed to fork";
        return undef;
    }

    if(! $pid) {
        # child: drop the parent's pipe ends
        close($_) for ($inwriter, $outreader, $errreader);
        open(STDIN, "<&", $inreader) or die("Can't dup \$inreader to STDIN");
        # both STDOUT and STDERR go to the log channel; the data channel is
        # handed to the callback explicitly
        open(STDOUT, ">&", $errwriter) or die("Can't dup \$errwriter to STDOUT");
        open(STDERR, ">&", $errwriter) or die("Can't dup \$errwriter to STDERR");
        # NOTE(review): a die in the callback (or in the opens above) unwinds
        # into the caller's code inside the child process - confirm callers
        # cope with that
        $mpa->{'func'}->($outwriter);
        exit 0;
    }

    # parent: drop the child's pipe ends
    close($_) for ($inreader, $outwriter, $errwriter);
    $self{'pid'} = $pid;
    say 'PID '. $pid . ' NEW CHILD';
    _setup_handlers(\%self, $inwriter, $outreader, $errreader, $prochandlers, $handlesettings);
    return bless \%self, $mpa->{'class'};
}
# Run a command in a forked child with its STDOUT connected to $sockfh.
#
# $name   - not used by this function
# $cmd    - arrayref: the command and its arguments, handed to exec
# $sockfh - handle that becomes the command's STDOUT; the parent's copy is
#           always closed before returning
#
# Returns the result of close($sockfh), or undef when fork fails.
# The child is not registered anywhere here, so reaping it is left to the
# rest of the program.
sub cmd_to_sock {
    my ($name, $cmd, $sockfh) = @_;
    my $pid = fork();
    if(! defined $pid) {
        # fork failed: an undef pid must not fall into the child branch,
        # otherwise the parent itself would exec the command
        say "failed to fork";
        close($sockfh);
        return undef;
    }
    if($pid == 0) {
        if(open(STDOUT, ">&", $sockfh)) {
            exec(@$cmd);
            warn "cmd_to_sock: exec of $cmd->[0] failed: $!";
        }
        else {
            warn "Can't dup \$sockfh to STDOUT";
        }
        # leave the child right here; a die would unwind into the caller's
        # code inside the child process
        POSIX::_exit(127);
    }
    return close($sockfh);
}
# Fork a child that runs $func in this same program (no exec) and report the
# result once: $handler is called with the collected stdout, stderr and the
# exit status from the 'at_exit' callback.
sub new_output_child {
    my ($class, $evp, $func, $handler) = @_;
    my %context = (
        'at_exit' => sub {
            my ($ctx) = @_;
            $handler->($ctx->{'stdout'}, $ctx->{'stderr'}, $ctx->{exit_status});
        },
    );
    my %make_args = ('class' => $class, 'evp' => $evp, 'func' => $func);
    return _new_ex(\&_new_child, \%make_args, \%context);
}
# Remove $fd from the event loop, log the poll handle count, and clear the
# first entry of $self->{'fd'} whose handle is $fd.
sub remove {
    my ($self, $fd) = @_;
    my $evp = $self->{'evp'};
    $evp->remove($fd);
    my $count = $evp->{'poll'}->handles;
    say "poll has " . $count . " handles";
    for my $key (keys %{$self->{'fd'}}) {
        next if(! defined($self->{'fd'}{$key}{'fd'}));
        # handles are compared numerically, as references
        next if($fd != $self->{'fd'}{$key}{'fd'});
        $self->{'fd'}{$key} = undef;
        last;
    }
}
# Destructor: log the pid, then remove every handle still recorded in
# $self->{'fd'} from the event loop and clear its slot.
sub DESTROY {
    my ($self) = @_;
    say "PID " . $self->{'pid'} . ' DESTROY called';
    for my $key (keys %{$self->{'fd'}}) {
        next if(! defined($self->{'fd'}{$key}{'fd'}));
        $self->{'evp'}->remove($self->{'fd'}{$key}{'fd'});
        $self->{'fd'}{$key} = undef;
    }
}
1;
( run in 0.634 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )