Async-Redis

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

examples/stress/stress
lib/Async/Redis.pm
lib/Async/Redis/AutoPipeline.pm
lib/Async/Redis/Commands.pm
lib/Async/Redis/Cookbook.pod
lib/Async/Redis/Error.pm
lib/Async/Redis/Error/Connection.pm
lib/Async/Redis/Error/Disconnected.pm
lib/Async/Redis/Error/Protocol.pm
lib/Async/Redis/Error/Redis.pm
lib/Async/Redis/Error/Timeout.pm
lib/Async/Redis/Iterator.pm
lib/Async/Redis/KeyExtractor.pm
lib/Async/Redis/Pipeline.pm
lib/Async/Redis/Pool.pm
lib/Async/Redis/Script.pm
lib/Async/Redis/Subscription.pm
lib/Async/Redis/Telemetry.pm
lib/Async/Redis/Transaction.pm
lib/Async/Redis/URI.pm
script/commands.json

README.md  view on Meta::CPAN

    port => 6379,

    # Or use URI
    uri => 'redis://user:pass@host:6379/1',

    # Authentication
    password => 'secret',
    username => 'myuser',  # Redis 6+ ACL
    database => 1,

    # Timeouts
    connect_timeout => 10,
    request_timeout => 5,

    # Auto-reconnect
    reconnect       => 1,
    reconnect_delay => 0.1,
    reconnect_delay_max => 60,

    # TLS
    tls => {

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

    // ===== State =====
    const state = {
        username: '',
        userId: '',
        sessionId: '',          // Persistent session ID for reconnection
        currentRoom: 'general',
        rooms: {},
        users: {},
        ws: null,
        sse: null,
        typingTimeout: null,
        isTyping: false,
        reconnectAttempts: 0,
        maxReconnectDelay: 30000,   // Max 30 seconds between attempts
        pingInterval: null,
        pingIntervalMs: 10000,      // Send ping every 10 seconds
        lastPongTime: 0,            // Track last pong received
        heartbeatCheckInterval: null,
        heartbeatTimeoutMs: 35000,  // Consider connection dead if no pong in 35s
        lastMsgId: 0                // Track last received message ID for catch-up
    };

    // ===== DOM Elements =====
    const elements = {
        // Screens
        loginScreen: document.getElementById('login-screen'),
        chatScreen: document.getElementById('chat-screen'),

        // Login

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

    }

    // ===== Toast Notifications =====
    function showToast(message, type = 'info') {
        const toast = document.createElement('div');
        toast.className = `toast ${type}`;
        toast.textContent = message;

        elements.toastContainer.appendChild(toast);

        setTimeout(() => {
            toast.style.animation = 'slideIn 0.3s ease reverse';
            setTimeout(() => toast.remove(), 300);
        }, 3000);
    }

    // ===== Exponential Backoff =====
    function calculateReconnectDelay() {
        // Formula: min(1000 * 2^attempts + random(0, 1000), 30000)
        const baseDelay = 1000 * Math.pow(2, state.reconnectAttempts);
        const jitter = Math.random() * 1000;
        return Math.min(baseDelay + jitter, state.maxReconnectDelay);
    }

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

                    sendMessage({ type: 'ping' });
                }
            }, state.pingIntervalMs);

            // Start heartbeat timeout check
            if (state.heartbeatCheckInterval) {
                clearInterval(state.heartbeatCheckInterval);
            }
            state.heartbeatCheckInterval = setInterval(() => {
                const timeSinceLastPong = Date.now() - state.lastPongTime;
                if (timeSinceLastPong > state.heartbeatTimeoutMs) {
                    console.warn('Heartbeat timeout - connection appears dead, reconnecting...');
                    if (state.ws) {
                        state.ws.close();
                    }
                }
            }, 5000); // Check every 5 seconds
        };

        state.ws.onclose = (event) => {
            setConnectionStatus('disconnected');

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

            if (state.heartbeatCheckInterval) {
                clearInterval(state.heartbeatCheckInterval);
                state.heartbeatCheckInterval = null;
            }

            // Always try to reconnect with exponential backoff
            state.reconnectAttempts++;
            const delay = calculateReconnectDelay();
            console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${state.reconnectAttempts})...`);
            setConnectionStatus('reconnecting');
            setTimeout(connectWebSocket, delay);
        };

        state.ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };

        state.ws.onmessage = (event) => {
            // Any message resets the heartbeat timer
            state.lastPongTime = Date.now();

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

        if (!state.isTyping) {
            state.isTyping = true;
            sendMessage({
                type: 'typing',
                room: state.currentRoom,
                typing: true
            });
        }

        // Clear existing timeout
        if (state.typingTimeout) {
            clearTimeout(state.typingTimeout);
        }

        // Stop typing after 2 seconds of inactivity
        state.typingTimeout = setTimeout(() => {
            state.isTyping = false;
            sendMessage({
                type: 'typing',
                room: state.currentRoom,
                typing: false
            });
        }, 2000);
    }

    // ===== Event Handlers =====

examples/pagi-chat/public/js/app.js  view on Meta::CPAN

            const text = elements.messageInput.value.trim();
            if (text) {
                sendMessage({
                    type: 'message',
                    room: state.currentRoom,
                    text: text
                });
                elements.messageInput.value = '';

                // Stop typing indicator
                if (state.typingTimeout) {
                    clearTimeout(state.typingTimeout);
                }
                state.isTyping = false;
            }
        });

        // Typing detection
        elements.messageInput.addEventListener('input', handleTyping);

        // Leave room button
        elements.leaveRoomBtn.addEventListener('click', () => {

lib/Async/Redis.pm  view on Meta::CPAN

use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();

# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;

# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);

# Key extraction for prefixing
use Async::Redis::KeyExtractor;

lib/Async/Redis.pm  view on Meta::CPAN

        socket   => undef,
        parser   => undef,
        connected          => 0,
        _socket_live       => 0,
        _fatal_in_progress => 0,
        _reader_running    => 0,   # dedup guard; the selector owns the reader Future itself
        _write_lock        => undef,     # will be a Future used as a lock, populated lazily
        _reconnect_future  => undef,
        _tasks             => Future::Selector->new,

        # Timeout settings
        connect_timeout         => $args{connect_timeout} // 10,
        request_timeout         => $args{request_timeout} // 5,
        blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,

        # Inflight tracking with deadlines
        # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
        inflight => [],

        # Reconnection settings
        reconnect              => $args{reconnect} // 0,

lib/Async/Redis.pm  view on Meta::CPAN


    my $completed_f = await $result_f;

    # Now check the result
    if ($completed_f->is_failed) {
        my ($error) = $completed_f->failure;
        # Don't call close() - let $socket go out of scope when we die.
        # Perl's DESTROY will close it after the exception unwinds.

        if ($error eq 'connect_timeout') {
            die Async::Redis::Error::Timeout->new(
                message => "Connect timed out after $self->{connect_timeout}s",
                timeout => $self->{connect_timeout},
            );
        }
        die Async::Redis::Error::Connection->new(
            message => "$error",
            host    => $self->{path} // $self->{host},
            port    => $self->{port} // 0,
        );
    }

lib/Async/Redis.pm  view on Meta::CPAN

        my $read_f = Future::IO->read($self->{socket}, 65536);
        $self->{_current_read_future} = $read_f;

        my ($returned_f, $timed_out) = await _await_with_deadline($read_f, $deadline);

        # Clear slot on success path; fatal clears it on timeout/cancel.
        $self->{_current_read_future} = undef
            if !$timed_out && $returned_f->is_ready && !$returned_f->is_failed;

        if ($timed_out) {
            my $err = Async::Redis::Error::Timeout->new(
                message        => "Request timed out",
                command        => $head ? $head->{args} : undef,
                timeout        => $self->{request_timeout},
                maybe_executed => 1,
            );
            $self->_reader_fatal($err);
            return;
        }

        if ($returned_f->is_failed) {

lib/Async/Redis.pm  view on Meta::CPAN

            host    => $self->{host},
            port    => $self->{port},
        );

    # Drive handshake with non-blocking loop
    my $deadline = Time::HiRes::time() + $self->{connect_timeout};

    while (1) {
        # Check timeout
        if (Time::HiRes::time() >= $deadline) {
            die Async::Redis::Error::Timeout->new(
                message => "TLS handshake timed out",
                timeout => $self->{connect_timeout},
            );
        }

        # Attempt handshake step
        my $rv = $socket->connect_SSL;

        if ($rv) {
            # Handshake complete!

lib/Async/Redis.pm  view on Meta::CPAN

            # Wait for socket to become readable with timeout
            my $read_f = Future::IO->waitfor_readable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });

            my $wait_f = Future->wait_any($read_f, $timeout_f);
            await $wait_f;

            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
            # Wait for socket to become writable with timeout
            my $write_f = Future::IO->waitfor_writable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });

            my $wait_f = Future->wait_any($write_f, $timeout_f);
            await $wait_f;

            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        else {
            # Actual error
            die Async::Redis::Error::Connection->new(
                message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
                host    => $self->{host},

lib/Async/Redis.pm  view on Meta::CPAN

    if (my $msg = $self->{parser}->get_message) {
        return $msg;
    }

    # Read until we get a complete message
    while (1) {
        my $remaining = $deadline - Time::HiRes::time();

        if ($remaining <= 0) {
            $self->_reset_connection;
            die Async::Redis::Error::Timeout->new(
                message        => "Request timed out after $self->{request_timeout}s",
                command        => $cmd_ref,
                timeout        => $self->{request_timeout},
                maybe_executed => 1,  # already sent the command
            );
        }

        # Use wait_any for timeout
        my $read_f = Future::IO->read($self->{socket}, 65536);

lib/Async/Redis.pm  view on Meta::CPAN

        if ($read_f->is_cancelled) {
            die Async::Redis::Error::Disconnected->new(
                message => "Disconnected during read",
            );
        }

        if ($wait_f->is_failed) {
            my ($error) = $wait_f->failure;
            if ($error eq 'read_timeout') {
                $self->_reset_connection;
                die Async::Redis::Error::Timeout->new(
                    message        => "Request timed out after $self->{request_timeout}s",
                    command        => $cmd_ref,
                    timeout        => $self->{request_timeout},
                    maybe_executed => 1,
                );
            }
            $self->_reset_connection;
            die Async::Redis::Error::Connection->new(
                message => "$error",
            );

lib/Async/Redis.pm  view on Meta::CPAN


    return unless @{$self->{inflight}};

    my $deadline = Time::HiRes::time() + $timeout;

    while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
        await Future::IO->sleep(0.001);
    }

    if (@{$self->{inflight}}) {
        $self->_fail_all_inflight("Timeout waiting for inflight commands");
    }
}

async sub publish {
    my ($self, $channel, $message) = @_;
    return await $self->command('PUBLISH', $channel, $message);
}

async sub spublish {
    my ($self, $channel, $message) = @_;

lib/Async/Redis.pm  view on Meta::CPAN


Errors are thrown as exception objects:

    eval {
        await $redis->get('key');
        1;
    } or do {
        my $error = $@;
        if (ref($error) && $error->isa('Async::Redis::Error::Connection')) {
            # Connection error
        } elsif (ref($error) && $error->isa('Async::Redis::Error::Timeout')) {
            # Timeout error
        } elsif (ref($error) && $error->isa('Async::Redis::Error::Redis')) {
            # Redis error (e.g., WRONGTYPE)
        }
    };

Exception classes:

=over 4

=item Async::Redis::Error::Connection

Connection-related errors (refused, reset, etc.)

=item Async::Redis::Error::Timeout

Timeout errors (connect, request, read).

=item Async::Redis::Error::Protocol

Protocol parsing errors.

=item Async::Redis::Error::Redis

Errors returned by Redis (WRONGTYPE, ERR, etc.)

=item Async::Redis::Error::Disconnected

lib/Async/Redis/Error/Timeout.pm  view on Meta::CPAN

package Async::Redis::Error::Timeout;

use strict;
use warnings;
use 5.018;

use parent 'Async::Redis::Error';

sub command        { shift->{command} }
sub timeout        { shift->{timeout} }
sub maybe_executed { shift->{maybe_executed} }

1;

__END__

=head1 NAME

Async::Redis::Error::Timeout - Timeout exception

=head1 DESCRIPTION

Thrown when a Redis operation times out.

=head1 ATTRIBUTES

=over 4

=item command - The command that timed out (arrayref)

lib/Async/Redis/Pool.pm  view on Meta::CPAN

use strict;
use warnings;
use 5.018;

use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(refaddr);
use Async::Redis;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Timeout;

sub new {
    my ($class, %args) = @_;

    # Separate pool-specific args from connection args.
    # Everything not pool-specific is passed through to Async::Redis->new().
    my %pool_args;
    for my $key (qw(min max acquire_timeout idle_timeout cleanup_timeout on_dirty)) {
        $pool_args{$key} = delete $args{$key} if exists $args{$key};
    }

    my $self = bless {
        # Connection params (passed through to Async::Redis->new)
        _conn_args => \%args,

        # Pool sizing
        min => $pool_args{min} // 1,
        max => $pool_args{max} // 10,

        # Timeouts
        acquire_timeout  => $pool_args{acquire_timeout} // 5,
        idle_timeout     => $pool_args{idle_timeout} // 60,
        cleanup_timeout  => $pool_args{cleanup_timeout} // 5,

        # Dirty handling
        on_dirty => $pool_args{on_dirty} // 'destroy',

        # Pool state
        _idle     => [],   # Available connections
        _active   => {},   # Connections in use (refaddr => conn)

lib/Async/Redis/Pool.pm  view on Meta::CPAN


        $self->{_active}{refaddr($conn)} = $conn;
        return $conn;
    }

    # At max capacity - wait for release
    my $waiter = Future->new;
    push @{$self->{_waiters}}, $waiter;

    my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
        Future->fail(Async::Redis::Error::Timeout->new(
            message => "Acquire timed out after $self->{acquire_timeout}s",
            timeout => $self->{acquire_timeout},
        ));
    });

    my $wait_f = Future->wait_any($waiter, $timeout_future);

    my $result;
    eval {
        $result = await $wait_f;

lib/Async/Redis/Pool.pm  view on Meta::CPAN

connections after dirty connections are destroyed if the total drops below
this value.

=item max

Maximum number of active, idle, and currently-creating connections. Default: 10.

=item acquire_timeout

Seconds to wait for a connection when the pool is at capacity. Default: 5.
Timeouts throw L<Async::Redis::Error::Timeout>.

=item cleanup_timeout

Seconds to allow a best-effort cleanup command such as C<DISCARD> or
C<UNWATCH>. Default: 5.

=item on_dirty

Dirty connection policy. Default: C<destroy>.

t/01-unit/error.t  view on Meta::CPAN

# t/01-unit/error.t
use strict;
use warnings;
use Test2::V0;

use Async::Redis::Error;
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Protocol;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Disconnected;

subtest 'Error base class' => sub {
    my $e = Async::Redis::Error->new(message => 'test error');
    ok($e->isa('Async::Redis::Error'), 'isa Error');
    is($e->message, 'test error', 'message accessor');
    like("$e", qr/test error/, 'stringifies to message');
};

t/01-unit/error.t  view on Meta::CPAN

        reason  => 'timeout',
    );
    ok($e->isa('Async::Redis::Error'), 'isa base Error');
    ok($e->isa('Async::Redis::Error::Connection'), 'isa Connection');
    is($e->host, 'localhost', 'host accessor');
    is($e->port, 6379, 'port accessor');
    is($e->reason, 'timeout', 'reason accessor');
    like("$e", qr/connection lost/, 'stringifies');
};

subtest 'Timeout error' => sub {
    my $e = Async::Redis::Error::Timeout->new(
        message        => 'request timed out',
        command        => ['GET', 'mykey'],
        timeout        => 5,
        maybe_executed => 0,
    );
    ok($e->isa('Async::Redis::Error'), 'isa base Error');
    ok($e->isa('Async::Redis::Error::Timeout'), 'isa Timeout');
    is($e->command, ['GET', 'mykey'], 'command accessor');
    is($e->timeout, 5, 'timeout accessor');
    ok(!$e->maybe_executed, 'maybe_executed false');

    my $e2 = Async::Redis::Error::Timeout->new(
        message        => 'timed out after write',
        maybe_executed => 1,
    );
    ok($e2->maybe_executed, 'maybe_executed true');
};

subtest 'Protocol error' => sub {
    my $e = Async::Redis::Error::Protocol->new(
        message => 'unexpected response type',
        data    => '+OK',

t/92-concurrency/reader-fatal-paths.t  view on Meta::CPAN

    my $c = Async::Redis->new(host => 'x', port => 1);
    $c->{_socket_live} = 1;
    $c->{connected}    = 1;
    my $f1 = Future->new;
    my $f2 = Future->new;
    $c->{inflight} = [
        { future => $f1, cmd => 'GET', args => ['x'], deadline => 0 },
        { future => $f2, cmd => 'SET', args => ['y', 'z'], deadline => 0 },
    ];

    my $err = Async::Redis::Error::Timeout->new(
        message => 'test timeout',
        timeout => 5,
    );
    $c->_reader_fatal($err);

    ok $f1->is_failed, 'f1 failed';
    my ($e1) = $f1->failure;
    ok blessed($e1) && $e1->isa('Async::Redis::Error::Timeout'),
        'f1 carries the typed error, not cancellation';

    ok $f2->is_failed, 'f2 failed';
    my ($e2) = $f2->failure;
    ok blessed($e2) && $e2->isa('Async::Redis::Error::Timeout'),
        'f2 also carries the typed error';

    is scalar @{$c->{inflight}}, 0, 'inflight drained';
    is $c->{_socket_live}, 0, '_socket_live cleared';
    is $c->{connected},    0, 'connected cleared';
};

subtest 'reentrance during same call is a no-op (idempotence guard)' => sub {
    my $c = Async::Redis->new(host => 'x', port => 1);
    $c->{_socket_live} = 1;

t/92-concurrency/reader-invariants.t  view on Meta::CPAN

        # inject the synthetic timeout and verify the typed error propagates.
        my $blpop_f = $r->command('BLPOP', 'test-timeout-injection-list', 30);

        # Yield briefly so the write goes out and the reader is waiting.
        await Future::IO->sleep(0.02);
        force_read_timeout($r);

        my $ok = eval { await $blpop_f; 1 };
        ok !$ok, 'blpop failed';
        my $err = $@;
        isa_ok $err, ['Async::Redis::Error::Timeout'];
    })->()->get;
};

subtest 'fuzz: randomized mix of ops, every future resolves or fails' => sub {
    (async sub {
        my $r = new_redis();
        await $r->connect;

        my @ops;
        for my $i (1..30) {

t/92-concurrency/selector-invariants.t  view on Meta::CPAN

        eval { $r->disconnect };
    })->()->get;
};

subtest 'disconnect under load: write-gate waiters unwind cleanly' => sub {
    # Fire many concurrent commands without awaiting individually, so
    # most end up waiting on _acquire_write_lock rather than in the
    # inflight queue. Then disconnect and await them all. Verify:
    #   - every future resolves (no pending futures left)
    #   - only Disconnected / Connection error types appear
    #   - no Timeout, Protocol, or generic die leaking out
    # Uses BLPOP with a 0.1s server-side timeout so commands are slow
    # enough that disconnect actually catches some mid-flight even on
    # fast loopback.

    (async sub {
        my $r = new_redis();
        await $r->connect;

        my @futures;
        for my $i (1..30) {

t/lib/Test/Async/Redis.pm  view on Meta::CPAN

}

# Feed bytes directly into the parser, bypassing the socket. Useful
# for exercising the reader's decode/dispatch path with a crafted frame.
sub inject_unexpected_frame {
    my ($redis, $raw_bytes) = @_;
    $redis->{parser}->parse($raw_bytes) if $redis->{parser};
}

# Synthesize a fatal timeout directly. Routes through the detach-first
# _reader_fatal path so the typed Async::Redis::Error::Timeout is
# propagated to all inflight futures (not a generic cancellation).
sub force_read_timeout {
    my ($redis) = @_;
    require Async::Redis::Error::Timeout;
    $redis->_reader_fatal(Async::Redis::Error::Timeout->new(
        message => "synthetic timeout for test",
        timeout => 0,
    ));
}

1;

__END__

=head1 NAME

t/lib/Test/Async/Redis.pm  view on Meta::CPAN


=item inject_unexpected_frame($redis, $raw_bytes)

Feed raw bytes directly into the RESP parser, bypassing the socket.
Useful for exercising the reader's decode/dispatch path with a crafted
or malformed frame without needing a live server to send it.

=item force_read_timeout($redis)

Synthesize a fatal timeout by calling C<_reader_fatal> with a typed
C<Async::Redis::Error::Timeout> object. All inflight futures receive
the typed error via the detach-first path rather than generic
cancellation.

=back

=cut



( run in 3.062 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )