Async-Redis
view release on metacpan or search on metacpan
- Pipeline results contain error objects at failed slot positions
- Bug Fix: Pool max connection race condition
- Concurrent acquire() calls could exceed max connections
- Added _creating counter to track in-flight connection creations
- Dependency Change: Future::IO bumped to 0.23 (was 0.19)
- Future::IO 0.22+ broke load_impl('IOAsync') by adding a poll
method check that IO::Async's impl doesn't satisfy
- Removed IO::Async as a test dependency entirely
- Tests now use Future::IO's built-in poll-based default impl
- Test suite is fully event-loop agnostic
- Examples: pagi-chat stats timer uses Future::IO instead of IO::Async
0.001005 2026-03-15
- Feature: Unix domain socket connections
- Connect via path parameter or redis+unix:// URI scheme
- Constructor stores path and skips host/port for unix sockets
- Added docker-compose redis-unix service for testing
- Feature: PubSub auto-resubscribe on reconnect
- Subscriptions automatically re-established after connection drop
- on_reconnect callback on Subscription object for notification
- _read_pubsub_frame checks connected state to fail fast
examples/pagi-chat/lib/ChatApp/State.pm view on Meta::CPAN
# Separate connection for PubSub
my $pubsub_redis = Async::Redis->new(host => $host, port => $port);
await $pubsub_redis->connect;
$pubsub = await $pubsub_redis->subscribe(BROADCAST_CHANNEL);
# Initialize background task selector and start the runner
$background_selector = Future::Selector->new;
_start_selector_runner();
# Start periodic stats timer (every 10 seconds)
_start_stats_timer();
# Initialize default rooms
await add_room('general', 'system');
await add_room('random', 'system');
await add_room('help', 'system');
print STDERR "Worker $$: Redis state initialized\n";
return $redis;
}
sub get_redis { $redis }
sub get_pubsub { $pubsub }
# Background selector runner
my $selector_runner_future;
# Periodic stats timer
my $stats_timer;
# Start the selector with the PubSub listener as the main long-running task
sub _start_selector_runner {
# Add the broadcast listener as a long-running task
# Use gen => to provide a generator that creates the future
# (f => expects a completed future, gen => expects a coderef)
$background_selector->add(
data => 'pubsub-listener',
gen => sub { _broadcast_listener() },
);
# Run the selector in the background
$selector_runner_future = $background_selector->run->on_fail(sub {
my ($err) = @_;
warn "[selector] Runner failed: $err";
})->retain;
}
# Start periodic stats timer using Future::IO (event-loop agnostic)
sub _start_stats_timer {
$stats_timer = (async sub {
while (1) {
await Future::IO->sleep(10);
next unless %local_sessions; # Skip if no clients
eval {
my $stats = await get_stats();
my $msg = $JSON->encode({
global => 1,
payload => {
type => 'stats',
users_online => $stats->{users_online},
examples/pagi-chat/public/js/app.js view on Meta::CPAN
console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.reconnectAttempts})...`);
setConnectionStatus('reconnecting');
setTimeout(connectWebSocket, delay);
};
state.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
state.ws.onmessage = (event) => {
// Any message resets the heartbeat timer
state.lastPongTime = Date.now();
try {
const data = JSON.parse(event.data);
handleWebSocketMessage(data);
} catch (e) {
console.error('Failed to parse message:', e);
}
};
}
( run in 0.835 second using v1.01-cache-2.11-cpan-adec679a428 )