Async-Redis

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

examples/slow-redis/README.md
examples/slow-redis/app.pl
lib/Async/Redis.pm
lib/Async/Redis/AutoPipeline.pm
lib/Async/Redis/Commands.pm
lib/Async/Redis/Error.pm
lib/Async/Redis/Error/Connection.pm
lib/Async/Redis/Error/Disconnected.pm
lib/Async/Redis/Error/Protocol.pm
lib/Async/Redis/Error/Redis.pm
lib/Async/Redis/Error/Timeout.pm
lib/Async/Redis/Iterator.pm
lib/Async/Redis/KeyExtractor.pm
lib/Async/Redis/Pipeline.pm
lib/Async/Redis/Pool.pm
lib/Async/Redis/Script.pm
lib/Async/Redis/Subscription.pm
lib/Async/Redis/Telemetry.pm
lib/Async/Redis/Transaction.pm
lib/Async/Redis/URI.pm
script/commands.json

README.md  view on Meta::CPAN

    port => 6379,

    # Or use URI
    uri => 'redis://user:pass@host:6379/1',

    # Authentication
    password => 'secret',
    username => 'myuser',  # Redis 6+ ACL
    database => 1,

    # Timeouts
    connect_timeout => 10,
    request_timeout => 5,

    # Auto-reconnect
    reconnect       => 1,
    reconnect_delay => 0.1,
    reconnect_delay_max => 60,

    # TLS
    tls => {

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

    // ===== State =====
    const state = {
        username: '',
        userId: '',
        sessionId: '',          // Persistent session ID for reconnection
        currentRoom: 'general',
        rooms: {},
        users: {},
        ws: null,
        sse: null,
        typingTimeout: null,
        isTyping: false,
        reconnectAttempts: 0,
        maxReconnectDelay: 30000,   // Max 30 seconds between attempts
        pingInterval: null,
        pingIntervalMs: 10000,      // Send ping every 10 seconds
        lastPongTime: 0,            // Track last pong received
        heartbeatCheckInterval: null,
        heartbeatTimeoutMs: 35000,  // Consider connection dead if no pong in 35s
        lastMsgId: 0                // Track last received message ID for catch-up
    };

    // ===== DOM Elements =====
    const elements = {
        // Screens
        loginScreen: document.getElementById('login-screen'),
        chatScreen: document.getElementById('chat-screen'),

        // Login

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

    }

    // ===== Toast Notifications =====
    function showToast(message, type = 'info') {
        const toast = document.createElement('div');
        toast.className = `toast ${type}`;
        toast.textContent = message;

        elements.toastContainer.appendChild(toast);

        setTimeout(() => {
            toast.style.animation = 'slideIn 0.3s ease reverse';
            setTimeout(() => toast.remove(), 300);
        }, 3000);
    }

    // ===== Exponential Backoff =====
    function calculateReconnectDelay() {
        // Formula: min(1000 * 2^attempts + random(0, 1000), 30000)
        const baseDelay = 1000 * Math.pow(2, state.reconnectAttempts);
        const jitter = Math.random() * 1000;
        return Math.min(baseDelay + jitter, state.maxReconnectDelay);
    }

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

                    sendMessage({ type: 'ping' });
                }
            }, state.pingIntervalMs);

            // Start heartbeat timeout check
            if (state.heartbeatCheckInterval) {
                clearInterval(state.heartbeatCheckInterval);
            }
            state.heartbeatCheckInterval = setInterval(() => {
                const timeSinceLastPong = Date.now() - state.lastPongTime;
                if (timeSinceLastPong > state.heartbeatTimeoutMs) {
                    console.warn('Heartbeat timeout - connection appears dead, reconnecting...');
                    if (state.ws) {
                        state.ws.close();
                    }
                }
            }, 5000); // Check every 5 seconds
        };

        state.ws.onclose = (event) => {
            setConnectionStatus('disconnected');

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

            if (state.heartbeatCheckInterval) {
                clearInterval(state.heartbeatCheckInterval);
                state.heartbeatCheckInterval = null;
            }

            // Always try to reconnect with exponential backoff
            state.reconnectAttempts++;
            const delay = calculateReconnectDelay();
            console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.reconnectAttempts})...`);
            setConnectionStatus('reconnecting');
            setTimeout(connectWebSocket, delay);
        };

        state.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };

        state.ws.onmessage = (event) => {
            // Any message resets the heartbeat timer
            state.lastPongTime = Date.now();

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

        if (!state.isTyping) {
            state.isTyping = true;
            sendMessage({
                type: 'typing',
                room: state.currentRoom,
                typing: true
            });
        }

        // Clear existing timeout
        if (state.typingTimeout) {
            clearTimeout(state.typingTimeout);
        }

        // Stop typing after 2 seconds of inactivity
        state.typingTimeout = setTimeout(() => {
            state.isTyping = false;
            sendMessage({
                type: 'typing',
                room: state.currentRoom,
                typing: false
            });
        }, 2000);
    }

    // ===== Event Handlers =====

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

            const text = elements.messageInput.value.trim();
            if (text) {
                sendMessage({
                    type: 'message',
                    room: state.currentRoom,
                    text: text
                });
                elements.messageInput.value = '';

                // Stop typing indicator
                if (state.typingTimeout) {
                    clearTimeout(state.typingTimeout);
                }
                state.isTyping = false;
            }
        });

        // Typing detection
        elements.messageInput.addEventListener('input', handleTyping);

        // Leave room button
        elements.leaveRoomBtn.addEventListener('click', () => {

lib/Async/Redis.pm  view on Meta::CPAN


use Future;
use Future::AsyncAwait;
use Future::IO 0.17;  # Need read/write methods
use Socket qw(pack_sockaddr_in inet_aton AF_INET SOCK_STREAM);
use IO::Socket::INET;
use Time::HiRes ();

# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;

# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);

# Key extraction for prefixing
use Async::Redis::KeyExtractor;

lib/Async/Redis.pm  view on Meta::CPAN

        }
    }

    my $self = bless {
        host     => $args{host} // 'localhost',
        port     => $args{port} // 6379,
        socket   => undef,
        parser   => undef,
        connected => 0,

        # Timeout settings
        connect_timeout         => $args{connect_timeout} // 10,
        read_timeout            => $args{read_timeout} // 30,
        write_timeout           => $args{write_timeout} // 30,
        request_timeout         => $args{request_timeout} // 5,
        blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,

        # Inflight tracking with deadlines
        # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
        inflight => [],

lib/Async/Redis.pm  view on Meta::CPAN

    });

    my $completed_f = await $result_f;

    # Now check the result
    if ($completed_f->is_failed) {
        my ($error) = $completed_f->failure;
        close $socket;

        if ($error eq 'connect_timeout') {
            die Async::Redis::Error::Timeout->new(
                message => "Connect timed out after $self->{connect_timeout}s",
                timeout => $self->{connect_timeout},
            );
        }
        die Async::Redis::Error::Connection->new(
            message => "$error",
            host    => $self->{host},
            port    => $self->{port},
        );
    }

lib/Async/Redis.pm  view on Meta::CPAN

            host    => $self->{host},
            port    => $self->{port},
        );

    # Drive handshake with non-blocking loop
    my $deadline = Time::HiRes::time() + $self->{connect_timeout};

    while (1) {
        # Check timeout
        if (Time::HiRes::time() >= $deadline) {
            die Async::Redis::Error::Timeout->new(
                message => "TLS handshake timed out",
                timeout => $self->{connect_timeout},
            );
        }

        # Attempt handshake step
        my $rv = $socket->connect_SSL;

        if ($rv) {
            # Handshake complete!

lib/Async/Redis.pm  view on Meta::CPAN

            # Wait for socket to become readable with timeout
            my $read_f = Future::IO->waitfor_readable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });

            my $wait_f = Future->wait_any($read_f, $timeout_f);
            await $wait_f;

            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
            # Wait for socket to become writable with timeout
            my $write_f = Future::IO->waitfor_writable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });

            my $wait_f = Future->wait_any($write_f, $timeout_f);
            await $wait_f;

            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        else {
            # Actual error
            die Async::Redis::Error::Connection->new(
                message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
                host    => $self->{host},

lib/Async/Redis.pm  view on Meta::CPAN

    if (my $msg = $self->{parser}->get_message) {
        return $msg;
    }

    # Read until we get a complete message
    while (1) {
        my $remaining = $deadline - Time::HiRes::time();

        if ($remaining <= 0) {
            $self->_reset_connection;
            die Async::Redis::Error::Timeout->new(
                message        => "Request timed out after $self->{request_timeout}s",
                command        => $cmd_ref,
                timeout        => $self->{request_timeout},
                maybe_executed => 1,  # already sent the command
            );
        }

        # Use wait_any for timeout
        my $read_f = Future::IO->read($self->{socket}, 65536);
        my $timeout_f = Future::IO->sleep($remaining)->then(sub {
            return Future->fail('read_timeout');
        });

        my $wait_f = Future->wait_any($read_f, $timeout_f);
        await $wait_f;

        if ($wait_f->is_failed) {
            my ($error) = $wait_f->failure;
            if ($error eq 'read_timeout') {
                $self->_reset_connection;
                die Async::Redis::Error::Timeout->new(
                    message        => "Request timed out after $self->{request_timeout}s",
                    command        => $cmd_ref,
                    timeout        => $self->{request_timeout},
                    maybe_executed => 1,
                );
            }
            $self->_reset_connection;
            die Async::Redis::Error::Connection->new(
                message => "$error",
            );

lib/Async/Redis.pm  view on Meta::CPAN


    return unless @{$self->{inflight}};

    my $deadline = Time::HiRes::time() + $timeout;

    while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
        await Future::IO->sleep(0.001);
    }

    if (@{$self->{inflight}}) {
        $self->_fail_all_inflight("Timeout waiting for inflight commands");
    }
}

async sub publish {
    my ($self, $channel, $message) = @_;
    return await $self->command('PUBLISH', $channel, $message);
}

async sub spublish {
    my ($self, $channel, $message) = @_;

lib/Async/Redis.pm  view on Meta::CPAN


Errors are thrown as exception objects:

    use Try::Tiny;

    try {
        await $redis->get('key');
    } catch {
        if ($_->isa('Async::Redis::Error::Connection')) {
            # Connection error
        } elsif ($_->isa('Async::Redis::Error::Timeout')) {
            # Timeout error
        } elsif ($_->isa('Async::Redis::Error::Redis')) {
            # Redis error (e.g., WRONGTYPE)
        }
    };

Exception classes:

=over 4

=item Async::Redis::Error::Connection

Connection-related errors (refused, reset, etc.)

=item Async::Redis::Error::Timeout

Timeout errors (connect, request, read).

=item Async::Redis::Error::Protocol

Protocol parsing errors.

=item Async::Redis::Error::Redis

Errors returned by Redis (WRONGTYPE, ERR, etc.)

=item Async::Redis::Error::Disconnected

lib/Async/Redis/Error/Timeout.pm  view on Meta::CPAN

package Async::Redis::Error::Timeout;

use strict;
use warnings;
use 5.018;

our $VERSION = '0.001';

use parent 'Async::Redis::Error';

sub command        { shift->{command} }
sub timeout        { shift->{timeout} }
sub maybe_executed { shift->{maybe_executed} }

1;

__END__

=head1 NAME

Async::Redis::Error::Timeout - Timeout exception

=head1 DESCRIPTION

Thrown when a Redis operation times out.

=head1 ATTRIBUTES

=over 4

=item command - The command that timed out (arrayref)

lib/Async/Redis/Pool.pm  view on Meta::CPAN

package Async::Redis::Pool;

use strict;
use warnings;
use 5.018;

use Future;
use Future::AsyncAwait;
use Future::IO;
use Async::Redis;
use Async::Redis::Error::Timeout;

our $VERSION = '0.001';

sub new {
    my ($class, %args) = @_;

    my $self = bless {
        # Connection params (passed to Async::Redis->new)
        host     => $args{host} // 'localhost',
        port     => $args{port} // 6379,
        password => $args{password},
        database => $args{database},
        tls      => $args{tls},
        uri      => $args{uri},

        # Pool sizing
        min => $args{min} // 1,
        max => $args{max} // 10,

        # Timeouts
        acquire_timeout  => $args{acquire_timeout} // 5,
        idle_timeout     => $args{idle_timeout} // 60,
        connect_timeout  => $args{connect_timeout} // 10,
        cleanup_timeout  => $args{cleanup_timeout} // 5,

        # Dirty handling
        on_dirty => $args{on_dirty} // 'destroy',

        # Pool state
        _idle    => [],   # Available connections

lib/Async/Redis/Pool.pm  view on Meta::CPAN

        my $conn = await $self->_create_connection;
        $self->{_active}{"$conn"} = $conn;
        return $conn;
    }

    # At max capacity - wait for release
    my $waiter = Future->new;
    push @{$self->{_waiters}}, $waiter;

    my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
        Future->fail(Async::Redis::Error::Timeout->new(
            message => "Acquire timed out after $self->{acquire_timeout}s",
            timeout => $self->{acquire_timeout},
        ));
    });

    my $wait_f = Future->wait_any($waiter, $timeout_future);

    my $result;
    eval {
        $result = await $wait_f;

t/01-unit/error.t  view on Meta::CPAN

# t/01-unit/error.t
use strict;
use warnings;
use Test2::V0;

use Async::Redis::Error;
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Protocol;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Disconnected;

subtest 'Error base class' => sub {
    my $e = Async::Redis::Error->new(message => 'test error');
    ok($e->isa('Async::Redis::Error'), 'isa Error');
    is($e->message, 'test error', 'message accessor');
    like("$e", qr/test error/, 'stringifies to message');
};

t/01-unit/error.t  view on Meta::CPAN

        reason  => 'timeout',
    );
    ok($e->isa('Async::Redis::Error'), 'isa base Error');
    ok($e->isa('Async::Redis::Error::Connection'), 'isa Connection');
    is($e->host, 'localhost', 'host accessor');
    is($e->port, 6379, 'port accessor');
    is($e->reason, 'timeout', 'reason accessor');
    like("$e", qr/connection lost/, 'stringifies');
};

subtest 'Timeout error' => sub {
    my $e = Async::Redis::Error::Timeout->new(
        message        => 'request timed out',
        command        => ['GET', 'mykey'],
        timeout        => 5,
        maybe_executed => 0,
    );
    ok($e->isa('Async::Redis::Error'), 'isa base Error');
    ok($e->isa('Async::Redis::Error::Timeout'), 'isa Timeout');
    is($e->command, ['GET', 'mykey'], 'command accessor');
    is($e->timeout, 5, 'timeout accessor');
    ok(!$e->maybe_executed, 'maybe_executed false');

    my $e2 = Async::Redis::Error::Timeout->new(
        message        => 'timed out after write',
        maybe_executed => 1,
    );
    ok($e2->maybe_executed, 'maybe_executed true');
};

subtest 'Protocol error' => sub {
    my $e = Async::Redis::Error::Protocol->new(
        message => 'unexpected response type',
        data    => '+OK',



( run in 0.637 second using v1.01-cache-2.11-cpan-bbdf54b448f )