Async-Event-Interval

 view release on metacpan or  search on metacpan

t/09-locking.t  view on Meta::CPAN

        Async::Event::Interval::_events_read(sub { die "boom-read\n" });
        1;
    };
    is $ok, undef, "_read_events propagates die from its coderef";
    like $@, qr/boom-read/, "...with the original error";
    is $knot->sem->getval(SEM_READERS), 0,
        "_read_events released LOCK_SH even on die";
}

# When the tie has been torn down (simulate by undef'ing tied()), the
# helpers degrade gracefully by just running the coderef without a lock.

{
    no warnings 'redefine';
    local *Async::Event::Interval::_events_write = sub {
        my ($cb) = @_;
        my $knot = Async::Event::Interval::_events_knot();
        # Simulate teardown by ignoring the knot
        $knot = undef;
        return $cb->() unless $knot;
        return 'never';
    };
    is
        Async::Event::Interval::_events_write(sub { 'fallback' }),
        'fallback',
        "_write_events falls back to running the coderef when no knot is available";
}

# Concurrent writers through the public API (interval() setter) do not
# corrupt state. Two forked children each set interval many times; the
# final value is one of the two expected values, not a corrupt mix.

{
    my $e = Async::Event::Interval->new(0.5, sub {});
    my $iters = 100;

    my @kids;
    my $pid = fork;
    die "fork: $!" unless defined $pid;
    if (! $pid) {
        for (1 .. $iters) { $e->interval(100) }
        require POSIX;
        POSIX::_exit(0);
    }
    push @kids, $pid;

    $pid = fork;
    die "fork: $!" unless defined $pid;
    if (! $pid) {
        for (1 .. $iters) { $e->interval(200) }
        require POSIX;
        POSIX::_exit(0);
    }
    push @kids, $pid;

    waitpid $_, 0 for @kids;

    my $v = $e->interval;
    ok $v == 100 || $v == 200,
        "concurrent interval() setters produce an expected value, got $v "
      . "(no corruption across $iters iterations x 2 children)";
}

# A LOCK_EX writer must block a LOCK_SH reader. Verify by holding
# LOCK_EX in a forked child and watching the parent's LOCK_SH wait
# for it.

{
    my $knot = events_knot;

    pipe my $child_ready_r, my $child_ready_w or die "pipe: $!";
    pipe my $parent_done_r, my $parent_done_w or die "pipe: $!";

    my $pid = fork;
    die "fork: $!" unless defined $pid;
    if (! $pid) {
        # Child: take LOCK_EX, tell parent, wait for parent's signal,
        # then release.
        $knot->lock(LOCK_EX);
        close $child_ready_r;
        print $child_ready_w "ready\n";
        close $child_ready_w;

        # Block until parent writes back
        close $parent_done_w;
        my $line = <$parent_done_r>;
        close $parent_done_r;

        $knot->unlock;
        require POSIX;
        POSIX::_exit(0);
    }

    # Parent: wait for child's LOCK_EX, then time a LOCK_SH attempt;
    # release the child while we're waiting on LOCK_SH.
    close $child_ready_w;
    my $r = <$child_ready_r>;
    chomp $r;
    is $r, 'ready', "child signaled it holds LOCK_EX";
    close $child_ready_r;

    is $knot->sem->getval(SEM_WRITERS), 1,
        "SEM_WRITERS shows the child's LOCK_EX is held";

    # Release the child only after we've kicked off the LOCK_SH wait
    # below. Do it in a forked grandchild so the parent's blocking
    # _read_events has someone to unblock it.
    my $unblock_pid = fork;
    die "fork: $!" unless defined $unblock_pid;
    if (! $unblock_pid) {
        # Wait briefly so the parent enters its LOCK_SH wait first
        select undef, undef, undef, 0.2;
        close $parent_done_r;
        print $parent_done_w "go\n";
        close $parent_done_w;
        require POSIX;
        POSIX::_exit(0);
    }

    use Time::HiRes ();
    my $t0 = Time::HiRes::time();

t/09-locking.t  view on Meta::CPAN

{
    my @finish_calls;
    my $start_call = 0;
    my $cb_count   = 0;
    no warnings 'redefine';
    local *Parallel::ForkManager::start = sub {
        $start_call++;
        return 0 if $start_call == 1;
        return 1;
    };
    local *Parallel::ForkManager::finish = sub {
        push @finish_calls, $_[1];
    };

    my $e = Async::Event::Interval->new(
        0.001,
        sub { $cb_count++; die "immediate-loop-death\n" },
    );
    $e->start;

    is $cb_count, 1, "interval mode (immediate death): callback ran once";
    is scalar @finish_calls, 1,
        "interval mode (immediate death): finish called exactly once";
    is $finish_calls[0], 1, "...with exit code 1";
    is $e->runs, 1, "runs incremented for the one iteration";
    is $e->errors, 1, "errors incremented exactly once";
    like $e->error_message, qr/immediate-loop-death/,
        "interval mode: error_message captured";

    $e->_pid(0);
}

# Interval mode: callback succeeds N times then dies; finish(1) called
# once, runs == N (incremented in both success and failure paths of
# _run_callback), errors == 1.

{
    my @finish_calls;
    my $start_call = 0;
    my $cb_count   = 0;
    no warnings 'redefine';
    local *Parallel::ForkManager::start = sub {
        $start_call++;
        return 0 if $start_call == 1;
        return 1;
    };
    local *Parallel::ForkManager::finish = sub {
        push @finish_calls, $_[1];
    };

    my $e = Async::Event::Interval->new(0.001, sub {
        $cb_count++;
        die "death-after-runs\n" if $cb_count >= 3;
    });
    $e->start;

    is $cb_count, 3, "interval mode (death after runs): callback ran 3 times";
    is scalar @finish_calls, 1,
        "interval mode (death after runs): finish called exactly once";
    is $finish_calls[0], 1, "...with exit code 1";
    is $e->runs, 3, "runs incremented for all iterations including the dying one";
    is $e->errors, 1, "errors incremented exactly once";
    like $e->error_message, qr/death-after-runs/,
        "interval mode (death after runs): error_message captured";

    $e->_pid(0);
}

# Multiple events crashing independently each receive their own finish(1)
# call. The mocked start() returns 0 on odd calls (child path) and 1 on
# even calls (parent path), so each event's for(0..1) loop runs once
# through the child and once through the parent.

{
    my @finish_calls;
    my $start_call = 0;
    no warnings 'redefine';
    local *Parallel::ForkManager::start = sub {
        $start_call++;
        return $start_call % 2 == 1 ? 0 : 1;
    };
    local *Parallel::ForkManager::finish = sub {
        push @finish_calls, $_[1];
    };

    my $e1 = Async::Event::Interval->new(0, sub { die "e1-crash\n" });
    my $e2 = Async::Event::Interval->new(0, sub { die "e2-crash\n" });
    $e1->start;
    $e2->start;

    is scalar @finish_calls, 2,
        "multiple events: each crash invokes _pm->finish";
    is $finish_calls[0], 1, "first event finish exit code 1";
    is $finish_calls[1], 1, "second event finish exit code 1";
    is $e1->errors, 1, "first event errors == 1";
    is $e2->errors, 1, "second event errors == 1";
    like $e1->error_message, qr/e1-crash/,
        "first event error_message captured";
    like $e2->error_message, qr/e2-crash/,
        "second event error_message captured";

    $e1->_pid(0);
    $e2->_pid(0);
}

# finish() is called exactly once per start, from inside the child-path
# branch of the for(0..1) loop. The parent-path iteration must not call
# finish a second time. Recording the start-call counter at finish time
# proves finish ran before the parent-path iteration.

{
    my @finish_at;
    my $start_call = 0;
    no warnings 'redefine';
    local *Parallel::ForkManager::start = sub {
        $start_call++;
        return 0 if $start_call == 1;
        return 1;
    };
    local *Parallel::ForkManager::finish = sub {
        push @finish_at, $start_call;



( run in 0.644 second using v1.01-cache-2.11-cpan-71847e10f99 )