- Feature: Unix domain socket connections
- Connect via path parameter or redis+unix:// URI scheme
- Constructor stores path and skips host/port for unix sockets
- Added docker-compose redis-unix service for testing
- Feature: PubSub auto-resubscribe on reconnect
- Subscriptions automatically re-established after connection drop
- on_reconnect callback on Subscription object for notification
- _read_pubsub_frame checks connected state to fail fast
- _reset_connection now clears in_pubsub flag
- Bug Fix: _reset_connection left in_pubsub=1 after disconnect
- Prevented reconnection because AUTH/SELECT were blocked by the pubsub guard
0.001004 2026-02-02
- Bug Fix: Socket close ordering
- Fixed Future::IO corruption when disconnect() called with active watchers
- Cancelling _current_read_future before socket close ensures Future::IO
unregisters watchers while fileno is still valid
- Let Perl's DESTROY handle close() after futures are cancelled
- Breaking Change: Future::IO configuration
- Removed Future::IO->load_best_impl from module
- Libraries should not configure Future::IO - only application entry
examples/async-job-queue/PLAN.md view on Meta::CPAN
## Goal
Implement `examples/async-job-queue/` from
`examples/async-job-queue/SPEC.md`: a small CLI demo that visibly proves
Async::Redis can run a Redis-backed job queue concurrently in one Perl process.
The demo should show:
- a burst of jobs queued immediately
- two or more workers claiming and processing jobs concurrently
- a heartbeat that keeps printing while workers are blocked in `BLPOP` or
sleeping during simulated work
- automatic shutdown after all jobs are processed
## Constraints
- Work only on the async-job-queue example and the examples index.
- Do not touch existing dirty stress files:
- `examples/stress/lib/Stress/Chaos.pm`
- `examples/stress/lib/Stress/Harness.pm`
- Use `perlbrew use perl-5.40.0@default` for all Perl checks.
examples/async-job-queue/PLAN.md view on Meta::CPAN
- output shows both `worker-1` and `worker-2`
- the first two jobs start near the same timestamp
- elapsed time is closer to `0.4s` than `0.8s`
- final processed count is `4`
Review after step:
- each worker uses its own Redis connection
- all worker futures are awaited
- sentinels cannot be counted as processed jobs
- no worker can be left blocked after completion
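The sentinel-related review points can be illustrated with a minimal sketch. It assumes one sentinel is pushed per worker after the real jobs and uses an in-memory array in place of the Redis list, so it runs without Redis. The `$SENTINEL` value and the `drain` helper are hypothetical names for illustration, not taken from the app.

```perl
use strict;
use warnings;
use List::Util qw(sum0);

# Hypothetical sentinel marker; the real demo may use a different value.
my $SENTINEL = '__stop__';

# Simulate workers taking turns popping from a shared queue.
# A worker that pops a sentinel exits and does not count it as a job.
sub drain {
    my ($queue, @workers) = @_;
    my %processed = map { $_ => 0 } @workers;
    my @active = @workers;
    while (@active) {
        my $worker = shift @active;
        my $item   = shift @$queue;
        last unless defined $item;     # empty queue (a real BLPOP would block here)
        next if $item eq $SENTINEL;    # worker stops; it is not put back in @active
        $processed{$worker}++;
        push @active, $worker;         # worker goes back to waiting for work
    }
    return \%processed;
}

my @queue = ('job-1', 'job-2', 'job-3', 'job-4', $SENTINEL, $SENTINEL);
my $result = drain(\@queue, 'worker-1', 'worker-2');
printf "processed=%d left_in_queue=%d\n", sum0(values %$result), scalar @queue;
```

With four jobs and two sentinels, both simulated workers exit and the processed count stays at `4`, which matches the expectations listed for this step.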
## Step 7: Add Heartbeat Task
Add:
- separate stats Redis connection
- `heartbeat($opts)` async helper
- heartbeat loop every `0.25s`
- output with `queue=`, `in_flight=`, and `processed=`
- stop condition when processed count reaches target
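A rough sketch of the heartbeat output and stop condition follows. The helper names `heartbeat_line` and `heartbeat_done` are hypothetical, the line layout is inferred from the sample output in the README and SPEC, and the snapshots are hard-coded rather than read from Redis, so this shows only the formatting and termination logic, not the async loop itself.

```perl
use strict;
use warnings;

# Format one heartbeat line from an elapsed time and a stats snapshot.
sub heartbeat_line {
    my ($elapsed, %stats) = @_;
    return sprintf '[%5.2fs] heartbeat queue=%d in_flight=%d processed=%d',
        $elapsed, @stats{qw(queue in_flight processed)};
}

# The heartbeat stops once the processed count reaches the target.
sub heartbeat_done {
    my ($processed, $target) = @_;
    return $processed >= $target ? 1 : 0;
}

# Hard-coded snapshots standing in for values read over the stats connection.
my $target    = 2;
my @snapshots = (
    [0.25, queue => 0, in_flight => 2, processed => 0],
    [0.50, queue => 0, in_flight => 0, processed => 2],
    [0.75, queue => 0, in_flight => 0, processed => 2],   # never printed
);

for my $snap (@snapshots) {
    my ($elapsed, %stats) = @$snap;
    last if heartbeat_done($stats{processed}, $target);
    print heartbeat_line($elapsed, %stats), "\n";
}
```

In the real helper the loop would wait `0.25s` between iterations instead of walking a fixed list.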
examples/async-job-queue/PLAN.md view on Meta::CPAN
```
Expected:
- exits `0`
- prints `queued 6 jobs`
- prints at least one `heartbeat` line before completion
- prints `worker-1` and `worker-2`
- prints `done processed=6`
If Redis access is blocked by the sandbox, rerun with escalation rather than
changing the app.
Review after step:
- no warnings
- no hanging worker process
- output demonstrates the intended async behavior without explanation
## Step 12: Related Test Suite Check
examples/async-job-queue/README.md view on Meta::CPAN
This example demonstrates Async::Redis running a small Redis-backed job queue in
one Perl process.
It intentionally keeps the app simple:
- one controller connection queues jobs and coordinates shutdown
- one Redis connection per worker waits in `BLPOP`
- one stats connection prints heartbeat lines
- workers simulate slow work with `Future::IO->sleep`
The point is to make async behavior visible. While workers are blocked in
`BLPOP` or waiting on simulated work, the heartbeat keeps printing and other
workers continue making progress.
## Running
Start Redis from the project root:
```bash
docker compose -f examples/docker-compose.yml up -d
```
examples/async-job-queue/README.md view on Meta::CPAN
[ 0.25s] heartbeat queue=4 in_flight=2 processed=0
[ 1.51s] worker-1 finished job-1 processed=1
[ 1.51s] worker-2 finished job-2 processed=2
[ 1.51s] worker-1 started job-3
[ 1.51s] worker-2 started job-4
[ 1.75s] heartbeat queue=2 in_flight=2 processed=2
[ 4.55s] done processed=6 workers=2 elapsed=4.55s sequential_about=9.00s
```
The important lines are the heartbeat lines. They show the process is still
doing useful work while workers are blocked on Redis or sleeping between job
start and finish.
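The `elapsed` and `sequential_about` figures in the final line can be checked with simple arithmetic. The sketch below assumes every job takes about `1.5s` (inferred from the sample timestamps, not a documented default) and ignores Redis round-trip overhead; the function names are hypothetical.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Time to run all jobs one after another.
sub sequential_time {
    my ($jobs, $secs_per_job) = @_;
    return $jobs * $secs_per_job;
}

# Idealized time when jobs are spread evenly across concurrent workers:
# each worker handles ceil(jobs / workers) jobs back to back.
sub concurrent_time {
    my ($jobs, $workers, $secs_per_job) = @_;
    return ceil($jobs / $workers) * $secs_per_job;
}

printf "sequential_about=%.2fs concurrent_about=%.2fs\n",
    sequential_time(6, 1.5), concurrent_time(6, 2, 1.5);
```

For 6 jobs and 2 workers this gives roughly `9.00s` sequential against `4.50s` concurrent, in line with the `elapsed=4.55s` shown in the sample output.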
## Why Each Worker Has Its Own Redis Connection
`BLPOP` is a Redis blocking command. A connection sitting inside `BLPOP` cannot
also be used for unrelated commands until Redis replies.
This example gives each worker its own connection so one worker can wait for
jobs without blocking:
examples/async-job-queue/SPEC.md view on Meta::CPAN
[ 0.25s] heartbeat queue=8 in_flight=2 processed=0
[ 1.51s] worker-1 finished job-1
[ 1.51s] worker-1 started job-3
[ 1.51s] worker-2 finished job-2
[ 1.51s] worker-2 started job-4
[ 1.75s] heartbeat queue=6 in_flight=2 processed=2
[ 7.55s] done processed=10 workers=2 elapsed=7.55s sequential_about=15.00s
```
The README should explain that the heartbeat lines are the key proof: the
process continues running other async work while workers are blocked in Redis
or waiting on simulated work.
## CLI
Keep the first version simple, but allow the demo to be tweaked:
```text
examples/async-job-queue/app.pl [options]
Options:
lib/Async/Redis/Cookbook.pod view on Meta::CPAN
$pool->shutdown;
=for cookbook-test-end pool_with
=head1 PITFALLS
=over 4
=item * Do not share one connection between a C<BLPOP> worker and unrelated
commands. The connection is blocked until Redis replies.
=item * Do not mix C<next> and C<on_message> on the same subscription.
Callback mode is sticky.
=item * Do not return dirty connections to a pool. Use C<< $pool->with(...) >>
or let the pool destroy/clean dirty connections on release.
=item * Do not rely on C<prefix> as a security boundary. Use Redis ACLs or
separate databases for tenant isolation.
lib/Async/Redis/Subscription.pm view on Meta::CPAN
}
return $frame;
}
}
# Convert a raw RESP pub/sub frame into a message hashref and deliver it.
# In callback mode, invokes _on_message via _invoke_user_callback and
# returns its result (which may be a Future for consumer-side backpressure).
# In iterator mode, queues the message into _pending_messages and signals
# _message_waiter so a blocked next() can wake up.
#
# Non-message frames (subscribe confirmations, etc.) return undef and
# take no action; the driver loop will read the next frame.
sub _dispatch_frame {
my ($self, $frame) = @_;
return unless $frame && ref $frame eq 'ARRAY';
my $type = $frame->[0] // '';
my $msg;
lib/Async/Redis/Subscription.pm view on Meta::CPAN
# Convert new format to old format for compatibility
return {
channel => $msg->{channel},
message => $msg->{data},
pattern => $msg->{pattern},
type => $msg->{type},
};
}
# Intentional teardown: marks the subscription closed and wakes any
# blocked next() with undef. Clears the parent _subscription slot
# with an identity guard so a stale _close cannot evict a newer
# subscription object that reused the same slot.
sub _close {
my ($self) = @_;
return if $self->{_closed};
$self->{_closed} = 1;
$self->{_pending_messages} = [];
if (my $w = delete $self->{_message_waiter}) {
lib/Async/Redis/Subscription.pm view on Meta::CPAN
# Cancel any running driver Future. The driver's await on _dequeue
# also unwinds because we resolved _message_waiter above, so this is
# belt-and-suspenders; either path exits the driver cleanly.
if (my $f = delete $self->{_driver_step}) {
$f->cancel unless $f->is_ready;
}
}
# Unrecoverable failure: marks the subscription closed with a typed
# error. Any blocked next() will die with that error. The error is
# preserved for callers who call next() after the fact.
# In callback mode, fires on_error if registered; dies otherwise.
sub _fail_fatal {
my ($self, $typed_error) = @_;
return if $self->{_closed};
$self->{_closed} = 1;
$self->{_fatal_error} = $typed_error;
$self->{_pending_messages} = [];
lib/Async/Redis/Subscription.pm view on Meta::CPAN
=head1 INTERNAL LIFECYCLE METHODS
The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.
=head2 _close
Intentional teardown. Marks the subscription closed and wakes any
blocked C<next()> with C<undef>. Clears the parent C<_subscription>
slot on the L<Async::Redis> object with an identity guard: a stale
C<_close> call from an earlier subscription object cannot evict a newer
one that has since taken the slot.
=head2 _fail_fatal($typed_error)
Unrecoverable failure. Marks the subscription closed with a typed error
object. Any blocked C<next()> call will C<die> with that error. The
error is preserved for callers who call C<next()> after the fact.
Routes through C<_close>'s identity guard for parent-slot clearing.
=head2 _pause_for_reconnect
Called before a reconnect attempt. Does B<not> mark the subscription
closed; the underlying reader has already exited due to the connection
drop. Channel/pattern tracking hashes are left intact for replay.
=head2 _resume_after_reconnect
script/commands.json view on Meta::CPAN
"@connection"
],
"arity": 2,
"command_flags": [
"noscript",
"loading",
"stale"
]
},
"CLIENT UNBLOCK": {
"summary": "Unblocks a client blocked by a blocking command from a different connection.",
"since": "5.0.0",
"group": "connection",
"complexity": "O(log N) where N is the number of client connections",
"acl_categories": [
"@admin",
"@slow",
"@dangerous",
"@connection"
],
"arity": -3,
script/commands.json view on Meta::CPAN
"command_flags": [
"readonly",
"blocking",
"movablekeys"
]
},
"XREADGROUP": {
"summary": "Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.",
"since": "5.0.0",
"group": "stream",
"complexity": "For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD ...
"acl_categories": [
"@write",
"@stream",
"@slow",
"@blocking"
],
"arity": -7,
"key_specs": [
{
"begin_search": {
t/02-nonblocking.t view on Meta::CPAN
run { $redis->del("pipe_pipe:$i") };
}
$redis->disconnect;
};
# ============================================================================
# Test 3: Timer can run during Redis operations
# ============================================================================
subtest 'event loop not blocked' => sub {
my $redis = Async::Redis->new(host => redis_host(), port => redis_port());
run { $redis->connect };
my @futures = map { $redis->set("nb:nonblocking:$_", $_) } (1..50);
my $start = Time::HiRes::time();
run { Future->needs_all(@futures) };
my $elapsed = Time::HiRes::time() - $start;
ok($elapsed < 5, "50 concurrent ops completed in ${elapsed}s");
t/10-connection/timeout.t view on Meta::CPAN
my $elapsed = time() - $start;
ok($error, 'command failed');
like("$error", qr/timed?\s*out/i, 'error mentions timeout');
ok($elapsed >= 0.2, "waited at least 0.2s (got ${elapsed}s)");
ok($elapsed < 1.0, "timed out before command finished (got ${elapsed}s)");
$redis->disconnect;
};
subtest 'concurrent ops not blocked during request timeout' => sub {
my $redis = Async::Redis->new(
host => $ENV{REDIS_HOST} // 'localhost',
request_timeout => 5 * $TIMEOUT_SCALE,
);
run { $redis->connect };
my @futures = map { $redis->set("nb:req:timeout:$_", $_) } (1..50);
my $start = time();
run { Future->needs_all(@futures) };
my $elapsed = time() - $start;
t/50-pubsub/on-message.t view on Meta::CPAN
# First message blocks on the gate; subsequent return
# undef so the driver can drain.
return @received == 1 ? $gate : undef;
});
for my $i (1..3) {
$publisher->publish('test:onmsg:backpressure', "msg-$i")->get;
}
Future::IO->sleep(0.3)->get;
is(scalar @received, 1, 'driver delivered 1 message then blocked on Future');
is($received[0], 'msg-1', 'first message delivered');
$gate->done;
Future::IO->sleep(0.3)->get;
is(scalar @received, 3, 'remaining messages delivered after gate released');
$subscriber->disconnect;
};
subtest 'subscribe from inside callback takes effect' => sub {