Async-Redis
view release on metacpan or search on metacpan
examples/pagi-chat/lib/ChatApp/State.pm view on Meta::CPAN
payload => $payload,
});
await $redis->publish(BROADCAST_CHANNEL, $msg);
}
# Broadcast a payload to every connected user, on every worker, by
# publishing it on the shared Redis PubSub channel.
#
#   $payload - data structure to deliver; it is wrapped in an envelope
#              flagged with global => 1 and JSON-encoded before publishing.
#
# Resolves to whatever $redis->publish resolves to.
async sub broadcast_global {
    my ($payload) = @_;

    my %envelope = (
        global  => 1,
        payload => $payload,
    );

    await $redis->publish(BROADCAST_CHANNEL, $JSON->encode(\%envelope));
}
# Generate a fresh identifier: 64 lowercase hex characters (a SHA-256 digest).
#
# The digest input is the current time, the process ID and a rand() value,
# plus 32 bytes from /dev/urandom when that device can be read. time(), $$
# and rand() are all guessable on their own (rand() is documented as not
# cryptographically secure), so OS entropy is mixed in where available.
# On platforms without /dev/urandom the original inputs are still used.
#
# NOTE(review): if these IDs are used as session tokens they need to be
# unguessable - confirm against the callers.
sub generate_id {
    require Digest::SHA;

    my $entropy = '';
    if (open my $fh, '<:raw', '/dev/urandom') {
        my $got = read $fh, my $bytes, 32;
        # Only trust a complete read; a short read falls back to the
        # time/pid/rand inputs alone.
        $entropy = $bytes if defined $got && $got == 32;
        close $fh;
    }

    return Digest::SHA::sha256_hex($entropy . time() . $$ . rand());
}
# Normalise a user-supplied display name.
#
#   $name - raw name (may be undef).
#
# Every non-word character becomes '_', and the result is cut to at most
# 20 characters. If fewer than 2 characters remain, a generated name of the
# form 'User<N>' (N from int(rand(1000))) is returned instead.
sub sanitize_username {
    my ($name) = @_;

    my $clean = defined $name ? $name : '';
    $clean =~ s/[^\w]/_/g;
    $clean = substr($clean, 0, 20);

    return $clean if length($clean) >= 2;

    # Too short to be usable: hand out a placeholder name.
    return 'User' . int(rand(1000));
}
# Normalise a user-supplied room name.
#
#   $name - raw room name (may be undef).
#
# Every character that is neither a word character nor '-' becomes '_'.
# The result is cut to at most 30 characters and then lowercased. If fewer
# than 2 characters remain, a generated name of the form 'room<N>'
# (N from int(rand(1000))) is returned instead.
sub sanitize_room_name {
    my ($name) = @_;

    my $room = defined $name ? $name : '';
    $room =~ s/[^\w-]/_/g;

    # Truncate first, then lowercase (same order as before).
    my $head = substr($room, 0, 30);
    $room = lc($head);

    return $room if length($room) >= 2;

    # Too short to be usable: hand out a placeholder room name.
    return 'room' . int(rand(1000));
}
# Session management (Redis hashes)
# Load a session from its Redis hash ("session:<id>").
#
#   $session_id - session identifier; a false value short-circuits.
#
# Resolves to nothing when no ID is given or the hash is missing/empty.
# Otherwise resolves to the hash reference with:
#   rooms     - decoded from its stored JSON string ({} when unset)
#   connected - normalised to 1 or 0
async sub get_session {
    my ($session_id) = @_;
    return unless $session_id;

    my $session = await $redis->hgetall("session:$session_id");
    return unless $session && %$session;

    # Rooms are stored as a JSON string inside the hash.
    my $rooms_json = $session->{rooms};
    if ($rooms_json) {
        $session->{rooms} = $JSON->decode($rooms_json);
    }
    else {
        $session->{rooms} = {};
    }

    if ($session->{connected}) {
        $session->{connected} = 1;
    }
    else {
        $session->{connected} = 0;
    }

    return $session;
}
# Find the connected session whose name equals $name.
#
#   $name - user name to look up; undef or '' resolves to nothing.
#
# Walks every "session:*" key with SCAN and fetches each hash (not
# efficient, but works for demo). Resolves to the first matching session
# hash reference, with rooms decoded from JSON ({} when unset) and
# connected set to 1, or to nothing when no connected session has that name.
async sub get_session_by_name {
    my ($name) = @_;

    # Comparing against an undefined name would only produce warnings.
    return unless defined $name && length $name;

    my $cursor = "0";
    do {
        my $result = await $redis->scan($cursor, MATCH => 'session:*', COUNT => 100);
        $cursor = $result->[0];
        my $keys = $result->[1] // [];

        for my $key (@$keys) {
            my $session = await $redis->hgetall($key);

            # A key can expire between SCAN and HGETALL, which yields an
            # empty hash; a hash may also lack a name field. Skip both
            # instead of comparing undef with eq.
            next unless $session && %$session;
            next unless defined $session->{name} && $session->{name} eq $name;
            next unless $session->{connected};

            $session->{rooms} = $session->{rooms} ? $JSON->decode($session->{rooms}) : {};
            # Same normalisation get_session applies; only connected
            # sessions reach this point.
            $session->{connected} = 1;
            return $session;
        }
    } while ($cursor && $cursor ne "0");

    return;
}
# Create a new session record and mark it as connected.
#
#   $session_id - identifier for the new session
#   $name       - user name stored in the session
#   $send_cb    - passed to register_local_session together with the ID
#
# Writes the "session:<id>" hash, applies SESSION_TTL to it, adds the ID to
# the 'connected:sessions' set, and registers the session locally for this
# worker. Resolves to a hash reference describing the new session, with
# rooms as an empty hash.
async sub create_session {
    my ($session_id, $name, $send_cb) = @_;

    my $joined_at = time();
    my $last_seen = time();
    my $key       = "session:$session_id";

    # Rooms are stored as a JSON string; a new session has none yet.
    my @fields = (
        id        => $session_id,
        name      => $name,
        connected => 1,
        joined_at => $joined_at,
        last_seen => $last_seen,
        rooms     => '{}',
    );

    await $redis->hmset($key, @fields);
    await $redis->expire($key, SESSION_TTL);
    await $redis->sadd('connected:sessions', $session_id);

    # Track locally for this worker
    register_local_session($session_id, $send_cb);

    return {
        id        => $session_id,
        name      => $name,
        connected => 1,
        joined_at => $joined_at,
        last_seen => $last_seen,
        rooms     => {},
    };
}
# Write a set of field updates into a session's Redis hash.
#
#   $session_id - session whose "session:<id>" hash is updated
#   $updates    - hash reference of field => value; reference values are
#                 JSON-encoded before being stored
#
# The hash is only written when there is at least one field, but the key's
# expiry is reset to SESSION_TTL on every call.
async sub update_session {
    my ($session_id, $updates) = @_;

    my $key = "session:$session_id";

    # Flatten to a field/value list, serialising nested structures.
    my @fields = map {
        my $value = $updates->{$_};
        ($_, ref($value) ? $JSON->encode($value) : $value);
    } keys %$updates;

    if (@fields) {
        await $redis->hmset($key, @fields);
    }

    await $redis->expire($key, SESSION_TTL);
}
# Mark an existing session as connected again.
#
#   $session_id - session to mark as connected
#   $send_cb    - passed to register_local_session together with the ID
#
# Sets connected => 1 and refreshes last_seen via update_session, adds the
# ID to the 'connected:sessions' set, registers the session locally, and
# resolves to the result of get_session for that ID.
async sub set_session_connected {
    my ($session_id, $send_cb) = @_;

    my %changes = (
        connected => 1,
        last_seen => time(),
    );
    await update_session($session_id, \%changes);

    await $redis->sadd('connected:sessions', $session_id);
    register_local_session($session_id, $send_cb);

    # Hand back the session as it is now stored.
    return await get_session($session_id);
}
async sub set_session_disconnected {
my ($session_id) = @_;
( run in 0.998 second using v1.01-cache-2.11-cpan-39bf76dae61 )