Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis.pm  view on Meta::CPAN

package Async::Redis;

use strict;
use warnings;
use 5.018;

our $VERSION = '0.002000';

use Future;
use Future::AsyncAwait;
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();

# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
use Async::Redis::Error::Protocol;

# Import auto-generated command methods
use Async::Redis::Commands;
our @ISA = qw(Async::Redis::Commands);

# Key extraction for prefixing
use Async::Redis::KeyExtractor;

# Transaction support
use Async::Redis::Transaction;

# Script support
use Async::Redis::Script;

# Iterator support
use Async::Redis::Iterator;

# Pipeline support
use Async::Redis::Pipeline;
use Async::Redis::AutoPipeline;

# PubSub support
use Async::Redis::Subscription;

# Telemetry support
use Async::Redis::Telemetry;

# Try XS version first, fall back to pure Perl
BEGIN {
    eval { require Protocol::Redis::XS; 1 }
        or require Protocol::Redis;
}

sub _parser_class {
    return $INC{'Protocol/Redis/XS.pm'} ? 'Protocol::Redis::XS' : 'Protocol::Redis';
}

sub _calculate_backoff {
    my ($self, $attempt) = @_;

    # Exponential: delay * 2^(attempt-1)
    my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));

    # Cap at max
    $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};

    # Apply jitter: delay * (1 +/- jitter)
    if ($self->{reconnect_jitter} > 0) {
        my $jitter_range = $delay * $self->{reconnect_jitter};
        my $jitter = (rand(2) - 1) * $jitter_range;
        $delay += $jitter;
    }

    return $delay;
}

# Free function, not a method. Call as _await_with_deadline($f, $deadline).

lib/Async/Redis.pm  view on Meta::CPAN

sub _await_with_deadline {
    my ($read_f, $deadline) = @_;

    if (!defined $deadline) {
        return $read_f->followed_by(sub { Future->done($read_f, 0) });
    }

    my $remaining = $deadline - Time::HiRes::time();
    if ($remaining <= 0) {
        return Future->done($read_f, 1);
    }

    my $timeout_f = Future::IO->sleep($remaining)
        ->then(sub { Future->fail('__deadline__') });

    # Use without_cancel so that if timeout wins, wait_any's cancel of the
    # losing future does not propagate to $read_f (caller owns its lifecycle).
    return Future->wait_any($read_f->without_cancel, $timeout_f)
        ->followed_by(sub {
            my ($f) = @_;
            my $timed_out = $f->is_failed
                && (($f->failure)[0] // '') eq '__deadline__' ? 1 : 0;

            if (!$timed_out && !$timeout_f->is_ready) {
                $timeout_f->cancel;
            }

            return Future->done($read_f, $timed_out);
        });
}

sub new {
    my ($class, %args) = @_;

    # Parse URI if provided
    if ($args{uri}) {
        require Async::Redis::URI;
        my $uri = Async::Redis::URI->parse($args{uri});
        if ($uri) {
            my %uri_args = $uri->to_hash;
            # URI values are defaults, explicit args override
            %args = (%uri_args, %args);
            delete $args{uri};  # don't store the string
        }
    }

    my $self = bless {
        path     => $args{path},
        host     => $args{path} ? undef : ($args{host} // 'localhost'),
        port     => $args{path} ? undef : ($args{port} // 6379),
        socket   => undef,
        parser   => undef,
        connected          => 0,
        _socket_live       => 0,
        _fatal_in_progress => 0,
        _reader_running    => 0,   # dedup guard; the selector owns the reader Future itself
        _write_lock        => undef,     # will be a Future used as a lock, populated lazily
        _reconnect_future  => undef,
        _tasks             => Future::Selector->new,

        # Timeout settings
        connect_timeout         => $args{connect_timeout} // 10,
        request_timeout         => $args{request_timeout} // 5,
        blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,

        # Inflight tracking with deadlines
        # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
        inflight => [],

        # Reconnection settings
        reconnect              => $args{reconnect} // 0,
        reconnect_delay        => $args{reconnect_delay} // 0.1,
        reconnect_delay_max    => $args{reconnect_delay_max} // 60,
        reconnect_jitter       => $args{reconnect_jitter} // 0.25,
        reconnect_max_attempts => $args{reconnect_max_attempts} // 10,  # 0 = unlimited
        _reconnect_attempt     => 0,

        # Callbacks
        on_connect    => $args{on_connect},
        on_disconnect => $args{on_disconnect},
        on_error      => $args{on_error},

        # Authentication
        password    => $args{password},
        username    => $args{username},
        database    => $args{database} // 0,
        client_name => $args{client_name},

        # TLS
        tls => $args{tls},

        # Key prefixing
        prefix => $args{prefix},

        # Pipeline settings
        pipeline_depth => $args{pipeline_depth} // 10000,
        auto_pipeline  => $args{auto_pipeline} // 0,

        # Backpressure: max queued messages before _dispatch_frame's slot wait blocks.
        message_queue_depth => do {
            my $d = $args{message_queue_depth} // 1;
            die "message_queue_depth must be >= 1 (got $d)" if $d < 1;
            $d;
        },

        # Transaction state
        in_multi => 0,
        watching => 0,

        # PubSub state
        in_pubsub     => 0,
        _subscription => undef,
        _pump_running => 0,

        # Fork safety
        _pid => $$,

        # Script registry
        _scripts => {},

        # Current read future for clean disconnect cancellation

lib/Async/Redis.pm  view on Meta::CPAN


    return $self if $self->{connected};

    # Create socket — AF_UNIX for path, AF_INET for host:port
    my ($socket, $sockaddr);

    if ($self->{path}) {
        socket($socket, AF_UNIX, SOCK_STREAM, 0)
            or die Async::Redis::Error::Connection->new(
                message => "Cannot create unix socket: $!",
                host    => $self->{path},
                port    => 0,
            );
        IO::Handle::blocking($socket, 0);
        $sockaddr = pack_sockaddr_un($self->{path});
    } else {
        $socket = IO::Socket::INET->new(
            Proto    => 'tcp',
            Blocking => 0,
        ) or die Async::Redis::Error::Connection->new(
            message => "Cannot create socket: $!",
            host    => $self->{host},
            port    => $self->{port},
        );

        # Build sockaddr
        my $addr = inet_aton($self->{host})
            or die Async::Redis::Error::Connection->new(
                message => "Cannot resolve host: $self->{host}",
                host    => $self->{host},
                port    => $self->{port},
            );
        $sockaddr = pack_sockaddr_in($self->{port}, $addr);
    }

    # Connect with timeout using Future->wait_any
    my $connect_f = Future::IO->connect($socket, $sockaddr);
    my $sleep_f = Future::IO->sleep($self->{connect_timeout});

    my $timeout_f = $sleep_f->then(sub {
        return Future->fail('connect_timeout');
    });

    my $wait_f = Future->wait_any($connect_f, $timeout_f);

    # Use followed_by to handle both success and failure without await propagating failure
    my $result_f = $wait_f->followed_by(sub {
        my ($f) = @_;
        return Future->done($f);  # wrap the future itself
    });

    my $completed_f = await $result_f;

    # Now check the result
    if ($completed_f->is_failed) {
        my ($error) = $completed_f->failure;
        # Don't call close() - let $socket go out of scope when we die.
        # Perl's DESTROY will close it after the exception unwinds.

        if ($error eq 'connect_timeout') {
            die Async::Redis::Error::Timeout->new(
                message => "Connect timed out after $self->{connect_timeout}s",
                timeout => $self->{connect_timeout},
            );
        }
        die Async::Redis::Error::Connection->new(
            message => "$error",
            host    => $self->{path} // $self->{host},
            port    => $self->{port} // 0,
        );
    }

    # TLS upgrade if enabled
    if ($self->{tls}) {
        eval {
            $socket = await $self->_tls_upgrade($socket);
        };
        if ($@) {
            # Don't call close() - let $socket go out of scope when we die.
            # Perl's DESTROY will close it after the exception unwinds.
            die $@;
        }
    }

    $self->{socket} = $socket;
    $self->{parser} = _parser_class()->new(api => 1);
    $self->{_socket_live} = 1;   # write gate and reader can now submit
    $self->{inflight} = [];
    $self->{_pid} = $$;  # Track PID for fork safety
    $self->{_current_read_future} = undef;

    # Run Redis protocol handshake (AUTH, SELECT, CLIENT SETNAME).
    # connected stays 0 during handshake; set it only on success so
    # callers never see a half-initialised object.
    my $handshake_ok = eval { await $self->_redis_handshake; 1 };
    unless ($handshake_ok) {
        my $err = $@;
        $self->_reset_connection('handshake_failure');
        die $err;
    }

    $self->{connected} = 1;

    # Initialize auto-pipeline if enabled
    if ($self->{auto_pipeline}) {
        $self->{_auto_pipeline} = Async::Redis::AutoPipeline->new(
            redis     => $self,
            max_depth => $self->{pipeline_depth},
        );
    }

    # Fire on_connect callback and reset reconnect counter
    if ($self->{on_connect}) {
        $self->{on_connect}->($self);
    }
    $self->{_reconnect_attempt} = 0;

    # Telemetry: record connection
    if ($self->{_telemetry}) {
        $self->{_telemetry}->record_connection(1);
        $self->{_telemetry}->log_event('connected',

lib/Async/Redis.pm  view on Meta::CPAN

# Add command to inflight queue - returns queue depth.
# redis_error_policy: 'fail' (default) fails the future on -ERR frames;
# 'capture' calls ->done($err_obj) so callers can inspect per-slot errors
# (used by pipelining in Task N+).
sub _add_inflight {
    my ($self, $future, $cmd, $args, $deadline, $redis_error_policy) = @_;
    push @{$self->{inflight}}, {
        future      => $future,
        cmd         => $cmd,
        args        => $args,
        deadline    => $deadline,
        redis_error => $redis_error_policy // 'fail',
        sent_at     => Time::HiRes::time(),
    };
    return scalar @{$self->{inflight}};
}

# Shift first entry from inflight queue
sub _shift_inflight {
    my ($self) = @_;
    return shift @{$self->{inflight}};
}

# Fail all pending inflight futures with given error
sub _fail_all_inflight {
    my ($self, $error) = @_;
    while (my $entry = $self->_shift_inflight) {
        if ($entry->{future} && !$entry->{future}->is_ready) {
            $entry->{future}->fail($error);
        }
    }
}

# The single socket reader. Runs while there is work (inflight or pubsub).
# Calls _reader_fatal on any stream-alignment failure. The selector
# (_tasks) owns this task; _reader_running is cleared on every exit path
# via on_ready so _ensure_reader can restart it on the next submission.
async sub _run_reader {
    my ($self) = @_;

    while (1) {
        # Exit conditions.
        return unless $self->{_socket_live};
        last if !$self->{in_pubsub} && !@{$self->{inflight}};
        last if $self->{in_pubsub} && !$self->{_subscription};

        my $head = $self->{inflight}[0];
        my $deadline = $head ? $head->{deadline} : undef;

        # Set up read future; track so _reader_fatal can cancel it.
        my $read_f = Future::IO->read($self->{socket}, 65536);
        $self->{_current_read_future} = $read_f;

        my ($returned_f, $timed_out) = await _await_with_deadline($read_f, $deadline);

        # Clear slot on success path; fatal clears it on timeout/cancel.
        $self->{_current_read_future} = undef
            if !$timed_out && $returned_f->is_ready && !$returned_f->is_failed;

        if ($timed_out) {
            my $err = Async::Redis::Error::Timeout->new(
                message        => "Request timed out",
                command        => $head ? $head->{args} : undef,
                timeout        => $self->{request_timeout},
                maybe_executed => 1,
            );
            $self->_reader_fatal($err);
            return;
        }

        if ($returned_f->is_failed) {
            my ($rerr) = $returned_f->failure;
            my $err = Async::Redis::Error::Connection->new(
                message => "Connection read error: $rerr",
                host    => $self->{host},
                port    => $self->{port},
            );
            $self->_reader_fatal($err);
            return;
        }

        my $buf = $returned_f->get;
        if (!defined $buf || length($buf) == 0) {
            my $err = Async::Redis::Error::Connection->new(
                message => "Connection closed by peer",
                host    => $self->{host},
                port    => $self->{port},
            );
            $self->_reader_fatal($err);
            return;
        }

        $self->{parser}->parse($buf);

        # Drain all complete messages the parser has.
        while (my $msg = $self->{parser}->get_message) {
            my ($kind, $value) = $self->_decode_response_result($msg);

            if ($kind eq 'protocol_error') {
                $self->_reader_fatal($value);
                return;
            }

            my $is_pubsub_message = 0;
            if ($self->{in_pubsub} && $kind eq 'ok' && ref($value) eq 'ARRAY') {
                my $frame_name = $value->[0] // '';
                $is_pubsub_message = 1
                    if $frame_name eq 'message'
                    || $frame_name eq 'pmessage'
                    || $frame_name eq 'smessage';
            }

            if ($is_pubsub_message) {
                my $sub = $self->{_subscription};
                if (!$sub) {
                    # No active subscription but got a message frame: strict desync.
                    $self->_reader_fatal(
                        Async::Redis::Error::Protocol->new(
                            message => "message frame but no active subscription",
                        )
                    );

lib/Async/Redis.pm  view on Meta::CPAN

sub _ssl_verify_none {
    require IO::Socket::SSL;
    return IO::Socket::SSL::SSL_VERIFY_NONE();
}

# Build the IO::Socket::SSL option hash for the current connection.
# Handles chain verification, SNI, hostname identity checking, and
# client cert/key/CA forwarding. Called by _tls_upgrade and directly
# by unit tests.
sub _build_tls_options {
    my ($self) = @_;
    my %ssl_opts = (SSL_startHandshake => 0);

    my $tls      = $self->{tls};
    my $tls_hash = ref $tls eq 'HASH' ? $tls : {};

    my $verify          = exists $tls_hash->{verify}          ? !!$tls_hash->{verify}          : 1;
    my $verify_hostname = exists $tls_hash->{verify_hostname} ? !!$tls_hash->{verify_hostname} : 1;

    $ssl_opts{SSL_ca_file}   = $tls_hash->{ca_file}   if $tls_hash->{ca_file};
    $ssl_opts{SSL_cert_file} = $tls_hash->{cert_file} if $tls_hash->{cert_file};
    $ssl_opts{SSL_key_file}  = $tls_hash->{key_file}  if $tls_hash->{key_file};

    if ($verify) {
        $ssl_opts{SSL_verify_mode} = $self->_ssl_verify_peer;
        $ssl_opts{SSL_hostname}    = $self->{host};

        if ($verify_hostname) {
            $ssl_opts{SSL_verifycn_name}   = $self->{host};
            $ssl_opts{SSL_verifycn_scheme} = 'default';
        }
    } else {
        $ssl_opts{SSL_verify_mode} = $self->_ssl_verify_none;
    }

    return %ssl_opts;
}

# Non-blocking TLS upgrade
async sub _tls_upgrade {
    my ($self, $socket) = @_;

    require IO::Socket::SSL;

    my %ssl_opts = $self->_build_tls_options;

    # Start SSL (does not block because SSL_startHandshake => 0)
    IO::Socket::SSL->start_SSL($socket, %ssl_opts)
        or die Async::Redis::Error::Connection->new(
            message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
            host    => $self->{host},
            port    => $self->{port},
        );

    # Drive handshake with non-blocking loop
    my $deadline = Time::HiRes::time() + $self->{connect_timeout};

    while (1) {
        # Check timeout
        if (Time::HiRes::time() >= $deadline) {
            die Async::Redis::Error::Timeout->new(
                message => "TLS handshake timed out",
                timeout => $self->{connect_timeout},
            );
        }

        # Attempt handshake step
        my $rv = $socket->connect_SSL;

        if ($rv) {
            # Handshake complete!
            return $socket;
        }

        # Check what the handshake needs
        my $remaining = $deadline - Time::HiRes::time();
        $remaining = 0.1 if $remaining <= 0;

        if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
            # Wait for socket to become readable with timeout
            my $read_f = Future::IO->waitfor_readable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });

            my $wait_f = Future->wait_any($read_f, $timeout_f);
            await $wait_f;

            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
            # Wait for socket to become writable with timeout
            my $write_f = Future::IO->waitfor_writable($socket);
            my $timeout_f = Future::IO->sleep($remaining)->then(sub {
                return Future->fail('tls_timeout');
            });

            my $wait_f = Future->wait_any($write_f, $timeout_f);
            await $wait_f;

            if ($wait_f->is_failed) {
                die Async::Redis::Error::Timeout->new(
                    message => "TLS handshake timed out",
                    timeout => $self->{connect_timeout},
                );
            }
        }
        else {
            # Actual error
            die Async::Redis::Error::Connection->new(
                message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
                host    => $self->{host},
                port    => $self->{port},
            );
        }
    }
}

# Reconnect with exponential backoff
async sub _reconnect {
    my ($self) = @_;

    my $max = $self->{reconnect_max_attempts};
    my $attempt = 0;

    while (!$self->{connected}) {
        $attempt++;
        $self->{_reconnect_attempt} = $attempt;

        my $ok = eval {
            await $self->connect;
            1;
        };

        if ($ok) {
            $self->{_reconnect_attempt} = 0;
            last;
        }

        my $error = $@;

        # Fire on_error callback
        if ($self->{on_error}) {
            $self->{on_error}->($self, $error);
        }

        # Honor reconnect_max_attempts cap so an unreachable Redis
        # doesn't spin forever. 0 means unlimited.
        if ($max && $attempt >= $max) {
            $self->{_reconnect_attempt} = 0;
            die Async::Redis::Error::Disconnected->new(
                message => "Reconnect gave up after $max attempts",
            );
        }

        my $delay = $self->_calculate_backoff($attempt);
        await Future::IO->sleep($delay);
    }

    # Reset attempt counter on success so subsequent reconnects start fresh.
    $self->{_reconnect_attempt} = 0;
}

lib/Async/Redis.pm  view on Meta::CPAN

                        die Async::Redis::Error::Disconnected->new(
                            message => "Not connected",
                        );
                    }
                }
                # Register inflight BEFORE writing so order matches the wire.
                $self->_add_inflight($response, $cmd, \@args, $deadline, 'fail');
                await $self->_send($raw_cmd);
            })->();
        });
        1;
    };

    if (!$submit_ok) {
        $error = $@;
        # _with_write_gate already called _reader_fatal on write failure.
    } else {
        $self->_ensure_reader;
        # run_until_ready awaits $response while the selector pumps the
        # reader (and any other adopted tasks). If any selector task fails
        # unhandled — in particular, the reader — the failure propagates
        # here, so callers never hang waiting on a dead reader.
        my $await_ok = eval {
            $result = await $self->{_tasks}->run_until_ready($response);
            1;
        };
        if (!$await_ok) { $error = $@ }
    }

    # Telemetry: log result and end span
    if ($self->{_telemetry}) {
        my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
        if ($error) {
            $self->{_telemetry}->log_error($error);
        }
        else {
            $self->{_telemetry}->log_recv($result, $elapsed_ms);
        }
        $self->{_telemetry}->end_command_span($span_context, $error);
    }

    die $error if $error;
    return $result;
}

# Read response with deadline enforcement
async sub _read_response_with_deadline {
    my ($self, $deadline, $cmd_ref) = @_;

    # First check if parser already has a complete message
    if (my $msg = $self->{parser}->get_message) {
        return $msg;
    }

    # Read until we get a complete message
    while (1) {
        my $remaining = $deadline - Time::HiRes::time();

        if ($remaining <= 0) {
            $self->_reset_connection;
            die Async::Redis::Error::Timeout->new(
                message        => "Request timed out after $self->{request_timeout}s",
                command        => $cmd_ref,
                timeout        => $self->{request_timeout},
                maybe_executed => 1,  # already sent the command
            );
        }

        # Use wait_any for timeout
        my $read_f = Future::IO->read($self->{socket}, 65536);

        # Store reference so disconnect() can cancel it
        $self->{_current_read_future} = $read_f;

        my $timeout_f = Future::IO->sleep($remaining)->then(sub {
            return Future->fail('read_timeout');
        });

        my $wait_f = Future->wait_any($read_f, $timeout_f);
        await $wait_f;

        # Clear stored reference after await completes
        $self->{_current_read_future} = undef;

        # Check if read was cancelled (by disconnect)
        if ($read_f->is_cancelled) {
            die Async::Redis::Error::Disconnected->new(
                message => "Disconnected during read",
            );
        }

        if ($wait_f->is_failed) {
            my ($error) = $wait_f->failure;
            if ($error eq 'read_timeout') {
                $self->_reset_connection;
                die Async::Redis::Error::Timeout->new(
                    message        => "Request timed out after $self->{request_timeout}s",
                    command        => $cmd_ref,
                    timeout        => $self->{request_timeout},
                    maybe_executed => 1,
                );
            }
            $self->_reset_connection;
            die Async::Redis::Error::Connection->new(
                message => "$error",
            );
        }

        # Get the read result
        my $buf = $wait_f->get;

        # EOF
        if (!defined $buf || length($buf) == 0) {
            $self->_reset_connection;
            die Async::Redis::Error::Connection->new(
                message => "Connection closed by server",
            );
        }

        $self->{parser}->parse($buf);

        if (my $msg = $self->{parser}->get_message) {
            return $msg;
        }
    }
}

# Reset connection after timeout (stream is desynced)
sub _reset_connection {
    my ($self, $reason) = @_;
    $reason //= 'timeout';

    my $was_connected = $self->{connected};

    # Cancel any active read future BEFORE closing socket
    # This ensures Future::IO unregisters its watcher while fileno is still valid
    if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
        $self->{_current_read_future}->cancel;
        $self->{_current_read_future} = undef;
    }

    # Cancel any pending inflight operations before closing socket
    if (my $inflight = $self->{inflight}) {
        for my $entry (@$inflight) {
            if ($entry->{future} && !$entry->{future}->is_ready) {
                $entry->{future}->cancel;
            }
        }
        $self->{inflight} = [];
    }

    if ($self->{socket}) {
        $self->_close_socket;
    }

    $self->{_socket_live}       = 0;

lib/Async/Redis.pm  view on Meta::CPAN

        unless (@commands) {
            await $self->unwatch;
            $watch_active = 0;
            $results = [];
        }
        else {
            await $self->multi_start;
            $multi_started = 1;

            for my $cmd (@commands) {
                await $self->command(@$cmd);
            }

            $results = await $self->exec;
            $multi_started = 0;
            $watch_active  = 0;
        }

        1;
    };
    my $error = $@;

    if (!$ok) {
        if ($multi_started) {
            eval { await $self->discard; 1 };
        }
        elsif ($watch_active) {
            eval { await $self->unwatch; 1 };
        }

        # Cleanup can fail on a dead socket; keep local state conservative
        # and preserve the original caller-facing error.
        $self->{in_multi} = 0;
        $self->{watching} = 0;

        die $error;
    }

    # EXEC returns undef/nil if WATCH failed
    return $results;
}

# ============================================================================
# PUB/SUB
# ============================================================================

# Wait for inflight commands to complete before mode change
async sub _wait_for_inflight_drain {
    my ($self, $timeout) = @_;
    $timeout //= 30;

    return unless @{$self->{inflight}};

    my $deadline = Time::HiRes::time() + $timeout;

    while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
        await Future::IO->sleep(0.001);
    }

    if (@{$self->{inflight}}) {
        $self->_fail_all_inflight("Timeout waiting for inflight commands");
    }
}

async sub publish {
    my ($self, $channel, $message) = @_;
    return await $self->command('PUBLISH', $channel, $message);
}

async sub spublish {
    my ($self, $channel, $message) = @_;
    return await $self->command('SPUBLISH', $channel, $message);
}

# Subscribe to channels - returns a Subscription object
async sub subscribe {
    my ($self, @channels) = @_;

    die Async::Redis::Error::Disconnected->new(
        message => "Not connected",
    ) unless $self->{connected};

    # Wait for pending commands before entering PubSub mode
    await $self->_wait_for_inflight_drain;

    # Clear a stale closed subscription so we allocate a fresh object.
    if ($self->{_subscription} && $self->{_subscription}->is_closed) {
        delete $self->{_subscription};
    }

    # Create or reuse subscription
    my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);

    # Set in_pubsub BEFORE submitting so the unified reader classifies
    # racing message frames correctly (e.g. published before our
    # confirmation arrives).
    $self->{in_pubsub} = 1;

    # Issue one SUBSCRIBE per channel through the write gate and unified
    # reader. Each call awaits its matching confirmation frame.
    for my $ch (@channels) {
        await $self->_pubsub_command('SUBSCRIBE', $ch);
        $sub->_add_channel($ch);
    }

    return $sub;
}

# Pattern subscribe
async sub psubscribe {
    my ($self, @patterns) = @_;

    die Async::Redis::Error::Disconnected->new(
        message => "Not connected",
    ) unless $self->{connected};

    # Wait for pending commands before entering PubSub mode
    await $self->_wait_for_inflight_drain;

    # Clear a stale closed subscription so we allocate a fresh object.
    if ($self->{_subscription} && $self->{_subscription}->is_closed) {

lib/Async/Redis.pm  view on Meta::CPAN


    my $pipe = $redis->pipeline;
    $pipe->run_script('atomic_incr', 'counter:a', 1);
    $pipe->run_script('atomic_incr', 'counter:b', 1);
    $pipe->set('other:key', 'value');
    my $results = await $pipe->execute;

Scripts are automatically preloaded before pipeline execution.

=head2 EVALSHA Optimization

Scripts automatically use EVALSHA (by SHA1 hash) for efficiency.
If the script isn't cached on the server, it falls back to EVAL
and caches for future calls. This is transparent to your code.

=head2 scan_iter

    my $iter = $redis->scan_iter(match => 'user:*', count => 100);
    while (my $keys = await $iter->next) {
        for my $key (@$keys) { ... }
    }

Create an iterator for SCAN. Also available:

    my $hash_iter = $redis->hscan_iter('hash', match => 'field:*');
    my $set_iter  = $redis->sscan_iter('set', count => 100);
    my $zset_iter = $redis->zscan_iter('zset');

Iterators return batches. C<ZSCAN> batches are the Redis flat
member/score list.

=head1 CONNECTION POOLING

For high-throughput applications, use L<Async::Redis::Pool>:

    use Async::Redis::Pool;

    my $pool = Async::Redis::Pool->new(
        host => 'localhost',
        min  => 2,
        max  => 10,
    );

    # Use with() for automatic acquire/release
    my $result = await $pool->with(sub {
        my ($conn) = @_;
        return $conn->get('key');
    });

=head1 ERROR HANDLING

Errors are thrown as exception objects:

    eval {
        await $redis->get('key');
        1;
    } or do {
        my $error = $@;
        if (ref($error) && $error->isa('Async::Redis::Error::Connection')) {
            # Connection error
        } elsif (ref($error) && $error->isa('Async::Redis::Error::Timeout')) {
            # Timeout error
        } elsif (ref($error) && $error->isa('Async::Redis::Error::Redis')) {
            # Redis error (e.g., WRONGTYPE)
        }
    };

Exception classes:

=over 4

=item Async::Redis::Error::Connection

Connection-related errors (refused, reset, etc.)

=item Async::Redis::Error::Timeout

Timeout errors (connect, request, read).

=item Async::Redis::Error::Protocol

Protocol parsing errors.

=item Async::Redis::Error::Redis

Errors returned by Redis (WRONGTYPE, ERR, etc.)

=item Async::Redis::Error::Disconnected

Operation attempted on disconnected client.

=back

=head1 FORK SAFETY

Async::Redis is fork-safe. When a fork is detected, the child
process will automatically invalidate its connection state and
reconnect when needed. The parent retains ownership of the original
connection.

=head1 EVENT LOOP CONFIGURATION

Async::Redis uses L<Future::IO> for event loop abstraction, making it
compatible with IO::Async, UV, AnyEvent, and other event loops. However,
B<Async::Redis does not choose which event loop to use> - that's the
application's responsibility.

=head2 Default (No Configuration Needed)

B<Future::IO 0.23+> includes a built-in poll-based implementation that works
out of the box. For standalone scripts, you don't need to configure anything:

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use Async::Redis;

    my $redis = Async::Redis->new(host => 'localhost');
    # Just works - Future::IO uses its built-in IO::Poll backend

=head2 The Golden Rule

B<Only executable scripts should configure Future::IO.> Library modules
(C<.pm> files) should never configure the backend because they don't know
what event loop the application wants to use.

=head2 For IO::Async Applications

If your application already uses IO::Async for its event loop, load the
implementation directly:

    use IO::Async::Loop;
    require Future::IO::Impl::IOAsync;

    my $loop = IO::Async::Loop->new;

    use Async::Redis;
    my $redis = Async::Redis->new(host => 'localhost');



( run in 1.555 second using v1.01-cache-2.11-cpan-39bf76dae61 )