Async-Redis

 view release on metacpan or  search on metacpan

t/92-concurrency/reader-invariants.t  view on Meta::CPAN

        # If the reader doesn't exit, the next command's ensure_reader
        # would no-op and the command would hang. A second command
        # after a quiet period proves the reader restarts cleanly.
        await Future::IO->sleep(0.05);
        my $v = await $r->get('k');
        is $v, '1';
        await $r->del('k');
        $r->disconnect;
    })->()->get;
};

# Scan helper for pipeline results.
# Returns 1 when $results is a plain array reference holding at least one
# object that isa Async::Redis::Error, and 0 otherwise (including when
# $results is not a plain array reference at all).
sub _array_has_error {
    my ($results) = @_;

    if (ref($results) eq 'ARRAY') {
        foreach my $entry (@{$results}) {
            # Non-reference values cannot be error objects.
            next unless ref $entry;

            # Calling isa() on an unblessed reference dies; the eval turns
            # that into "not an error" instead of aborting the scan.
            my $is_error = eval { $entry->isa('Async::Redis::Error') };
            return 1 if $is_error;
        }
    }

    return 0;
}

# Verifies that an EOF injected while a pipeline is in flight is surfaced to
# the caller: the pipeline future must either fail or resolve with at least
# one Async::Redis::Error among its results, and the client must report
# itself as disconnected afterwards.
subtest 'synthetic EOF mid-pipeline fails all pipeline entries with typed error' => sub {
    (async sub {
        my $r = new_redis();
        await $r->connect;

        # Schedule the pipeline but do NOT await yet — inject EOF
        # synchronously before yielding to the event loop so the reader
        # sees a closed socket before any response arrives.
        my $pipe_f = $r->_execute_pipeline([
            ['SET', 'eof-k1', '1'],
            ['SET', 'eof-k2', '2'],
            ['GET', 'eof-k1'],
        ]);
        inject_eof($r);

        # A true value means the await returned without throwing.
        my $ok = eval { await $pipe_f; 1 };

        # Collect the per-command results if the future resolved. The shape
        # the future resolves with is not visible from this file, so accept
        # both a flat list of results and a single array reference wrapping
        # them; without flattening, an array-reference result would sit one
        # level too deep for _array_has_error() to see inline error objects.
        # NOTE(review): confirm the resolution shape of _execute_pipeline.
        my @results;
        if ($pipe_f->is_done) {
            @results = map { ref($_) eq 'ARRAY' ? @{$_} : $_ } $pipe_f->get;
        }

        ok !$ok || _array_has_error(\@results),
            'pipeline ended with failure or inline error objects';
        is $r->{connected}, 0, 'connection marked disconnected';
    })->()->get;
};

# Verifies that a read timeout forced while a command is waiting for its
# reply fails that command's future, and that the failure is an
# Async::Redis::Error::Timeout object rather than an untyped error.
subtest 'typed-error preservation on synthetic timeout' => sub {
    (async sub {
        my $r = new_redis();
        await $r->connect;

        # BLPOP on a list that has no elements blocks on the server side,
        # giving us a clean window: write completes, server waits, then we
        # inject the synthetic timeout and verify the typed error propagates.
        my $blpop_f = $r->command('BLPOP', 'test-timeout-injection-list', 30);

        # Yield briefly so the write goes out and the reader is waiting.
        await Future::IO->sleep(0.02);
        force_read_timeout($r);

        # Capture $@ immediately after the eval: it is a global, and any
        # code that runs in between (including the assertion below) could
        # overwrite it before the type check looks at it.
        my $ok  = eval { await $blpop_f; 1 };
        my $err = $@;

        ok !$ok, 'blpop failed';
        isa_ok $err, ['Async::Redis::Error::Timeout'];
    })->()->get;
};

# Fuzz check: start 30 randomly chosen operations (a SET, a GET, or a
# two-command SET/GET pipeline) without awaiting in between, then confirm
# that every returned future ends up ready, whether it succeeded or failed.
subtest 'fuzz: randomized mix of ops, every future resolves or fails' => sub {
    (async sub {
        my $redis = new_redis();
        await $redis->connect;

        my @pending;
        foreach my $n (1 .. 30) {
            my $pick = int(rand 3);

            # 0 => SET, 1 => GET, anything else => small pipeline.
            push @pending,
                  $pick == 0 ? $redis->set("fuzz-$n", "v$n")
                : $pick == 1 ? $redis->get("fuzz-$n")
                :              $redis->_execute_pipeline([
                                   ['SET', "fuzz-p$n", "pv$n"],
                                   ['GET', "fuzz-p$n"],
                               ]);
        }

        foreach my $future (@pending) {
            # A failed future is acceptable here; only completion is asserted.
            eval { await $future };
            ok $future->is_ready, "future resolved (is_ready)";
        }

        # Remove every key the loop above could have written.
        my @fuzz_keys = map { ("fuzz-$_", "fuzz-p$_") } 1 .. 30;
        await $redis->command('DEL', @fuzz_keys);
        $redis->disconnect;
    })->()->get;
};

# End of the test script: tell the harness that all tests have been run.
done_testing;



( run in 2.939 seconds using v1.01-cache-2.11-cpan-d8267643d1d )