Async-Redis
examples/pagi-chat/README.md
├── css/style.css
└── js/app.js
```
## Comparison with Original
| Feature | Original (In-Memory) | This Version (Redis) |
|---------|---------------------|----------------------|
| Workers | 1 only | N workers |
| State | Process memory | Redis |
| Broadcast | Direct callbacks | Redis PubSub |
| Persistence | None (lost on restart) | Redis (survives restart) |
| Scalability | Single process | Horizontal |
## Requirements
- PAGI (0.001+)
- Async::Redis
- Redis server
- JSON::MaybeXS
examples/pagi-chat/lib/ChatApp/State.pm
my $JSON = JSON::MaybeXS->new->utf8->canonical;
# Redis connections (per-worker)
my $redis;
my $pubsub;
my $pubsub_subscription;
# Background task selector (per-worker)
my $background_selector;
# Local session callbacks (for this worker only)
# Redis PubSub delivers to all workers, but we only call callbacks for OUR clients
my %local_sessions;
use constant {
MAX_MESSAGES_PER_ROOM => 100,
SESSION_TTL => 86400, # 24 hours
BROADCAST_CHANNEL => 'chat:broadcast',
};
# Track server start time for uptime
my $server_start_time = time();
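The comment on `%local_sessions` above describes a fan-out pattern: every worker receives each PubSub broadcast, but only invokes callbacks for sessions it owns. A minimal sketch of that filtering follows. The helper names `register_local_session` and `dispatch_broadcast`, and the `targets`/`data` envelope shape, are assumptions made for illustration and do not come from `State.pm`; `JSON::PP` stands in for `JSON::MaybeXS` so the sketch runs on core Perl.

```perl
use strict;
use warnings;
use JSON::PP;    # core module, used here in place of JSON::MaybeXS

my $JSON = JSON::PP->new->utf8->canonical;

# session id => callback, for clients connected to THIS worker only
my %local_sessions;

# Hypothetical helper: remember a callback for a locally connected session.
sub register_local_session {
    my ($session_id, $cb) = @_;
    $local_sessions{$session_id} = $cb;
    return;
}

# Hypothetical helper: called with each payload received on the broadcast
# channel. Assumes a JSON envelope of { targets => [...], data => ... }.
sub dispatch_broadcast {
    my ($payload) = @_;
    my $event     = $JSON->decode($payload);
    my $delivered = 0;
    for my $session_id (@{ $event->{targets} // [] }) {
        # Sessions owned by other workers are simply skipped here.
        my $cb = $local_sessions{$session_id} or next;
        $cb->($event->{data});
        $delivered++;
    }
    return $delivered;
}

my @seen;
register_local_session(alice => sub { push @seen, $_[0] });

# 'bob' is connected to some other worker, so only alice's callback fires.
my $payload = $JSON->encode({ targets => ['alice', 'bob'], data => 'hello' });
my $count   = dispatch_broadcast($payload);
print "delivered to $count local session(s)\n";
```

Keeping the callback table per worker means no callback ever has to cross a process boundary; only serialized payloads travel through Redis.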
lib/Async/Redis/Subscription.pm
=head2 Backpressure
If your callback returns a C<Future>, the driver waits for that Future
to resolve before reading the next frame:
$sub->on_message(async sub {
my ($sub, $msg) = @_;
await store_to_database($msg); # driver waits before next read
});
Synchronous callbacks (or callbacks returning non-Future values) do not
block the driver. This gives consumers opt-in backpressure with no
default overhead.
If the returned Future fails, the failure is routed to C<on_error>.
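The opt-in behaviour described above can be illustrated with a toy dispatch loop. This is a sketch under stated assumptions, not the module's actual driver: `make_driver` is a hypothetical helper, and `Demo::Deferred` is a tiny stand-in with just `on_ready` and `done` so the example runs on core Perl without the real `Future` module. Failure routing to `on_error` is not modelled.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical stand-in for a Future; NOT the Future module's implementation.
package Demo::Deferred {
    sub new { return bless { ready => 0, cbs => [] }, shift }

    sub on_ready {
        my ($self, $cb) = @_;
        if ($self->{ready}) { $cb->($self) }
        else                { push @{ $self->{cbs} }, $cb }
        return $self;
    }

    sub done {
        my ($self) = @_;
        $self->{ready} = 1;
        $_->($self) for splice @{ $self->{cbs} };
        return $self;
    }
}

# Hypothetical driver: delivers queued frames one at a time and pauses while
# a callback's returned future-like object is still pending.
# (The $pump closure refers to itself; a long-lived version would weaken it.)
sub make_driver {
    my ($callback, $log) = @_;
    my @pending;
    my $paused = 0;
    my $pump;
    $pump = sub {
        while (!$paused && @pending) {
            my $msg = shift @pending;
            push @$log, "deliver $msg";
            my $ret = $callback->($msg);
            # Only future-like return values pause the loop (opt-in).
            if (blessed($ret) && $ret->can('on_ready')) {
                $paused = 1;
                $ret->on_ready(sub { $paused = 0; $pump->() });
            }
        }
    };
    return sub { push @pending, @_; $pump->() };
}

my (@log, @held);
my $feed = make_driver(sub {
    my $f = Demo::Deferred->new;    # pretend an async store is in flight
    push @held, $f;
    return $f;
}, \@log);

$feed->('m1', 'm2');                # m1 is delivered; m2 waits behind it
my $before = scalar @log;
$held[0]->done;                     # the "store" finishes; m2 is delivered
my $after = scalar @log;
print "delivered before=$before after=$after\n";
```

A callback that returns nothing never sets the pause flag, so it pays no extra cost, which matches the "no default overhead" point above.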
=head2 Fatal error handling
$sub->on_error(sub {
my ($sub, $err) = @_;
...
lib/Async/Redis/Subscription.pm
=item * Call C<< $sub->on_message($new_cb) >>: the current message is
dispatched to the previously-installed handler; the next frame uses
the new handler.
=item * C<die>: routed to C<on_error>.
=back
=head2 Backpressure and Redis server limits
Synchronous callbacks provide backpressure by blocking the driver loop:
while your callback runs, the driver doesn't read the next frame, so
the TCP receive buffer fills and Redis's output buffer for your
connection grows. Redis enforces
C<client-output-buffer-limit pubsub> (defaulting to S<32mb 8mb 60>
in recent versions), so if your subscriber cannot keep up for sustained
periods, B<Redis will disconnect you>. No amount of client-side
buffering changes this: the limit is on the server.
If your processing is genuinely slow, return a Future from your
callback (enabling opt-in backpressure above) AND consider moving the
expensive work to a worker pool so the callback can return quickly.
Long synchronous processing in pub/sub callbacks is an anti-pattern at
scale regardless of client.
=head1 INTERNAL LIFECYCLE METHODS
The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.
=head2 _close
t/10-connection/reconnect.t
subtest 'default reconnect values' => sub {
my $redis = Async::Redis->new(host => 'localhost');
ok(!$redis->{reconnect}, 'reconnect disabled by default');
is($redis->{reconnect_delay}, 0.1, 'default reconnect_delay');
is($redis->{reconnect_delay_max}, 60, 'default reconnect_delay_max');
is($redis->{reconnect_jitter}, 0.25, 'default reconnect_jitter');
};
subtest 'callbacks accepted' => sub {
my @events;
my $redis = Async::Redis->new(
host => 'localhost',
on_connect => sub { push @events, ['connect', @_] },
on_disconnect => sub { push @events, ['disconnect', @_] },
on_error => sub { push @events, ['error', @_] },
);
ok(ref $redis->{on_connect} eq 'CODE', 'on_connect stored');
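The defaults asserted in the first subtest above (`reconnect_delay` 0.1, `reconnect_delay_max` 60, `reconnect_jitter` 0.25) read like the inputs to a backoff schedule. The excerpt does not show how Async::Redis combines them, so the following is only one plausible interpretation: exponential doubling from the base delay, capped at the maximum, with the jitter applied as a plus-or-minus fraction. The function name `reconnect_delay_for` and the injectable `rand` option are hypothetical.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Hypothetical helper: delay in seconds before reconnect attempt N (1-based).
# ASSUMPTION: exponential doubling, capped, then +/- jitter. The real module
# may compute this differently.
sub reconnect_delay_for {
    my ($attempt, %opt) = @_;
    my $base   = $opt{reconnect_delay}     // 0.1;
    my $max    = $opt{reconnect_delay_max} // 60;
    my $jitter = $opt{reconnect_jitter}    // 0.25;
    my $rand   = $opt{rand} // sub { rand() };    # injectable for testing

    my $delay  = min($max, $base * 2 ** ($attempt - 1));
    # Map a random number in [0, 1) to a factor in [-jitter, +jitter).
    my $spread = $delay * $jitter * (2 * $rand->() - 1);
    return $delay + $spread;
}

# With the random source pinned to 0.5 the jitter term is zero, which makes
# the underlying capped doubling visible.
for my $attempt (1, 2, 3, 11) {
    my $d = reconnect_delay_for($attempt, rand => sub { 0.5 });
    printf "attempt %2d: %.1f s\n", $attempt, $d;
}
```

In this sketch the jitter is applied after the cap, so a jittered delay can exceed `reconnect_delay_max` by up to the jitter fraction; clamping after the jitter step would be an equally reasonable choice.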
t/50-pubsub/on-message.t
};
# --- Integration tests (need Redis) ---
SKIP: {
_with_redis {
my ($publisher) = @_;
# Integration tests bypass the run{} helper's busy-poll pump
# because it interacts badly with the driver's on_done
# callbacks (pumping via Future::IO->sleep(0)->get corrupts
# internal state when the driver fires a failed-Future ->
# on_error -> _close sequence). Direct ->get works reliably.
subtest 'on_message receives messages published to subscribed channels' => sub {
my $subscriber = _make_subscriber();
my @received;
my $sub = $subscriber->subscribe('test:onmsg:basic')->get;
$sub->on_message(sub {
my ($s, $msg) = @_;
push @received, $msg;
t/50-pubsub/on-message.t
{
my $subscriber = _make_subscriber();
my $sub = $subscriber->subscribe('test:onmsg:no-warn')->get;
$sub->on_message(sub { });
$publisher->publish('test:onmsg:no-warn', 'x')->get;
Future::IO->sleep(0.3)->get;
$subscriber->disconnect;
}
# Give deferred GC/event-loop callbacks a chance to fire.
Future::IO->sleep(0.1)->get;
my @faa_warnings = grep { /lost.+returning future/i } @warnings;
is(scalar @faa_warnings, 0,
'no "lost its returning future" warnings from on_message path')
or note("warnings captured: @warnings");
};
# Full end-to-end backpressure timing is flaky in this test
# harness because the synchronous-callback path is extremely
t/50-pubsub/unified-reader.t
my $done = Future->new;
$sub->on_message(sub {
my ($self, $msg) = @_;
push @got, $msg->{data};
$done->done if @got == 3 && !$done->is_ready;
return;
});
for my $i (1..3) { run { $pub_c->publish($ch, "cb$i") } }
run { Future->wait_any($done, Future::IO->sleep(2)) };
is \@got, ['cb1', 'cb2', 'cb3'], 'all 3 callbacks fired in order';
$sub->_close;
$sub_c->disconnect;
$pub_c->disconnect;
};
subtest 'callback mode: Future-returning backpressure serializes invocations' => sub {
my $sub_c = new_redis;
my $pub_c = new_redis;
run { $sub_c->connect };
run { $pub_c->connect };