Async-Event-Interval

 view release on metacpan or  search on metacpan

lib/Async/Event/Interval.pm  view on Meta::CPAN

    });

    if (! $unique_shm_key_found) {
        croak("Could not generate a unique shared memory segment.");
    }

    push @{ $self->{_shared_scalars} }, \$scalar;

    return \$scalar;
}
# Start the event, passing any supplied parameters through to _event().
#
# Warns and returns without doing anything if the event is already marked as
# started. Otherwise clears the crashed flag, resets the per-event stop,
# clean-exit and error entries in %events, marks the event as started and
# hands control to _event().
sub start {
    my $self            = shift;
    my @callback_params = @_;

    if ($self->_started) {
        warn "Event already running...\n";
        return;
    }

    $self->_crashed(0);

    # Wipe leftovers from a previous run before launching again
    _events_write(sub {
        my $id = $self->id;

        delete $events{$id}{_stop_requested};
        delete $events{$id}{_clean_exit};
        $events{$id}{error} = 0;
    });

    $self->_started(1);

    return $self->_event(@callback_params);
}
# Report whether the event is running.
#
# Calls _detect_crash() first, then returns 0 if the event is not marked as
# started. For a started event the PID is returned; a started event with no
# PID is treated as fatal and croaks.
sub status {
    my $self = shift;

    $self->_detect_crash;

    return 0 if ! $self->_started;

    croak "Event is started, but no PID can be found. This is a " .
          "fatal error. Exiting...\n"
        unless $self->pid;

    return $self->pid;
}
# Stop the event's process.
#
# Returns immediately if the event is flagged as crashed or has no PID.
# Otherwise clears the started flag, records a stop request in %events and
# signals the process, escalating from SIGTERM to SIGKILL. Croaks if the
# process is still present after both signals have timed out.
sub stop {
    my ($self) = @_;

    return if $self->_crashed;
    return if ! $self->pid;

    $self->_started(0);

    # Cooperative stop flag: a well-behaved child exits its event loop on the
    # next iteration. The signals below are the safety net for a child stuck
    # in a long-running callback.

    _events_write(sub { $events{$self->id}{_stop_requested} = 1 });

    # SIGTERM goes first so a user-installed SIGTERM handler in the callback
    # can clean up (close files, release locks, etc.); SIGKILL follows only if
    # the child is still alive once STOP_TERM_TIMEOUT has passed.
    # _signal_and_wait() polls at STOP_KILL_POLL_INTERVAL and returns 1 as
    # soon as the process is gone, so the common case is a single poll.

    my @escalation = (
        [ 'TERM', STOP_TERM_TIMEOUT ],
        [ 'KILL', STOP_KILL_TIMEOUT ],
    );

    for my $step (@escalation) {
        my ($signal, $limit) = @$step;
        return if $self->_signal_and_wait($signal, $limit);
    }

    croak "Event stop was called, but the process hasn't been killed " .
          "(SIGTERM + SIGKILL both ignored). This is a fatal event. " .
          "Exiting...\n";
}
# Get or set the timeout stored in %events for this event.
#
# Parameters:
#   $timeout - optional; a non-negative integer made of ASCII digits, or
#              undef. When supplied (even as undef) it is written to
#              $events{<id>}{timeout}.
#
# Returns the timeout value currently stored for this event.
# Croaks if a defined $timeout is not a non-negative integer.
sub timeout {
    my ($self, $timeout) = @_;

    # Check param count here because we allow undef as a valid value for
    # $timeout

    if (@_ > 1) {
        # \A...\z with an explicit [0-9] class: '$' would let a trailing
        # newline through and \d would accept non-ASCII Unicode digits
        if (defined $timeout && $timeout !~ /\A[0-9]+\z/) {
            croak "\$timeout must be a non-negative integer or undef";
        }
        _events_write(sub { $events{$self->id}{timeout} = $timeout });
    }

    return _events_read(sub { $events{$self->id}->{timeout} });
}
# Block until waiting() reports true.
#
# Parameters:
#   $interval - optional number of seconds to sleep between checks, written
#               as an unsigned integer ("2") or decimal ("0.5", ".5") in
#               ASCII digits. Defaults to 0.01 when not defined.
#
# Returns nothing. Croaks if a defined $interval is not in that form.
sub wait {
    my ($self, $interval) = @_;

    if (defined $interval) {
        # \A...\z with an explicit [0-9] class: '$' would let a trailing
        # newline through and \d would accept non-ASCII Unicode digits
        if ($interval !~ /\A(?:[0-9]+|[0-9]*\.[0-9]+)\z/) {
            croak "\$interval must be an integer or float";
        }
    }
    else {
        $interval = 0.01;
    }

    # Four-argument select() is used purely as a sub-second sleep
    while (! $self->waiting) {
        select(undef, undef, undef, $interval);
    }

    return;
}
# Returns 1 when error() is true or status() is false, 0 otherwise.
sub waiting {
    my $self = shift;

    return ($self->error || ! $self->status) ? 1 : 0;
}

# Internal methods

sub _args {
    my ($self, $args) = @_;

    if (defined $args) {
        $self->{args} = $args;
    }

    return $self->{args};

lib/Async/Event/Interval.pm  view on Meta::CPAN

        }
        1;
    };

    alarm(0) if $timeout;

    if (! $ok) {
        my $err = $@;

        $self->_errors(1);
        $self->_error_message($err);
        $self->_runs(1);
        $self->status;

        die $err;
    }

    $self->_runs(1);
    $self->status;
}
# Read, and optionally bump, the run counter kept in %events for this event.
#
# Any defined $increment adds exactly one to the counter; the value of
# $increment itself is not used. Always returns the stored counter.
sub _runs {
    my ($self, $increment) = @_;

    _events_write(sub { $events{$self->id}->{runs}++ })
        if defined $increment;

    return _events_read(sub { $events{$self->id}->{runs} });
}
# Store the interval, the callback and the callback arguments on the object
# through their respective accessors. The arguments are stored as one array
# reference.
sub _setup {
    my $self = shift;
    my ($interval, $cb, @args) = @_;

    $self->interval($interval);
    $self->_cb($cb);

    return $self->_args(\@args);
}
sub _shm_lock {
    # Accessor for the file-level $shared_memory_protect_lock variable;
    # takes no arguments and returns its current value
    return $shared_memory_protect_lock;
}
# Send $sig to the event's process, then poll until the process is gone or
# $timeout seconds have elapsed.
#
# Parameters:
#   $sig     - signal name or number handed to kill()
#   $timeout - maximum number of seconds to keep polling
#
# Returns 1 once kill(0, $pid) reports the process is gone (or when there is
# no PID to signal at all), and 0 if the process is still present after
# $timeout seconds.
sub _signal_and_wait {
    my ($self, $sig, $timeout) = @_;

    # Read the PID exactly once. Re-reading it for every kill() risks picking
    # up a cleared value part-way through: PID 0 makes kill() target the
    # caller's whole process group, and an undefined PID is not a valid
    # target either.
    my $pid = $self->pid;

    return 1 if ! $pid;

    kill $sig, $pid;

    my $start = Time::HiRes::time();

    # Signal 0 sends nothing; it only asks whether the PID can be signalled
    while (kill 0, $pid) {
        return 0 if Time::HiRes::time() - $start >= $timeout;
        select(undef, undef, undef, STOP_KILL_POLL_INTERVAL);
    }

    return 1;
}
# Get or set the object's 'started' flag.
#
# A defined argument (including 0) is stored in $self->{started}; an
# undefined or missing one leaves it untouched. Always returns the stored
# value.
sub _started {
    my $self = shift;

    if (@_ && defined $_[0]) {
        $self->{started} = $_[0];
    }

    return $self->{started};
}

# External access: These allow unit tests to directly access live data in the
# %events hash

sub _events_count {
    # Number of events currently alive, read from $events{_event_count};
    # an unset or false slot is reported as 0
    return _events_read(
        sub { $events{_event_count} || 0 }
    );
}
sub _events_knot {
    # The object %events is tied to (the IPC::Shareable knot itself)
    return tied %events;
}
sub _events_next_id {
    # The next ID that will be assigned to an event, read from
    # $events{_id_counter}; an unset or false slot is reported as 0
    return _events_read(
        sub { $events{_id_counter} || 0 }
    );
}
sub _events_stop_requested {
    # Current value of this event's _stop_requested flag in %events
    my $self = shift;

    return _events_read(
        sub { $events{$self->id}{_stop_requested} }
    );
}

# Destruction

# Run $code under an alarm() of $timeout seconds, trapping anything it throws.
#
# Parameters:
#   $timeout - number of seconds handed to alarm()
#   $code    - code reference to execute
#
# A SIGALRM handler that dies with "alarm\n" is installed both through
# %SIG and through POSIX::sigaction() for the duration of the call. Any
# exception raised by $code, including that timeout, is caught here and left
# in $@ for the caller; $@ is empty when $code finished normally.
#
# Returns the result of the POSIX::sigaction() call that puts the previous
# SIGALRM action back.
sub _alarmed_eval {
    my ($timeout, $code) = @_;

    my $handler = sub { die "alarm\n" };

    local $SIG{ALRM} = $handler;

    my $sigset = POSIX::SigSet->new(POSIX::SIGALRM());
    my $sa     = POSIX::SigAction->new($handler, $sigset, 0);
    my $old    = POSIX::SigAction->new();

    POSIX::sigaction(POSIX::SIGALRM(), $sa, $old);

    my $error = '';

    # The alarm is armed and cancelled inside the outer eval. Cancelling it
    # after the eval leaves a window in which a late SIGALRM would make the
    # handler die with nothing left to catch it.
    my $completed = eval {
        alarm($timeout);
        eval { $code->() };
        $error = $@;
        alarm(0);
        1;
    };

    # Only reachable when the alarm fired between the inner eval finishing
    # and alarm(0) taking effect
    $error = $@ if ! $completed;

    my $restored = POSIX::sigaction(POSIX::SIGALRM(), $old);

    # Hand the trapped error (or an empty string) back through $@
    $@ = $error;

    return $restored;
}
sub _end {
    my ($is_shutdown) = @_;

    return if $$ != $creator_pid;

    if ($is_shutdown) {
        return if $_shutting_down;
        $_shutting_down = 1;
    }

    # Phase 1: Collect PIDs from %events (1 second timeout).
    # Falls back to @all_pids if the lock is stuck.

    my @pids;

    _alarmed_eval(1, sub {
        _events_read(sub {
            for my $id (keys %events) {
                next if $id =~ /^_/;
                my $pid = $events{$id}{pid};
                push @pids, $pid if $pid && kill(0, $pid);
            }
        });



( run in 1.950 second using v1.01-cache-2.11-cpan-df04353d9ac )