view release on metacpan or search on metacpan
examples/stress/stress
lib/Async/Redis.pm
lib/Async/Redis/AutoPipeline.pm
lib/Async/Redis/Commands.pm
lib/Async/Redis/Cookbook.pod
lib/Async/Redis/Error.pm
lib/Async/Redis/Error/Connection.pm
lib/Async/Redis/Error/Disconnected.pm
lib/Async/Redis/Error/Protocol.pm
lib/Async/Redis/Error/Redis.pm
lib/Async/Redis/Error/Timeout.pm
lib/Async/Redis/Iterator.pm
lib/Async/Redis/KeyExtractor.pm
lib/Async/Redis/Pipeline.pm
lib/Async/Redis/Pool.pm
lib/Async/Redis/Script.pm
lib/Async/Redis/Subscription.pm
lib/Async/Redis/Telemetry.pm
lib/Async/Redis/Transaction.pm
lib/Async/Redis/URI.pm
script/commands.json
port => 6379,
# Or use URI
uri => 'redis://user:pass@host:6379/1',
# Authentication
password => 'secret',
username => 'myuser', # Redis 6+ ACL
database => 1,
# Timeouts
connect_timeout => 10,
request_timeout => 5,
# Auto-reconnect
reconnect => 1,
reconnect_delay => 0.1,
reconnect_delay_max => 60,
# TLS
tls => {
examples/pagi-chat/public/js/app.js view on Meta::CPAN
// ===== State =====
const state = {
username: '',
userId: '',
sessionId: '', // Persistent session ID for reconnection
currentRoom: 'general',
rooms: {},
users: {},
ws: null,
sse: null,
typingTimeout: null,
isTyping: false,
reconnectAttempts: 0,
maxReconnectDelay: 30000, // Max 30 seconds between attempts
pingInterval: null,
pingIntervalMs: 10000, // Send ping every 10 seconds
lastPongTime: 0, // Track last pong received
heartbeatCheckInterval: null,
heartbeatTimeoutMs: 35000, // Consider connection dead if no pong in 35s
lastMsgId: 0 // Track last received message ID for catch-up
};
// ===== DOM Elements =====
const elements = {
// Screens
loginScreen: document.getElementById('login-screen'),
chatScreen: document.getElementById('chat-screen'),
// Login
examples/pagi-chat/public/js/app.js view on Meta::CPAN
}
// ===== Toast Notifications =====
function showToast(message, type = 'info') {
const toast = document.createElement('div');
toast.className = `toast ${type}`;
toast.textContent = message;
elements.toastContainer.appendChild(toast);
setTimeout(() => {
toast.style.animation = 'slideIn 0.3s ease reverse';
setTimeout(() => toast.remove(), 300);
}, 3000);
}
// ===== Exponential Backoff =====
function calculateReconnectDelay() {
// Formula: min(1000 * 2^attempts + random(0, 1000), 30000)
const baseDelay = 1000 * Math.pow(2, state.reconnectAttempts);
const jitter = Math.random() * 1000;
return Math.min(baseDelay + jitter, state.maxReconnectDelay);
}
examples/pagi-chat/public/js/app.js view on Meta::CPAN
sendMessage({ type: 'ping' });
}
}, state.pingIntervalMs);
// Start heartbeat timeout check
if (state.heartbeatCheckInterval) {
clearInterval(state.heartbeatCheckInterval);
}
state.heartbeatCheckInterval = setInterval(() => {
const timeSinceLastPong = Date.now() - state.lastPongTime;
if (timeSinceLastPong > state.heartbeatTimeoutMs) {
console.warn('Heartbeat timeout - connection appears dead, reconnecting...');
if (state.ws) {
state.ws.close();
}
}
}, 5000); // Check every 5 seconds
};
state.ws.onclose = (event) => {
setConnectionStatus('disconnected');
examples/pagi-chat/public/js/app.js view on Meta::CPAN
if (state.heartbeatCheckInterval) {
clearInterval(state.heartbeatCheckInterval);
state.heartbeatCheckInterval = null;
}
// Always try to reconnect with exponential backoff
state.reconnectAttempts++;
const delay = calculateReconnectDelay();
console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.reconnectAttempts})...`);
setConnectionStatus('reconnecting');
setTimeout(connectWebSocket, delay);
};
state.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
state.ws.onmessage = (event) => {
// Any message resets the heartbeat timer
state.lastPongTime = Date.now();
examples/pagi-chat/public/js/app.js view on Meta::CPAN
if (!state.isTyping) {
state.isTyping = true;
sendMessage({
type: 'typing',
room: state.currentRoom,
typing: true
});
}
// Clear existing timeout
if (state.typingTimeout) {
clearTimeout(state.typingTimeout);
}
// Stop typing after 2 seconds of inactivity
state.typingTimeout = setTimeout(() => {
state.isTyping = false;
sendMessage({
type: 'typing',
room: state.currentRoom,
typing: false
});
}, 2000);
}
// ===== Event Handlers =====
examples/pagi-chat/public/js/app.js view on Meta::CPAN
const text = elements.messageInput.value.trim();
if (text) {
sendMessage({
type: 'message',
room: state.currentRoom,
text: text
});
elements.messageInput.value = '';
// Stop typing indicator
if (state.typingTimeout) {
clearTimeout(state.typingTimeout);
}
state.isTyping = false;
}
});
// Typing detection
elements.messageInput.addEventListener('input', handleTyping);
// Leave room button
elements.leaveRoomBtn.addEventListener('click', () => {
lib/Async/Redis.pm view on Meta::CPAN
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();
# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;
# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);
# Key extraction for prefixing
use Async::Redis::KeyExtractor;
lib/Async/Redis.pm view on Meta::CPAN
socket => undef,
parser => undef,
connected => 0,
_socket_live => 0,
_fatal_in_progress => 0,
_reader_running => 0, # dedup guard; the selector owns the reader Future itself
_write_lock => undef, # will be a Future used as a lock, populated lazily
_reconnect_future => undef,
_tasks => Future::Selector->new,
# Timeout settings
connect_timeout => $args{connect_timeout} // 10,
request_timeout => $args{request_timeout} // 5,
blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
# Inflight tracking with deadlines
# Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
inflight => [],
# Reconnection settings
reconnect => $args{reconnect} // 0,
lib/Async/Redis.pm view on Meta::CPAN
my $completed_f = await $result_f;
# Now check the result
if ($completed_f->is_failed) {
my ($error) = $completed_f->failure;
# Don't call close() - let $socket go out of scope when we die.
# Perl's DESTROY will close it after the exception unwinds.
if ($error eq 'connect_timeout') {
die Async::Redis::Error::Timeout->new(
message => "Connect timed out after $self->{connect_timeout}s",
timeout => $self->{connect_timeout},
);
}
die Async::Redis::Error::Connection->new(
message => "$error",
host => $self->{path} // $self->{host},
port => $self->{port} // 0,
);
}
lib/Async/Redis.pm view on Meta::CPAN
my $read_f = Future::IO->read($self->{socket}, 65536);
$self->{_current_read_future} = $read_f;
my ($returned_f, $timed_out) = await _await_with_deadline($read_f, $deadline);
# Clear slot on success path; fatal clears it on timeout/cancel.
$self->{_current_read_future} = undef
if !$timed_out && $returned_f->is_ready && !$returned_f->is_failed;
if ($timed_out) {
my $err = Async::Redis::Error::Timeout->new(
message => "Request timed out",
command => $head ? $head->{args} : undef,
timeout => $self->{request_timeout},
maybe_executed => 1,
);
$self->_reader_fatal($err);
return;
}
if ($returned_f->is_failed) {
lib/Async/Redis.pm view on Meta::CPAN
host => $self->{host},
port => $self->{port},
);
# Drive handshake with non-blocking loop
my $deadline = Time::HiRes::time() + $self->{connect_timeout};
while (1) {
# Check timeout
if (Time::HiRes::time() >= $deadline) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
# Attempt handshake step
my $rv = $socket->connect_SSL;
if ($rv) {
# Handshake complete!
lib/Async/Redis.pm view on Meta::CPAN
# Wait for socket to become readable with timeout
my $read_f = Future::IO->waitfor_readable($socket);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('tls_timeout');
});
my $wait_f = Future->wait_any($read_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
}
elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
# Wait for socket to become writable with timeout
my $write_f = Future::IO->waitfor_writable($socket);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('tls_timeout');
});
my $wait_f = Future->wait_any($write_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
}
else {
# Actual error
die Async::Redis::Error::Connection->new(
message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
host => $self->{host},
lib/Async/Redis.pm view on Meta::CPAN
if (my $msg = $self->{parser}->get_message) {
return $msg;
}
# Read until we get a complete message
while (1) {
my $remaining = $deadline - Time::HiRes::time();
if ($remaining <= 0) {
$self->_reset_connection;
die Async::Redis::Error::Timeout->new(
message => "Request timed out after $self->{request_timeout}s",
command => $cmd_ref,
timeout => $self->{request_timeout},
maybe_executed => 1, # already sent the command
);
}
# Use wait_any for timeout
my $read_f = Future::IO->read($self->{socket}, 65536);
lib/Async/Redis.pm view on Meta::CPAN
if ($read_f->is_cancelled) {
die Async::Redis::Error::Disconnected->new(
message => "Disconnected during read",
);
}
if ($wait_f->is_failed) {
my ($error) = $wait_f->failure;
if ($error eq 'read_timeout') {
$self->_reset_connection;
die Async::Redis::Error::Timeout->new(
message => "Request timed out after $self->{request_timeout}s",
command => $cmd_ref,
timeout => $self->{request_timeout},
maybe_executed => 1,
);
}
$self->_reset_connection;
die Async::Redis::Error::Connection->new(
message => "$error",
);
lib/Async/Redis.pm view on Meta::CPAN
return unless @{$self->{inflight}};
my $deadline = Time::HiRes::time() + $timeout;
while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
await Future::IO->sleep(0.001);
}
if (@{$self->{inflight}}) {
$self->_fail_all_inflight("Timeout waiting for inflight commands");
}
}
async sub publish {
my ($self, $channel, $message) = @_;
return await $self->command('PUBLISH', $channel, $message);
}
async sub spublish {
my ($self, $channel, $message) = @_;
lib/Async/Redis.pm view on Meta::CPAN
Errors are thrown as exception objects:
eval {
await $redis->get('key');
1;
} or do {
my $error = $@;
if (ref($error) && $error->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif (ref($error) && $error->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif (ref($error) && $error->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
};
Exception classes:
=over 4
=item Async::Redis::Error::Connection
Connection-related errors (refused, reset, etc.)
=item Async::Redis::Error::Timeout
Timeout errors (connect, request, read).
=item Async::Redis::Error::Protocol
Protocol parsing errors.
=item Async::Redis::Error::Redis
Errors returned by Redis (WRONGTYPE, ERR, etc.)
=item Async::Redis::Error::Disconnected
lib/Async/Redis/Error/Timeout.pm view on Meta::CPAN
package Async::Redis::Error::Timeout;
use strict;
use warnings;
use 5.018;
use parent 'Async::Redis::Error';
sub command { shift->{command} }
sub timeout { shift->{timeout} }
sub maybe_executed { shift->{maybe_executed} }
1;
__END__
=head1 NAME
Async::Redis::Error::Timeout - Timeout exception
=head1 DESCRIPTION
Thrown when a Redis operation times out.
=head1 ATTRIBUTES
=over 4
=item command - The command that timed out (arrayref)
lib/Async/Redis/Pool.pm view on Meta::CPAN
use strict;
use warnings;
use 5.018;
use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(refaddr);
use Async::Redis;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Timeout;
sub new {
my ($class, %args) = @_;
# Separate pool-specific args from connection args.
# Everything not pool-specific is passed through to Async::Redis->new().
my %pool_args;
for my $key (qw(min max acquire_timeout idle_timeout cleanup_timeout on_dirty)) {
$pool_args{$key} = delete $args{$key} if exists $args{$key};
}
my $self = bless {
# Connection params (passed through to Async::Redis->new)
_conn_args => \%args,
# Pool sizing
min => $pool_args{min} // 1,
max => $pool_args{max} // 10,
# Timeouts
acquire_timeout => $pool_args{acquire_timeout} // 5,
idle_timeout => $pool_args{idle_timeout} // 60,
cleanup_timeout => $pool_args{cleanup_timeout} // 5,
# Dirty handling
on_dirty => $pool_args{on_dirty} // 'destroy',
# Pool state
_idle => [], # Available connections
_active => {}, # Connections in use (refaddr => conn)
lib/Async/Redis/Pool.pm view on Meta::CPAN
$self->{_active}{refaddr($conn)} = $conn;
return $conn;
}
# At max capacity - wait for release
my $waiter = Future->new;
push @{$self->{_waiters}}, $waiter;
my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
Future->fail(Async::Redis::Error::Timeout->new(
message => "Acquire timed out after $self->{acquire_timeout}s",
timeout => $self->{acquire_timeout},
));
});
my $wait_f = Future->wait_any($waiter, $timeout_future);
my $result;
eval {
$result = await $wait_f;
lib/Async/Redis/Pool.pm view on Meta::CPAN
connections after dirty connections are destroyed if the total drops below
this value.
=item max
Maximum number of active, idle, and currently-creating connections. Default: 10.
=item acquire_timeout
Seconds to wait for a connection when the pool is at capacity. Default: 5.
Timeouts throw L<Async::Redis::Error::Timeout>.
=item cleanup_timeout
Seconds to allow a best-effort cleanup command such as C<DISCARD> or
C<UNWATCH>. Default: 5.
=item on_dirty
Dirty connection policy. Default: C<destroy>.
t/01-unit/error.t view on Meta::CPAN
# t/01-unit/error.t
use strict;
use warnings;
use Test2::V0;
use Async::Redis::Error;
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Protocol;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Disconnected;
subtest 'Error base class' => sub {
my $e = Async::Redis::Error->new(message => 'test error');
ok($e->isa('Async::Redis::Error'), 'isa Error');
is($e->message, 'test error', 'message accessor');
like("$e", qr/test error/, 'stringifies to message');
};
t/01-unit/error.t view on Meta::CPAN
reason => 'timeout',
);
ok($e->isa('Async::Redis::Error'), 'isa base Error');
ok($e->isa('Async::Redis::Error::Connection'), 'isa Connection');
is($e->host, 'localhost', 'host accessor');
is($e->port, 6379, 'port accessor');
is($e->reason, 'timeout', 'reason accessor');
like("$e", qr/connection lost/, 'stringifies');
};
subtest 'Timeout error' => sub {
my $e = Async::Redis::Error::Timeout->new(
message => 'request timed out',
command => ['GET', 'mykey'],
timeout => 5,
maybe_executed => 0,
);
ok($e->isa('Async::Redis::Error'), 'isa base Error');
ok($e->isa('Async::Redis::Error::Timeout'), 'isa Timeout');
is($e->command, ['GET', 'mykey'], 'command accessor');
is($e->timeout, 5, 'timeout accessor');
ok(!$e->maybe_executed, 'maybe_executed false');
my $e2 = Async::Redis::Error::Timeout->new(
message => 'timed out after write',
maybe_executed => 1,
);
ok($e2->maybe_executed, 'maybe_executed true');
};
subtest 'Protocol error' => sub {
my $e = Async::Redis::Error::Protocol->new(
message => 'unexpected response type',
data => '+OK',
t/92-concurrency/reader-fatal-paths.t view on Meta::CPAN
my $c = Async::Redis->new(host => 'x', port => 1);
$c->{_socket_live} = 1;
$c->{connected} = 1;
my $f1 = Future->new;
my $f2 = Future->new;
$c->{inflight} = [
{ future => $f1, cmd => 'GET', args => ['x'], deadline => 0 },
{ future => $f2, cmd => 'SET', args => ['y', 'z'], deadline => 0 },
];
my $err = Async::Redis::Error::Timeout->new(
message => 'test timeout',
timeout => 5,
);
$c->_reader_fatal($err);
ok $f1->is_failed, 'f1 failed';
my ($e1) = $f1->failure;
ok blessed($e1) && $e1->isa('Async::Redis::Error::Timeout'),
'f1 carries the typed error, not cancellation';
ok $f2->is_failed, 'f2 failed';
my ($e2) = $f2->failure;
ok blessed($e2) && $e2->isa('Async::Redis::Error::Timeout'),
'f2 also carries the typed error';
is scalar @{$c->{inflight}}, 0, 'inflight drained';
is $c->{_socket_live}, 0, '_socket_live cleared';
is $c->{connected}, 0, 'connected cleared';
};
subtest 'reentrance during same call is a no-op (idempotence guard)' => sub {
my $c = Async::Redis->new(host => 'x', port => 1);
$c->{_socket_live} = 1;
t/92-concurrency/reader-invariants.t view on Meta::CPAN
# inject the synthetic timeout and verify the typed error propagates.
my $blpop_f = $r->command('BLPOP', 'test-timeout-injection-list', 30);
# Yield briefly so the write goes out and the reader is waiting.
await Future::IO->sleep(0.02);
force_read_timeout($r);
my $ok = eval { await $blpop_f; 1 };
ok !$ok, 'blpop failed';
my $err = $@;
isa_ok $err, ['Async::Redis::Error::Timeout'];
})->()->get;
};
subtest 'fuzz: randomized mix of ops, every future resolves or fails' => sub {
(async sub {
my $r = new_redis();
await $r->connect;
my @ops;
for my $i (1..30) {
t/92-concurrency/selector-invariants.t view on Meta::CPAN
eval { $r->disconnect };
})->()->get;
};
subtest 'disconnect under load: write-gate waiters unwind cleanly' => sub {
# Fire many concurrent commands without awaiting individually, so
# most end up waiting on _acquire_write_lock rather than in the
# inflight queue. Then disconnect and await them all. Verify:
# - every future resolves (no pending futures left)
# - only Disconnected / Connection error types appear
# - no Timeout, Protocol, or generic die leaking out
# Uses BLPOP with a 0.1s server-side timeout so commands are slow
# enough that disconnect actually catches some mid-flight even on
# fast loopback.
(async sub {
my $r = new_redis();
await $r->connect;
my @futures;
for my $i (1..30) {
t/lib/Test/Async/Redis.pm view on Meta::CPAN
}
# Feed bytes directly into the parser, bypassing the socket. Useful
# for exercising the reader's decode/dispatch path with a crafted frame.
sub inject_unexpected_frame {
my ($redis, $raw_bytes) = @_;
$redis->{parser}->parse($raw_bytes) if $redis->{parser};
}
# Synthesize a fatal timeout directly. Routes through the detach-first
# _reader_fatal path so the typed Async::Redis::Error::Timeout is
# propagated to all inflight futures (not a generic cancellation).
sub force_read_timeout {
my ($redis) = @_;
require Async::Redis::Error::Timeout;
$redis->_reader_fatal(Async::Redis::Error::Timeout->new(
message => "synthetic timeout for test",
timeout => 0,
));
}
1;
__END__
=head1 NAME
t/lib/Test/Async/Redis.pm view on Meta::CPAN
=item inject_unexpected_frame($redis, $raw_bytes)
Feed raw bytes directly into the RESP parser, bypassing the socket.
Useful for exercising the reader's decode/dispatch path with a crafted
or malformed frame without needing a live server to send it.
=item force_read_timeout($redis)
Synthesize a fatal timeout by calling C<_reader_fatal> with a typed
C<Async::Redis::Error::Timeout> object. All inflight futures receive
the typed error via the detach-first path rather than generic
cancellation.
=back
=cut