IPC-Manager

 view release on metacpan or  search on metacpan

lib/IPC/Manager/Service/State.pm  view on Meta::CPAN

package IPC::Manager::Service::State;
use strict;
use warnings;

use Carp qw/croak/;
use IPC::Manager::Util qw/require_mod clone_io/;
use IPC::Manager::Serializer::JSON();
use Long::Jump qw/setjump longjump havejump/;

use POSIX();

# Value of $0 captured when this module is loaded. It is not referenced in the
# visible part of this file (ipcm_worker() rewrites $0 in the child).
my $base_0 = $0;

# Process-wide "current service" slot. ipcm_service() and ipcm_worker() read it
# to detect that they are running inside a service; in a forked worker it is
# overwritten with the worker callback (a CODE ref).
my $service_inst;

# Code ref invoked by _post_exec_run() to produce the exit status after an
# exec. The default yields 255; import() replaces it.
my $_post_exec_run = sub { 255 };

# Bootstrap hook used when this module is loaded via `-M` by the exec path of
# ipcm_service(). The first element of @ARGV is consumed as the JSON-encoded
# service parameters, a service instance is constructed from them, and the
# service run is either started immediately (exec option 'stay_in_begin') or
# deferred until _post_exec_run() is called.
sub import {
    my ($class) = @_;

    # The serialized parameters are passed as the first command line argument.
    my $params    = IPC::Manager::Serializer::JSON->deserialize(shift @ARGV);
    my $exec_opts = delete($params->{exec}) // {};

    # Keep duplicates of the standard handles as they were at startup.
    my %saved_io = (
        stderr => clone_io('>&', \*STDERR),
        stdout => clone_io('>&', \*STDOUT),
        stdin  => clone_io('<&', \*STDIN),
    );
    $params->{orig_io} = \%saved_io;

    my $service_class = require_mod($params->{class} // 'IPC::Manager::Service');
    my $instance      = $service_class->new(%$params);

    # $status only becomes 0 once _ipcm_service() has returned.
    my $status;
    my $run_service = sub {
        _ipcm_service(%$params, new_inst => $instance);
        $status = 0;
    };

    if (!$exec_opts->{stay_in_begin}) {
        # Deferred mode: the service runs when _post_exec_run() is called.
        $_post_exec_run = $run_service;
    }
    else {
        # Immediate mode: run now; _post_exec_run() later only reports the
        # status, which stays 255 unless the run completed.
        $status         = 255;
        $_post_exec_run = sub { $status };
        $run_service->();
    }
}

# Fork a worker process from inside a running service.
#
# Arguments:
#   $name - worker name; registered with the current service together with the
#           child pid, and appended to $0 in the child.
#   $cb   - stored as the current service slot in the child before control is
#           transferred to the IPCM_SERVICE jump point (presumably the code at
#           the jump point runs it; that code is not visible here).
#
# Returns the child pid in the parent. Does not return in the child.
#
# Croaks if called outside a service (or from inside a worker), or if no
# IPCM_SERVICE jump point is set. Dies if fork fails.
sub ipcm_worker {
    my ($name, $cb) = @_;

    croak "ipcm_worker() can only be called from inside a service"
        unless $service_inst && ref($service_inst) ne 'CODE';

    # Check for the jump point BEFORE forking. The child inherits the parent's
    # state, so the answer is the same on both sides of the fork; failing here
    # avoids registering a worker that can never start and avoids an exception
    # unwinding through the caller's stack inside a forked child.
    croak "ipcm_worker: no IPCM_SERVICE jump point on the stack (worker invoked outside a running service?)"
        unless havejump 'IPCM_SERVICE';

    my $pid = fork // die "Could not fork: $!";
    if ($pid) {
        $service_inst->register_worker($name => $pid);
        return $pid;
    }

    # Child: tag the process title with the worker name.
    $0 = "$0 $name";

    # Replace the service slot with the callback, then unwind to the service
    # jump point.
    $service_inst = $cb;
    longjump IPCM_SERVICE => ();
    die "This should not be reachable";
}

# Start a service in a forked child process, optionally exec'ing a fresh perl
# interpreter for it.
#
# Call forms:
#   ipcm_service($name, sub { ... })              # callback becomes 'on_all'
#   ipcm_service($name, \%params, sub { ... })    # params + 'on_all' callback
#   ipcm_service($name, %params)                  # fully explicit
# The first two forms force class 'IPC::Manager::Service'.
#
# Parameters consumed here (everything else is passed to the class constructor):
#   skip_role_checks - skip the IPC::Manager::Role::Service role check.
#   exec             - hashref; when present the child exec()s $^X. 'cmd' is an
#                      arrayref of extra arguments placed after $^X.
#   new_ipcm         - do not reuse the current service's ipcm_info.
#   handle_params    - hashref of extra arguments for the returned handle.
#   ipcm_info        - connection info to use; may not be combined with new_ipcm.
#   redirect         - defaults to the current service's redirect unless that
#                      redirect has a defined, false 'inherit'.
#   timeout          - seconds to wait for the service to be ready (default 10).
#
# Returns (parent process): a handle/peer object once the service reports ready
# when called in non-void context or with 'exec'; otherwise returns nothing.
# In the child this sub does not return: it either exec()s or transfers control
# to _ipcm_service().
#
# Croaks on invalid parameter combinations, a class lacking the service role,
# void-context use without 'ipcm_info' outside a service, or a ready timeout.
# Dies if fork fails.
sub ipcm_service {
    my ($name, @args) = @_;

    my $return_handle = defined(wantarray) ? 1 : 0;

    my %params;
    if (@args == 1 && ref($args[0]) eq 'CODE') {
        $params{class}  = 'IPC::Manager::Service';
        $params{on_all} = $args[0];
    }
    elsif (@args == 2 && ref($args[0]) eq 'HASH' && ref($args[1]) eq 'CODE') {
        %params         = %{$args[0]};
        $params{class}  = 'IPC::Manager::Service';
        $params{on_all} = $args[1];
    }
    else {
        %params = @args;
    }

    my $skip_role_checks = delete $params{skip_role_checks};

    my $exec = delete $params{exec};

    $params{name} = $name;

    # In exec mode the handles are not captured here; import() captures them in
    # the exec'd process instead.
    $params{orig_io} //= $service_inst ? $service_inst->orig_io : {
        stderr => clone_io('>&', \*STDERR),
        stdout => clone_io('>&', \*STDOUT),
        stdin  => clone_io('<&', \*STDIN),
    } unless $exec;

    # Inherit the current service's redirect unless it explicitly opts out.
    if ($service_inst && !$params{redirect}) {
        if (my $redir = $service_inst->redirect) {
            $params{redirect} = $redir unless defined($redir->{inherit}) && !$redir->{inherit};
        }
    }

    my $new_ipcm = delete $params{new_ipcm};

    my $handle_params = delete $params{handle_params} // {};
    if ($params{ipcm_info}) {
        croak "'new_ipcm' and 'ipcm_info' may not be combined" if $new_ipcm;
    }
    else {
        if ($service_inst && !$new_ipcm) {
            # Nested service: share the current service's bus and return a peer.
            $handle_params->{_peer} = 1;
            $params{ipcm_info} = $service_inst->ipcm_info;
        }
        elsif ($return_handle) {
            # Top level: spawn a new bus; the spawn object travels with the handle.
            require IPC::Manager;
            $handle_params->{spawn} = IPC::Manager::ipcm_spawn();
            $params{ipcm_info}      = $handle_params->{spawn}->info;
            $params{watch_pids}     = [$$];
        }
        else {
            croak "Cannot be called in void context without providing 'ipcm_info'";
        }
    }

    my $class = require_mod($params{class} // 'IPC::Manager::Service');

    unless ($skip_role_checks) {
        # Nothing visible in this file loads Role::Tiny, so load it here rather
        # than rely on another module having done so before does_role() is called.
        require Role::Tiny;

        croak "'$class' does not implement the 'IPC::Manager::Role::Service' role"
            unless Role::Tiny::does_role($class, 'IPC::Manager::Role::Service');
    }

    my $new_inst = $class->new(%params);

    $new_inst->pre_fork_hook();

    # Serialize before forking so the child only has to exec.
    $exec->{json} = IPC::Manager::Serializer::JSON->serialize(\%params)
        if $exec;

    my $pid = fork // die "Could not fork: $!";

    # In parent
    if ($pid) {
        return unless $return_handle || $exec;

        my $out;

        if (delete $handle_params->{_peer}) {
            $out = $service_inst->peer($params{name}, %$handle_params);
        }
        else {
            $out = $new_inst->handle(%$handle_params, name => "service_parent_$$");
        }

        my $timeout = $params{timeout} || 10;

        croak "Timeout waiting for service to come up after ${timeout}s"
            unless $out->ready($timeout);

        $out->_set_child_pid($pid);

        return $out;
    }

    # In child
    $new_inst->post_fork_hook();

    if ($exec) {
        my $cmd = $exec->{cmd} // [];
        my $json = $exec->{json};

        # exec() only returns on failure. Without handling that, the child would
        # fall through and run the service in-process with exec-mode parameters
        # (no orig_io). Report the error and leave immediately with _exit so no
        # END blocks or destructors inherited from the parent run in the child.
        exec(
            $^X,
            @{$cmd},
            "-M${ \__PACKAGE__ }",
            "-e" => "exit(${ \__PACKAGE__ }\::_post_exec_run())",
            $json,
        ) or do {
            my $err = $!;
            warn "Could not exec service '$params{name}': $err\n";
            POSIX::_exit(255);
        };
    }

    # Tail-call into the service runner so this frame is replaced.
    @_ = (%params, new_inst => $new_inst);
    goto \&_ipcm_service;
}

# Target of the `-e` snippet built by ipcm_service() for exec'd services:
# invokes whichever code ref is currently stored in the file-scoped
# $_post_exec_run and hands its result back (the snippet passes it to exit()).
sub _post_exec_run {
    return $_post_exec_run->();
}

sub _ipcm_service {
    my %params = @_;



( run in 1.040 second using v1.01-cache-2.11-cpan-39bf76dae61 )