AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg.pm  view on Meta::CPAN

    }

    $debug and $debug & 2 and $self->_debug("looking for notifications");
    while (my @notify = $dbc->notifies) {
        $debug and $debug & 2 and $self->_debug("notify recived: @notify");
        $self->_maybe_callback(on_notify => @notify);
    }

    if (defined (my $cq = $self->{current_query})) {
        while (1) {
            if ($self->{write_watcher} or $dbc->busy) {
                $debug and $debug & 1 and $self->_debug($self->{write_watcher}
                                                        ? "wants to write and read"
                                                        : "wants to read");
                $self->{timeout_watcher} = AE::timer $self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout')
                    if $self->{timeout};
                return;
            }
            else {
                $debug and $debug & 1 and $self->_debug("data available");

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    *_maybe_callback = \&AnyEvent::Pg::_maybe_callback;
};

# Package-level debug switch; declared here without a value.  Trace calls in
# this package are guarded by "$debug and $debug & 8", so they only fire when
# that bit is set.
our $debug;

# Emit a one-line trace message about the pool through warn().
#
# Called as a method: $pool->_debug(@message_parts).
#
# The line reports the number of keys in the connecting, initializing, idle,
# busy and conns tables, whether a delay watcher is set, the package and sub
# that called _debug, the message parts joined by spaces, and the file and
# line of the call site.
sub _debug {
    my $pool = shift;
    my $connecting   = keys %{$pool->{connecting}};
    my $initializing = keys %{$pool->{initializing}};
    my $idle         = keys %{$pool->{idle}};
    my $busy         = keys %{$pool->{busy}};
    my $delayed      = ($pool->{delay_watcher} ? 1 : 0);
    my $total        = keys %{$pool->{conns}};

    # Keep tracing from disturbing the caller: protect $@ and switch off any
    # die hook for the duration.  The hook lives in %SIG, not in %ENV.
    local ($SIG{__DIE__}, $@);

    # Frame 0 describes the call to _debug itself: it gives the caller's
    # package, file and line, but its sub name is always "_debug".  The name
    # of the sub that issued the call is found in frame 1, which does not
    # exist when _debug is called from file level.
    my ($pkg, $file, $line) = (caller 0);
    my $method = (caller 1)[3] // '<main>';
    $method =~ s/.*:://;
    warn "[$pool c:$connecting/i:$initializing/-:$idle/b:$busy|t:$total|d:$delayed]\@${pkg}::$method> @_ at $file line $line\n";
}

# Fallback values keyed by option name.
# NOTE(review): units and the code that consumes these are not visible in
# this excerpt -- confirm against new() before relying on them.
my %default = (
    size               => 1,
    timeout            => 30,
    connection_delay   => 2,
    connection_retries => 3,
);

sub new {
    my ($class, $conninfo, %opts) = @_;
    $conninfo = { %$conninfo } if ref $conninfo;

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

                 on_connect_error => $on_connect_error,
                 on_transient_error => $on_transient_error,
                 # on_empty_queue => $on_empty_queue,
                 timeout => $timeout,
                 max_conn_retries => $connection_retries,
                 conn_retries => 0,
                 conn_delay => $connection_delay,
                 global_timeout => $global_timeout,
                 conns => {},
                 current => {},
                 busy => {},
                 idle => {},
                 connecting => {},
                 initializing => {},
                 init_queue_ix => {},
                 queue => [],
                 seq => 1,
                 query_seq => 1,
                 listener_by_channel => {},
                 listeners_by_conn => {},
               };

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

                $debug and $debug & 8 and $pool->_debug('on_error called for query $query');
                next;
            }
            $debug and $debug & 8 and $pool->_debug('starting new connection');
            $pool->_start_new_conn;
            return;
        }
        keys %$idle;
        my ($seq) = each %$idle;
        delete $idle->{$seq};
        $pool->{busy}{$seq} = 1;

        my $query = shift @{$pool->{queue}};
        $pool->_start_query($seq, $query);
    }
    $debug and $debug & 8 and $pool->_debug('queue is empty!');
}

# Lookup set holding the severity labels FATAL and PANIC, each mapped to a
# true value.
# NOTE(review): the identifier is spelled "severiry"; kept as-is because code
# outside this excerpt presumably refers to it by this name.
my %error_severiry_fatal = (FATAL => 1, PANIC => 1);

sub _on_query_result {

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    if (my $query = delete $pool->{current}{$seq}) {
        if ($query->{max_retries}-- > 0) {
            $pool->_requeue_query($query);
        }
        else {
            $pool->_maybe_callback($query, 'on_error', $conn);
        }
    }

    if ($debug and $debug & 8) {
        my @states = grep $pool->{$_}{$seq}, qw(busy idle connecting initializing);
        $pool->_debug("removing broken connection in state(s!) @states, "
                      . "\$conn: $conn, \$pool->{conns}{$seq}: "
                      . ($pool->{conns}{$seq} // '<undef>'));
    }
    delete $pool->{busy}{$seq}
        or delete $pool->{idle}{$seq}
            or delete $pool->{initializing}{$seq}
                or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);

    delete $pool->{init_queue_ix}{$seq};
    delete $pool->{conns}{$seq};

    my $listeners = delete $pool->{listeners_by_conn}{$seq};

    if ($pool->{dead}) {

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

sub _on_conn_connect_error {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("unable to connect to database");

    $pool->_maybe_callback('on_transient_error');

    # the connection object will be removed from the Pool on the
    # on_error callback that will be called just after this one
    # returns:
    delete $pool->{connecting}{$seq};
    $pool->{busy}{$seq} = 1;

    if ($pool->{delay_watcher}) {
        $debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
        return;
    }

    my $now = time;
    # This failed connection is not counted against the limit
    # unless it is the only connection remaining. Effectively the
    # module will keep going until all the connections become

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    my $query = { %{$init_queue->[$ix]} }; # clone
    $pool->{initializing}{$seq} = 1;
    $pool->_start_query($seq, $query);
    1;
}

# Callback run when a pooled connection reports that its queue is empty.
#
#   $pool - the pool object
#   $seq  - key identifying the connection in the pool's state tables
#   $conn - the connection object (used here only in the trace message)
#
# The connection is removed from the first of the busy, connecting or
# initializing tables that holds a true entry for it.  When an init_queue is
# defined and _check_init_queue($seq) returns true, nothing more is done;
# otherwise the connection is recorded as idle and _check_queue is invoked.
sub _on_conn_empty_queue {
    my ($pool, $seq, $conn) = @_;
    if ($debug and $debug & 8) {
        $pool->_debug("conn $conn queue is now empty, seq: $seq");
    }

    # Stop at the first table that yields a true entry so the remaining
    # tables are left untouched.
    my $previous;
    for my $state (qw(busy connecting initializing)) {
        $previous = delete $pool->{$state}{$seq};
        last if $previous;
    }

    # A connection found in none of the tables is only treated as fatal
    # when debugging is on; otherwise processing simply carries on.
    if (!$previous && $debug) {
        $pool->_debug("pool object: \n" . Dumper($pool));
        die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
    }

    return
        if defined($pool->{init_queue})
            and $pool->_check_init_queue($seq);

    $pool->{idle}{$seq} = 1;
    $pool->_check_queue;
}



( run in 0.378 second using v1.01-cache-2.11-cpan-87723dcf8b7 )