Async-Redis
view release on metacpan or search on metacpan
t/50-pubsub/on-message.t view on Meta::CPAN
# After the unified-reader refactor, _dispatch_frame always queues
# regardless of mode. The callback driver (_start_driver) dequeues and
# invokes _on_message; _dispatch_frame is mode-agnostic.
my $redis = Async::Redis->new(host => 'localhost');
my $sub = Async::Redis::Subscription->new(redis => $redis);
$sub->on_message(sub { 1 }); # set callback mode
my $frame = [ 'message', 'chan', 'payload' ];
my $result = $sub->_dispatch_frame($frame);
# Message must be in the queue for the driver to dequeue.
is(scalar @{$sub->{_pending_messages}}, 1, 'message buffered in queue');
is($sub->{_pending_messages}[0]{type}, 'message', 'correct type');
is($sub->{_pending_messages}[0]{channel}, 'chan', 'correct channel');
is($sub->{_pending_messages}[0]{data}, 'payload', 'correct data');
# _dispatch_frame returns undef (no Future) when queued synchronously.
is($result, undef, 'dispatch returns undef (queued synchronously)');
};
subtest '_dispatch_frame: depth backpressure applies in callback mode too' => sub {
    # _dispatch_frame is mode-agnostic, so the queue-depth limit is expected
    # to apply to a subscription in callback mode exactly as it does
    # otherwise: once the queue is at depth, dispatch hands back a
    # backpressure Future instead of queueing.
    my $client = bless({ message_queue_depth => 1 }, 'Async::Redis');
    my $subscription = Async::Redis::Subscription->new(redis => $client);

    # Registering a callback puts the subscription into callback mode.
    $subscription->on_message(sub { 1 });

    # Dispatch #1 fills the queue (depth limit is 1).
    my $first = $subscription->_dispatch_frame([ 'message', 'ch', 'v1' ]);
    is($first, undef, 'first dispatch returns undef (queued)');
    my $depth_after_first = @{ $subscription->{_pending_messages} };
    is($depth_after_first, 1, 'queue at depth');

    # Dispatch #2 exceeds the depth limit -- a Future should come back and
    # v2 must not have been appended to the queue.
    my $second = $subscription->_dispatch_frame([ 'message', 'ch', 'v2' ]);
    my $second_is_future = ref($second) && $second->isa('Future');
    ok($second_is_future, 'second dispatch returns Future (at depth)');
    my $depth_after_second = @{ $subscription->{_pending_messages} };
    is($depth_after_second, 1, 'queue still at depth (v2 held)');
};
subtest '_dispatch_frame queues message for iterator consumers when no callback' => sub {
    my $client = Async::Redis->new(host => 'localhost');
    my $subscription = Async::Redis::Subscription->new(redis => $client);

    # No on_message callback is registered, so the dispatched frame should
    # simply be buffered for next() consumers.
    my $returned = $subscription->_dispatch_frame([ 'message', 'chan', 'payload' ]);

    my $queue = $subscription->{_pending_messages};
    is(scalar @{$queue}, 1, 'message buffered in queue');
    is($queue->[0]{data}, 'payload', 'buffered message data');

    # Nothing to wait on: the synchronous queueing path returns undef.
    is($returned, undef, 'dispatch returns undef on fallthrough');
};
# --- Integration tests (need Redis) ---
SKIP: {
_with_redis {
my ($publisher) = @_;
# Integration tests bypass the run{} helper's busy-poll pump
# because it interacts badly with the driver's on_done
# callbacks (pumping via Future::IO->sleep(0)->get corrupts
# internal state when the driver fires a failed-Future ->
# on_error -> _close sequence). Direct ->get works reliably.
subtest 'on_message receives messages published to subscribed channels' => sub {
    # End-to-end check: three messages published on one channel must all
    # reach the on_message callback, in order, as message hashes.
    my $channel = 'test:onmsg:basic';
    my $subscriber = _make_subscriber();
    my $sub = $subscriber->subscribe($channel)->get;

    my @received;
    $sub->on_message(sub {
        my (undef, $msg) = @_;
        push @received, $msg;
    });

    for my $n (1 .. 3) {
        my $published = $publisher->publish($channel, "msg-$n");
        $published->get;
    }

    # Leave time for the callback to be invoked for every message.
    Future::IO->sleep(0.3)->get;

    my $count = @received;
    is($count, 3, 'received all 3 messages');
    is($received[0]{type}, 'message', 'first msg type');
    is($received[0]{channel}, 'test:onmsg:basic', 'first msg channel');
    is($received[0]{data}, 'msg-1', 'first msg data');
    is($received[0]{pattern}, undef, 'pattern undef for message');

    $subscriber->disconnect;
};
subtest 'no F::AA "lost its returning future" warning from fire-and-forget use' => sub {
# Regression guard: registers a no-op on_message callback, publishes one
# message, disconnects, and then asserts that none of the warnings
# captured along the way match /lost.+returning future/i.
# Placed BEFORE the CLIENT KILL subtests so Redis's client
# state is still clean.
my @warnings;
# Capture every warning raised during this subtest instead of printing it;
# `local` restores the previous handler when the subtest sub returns.
local $SIG{__WARN__} = sub { push @warnings, @_ };
{
# Bare block: $subscriber and $sub are lexical to it, so both go out of
# scope before the final sleep below.
my $subscriber = _make_subscriber();
my $sub = $subscriber->subscribe('test:onmsg:no-warn')->get;
# Fire-and-forget usage: the callback does nothing and returns nothing.
$sub->on_message(sub { });
$publisher->publish('test:onmsg:no-warn', 'x')->get;
# Leave time for the published message to be delivered to the callback.
Future::IO->sleep(0.3)->get;
$subscriber->disconnect;
}
# Give deferred GC/event-loop callbacks a chance to fire.
Future::IO->sleep(0.1)->get;
# Only warnings matching the "lost ... returning future" text count as
# failures; on failure, every captured warning is dumped via note().
my @faa_warnings = grep { /lost.+returning future/i } @warnings;
is(scalar @faa_warnings, 0,
'no "lost its returning future" warnings from on_message path')
or note("warnings captured: @warnings");
};
# Full end-to-end backpressure timing is flaky in this test
# harness because the synchronous-callback path is extremely
# tight -- messages are dispatched as soon as frames arrive,
# which can race the publisher. The backpressure LOGIC is
# covered by the unit test below (`_dispatch_frame returns
# Future when callback does`) and the failed-Future
# integration test. The end-to-end timing test is documented
# as a known gap pending a deterministic sync primitive.
subtest 'callback returning a failed Future routes to on_error' => sub {
    # A callback that returns a failed Future is treated as fatal: the
    # failure must reach on_error and the subscription must end up closed.
    my $channel = 'test:onmsg:future-fail';

    # Fresh publisher too -- the shared one can linger in a state that
    # interacts oddly with the fatal-close sequence.
    my $pub = _make_publisher();
    my $subscriber = _make_subscriber();
    my $sub = $subscriber->subscribe($channel)->get;

    # Record whatever error the subscription reports.
    my $captured_error;
    $sub->on_error(sub { $captured_error = $_[1] });

    # Every delivered message yields an already-failed Future.
    $sub->on_message(sub { Future->fail("callback-future-boom") });

    $pub->publish($channel, 'x')->get;

    # Leave time for delivery and the resulting error handling to run.
    Future::IO->sleep(0.3)->get;

    like($captured_error, qr/callback-future-boom/, 'on_error fired with callback Future failure');
    ok($sub->is_closed, 'subscription closed after fatal error');

    # Best-effort teardown: errors from disconnect are deliberately ignored.
    for my $conn ($subscriber, $pub) {
        eval { $conn->disconnect };
    }
};
subtest 'callback returning a Future delays next read until it resolves' => sub {
my $subscriber = _make_subscriber();
my @received;
my $gate = Future->new;
my $sub = $subscriber->subscribe('test:onmsg:backpressure')->get;
$sub->on_message(sub {
my ($s, $msg) = @_;
push @received, $msg->{data};
# First message blocks on the gate; subsequent return
# undef so the driver can drain.
return @received == 1 ? $gate : undef;
});
for my $i (1..3) {
$publisher->publish('test:onmsg:backpressure', "msg-$i")->get;
}
Future::IO->sleep(0.3)->get;
is(scalar @received, 1, 'driver delivered 1 message then blocked on Future');
( run in 3.070 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )