view release on metacpan or search on metacpan
examples/async-job-queue/PLAN.md view on Meta::CPAN
- both jobs are started and finished by `worker-1`
- final processed count is `2`
- worker exits after consuming the sentinel
Review after step:
- `BLPOP` response shape is handled correctly
- worker disconnects even on normal sentinel exit
- in-flight set is cleaned after each job
- no busy polling
## Step 6: Run Multiple Workers Concurrently
Change `main()` to start `$workers` worker futures concurrently and wait for
them all to finish.
Shutdown should push one sentinel per worker after all real jobs are processed.
Verification:
lib/Async/Redis/Error/Redis.pm view on Meta::CPAN
return $class->new(
message => $message,
type => $type,
);
}
# Predicate methods for common error types
# True when the error type is WRONGTYPE (key holds the wrong type for the
# operation). The comparison is case-insensitive; a missing type is treated
# as the empty string.
sub is_wrongtype {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'WRONGTYPE';
}
# True when the error type is OOM (out of memory). The comparison is
# case-insensitive; a missing type is treated as the empty string.
sub is_oom {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'OOM';
}
# True when the error type is BUSY (server busy, e.g. a Lua script is
# running). The comparison is case-insensitive; a missing type is treated
# as the empty string.
sub is_busy {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'BUSY';
}
# True when the error type is NOSCRIPT (script SHA not found). The
# comparison is case-insensitive; a missing type is treated as the empty
# string.
sub is_noscript {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'NOSCRIPT';
}
# True when the error type is READONLY (write attempted on a read-only
# replica). The comparison is case-insensitive; a missing type is treated
# as the empty string.
sub is_readonly {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'READONLY';
}
# True when the error type is LOADING (server still loading its dataset).
# The comparison is case-insensitive; a missing type is treated as the
# empty string.
sub is_loading {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'LOADING';
}
# True when the error type is NOAUTH (authentication required). The
# comparison is case-insensitive; a missing type is treated as the empty
# string.
sub is_noauth {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'NOAUTH';
}
# True when the error type is NOPERM (ACL permission denied). The
# comparison is case-insensitive; a missing type is treated as the empty
# string.
sub is_noperm {
    my ($self) = @_;
    my $type = $self->{type} // '';
    return uc($type) eq 'NOPERM';
}
# Fatal errors should not be retried
sub is_fatal {
my $self = shift;
my $type = uc($self->{type} // '');
lib/Async/Redis/Error/Redis.pm view on Meta::CPAN
say $error->type; # 'WRONGTYPE'
=head2 Predicates
=over 4
=item is_wrongtype - Key holds wrong type for operation
=item is_oom - Out of memory
=item is_busy - Server busy (Lua script running)
=item is_noscript - Script SHA not found
=item is_readonly - Write on read-only replica
=item is_loading - Server still loading dataset
=item is_noauth - Authentication required
=item is_noperm - ACL permission denied
script/commands.json view on Meta::CPAN
"type": "string",
"display_text": "password"
}
],
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"no_auth",
"allow_busy"
]
},
"BGREWRITEAOF": {
"summary": "Asynchronously rewrites the append-only file to disk.",
"since": "1.0.0",
"group": "server",
"complexity": "O(1)",
"acl_categories": [
"@admin",
"@slow",
script/commands.json view on Meta::CPAN
"acl_categories": [
"@fast",
"@transaction"
],
"arity": 1,
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"allow_busy"
]
},
"DUMP": {
"summary": "Returns a serialized representation of the value stored at a key.",
"since": "2.6.0",
"group": "generic",
"complexity": "O(1) to access the key and additional O(N*M) to serialize it, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small...
"acl_categories": [
"@keyspace",
"@read",
script/commands.json view on Meta::CPAN
"since": "7.0.0",
"group": "scripting",
"complexity": "O(1)",
"acl_categories": [
"@slow",
"@scripting"
],
"arity": 2,
"command_flags": [
"noscript",
"allow_busy"
],
"hints": [
"request_policy:all_shards",
"response_policy:one_succeeded"
]
},
"FUNCTION LIST": {
"summary": "Returns information about all libraries.",
"since": "7.0.0",
"group": "scripting",
script/commands.json view on Meta::CPAN
"since": "7.0.0",
"group": "scripting",
"complexity": "O(1)",
"acl_categories": [
"@slow",
"@scripting"
],
"arity": 2,
"command_flags": [
"noscript",
"allow_busy"
],
"hints": [
"nondeterministic_output",
"request_policy:all_shards",
"response_policy:special"
]
},
"GEOADD": {
"summary": "Adds one or more members to a geospatial index. The key is created if it doesn't exist.",
"since": "3.2.0",
script/commands.json view on Meta::CPAN
}
]
}
],
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"no_auth",
"allow_busy"
]
},
"HEXISTS": {
"summary": "Determines whether a field exists in a hash.",
"since": "2.0.0",
"group": "hash",
"complexity": "O(1)",
"acl_categories": [
"@read",
"@hash",
script/commands.json view on Meta::CPAN
"acl_categories": [
"@fast",
"@transaction"
],
"arity": 1,
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"allow_busy"
]
},
"OBJECT": {
"summary": "A container for object introspection commands.",
"since": "2.2.3",
"group": "generic",
"complexity": "Depends on subcommand.",
"acl_categories": [
"@slow"
],
script/commands.json view on Meta::CPAN
"@fast",
"@connection"
],
"arity": -1,
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"no_auth",
"allow_busy"
],
"doc_flags": [
"deprecated"
]
},
"RANDOMKEY": {
"summary": "Returns a random key name from the database.",
"since": "1.0.0",
"group": "generic",
"complexity": "O(1)",
script/commands.json view on Meta::CPAN
"@admin",
"@slow",
"@dangerous"
],
"arity": -1,
"command_flags": [
"admin",
"noscript",
"loading",
"stale",
"allow_busy"
],
"doc_flags": [
"syscmd"
]
},
"REPLICAOF": {
"summary": "Configures a server as replica of another, or promotes it to a master.",
"since": "5.0.0",
"group": "server",
"complexity": "O(1)",
script/commands.json view on Meta::CPAN
"@fast",
"@connection"
],
"arity": 1,
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"no_auth",
"allow_busy"
]
},
"RESTORE": {
"summary": "Creates a key from the serialized representation of a value.",
"since": "2.6.0",
"group": "generic",
"complexity": "O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)...
"history": [
[
"3.0.0",
script/commands.json view on Meta::CPAN
"since": "2.6.0",
"group": "scripting",
"complexity": "O(1)",
"acl_categories": [
"@slow",
"@scripting"
],
"arity": 2,
"command_flags": [
"noscript",
"allow_busy"
],
"hints": [
"request_policy:all_shards",
"response_policy:one_succeeded"
]
},
"SCRIPT LOAD": {
"summary": "Loads a server-side Lua script to the script cache.",
"since": "2.6.0",
"group": "scripting",
script/commands.json view on Meta::CPAN
"since": "7.0.0",
"optional": true
}
],
"command_flags": [
"admin",
"noscript",
"loading",
"stale",
"no_multi",
"allow_busy"
]
},
"SINTER": {
"summary": "Returns the intersect of multiple sets.",
"since": "1.0.0",
"group": "set",
"complexity": "O(N*M) worst case where N is the cardinality of the smallest set and M is the number of sets.",
"acl_categories": [
"@read",
"@set",
script/commands.json view on Meta::CPAN
"acl_categories": [
"@fast",
"@transaction"
],
"arity": 1,
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"allow_busy"
]
},
"WAIT": {
"summary": "Blocks until the asynchronous replication of all preceding write commands sent by the connection is completed.",
"since": "3.0.0",
"group": "generic",
"complexity": "O(1)",
"acl_categories": [
"@slow",
"@connection"
script/commands.json view on Meta::CPAN
"display_text": "key",
"key_spec_index": 0,
"multiple": true
}
],
"command_flags": [
"noscript",
"loading",
"stale",
"fast",
"allow_busy"
]
},
"XACK": {
"summary": "Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.",
"since": "5.0.0",
"group": "stream",
"complexity": "O(1) for each message ID processed.",
"acl_categories": [
"@write",
"@stream",
t/01-unit/error.t view on Meta::CPAN
message => 'WRONGTYPE Operation against a key holding the wrong kind of value',
type => 'WRONGTYPE',
);
ok($e->isa('Async::Redis::Error'), 'isa base Error');
ok($e->isa('Async::Redis::Error::Redis'), 'isa Redis');
is($e->type, 'WRONGTYPE', 'error type');
# Predicate methods
ok($e->is_wrongtype, 'is_wrongtype true');
ok(!$e->is_oom, 'is_oom false');
ok(!$e->is_busy, 'is_busy false');
ok(!$e->is_loading, 'is_loading false');
ok($e->is_fatal, 'WRONGTYPE is fatal');
};
# Table-driven check of the type predicates on Async::Redis::Error::Redis.
# For every error type in %cases, each predicate in @predicates must be true
# only where the row says so and false everywhere else, so a predicate that
# matches the wrong type is caught as well as one that fails to match.
subtest 'Redis error predicates' => sub {
    my %cases = (
        WRONGTYPE => { is_wrongtype => 1, is_fatal => 1 },
        OOM       => { is_oom       => 1, is_fatal => 1 },
        BUSY      => { is_busy      => 1, is_fatal => 0 },
        LOADING   => { is_loading   => 1, is_fatal => 0 },
        NOSCRIPT  => { is_noscript  => 1, is_fatal => 1 },
        READONLY  => { is_readonly  => 1, is_fatal => 0 },
        # No is_fatal expectation is pinned for these two rows.
        # TODO(review): add is_fatal once the intended classification of
        # NOAUTH / NOPERM is confirmed against is_fatal's implementation.
        NOAUTH    => { is_noauth    => 1 },
        NOPERM    => { is_noperm    => 1 },
    );

    # Every type predicate the error class exposes; each one is checked
    # against every row of the table.
    my @predicates = qw(
        is_wrongtype is_oom is_busy is_loading
        is_noscript is_readonly is_noauth is_noperm
    );

    for my $type (sort keys %cases) {
        my $e = Async::Redis::Error::Redis->new(
            message => "$type error",
            type => $type,
        );
        my $expected = $cases{$type};

        for my $pred (@predicates) {
            my $want = $expected->{$pred} // 0;
            # Normalise both sides to booleans so any true/false value compares equal.
            is(!!$e->$pred, !!$want, "$type: $pred = $want");
        }

        # Only assert fatality where the table declares an expectation.
        next unless exists $expected->{is_fatal};
        is(!!$e->is_fatal, !!$expected->{is_fatal}, "$type: is_fatal = $expected->{is_fatal}");
    }
};
subtest 'Redis error from_message parser' => sub {
my $e = Async::Redis::Error::Redis->from_message(
'WRONGTYPE Operation against a key holding the wrong kind of value'
t/50-pubsub/on-message.t view on Meta::CPAN
is($sub->{_pending_messages}[0]{data}, 'payload', 'buffered message data');
is($result, undef, 'dispatch returns undef on fallthrough');
};
# --- Integration tests (need Redis) ---
SKIP: {
_with_redis {
my ($publisher) = @_;
# Integration tests bypass the run{} helper's busy-poll pump
# because it interacts badly with the driver's on_done
# callbacks (pumping via Future::IO->sleep(0)->get corrupts
# internal state when the driver fires a failed-Future →
# on_error → _close sequence). Direct ->get works reliably.
subtest 'on_message receives messages published to subscribed channels' => sub {
my $subscriber = _make_subscriber();
my @received;
my $sub = $subscriber->subscribe('test:onmsg:basic')->get;
$sub->on_message(sub {
t/99-integration/high-throughput.t view on Meta::CPAN
#!/usr/bin/env perl
# Test: High throughput pipeline operations
#
# Gated behind RELEASE_TESTING because the assertions are hard
# ops/sec thresholds (>=10k for SET/GET pipelines, >=5k for mixed).
# Those numbers depend on the test environment — a busy CI host or a
# slow VM can dip below the threshold even when the library itself
# is fine, producing false-negative smoker reports. Pipeline
# correctness (commands return, results in order, error objects
# round-trip) is exercised by t/30-pipeline/. Run this with
# RELEASE_TESTING=1 on a quiet box before cutting a release.
use strict;
use warnings;
use Test2::V0;
use Test::Lib;
use Test::Async::Redis qw(skip_without_redis await_f cleanup_keys run);