Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis.pm view on Meta::CPAN
package Async::Redis;
use strict;
use warnings;
use 5.018;
our $VERSION = '0.002000';
use Future;
use Future::AsyncAwait;
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();
# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;
# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);
# Key extraction for prefixing
use Async::Redis::KeyExtractor;
# Transaction support
use Async::Redis::Transaction;
# Script support
use Async::Redis::Script;
# Iterator support
use Async::Redis::Iterator;
# Pipeline support
use Async::Redis::Pipeline;
use Async::Redis::AutoPipeline;
# PubSub support
use Async::Redis::Subscription;
# Telemetry support
use Async::Redis::Telemetry;
# Try XS version first, fall back to pure Perl
BEGIN {
    # Protocol::Redis::XS is an optional speed-up; when its require fails we
    # silently fall back to the pure-Perl Protocol::Redis. _parser_class()
    # later inspects %INC to report which implementation actually loaded.
    eval { require Protocol::Redis::XS; 1 }
    or require Protocol::Redis;
}
sub _parser_class {
    # %INC records whether the BEGIN block managed to load the XS parser;
    # its entry value (the loaded filename) is always true when present.
    my $xs_loaded = $INC{'Protocol/Redis/XS.pm'};
    return $xs_loaded ? 'Protocol::Redis::XS' : 'Protocol::Redis';
}
sub _calculate_backoff {
    my ($self, $attempt) = @_;
    # Exponential growth: reconnect_delay doubles per attempt, capped at
    # reconnect_delay_max so repeated failures do not grow without bound.
    my $base = $self->{reconnect_delay} * (2 ** ($attempt - 1));
    my $cap  = $self->{reconnect_delay_max};
    $base = $cap if $base > $cap;
    # Randomise within +/- (jitter fraction) of the delay so a fleet of
    # clients does not reconnect in lock-step.
    my $jitter_frac = $self->{reconnect_jitter};
    return $base unless $jitter_frac > 0;
    return $base + (rand(2) - 1) * ($base * $jitter_frac);
}
# Free function, not a method. Call as _await_with_deadline($f, $deadline).
lib/Async/Redis.pm view on Meta::CPAN
sub _await_with_deadline {
    my ($pending_f, $deadline) = @_;
    # No deadline at all: wait for the future and report "not timed out".
    return $pending_f->followed_by(sub { Future->done($pending_f, 0) })
        if !defined $deadline;
    my $time_left = $deadline - Time::HiRes::time();
    # Deadline already in the past: report a timeout immediately.
    return Future->done($pending_f, 1) if $time_left <= 0;
    my $timer_f = Future::IO->sleep($time_left)
        ->then(sub { Future->fail('__deadline__') });
    # without_cancel shields $pending_f: if the timer wins, wait_any's
    # cancellation of the losing branch must not reach the real read
    # future, whose lifecycle the caller owns.
    return Future->wait_any($pending_f->without_cancel, $timer_f)
        ->followed_by(sub {
            my ($winner) = @_;
            my $hit_deadline =
                ($winner->is_failed
                  && (($winner->failure)[0] // '') eq '__deadline__') ? 1 : 0;
            # Tidy up the timer when the read finished first.
            $timer_f->cancel if !$hit_deadline && !$timer_f->is_ready;
            return Future->done($pending_f, $hit_deadline);
        });
}
sub new {
my ($class, %args) = @_;
# Parse URI if provided
if ($args{uri}) {
require Async::Redis::URI;
my $uri = Async::Redis::URI->parse($args{uri});
if ($uri) {
my %uri_args = $uri->to_hash;
# URI values are defaults, explicit args override
%args = (%uri_args, %args);
delete $args{uri}; # don't store the string
}
}
my $self = bless {
path => $args{path},
host => $args{path} ? undef : ($args{host} // 'localhost'),
port => $args{path} ? undef : ($args{port} // 6379),
socket => undef,
parser => undef,
connected => 0,
_socket_live => 0,
_fatal_in_progress => 0,
_reader_running => 0, # dedup guard; the selector owns the reader Future itself
_write_lock => undef, # will be a Future used as a lock, populated lazily
_reconnect_future => undef,
_tasks => Future::Selector->new,
# Timeout settings
connect_timeout => $args{connect_timeout} // 10,
request_timeout => $args{request_timeout} // 5,
blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
# Inflight tracking with deadlines
# Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
inflight => [],
# Reconnection settings
reconnect => $args{reconnect} // 0,
reconnect_delay => $args{reconnect_delay} // 0.1,
reconnect_delay_max => $args{reconnect_delay_max} // 60,
reconnect_jitter => $args{reconnect_jitter} // 0.25,
reconnect_max_attempts => $args{reconnect_max_attempts} // 10, # 0 = unlimited
_reconnect_attempt => 0,
# Callbacks
on_connect => $args{on_connect},
on_disconnect => $args{on_disconnect},
on_error => $args{on_error},
# Authentication
password => $args{password},
username => $args{username},
database => $args{database} // 0,
client_name => $args{client_name},
# TLS
tls => $args{tls},
# Key prefixing
prefix => $args{prefix},
# Pipeline settings
pipeline_depth => $args{pipeline_depth} // 10000,
auto_pipeline => $args{auto_pipeline} // 0,
# Backpressure: max queued messages before _dispatch_frame's slot wait blocks.
message_queue_depth => do {
my $d = $args{message_queue_depth} // 1;
die "message_queue_depth must be >= 1 (got $d)" if $d < 1;
$d;
},
# Transaction state
in_multi => 0,
watching => 0,
# PubSub state
in_pubsub => 0,
_subscription => undef,
_pump_running => 0,
# Fork safety
_pid => $$,
# Script registry
_scripts => {},
# Current read future for clean disconnect cancellation
lib/Async/Redis.pm view on Meta::CPAN
return $self if $self->{connected};
# Create socket — AF_UNIX for path, AF_INET for host:port
my ($socket, $sockaddr);
if ($self->{path}) {
socket($socket, AF_UNIX, SOCK_STREAM, 0)
or die Async::Redis::Error::Connection->new(
message => "Cannot create unix socket: $!",
host => $self->{path},
port => 0,
);
IO::Handle::blocking($socket, 0);
$sockaddr = pack_sockaddr_un($self->{path});
} else {
$socket = IO::Socket::INET->new(
Proto => 'tcp',
Blocking => 0,
) or die Async::Redis::Error::Connection->new(
message => "Cannot create socket: $!",
host => $self->{host},
port => $self->{port},
);
# Build sockaddr
my $addr = inet_aton($self->{host})
or die Async::Redis::Error::Connection->new(
message => "Cannot resolve host: $self->{host}",
host => $self->{host},
port => $self->{port},
);
$sockaddr = pack_sockaddr_in($self->{port}, $addr);
}
# Connect with timeout using Future->wait_any
my $connect_f = Future::IO->connect($socket, $sockaddr);
my $sleep_f = Future::IO->sleep($self->{connect_timeout});
my $timeout_f = $sleep_f->then(sub {
return Future->fail('connect_timeout');
});
my $wait_f = Future->wait_any($connect_f, $timeout_f);
# Use followed_by to handle both success and failure without await propagating failure
my $result_f = $wait_f->followed_by(sub {
my ($f) = @_;
return Future->done($f); # wrap the future itself
});
my $completed_f = await $result_f;
# Now check the result
if ($completed_f->is_failed) {
my ($error) = $completed_f->failure;
# Don't call close() - let $socket go out of scope when we die.
# Perl's DESTROY will close it after the exception unwinds.
if ($error eq 'connect_timeout') {
die Async::Redis::Error::Timeout->new(
message => "Connect timed out after $self->{connect_timeout}s",
timeout => $self->{connect_timeout},
);
}
die Async::Redis::Error::Connection->new(
message => "$error",
host => $self->{path} // $self->{host},
port => $self->{port} // 0,
);
}
# TLS upgrade if enabled
if ($self->{tls}) {
eval {
$socket = await $self->_tls_upgrade($socket);
};
if ($@) {
# Don't call close() - let $socket go out of scope when we die.
# Perl's DESTROY will close it after the exception unwinds.
die $@;
}
}
$self->{socket} = $socket;
$self->{parser} = _parser_class()->new(api => 1);
$self->{_socket_live} = 1; # write gate and reader can now submit
$self->{inflight} = [];
$self->{_pid} = $$; # Track PID for fork safety
$self->{_current_read_future} = undef;
# Run Redis protocol handshake (AUTH, SELECT, CLIENT SETNAME).
# connected stays 0 during handshake; set it only on success so
# callers never see a half-initialised object.
my $handshake_ok = eval { await $self->_redis_handshake; 1 };
unless ($handshake_ok) {
my $err = $@;
$self->_reset_connection('handshake_failure');
die $err;
}
$self->{connected} = 1;
# Initialize auto-pipeline if enabled
if ($self->{auto_pipeline}) {
$self->{_auto_pipeline} = Async::Redis::AutoPipeline->new(
redis => $self,
max_depth => $self->{pipeline_depth},
);
}
# Fire on_connect callback and reset reconnect counter
if ($self->{on_connect}) {
$self->{on_connect}->($self);
}
$self->{_reconnect_attempt} = 0;
# Telemetry: record connection
if ($self->{_telemetry}) {
$self->{_telemetry}->record_connection(1);
$self->{_telemetry}->log_event('connected',
lib/Async/Redis.pm view on Meta::CPAN
# Add command to inflight queue - returns queue depth.
# redis_error_policy: 'fail' (default) fails the future on -ERR frames;
# 'capture' calls ->done($err_obj) so callers can inspect per-slot errors
# (used by pipelining in Task N+).
sub _add_inflight {
    my ($self, $future, $cmd, $args, $deadline, $redis_error_policy) = @_;
    # Build the tracking record for this request/response slot.
    my $entry = {
        future      => $future,
        cmd         => $cmd,
        args        => $args,
        deadline    => $deadline,
        # 'fail' rejects the future on -ERR frames; 'capture' resolves it
        # with the error object instead (per-slot errors for pipelining).
        redis_error => $redis_error_policy // 'fail',
        sent_at     => Time::HiRes::time(),
    };
    my $queue = $self->{inflight};
    push @$queue, $entry;
    # Report the new queue depth to the caller.
    return scalar @$queue;
}
# Pop the oldest entry off the inflight queue (FIFO — responses arrive
# in the order commands were written). Returns undef when empty.
sub _shift_inflight {
    my $self = shift;
    my $queue = $self->{inflight};
    return shift @$queue;
}
# Drain the inflight queue, failing every still-pending future with the
# given error (used when the connection dies and no response can arrive).
sub _fail_all_inflight {
    my ($self, $error) = @_;
    while (defined(my $pending = $self->_shift_inflight)) {
        my $f = $pending->{future};
        # Skip slots whose future is missing or already settled.
        next unless $f && !$f->is_ready;
        $f->fail($error);
    }
}
# The single socket reader. Runs while there is work (inflight or pubsub).
# Calls _reader_fatal on any stream-alignment failure. The selector
# (_tasks) owns this task; _reader_running is cleared on every exit path
# via on_ready so _ensure_reader can restart it on the next submission.
async sub _run_reader {
my ($self) = @_;
while (1) {
# Exit conditions.
return unless $self->{_socket_live};
last if !$self->{in_pubsub} && !@{$self->{inflight}};
last if $self->{in_pubsub} && !$self->{_subscription};
my $head = $self->{inflight}[0];
my $deadline = $head ? $head->{deadline} : undef;
# Set up read future; track so _reader_fatal can cancel it.
my $read_f = Future::IO->read($self->{socket}, 65536);
$self->{_current_read_future} = $read_f;
my ($returned_f, $timed_out) = await _await_with_deadline($read_f, $deadline);
# Clear slot on success path; fatal clears it on timeout/cancel.
$self->{_current_read_future} = undef
if !$timed_out && $returned_f->is_ready && !$returned_f->is_failed;
if ($timed_out) {
my $err = Async::Redis::Error::Timeout->new(
message => "Request timed out",
command => $head ? $head->{args} : undef,
timeout => $self->{request_timeout},
maybe_executed => 1,
);
$self->_reader_fatal($err);
return;
}
if ($returned_f->is_failed) {
my ($rerr) = $returned_f->failure;
my $err = Async::Redis::Error::Connection->new(
message => "Connection read error: $rerr",
host => $self->{host},
port => $self->{port},
);
$self->_reader_fatal($err);
return;
}
my $buf = $returned_f->get;
if (!defined $buf || length($buf) == 0) {
my $err = Async::Redis::Error::Connection->new(
message => "Connection closed by peer",
host => $self->{host},
port => $self->{port},
);
$self->_reader_fatal($err);
return;
}
$self->{parser}->parse($buf);
# Drain all complete messages the parser has.
while (my $msg = $self->{parser}->get_message) {
my ($kind, $value) = $self->_decode_response_result($msg);
if ($kind eq 'protocol_error') {
$self->_reader_fatal($value);
return;
}
my $is_pubsub_message = 0;
if ($self->{in_pubsub} && $kind eq 'ok' && ref($value) eq 'ARRAY') {
my $frame_name = $value->[0] // '';
$is_pubsub_message = 1
if $frame_name eq 'message'
|| $frame_name eq 'pmessage'
|| $frame_name eq 'smessage';
}
if ($is_pubsub_message) {
my $sub = $self->{_subscription};
if (!$sub) {
# No active subscription but got a message frame: strict desync.
$self->_reader_fatal(
Async::Redis::Error::Protocol->new(
message => "message frame but no active subscription",
)
);
lib/Async/Redis.pm view on Meta::CPAN
sub _ssl_verify_none {
    # Deferred require: IO::Socket::SSL is only loaded when TLS is in use.
    require IO::Socket::SSL;
    my $mode = IO::Socket::SSL::SSL_VERIFY_NONE();
    return $mode;
}
# Build the IO::Socket::SSL option hash for the current connection.
# Handles chain verification, SNI, hostname identity checking, and
# client cert/key/CA forwarding. Called by _tls_upgrade and directly
# by unit tests. Returns a flat key/value list.
sub _build_tls_options {
    my ($self) = @_;
    my $conf = ref $self->{tls} eq 'HASH' ? $self->{tls} : {};
    # SSL_startHandshake => 0 so _tls_upgrade can drive the handshake
    # itself without blocking.
    my %opts = (SSL_startHandshake => 0);
    # Forward client certificate material when supplied.
    for my $pair ([ca_file   => 'SSL_ca_file'],
                  [cert_file => 'SSL_cert_file'],
                  [key_file  => 'SSL_key_file']) {
        my ($from, $to) = @$pair;
        $opts{$to} = $conf->{$from} if $conf->{$from};
    }
    # Both verification knobs default to ON unless explicitly disabled.
    my $want_verify   = exists $conf->{verify}          ? !!$conf->{verify}          : 1;
    my $want_cn_check = exists $conf->{verify_hostname} ? !!$conf->{verify_hostname} : 1;
    if (!$want_verify) {
        $opts{SSL_verify_mode} = $self->_ssl_verify_none;
    }
    else {
        $opts{SSL_verify_mode} = $self->_ssl_verify_peer;
        $opts{SSL_hostname}    = $self->{host};   # server name for SNI
        if ($want_cn_check) {
            $opts{SSL_verifycn_name}   = $self->{host};
            $opts{SSL_verifycn_scheme} = 'default';
        }
    }
    return %opts;
}
# Non-blocking TLS upgrade
#
# Wraps an already-connected socket in IO::Socket::SSL and drives the TLS
# handshake step-by-step without blocking the event loop. Returns the
# upgraded socket on success. Dies with Error::Timeout when the shared
# connect_timeout budget is exhausted, or Error::Connection when the
# handshake itself fails.
async sub _tls_upgrade {
    my ($self, $socket) = @_;
    require IO::Socket::SSL;
    my %ssl_opts = $self->_build_tls_options;
    # Start SSL (does not block because SSL_startHandshake => 0)
    IO::Socket::SSL->start_SSL($socket, %ssl_opts)
        or die Async::Redis::Error::Connection->new(
            message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
            host => $self->{host},
            port => $self->{port},
        );
    # Drive handshake with non-blocking loop
    # One absolute deadline covers the whole handshake, however many
    # WANT_READ/WANT_WRITE round-trips it takes.
    my $deadline = Time::HiRes::time() + $self->{connect_timeout};
    while (1) {
        # Check timeout
        if (Time::HiRes::time() >= $deadline) {
            die Async::Redis::Error::Timeout->new(
                message => "TLS handshake timed out",
                timeout => $self->{connect_timeout},
            );
        }
        # Attempt handshake step
        my $rv = $socket->connect_SSL;
        if ($rv) {
            # Handshake complete!
            return $socket;
        }
        # Check what the handshake needs
        my $remaining = $deadline - Time::HiRes::time();
        # Floor the wait so sleep() always gets a positive duration; the
        # deadline check at the top of the loop still fires next pass.
        $remaining = 0.1 if $remaining <= 0;
        if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
            # Wait for socket to become readable with timeout
            my $read_f = Future::IO->waitfor_readable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });
            my $wait_f = Future->wait_any($read_f, $timeout_f);
            await $wait_f;
            # NOTE(review): any failure here is reported as a timeout,
            # including a waitfor_readable error — confirm that is intended.
            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
            # Wait for socket to become writable with timeout
            my $write_f = Future::IO->waitfor_writable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });
            my $wait_f = Future->wait_any($write_f, $timeout_f);
            await $wait_f;
            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        else {
            # Actual error
            die Async::Redis::Error::Connection->new(
                message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
                host => $self->{host},
                port => $self->{port},
            );
        }
    }
}
# Reconnect with exponential backoff.
# Retries connect() until it succeeds or reconnect_max_attempts is hit
# (0 = retry forever). Each failure fires on_error and sleeps for the
# jittered backoff computed by _calculate_backoff.
async sub _reconnect {
    my ($self) = @_;
    my $max   = $self->{reconnect_max_attempts};
    my $tries = 0;
    until ($self->{connected}) {
        $tries++;
        $self->{_reconnect_attempt} = $tries;
        my $connected_ok = eval { await $self->connect; 1 };
        if ($connected_ok) {
            $self->{_reconnect_attempt} = 0;
            last;
        }
        my $err = $@;
        # Surface each failed attempt to the application.
        $self->{on_error}->($self, $err) if $self->{on_error};
        # Honor reconnect_max_attempts so an unreachable Redis doesn't
        # spin forever; 0 means unlimited.
        if ($max && $tries >= $max) {
            $self->{_reconnect_attempt} = 0;
            die Async::Redis::Error::Disconnected->new(
                message => "Reconnect gave up after $max attempts",
            );
        }
        await Future::IO->sleep($self->_calculate_backoff($tries));
    }
    # Reset attempt counter on success so subsequent reconnects start fresh.
    $self->{_reconnect_attempt} = 0;
}
lib/Async/Redis.pm view on Meta::CPAN
die Async::Redis::Error::Disconnected->new(
message => "Not connected",
);
}
}
# Register inflight BEFORE writing so order matches the wire.
$self->_add_inflight($response, $cmd, \@args, $deadline, 'fail');
await $self->_send($raw_cmd);
})->();
});
1;
};
if (!$submit_ok) {
$error = $@;
# _with_write_gate already called _reader_fatal on write failure.
} else {
$self->_ensure_reader;
# run_until_ready awaits $response while the selector pumps the
# reader (and any other adopted tasks). If any selector task fails
# unhandled — in particular, the reader — the failure propagates
# here, so callers never hang waiting on a dead reader.
my $await_ok = eval {
$result = await $self->{_tasks}->run_until_ready($response);
1;
};
if (!$await_ok) { $error = $@ }
}
# Telemetry: log result and end span
if ($self->{_telemetry}) {
my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
if ($error) {
$self->{_telemetry}->log_error($error);
}
else {
$self->{_telemetry}->log_recv($result, $elapsed_ms);
}
$self->{_telemetry}->end_command_span($span_context, $error);
}
die $error if $error;
return $result;
}
# Read response with deadline enforcement.
#
# Loops on socket reads until the RESP parser yields one complete message,
# or until $deadline (an absolute Time::HiRes timestamp) passes. A timeout,
# read error, or EOF leaves the reply stream desynced, so the connection is
# reset via _reset_connection before the corresponding exception is thrown.
# $cmd_ref is attached to timeout errors (with maybe_executed => 1) because
# the command was already written to the wire.
async sub _read_response_with_deadline {
    my ($self, $deadline, $cmd_ref) = @_;
    # The parser may already hold a complete message from an earlier read.
    if (my $msg = $self->{parser}->get_message) {
        return $msg;
    }
    # Read until we get a complete message
    while (1) {
        my $remaining = $deadline - Time::HiRes::time();
        if ($remaining <= 0) {
            $self->_reset_connection;
            die Async::Redis::Error::Timeout->new(
                message => "Request timed out after $self->{request_timeout}s",
                command => $cmd_ref,
                timeout => $self->{request_timeout},
                maybe_executed => 1, # already sent the command
            );
        }
        # Race the read against the remaining deadline budget.
        my $read_f = Future::IO->read($self->{socket}, 65536);
        # Store reference so disconnect() can cancel it
        $self->{_current_read_future} = $read_f;
        my $timeout_f = Future::IO->sleep($remaining)->then(sub {
            return Future->fail('read_timeout');
        });
        my $wait_f = Future->wait_any($read_f, $timeout_f);
        await $wait_f;
        # Clear stored reference after await completes
        $self->{_current_read_future} = undef;
        # BUGFIX: inspect $wait_f's failure BEFORE $read_f's cancellation
        # state. When $timeout_f wins the race, wait_any cancels the losing
        # $read_f, so checking is_cancelled first misreported every read
        # timeout as "Disconnected during read" and skipped the
        # _reset_connection the desynced stream requires.
        if ($wait_f->is_failed) {
            my ($error) = $wait_f->failure;
            $self->_reset_connection;
            if ($error eq 'read_timeout') {
                die Async::Redis::Error::Timeout->new(
                    message => "Request timed out after $self->{request_timeout}s",
                    command => $cmd_ref,
                    timeout => $self->{request_timeout},
                    maybe_executed => 1,
                );
            }
            die Async::Redis::Error::Connection->new(
                message => "$error",
            );
        }
        # A cancellation NOT caused by the race means disconnect() tore the
        # read down underneath us.
        # NOTE(review): a disconnect that cancels $read_f while $timeout_f is
        # still pending will surface as a timeout above once the timer fires;
        # distinguishing the two would need state not visible to this sub.
        if ($read_f->is_cancelled) {
            die Async::Redis::Error::Disconnected->new(
                message => "Disconnected during read",
            );
        }
        # Get the read result
        my $buf = $wait_f->get;
        # undef/zero-length read is EOF: the server closed the connection.
        if (!defined $buf || length($buf) == 0) {
            $self->_reset_connection;
            die Async::Redis::Error::Connection->new(
                message => "Connection closed by server",
            );
        }
        $self->{parser}->parse($buf);
        if (my $msg = $self->{parser}->get_message) {
            return $msg;
        }
    }
}
# Reset connection after timeout (stream is desynced)
sub _reset_connection {
my ($self, $reason) = @_;
$reason //= 'timeout';
my $was_connected = $self->{connected};
# Cancel any active read future BEFORE closing socket
# This ensures Future::IO unregisters its watcher while fileno is still valid
if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
$self->{_current_read_future}->cancel;
$self->{_current_read_future} = undef;
}
# Cancel any pending inflight operations before closing socket
if (my $inflight = $self->{inflight}) {
for my $entry (@$inflight) {
if ($entry->{future} && !$entry->{future}->is_ready) {
$entry->{future}->cancel;
}
}
$self->{inflight} = [];
}
if ($self->{socket}) {
$self->_close_socket;
}
$self->{_socket_live} = 0;
lib/Async/Redis.pm view on Meta::CPAN
unless (@commands) {
await $self->unwatch;
$watch_active = 0;
$results = [];
}
else {
await $self->multi_start;
$multi_started = 1;
for my $cmd (@commands) {
await $self->command(@$cmd);
}
$results = await $self->exec;
$multi_started = 0;
$watch_active = 0;
}
1;
};
my $error = $@;
if (!$ok) {
if ($multi_started) {
eval { await $self->discard; 1 };
}
elsif ($watch_active) {
eval { await $self->unwatch; 1 };
}
# Cleanup can fail on a dead socket; keep local state conservative
# and preserve the original caller-facing error.
$self->{in_multi} = 0;
$self->{watching} = 0;
die $error;
}
# EXEC returns undef/nil if WATCH failed
return $results;
}
# ============================================================================
# PUB/SUB
# ============================================================================
# Wait for inflight commands to complete before a mode change (e.g.
# entering pubsub). Polls until the queue empties or $timeout seconds
# (default 30) elapse; anything still pending is then failed rather
# than left to dangle.
async sub _wait_for_inflight_drain {
    my ($self, $timeout) = @_;
    $timeout //= 30;
    return if !@{ $self->{inflight} };
    my $give_up_at = Time::HiRes::time() + $timeout;
    # Re-read {inflight} each pass: a concurrent reset may swap the arrayref.
    until (!@{ $self->{inflight} } || Time::HiRes::time() >= $give_up_at) {
        await Future::IO->sleep(0.001);
    }
    $self->_fail_all_inflight("Timeout waiting for inflight commands")
        if @{ $self->{inflight} };
}
# Publish $message to $channel; returns the server's PUBLISH reply.
async sub publish {
    my ($self, $channel, $payload) = @_;
    my $reply = await $self->command('PUBLISH', $channel, $payload);
    return $reply;
}
# Sharded publish (SPUBLISH); returns the server's reply.
async sub spublish {
    my ($self, $channel, $payload) = @_;
    my $reply = await $self->command('SPUBLISH', $channel, $payload);
    return $reply;
}
# Subscribe to channels - returns a Subscription object.
# Drains outstanding request/response traffic first, then flips the
# connection into pubsub mode and issues one SUBSCRIBE per channel.
async sub subscribe {
    my ($self, @channels) = @_;
    unless ($self->{connected}) {
        die Async::Redis::Error::Disconnected->new(
            message => "Not connected",
        );
    }
    # Wait for pending commands before entering PubSub mode.
    await $self->_wait_for_inflight_drain;
    # A previously closed subscription must not be reused; drop it so a
    # fresh object is allocated below.
    delete $self->{_subscription}
        if $self->{_subscription} && $self->{_subscription}->is_closed;
    my $subscription = $self->{_subscription}
        //= Async::Redis::Subscription->new(redis => $self);
    # Flip the flag BEFORE the first SUBSCRIBE hits the wire so the unified
    # reader classifies racing message frames correctly (e.g. ones published
    # before our confirmation arrives).
    $self->{in_pubsub} = 1;
    # One SUBSCRIBE per channel through the write gate and unified reader;
    # each call awaits its matching confirmation frame.
    foreach my $channel (@channels) {
        await $self->_pubsub_command('SUBSCRIBE', $channel);
        $subscription->_add_channel($channel);
    }
    return $subscription;
}
# Pattern subscribe
async sub psubscribe {
my ($self, @patterns) = @_;
die Async::Redis::Error::Disconnected->new(
message => "Not connected",
) unless $self->{connected};
# Wait for pending commands before entering PubSub mode
await $self->_wait_for_inflight_drain;
# Clear a stale closed subscription so we allocate a fresh object.
if ($self->{_subscription} && $self->{_subscription}->is_closed) {
lib/Async/Redis.pm view on Meta::CPAN
my $pipe = $redis->pipeline;
$pipe->run_script('atomic_incr', 'counter:a', 1);
$pipe->run_script('atomic_incr', 'counter:b', 1);
$pipe->set('other:key', 'value');
my $results = await $pipe->execute;
Scripts are automatically preloaded before pipeline execution.
=head2 EVALSHA Optimization
Scripts automatically use EVALSHA (by SHA1 hash) for efficiency.
If the script isn't cached on the server, it falls back to EVAL
and caches for future calls. This is transparent to your code.
=head2 scan_iter
my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
for my $key (@$keys) { ... }
}
Create an iterator for SCAN. Also available:
my $hash_iter = $redis->hscan_iter('hash', match => 'field:*');
my $set_iter = $redis->sscan_iter('set', count => 100);
my $zset_iter = $redis->zscan_iter('zset');
Iterators return batches. C<ZSCAN> batches are the Redis flat
member/score list.
=head1 CONNECTION POOLING
For high-throughput applications, use L<Async::Redis::Pool>:
use Async::Redis::Pool;
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
);
# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
my ($conn) = @_;
return $conn->get('key');
});
=head1 ERROR HANDLING
Errors are thrown as exception objects:
eval {
await $redis->get('key');
1;
} or do {
my $error = $@;
if (ref($error) && $error->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif (ref($error) && $error->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif (ref($error) && $error->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
};
Exception classes:
=over 4
=item Async::Redis::Error::Connection
Connection-related errors (refused, reset, etc.)
=item Async::Redis::Error::Timeout
Timeout errors (connect, request, read).
=item Async::Redis::Error::Protocol
Protocol parsing errors.
=item Async::Redis::Error::Redis
Errors returned by Redis (WRONGTYPE, ERR, etc.)
=item Async::Redis::Error::Disconnected
Operation attempted on disconnected client.
=back
=head1 FORK SAFETY
Async::Redis is fork-safe. When a fork is detected, the child
process will automatically invalidate its connection state and
reconnect when needed. The parent retains ownership of the original
connection.
=head1 EVENT LOOP CONFIGURATION
Async::Redis uses L<Future::IO> for event loop abstraction, making it
compatible with IO::Async, UV, AnyEvent, and other event loops. However,
B<Async::Redis does not choose which event loop to use> - that's the
application's responsibility.
=head2 Default (No Configuration Needed)
B<Future::IO 0.23+> includes a built-in poll-based implementation that works
out of the box. For standalone scripts, you don't need to configure anything:
#!/usr/bin/env perl
use strict;
use warnings;
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
# Just works - Future::IO uses its built-in IO::Poll backend
=head2 The Golden Rule
B<Only executable scripts should configure Future::IO.> Library modules
(C<.pm> files) should never configure the backend because they don't know
what event loop the application wants to use.
=head2 For IO::Async Applications
If your application already uses IO::Async for its event loop, load the
implementation directly:
use IO::Async::Loop;
require Future::IO::Impl::IOAsync;
my $loop = IO::Async::Loop->new;
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
( run in 1.555 second using v1.01-cache-2.11-cpan-39bf76dae61 )