Async-Redis

Changes

    - Feature: Unix domain socket connections
        - Connect via path parameter or redis+unix:// URI scheme
        - Constructor stores path and skips host/port for unix sockets
        - Added docker-compose redis-unix service for testing
    - Feature: PubSub auto-resubscribe on reconnect
        - Subscriptions automatically re-established after connection drop
        - on_reconnect callback on Subscription object for notification
        - _read_pubsub_frame checks connected state to fail fast
        - _reset_connection now clears in_pubsub flag
    - Bug Fix: _reset_connection left in_pubsub=1 after disconnect
        - Prevented reconnection because AUTH/SELECT were blocked by the pubsub guard

0.001004  2026-02-02
    - Bug Fix: Socket close ordering
        - Fixed Future::IO corruption when disconnect() called with active watchers
        - Cancelling _current_read_future before socket close ensures Future::IO
          unregisters watchers while fileno is still valid
        - Let Perl's DESTROY handle close() after futures are cancelled
    - Breaking Change: Future::IO configuration
        - Removed Future::IO->load_best_impl from module
        - Libraries should not configure Future::IO - only application entry

examples/async-job-queue/PLAN.md

## Goal

Implement `examples/async-job-queue/` from
`examples/async-job-queue/SPEC.md`: a small CLI demo that visibly proves
Async::Redis can run a Redis-backed job queue concurrently in one Perl process.

The demo should show:

- a burst of jobs queued immediately
- two or more workers claiming and processing jobs concurrently
- a heartbeat that keeps printing while workers are blocked in `BLPOP` or
  sleeping during simulated work
- automatic shutdown after all jobs are processed

## Constraints

- Work only on the async-job-queue example and the examples index.
- Do not touch existing dirty stress files:
  - `examples/stress/lib/Stress/Chaos.pm`
  - `examples/stress/lib/Stress/Harness.pm`
- Use `perlbrew use perl-5.40.0@default` for all Perl checks.

examples/async-job-queue/PLAN.md

- output shows both `worker-1` and `worker-2`
- the first two jobs start near the same timestamp
- elapsed time is closer to `0.4s` than `0.8s`
- final processed count is `4`

Review after step:

- each worker uses its own Redis connection
- all worker futures are awaited
- sentinels cannot be counted as processed jobs
- no worker can be left blocked after completion
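
The sentinel rule in this checklist can be sketched without Redis. This is a minimal illustration, not the example's actual code: the in-memory array stands in for the Redis list, the `__STOP__` value and the `drain_queue` helper are hypothetical names, and a real worker would take items with `BLPOP` on its own connection.

```perl
use strict;
use warnings;

# Hypothetical sentinel value; the real example may use a different marker.
my $SENTINEL = '__STOP__';

# Drain an in-memory queue the way one worker would drain a Redis list.
# Returns the number of real jobs processed; a sentinel ends the loop
# without being counted.
sub drain_queue {
    my ($queue) = @_;
    my $processed = 0;
    while (defined(my $item = shift @$queue)) {
        last if $item eq $SENTINEL;    # stop signal, not a job
        $processed++;
    }
    return $processed;
}

# Four jobs followed by one sentinel per worker.
my @queue = map { "job-$_" } 1 .. 4;
push @queue, ($SENTINEL) x 2;

# The workers run one after the other here: the first drains all four
# jobs and stops at its sentinel, the second sees only a sentinel.
my $worker1 = drain_queue(\@queue);
my $worker2 = drain_queue(\@queue);
my $total   = $worker1 + $worker2;
print "done processed=$total\n";
```

Pushing one sentinel per worker is what guarantees that no worker stays blocked: each worker consumes exactly one stop signal, and the queue is empty afterwards.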

## Step 7: Add Heartbeat Task

Add:

- separate stats Redis connection
- `heartbeat($opts)` async helper
- heartbeat loop every `0.25s`
- output with `queue=`, `in_flight=`, and `processed=`
- stop condition when processed count reaches target
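
One way to shape the heartbeat loop listed above is sketched below. It is an illustration under assumptions, not the example's implementation: the `heartbeat_line` and `run_heartbeat` names, the `fetch_stats` and `sleep` callbacks, and the `interval` and `target` options are hypothetical. The callbacks stand in for the stats Redis connection and for an awaited `Future::IO->sleep(0.25)`, so the sketch runs without Redis or an event loop.

```perl
use strict;
use warnings;

# Format one heartbeat line, e.g. "[ 0.25s] heartbeat queue=4 ...".
sub heartbeat_line {
    my ($elapsed, $stats) = @_;
    return sprintf '[%5.2fs] heartbeat queue=%d in_flight=%d processed=%d',
        $elapsed, @{$stats}{qw(queue in_flight processed)};
}

# Collect a heartbeat line every $opts->{interval} seconds until the
# processed count reaches $opts->{target}. The sleep and fetch_stats
# callbacks are injected so the loop can run without Redis.
sub run_heartbeat {
    my ($opts) = @_;
    my ($elapsed, @lines) = (0);
    while (1) {
        $opts->{sleep}->($opts->{interval});
        $elapsed += $opts->{interval};
        my $stats = $opts->{fetch_stats}->();
        last if $stats->{processed} >= $opts->{target};    # stop condition
        push @lines, heartbeat_line($elapsed, $stats);
    }
    return @lines;
}

# Canned stats in place of a real stats connection.
my @samples = (
    { queue => 4, in_flight => 2, processed => 0 },
    { queue => 2, in_flight => 2, processed => 2 },
    { queue => 0, in_flight => 0, processed => 6 },
);
my @lines = run_heartbeat({
    interval    => 0.25,
    target      => 6,
    sleep       => sub { },                  # no real waiting in this sketch
    fetch_stats => sub { shift @samples },
});
print "$_\n" for @lines;
```

Injecting the two callbacks keeps the formatting and the stop condition testable on their own; in the async version the loop body would await the sleep and the stats query instead of calling them directly.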

examples/async-job-queue/PLAN.md

```

Expected:

- exits `0`
- prints `queued 6 jobs`
- prints at least one `heartbeat` line before completion
- prints `worker-1` and `worker-2`
- prints `done processed=6`

If Redis access is blocked by the sandbox, rerun with escalation rather than
changing the app.

Review after step:

- no warnings
- no hanging worker process
- output demonstrates the intended async behavior without explanation

## Step 12: Related Test Suite Check

examples/async-job-queue/README.md

This example demonstrates Async::Redis running a small Redis-backed job queue in
one Perl process.

It intentionally keeps the app simple:

- one controller connection queues jobs and coordinates shutdown
- one Redis connection per worker waits in `BLPOP`
- one stats connection prints heartbeat lines
- workers simulate slow work with `Future::IO->sleep`

The point is to make async behavior visible. While workers are blocked in
`BLPOP` or waiting on simulated work, the heartbeat keeps printing and other
workers continue making progress.

## Running

Start Redis from the project root:

```bash
docker compose -f examples/docker-compose.yml up -d
```

examples/async-job-queue/README.md

[ 0.25s] heartbeat queue=4 in_flight=2 processed=0
[ 1.51s] worker-1 finished job-1 processed=1
[ 1.51s] worker-2 finished job-2 processed=2
[ 1.51s] worker-1 started job-3
[ 1.51s] worker-2 started job-4
[ 1.75s] heartbeat queue=2 in_flight=2 processed=2
[ 4.55s] done processed=6 workers=2 elapsed=4.55s sequential_about=9.00s
```

The important lines are the heartbeat lines. They show the process is still
doing useful work while workers are blocked on Redis or sleeping between job
start and finish.
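
The `elapsed` and `sequential_about` figures in the final line follow from simple arithmetic. The sketch below reproduces them under two assumptions: every job takes the same 1.5s (9.00s spread across 6 jobs), and startup and Redis round-trip overhead are negligible. `estimate_times` is a hypothetical helper, not part of the example.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Estimate wall-clock time for equal-length jobs.
# Sequential: jobs run one after another.
# Concurrent: jobs run in waves of $workers at a time.
sub estimate_times {
    my ($jobs, $workers, $job_seconds) = @_;
    my $sequential = $jobs * $job_seconds;
    my $concurrent = ceil($jobs / $workers) * $job_seconds;
    return ($sequential, $concurrent);
}

my ($sequential, $concurrent) = estimate_times(6, 2, 1.5);
printf "sequential_about=%.2fs concurrent_about=%.2fs\n",
    $sequential, $concurrent;
```

The estimate of 4.50s for two workers sits just under the 4.55s in the sample output; the difference is the overhead the sketch ignores.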

## Why Each Worker Has Its Own Redis Connection

`BLPOP` is a Redis blocking command. A connection sitting inside `BLPOP` cannot
also be used for unrelated commands until Redis replies.

This example gives each worker its own connection so one worker can wait for
jobs without blocking:

examples/async-job-queue/SPEC.md

[ 0.25s] heartbeat queue=8 in_flight=2 processed=0
[ 1.51s] worker-1 finished job-1
[ 1.51s] worker-1 started job-3
[ 1.51s] worker-2 finished job-2
[ 1.51s] worker-2 started job-4
[ 1.75s] heartbeat queue=6 in_flight=2 processed=2
[ 7.55s] done processed=10 workers=2 elapsed=7.55s sequential_about=15.00s
```

The README should explain that the heartbeat lines are the key proof: the
process continues running other async work while workers are blocked in Redis
or waiting on simulated work.

## CLI

Keep the first version simple, but allow the demo to be tweaked:

```text
examples/async-job-queue/app.pl [options]

Options:

lib/Async/Redis/Cookbook.pod


    $pool->shutdown;

=for cookbook-test-end pool_with

=head1 PITFALLS

=over 4

=item * Do not share one connection between a C<BLPOP> worker and unrelated
commands. The connection is blocked until Redis replies.

=item * Do not mix C<next> and C<on_message> on the same subscription.
Callback mode is sticky.

=item * Do not return dirty connections to a pool. Use C<< $pool->with(...) >>
or let the pool destroy/clean dirty connections on release.

=item * Do not rely on C<prefix> as a security boundary. Use Redis ACLs or
separate databases for tenant isolation.

lib/Async/Redis/Subscription.pm

        }

        return $frame;
    }
}

# Convert a raw RESP pub/sub frame into a message hashref and deliver it.
# In callback mode, invokes _on_message via _invoke_user_callback and
# returns its result (which may be a Future for consumer-side backpressure).
# In iterator mode, queues the message into _pending_messages and signals
# _message_waiter so a blocked next() can wake up.
#
# Non-message frames (subscribe confirmations, etc.) return undef and
# take no action — the driver loop will read the next frame.
sub _dispatch_frame {
    my ($self, $frame) = @_;
    return unless $frame && ref $frame eq 'ARRAY';

    my $type = $frame->[0] // '';
    my $msg;

lib/Async/Redis/Subscription.pm

    # Convert new format to old format for compatibility
    return {
        channel => $msg->{channel},
        message => $msg->{data},
        pattern => $msg->{pattern},
        type    => $msg->{type},
    };
}

# Intentional teardown: marks the subscription closed and wakes any
# blocked next() with undef. Clears the parent _subscription slot
# with an identity guard so a stale _close cannot evict a newer
# subscription object that reused the same slot.
sub _close {
    my ($self) = @_;
    return if $self->{_closed};
    $self->{_closed} = 1;

    $self->{_pending_messages} = [];

    if (my $w = delete $self->{_message_waiter}) {

lib/Async/Redis/Subscription.pm


    # Cancel any running driver Future. The driver's await on _dequeue
    # also unwinds because we resolved _message_waiter above, so this is
    # belt-and-suspenders; either path exits the driver cleanly.
    if (my $f = delete $self->{_driver_step}) {
        $f->cancel unless $f->is_ready;
    }
}

# Unrecoverable failure: marks the subscription closed with a typed
# error. Any blocked next() will die with that error. The error is
# preserved for callers who call next() after the fact.
# In callback mode, fires on_error if registered; dies otherwise.
sub _fail_fatal {
    my ($self, $typed_error) = @_;
    return if $self->{_closed};
    $self->{_closed}      = 1;
    $self->{_fatal_error} = $typed_error;

    $self->{_pending_messages} = [];

lib/Async/Redis/Subscription.pm


=head1 INTERNAL LIFECYCLE METHODS

The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.

=head2 _close

Intentional teardown. Marks the subscription closed and wakes any
blocked C<next()> with C<undef>. Clears the parent C<_subscription>
slot on the L<Async::Redis> object with an identity guard — a stale
C<_close> call from an earlier subscription object cannot evict a newer
one that has since taken the slot.

=head2 _fail_fatal($typed_error)

Unrecoverable failure. Marks the subscription closed with a typed error
object. Any blocked C<next()> call will C<die> with that error. The
error is preserved for callers who call C<next()> after the fact.
Routes through C<_close>'s identity guard for parent-slot clearing.

=head2 _pause_for_reconnect

Called before a reconnect attempt. Does B<not> mark the subscription
closed — the underlying reader has already exited due to the connection
drop. Channel/pattern tracking hashes are left intact for replay.

=head2 _resume_after_reconnect

script/commands.json

            "@connection"
        ],
        "arity": 2,
        "command_flags": [
            "noscript",
            "loading",
            "stale"
        ]
    },
    "CLIENT UNBLOCK": {
        "summary": "Unblocks a client blocked by a blocking command from a different connection.",
        "since": "5.0.0",
        "group": "connection",
        "complexity": "O(log N) where N is the number of client connections",
        "acl_categories": [
            "@admin",
            "@slow",
            "@dangerous",
            "@connection"
        ],
        "arity": -3,

script/commands.json

        "command_flags": [
            "readonly",
            "blocking",
            "movablekeys"
        ]
    },
    "XREADGROUP": {
        "summary": "Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.",
        "since": "5.0.0",
        "group": "stream",
        "complexity": "For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD ...
        "acl_categories": [
            "@write",
            "@stream",
            "@slow",
            "@blocking"
        ],
        "arity": -7,
        "key_specs": [
            {
                "begin_search": {

t/02-nonblocking.t

        run { $redis->del("pipe_pipe:$i") };
    }

    $redis->disconnect;
};

# ============================================================================
# Test 3: Timer can run during Redis operations
# ============================================================================

subtest 'event loop not blocked' => sub {
    my $redis = Async::Redis->new(host => redis_host(), port => redis_port());
    run { $redis->connect };

    my @futures = map { $redis->set("nb:nonblocking:$_", $_) } (1..50);
    my $start = Time::HiRes::time();
    run { Future->needs_all(@futures) };
    my $elapsed = Time::HiRes::time() - $start;

    ok($elapsed < 5, "50 concurrent ops completed in ${elapsed}s");

t/10-connection/timeout.t

        my $elapsed = time() - $start;

        ok($error, 'command failed');
        like("$error", qr/timed?\s*out/i, 'error mentions timeout');
        ok($elapsed >= 0.2, "waited at least 0.2s (got ${elapsed}s)");
        ok($elapsed < 1.0, "timed out before command finished (got ${elapsed}s)");

        $redis->disconnect;
    };

    subtest 'concurrent ops not blocked during request timeout' => sub {
        my $redis = Async::Redis->new(
            host            => $ENV{REDIS_HOST} // 'localhost',
            request_timeout => 5 * $TIMEOUT_SCALE,
        );
        run { $redis->connect };

        my @futures = map { $redis->set("nb:req:timeout:$_", $_) } (1..50);
        my $start = time();
        run { Future->needs_all(@futures) };
        my $elapsed = time() - $start;

t/50-pubsub/on-message.t

                # The first message blocks on the gate; subsequent messages
                # return undef so the driver can drain.
                return @received == 1 ? $gate : undef;
            });

            for my $i (1..3) {
                $publisher->publish('test:onmsg:backpressure', "msg-$i")->get;
            }
            Future::IO->sleep(0.3)->get;

            is(scalar @received, 1, 'driver delivered 1 message then blocked on Future');
            is($received[0], 'msg-1', 'first message delivered');

            $gate->done;
            Future::IO->sleep(0.3)->get;
            is(scalar @received, 3, 'remaining messages delivered after gate released');

            $subscriber->disconnect;
        };

        subtest 'subscribe from inside callback takes effect' => sub {


