AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN


    my $listeners = delete $pool->{listeners_by_conn}{$seq};

    if ($pool->{dead}) {
        $pool->_maybe_callback('on_connect_error', $conn);
    }
    else {
        $pool->_maybe_callback('on_transient_error');

        if ($listeners) {
            $pool->_start_listener($_) for keys %$listeners;
        }
        else {
            $debug and $debug & 4 and $pool->_debug("connection $seq had no listeners attached: " .
                                                    Dumper($pool->{listeners_by_conn}));
        }
    }

    $pool->_check_queue;
}

# Callback invoked when connection $conn (pool slot $seq) reports that it
# is now connected.  Clears the failure bookkeeping kept on the pool.
sub _on_conn_connect {
    my ($pool, $seq, $conn) = @_;

    if ($debug and $debug & 8) {
        $pool->_debug("conn $conn is now connected, seq: $seq");
    }

    # A successful connection restarts the retry count from zero and
    # drops any deadline recorded while connections were failing.
    $pool->{conn_retries} = 0;

    # _on_conn_empty_queue is called afterwards by the $conn object
    delete $pool->{max_conn_time};
}

# Callback invoked when connection $conn (pool slot $seq) failed to
# connect.  Notifies the user through 'on_transient_error', then either
# schedules a delayed reconnection or, once the retry count or the
# global timeout is exhausted, marks the pool as dead.
sub _on_conn_connect_error {
    my ($pool, $seq, $conn) = @_;

    $pool->_debug("unable to connect to database")
        if $debug and $debug & 8;

    $pool->_maybe_callback('on_transient_error');

    # The connection object itself is removed from the pool by the
    # on_error callback, which runs right after this one returns; until
    # then the slot is moved from "connecting" to "busy".
    delete $pool->{connecting}{$seq};
    $pool->{busy}{$seq} = 1;

    if ($pool->{delay_watcher}) {
        $pool->_debug("a delayed reconnection is already queued")
            if $debug and $debug & 8;
        return;
    }

    my $failed_at = time;

    # A failure only counts against the retry limit when this is the
    # last connection left in the pool.  In practice the pool keeps
    # going until every connection is broken and no new one can be
    # established.
    my $conn_count = keys %{$pool->{conns}};
    if ($conn_count <= 1) {
        ++$pool->{conn_retries};
        # The deadline is fixed on the first counted failure only.
        $pool->{max_conn_time} ||= $failed_at + $pool->{global_timeout} - $pool->{conn_delay}
            if $pool->{global_timeout};
    }

    if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
        my $deadline = $pool->{max_conn_time};
        if (!$deadline or $deadline >= $failed_at) {
            $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s")
                if $debug and $debug & 8;
            # Interval 0: the timer fires once, after conn_delay seconds.
            $pool->{delay_watcher} = AE::timer($pool->{conn_delay}, 0,
                                               weak_method_callback($pool, '_on_delayed_reconnect'));
            return;
        }
        $pool->_debug("global_timeout expired")
            if $debug and $debug & 8;
    }

    # Out of retries or out of time: give up.
    $pool->_debug("it has been impossible to connect to the database, giving up!!!")
        if $debug and $debug & 8;

    # processing continues on the on_conn_error callback
    $pool->{dead} = 1;
}

# Flags the pool as dead after a fatal connection error.
# A second argument ($conn) is passed in but is not used here.
sub _on_fatal_connect_error {
    my $pool = $_[0];

    # The error is fatal: from now on everything is going to fail, so
    # the pool is marked as dead.
    return $pool->{dead} = 1;
}

# Timer callback for a delayed reconnection: clears the pending watcher
# and asks the pool to start a new connection.
sub _on_delayed_reconnect {
    my ($pool) = @_;

    $pool->_debug("_on_delayed_reconnect called")
        if $debug and $debug & 8;

    # Clear the watcher slot so that a later connection failure can
    # schedule a new delayed reconnection.
    $pool->{delay_watcher} = undef;

    return $pool->_start_new_conn;
}

# Offers every idle connection the chance to run its next pending
# initialization query.
#
# Each connection listed in $pool->{idle} is taken out of the idle set and
# handed to _check_init_queue.  When that call starts a query (true return)
# the connection stays out of the idle set; when it starts nothing (false
# return) the connection is put back, so it is never left without a state.
sub _check_init_queue_idle {
    my $pool = shift;
    my $idle = $pool->{idle};

    # keys() returns a snapshot, so changing the hash inside the loop is safe.
    for my $seq (keys %$idle) {
        # Remove the connection from the idle set before trying to start a
        # query on it, remembering its marker so it can be restored.
        my $marker = delete $idle->{$seq};

        # Nothing was started: the connection is still idle.
        $idle->{$seq} = $marker
            unless $pool->_check_init_queue($seq);
    }
    return;
}

# Starts the next not-yet-run initialization query on connection $seq.
#
# $pool->{init_queue_ix}{$seq} counts how many entries of
# $pool->{init_queue} this connection has already been given.  Returns 1
# when a query was started, and an empty list when the connection has
# already been given every queued entry.
sub _check_init_queue {
    my ($pool, $seq) = @_;
    my $init_queries = $pool->{init_queue};

    # A connection that has not run any initialization query yet has no
    # counter; undef must behave as 0 without raising a warning.
    no warnings 'uninitialized';

    my $all_done = $pool->{init_queue_ix}{$seq} >= @$init_queries;
    return if $all_done;

    my $next = $pool->{init_queue_ix}{$seq}++;

    # Shallow copy, so the stored entry is not the hash handed to
    # _start_query.
    my %copy = %{ $init_queries->[$next] };

    $pool->{initializing}{$seq} = 1;
    $pool->_start_query($seq, \%copy);
    return 1;
}

sub _on_conn_empty_queue {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn queue is now empty, seq: $seq");

    unless (delete $pool->{busy}{$seq} or
            delete $pool->{connecting}{$seq} or
            delete $pool->{initializing}{$seq}) {
        if ($debug) {
            $pool->_debug("pool object: \n" . Dumper($pool));
            die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
        }
    }



( run in 1.878 second using v1.01-cache-2.11-cpan-63c85eba8c4 )