Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis/Pool.pm  view on Meta::CPAN

package Async::Redis::Pool;

use strict;
use warnings;
use 5.018;

use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(refaddr);
use Async::Redis;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Timeout;

# Construct a pool object. No connection is opened here; this only sets up
# configuration and empty bookkeeping state.
#
# Arguments (key/value list):
#   min, max, acquire_timeout, idle_timeout, cleanup_timeout, on_dirty
#       Pool options, removed from the argument list and stored on the pool.
#       An omitted or undef value falls back to the default listed below.
#   anything else
#       Kept untouched in _conn_args for Async::Redis->new().
#
# Returns the new blessed pool object.
sub new {
    my ($class, %args) = @_;

    # Options the pool consumes itself, with their fallback values.
    my %default_for = (
        min             => 1,
        max             => 10,
        acquire_timeout => 5,
        idle_timeout    => 60,
        cleanup_timeout => 5,
        on_dirty        => 'destroy',
    );

    # Pull pool options out of %args; whatever is left over belongs to the
    # connection constructor.
    my %option;
    for my $name (keys %default_for) {
        my $given = delete $args{$name};
        $option{$name} = $given // $default_for{$name};
    }

    my $self = {
        # Sizing, timeouts and dirty-connection policy
        %option,

        # Pass-through connection parameters
        _conn_args => \%args,

        # Connection bookkeeping
        _idle    => [],    # connections available for acquire
        _active  => {},    # connections handed out (refaddr => conn)
        _waiters => [],    # futures queued for a connection
        _pending => [],    # background futures (creation, cleanup)

        # Flags and counters
        _shutdown        => 0,    # set by shutdown(); blocks new acquires
        _creating        => 0,    # connections currently being created
        _total_created   => 0,
        _total_destroyed => 0,

        # PID of the owning process, compared later to detect a fork
        _pid => $$,
    };

    return bless $self, $class;
}

# Read-only accessors for the configured pool size bounds.
sub min {
    my ($self) = @_;
    return $self->{min};
}

sub max {
    my ($self) = @_;
    return $self->{max};
}

# Snapshot of the pool's current counters.
#
# Returns a fresh hashref with:
#   active    - number of entries in _active
#   idle      - number of entries in _idle
#   waiting   - number of entries in _waiters
#   total     - active + idle
#   destroyed - the _total_destroyed counter
sub stats {
    my ($self) = @_;

    my $active_count  = keys %{ $self->{_active} };
    my $idle_count    = @{ $self->{_idle} };
    my $waiting_count = @{ $self->{_waiters} };

    my %snapshot = (
        active    => $active_count,
        idle      => $idle_count,
        waiting   => $waiting_count,
        total     => $active_count + $idle_count,
        destroyed => $self->{_total_destroyed},
    );

    return \%snapshot;
}

# Detect that the pool is being used from a different process than the one
# recorded in _pid.
#
# When the recorded PID is set and differs from the current one, all pooled
# connections are dropped via _clear_all_connections, the current PID is
# recorded, and 1 is returned. Otherwise nothing is touched and 0 is returned.
sub _check_fork {
    my ($self) = @_;

    my $owner_pid = $self->{_pid};

    # Same process (or no PID recorded): nothing to do.
    return 0 unless $owner_pid && $owner_pid != $$;

    # Fork detected - invalidate everything and adopt the new PID.
    $self->_clear_all_connections;
    $self->{_pid} = $$;

    return 1;
}

# Clear all connections (called after fork)
sub _clear_all_connections {
    my ($self) = @_;

    # Clear idle connections without closing (parent owns the sockets)

lib/Async/Redis/Pool.pm  view on Meta::CPAN

    $self->{_pending} = [];
    $self->{_creating} = 0;
}

# Acquire a connection from the pool.
#
# Resolution order:
#   1. Reuse the first idle connection that passes _health_check; idle
#      connections that fail the check are destroyed.
#   2. Otherwise, if active + idle + in-flight creations is below max, create
#      a new connection.
#   3. Otherwise queue a waiter future and wait up to acquire_timeout seconds.
#
# On paths 1 and 2 the connection is recorded in _active before it is
# returned. On path 3 this sub returns whatever the waiter future was
# completed with (presumably a connection handed over by the release path -
# confirm against release/_return_to_pool).
#
# Throws:
#   Async::Redis::Error::Disconnected - the pool has been shut down
#   Async::Redis::Error::Timeout      - no connection freed up in time
#   any error raised by _health_check or _create_connection
async sub acquire {
    my ($self) = @_;

    if ($self->{_shutdown}) {
        die Async::Redis::Error::Disconnected->new(
            message => "Pool is shut down",
        );
    }

    # Check for fork - clear pool if PID changed
    $self->_check_fork;

    # Try to get an idle connection
    while (@{$self->{_idle}}) {
        my $conn = shift @{$self->{_idle}};
        my $id   = refaddr($conn);

        # Reserve the slot in _active *before* awaiting the health check.
        # While the check is in flight the connection is no longer in _idle;
        # if it were not counted anywhere, a concurrent acquire would compute
        # a total that is one too low and could create a connection beyond
        # max.
        # NOTE(review): if the caller cancels the returned future during the
        # check, this reservation is not undone (same limitation as the
        # _creating counter below).
        $self->{_active}{$id} = $conn;

        my $healthy;
        eval {
            $healthy = await $self->_health_check($conn);
        };
        my $check_error = $@;

        return $conn if $healthy;

        # Unhealthy, or the check itself failed: give the slot back and
        # destroy the connection so it is not silently leaked.
        delete $self->{_active}{$id};
        $self->_destroy_connection($conn);

        # A throwing health check still propagates to the caller, as before.
        die $check_error if $check_error;
    }

    # No idle connections - can we create a new one?
    # Include _creating count to prevent concurrent acquires from exceeding max
    my $current_total = (scalar keys %{$self->{_active}})
                      + (scalar @{$self->{_idle}})
                      + $self->{_creating};

    if ($current_total < $self->{max}) {
        $self->{_creating}++;
        my $conn;
        eval {
            $conn = await $self->_create_connection;
        };
        my $error = $@;

        # Always undo the in-flight count, whether creation worked or not.
        $self->{_creating}--;

        die $error if $error;

        $self->{_active}{refaddr($conn)} = $conn;
        return $conn;
    }

    # At max capacity - wait for release
    my $waiter = Future->new;
    push @{$self->{_waiters}}, $waiter;

    my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
        Future->fail(Async::Redis::Error::Timeout->new(
            message => "Acquire timed out after $self->{acquire_timeout}s",
            timeout => $self->{acquire_timeout},
        ));
    });

    # Whichever finishes first wins; wait_any cancels the other one.
    my $wait_f = Future->wait_any($waiter, $timeout_future);

    my $result;
    eval {
        $result = await $wait_f;
    };
    my $error = $@;

    # If the waiter did not complete successfully (timed out, cancelled or
    # failed), make sure it is no longer queued. Compare by address so the
    # match does not depend on how the future objects behave in numeric
    # context.
    if (!$waiter->is_done) {
        my $waiter_id = refaddr($waiter);
        @{$self->{_waiters}} =
            grep { refaddr($_) != $waiter_id } @{$self->{_waiters}};
    }

    die $error if $error;
    return $result;
}

# Release a connection back to the pool
sub release {
    my ($self, $conn) = @_;

    return unless defined $conn;

    # Check for fork - if forked, don't return to pool
    if ($self->_check_fork) {
        # Pool was cleared, just drop this connection
        return;
    }

    my $id = refaddr($conn);
    unless (exists $self->{_active}{$id}) {
        warn "Pool: release called on unknown or already-released connection";
        return;
    }
    delete $self->{_active}{$id};

    # After shutdown, destroy instead of pooling
    if ($self->{_shutdown}) {
        $self->_destroy_connection($conn);
        return;
    }

    # Check if connection is dirty
    if ($conn->is_dirty) {
        if ($self->{on_dirty} eq 'cleanup' && $self->_can_attempt_cleanup($conn)) {
            # Attempt cleanup asynchronously
            $self->_track_pending(
                $self->_cleanup_connection($conn)->on_done(sub {
                    $self->_return_to_pool($conn);
                })->on_fail(sub {
                    $self->_destroy_connection($conn);
                    $self->_maybe_create_replacement;
                })
            );
        }

lib/Async/Redis/Pool.pm  view on Meta::CPAN


=head1 NAME

Async::Redis::Pool - Connection pool for Async::Redis

=head1 SYNOPSIS

    my $pool = Async::Redis::Pool->new(
        host => 'localhost',
        min  => 2,
        max  => 10,
    );

    # Recommended: scoped pattern
    my $result = await $pool->with(async sub {
        my ($redis) = @_;
        await $redis->incr('counter');
    });

    # Manual acquire/release (be careful!)
    my $redis = await $pool->acquire;
    await $redis->set('key', 'value');
    $pool->release($redis);

=head1 DESCRIPTION

Manages a pool of Redis connections with automatic dirty detection. Pool-specific
options are consumed by C<Async::Redis::Pool>; all other constructor arguments
are passed through to C<< Async::Redis->new >>.

=head1 CONSTRUCTOR

=head2 new

    my $pool = Async::Redis::Pool->new(
        host            => 'localhost',
        min             => 2,
        max             => 10,
        acquire_timeout => 5,
        cleanup_timeout => 5,
        on_dirty        => 'destroy',
    );

Options:

=over 4

=item min

Minimum desired pool size. Default: 1. The pool creates replacement
connections after dirty connections are destroyed if the total drops below
this value.

=item max

Maximum number of active, idle, and currently-creating connections. Default: 10.

=item acquire_timeout

Seconds to wait for a connection when the pool is at capacity. Default: 5.
Timeouts throw L<Async::Redis::Error::Timeout>.

=item cleanup_timeout

Seconds to allow a best-effort cleanup command such as C<DISCARD> or
C<UNWATCH>. Default: 5.

=item on_dirty

Dirty connection policy. Default: C<destroy>.

C<destroy> closes dirty connections instead of returning them to the pool.

C<cleanup> attempts bounded cleanup only for transaction/watch state. PubSub
connections and connections with pending responses are still destroyed.

=item idle_timeout

Accepted as a pool option (default: 60) but not currently enforced.

=back

=head1 METHODS

=head2 acquire

    my $redis = await $pool->acquire;

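Return a healthy L<Async::Redis> connection from the pool, creating one if the
pool is below C<max>. If the pool is already at C<max>, wait up to
C<acquire_timeout> seconds for a connection to become available, then fail
with L<Async::Redis::Error::Timeout>. Throws
L<Async::Redis::Error::Disconnected> if the pool has been shut down. The
caller must later call C<release>.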

=head2 release

    $pool->release($redis);

Return a connection to the pool. Dirty connections are either destroyed or
cleaned according to C<on_dirty>.

=head2 with

    my $result = await $pool->with(async sub {
        my ($redis) = @_;
        return await $redis->get('key');
    });

Acquire a connection, run the callback, and release the connection even if the
callback dies. This is the recommended public API.

=head2 stats

    my $stats = $pool->stats;

Returns a hashref with C<active>, C<idle>, C<waiting>, C<total>, and
C<destroyed> counts.

=head2 shutdown

    $pool->shutdown;

Stop new acquires, fail pending waiters, and close idle connections. Active
connections are destroyed when they are released.



( run in 1.750 second using v1.01-cache-2.11-cpan-39bf76dae61 )