IPC-Manager
view release on metacpan or search on metacpan
lib/IPC/Manager/Service/State.pm view on Meta::CPAN
package IPC::Manager::Service::State;
use strict;
use warnings;
use Carp qw/croak/;
use IPC::Manager::Util qw/require_mod clone_io/;
use IPC::Manager::Serializer::JSON();
use Long::Jump qw/setjump longjump havejump/;
use POSIX();
my $base_0 = $0;
my $service_inst;
my $_post_exec_run = sub { 255 };
sub import {
my $class = shift;
my $json = shift @ARGV;
my $params = IPC::Manager::Serializer::JSON->deserialize($json);
my $exec = delete $params->{exec} // {};
$params->{orig_io} = {
stderr => clone_io('>&', \*STDERR),
stdout => clone_io('>&', \*STDOUT),
stdin => clone_io('<&', \*STDIN),
};
my $svc_class = require_mod($params->{class} // 'IPC::Manager::Service');
my $new_inst = $svc_class->new(%$params);
my $exit;
my $code = sub { _ipcm_service(%$params, new_inst => $new_inst); $exit = 0 };
if ($exec->{stay_in_begin}) {
$exit = 255;
$_post_exec_run = sub { $exit };
$code->();
}
else {
$_post_exec_run = $code;
}
}
sub ipcm_worker {
my ($name, $cb) = @_;
croak "ipcm_service_worker() can only be called from inside a service"
unless $service_inst && ref($service_inst) ne 'CODE';
my $pid = fork // die "Could not fork: $!";
if ($pid) {
$service_inst->register_worker($name => $pid);
return $pid;
}
$0 = "$0 $name";
$service_inst = $cb;
croak "ipcm_worker: no IPCM_SERVICE jump point on the stack (worker invoked outside a running service?)"
unless havejump 'IPCM_SERVICE';
longjump IPCM_SERVICE => ();
die "This should not be reachable";
}
sub ipcm_service {
my ($name, @args) = @_;
my $return_handle = defined(wantarray) ? 1 : 0;
my %params;
if (@args == 1 && ref($args[0]) eq 'CODE') {
$params{class} = 'IPC::Manager::Service';
$params{on_all} = $args[0];
}
elsif (@args == 2 && ref($args[0]) eq 'HASH' && ref($args[1]) eq 'CODE') {
%params = %{$args[0]};
$params{class} = 'IPC::Manager::Service';
$params{on_all} = $args[1];
}
else {
%params = @args;
}
my $skip_role_checks = delete $params{skip_role_checks};
my $exec = delete $params{exec};
$params{name} = $name;
$params{orig_io} //= $service_inst ? $service_inst->orig_io : {
stderr => clone_io('>&', \*STDERR),
stdout => clone_io('>&', \*STDOUT),
stdin => clone_io('<&', \*STDIN),
} unless $exec;
if ($service_inst && !$params{redirect}) {
if (my $redir = $service_inst->redirect) {
$params{redirect} = $redir unless defined($redir->{inherit}) && !$redir->{inherit};
}
}
my $new_ipcm = delete $params{new_ipcm};
my $handle_params = delete $params{handle_params} // {};
if ($params{ipcm_info}) {
croak "'new_ipcm' and 'ipcm_info' may not be combined" if $new_ipcm;
}
else {
if ($service_inst && !$new_ipcm) {
$handle_params->{_peer} = 1;
$params{ipcm_info} = $service_inst->ipcm_info;
}
elsif ($return_handle) {
require IPC::Manager;
$handle_params->{spawn} = IPC::Manager::ipcm_spawn();
$params{ipcm_info} = $handle_params->{spawn}->info;
$params{watch_pids} = [$$];
}
else {
croak "Cannot be called in void context without providing 'ipcm_info'";
}
}
my $class = require_mod($params{class} // 'IPC::Manager::Service');
croak "'$class' does not implement the 'IPC::Manager::Role::Service' role"
unless $skip_role_checks || Role::Tiny::does_role($class, 'IPC::Manager::Role::Service');
my $new_inst = $class->new(%params);
$new_inst->pre_fork_hook();
$exec->{json} = IPC::Manager::Serializer::JSON->serialize(\%params)
if $exec;
my $pid = fork // die "Could not fork: $!";
# In parent
if ($pid) {
return unless $return_handle || $exec;
my $out;
if (delete $handle_params->{_peer}) {
$out = $service_inst->peer($params{name}, %$handle_params);
}
else {
$out = $new_inst->handle(%$handle_params, name => "service_parent_$$");
}
my $timeout = $params{timeout} || 10;
croak "Timeout waiting for service to come up after ${timeout}s"
unless $out->ready($timeout);
$out->_set_child_pid($pid);
return $out;
}
$new_inst->post_fork_hook();
if ($exec) {
my $cmd = $exec->{cmd} // [];
my $json = $exec->{json};
exec(
$^X,
@{$cmd},
"-M${ \__PACKAGE__ }",
"-e" => "exit(${ \__PACKAGE__ }\::_post_exec_run())",
$json,
);
}
@_ = (%params, new_inst => $new_inst);
goto \&_ipcm_service;
}
sub _post_exec_run { $_post_exec_run->() }
sub _ipcm_service {
my %params = @_;
( run in 1.040 second using v1.01-cache-2.11-cpan-39bf76dae61 )