Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis.pm view on Meta::CPAN
BRPOPLPUSH => { position => 'last' },
BLMOVE => { position => 'last' },
BZPOPMIN => { position => 'last' },
BZPOPMAX => { position => 'last' },
BLMPOP => { position => 0 },
BZMPOP => { position => 0 },
XREAD => { position => 'block_option', unit => 'ms' },
XREADGROUP => { position => 'block_option', unit => 'ms' },
WAIT => { position => 'last', unit => 'ms' },
WAITAOF => { position => 'last', unit => 'ms' },
);
# Calculate deadline based on command type
sub _calculate_deadline {
my ($self, $cmd, @args) = @_;
$cmd = uc($cmd // '');
my $spec = $BLOCKING_TIMEOUT{$cmd};
if (!$spec) {
return Time::HiRes::time() + $self->{request_timeout};
}
my $raw;
my $pos = $spec->{position};
if ($pos eq 'last') {
$raw = $args[-1];
}
elsif ($pos eq 'block_option') {
for my $i (0 .. $#args - 1) {
if (uc($args[$i] // '') eq 'BLOCK') {
$raw = $args[$i + 1];
last;
}
}
# No BLOCK option found â non-blocking variant; use request_timeout
return Time::HiRes::time() + $self->{request_timeout}
unless defined $raw;
}
else {
# Numeric index into @args
$raw = $args[$pos];
}
if (!defined $raw || $raw !~ /^-?\d+(?:\.\d+)?$/) {
warn "_calculate_deadline: non-numeric timeout for $cmd; falling back to request_timeout\n";
return Time::HiRes::time() + $self->{request_timeout};
}
my $seconds = ($spec->{unit} // 'seconds') eq 'ms'
? $raw / 1000
: $raw + 0;
# Zero means block indefinitely â no client-side deadline
return undef if $seconds == 0;
return Time::HiRes::time() + $seconds + $self->{blocking_timeout_buffer};
}
sub _ssl_verify_peer {
require IO::Socket::SSL;
return IO::Socket::SSL::SSL_VERIFY_PEER();
}
sub _ssl_verify_none {
require IO::Socket::SSL;
return IO::Socket::SSL::SSL_VERIFY_NONE();
}
# Build the IO::Socket::SSL option hash for the current connection.
# Handles chain verification, SNI, hostname identity checking, and
# client cert/key/CA forwarding. Called by _tls_upgrade and directly
# by unit tests.
sub _build_tls_options {
my ($self) = @_;
my %ssl_opts = (SSL_startHandshake => 0);
my $tls = $self->{tls};
my $tls_hash = ref $tls eq 'HASH' ? $tls : {};
my $verify = exists $tls_hash->{verify} ? !!$tls_hash->{verify} : 1;
my $verify_hostname = exists $tls_hash->{verify_hostname} ? !!$tls_hash->{verify_hostname} : 1;
$ssl_opts{SSL_ca_file} = $tls_hash->{ca_file} if $tls_hash->{ca_file};
$ssl_opts{SSL_cert_file} = $tls_hash->{cert_file} if $tls_hash->{cert_file};
$ssl_opts{SSL_key_file} = $tls_hash->{key_file} if $tls_hash->{key_file};
if ($verify) {
$ssl_opts{SSL_verify_mode} = $self->_ssl_verify_peer;
$ssl_opts{SSL_hostname} = $self->{host};
if ($verify_hostname) {
$ssl_opts{SSL_verifycn_name} = $self->{host};
$ssl_opts{SSL_verifycn_scheme} = 'default';
}
} else {
$ssl_opts{SSL_verify_mode} = $self->_ssl_verify_none;
}
return %ssl_opts;
}
# Non-blocking TLS upgrade
async sub _tls_upgrade {
my ($self, $socket) = @_;
require IO::Socket::SSL;
my %ssl_opts = $self->_build_tls_options;
# Start SSL (does not block because SSL_startHandshake => 0)
IO::Socket::SSL->start_SSL($socket, %ssl_opts)
or die Async::Redis::Error::Connection->new(
message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
host => $self->{host},
port => $self->{port},
);
# Drive handshake with non-blocking loop
my $deadline = Time::HiRes::time() + $self->{connect_timeout};
while (1) {
# Check timeout
if (Time::HiRes::time() >= $deadline) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
# Attempt handshake step
my $rv = $socket->connect_SSL;
if ($rv) {
# Handshake complete!
return $socket;
}
# Check what the handshake needs
my $remaining = $deadline - Time::HiRes::time();
$remaining = 0.1 if $remaining <= 0;
if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
# Wait for socket to become readable with timeout
my $read_f = Future::IO->waitfor_readable($socket);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('tls_timeout');
});
my $wait_f = Future->wait_any($read_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
}
elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
# Wait for socket to become writable with timeout
my $write_f = Future::IO->waitfor_writable($socket);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('tls_timeout');
});
my $wait_f = Future->wait_any($write_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
}
else {
# Actual error
die Async::Redis::Error::Connection->new(
message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
host => $self->{host},
port => $self->{port},
);
}
}
}
# Reconnect with exponential backoff
async sub _reconnect {
my ($self) = @_;
my $max = $self->{reconnect_max_attempts};
my $attempt = 0;
while (!$self->{connected}) {
$attempt++;
$self->{_reconnect_attempt} = $attempt;
my $ok = eval {
await $self->connect;
1;
};
if ($ok) {
$self->{_reconnect_attempt} = 0;
last;
}
my $error = $@;
# Fire on_error callback
if ($self->{on_error}) {
$self->{on_error}->($self, $error);
}
# Honor reconnect_max_attempts cap so an unreachable Redis
# doesn't spin forever. 0 means unlimited.
if ($max && $attempt >= $max) {
$self->{_reconnect_attempt} = 0;
die Async::Redis::Error::Disconnected->new(
message => "Reconnect gave up after $max attempts",
);
}
my $delay = $self->_calculate_backoff($attempt);
await Future::IO->sleep($delay);
}
# Reset attempt counter on success so subsequent reconnects start fresh.
$self->{_reconnect_attempt} = 0;
}
# Ensure the socket is live, reconnecting if configured.
#
# Dedup: $self->{_reconnect_future} is the Future for the in-flight
# reconnect. Concurrent callers share it. The slot is the shared-await
# signal, NOT the ownership â ownership lives in $self->{_tasks}.
#
# Structured-concurrency: the reconnect task is added to the selector
# so any caller currently awaiting via run_until_ready sees reconnect
( run in 0.723 second using v1.01-cache-2.11-cpan-39bf76dae61 )