AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
my $conn = AnyEvent::Pg->new($pool->{conninfo},
timeout => $pool->{timeout},
on_connect => weak_method_callback($pool, '_on_conn_connect', $seq),
on_connect_error => weak_method_callback($pool, '_on_conn_connect_error', $seq),
on_empty_queue => weak_method_callback($pool, '_on_conn_empty_queue', $seq),
on_error => weak_method_callback($pool, '_on_conn_error', $seq),
on_notify => weak_method_callback($pool, '_on_notify', $seq),
seq => $seq,
);
$debug and $debug & 8 and $pool->_debug("new connection started, seq: $seq, conn: $conn");
$pool->{conns}{$seq} = $conn;
$pool->{connecting}{$seq} = 1;
}
else {
$debug and $debug & 8 and $pool->_debug('not starting new connection, conns: '
. (scalar keys %{$pool->{conns}})
. ", retries: $pool->{conn_retries}, connecting: "
. (scalar keys %{$pool->{connecting}}));
}
}
# Handler installed as the on_error callback of every pooled connection.
# It is also reached for failed initialization queries and, right after
# _on_conn_connect_error returns, for connections that could not be
# established.
#
#   $pool - the pool object
#   $seq  - sequence number under which the connection is registered
#   $conn - the broken connection object
#
# Requeues or fails the query that was current on the connection, removes
# the connection from all the pool bookkeeping hashes, fires the matching
# error callback, calls _start_listener for every listener key that was
# attached to the connection and finally re-checks the query queue.
#
# Dies when the connection is not found in any of the busy, idle or
# initializing sets.
sub _on_conn_error {
    my ($pool, $seq, $conn) = @_;

    # note that failed initialization queries also come over here
    if (my $pending = delete $pool->{current}{$seq}) {
        my $may_retry = $pending->{max_retries}-- > 0;
        if ($may_retry) {
            $pool->_requeue_query($pending);
        }
        else {
            $pool->_maybe_callback($pending, 'on_error', $conn);
        }
    }

    if ($debug and $debug & 8) {
        my @states = grep { $pool->{$_}{$seq} } qw(busy idle connecting initializing);
        my $registered = $pool->{conns}{$seq} // '<undef>';
        $pool->_debug("removing broken connection in state(s!) @states, "
                      . "\$conn: $conn, \$pool->{conns}{$seq}: "
                      . $registered);
    }

    # The connection must be registered in one of these sets; stop at
    # the first one it is successfully removed from.
    my $was_tracked;
    for my $state (qw(busy idle initializing)) {
        next unless delete $pool->{$state}{$seq};
        $was_tracked = 1;
        last;
    }
    $was_tracked
        or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);

    delete $pool->{init_queue_ix}{$seq};
    delete $pool->{conns}{$seq};
    my $listeners = delete $pool->{listeners_by_conn}{$seq};

    if ($pool->{dead}) {
        $pool->_maybe_callback('on_connect_error', $conn);
    }
    else {
        $pool->_maybe_callback('on_transient_error');
        if ($listeners) {
            for my $key (keys %$listeners) {
                $pool->_start_listener($key);
            }
        }
        elsif ($debug and $debug & 4) {
            $pool->_debug("connection $seq had no listeners attached: " .
                          Dumper($pool->{listeners_by_conn}));
        }
    }

    $pool->_check_queue;
}
# Handler installed as the on_connect callback of every pooled connection.
#
#   $pool - the pool object
#   $seq  - sequence number under which the connection is registered
#   $conn - the connection object that has just connected
#
# A successful connection clears the failure bookkeeping: the retry
# counter goes back to zero and any pending connection deadline is
# dropped.
sub _on_conn_connect {
    my ($pool, $seq, $conn) = @_;

    $pool->_debug("conn $conn is now connected, seq: $seq")
        if $debug and $debug & 8;

    $pool->{conn_retries} = 0;

    # _on_conn_empty_queue is called afterwards by the $conn object,
    # so nothing else has to be done here.
    delete $pool->{max_conn_time};
}
# Handler installed as the on_connect_error callback of every pooled
# connection.
#
#   $pool - the pool object
#   $seq  - sequence number under which the connection is registered
#   $conn - the connection object that failed to connect (unused here)
#
# Reports a transient error, then decides between scheduling a delayed
# reconnection and declaring the pool dead. The actual removal of the
# connection from the pool is left to the on_error callback.
sub _on_conn_connect_error {
    my ($pool, $seq, $conn) = @_;

    $pool->_debug("unable to connect to database")
        if $debug and $debug & 8;
    $pool->_maybe_callback('on_transient_error');

    # The connection object will be removed from the pool by the
    # on_error callback, which is called just after this one returns.
    # Move it from "connecting" to "busy" so that callback finds it.
    delete $pool->{connecting}{$seq};
    $pool->{busy}{$seq} = 1;

    if ($pool->{delay_watcher}) {
        $pool->_debug("a delayed reconnection is already queued")
            if $debug and $debug & 8;
        return;
    }

    my $now = time;

    # This failed connection is not counted against the limit unless
    # it is the only connection remaining. Effectively the module
    # keeps going until all the connections become broken and no more
    # connections can be established.
    if (keys(%{$pool->{conns}}) <= 1) {
        $pool->{conn_retries}++;
        $pool->{max_conn_time} ||= $now + $pool->{global_timeout} - $pool->{conn_delay}
            if $pool->{global_timeout};
    }

    if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
        my $deadline = $pool->{max_conn_time};
        if (!$deadline or $deadline >= $now) {
            $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s")
                if $debug and $debug & 8;
            $pool->{delay_watcher} = AE::timer($pool->{conn_delay}, 0,
                                               weak_method_callback($pool, '_on_delayed_reconnect'));
            return;
        }
        $pool->_debug("global_timeout expired")
            if $debug and $debug & 8;
    }

    # giving up!
    $pool->_debug("it has been impossible to connect to the database, giving up!!!")
        if $debug and $debug & 8;

    # processing continues on the on_conn_error callback
    $pool->{dead} = 1;
}
( run in 2.296 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )