AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

        my $conn = AnyEvent::Pg->new($pool->{conninfo},
                                     timeout => $pool->{timeout},
                                     on_connect => weak_method_callback($pool, '_on_conn_connect', $seq),
                                     on_connect_error => weak_method_callback($pool, '_on_conn_connect_error', $seq),
                                     on_empty_queue => weak_method_callback($pool, '_on_conn_empty_queue', $seq),
                                     on_error => weak_method_callback($pool, '_on_conn_error', $seq),
                                     on_notify => weak_method_callback($pool, '_on_notify', $seq),
                                     seq => $seq,
                                    );
        $debug and $debug & 8 and $pool->_debug("new connection started, seq: $seq, conn: $conn");
        $pool->{conns}{$seq} = $conn;
        $pool->{connecting}{$seq} = 1;
    }
    else {
        $debug and $debug & 8 and $pool->_debug('not starting new connection, conns: '
                                                . (scalar keys %{$pool->{conns}})
                                                . ", retries: $pool->{conn_retries}, connecting: "
                                                . (scalar keys %{$pool->{connecting}}));
    }
}

sub _on_conn_error {
    my ($pool, $seq, $conn) = @_;

    # note that failed initialization queries also come over here
    if (my $query = delete $pool->{current}{$seq}) {
        if ($query->{max_retries}-- > 0) {
            $pool->_requeue_query($query);
        }
        else {
            $pool->_maybe_callback($query, 'on_error', $conn);
        }
    }

    if ($debug and $debug & 8) {
        my @states = grep $pool->{$_}{$seq}, qw(busy idle connecting initializing);
        $pool->_debug("removing broken connection in state(s!) @states, "
                      . "\$conn: $conn, \$pool->{conns}{$seq}: "
                      . ($pool->{conns}{$seq} // '<undef>'));
    }
    delete $pool->{busy}{$seq}
        or delete $pool->{idle}{$seq}
            or delete $pool->{initializing}{$seq}
                or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);

    delete $pool->{init_queue_ix}{$seq};
    delete $pool->{conns}{$seq};

    my $listeners = delete $pool->{listeners_by_conn}{$seq};

    if ($pool->{dead}) {
        $pool->_maybe_callback('on_connect_error', $conn);
    }
    else {
        $pool->_maybe_callback('on_transient_error');

        if ($listeners) {
            $pool->_start_listener($_) for keys %$listeners;
        }
        else {
            $debug and $debug & 4 and $pool->_debug("connection $seq had no listeners attached: " .
                                                    Dumper($pool->{listeners_by_conn}));
        }
    }

    $pool->_check_queue;
}

sub _on_conn_connect {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn is now connected, seq: $seq");
    $pool->{conn_retries} = 0;
    delete $pool->{max_conn_time};
    # _on_conn_empty_queue is called afterwards by the $conn object
}

sub _on_conn_connect_error {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("unable to connect to database");

    $pool->_maybe_callback('on_transient_error');

    # the connection object will be removed from the Pool on the
    # on_error callback that will be called just after this one
    # returns:
    delete $pool->{connecting}{$seq};
    $pool->{busy}{$seq} = 1;

    if ($pool->{delay_watcher}) {
        $debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
        return;
    }

    my $now = time;
    # This failed connection is not counted against the limit
    # unless it is the only connection remaining. Effectively the
    # module will keep going until all the connections become
    # broken and no more connections can be established.
    unless (keys(%{$pool->{conns}}) > 1) {
        $pool->{conn_retries}++;
        if ($pool->{global_timeout}) {
            $pool->{max_conn_time} ||= $now + $pool->{global_timeout} - $pool->{conn_delay};
        }
    }

    if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
        if (not $pool->{max_conn_time} or $pool->{max_conn_time} >= $now) {
            $debug and $debug & 8 and $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s");
            $pool->{delay_watcher} = AE::timer $pool->{conn_delay}, 0, weak_method_callback($pool, '_on_delayed_reconnect');
            return
        }
        $debug and $debug & 8 and $pool->_debug("global_timeout expired");
    }

    # giving up!
    $debug and $debug & 8 and $pool->_debug("it has been impossible to connect to the database, giving up!!!");
    $pool->{dead} = 1;

    # processing continues on the on_conn_error callback
}



( run in 2.296 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )