Async-Redis
view release on metacpan or search on metacpan
t/50-pubsub/on-message.t view on Meta::CPAN
# Give deferred GC/event-loop callbacks a chance to fire.
Future::IO->sleep(0.1)->get;
my @faa_warnings = grep { /lost.+returning future/i } @warnings;
is(scalar @faa_warnings, 0,
'no "lost its returning future" warnings from on_message path')
or note("warnings captured: @warnings");
};
# Full end-to-end backpressure timing is flaky in this test
# harness because the synchronous-callback path is extremely
# tight â messages are dispatched as soon as frames arrive,
# which can race the publisher. The backpressure LOGIC is
# covered by the unit test below (`_dispatch_frame returns
# Future when callback does`) and the failed-Future
# integration test. The end-to-end timing test is documented
# as a known gap pending a deterministic sync primitive.
subtest 'callback returning a failed Future routes to on_error' => sub {
# Fresh publisher too â the shared one can linger in a
# state that interacts oddly with the fatal-close sequence.
my $pub = _make_publisher();
my $subscriber = _make_subscriber();
my $err_seen;
my $sub = $subscriber->subscribe('test:onmsg:future-fail')->get;
$sub->on_error(sub {
my ($s, $err) = @_;
$err_seen = $err;
});
$sub->on_message(sub {
return Future->fail("callback-future-boom");
});
$pub->publish('test:onmsg:future-fail', 'x')->get;
Future::IO->sleep(0.3)->get;
like($err_seen, qr/callback-future-boom/, 'on_error fired with callback Future failure');
ok($sub->is_closed, 'subscription closed after fatal error');
eval { $subscriber->disconnect };
eval { $pub->disconnect };
};
subtest 'callback returning a Future delays next read until it resolves' => sub {
my $subscriber = _make_subscriber();
my @received;
my $gate = Future->new;
my $sub = $subscriber->subscribe('test:onmsg:backpressure')->get;
$sub->on_message(sub {
my ($s, $msg) = @_;
push @received, $msg->{data};
# First message blocks on the gate; subsequent return
# undef so the driver can drain.
return @received == 1 ? $gate : undef;
});
for my $i (1..3) {
$publisher->publish('test:onmsg:backpressure', "msg-$i")->get;
}
Future::IO->sleep(0.3)->get;
is(scalar @received, 1, 'driver delivered 1 message then blocked on Future');
is($received[0], 'msg-1', 'first message delivered');
$gate->done;
Future::IO->sleep(0.3)->get;
is(scalar @received, 3, 'remaining messages delivered after gate released');
$subscriber->disconnect;
};
subtest 'subscribe from inside callback takes effect' => sub {
my $subscriber = _make_subscriber();
my @received;
my $sub = $subscriber->subscribe('test:onmsg:reent:primary')->get;
$sub->on_message(sub {
my ($s, $msg) = @_;
push @received, $msg;
# First message triggers a subscribe to a second channel.
if ($msg->{channel} eq 'test:onmsg:reent:primary' && @received == 1) {
$s->{redis}->subscribe('test:onmsg:reent:secondary')->retain;
}
});
$publisher->publish('test:onmsg:reent:primary', 'p-1')->get;
Future::IO->sleep(0.2)->get;
$publisher->publish('test:onmsg:reent:secondary', 's-1')->get;
Future::IO->sleep(0.3)->get;
ok(scalar @received >= 2, 'received at least 2 messages');
ok((grep { $_->{channel} eq 'test:onmsg:reent:secondary' } @received),
'received message from channel subscribed inside callback');
$subscriber->disconnect;
};
subtest 'handler swap inside callback uses new handler for next frame' => sub {
my $subscriber = _make_subscriber();
my @received_by_a;
my @received_by_b;
my $sub = $subscriber->subscribe('test:onmsg:swap')->get;
my $handler_b = sub {
my ($s, $msg) = @_;
push @received_by_b, $msg->{data};
};
my $handler_a = sub {
my ($s, $msg) = @_;
push @received_by_a, $msg->{data};
# Swap handler mid-stream
$s->on_message($handler_b);
};
$sub->on_message($handler_a);
for my $i (1..3) {
$publisher->publish('test:onmsg:swap', "m-$i")->get;
}
Future::IO->sleep(0.3)->get;
is(scalar @received_by_a, 1, 'handler A received exactly one message');
is($received_by_a[0], 'm-1', 'handler A got first message');
is(scalar @received_by_b, 2, 'handler B received the next two');
( run in 1.272 second using v1.01-cache-2.11-cpan-df04353d9ac )