Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis.pm view on Meta::CPAN
package Async::Redis;
use strict;
use warnings;
use 5.018;
our $VERSION = '0.002000';
use Future;
use Future::AsyncAwait;
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();
# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;
# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);
# Key extraction for prefixing
use Async::Redis::KeyExtractor;
# Transaction support
use Async::Redis::Transaction;
# Script support
use Async::Redis::Script;
# Iterator support
use Async::Redis::Iterator;
# Pipeline support
use Async::Redis::Pipeline;
use Async::Redis::AutoPipeline;
# PubSub support
use Async::Redis::Subscription;
# Telemetry support
use Async::Redis::Telemetry;
# Try XS version first, fall back to pure Perl
BEGIN {
    # Load the RESP parser at compile time. The XS implementation is
    # attempted first; if that require fails for any reason, the pure-Perl
    # Protocol::Redis is loaded instead (and its failure is fatal).
    unless (eval { require Protocol::Redis::XS; 1 }) {
        require Protocol::Redis;
    }
}
sub _parser_class {
    # Report which parser implementation is loaded, judged by whether
    # %INC holds a true entry for the XS module. Truthiness (not mere
    # existence) is what is tested, so an undef entry counts as "not loaded".
    if ($INC{'Protocol/Redis/XS.pm'}) {
        return 'Protocol::Redis::XS';
    }
    return 'Protocol::Redis';
}
sub _calculate_backoff {
my ($self, $attempt) = @_;
# Exponential: delay * 2^(attempt-1)
my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));
# Cap at max
$delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
# Apply jitter: delay * (1 +/- jitter)
if ($self->{reconnect_jitter} > 0) {
my $jitter_range = $delay * $self->{reconnect_jitter};
lib/Async/Redis.pm view on Meta::CPAN
my ($self) = @_;
return if $self->{_socket_live};
if (my $f = $self->{_reconnect_future}) {
await $f;
return;
}
my $f = $self->_reconnect;
$self->{_reconnect_future} = $f;
$self->{_tasks}->add(data => 'reconnect', f => $f);
$f->on_ready(sub { $self->{_reconnect_future} = undef });
await $f;
}
# Reconnect and replay pubsub subscriptions
async sub _reconnect_pubsub {
    my ($self) = @_;

    # Re-establish the connection and re-issue every tracked subscription.
    # Dies with Async::Redis::Error::Disconnected when no subscription
    # object is attached to this client.
    my $subscription = $self->{_subscription};
    unless ($subscription) {
        die Async::Redis::Error::Disconnected->new(
            message => "No subscription to replay",
        );
    }

    # Collect the commands to replay before the connection is reset.
    my @replay_commands = $subscription->get_replay_commands;

    # Make sure connection state is fully cleaned up before reconnecting.
    # _read_response may already have called _reset_connection, but when
    # the socket was closed externally the stale IO watchers and state
    # still need clearing here. Calling it twice is safe - the
    # on_disconnect callback is guarded by $was_connected.
    $self->_reset_connection('pubsub_reconnect');
    await $self->_reconnect;

    # Switch back into pubsub mode *before* replaying, so the unified
    # reader classifies incoming message frames correctly during replay.
    $self->{in_pubsub} = 1;

    # Replay goes through the write gate and unified reader. One command
    # is sent per channel/pattern so that each confirmation is matched
    # one-to-one via the inflight queue.
    foreach my $entry (@replay_commands) {
        my @targets = @{$entry};
        my $verb    = shift @targets;
        foreach my $target (@targets) {
            await $self->_pubsub_command($verb, $target);
        }
    }
}
# Asynchronously reconnect after a pubsub connection drop. Called by
# _reader_fatal when reconnect is enabled and a subscription is active.
# Fires _resume_after_reconnect on the subscription on success, or
# _fail_fatal on unrecoverable reconnect failure.
sub _reconnect_async {
    my ($self, $sub) = @_;

    # Start a background reconnect for a dropped pubsub connection.
    #   $sub - the subscription to resume once the socket is back.
    # Always returns nothing; the outcome is delivered to the subscription.

    # Dedup: a not-yet-ready future in the shared slot means a reconnect
    # (started here or by _ensure_connected) is already under way.
    my $pending = $self->{_reconnect_future};
    return if $pending && !$pending->is_ready;

    # The task and its callbacks only hold weak references to the client
    # and the subscription.
    my $weak_self = $self;
    weaken($weak_self);
    my $weak_sub = $sub;
    weaken($weak_sub);

    my $task_f = (async sub {
        # Bring the socket back. _reconnect takes care of retry/backoff
        # and dies with Disconnected once reconnect_max_attempts is
        # exhausted.
        await $weak_self->_reconnect;

        # Everything after the socket is up - clearing _paused, setting
        # in_pubsub, replaying tracked channels/patterns, firing
        # on_reconnect and starting the driver - is delegated to the
        # subscription's unified resume path, so the "who restarts what
        # after reconnect" logic lives in one place.
        if ($weak_sub) {
            await $weak_sub->_resume_after_reconnect;
        }
    })->();

    # Ownership: the selector owns the task; the slot is only the dedup
    # signal. No ->retain - the selector holds the strong reference.
    $self->{_reconnect_future} = $task_f;
    $self->{_tasks}->add(data => 'pubsub-reconnect', f => $task_f);

    # Registered first, so the slot is cleared before any failure handling.
    $task_f->on_ready(sub {
        if ($weak_self) {
            $weak_self->{_reconnect_future} = undef;
        }
    });

    # Report an unrecoverable reconnect failure to the subscription,
    # provided it still exists.
    $task_f->on_fail(sub {
        my ($err) = @_;
        $weak_sub->_fail_fatal($err) if $weak_sub;
    });

    return;
}
# Execute a Redis command
async sub command {
my ($self, $cmd, @args) = @_;
# Check for fork - invalidate connection if PID changed
$self->_check_fork;
# Block regular commands on pubsub connection
if ($self->{in_pubsub}) {
my $ucmd = uc($cmd // '');
unless ($ucmd =~ /^(SUBSCRIBE|UNSUBSCRIBE|PSUBSCRIBE|PUNSUBSCRIBE|SSUBSCRIBE|SUNSUBSCRIBE|PING|QUIT)$/) {
die Async::Redis::Error::Protocol->new(
message => "Cannot execute '$cmd' on connection in PubSub mode",
);
}
}
# Apply key prefixing if configured
if (defined $self->{prefix} && $self->{prefix} ne '') {
@args = Async::Redis::KeyExtractor::apply_prefix(
$self->{prefix}, $cmd, @args
);
}
# Route through auto-pipeline if enabled
( run in 0.889 second using v1.01-cache-2.11-cpan-39bf76dae61 )