Async-Redis


examples/pagi-chat/README.md

    ├── css/style.css
    └── js/app.js
```

## Comparison with Original

| Feature | Original (In-Memory) | This Version (Redis) |
|---------|---------------------|----------------------|
| Workers | 1 only | N workers |
| State | Process memory | Redis |
| Broadcast | Direct callbacks | Redis PubSub |
| Persistence | None (lost on restart) | Redis (survives restart) |
| Scalability | Single process | Horizontal |

## Requirements

- PAGI (0.001+)
- Async::Redis
- Redis server
- JSON::MaybeXS

examples/pagi-chat/lib/ChatApp/State.pm

my $JSON = JSON::MaybeXS->new->utf8->canonical;

# Redis connections (per-worker)
my $redis;
my $pubsub;
my $pubsub_subscription;

# Background task selector (per-worker)
my $background_selector;

# Local session callbacks (for this worker only)
# Redis PubSub delivers to all workers, but we only call callbacks for OUR clients
my %local_sessions;

use constant {
    MAX_MESSAGES_PER_ROOM => 100,
    SESSION_TTL           => 86400,    # 24 hours
    BROADCAST_CHANNEL     => 'chat:broadcast',
};

# Track server start time for uptime
my $server_start_time = time();

lib/Async/Redis/Subscription.pm

=head2 Backpressure

If your callback returns a C<Future>, the driver waits for that Future
to resolve before reading the next frame:

    $sub->on_message(async sub {
        my ($sub, $msg) = @_;
        await store_to_database($msg);    # driver waits before next read
    });

Synchronous callbacks (or callbacks returning non-Future values) do not
block the driver. This gives consumers opt-in backpressure with no
default overhead.

If the returned Future fails, the failure is routed to C<on_error>.

=head2 Fatal error handling

    $sub->on_error(sub {
        my ($sub, $err) = @_;
        ...

lib/Async/Redis/Subscription.pm

=item * Call C<< $sub->on_message($new_cb) >> — the current message is
dispatched to the previously-installed handler; the next frame uses
the new handler.

=item * C<die> — routed to C<on_error>.

=back

=head2 Backpressure and Redis server limits

Synchronous callbacks provide backpressure by blocking the driver loop:
while your callback runs, the driver doesn't read the next frame, so
the TCP receive buffer fills and Redis's output buffer for this client
grows. But Redis enforces C<client-output-buffer-limit pubsub>
(defaulting to S<32mb 8mb 60> in recent versions): if your subscriber
cannot keep up for sustained periods, B<Redis will disconnect you>. No
amount of client-side buffering changes this, because the limit is
enforced on the server.

If your processing is genuinely slow, return a Future from your
callback (enabling the opt-in backpressure described above) AND
consider moving the expensive work to a worker pool so the callback
can return quickly. Long synchronous processing in pub/sub callbacks
is an anti-pattern at scale regardless of client.
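
The advice above can be sketched without a Redis server. What follows
is a toy model, not this module's driver: C<read_next>,
C<@pending_jobs>, and the one-argument callback are hypothetical names
invented for illustration, and only the core L<Future> methods C<new>,
C<done>, C<then>, and C<is_done> are assumed. It shows a callback that
hands work to a queue and returns a pending Future, with the next frame
read only after that Future completes.

```perl
use strict;
use warnings;
use Future;

my @log;
my @pending_jobs;    # hypothetical worker queue of [payload, Future] pairs

# Callback: enqueue the expensive work and return a pending Future at once.
my $on_message = sub {
    my ($msg) = @_;
    my $f = Future->new;
    push @pending_jobs, [ $msg, $f ];
    return $f;
};

# Toy driver (assumption, not the real one): read one frame, then wait for
# the callback's Future before reading the next frame.
sub read_next {
    my ($frames) = @_;
    return Future->done unless @$frames;
    my $frame = shift @$frames;
    push @log, "read $frame";
    return $on_message->($frame)->then(sub { read_next($frames) });
}

my $all = read_next([qw(a b c)]);

# Stand-in for a worker pool: finish jobs one by one. Completing a job
# releases the driver to read exactly one more frame.
while (my $job = shift @pending_jobs) {
    push @log, "processed $job->[0]";
    $job->[1]->done;
}

print "$_\n" for @log;
```

Because the callback returns as soon as the job is queued, the loop
itself is never held up by the expensive work; frame C<b> is read only
once the job for C<a> has been marked done.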

=head1 INTERNAL LIFECYCLE METHODS

The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.

=head2 _close

t/10-connection/reconnect.t


subtest 'default reconnect values' => sub {
    my $redis = Async::Redis->new(host => 'localhost');

    ok(!$redis->{reconnect}, 'reconnect disabled by default');
    is($redis->{reconnect_delay}, 0.1, 'default reconnect_delay');
    is($redis->{reconnect_delay_max}, 60, 'default reconnect_delay_max');
    is($redis->{reconnect_jitter}, 0.25, 'default reconnect_jitter');
};

subtest 'callbacks accepted' => sub {
    my @events;

    my $redis = Async::Redis->new(
        host          => 'localhost',
        on_connect    => sub { push @events, ['connect', @_] },
        on_disconnect => sub { push @events, ['disconnect', @_] },
        on_error      => sub { push @events, ['error', @_] },
    );

    ok(ref $redis->{on_connect} eq 'CODE', 'on_connect stored');

t/50-pubsub/on-message.t

};

# --- Integration tests (need Redis) ---

SKIP: {
    _with_redis {
        my ($publisher) = @_;

        # Integration tests bypass the run{} helper's busy-poll pump
        # because it interacts badly with the driver's on_done
        # callbacks (pumping via Future::IO->sleep(0)->get corrupts
        # internal state when the driver fires a failed-Future →
        # on_error → _close sequence). Direct ->get works reliably.

        subtest 'on_message receives messages published to subscribed channels' => sub {
            my $subscriber = _make_subscriber();
            my @received;
            my $sub = $subscriber->subscribe('test:onmsg:basic')->get;
            $sub->on_message(sub {
                my ($s, $msg) = @_;
                push @received, $msg;

t/50-pubsub/on-message.t

            {
                my $subscriber = _make_subscriber();
                my $sub = $subscriber->subscribe('test:onmsg:no-warn')->get;
                $sub->on_message(sub { });

                $publisher->publish('test:onmsg:no-warn', 'x')->get;
                Future::IO->sleep(0.3)->get;

                $subscriber->disconnect;
            }
            # Give deferred GC/event-loop callbacks a chance to fire.
            Future::IO->sleep(0.1)->get;

            my @faa_warnings = grep { /lost.+returning future/i } @warnings;
            is(scalar @faa_warnings, 0,
                'no "lost its returning future" warnings from on_message path')
                or note("warnings captured: @warnings");
        };

        # Full end-to-end backpressure timing is flaky in this test
        # harness because the synchronous-callback path is extremely

t/50-pubsub/unified-reader.t

    my $done = Future->new;
    $sub->on_message(sub {
        my ($self, $msg) = @_;
        push @got, $msg->{data};
        $done->done if @got == 3 && !$done->is_ready;
        return;
    });

    for my $i (1..3) { run { $pub_c->publish($ch, "cb$i") } }
    run { Future->wait_any($done, Future::IO->sleep(2)) };
    is \@got, ['cb1', 'cb2', 'cb3'], 'all 3 callbacks fired in order';
    $sub->_close;
    $sub_c->disconnect;
    $pub_c->disconnect;
};

subtest 'callback mode: Future-returning backpressure serializes invocations' => sub {
    my $sub_c = new_redis;
    my $pub_c = new_redis;
    run { $sub_c->connect };
    run { $pub_c->connect };


