Acme-Ghost
view release on metacpan or search on metacpan
lib/Acme/Ghost/Prefork.pm view on Meta::CPAN
sleep 1;
$self->log->debug(sprintf("$$> %d/%d", $i, $max));
last if $i >= $max;
}
},
);
exit $g->ctrl(shift(@ARGV) // '');
=head1 DESCRIPTION
Pre-forking ghost daemon (server)
=head1 ATTRIBUTES
This class inherits all attributes from L<Acme::Ghost> and implements the following new ones
=head2 graceful_timeout
graceful_timeout => 120
The maximum amount of time in seconds stopping a spirit gracefully may take before being forced to stop
B<Note that> this value should usually be a little larger than the maximum
amount of time you expect any one request to take
Defaults to C<120>
=head2 heartbeat_interval
heartbeat_interval => 5
Heartbeat interval in seconds, defaults to C<5>
=head2 heartbeat_timeout
heartbeat_timeout => 50
Maximum amount of time in seconds before a spirit without a heartbeat will be stopped gracefully
B<Note that> this value should usually be a little larger than the maximum
amount of time you expect any one operation to block the event loop
Defaults to C<50>
=head2 spare
spare => 2
Temporarily spawn up to this number of additional spirits if there is a need
This allows for new spirits to be started while old ones are still shutting down gracefully,
drastically reducing the performance cost of spirit restarts.
Defaults to C<2>
=head2 spirits, workers
spirits => 4
Number of spirit processes.
A good rule of thumb is two spirit processes per CPU core for applications that perform mostly
non-blocking operations.
Blocking operations often require more amount of spirits and benefit from decreasing concurrency
(often as low as C<1>)
Defaults to C<4>
=head1 METHODS
This class inherits all methods from L<Acme::Ghost> and implements the following new ones
=head2 again
This method is called immediately after creating the instance and returns it
B<NOTE:> Internal use only!
=head2 healthy
my $healthy = $g->healthy;
This method returns the number of currently active live spirit processes (with a heartbeat)
=head2 startup
$prefork->startup;
This method starts preforked process (manager and spirits) and wait for L</"MANAGER SIGNALS">
=head2 tick
my $ok = $g->tick;
my $ok = $g->tick(1); # marks the finished status
This is B<required> method of spirit main process that sends heartbeat message to
process manager and returns the status of the running server via the 'ok' attribute
=head1 MANAGER SIGNALS
The manager process can be controlled at runtime with the following signals
=head2 INT, TERM
Shut down server immediately
=head2 QUIT
Shut down server gracefully
=head2 TTIN
Increase spirit pool by one
=head2 TTOU
Decrease spirit pool by one
=head1 SPIRIT SIGNALS
The spirit processes can be controlled at runtime with the following signals
=head2 QUIT
Stop spirit gracefully
=head1 HOOKS
This class inherits all hooks from L<Acme::Ghost> and implements the following new ones
Any of the following methods may be implemented (overwriting) in your class
=head2 finish
sub finish {
my $self = shift;
my $graceful = shift;
# . . .
}
Is called when the server shuts down
sub finish {
my $self = shift;
my $graceful = shift;
$self->log->debug($graceful ? 'Graceful server shutdown' : 'Server shutdown');
}
=head2 heartbeat
sub heartbeat {
my $self = shift;
my $pid = shift;
# . . .
}
Is called when a heartbeat message has been received from a spirit
sub heartbeat {
my $self = shift;
my $pid = shift;
$self->log->debug("Spirit $pid has a heartbeat");
}
=head2 reap
sub reap {
my $self = shift;
my $pid = shift;
# . . .
}
Is called when a child process (spirit) finished
sub reap {
my $self = shift;
my $pid = shift;
$self->log->debug("Spirit $pid stopped");
}
=head2 spawn
sub spawn {
my $self = shift;
my $pid = shift;
# . . .
}
Is called when a spirit process is spawned
sub spawn {
my $self = shift;
my $pid = shift;
$self->log->debug("Spirit $pid started");
}
=head2 waitup
sub waitup {
my $self = shift;
# . . .
}
Is called when the manager starts waiting for new heartbeat messages
sub waitup {
my $self = shift;
my $spirits = $prefork->{spirits};
$self->log->debug("Waiting for heartbeat messages from $spirits spirits");
}
=head2 spirit
B<The spirit body>
This hook is called when the spirit process has started and is ready to run in isolation.
This is main hook that MUST BE implement to in user subclass
sub spirit {
my $self = shift;
# . . .
}
=head1 EXAMPLES
=over 4
=item prefork_acme.pl
Prefork acme example of daemon with reloading demonstration
my $g = MyGhost->new(
logfile => 'daemon.log',
pidfile => 'daemon.pid',
);
exit $g->ctrl(shift(@ARGV) // 'start');
1;
package MyGhost;
use parent 'Acme::Ghost::Prefork';
use Data::Dumper qw/Dumper/;
sub init {
my $self = shift;
$SIG{HUP} = sub { $self->hangup };
}
sub hangup {
my $self = shift;
$self->log->debug(Dumper($self->{pool}));
}
sub spirit {
my $self = shift;
my $max = 10;
my $i = 0;
while ($self->tick) {
$i++;
sleep 1;
$self->log->debug(sprintf("$$> %d/%d", $i, $max));
last if $i >= $max;
}
}
1;
=item prefork_ioloop.pl
L<Mojo::IOLoop> example
my $g = MyGhost->new(
logfile => 'daemon.log',
pidfile => 'daemon.pid',
);
exit $g->ctrl(shift(@ARGV) // 'start');
lib/Acme/Ghost/Prefork.pm view on Meta::CPAN
DEBUG => !!($ENV{ACME_GHOST_PREFORK_DEBUG} || 0),
SPARE => 2,
SPIRITS => 4,
HEARTBEAT_INTERVAL => 50,
HEARTBEAT_TIMEOUT => 5,
GRACEFUL_TIMEOUT => 120,
};
sub again {
my $self = shift;
my %args = @_;
# Prefork management subsystem
$self->{pool} = {}; # pid => {...}
$self->{running} = 0; # 0 - not running; 1 - running
$self->{finished} = 0; # 1 - marker for spirits and manager stopping
$self->{gracefully_stop} = 0; # 1 - marker for gracefully stopping
$self->{reader} = undef; # Readable pipe to get messages from spirits
$self->{writer} = undef; # Writable pipe to send messages to manager
$self->{spare} = $args{spare} || SPARE;
$self->{spirits} = $args{spirits} || $args{workers} || SPIRITS;
$self->{heartbeat_interval} = $args{heartbeat_interval} || HEARTBEAT_INTERVAL;
$self->{heartbeat_timeout} = $args{heartbeat_timeout} || HEARTBEAT_TIMEOUT;
$self->{graceful_timeout} = $args{graceful_timeout} || GRACEFUL_TIMEOUT;
$self->{spirit_cb} = $args{spirit};
return $self;
}
sub startup {
my $self = shift;
# Pipe for spirit communication
pipe($self->{reader}, $self->{writer}) or croak("Can't create pipe: $!\n");
# Set manager signals
local $SIG{INT} = local $SIG{TERM} = sub { $self->_stop };
local $SIG{QUIT} = sub { $self->_stop(1) };
local $SIG{CHLD} = sub { while ((my $pid = waitpid -1, WNOHANG) > 0) { $self->_stopped($pid) } };
local $SIG{TTIN} = sub { $self->_increase };
local $SIG{TTOU} = sub { $self->_decrease };
# Starting
$self->log->info("Manager $$ started");
$self->{running} = 1;
$self->_manage while $self->{running};
$self->log->info("Manager $$ stopped");
}
sub healthy {
return scalar grep { $_->{healthy} } values %{shift->{pool}};
}
sub tick { # Spirit level
my $self = shift;
my $finished = shift || 0; # 0 - no finished; 1 - finished
$self->_heartbeat($finished);
return $self->ok;
}
# User hooks
sub finish { } # Emitted when the server shuts down
sub heartbeat { } # Emitted when a heartbeat message has been received from a spirit
sub reap { } # Emitted when a child process exited
sub spawn { } # Emitted when a spirit process is spawned
sub waitup { } # Emitted when the manager starts waiting for new heartbeat messages
sub spirit {
my $self = shift;
my $cb = $self->{spirit_cb};
return unless $cb;
return $self->$cb if ref($cb) eq 'CODE';
$self->log->error("Callback `spirit` is incorrect");
$self->tick(1);
}
# Internal methods
sub _increase { # Manager level
my $self = shift;
$self->log->debug(sprintf("> Increase spirit pool by one")) if DEBUG;
$self->{spirits} = $self->{spirits} + 1;
}
sub _decrease { # Manager level
my $self = shift;
$self->log->debug(sprintf("> Decrease spirit pool by one")) if DEBUG;
return unless $self->{spirits} > 0;
$self->{spirits} = $self->{spirits} - 1;
# Set graceful time for first found unfinished pid (spirit)
for my $w (values %{$self->{pool}}) {
unless ($w->{graceful}) {
$w->{graceful} = Time::HiRes::time;
last;
}
}
}
sub _stop { # Manager level
my ($self, $graceful) = @_;
$self->log->debug(sprintf("> Received stop signal/command: %s",
$graceful ? 'graceful shutdown' : 'forced shutdown')) if DEBUG;
$self->finish($graceful);
$self->{finished} = 1;
$self->{gracefully_stop} = $graceful ? 1 : 0;
}
sub _stopped { # Manager level (Calls when a child process exited)
my $self = shift;
my $pid = shift;
$self->log->debug(sprintf("> Reap %s", $pid)) if DEBUG;
$self->reap($pid);
return unless my $w = delete $self->{pool}{$pid};
$self->log->info("Spirit $pid stopped");
unless ($w->{healthy}) {
$self->log->error("Spirit $pid stopped too early, shutting down");
$self->_stop;
}
}
sub _manage { # Manager level
my $self = shift;
# Spawn more spirits if necessary
if (!$self->{finished}) { # No finished
my $graceful = grep { $_->{graceful} } values %{$self->{pool}}; # Number gracefuled spirits
my $spare = $self->{spare};
$spare = $graceful # Check gracefuls
? $graceful > $spare # Check difference between graceful numbers and spare numbers
? $spare # graceful numbers greater than spare numbers - use original spare value
: $graceful # graceful numbers less or equal to spare numbers - set spare to graceful
: 0; # No gracefuls - no spares - set spare to 0 ('spare = 0')
my $required = ($self->{spirits} - keys %{$self->{pool}}) + $spare; # How many spirits are required?
$self->log->debug(sprintf("> graceful=%d; spare=%d; need=%d", $graceful, $spare, $required))
if DEBUG && $required;
$self->_spawn while $required-- > 0; # Spawn required spirits
} elsif (!keys %{$self->{pool}}) { # No PIDs found, shutdown!
return delete $self->{running}; # Return from the manager and exit immediately
}
# Wait for heartbeats
$self->_wait;
# Stops
my $interval = $self->{heartbeat_interval};
my $hb_to = $self->{heartbeat_timeout};
my $gf_to = $self->{graceful_timeout};
my $now = Time::HiRes::time;
my $log = $self->log;
for my $pid (keys %{$self->{pool}}) {
next unless my $w = $self->{pool}{$pid}; # Get spirit struct
# No heartbeat (graceful stop)
if (!$w->{graceful} && ($w->{time} + $interval + $hb_to <= $now)) {
$log->error("Spirit $pid has no heartbeat ($hb_to seconds), restarting");
$w->{graceful} = $now;
}
# Graceful stop with timeout
my $graceful = $w->{graceful} ||= $self->{gracefully_stop} ? $now : undef;
if ($graceful && !$w->{attempt}) {
$w->{attempt}++;
$log->info("Stopping spirit $pid gracefully ($gf_to seconds)");
kill 'QUIT', $pid or $self->_stopped($pid);
}
$w->{force} = 1 if $graceful && $graceful + $gf_to <= $now; # The conditions for a graceful stop by timeout were violated
# Normal stop
( run in 0.838 second using v1.01-cache-2.11-cpan-39bf76dae61 )