Async-Event-Interval
view release on metacpan or search on metacpan
lib/Async/Event/Interval.pm view on Meta::CPAN
});
if (! $unique_shm_key_found) {
croak("Could not generate a unique shared memory segment.");
}
push @{ $self->{_shared_scalars} }, \$scalar;
return \$scalar;
}
# Start the event.
#
# Any arguments after the invocant are forwarded untouched to _event().
# If the object already reports itself as started, a warning is emitted and
# nothing else happens. Otherwise the crashed flag is cleared, the per-event
# stop-request / clean-exit markers are removed and the error flag is zeroed
# in %events, the object is marked as started and _event() is invoked.
#
# Returns whatever _event() returns (nothing when already running).
sub start {
    my $self            = shift;
    my @callback_params = @_;

    if ($self->_started) {
        warn "Event already running...\n";
        return;
    }

    $self->_crashed(0);

    # Reset the shared bookkeeping left over from any previous run.
    _events_write(
        sub {
            delete $events{$self->id}->{_stop_requested};
            delete $events{$self->id}->{_clean_exit};
            $events{$self->id}->{error} = 0;
        }
    );

    $self->_started(1);

    return $self->_event(@callback_params);
}
# Report whether the event is running.
#
# Runs crash detection first, then returns 0 when the object is not marked as
# started. For a started event the PID is returned; a started event without a
# PID is treated as fatal and croaks.
sub status {
    my $self = shift;

    $self->_detect_crash;

    return 0 if ! $self->_started;

    croak "Event is started, but no PID can be found. This is a " .
          "fatal error. Exiting...\n"
        unless $self->pid;

    return $self->pid;
}
# Stop the event's process.
#
# Does nothing when the event is flagged as crashed or has no PID. Otherwise
# the object is marked as not started, the cooperative stop flag is raised in
# %events, and the process is signalled: SIGTERM first, SIGKILL second.
# Croaks if the process is still alive after both attempts.
sub stop {
    my ($self) = @_;

    return if $self->_crashed || ! $self->pid;

    $self->_started(0);

    # Raise the cooperative stop flag so a well-behaved child leaves its event
    # loop on the next iteration. The signals below are the safety net for a
    # child stuck in a long-running callback.
    _events_write(
        sub {
            $events{$self->id}->{_stop_requested} = 1;
        }
    );

    # SIGTERM goes first so that a user-installed SIGTERM handler in the
    # callback gets a chance to clean up (close files, release locks, etc.).
    # If the child is still alive after STOP_TERM_TIMEOUT, escalate to
    # SIGKILL. _signal_and_wait polls at STOP_KILL_POLL_INTERVAL and returns 1
    # as soon as the process is gone, so the common case is a single poll.
    return
        if $self->_signal_and_wait('TERM', STOP_TERM_TIMEOUT)
        || $self->_signal_and_wait('KILL', STOP_KILL_TIMEOUT);

    croak "Event stop was called, but the process hasn't been killed " .
          "(SIGTERM + SIGKILL both ignored). This is a fatal event. " .
          "Exiting...\n";
}
# Get or set the event's timeout value stored in %events.
#
# Called with no argument it only reads. Called with an argument (including
# an explicit undef, which is a valid value) it stores that value first.
#
# Parameters:
#   $timeout - optional; undef or a string of ASCII digits.
#
# Returns the timeout currently stored for this event.
# Croaks if a defined $timeout is not a non-negative integer.
sub timeout {
    my ($self, $timeout) = @_;

    # Check the argument count rather than definedness, because undef is an
    # allowed value for $timeout.
    if (@_ > 1) {
        # \A ... \z (rather than ^ ... $) so a trailing newline such as "5\n"
        # is rejected, and [0-9] (rather than \d) so only ASCII digits pass.
        if (defined $timeout && $timeout !~ /\A[0-9]+\z/) {
            croak "\$timeout must be a non-negative integer or undef";
        }

        _events_write(sub { $events{$self->id}{timeout} = $timeout });
    }

    return _events_read(sub { $events{$self->id}->{timeout} });
}
# Block until waiting() reports true.
#
# Parameters:
#   $interval - optional pause between polls, in seconds. Must be written as
#               an integer ("2") or a decimal with a fractional part ("0.5",
#               ".5"). Defaults to 0.01 when omitted or undef.
#
# Returns nothing. Croaks if $interval is defined but malformed.
sub wait {
    my ($self, $interval) = @_;

    if (defined $interval) {
        # One pattern anchored with \A ... \z so a trailing newline
        # ("0.5\n") is rejected, and [0-9] so only ASCII digits pass.
        if ($interval !~ /\A(?:[0-9]+|[0-9]*\.[0-9]+)\z/) {
            croak "\$interval must be an integer or float";
        }
    }
    else {
        $interval = 0.01;
    }

    while (! $self->waiting) {
        # Four-argument select is used as a sub-second sleep.
        select(undef, undef, undef, $interval);
    }

    return;
}
# Returns 1 when the event reports an error or status() is false, 0 otherwise.
# status() is only consulted when error() is false.
sub waiting {
    my $self = shift;

    return ($self->error || ! $self->status) ? 1 : 0;
}
# Internal methods
sub _args {
my ($self, $args) = @_;
if (defined $args) {
$self->{args} = $args;
}
return $self->{args};
lib/Async/Event/Interval.pm view on Meta::CPAN
}
1;
};
alarm(0) if $timeout;
if (! $ok) {
my $err = $@;
$self->_errors(1);
$self->_error_message($err);
$self->_runs(1);
$self->status;
die $err;
}
$self->_runs(1);
$self->status;
}
# Read, and optionally bump, the run counter kept for this event in %events.
#
# Passing any defined value as $increment adds exactly one to the counter;
# the value itself is not used as the step. Returns the stored counter.
sub _runs {
    my ($self, $increment) = @_;

    _events_write(sub { $events{$self->id}{runs}++ })
        if defined $increment;

    return _events_read(sub { $events{$self->id}{runs} });
}
# Store the interval, the callback and the callback's arguments on the object
# through their respective accessors. Everything after the callback is
# collected into a fresh array reference for _args().
#
# Returns whatever the final _args() call returns.
sub _setup {
    my $self     = shift;
    my $interval = shift;
    my $cb       = shift;
    my @args     = @_;

    $self->interval($interval);
    $self->_cb($cb);

    return $self->_args(\@args);
}
# Accessor for the file-scoped $shared_memory_protect_lock variable; takes no
# arguments and returns that variable's current value.
# NOTE(review): presumably this is the lock used to guard shared-memory
# access -- confirm against where the variable is declared.
sub _shm_lock {
return $shared_memory_protect_lock;
}
# Send $sig to the event's PID, then poll with signal 0 until the process can
# no longer be signalled or $timeout seconds have elapsed.
#
# Parameters:
#   $sig     - signal name passed to kill (e.g. 'TERM', 'KILL').
#   $timeout - maximum number of seconds to keep polling.
#
# Returns 1 once kill(0, ...) reports failure, 0 if the timeout expires
# first. The PID is re-read from $self->pid on every poll.
sub _signal_and_wait {
    my ($self, $sig, $timeout) = @_;

    kill($sig, $self->pid);

    my $began = Time::HiRes::time();

    while (1) {
        return 1 unless kill(0, $self->pid);
        return 0 if Time::HiRes::time() - $began >= $timeout;

        # Four-argument select is used as a sub-second sleep between polls.
        select(undef, undef, undef, STOP_KILL_POLL_INTERVAL);
    }
}
# Get or set the object's local "started" flag ($self->{started}).
#
# A defined argument overwrites the flag; undef (or no argument) leaves it
# alone. Always returns the current value of the flag.
sub _started {
    my $self = shift;
    my ($started) = @_;

    if (defined $started) {
        $self->{started} = $started;
    }

    return $self->{started};
}
# External access: These allow unit tests to directly access live data in the
# %events hash
# Number of events currently alive, read from $events{_event_count}.
# A missing or false entry is reported as 0.
sub _events_count {
    return _events_read(
        sub {
            $events{_event_count} || 0;
        }
    );
}
# Returns the object that %events is tied to (the IPC::Shareable knot).
sub _events_knot {
    return tied %events;
}
# The next ID that will be assigned to an event, read from
# $events{_id_counter}. A missing or false entry is reported as 0.
sub _events_next_id {
    return _events_read(
        sub {
            $events{_id_counter} || 0;
        }
    );
}
# Returns the _stop_requested flag stored for this event in %events.
sub _events_stop_requested {
    my $self = shift;

    return _events_read(
        sub {
            $events{$self->id}->{_stop_requested};
        }
    );
}
# Destruction
# Run $code under an alarm() timeout, swallowing any exception it throws
# (including the "alarm\n" raised when the timer fires).
#
# Parameters:
#   $timeout - value passed to alarm().
#   $code    - code reference, invoked with no arguments.
#
# A SIGALRM handler that dies with "alarm\n" is installed both through %SIG
# (localized to this call) and through POSIX::sigaction with flags 0; the
# previous sigaction is captured and put back before returning. After the
# call, $@ holds whatever the eval left there.
#
# The timer is armed and disarmed *inside* the eval, so a SIGALRM that lands
# just as $code finishes is caught there instead of dying out of this sub and
# skipping the sigaction restore. The alarm(0) after the eval is a backstop
# for the case where $code itself died before the inner disarm ran.
sub _alarmed_eval {
    my ($timeout, $code) = @_;

    my $handler = sub { die "alarm\n" };
    local $SIG{ALRM} = $handler;

    my $sigset = POSIX::SigSet->new(POSIX::SIGALRM());
    my $sa     = POSIX::SigAction->new($handler, $sigset, 0);
    my $old    = POSIX::SigAction->new();
    POSIX::sigaction(POSIX::SIGALRM(), $sa, $old);

    eval {
        alarm($timeout);
        $code->();
        alarm(0);
        1;
    };
    alarm(0);

    POSIX::sigaction(POSIX::SIGALRM(), $old);
}
sub _end {
my ($is_shutdown) = @_;
return if $$ != $creator_pid;
if ($is_shutdown) {
return if $_shutting_down;
$_shutting_down = 1;
}
# Phase 1: Collect PIDs from %events (1 second timeout).
# Falls back to @all_pids if the lock is stuck.
my @pids;
_alarmed_eval(1, sub {
_events_read(sub {
for my $id (keys %events) {
next if $id =~ /^_/;
my $pid = $events{$id}{pid};
push @pids, $pid if $pid && kill(0, $pid);
}
});
( run in 1.950 second using v1.01-cache-2.11-cpan-df04353d9ac )