Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis.pm  view on Meta::CPAN

    BRPOPLPUSH => { position => 'last' },
    BLMOVE     => { position => 'last' },
    BZPOPMIN   => { position => 'last' },
    BZPOPMAX   => { position => 'last' },
    BLMPOP     => { position => 0 },
    BZMPOP     => { position => 0 },
    XREAD      => { position => 'block_option', unit => 'ms' },
    XREADGROUP => { position => 'block_option', unit => 'ms' },
    WAIT       => { position => 'last', unit => 'ms' },
    WAITAOF    => { position => 'last', unit => 'ms' },
);

# Compute the client-side deadline for a command that is about to be sent.
#
# Arguments:
#   $cmd  - command name; upper-cased before the %BLOCKING_TIMEOUT lookup
#   @args - the command's arguments, in the order they are sent
#
# Returns an absolute time on the Time::HiRes::time() scale, or undef:
#   * now + request_timeout when the command has no %BLOCKING_TIMEOUT entry,
#     when a 'block_option' command carries no BLOCK option, or (after a
#     warning) when the timeout argument is not a plain decimal number;
#   * undef when the command's own timeout is 0 (block indefinitely);
#   * now + command timeout + blocking_timeout_buffer otherwise.
sub _calculate_deadline {
    my ($self, $cmd, @args) = @_;
    $cmd = uc($cmd // '');

    my $spec = $BLOCKING_TIMEOUT{$cmd};
    if (!$spec) {
        return Time::HiRes::time() + $self->{request_timeout};
    }

    my $raw;
    my $pos = $spec->{position};

    if ($pos eq 'last') {
        $raw = $args[-1];
    }
    elsif ($pos eq 'block_option') {
        # Only the option section (everything before STREAMS) can hold BLOCK.
        # Scanning past STREAMS, or through the operands of GROUP/COUNT,
        # would misread a stream key, group or consumer literally named
        # "block" as the option: e.g. the non-blocking "XREAD STREAMS block 0"
        # would be taken for "BLOCK 0" and end up with no deadline at all.
        my $i = 0;
        while ($i < $#args) {
            my $token = uc($args[$i] // '');
            last if $token eq 'STREAMS';

            if ($token eq 'BLOCK') {
                $raw = $args[$i + 1];
                last;
            }

            # GROUP is followed by <group> <consumer>, COUNT by <count>;
            # step over those operands so they are never treated as options.
            $i += $token eq 'GROUP' ? 3
                : $token eq 'COUNT' ? 2
                :                     1;
        }
        # No BLOCK option found — non-blocking variant; use request_timeout
        return Time::HiRes::time() + $self->{request_timeout}
            unless defined $raw;
    }
    else {
        # Numeric index into @args
        $raw = $args[$pos];
    }

    if (!defined $raw || $raw !~ /^-?\d+(?:\.\d+)?$/) {
        warn "_calculate_deadline: non-numeric timeout for $cmd; falling back to request_timeout\n";
        return Time::HiRes::time() + $self->{request_timeout};
    }

    # Specs marked unit => 'ms' carry milliseconds; everything else is seconds.
    my $seconds = ($spec->{unit} // 'seconds') eq 'ms'
        ? $raw / 1000
        : $raw + 0;

    # Zero means block indefinitely — no client-side deadline.
    # An explicit undef (not a bare return) keeps list-context callers
    # receiving exactly one value.
    return undef if $seconds == 0;

    return Time::HiRes::time() + $seconds + $self->{blocking_timeout_buffer};
}

# Returns IO::Socket::SSL's SSL_VERIFY_PEER constant. IO::Socket::SSL is
# loaded here, on first use, rather than at file load time.
sub _ssl_verify_peer {
    require IO::Socket::SSL;

    my $mode = IO::Socket::SSL::SSL_VERIFY_PEER();
    return $mode;
}

# Returns IO::Socket::SSL's SSL_VERIFY_NONE constant. IO::Socket::SSL is
# loaded here, on first use, rather than at file load time.
sub _ssl_verify_none {
    require IO::Socket::SSL;

    my $mode = IO::Socket::SSL::SSL_VERIFY_NONE();
    return $mode;
}

# Assemble the IO::Socket::SSL options for this connection and return them
# as a flat key/value list.
#
# Reads $self->{tls} and $self->{host}. When $self->{tls} is a hash ref the
# following keys are honoured; anything else is treated as "no overrides":
#   verify          - verify the peer certificate chain (default: on)
#   verify_hostname - also check the certificate identity against the host
#                     (default: on; only applies while verify is on)
#   ca_file, cert_file, key_file - forwarded as SSL_ca_file, SSL_cert_file
#                     and SSL_key_file when set to a true value
#
# SSL_startHandshake is always 0, so the caller drives the handshake itself.
# Used by _tls_upgrade and called directly by unit tests.
sub _build_tls_options {
    my ($self) = @_;

    my $cfg = ref($self->{tls}) eq 'HASH' ? $self->{tls} : {};

    # Both checks default to on; an explicitly supplied key is normalised
    # to a plain boolean.
    my ($check_chain, $check_identity) =
        map { exists $cfg->{$_} ? !!$cfg->{$_} : 1 } qw(verify verify_hostname);

    my %opts = (SSL_startHandshake => 0);

    # Forward certificate/key/CA paths only when they are actually set.
    my %ssl_name_for = (
        ca_file   => 'SSL_ca_file',
        cert_file => 'SSL_cert_file',
        key_file  => 'SSL_key_file',
    );
    for my $key (qw(ca_file cert_file key_file)) {
        next unless $cfg->{$key};
        $opts{ $ssl_name_for{$key} } = $cfg->{$key};
    }

    # Verification disabled: no SNI name and no identity check are set.
    unless ($check_chain) {
        $opts{SSL_verify_mode} = $self->_ssl_verify_none;
        return %opts;
    }

    $opts{SSL_verify_mode} = $self->_ssl_verify_peer;
    $opts{SSL_hostname}    = $self->{host};

    if ($check_identity) {
        $opts{SSL_verifycn_name}   = $self->{host};
        $opts{SSL_verifycn_scheme} = 'default';
    }

    return %opts;
}

# Non-blocking TLS upgrade of an already-connected socket.
#
# Arguments:
#   $socket - the socket to wrap; it is upgraded in place by start_SSL
#
# Returns (via the async sub's future) the same $socket once the handshake
# has completed.
#
# Throws:
#   Async::Redis::Error::Connection - start_SSL failed, or the handshake
#                                     failed with anything other than
#                                     WANT_READ / WANT_WRITE
#   Async::Redis::Error::Timeout    - the handshake did not finish within
#                                     $self->{connect_timeout} seconds
async sub _tls_upgrade {
    my ($self, $socket) = @_;

    require IO::Socket::SSL;

    my %ssl_opts = $self->_build_tls_options;

    # Start SSL (does not block because SSL_startHandshake => 0)
    IO::Socket::SSL->start_SSL($socket, %ssl_opts)
        or die Async::Redis::Error::Connection->new(
            message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
            host    => $self->{host},
            port    => $self->{port},
        );

    # Every timeout path raises the same error object; build it in one place.
    my $timeout_error = sub {
        return Async::Redis::Error::Timeout->new(
            message => "TLS handshake timed out",
            timeout => $self->{connect_timeout},
        );
    };

    # Drive handshake with non-blocking loop
    my $deadline = Time::HiRes::time() + $self->{connect_timeout};

    while (1) {
        die $timeout_error->() if Time::HiRes::time() >= $deadline;

        # Attempt one handshake step; a true result means it is complete.
        return $socket if $socket->connect_SSL;

        # Capture the reason right away, before anything else can touch it.
        my $ssl_error = $IO::Socket::SSL::SSL_ERROR;

        my $ready_f;
        if ($ssl_error == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
            $ready_f = Future::IO->waitfor_readable($socket);
        }
        elsif ($ssl_error == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
            $ready_f = Future::IO->waitfor_writable($socket);
        }
        else {
            # Actual error
            die Async::Redis::Error::Connection->new(
                message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
                host    => $self->{host},
                port    => $self->{port},
            );
        }

        my $remaining = $deadline - Time::HiRes::time();
        $remaining = 0.1 if $remaining <= 0;

        # Race socket readiness against the remaining time budget. The timer
        # completes normally instead of failing: `await` rethrows a failed
        # future, so a failing timer would surface as a bare 'tls_timeout'
        # string and never reach the Timeout error below.
        my $timer_f = Future::IO->sleep($remaining);
        await Future->wait_any($ready_f, $timer_f);

        # wait_any cancels whichever future lost, so a readiness future that
        # is not done here means the timer won the race.
        die $timeout_error->() unless $ready_f->is_done;
    }
}

# Keep calling connect until $self->{connected} is set, sleeping between
# failed attempts for the delay given by _calculate_backoff.
#
# The number of the attempt in progress is published in
# $self->{_reconnect_attempt} and reset to 0 on success or when giving up.
# After each failed attempt the on_error callback, if configured, is invoked
# with ($self, $error).
#
# Throws Async::Redis::Error::Disconnected once reconnect_max_attempts
# failed attempts have been made; a false value (0) means no limit.
async sub _reconnect {
    my ($self) = @_;

    my $limit = $self->{reconnect_max_attempts};
    my $tries = 0;

    until ($self->{connected}) {
        $tries++;
        $self->{_reconnect_attempt} = $tries;

        my $succeeded = eval {
            await $self->connect;
            1;
        };
        last if $succeeded;

        # Grab the failure before anything else can overwrite $@.
        my $failure = $@;

        $self->{on_error}->($self, $failure) if $self->{on_error};

        # Honor the attempt cap so an unreachable Redis doesn't make us
        # retry forever.
        if ($limit && $tries >= $limit) {
            $self->{_reconnect_attempt} = 0;
            die Async::Redis::Error::Disconnected->new(
                message => "Reconnect gave up after $limit attempts",
            );
        }

        my $pause = $self->_calculate_backoff($tries);
        await Future::IO->sleep($pause);
    }

    # Connected (or already was): later reconnects start counting from zero.
    $self->{_reconnect_attempt} = 0;
}

# Ensure the socket is live, reconnecting if configured.
#
# Dedup: $self->{_reconnect_future} is the Future for the in-flight
# reconnect. Concurrent callers share it. The slot is the shared-await
# signal, NOT the ownership — ownership lives in $self->{_tasks}.
#
# Structured-concurrency: the reconnect task is added to the selector
# so any caller currently awaiting via run_until_ready sees reconnect



( run in 0.723 second using v1.01-cache-2.11-cpan-39bf76dae61 )