view release on metacpan or search on metacpan
examples/slow-redis/README.md
examples/slow-redis/app.pl
lib/Async/Redis.pm
lib/Async/Redis/AutoPipeline.pm
lib/Async/Redis/Commands.pm
lib/Async/Redis/Error.pm
lib/Async/Redis/Error/Connection.pm
lib/Async/Redis/Error/Disconnected.pm
lib/Async/Redis/Error/Protocol.pm
lib/Async/Redis/Error/Redis.pm
lib/Async/Redis/Error/Timeout.pm
lib/Async/Redis/Iterator.pm
lib/Async/Redis/KeyExtractor.pm
lib/Async/Redis/Pipeline.pm
lib/Async/Redis/Pool.pm
lib/Async/Redis/Script.pm
lib/Async/Redis/Subscription.pm
lib/Async/Redis/Telemetry.pm
lib/Async/Redis/Transaction.pm
lib/Async/Redis/URI.pm
script/commands.json
port => 6379,
# Or use URI
uri => 'redis://user:pass@host:6379/1',
# Authentication
password => 'secret',
username => 'myuser', # Redis 6+ ACL
database => 1,
# Timeouts
connect_timeout => 10,
request_timeout => 5,
# Auto-reconnect
reconnect => 1,
reconnect_delay => 0.1,
reconnect_delay_max => 60,
# TLS
tls => {
examples/pagi-chat/public/js/app.js view on Meta::CPAN
// ===== State =====
// Mutable client-side state read and written by the connection, heartbeat
// and typing handlers in this file.
const state = {
username: '',
userId: '',
sessionId: '', // Persistent session ID for reconnection
// Room name sent as `room` in outgoing 'typing' and 'message' payloads.
currentRoom: 'general',
rooms: {},
users: {},
// Active WebSocket; its onclose/onerror/onmessage handlers are assigned elsewhere in this file.
ws: null,
// NOTE(review): presumably a server-sent-events handle — its use is not visible here; confirm.
sse: null,
// Timer handle for the "stop typing" message sent after 2 seconds of input inactivity.
typingTimeout: null,
// True once a `typing: true` message has been sent and not yet reset.
isTyping: false,
// Incremented on every socket close; used as the exponent in calculateReconnectDelay().
reconnectAttempts: 0,
maxReconnectDelay: 30000, // Max 30 seconds between attempts
pingInterval: null,
pingIntervalMs: 10000, // Send ping every 10 seconds
// Refreshed with Date.now() on any incoming message, not only pongs.
lastPongTime: 0, // Track last pong received
// setInterval handle for the periodic check that closes the socket on heartbeat timeout.
heartbeatCheckInterval: null,
heartbeatTimeoutMs: 35000, // Consider connection dead if no pong in 35s
lastMsgId: 0 // Track last received message ID for catch-up
};
// ===== DOM Elements =====
const elements = {
// Screens
loginScreen: document.getElementById('login-screen'),
chatScreen: document.getElementById('chat-screen'),
// Login
examples/pagi-chat/public/js/app.js view on Meta::CPAN
}
// ===== Toast Notifications =====
/**
 * Display a transient toast notification.
 *
 * A <div> with class `toast <type>` and the given text is appended to
 * `elements.toastContainer`. After 3 seconds the reverse slide-in animation
 * is applied, and 300 ms later the node is removed from the DOM.
 *
 * @param {string} message - Text shown in the toast (set via textContent).
 * @param {string} [type='info'] - Extra CSS class appended after `toast`.
 */
function showToast(message, type = 'info') {
    const VISIBLE_MS = 3000;
    const FADE_OUT_MS = 300;

    const node = Object.assign(document.createElement('div'), {
        className: `toast ${type}`,
        textContent: message,
    });
    elements.toastContainer.appendChild(node);

    const removeNode = () => node.remove();
    const beginFadeOut = () => {
        // Play the entry animation backwards, then drop the node once it ends.
        node.style.animation = 'slideIn 0.3s ease reverse';
        setTimeout(removeNode, FADE_OUT_MS);
    };

    setTimeout(beginFadeOut, VISIBLE_MS);
}
// ===== Exponential Backoff =====
/**
 * Compute the wait before the next reconnect attempt.
 *
 * delay = min(1000 * 2^reconnectAttempts + jitter, maxReconnectDelay),
 * where jitter is a random value in [0, 1000) ms. Both inputs are read
 * from the shared `state` object.
 *
 * @returns {number} Delay in milliseconds (may be fractional).
 */
function calculateReconnectDelay() {
    const { reconnectAttempts, maxReconnectDelay } = state;
    const exponentialMs = 1000 * Math.pow(2, reconnectAttempts);
    const jitterMs = Math.random() * 1000;
    const uncappedMs = exponentialMs + jitterMs;
    return Math.min(uncappedMs, maxReconnectDelay);
}
examples/pagi-chat/public/js/app.js view on Meta::CPAN
sendMessage({ type: 'ping' });
}
}, state.pingIntervalMs);
// Start heartbeat timeout check
if (state.heartbeatCheckInterval) {
clearInterval(state.heartbeatCheckInterval);
}
state.heartbeatCheckInterval = setInterval(() => {
const timeSinceLastPong = Date.now() - state.lastPongTime;
if (timeSinceLastPong > state.heartbeatTimeoutMs) {
console.warn('Heartbeat timeout - connection appears dead, reconnecting...');
if (state.ws) {
state.ws.close();
}
}
}, 5000); // Check every 5 seconds
};
state.ws.onclose = (event) => {
setConnectionStatus('disconnected');
examples/pagi-chat/public/js/app.js view on Meta::CPAN
if (state.heartbeatCheckInterval) {
clearInterval(state.heartbeatCheckInterval);
state.heartbeatCheckInterval = null;
}
// Always try to reconnect with exponential backoff
state.reconnectAttempts++;
const delay = calculateReconnectDelay();
console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.reconnectAttempts})...`);
setConnectionStatus('reconnecting');
setTimeout(connectWebSocket, delay);
};
state.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
state.ws.onmessage = (event) => {
// Any message resets the heartbeat timer
state.lastPongTime = Date.now();
examples/pagi-chat/public/js/app.js view on Meta::CPAN
if (!state.isTyping) {
state.isTyping = true;
sendMessage({
type: 'typing',
room: state.currentRoom,
typing: true
});
}
// Clear existing timeout
if (state.typingTimeout) {
clearTimeout(state.typingTimeout);
}
// Stop typing after 2 seconds of inactivity
state.typingTimeout = setTimeout(() => {
state.isTyping = false;
sendMessage({
type: 'typing',
room: state.currentRoom,
typing: false
});
}, 2000);
}
// ===== Event Handlers =====
examples/pagi-chat/public/js/app.js view on Meta::CPAN
const text = elements.messageInput.value.trim();
if (text) {
sendMessage({
type: 'message',
room: state.currentRoom,
text: text
});
elements.messageInput.value = '';
// Stop typing indicator
if (state.typingTimeout) {
clearTimeout(state.typingTimeout);
}
state.isTyping = false;
}
});
// Typing detection
elements.messageInput.addEventListener('input', handleTyping);
// Leave room button
elements.leaveRoomBtn.addEventListener('click', () => {
lib/Async/Redis.pm view on Meta::CPAN
use Future;
use Future::AsyncAwait;
use Future::IO 0.17; # Need read/write methods
use Socket qw(pack_sockaddr_in inet_aton AF_INET SOCK_STREAM);
use IO::Socket::INET;
use Time::HiRes ();
# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;
# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);
# Key extraction for prefixing
use Async::Redis::KeyExtractor;
lib/Async/Redis.pm view on Meta::CPAN
}
}
my $self = bless {
host => $args{host} // 'localhost',
port => $args{port} // 6379,
socket => undef,
parser => undef,
connected => 0,
# Timeout settings
connect_timeout => $args{connect_timeout} // 10,
read_timeout => $args{read_timeout} // 30,
write_timeout => $args{write_timeout} // 30,
request_timeout => $args{request_timeout} // 5,
blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
# Inflight tracking with deadlines
# Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
inflight => [],
lib/Async/Redis.pm view on Meta::CPAN
});
my $completed_f = await $result_f;
# Now check the result
if ($completed_f->is_failed) {
my ($error) = $completed_f->failure;
close $socket;
if ($error eq 'connect_timeout') {
die Async::Redis::Error::Timeout->new(
message => "Connect timed out after $self->{connect_timeout}s",
timeout => $self->{connect_timeout},
);
}
die Async::Redis::Error::Connection->new(
message => "$error",
host => $self->{host},
port => $self->{port},
);
}
lib/Async/Redis.pm view on Meta::CPAN
host => $self->{host},
port => $self->{port},
);
# Drive handshake with non-blocking loop
my $deadline = Time::HiRes::time() + $self->{connect_timeout};
while (1) {
# Check timeout
if (Time::HiRes::time() >= $deadline) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
# Attempt handshake step
my $rv = $socket->connect_SSL;
if ($rv) {
# Handshake complete!
lib/Async/Redis.pm view on Meta::CPAN
# Wait for socket to become readable with timeout
my $read_f = Future::IO->waitfor_readable($socket);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('tls_timeout');
});
my $wait_f = Future->wait_any($read_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
}
elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
# Wait for socket to become writable with timeout
my $write_f = Future::IO->waitfor_writable($socket);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('tls_timeout');
});
my $wait_f = Future->wait_any($write_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
die Async::Redis::Error::Timeout->new(
message => "TLS handshake timed out",
timeout => $self->{connect_timeout},
);
}
}
else {
# Actual error
die Async::Redis::Error::Connection->new(
message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
host => $self->{host},
lib/Async/Redis.pm view on Meta::CPAN
if (my $msg = $self->{parser}->get_message) {
return $msg;
}
# Read until we get a complete message
while (1) {
my $remaining = $deadline - Time::HiRes::time();
if ($remaining <= 0) {
$self->_reset_connection;
die Async::Redis::Error::Timeout->new(
message => "Request timed out after $self->{request_timeout}s",
command => $cmd_ref,
timeout => $self->{request_timeout},
maybe_executed => 1, # already sent the command
);
}
# Use wait_any for timeout
my $read_f = Future::IO->read($self->{socket}, 65536);
my $timeout_f = Future::IO->sleep($remaining)->then(sub {
return Future->fail('read_timeout');
});
my $wait_f = Future->wait_any($read_f, $timeout_f);
await $wait_f;
if ($wait_f->is_failed) {
my ($error) = $wait_f->failure;
if ($error eq 'read_timeout') {
$self->_reset_connection;
die Async::Redis::Error::Timeout->new(
message => "Request timed out after $self->{request_timeout}s",
command => $cmd_ref,
timeout => $self->{request_timeout},
maybe_executed => 1,
);
}
$self->_reset_connection;
die Async::Redis::Error::Connection->new(
message => "$error",
);
lib/Async/Redis.pm view on Meta::CPAN
return unless @{$self->{inflight}};
my $deadline = Time::HiRes::time() + $timeout;
while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
await Future::IO->sleep(0.001);
}
if (@{$self->{inflight}}) {
$self->_fail_all_inflight("Timeout waiting for inflight commands");
}
}
# Send PUBLISH for $message on $channel through command() and return
# whatever the awaited command future yields.
async sub publish {
    my $self = shift;
    my ($channel, $message) = @_;

    return await $self->command(PUBLISH => $channel, $message);
}
async sub spublish {
my ($self, $channel, $message) = @_;
lib/Async/Redis.pm view on Meta::CPAN
Errors are thrown as exception objects:
use Try::Tiny;
try {
await $redis->get('key');
} catch {
if ($_->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif ($_->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif ($_->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
};
Exception classes:
=over 4
=item Async::Redis::Error::Connection
Connection-related errors (refused, reset, etc.)
=item Async::Redis::Error::Timeout
Timeout errors (connect, request, read).
=item Async::Redis::Error::Protocol
Protocol parsing errors.
=item Async::Redis::Error::Redis
Errors returned by Redis (WRONGTYPE, ERR, etc.)
=item Async::Redis::Error::Disconnected
lib/Async/Redis/Error/Timeout.pm view on Meta::CPAN
package Async::Redis::Error::Timeout;
use strict;
use warnings;
use 5.018;
our $VERSION = '0.001';
use parent 'Async::Redis::Error';
# Accessor: returns the value stored under the object's 'command' key.
sub command {
    my ($self) = @_;
    return $self->{command};
}
# Accessor: returns the value stored under the object's 'timeout' key.
sub timeout {
    my ($self) = @_;
    return $self->{timeout};
}
# Accessor: returns the value stored under the object's 'maybe_executed' key.
sub maybe_executed {
    my ($self) = @_;
    return $self->{maybe_executed};
}
1;
__END__
=head1 NAME
Async::Redis::Error::Timeout - Timeout exception
=head1 DESCRIPTION
Thrown when a Redis operation times out.
=head1 ATTRIBUTES
=over 4
=item command - The command that timed out (arrayref)
lib/Async/Redis/Pool.pm view on Meta::CPAN
package Async::Redis::Pool;
use strict;
use warnings;
use 5.018;
use Future;
use Future::AsyncAwait;
use Future::IO;
use Async::Redis;
use Async::Redis::Error::Timeout;
our $VERSION = '0.001';
sub new {
my ($class, %args) = @_;
my $self = bless {
# Connection params (passed to Async::Redis->new)
host => $args{host} // 'localhost',
port => $args{port} // 6379,
password => $args{password},
database => $args{database},
tls => $args{tls},
uri => $args{uri},
# Pool sizing
min => $args{min} // 1,
max => $args{max} // 10,
# Timeouts
acquire_timeout => $args{acquire_timeout} // 5,
idle_timeout => $args{idle_timeout} // 60,
connect_timeout => $args{connect_timeout} // 10,
cleanup_timeout => $args{cleanup_timeout} // 5,
# Dirty handling
on_dirty => $args{on_dirty} // 'destroy',
# Pool state
_idle => [], # Available connections
lib/Async/Redis/Pool.pm view on Meta::CPAN
my $conn = await $self->_create_connection;
$self->{_active}{"$conn"} = $conn;
return $conn;
}
# At max capacity - wait for release
my $waiter = Future->new;
push @{$self->{_waiters}}, $waiter;
my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
Future->fail(Async::Redis::Error::Timeout->new(
message => "Acquire timed out after $self->{acquire_timeout}s",
timeout => $self->{acquire_timeout},
));
});
my $wait_f = Future->wait_any($waiter, $timeout_future);
my $result;
eval {
$result = await $wait_f;
t/01-unit/error.t view on Meta::CPAN
# t/01-unit/error.t
use strict;
use warnings;
use Test2::V0;
use Async::Redis::Error;
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Protocol;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Disconnected;
# Base exception: construction, the message accessor and string overloading.
subtest 'Error base class' => sub {
    my $err = Async::Redis::Error->new(message => 'test error');

    ok($err->isa('Async::Redis::Error'), 'isa Error');
    is($err->message, 'test error', 'message accessor');

    # Interpolating the object into a string must expose the message text.
    like("$err", qr/test error/, 'stringifies to message');
};
t/01-unit/error.t view on Meta::CPAN
reason => 'timeout',
);
ok($e->isa('Async::Redis::Error'), 'isa base Error');
ok($e->isa('Async::Redis::Error::Connection'), 'isa Connection');
is($e->host, 'localhost', 'host accessor');
is($e->port, 6379, 'port accessor');
is($e->reason, 'timeout', 'reason accessor');
like("$e", qr/connection lost/, 'stringifies');
};
# Timeout exception: inheritance plus the command, timeout and
# maybe_executed accessors, for both flag values.
subtest 'Timeout error' => sub {
    # Case 1: every attribute supplied, command known not to have run.
    my @not_executed_args = (
        message        => 'request timed out',
        command        => ['GET', 'mykey'],
        timeout        => 5,
        maybe_executed => 0,
    );
    my $not_executed = Async::Redis::Error::Timeout->new(@not_executed_args);

    ok($not_executed->isa('Async::Redis::Error'), 'isa base Error');
    ok($not_executed->isa('Async::Redis::Error::Timeout'), 'isa Timeout');
    is($not_executed->command, ['GET', 'mykey'], 'command accessor');
    is($not_executed->timeout, 5, 'timeout accessor');
    ok(!$not_executed->maybe_executed, 'maybe_executed false');

    # Case 2: only the message and a true maybe_executed flag.
    my $after_write = Async::Redis::Error::Timeout->new(
        message        => 'timed out after write',
        maybe_executed => 1,
    );
    ok($after_write->maybe_executed, 'maybe_executed true');
};
subtest 'Protocol error' => sub {
my $e = Async::Redis::Error::Protocol->new(
message => 'unexpected response type',
data => '+OK',