Async-Redis

 view release on metacpan or  search on metacpan

examples/pagi-chat/lib/ChatApp/State.pm  view on Meta::CPAN

package ChatApp::State;

#
# Redis-backed State Management
#
# This replaces the in-memory state from PAGI's websocket-chat-v2 example
# with Redis, enabling multi-worker deployments.
#
# Key differences:
# - Sessions stored in Redis hashes
# - Room membership in Redis sets
# - Messages in Redis lists
# - PubSub for cross-worker broadcasting
#

use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use Future::Selector;
use Exporter 'import';
use JSON::MaybeXS;
use Time::HiRes qw(time);
use Scalar::Util qw(weaken);
use Future::IO;

our @EXPORT_OK = qw(
    init_redis get_redis get_pubsub
    get_session create_session update_session remove_session
    get_session_by_name set_session_connected set_session_disconnected
    is_session_connected
    get_room add_room remove_room get_all_rooms
    add_user_to_room remove_user_from_room get_room_users
    add_message get_room_messages
    get_stats generate_id sanitize_username sanitize_room_name
    subscribe_broadcasts register_local_session unregister_local_session
    broadcast_to_room broadcast_global add_local_room
    add_background_task
);

my $JSON = JSON::MaybeXS->new->utf8->canonical;

# Redis connections (per-worker)
my $redis;
my $pubsub;
my $pubsub_subscription;

# Background task selector (per-worker)
my $background_selector;

# Local session callbacks (for this worker only)
# Redis PubSub delivers to all workers, but we only call callbacks for OUR clients
my %local_sessions;

use constant {
    MAX_MESSAGES_PER_ROOM => 100,
    SESSION_TTL           => 86400,    # 24 hours
    BROADCAST_CHANNEL     => 'chat:broadcast',
};

# Track server start time for uptime
my $server_start_time = time();

# Initialize Redis connections for this worker
async sub init_redis {
    my (%args) = @_;

    require Async::Redis;

    my $host = $args{host} // $ENV{REDIS_HOST} // 'localhost';
    my $port = $args{port} // $ENV{REDIS_PORT} // 6379;

    # Main connection for commands
    $redis = Async::Redis->new(host => $host, port => $port);
    await $redis->connect;

    # Separate connection for PubSub
    my $pubsub_redis = Async::Redis->new(host => $host, port => $port);
    await $pubsub_redis->connect;
    $pubsub = await $pubsub_redis->subscribe(BROADCAST_CHANNEL);

    # Initialize background task selector and start the runner
    $background_selector = Future::Selector->new;
    _start_selector_runner();



( run in 3.292 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )