Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/Pool.pm view on Meta::CPAN
package Async::Redis::Pool;
use strict;
use warnings;
use 5.018;
use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(refaddr);
use Async::Redis;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Timeout;
sub new {
    # Constructor.
    #
    # Takes a flat list of key/value options. The pool-level options
    # (min, max, acquire_timeout, idle_timeout, cleanup_timeout, on_dirty)
    # are removed from the list and stored on the pool; whatever remains is
    # kept untouched in _conn_args for Async::Redis->new().
    #
    # Returns the new blessed pool object.
    my ($class, %args) = @_;

    # Pool-level option names and the value used when the caller omits
    # the option or passes undef for it.
    my %default_for = (
        min             => 1,
        max             => 10,
        acquire_timeout => 5,
        idle_timeout    => 60,
        cleanup_timeout => 5,
        on_dirty        => 'destroy',
    );

    # Pull every pool option out of %args so that only connection
    # parameters are left behind.
    my %setting;
    for my $option (keys %default_for) {
        my $supplied = exists $args{$option} ? delete $args{$option} : undef;
        $setting{$option} = $supplied // $default_for{$option};
    }

    my %state = (
        # Connection params (passed through to Async::Redis->new)
        _conn_args => \%args,

        # Sizing, timeouts and dirty-handling policy
        %setting,

        # Pool state
        _idle     => [],    # Available connections
        _active   => {},    # Connections in use (refaddr => conn)
        _waiters  => [],    # Futures waiting for a connection
        _shutdown => 0,     # Set by shutdown(); blocks new acquires
        _pending  => [],    # Background futures (creation, cleanup)
        _creating => 0,     # Connections currently being created

        # Lifetime counters
        _total_created   => 0,
        _total_destroyed => 0,

        # Fork safety: PID of the process that built this pool
        _pid => $$,
    );

    return bless \%state, $class;
}
# Accessors
sub min {
    # Read-only accessor for the pool's "min" setting.
    my ($self) = @_;
    return $self->{min};
}
sub max {
    # Read-only accessor for the pool's "max" setting.
    my ($self) = @_;
    return $self->{max};
}
# Statistics: snapshot of the pool's current counters.
#
# Returns a fresh hashref with:
#   active    - number of entries in _active
#   idle      - number of entries in _idle
#   waiting   - number of entries in _waiters
#   total     - active + idle
#   destroyed - value of the _total_destroyed counter
sub stats {
    my ($self) = @_;

    my $in_use    = keys %{ $self->{_active} };
    my $available = @{ $self->{_idle} };
    my $queued    = @{ $self->{_waiters} };

    my %snapshot = (
        active    => $in_use,
        idle      => $available,
        waiting   => $queued,
        total     => $in_use + $available,
        destroyed => $self->{_total_destroyed},
    );
    return \%snapshot;
}
# Detect a fork by comparing the PID recorded on the pool with the current
# process ID. When they differ, the pool's connections are dropped via
# _clear_all_connections and the current PID is recorded.
#
# Returns 1 if a fork was detected (and handled), 0 otherwise.
sub _check_fork {
    my ($self) = @_;

    my $owner_pid = $self->{_pid};

    # No recorded owner, or still running in the owning process.
    return 0 unless $owner_pid;
    return 0 if $owner_pid == $$;

    # Running in a different process - invalidate all connections and
    # adopt the pool for this process.
    $self->_clear_all_connections;
    $self->{_pid} = $$;
    return 1;
}
# Clear all connections (called after fork)
sub _clear_all_connections {
my ($self) = @_;
# Clear idle connections without closing (parent owns the sockets)
lib/Async/Redis/Pool.pm view on Meta::CPAN
$self->{_pending} = [];
$self->{_creating} = 0;
}
# Acquire a connection from the pool.
#
# Resolution order:
#   1. Dies with Async::Redis::Error::Disconnected if the pool is shut down.
#   2. Reuses the first idle connection that passes _health_check; idle
#      connections that fail the check are destroyed.
#   3. Creates a new connection while active + idle + creating < max.
#   4. Otherwise queues a waiter Future and waits up to acquire_timeout
#      seconds, failing with Async::Redis::Error::Timeout on expiry.
#
# Returns (through the async sub's Future) the acquired connection.
async sub acquire {
    my ($self) = @_;

    if ($self->{_shutdown}) {
        die Async::Redis::Error::Disconnected->new(
            message => "Pool is shut down",
        );
    }

    # Check for fork - clear pool if PID changed
    $self->_check_fork;

    # Try to get an idle connection
    while (@{$self->{_idle}}) {
        my $conn = shift @{$self->{_idle}};
        my $id   = refaddr($conn);

        # Reserve the slot in _active *before* suspending on the health
        # check. Otherwise the connection is in neither _idle nor _active
        # while the check is in flight, and a concurrent acquire could
        # compute a total below max and open an extra connection.
        $self->{_active}{$id} = $conn;

        my $healthy;
        eval {
            $healthy = await $self->_health_check($conn);
        };
        my $check_error = $@;

        if ($healthy) {
            # Re-assert the registration in case _active was reset while
            # this sub was suspended.
            $self->{_active}{$id} = $conn;
            return $conn;
        }

        # Not usable: give the reserved slot back.
        delete $self->{_active}{$id};

        # A health check that throws propagates to the caller as before.
        die $check_error if $check_error;

        # Unhealthy - destroy and try next
        $self->_destroy_connection($conn);
    }

    # No idle connections - can we create a new one?
    # Include _creating count to prevent concurrent acquires from exceeding max
    my $current_total = (scalar keys %{$self->{_active}})
                      + (scalar @{$self->{_idle}})
                      + $self->{_creating};

    if ($current_total < $self->{max}) {
        $self->{_creating}++;
        my $conn;
        eval {
            $conn = await $self->_create_connection;
        };
        my $error = $@;

        # Always undo the reservation, whether creation succeeded or not.
        $self->{_creating}--;
        if ($error) {
            die $error;
        }
        $self->{_active}{refaddr($conn)} = $conn;
        return $conn;
    }

    # At max capacity - wait for release
    my $waiter = Future->new;
    push @{$self->{_waiters}}, $waiter;

    # Removes this waiter from the queue. Compares by address and captures
    # only the address, so the closure does not keep the waiter alive.
    my $waiter_id   = refaddr($waiter);
    my $drop_waiter = sub {
        @{$self->{_waiters}} =
            grep { refaddr($_) != $waiter_id } @{$self->{_waiters}};
        return;
    };

    # If the caller cancels the acquire Future, the code after the await
    # below never runs; this hook keeps the queue (and stats->{waiting})
    # free of dead waiters in that case. It also fires when the timeout
    # wins the wait_any race and the waiter is cancelled.
    $waiter->on_cancel($drop_waiter);

    my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
        Future->fail(Async::Redis::Error::Timeout->new(
            message => "Acquire timed out after $self->{acquire_timeout}s",
            timeout => $self->{acquire_timeout},
        ));
    });

    my $wait_f = Future->wait_any($waiter, $timeout_future);

    my $result;
    eval {
        $result = await $wait_f;
    };
    my $error = $@;

    # If the waiter did not complete successfully, make sure it is no
    # longer queued.
    $drop_waiter->() unless $waiter->is_done;

    die $error if $error;
    return $result;
}
# Release a connection back to the pool
sub release {
my ($self, $conn) = @_;
return unless defined $conn;
# Check for fork - if forked, don't return to pool
if ($self->_check_fork) {
# Pool was cleared, just drop this connection
return;
}
my $id = refaddr($conn);
unless (exists $self->{_active}{$id}) {
warn "Pool: release called on unknown or already-released connection";
return;
}
delete $self->{_active}{$id};
# After shutdown, destroy instead of pooling
if ($self->{_shutdown}) {
$self->_destroy_connection($conn);
return;
}
# Check if connection is dirty
if ($conn->is_dirty) {
if ($self->{on_dirty} eq 'cleanup' && $self->_can_attempt_cleanup($conn)) {
# Attempt cleanup asynchronously
$self->_track_pending(
$self->_cleanup_connection($conn)->on_done(sub {
$self->_return_to_pool($conn);
})->on_fail(sub {
$self->_destroy_connection($conn);
$self->_maybe_create_replacement;
})
);
}
lib/Async/Redis/Pool.pm view on Meta::CPAN
=head1 NAME
Async::Redis::Pool - Connection pool for Async::Redis
=head1 SYNOPSIS
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
);
# Recommended: scoped pattern
my $result = await $pool->with(async sub {
my ($redis) = @_;
await $redis->incr('counter');
});
# Manual acquire/release (be careful!)
my $redis = await $pool->acquire;
await $redis->set('key', 'value');
$pool->release($redis);
=head1 DESCRIPTION
Manages a pool of Redis connections with automatic dirty detection. Pool-specific
options are consumed by C<Async::Redis::Pool>; all other constructor arguments
are passed through to C<< Async::Redis->new >>.
=head1 CONSTRUCTOR
=head2 new
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
acquire_timeout => 5,
cleanup_timeout => 5,
on_dirty => 'destroy',
);
Options:
=over 4
=item min
Minimum desired pool size. Default: 1. The pool creates replacement
connections after dirty connections are destroyed if the total drops below
this value.
=item max
Maximum number of active, idle, and currently-creating connections. Default: 10.
=item acquire_timeout
Seconds to wait for a connection when the pool is at capacity. Default: 5.
Timeouts throw L<Async::Redis::Error::Timeout>.
=item cleanup_timeout
Seconds to allow a best-effort cleanup command such as C<DISCARD> or
C<UNWATCH>. Default: 5.
=item on_dirty
Dirty connection policy. Default: C<destroy>.
C<destroy> closes dirty connections instead of returning them to the pool.
C<cleanup> attempts bounded cleanup only for transaction/watch state. PubSub
connections and connections with pending responses are still destroyed.
=item idle_timeout
Accepted as a pool option but not currently enforced.
=back
=head1 METHODS
=head2 acquire
my $redis = await $pool->acquire;
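Return a healthy L<Async::Redis> connection from the pool, creating one if the
pool is below C<max>. If the pool is already at C<max>, wait up to
C<acquire_timeout> seconds for a connection to be released and throw
L<Async::Redis::Error::Timeout> if none becomes available in time. Throws
L<Async::Redis::Error::Disconnected> if the pool has been shut down. The caller
must later call C<release>.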
=head2 release
$pool->release($redis);
Return a connection to the pool. Dirty connections are either destroyed or
cleaned according to C<on_dirty>.
=head2 with
my $result = await $pool->with(async sub {
my ($redis) = @_;
return await $redis->get('key');
});
Acquire a connection, run the callback, and release the connection even if the
callback dies. This is the recommended public API.
=head2 stats
my $stats = $pool->stats;
Returns a hashref with C<active>, C<idle>, C<waiting>, C<total>, and
C<destroyed> counts. C<total> is C<active> plus C<idle>; connections that are
still being created are not included.
=head2 shutdown
$pool->shutdown;
Stop new acquires, fail pending waiters, and close idle connections. Active
connections are destroyed when they are released.
( run in 1.750 second using v1.01-cache-2.11-cpan-39bf76dae61 )