AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
my $listeners = delete $pool->{listeners_by_conn}{$seq};
if ($pool->{dead}) {
$pool->_maybe_callback('on_connect_error', $conn);
}
else {
$pool->_maybe_callback('on_transient_error');
if ($listeners) {
$pool->_start_listener($_) for keys %$listeners;
}
else {
$debug and $debug & 4 and $pool->_debug("connection $seq had no listeners attached: " .
Dumper($pool->{listeners_by_conn}));
}
}
$pool->_check_queue;
}
sub _on_conn_connect {
my ($pool, $seq, $conn) = @_;
$debug and $debug & 8 and $pool->_debug("conn $conn is now connected, seq: $seq");
$pool->{conn_retries} = 0;
delete $pool->{max_conn_time};
# _on_conn_empty_queue is called afterwards by the $conn object
}
sub _on_conn_connect_error {
my ($pool, $seq, $conn) = @_;
$debug and $debug & 8 and $pool->_debug("unable to connect to database");
$pool->_maybe_callback('on_transient_error');
# the connection object will be removed from the Pool on the
# on_error callback that will be called just after this one
# returns:
delete $pool->{connecting}{$seq};
$pool->{busy}{$seq} = 1;
if ($pool->{delay_watcher}) {
$debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
return;
}
my $now = time;
# This failed connection is not counted against the limit
# unless it is the only connection remaining. Effectively the
# module will keep going until all the connections become
# broken and no more connections can be established.
unless (keys(%{$pool->{conns}}) > 1) {
$pool->{conn_retries}++;
if ($pool->{global_timeout}) {
$pool->{max_conn_time} ||= $now + $pool->{global_timeout} - $pool->{conn_delay};
}
}
if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
if (not $pool->{max_conn_time} or $pool->{max_conn_time} >= $now) {
$debug and $debug & 8 and $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s");
$pool->{delay_watcher} = AE::timer $pool->{conn_delay}, 0, weak_method_callback($pool, '_on_delayed_reconnect');
return
}
$debug and $debug & 8 and $pool->_debug("global_timeout expired");
}
# giving up!
$debug and $debug & 8 and $pool->_debug("it has been impossible to connect to the database, giving up!!!");
$pool->{dead} = 1;
# processing continues on the on_conn_error callback
}
sub _on_fatal_connect_error {
my ($pool, $conn) = @_;
# This error is fatal. After it happens, everything is going to
# fail.
$pool->{dead} = 1;
}
sub _on_delayed_reconnect {
my $pool = shift;
$debug and $debug & 8 and $pool->_debug("_on_delayed_reconnect called");
undef $pool->{delay_watcher};
$pool->_start_new_conn;
}
sub _check_init_queue_idle {
my $pool = shift;
my $idle = $pool->{idle};
for my $seq (keys %$idle) {
delete $idle->{$seq};
$pool->_check_init_queue($seq);
}
}
sub _check_init_queue {
my ($pool, $seq) = @_;
my $init_queue = $pool->{init_queue};
no warnings 'uninitialized';
return if $pool->{init_queue_ix}{$seq} >= @$init_queue;
my $ix = $pool->{init_queue_ix}{$seq}++;
my $query = { %{$init_queue->[$ix]} }; # clone
$pool->{initializing}{$seq} = 1;
$pool->_start_query($seq, $query);
1;
}
sub _on_conn_empty_queue {
my ($pool, $seq, $conn) = @_;
$debug and $debug & 8 and $pool->_debug("conn $conn queue is now empty, seq: $seq");
unless (delete $pool->{busy}{$seq} or
delete $pool->{connecting}{$seq} or
delete $pool->{initializing}{$seq}) {
if ($debug) {
$pool->_debug("pool object: \n" . Dumper($pool));
die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
}
}
( run in 1.878 second using v1.01-cache-2.11-cpan-63c85eba8c4 )