Async-Redis

 view release on metacpan or  search on metacpan

examples/pagi-chat/lib/ChatApp/State.pm  view on Meta::CPAN

        payload    => $payload,
    });

    await $redis->publish(BROADCAST_CHANNEL, $msg);
}

# Publish a payload to all connected users globally via Redis PubSub.
#
# The payload is wrapped in an envelope flagged `global => 1`, JSON-encoded,
# and published on BROADCAST_CHANNEL.
async sub broadcast_global {
    my ($payload) = @_;

    my %envelope = (
        global  => 1,
        payload => $payload,
    );
    my $encoded = $JSON->encode(\%envelope);

    await $redis->publish(BROADCAST_CHANNEL, $encoded);
}

# Produce a fresh identifier: the SHA-256 hex digest (64 lowercase hex
# characters) of the current time, the process id and a random number
# concatenated together.
sub generate_id {
    require Digest::SHA;

    my $seed = join '', time(), $$, rand();
    return Digest::SHA::sha256_hex($seed);
}

# Normalise a user-supplied display name.
#
# Every non-word character becomes "_", the result is cut to at most 20
# characters, and anything shorter than 2 characters (including undef or an
# empty string) is replaced by "User" followed by a random integer below 1000.
sub sanitize_username {
    my ($name) = @_;

    my $clean = defined $name ? $name : '';
    $clean =~ s/\W/_/g;
    $clean = substr($clean, 0, 20);

    return $clean if length($clean) >= 2;
    return 'User' . int(rand(1000));
}

# Normalise a user-supplied room name.
#
# Characters other than word characters and "-" become "_", the result is cut
# to at most 30 characters and lower-cased, and anything shorter than 2
# characters (including undef or an empty string) is replaced by "room"
# followed by a random integer below 1000.
sub sanitize_room_name {
    my ($name) = @_;

    my $clean = defined $name ? $name : '';
    $clean =~ s/[^-\w]/_/g;

    # Truncate first, then lower-case, in that order.
    $clean = lc(substr($clean, 0, 30));

    return $clean if length($clean) >= 2;
    return 'room' . int(rand(1000));
}

# Session management (Redis hashes)
# Load the session stored in the Redis hash "session:<id>".
#
# Returns nothing when $session_id is false or the hash comes back empty.
# Otherwise returns the hashref with `rooms` decoded from its JSON string
# (an empty hash when the field is missing or false) and `connected`
# normalised to 1 or 0.
async sub get_session {
    my ($session_id) = @_;
    return if !$session_id;

    my $session = await $redis->hgetall("session:$session_id");
    return if !$session || !%$session;

    # rooms is stored as a JSON string; turn it back into a structure.
    my $rooms_json = $session->{rooms};
    $session->{rooms}     = $rooms_json ? $JSON->decode($rooms_json) : {};
    $session->{connected} = $session->{connected} ? 1 : 0;

    return $session;
}

# Find the connected session whose name is exactly $name.
#
# Iterates over every "session:*" key with SCAN and loads each hash in turn,
# so the cost grows with the total number of stored sessions (not efficient,
# but works for the demo).
#
# Returns the session hashref, with `rooms` decoded from JSON and `connected`
# set to 1 (the same shape get_session produces), or nothing when $name is
# undefined or no connected session carries that name.
async sub get_session_by_name {
    my ($name) = @_;

    # An undefined name can never identify a session; bail out instead of
    # comparing undef against every stored name.
    return unless defined $name;

    my $cursor = "0";
    do {
        my $result = await $redis->scan($cursor, MATCH => 'session:*', COUNT => 100);
        $cursor = $result->[0];
        my $keys = $result->[1] // [];

        for my $key (@$keys) {
            my $session = await $redis->hgetall($key);

            # A key can expire or be deleted between SCAN and HGETALL, in
            # which case the hash comes back without a name; skip it rather
            # than comparing undef.
            next unless $session && defined $session->{name};
            next unless $session->{name} eq $name && $session->{connected};

            $session->{rooms} = $session->{rooms} ? $JSON->decode($session->{rooms}) : {};
            $session->{connected} = 1;
            return $session;
        }
    } while ($cursor && $cursor ne "0");

    return;
}

# Create a new connected session.
#
# Writes the session fields to the Redis hash "session:<id>", sets its expiry
# to SESSION_TTL, adds the id to the 'connected:sessions' set, and registers
# $send_cb for the session via register_local_session.
#
# Returns a hashref describing the new session, with `rooms` as an empty hash.
async sub create_session {
    my ($session_id, $name, $send_cb) = @_;

    my $redis_key = "session:$session_id";
    my $joined_at = time();
    my $last_seen = time();

    # rooms is stored as a JSON string in the hash; it starts out empty.
    await $redis->hmset($redis_key,
        id        => $session_id,
        name      => $name,
        connected => 1,
        joined_at => $joined_at,
        last_seen => $last_seen,
        rooms     => '{}',
    );
    await $redis->expire($redis_key, SESSION_TTL);
    await $redis->sadd('connected:sessions', $session_id);

    # Track locally for this worker
    register_local_session($session_id, $send_cb);

    my %session = (
        id        => $session_id,
        name      => $name,
        connected => 1,
        joined_at => $joined_at,
        last_seen => $last_seen,
        rooms     => {},
    );
    return \%session;
}

# Apply field updates to the Redis hash "session:<id>" and refresh its expiry.
#
# $updates is a hashref of field => value; any value that is a reference is
# JSON-encoded before being written. The expiry is reset to SESSION_TTL even
# when $updates contains no fields.
async sub update_session {
    my ($session_id, $updates) = @_;

    my $redis_key = "session:$session_id";

    my @fields = map {
        my $value = $updates->{$_};
        $value = $JSON->encode($value) if ref $value;
        ($_, $value);
    } keys %$updates;

    if (@fields) {
        await $redis->hmset($redis_key, @fields);
    }
    await $redis->expire($redis_key, SESSION_TTL);
}

# Mark an existing session as connected again.
#
# Sets `connected` to 1 and `last_seen` to the current time through
# update_session, adds the id to the 'connected:sessions' set, registers
# $send_cb via register_local_session, and returns whatever get_session
# yields for the id afterwards.
async sub set_session_connected {
    my ($session_id, $send_cb) = @_;

    my %changes = (
        connected => 1,
        last_seen => time(),
    );
    await update_session($session_id, \%changes);

    await $redis->sadd('connected:sessions', $session_id);
    register_local_session($session_id, $send_cb);

    return await get_session($session_id);
}

async sub set_session_disconnected {
    my ($session_id) = @_;



( run in 0.998 second using v1.01-cache-2.11-cpan-39bf76dae61 )