Async-Redis

 view release on metacpan or  search on metacpan

examples/async-job-queue/PLAN.md  view on Meta::CPAN


- both jobs are started and finished by `worker-1`
- final processed count is `2`
- worker exits after consuming the sentinel

Review after step:

- `BLPOP` response shape is handled correctly
- worker disconnects even on normal sentinel exit
- in-flight set is cleaned after each job
- no busy polling

## Step 6: Run Multiple Workers Concurrently

Change `main()` to start `$workers` worker futures concurrently and wait for
them all to finish.

Shutdown should push one sentinel per worker after all real jobs are processed.

Verification:

lib/Async/Redis/Error/Redis.pm  view on Meta::CPAN


    return $class->new(
        message => $message,
        type    => $type,
    );
}

# Predicate methods for common error types
sub is_wrongtype { uc(shift->{type} // '') eq 'WRONGTYPE' }
sub is_oom       { uc(shift->{type} // '') eq 'OOM' }
sub is_busy      { uc(shift->{type} // '') eq 'BUSY' }
sub is_noscript  { uc(shift->{type} // '') eq 'NOSCRIPT' }
sub is_readonly  { uc(shift->{type} // '') eq 'READONLY' }
sub is_loading   { uc(shift->{type} // '') eq 'LOADING' }
sub is_noauth    { uc(shift->{type} // '') eq 'NOAUTH' }
sub is_noperm    { uc(shift->{type} // '') eq 'NOPERM' }

# Fatal errors should not be retried
sub is_fatal {
    my $self = shift;
    my $type = uc($self->{type} // '');

lib/Async/Redis/Error/Redis.pm  view on Meta::CPAN

    say $error->type;  # 'WRONGTYPE'

=head2 Predicates

=over 4

=item is_wrongtype - Key holds wrong type for operation

=item is_oom - Out of memory

=item is_busy - Server busy (Lua script running)

=item is_noscript - Script SHA not found

=item is_readonly - Write on read-only replica

=item is_loading - Server still loading dataset

=item is_noauth - Authentication required

=item is_noperm - ACL permission denied

script/commands.json  view on Meta::CPAN

                "type": "string",
                "display_text": "password"
            }
        ],
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "no_auth",
            "allow_busy"
        ]
    },
    "BGREWRITEAOF": {
        "summary": "Asynchronously rewrites the append-only file to disk.",
        "since": "1.0.0",
        "group": "server",
        "complexity": "O(1)",
        "acl_categories": [
            "@admin",
            "@slow",

script/commands.json  view on Meta::CPAN

        "acl_categories": [
            "@fast",
            "@transaction"
        ],
        "arity": 1,
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "allow_busy"
        ]
    },
    "DUMP": {
        "summary": "Returns a serialized representation of the value stored at a key.",
        "since": "2.6.0",
        "group": "generic",
        "complexity": "O(1) to access the key and additional O(N*M) to serialize it, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small...
        "acl_categories": [
            "@keyspace",
            "@read",

script/commands.json  view on Meta::CPAN

        "since": "7.0.0",
        "group": "scripting",
        "complexity": "O(1)",
        "acl_categories": [
            "@slow",
            "@scripting"
        ],
        "arity": 2,
        "command_flags": [
            "noscript",
            "allow_busy"
        ],
        "hints": [
            "request_policy:all_shards",
            "response_policy:one_succeeded"
        ]
    },
    "FUNCTION LIST": {
        "summary": "Returns information about all libraries.",
        "since": "7.0.0",
        "group": "scripting",

script/commands.json  view on Meta::CPAN

        "since": "7.0.0",
        "group": "scripting",
        "complexity": "O(1)",
        "acl_categories": [
            "@slow",
            "@scripting"
        ],
        "arity": 2,
        "command_flags": [
            "noscript",
            "allow_busy"
        ],
        "hints": [
            "nondeterministic_output",
            "request_policy:all_shards",
            "response_policy:special"
        ]
    },
    "GEOADD": {
        "summary": "Adds one or more members to a geospatial index. The key is created if it doesn't exist.",
        "since": "3.2.0",

script/commands.json  view on Meta::CPAN

                    }
                ]
            }
        ],
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "no_auth",
            "allow_busy"
        ]
    },
    "HEXISTS": {
        "summary": "Determines whether a field exists in a hash.",
        "since": "2.0.0",
        "group": "hash",
        "complexity": "O(1)",
        "acl_categories": [
            "@read",
            "@hash",

script/commands.json  view on Meta::CPAN

        "acl_categories": [
            "@fast",
            "@transaction"
        ],
        "arity": 1,
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "allow_busy"
        ]
    },
    "OBJECT": {
        "summary": "A container for object introspection commands.",
        "since": "2.2.3",
        "group": "generic",
        "complexity": "Depends on subcommand.",
        "acl_categories": [
            "@slow"
        ],

script/commands.json  view on Meta::CPAN

            "@fast",
            "@connection"
        ],
        "arity": -1,
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "no_auth",
            "allow_busy"
        ],
        "doc_flags": [
            "deprecated"
        ]
    },
    "RANDOMKEY": {
        "summary": "Returns a random key name from the database.",
        "since": "1.0.0",
        "group": "generic",
        "complexity": "O(1)",

script/commands.json  view on Meta::CPAN

            "@admin",
            "@slow",
            "@dangerous"
        ],
        "arity": -1,
        "command_flags": [
            "admin",
            "noscript",
            "loading",
            "stale",
            "allow_busy"
        ],
        "doc_flags": [
            "syscmd"
        ]
    },
    "REPLICAOF": {
        "summary": "Configures a server as replica of another, or promotes it to a master.",
        "since": "5.0.0",
        "group": "server",
        "complexity": "O(1)",

script/commands.json  view on Meta::CPAN

            "@fast",
            "@connection"
        ],
        "arity": 1,
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "no_auth",
            "allow_busy"
        ]
    },
    "RESTORE": {
        "summary": "Creates a key from the serialized representation of a value.",
        "since": "2.6.0",
        "group": "generic",
        "complexity": "O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)...
        "history": [
            [
                "3.0.0",

script/commands.json  view on Meta::CPAN

        "since": "2.6.0",
        "group": "scripting",
        "complexity": "O(1)",
        "acl_categories": [
            "@slow",
            "@scripting"
        ],
        "arity": 2,
        "command_flags": [
            "noscript",
            "allow_busy"
        ],
        "hints": [
            "request_policy:all_shards",
            "response_policy:one_succeeded"
        ]
    },
    "SCRIPT LOAD": {
        "summary": "Loads a server-side Lua script to the script cache.",
        "since": "2.6.0",
        "group": "scripting",

script/commands.json  view on Meta::CPAN

                "since": "7.0.0",
                "optional": true
            }
        ],
        "command_flags": [
            "admin",
            "noscript",
            "loading",
            "stale",
            "no_multi",
            "allow_busy"
        ]
    },
    "SINTER": {
        "summary": "Returns the intersect of multiple sets.",
        "since": "1.0.0",
        "group": "set",
        "complexity": "O(N*M) worst case where N is the cardinality of the smallest set and M is the number of sets.",
        "acl_categories": [
            "@read",
            "@set",

script/commands.json  view on Meta::CPAN

        "acl_categories": [
            "@fast",
            "@transaction"
        ],
        "arity": 1,
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "allow_busy"
        ]
    },
    "WAIT": {
        "summary": "Blocks until the asynchronous replication of all preceding write commands sent by the connection is completed.",
        "since": "3.0.0",
        "group": "generic",
        "complexity": "O(1)",
        "acl_categories": [
            "@slow",
            "@connection"

script/commands.json  view on Meta::CPAN

                "display_text": "key",
                "key_spec_index": 0,
                "multiple": true
            }
        ],
        "command_flags": [
            "noscript",
            "loading",
            "stale",
            "fast",
            "allow_busy"
        ]
    },
    "XACK": {
        "summary": "Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.",
        "since": "5.0.0",
        "group": "stream",
        "complexity": "O(1) for each message ID processed.",
        "acl_categories": [
            "@write",
            "@stream",

t/01-unit/error.t  view on Meta::CPAN

        message => 'WRONGTYPE Operation against a key holding the wrong kind of value',
        type    => 'WRONGTYPE',
    );
    ok($e->isa('Async::Redis::Error'), 'isa base Error');
    ok($e->isa('Async::Redis::Error::Redis'), 'isa Redis');
    is($e->type, 'WRONGTYPE', 'error type');

    # Predicate methods
    ok($e->is_wrongtype, 'is_wrongtype true');
    ok(!$e->is_oom, 'is_oom false');
    ok(!$e->is_busy, 'is_busy false');
    ok(!$e->is_loading, 'is_loading false');
    ok($e->is_fatal, 'WRONGTYPE is fatal');
};

subtest 'Redis error predicates' => sub {
    my %cases = (
        WRONGTYPE => { is_wrongtype => 1, is_fatal => 1 },
        OOM       => { is_oom => 1, is_fatal => 1 },
        BUSY      => { is_busy => 1, is_fatal => 0 },
        LOADING   => { is_loading => 1, is_fatal => 0 },
        NOSCRIPT  => { is_noscript => 1, is_fatal => 1 },
        READONLY  => { is_readonly => 1, is_fatal => 0 },
    );

    for my $type (sort keys %cases) {
        my $e = Async::Redis::Error::Redis->new(
            message => "$type error",
            type    => $type,
        );
        my $expected = $cases{$type};

        for my $pred (qw(is_wrongtype is_oom is_busy is_loading is_noscript is_readonly)) {
            my $want = $expected->{$pred} // 0;
            is(!!$e->$pred, !!$want, "$type: $pred = $want");
        }
        is(!!$e->is_fatal, !!$expected->{is_fatal}, "$type: is_fatal = $expected->{is_fatal}");
    }
};

subtest 'Redis error from_message parser' => sub {
    my $e = Async::Redis::Error::Redis->from_message(
        'WRONGTYPE Operation against a key holding the wrong kind of value'

t/50-pubsub/on-message.t  view on Meta::CPAN

    is($sub->{_pending_messages}[0]{data}, 'payload', 'buffered message data');
    is($result, undef, 'dispatch returns undef on fallthrough');
};

# --- Integration tests (need Redis) ---

SKIP: {
    _with_redis {
        my ($publisher) = @_;

        # Integration tests bypass the run{} helper's busy-poll pump
        # because it interacts badly with the driver's on_done
        # callbacks (pumping via Future::IO->sleep(0)->get corrupts
        # internal state when the driver fires a failed-Future →
        # on_error → _close sequence). Direct ->get works reliably.

        subtest 'on_message receives messages published to subscribed channels' => sub {
            my $subscriber = _make_subscriber();
            my @received;
            my $sub = $subscriber->subscribe('test:onmsg:basic')->get;
            $sub->on_message(sub {

t/99-integration/high-throughput.t  view on Meta::CPAN

#!/usr/bin/env perl
# Test: High throughput pipeline operations
#
# Gated behind RELEASE_TESTING because the assertions are hard
# ops/sec thresholds (>=10k for SET/GET pipelines, >=5k for mixed).
# Those numbers depend on the test environment — a busy CI host or a
# slow VM can dip below the threshold even when the library itself
# is fine, producing false-negative smoker reports. Pipeline
# correctness (commands return, results in order, error objects
# round-trip) is exercised by t/30-pipeline/. Run this with
# RELEASE_TESTING=1 on a quiet box before cutting a release.
use strict;
use warnings;
use Test2::V0;
use Test::Lib;
use Test::Async::Redis qw(skip_without_redis await_f cleanup_keys run);



( run in 2.466 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )