Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis.pm  view on Meta::CPAN

package Async::Redis;

use strict;
use warnings;
use 5.018;

our $VERSION = '0.002000';

use Future;
use Future::AsyncAwait;
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();

# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;

# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);

# Key extraction for prefixing
use Async::Redis::KeyExtractor;

# Transaction support
use Async::Redis::Transaction;

# Script support
use Async::Redis::Script;

# Iterator support
use Async::Redis::Iterator;

# Pipeline support
use Async::Redis::Pipeline;
use Async::Redis::AutoPipeline;

# PubSub support
use Async::Redis::Subscription;

# Telemetry support
use Async::Redis::Telemetry;

# Try XS version first, fall back to pure Perl
BEGIN {
    eval { require Protocol::Redis::XS; 1 }
        or require Protocol::Redis;
}

sub _parser_class {
    return $INC{'Protocol/Redis/XS.pm'} ? 'Protocol::Redis::XS' : 'Protocol::Redis';
}

sub _calculate_backoff {
    my ($self, $attempt) = @_;

    # Exponential: delay * 2^(attempt-1)
    my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));

    # Cap at max
    $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};

    # Apply jitter: delay * (1 +/- jitter)
    if ($self->{reconnect_jitter} > 0) {
        my $jitter_range = $delay * $self->{reconnect_jitter};

lib/Async/Redis.pm  view on Meta::CPAN

    my ($self) = @_;
    return if $self->{_socket_live};
    if (my $f = $self->{_reconnect_future}) {
        await $f;
        return;
    }
    my $f = $self->_reconnect;
    $self->{_reconnect_future} = $f;
    $self->{_tasks}->add(data => 'reconnect', f => $f);
    $f->on_ready(sub { $self->{_reconnect_future} = undef });
    await $f;
}

# Reconnect and replay pubsub subscriptions
async sub _reconnect_pubsub {
    my ($self) = @_;

    my $sub = $self->{_subscription}
        or die Async::Redis::Error::Disconnected->new(
            message => "No subscription to replay",
        );

    my @replay = $sub->get_replay_commands;

    # Ensure connection state is fully cleaned up before reconnecting.
    # _reset_connection may have already been called by _read_response,
    # but if the socket was closed externally, we need to clean up
    # stale IO watchers and state here. It is safe to call twice —
    # the on_disconnect callback is guarded by $was_connected.
    $self->_reset_connection('pubsub_reconnect');

    await $self->_reconnect;

    # Re-enter pubsub mode before replaying so the unified reader
    # classifies incoming message frames correctly during replay.
    $self->{in_pubsub} = 1;

    # Replay all subscription commands through the write gate and unified
    # reader. Each channel/pattern gets its own command so confirmations
    # are matched one-to-one via the inflight queue.
    for my $cmd (@replay) {
        my ($command, @args) = @$cmd;
        for my $arg (@args) {
            await $self->_pubsub_command($command, $arg);
        }
    }
}

# Asynchronously reconnect after a pubsub connection drop. Called by
# _reader_fatal when reconnect is enabled and a subscription is active.
# Fires _resume_after_reconnect on the subscription on success, or
# _fail_fatal on unrecoverable reconnect failure.
sub _reconnect_async {
    my ($self, $sub) = @_;

    # Dedup against any reconnect already in progress (from either this
    # path or _ensure_connected). The slot is the shared signal.
    return if $self->{_reconnect_future}
        && !$self->{_reconnect_future}->is_ready;

    weaken(my $weak_self = $self);
    weaken(my $weak_sub  = $sub);

    my $f = (async sub {
        # Reconnect the socket. _reconnect handles retry/backoff and
        # dies with Disconnected if reconnect_max_attempts is exhausted.
        await $weak_self->_reconnect;

        # Delegate the replay, on_reconnect, and driver-restart work to
        # the subscription's unified resume path. _resume_after_reconnect
        # handles clearing _paused, setting in_pubsub, replaying all
        # tracked channels/patterns, firing on_reconnect, and starting
        # the driver. Keeps the "who restarts what after reconnect"
        # logic in one place.
        if ($weak_sub) {
            await $weak_sub->_resume_after_reconnect;
        }
    })->();

    # Ownership: the selector owns the task; the slot is the dedup signal.
    # No ->retain — the selector holds the strong reference.
    $self->{_reconnect_future} = $f;
    $self->{_tasks}->add(data => 'pubsub-reconnect', f => $f);

    $f->on_ready(sub {
        return unless $weak_self;
        $weak_self->{_reconnect_future} = undef;
    });
    $f->on_fail(sub {
        my $err = shift;
        return unless $weak_sub;
        $weak_sub->_fail_fatal($err);
    });

    return;
}

# Execute a Redis command
async sub command {
    my ($self, $cmd, @args) = @_;

    # Check for fork - invalidate connection if PID changed
    $self->_check_fork;

    # Block regular commands on pubsub connection
    if ($self->{in_pubsub}) {
        my $ucmd = uc($cmd // '');
        unless ($ucmd =~ /^(SUBSCRIBE|UNSUBSCRIBE|PSUBSCRIBE|PUNSUBSCRIBE|SSUBSCRIBE|SUNSUBSCRIBE|PING|QUIT)$/) {
            die Async::Redis::Error::Protocol->new(
                message => "Cannot execute '$cmd' on connection in PubSub mode",
            );
        }
    }

    # Apply key prefixing if configured
    if (defined $self->{prefix} && $self->{prefix} ne '') {
        @args = Async::Redis::KeyExtractor::apply_prefix(
            $self->{prefix}, $cmd, @args
        );
    }

    # Route through auto-pipeline if enabled



( run in 0.889 second using v1.01-cache-2.11-cpan-39bf76dae61 )