Acme-Ghost

 view release on metacpan or  search on metacpan

lib/Acme/Ghost/Prefork.pm  view on Meta::CPAN

    # Example spirit body: registers two callbacks on the event loop stored
    # in $self->{loop}, runs the loop, and logs before and after the run.
    sub spirit {
        my ($self) = @_;
        my $ioloop = $self->{loop};
        my ($count, $limit) = (0, 10);

        # Add a timer (registered with a delay argument of 5)
        my $one_shot = $ioloop->timer(5 => sub {
            $self->log->info("Timer!");
        });

        # Add a recurring callback (registered with an interval argument of 1)
        my $ticker = $ioloop->recurring(1 => sub {
            my ($running) = @_; # loop

            # tick() sends a heartbeat; a false result means "stop working"
            $running->stop unless $self->tick;

            $count++;
            $self->log->debug(sprintf("$$> %d/%d", $count, $limit));

            # Stop after the configured number of iterations
            $running->stop if $count >= $limit;
        });

        $self->log->debug("Start IOLoop");

        # Start event loop if necessary
        if (!$ioloop->is_running) {
            $ioloop->start;
        }

        $self->log->debug("Finish IOLoop");
    }

    1;

=back

=head1 TO DO

See C<TODO> file

=head1 SEE ALSO

L<Acme::Ghost>, L<Mojo::Server::Prefork>

=head1 AUTHOR

Serż Minus (Sergey Lepenkov) L<https://www.serzik.com> E<lt>abalama@cpan.orgE<gt>

=head1 COPYRIGHT

Copyright (C) 1998-2026 D&D Corporation

=head1 LICENSE

This program is distributed under the terms of the Artistic License Version 2.0

See the C<LICENSE> file or L<https://opensource.org/license/artistic-2-0> for details

=cut

use parent qw/Acme::Ghost/;

use Carp qw/carp croak/;
use POSIX qw/WNOHANG/;
use Time::HiRes qw//;
use Scalar::Util qw/weaken/;
use IO::Poll qw/POLLIN POLLPRI/;

# Default settings. All time values are in seconds (they are added to
# Time::HiRes::time values by the manager).
#
# The manager treats a spirit as unresponsive once
#   last heartbeat time + HEARTBEAT_INTERVAL + HEARTBEAT_TIMEOUT
# has passed, so the interval (how often a heartbeat is expected) must be
# the small value and the timeout (how long silence is tolerated) the large
# one. This matches the defaults of Mojo::Server::Prefork (5 and 50).
use constant {
    DEBUG   => !!($ENV{ACME_GHOST_PREFORK_DEBUG} || 0), # Debug logging switch
    SPARE   => 2,   # Default for the "spare" option
    SPIRITS => 4,   # Default for the "spirits" (or "workers") option
    HEARTBEAT_INTERVAL  => 5,   # Expected time between heartbeats
    HEARTBEAT_TIMEOUT   => 50,  # Allowed silence before a graceful restart
    GRACEFUL_TIMEOUT    => 120, # Time given to a graceful stop before force
};

# (Re)initializes the prefork management state on the object.
#
# Accepts named arguments: spare, spirits (alias: workers),
# heartbeat_interval, heartbeat_timeout, graceful_timeout and spirit.
# Missing or false values fall back to the module defaults.
# Returns the object itself.
sub again {
    my ($self, %args) = @_;

    # Prefork management subsystem
    my %state = (
        pool               => {},    # pid => {...}
        running            => 0,     # 0 - not running; 1 - running
        finished           => 0,     # 1 - marker for spirits and manager stopping
        gracefully_stop    => 0,     # 1 - marker for gracefully stopping
        reader             => undef, # Readable pipe to get messages from spirits
        writer             => undef, # Writable pipe to send messages to manager
        spare              => $args{spare} || SPARE,
        spirits            => $args{spirits} || $args{workers} || SPIRITS,
        heartbeat_interval => $args{heartbeat_interval} || HEARTBEAT_INTERVAL,
        heartbeat_timeout  => $args{heartbeat_timeout} || HEARTBEAT_TIMEOUT,
        graceful_timeout   => $args{graceful_timeout} || GRACEFUL_TIMEOUT,
        spirit_cb          => $args{spirit},
    );

    # Copy every prepared value onto the object, overwriting earlier state
    $self->{$_} = $state{$_} for keys %state;

    return $self;
}
# Manager entry point: creates the heartbeat pipe, installs the manager's
# signal handlers for the duration of the call and runs the management
# loop until $self->{running} becomes false.
# Croaks if the pipe cannot be created.
sub startup {
    my ($self) = @_;

    # Pipe for spirit communication
    croak("Can't create pipe: $!\n")
        unless pipe($self->{reader}, $self->{writer});

    # Set manager signals (localized, so they are restored on return)
    my $on_stop = sub { $self->_stop };
    local $SIG{TERM} = $on_stop;
    local $SIG{INT}  = $on_stop;
    local $SIG{QUIT} = sub { $self->_stop(1) };
    local $SIG{CHLD} = sub {
        # Reap every child that has already exited, without blocking
        while (1) {
            my $pid = waitpid(-1, WNOHANG);
            last unless $pid > 0;
            $self->_stopped($pid);
        }
    };
    local $SIG{TTIN} = sub { $self->_increase };
    local $SIG{TTOU} = sub { $self->_decrease };

    # Starting
    $self->log->info("Manager $$ started");
    $self->{running} = 1;
    while ($self->{running}) {
        $self->_manage;
    }
    $self->log->info("Manager $$ stopped");
}
# Returns the number of pool entries whose "healthy" flag is true.
sub healthy {
    my ($self) = @_;
    my @alive = grep { $_->{healthy} } values %{ $self->{pool} };
    return scalar @alive;
}
# Spirit level: sends one heartbeat message and reports the object's status.
#
# The optional argument is the "finished" marker
# (0 - not finished; 1 - finished); a missing or false value is sent as 0.
# Returns whatever $self->ok returns.
sub tick { # Spirit level
    my ($self, $finished) = @_;
    $self->_heartbeat($finished || 0);
    return $self->ok;
}

lib/Acme/Ghost/Prefork.pm  view on Meta::CPAN

        my $graceful = grep { $_->{graceful} } values %{$self->{pool}}; # Number gracefuled spirits
        my $spare = $self->{spare};
           $spare = $graceful # Check gracefuls
                ? $graceful > $spare # Check difference between graceful numbers and spare numbers
                    ? $spare # graceful numbers greater than spare numbers - use original spare value
                    : $graceful # graceful numbers less or equal to spare numbers - set spare to graceful
                : 0; # No gracefuls - no spares - set spare to 0 ('spare = 0')
        my $required = ($self->{spirits} - keys %{$self->{pool}}) + $spare; # How many spirits are required?
        $self->log->debug(sprintf("> graceful=%d; spare=%d; need=%d", $graceful, $spare, $required))
            if DEBUG && $required;
        $self->_spawn while $required-- > 0; # Spawn required spirits
    } elsif (!keys %{$self->{pool}}) { # No PIDs found, shutdown!
        return delete $self->{running}; # Return from the manager and exit immediately
    }

    # Wait for heartbeats
    $self->_wait;

    # Stops
    my $interval = $self->{heartbeat_interval};
    my $hb_to    = $self->{heartbeat_timeout};
    my $gf_to    = $self->{graceful_timeout};
    my $now      = Time::HiRes::time;
    my $log      = $self->log;
    for my $pid (keys %{$self->{pool}}) {
        next unless my $w = $self->{pool}{$pid}; # Get spirit struct

        # No heartbeat (graceful stop)
        if (!$w->{graceful} && ($w->{time} + $interval + $hb_to <= $now)) {
            $log->error("Spirit $pid has no heartbeat ($hb_to seconds), restarting");
            $w->{graceful} = $now;
        }

        # Graceful stop with timeout
        my $graceful = $w->{graceful} ||= $self->{gracefully_stop} ? $now : undef;
        if ($graceful && !$w->{attempt}) {
            $w->{attempt}++;
            $log->info("Stopping spirit $pid gracefully ($gf_to seconds)");
            kill 'QUIT', $pid or $self->_stopped($pid);
        }
        $w->{force} = 1 if $graceful && $graceful + $gf_to <= $now; # The conditions for a graceful stop by timeout were violated

        # Normal stop
        if ($w->{force} || ($self->{finished} && !$graceful)) {
            $log->warn("Stopping spirit $pid immediately");
            kill 'KILL', $pid or $self->_stopped($pid);
        }
    }
}
# Manager level: forks one spirit and transfers control to it.
#
# In the manager (parent) the spawn hook is called, the new PID is
# registered in the pool with the current time, and the new pool entry is
# returned. In the spirit (child) the manager's signal handlers are reset,
# the spirit body is run and the process exits; the call never returns.
# Croaks if fork fails.
sub _spawn { # Manager level (Spawn a spirit and transferring control to it)
    my $self = shift;

    my $pid = fork;
    croak("Can't fork: $!\n") unless defined $pid;

    # Parent (manager)
    if ($pid) {
        $self->spawn($pid);
        my $entry = { time => Time::HiRes::time() };
        $self->{pool}{$pid} = $entry;
        return $entry;
    }

    # Everything below runs in the child (spirit) only
    $self->{spirited} = 1; # Inspiration! (disables cleanup)

    weaken $self;

    # Clean spirit signals
    for my $signal (qw/CHLD INT TERM TTIN TTOU/) {
        $SIG{$signal} = 'DEFAULT';
    }

    # Set QUIT signal
    $SIG{QUIT} = sub {
        $self->log->warn("Spirit $$ received QUIT signal") if DEBUG;
        $self->_heartbeat(1); # Send finish command to manager
    };

    # Close reader pipe
    delete $self->{reader};

    # Reset the random number seed for spirit
    srand;

    $self->log->info("Spirit $$ started");

    # Start spirit
    $self->spirit;

    exit 0; # EXIT FROM APPLICATION
}
# Manager level: waits briefly for heartbeat messages from the spirits and
# updates the pool accordingly.
#
# Messages arrive on the reader pipe as lines of the form "PID:FLAG\n",
# where FLAG is a single digit (a true value means the spirit announces
# that it is finishing). Messages from PIDs that are not in the pool are
# ignored. Returns early when nothing is readable or nothing could be read.
sub _wait { # Manager level
    my $self = shift;

    # Call waitup hook
    $self->waitup;

    # Poll for heartbeats (1000 is the poll timeout; up to 4 MiB per read)
    my $reader = $self->{reader};
    return unless _is_readable(1000, fileno($reader));
    return unless $reader->sysread(my $chunk, 4194304);

    # Update heartbeats (and stop gracefully if necessary)
    my $now = Time::HiRes::time;
    while ($chunk =~ /(\d+):(\d)\n/g) {
        my ($pid, $finished) = ($1, $2);

        # This code runs in the manager, so $$ is the manager's PID
        $self->log->warn("Manager $$ received finished HeartBeat message $pid:$finished") if DEBUG && $finished;

        next unless my $w = $self->{pool}{$pid};
        $w->{healthy} = 1;
        $w->{time} = $now;
        $self->heartbeat($pid);
        if ($finished) { # Oops! Needs to finish
            $w->{graceful} ||= $now; # Keep an earlier graceful timestamp
            $w->{attempt}++;         # Spirit is already stopping: no QUIT needed
        }
    }
}

# Spirit level: writes one "PID:FLAG\n" message to the writer pipe.
#
# A true flag also clears $self->{ok} so the spirit stops gracefully.
# Returns the result of syswrite; exits the process with status 0 when
# the write fails or writes nothing.
sub _heartbeat { # Spirit level (send message to manager)
    my ($self, $msg) = @_;
    $msg ||= 0;

    # Stop gracefully
    $self->{ok} = 0 if $msg;

    my $written = $self->{writer}->syswrite("$$:$msg\n");
    exit 0 unless $written;
    return $written;
}

# See Mojo::Util::_readable
# Takes a poll timeout and a file descriptor number; returns true when
# polling reports the descriptor ready for POLLIN or POLLPRI.
sub _is_readable {
    my ($timeout, $fd) = @_;
    my $mask  = POLLIN | POLLPRI; # Must be a variable: it is passed on to _poll
    my $ready = IO::Poll::_poll($timeout, $fd, $mask);
    return !!($ready > 0);
}



( run in 0.644 second using v1.01-cache-2.11-cpan-5511b514fd6 )