Async-Redis
view release on metacpan or search on metacpan
examples/pagi-chat/lib/ChatApp/State.pm view on Meta::CPAN
package ChatApp::State;
#
# Redis-backed State Management
#
# This replaces the in-memory state from PAGI's websocket-chat-v2 example
# with Redis, enabling multi-worker deployments.
#
# Key differences:
# - Sessions stored in Redis hashes
# - Room membership in Redis sets
# - Messages in Redis lists
# - PubSub for cross-worker broadcasting
#
use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use Future::Selector;
use Exporter 'import';
use JSON::MaybeXS;
use Time::HiRes qw(time);
use Scalar::Util qw(weaken);
use Future::IO;
# Names that callers may import on request (nothing is exported by default).
# The list is split into groups purely for readability; the resulting order
# is the same as a single flat qw() list.
our @EXPORT_OK = (
    # Redis handles
    qw( init_redis get_redis get_pubsub ),

    # Sessions
    qw( get_session create_session update_session remove_session ),
    qw( get_session_by_name set_session_connected set_session_disconnected ),
    qw( is_session_connected ),

    # Rooms and room membership
    qw( get_room add_room remove_room get_all_rooms ),
    qw( add_user_to_room remove_user_from_room get_room_users ),

    # Messages
    qw( add_message get_room_messages ),

    # Stats and small helpers
    qw( get_stats generate_id sanitize_username sanitize_room_name ),

    # Broadcasting and worker-local registration
    qw( subscribe_broadcasts register_local_session unregister_local_session ),
    qw( broadcast_to_room broadcast_global add_local_room ),

    # Background tasks
    qw( add_background_task ),
);
# Shared JSON encoder/decoder: UTF-8 byte output with canonical key ordering.
my $JSON = JSON::MaybeXS->new
    ->utf8
    ->canonical;

# Per-worker Redis handles. All three start out undefined.
my ($redis, $pubsub, $pubsub_subscription);

# Per-worker selector for background tasks; starts out undefined.
my $background_selector;

# Callbacks for sessions attached to THIS worker only.
# Redis PubSub delivers every message to all workers, so each worker must
# restrict itself to invoking callbacks for its own clients.
my %local_sessions;
# Module-wide constants, declared one per statement.
use constant MAX_MESSAGES_PER_ROOM => 100;
use constant SESSION_TTL           => 24 * 60 * 60;    # 24 hours (86400)
use constant BROADCAST_CHANNEL     => 'chat:broadcast';

# Moment this module was loaded, recorded so uptime can be derived later.
# time() here is the Time::HiRes version imported at the top of the file.
my $server_start_time = time();
# Initialize Redis connections for this worker
async sub init_redis {
my (%args) = @_;
require Async::Redis;
my $host = $args{host} // $ENV{REDIS_HOST} // 'localhost';
my $port = $args{port} // $ENV{REDIS_PORT} // 6379;
# Main connection for commands
$redis = Async::Redis->new(host => $host, port => $port);
await $redis->connect;
# Separate connection for PubSub
my $pubsub_redis = Async::Redis->new(host => $host, port => $port);
await $pubsub_redis->connect;
$pubsub = await $pubsub_redis->subscribe(BROADCAST_CHANNEL);
# Initialize background task selector and start the runner
$background_selector = Future::Selector->new;
_start_selector_runner();
( run in 3.292 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )