AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg.pm view on Meta::CPAN
}
$debug and $debug & 2 and $self->_debug("looking for notifications");
while (my @notify = $dbc->notifies) {
$debug and $debug & 2 and $self->_debug("notify recived: @notify");
$self->_maybe_callback(on_notify => @notify);
}
if (defined (my $cq = $self->{current_query})) {
while (1) {
if ($self->{write_watcher} or $dbc->busy) {
$debug and $debug & 1 and $self->_debug($self->{write_watcher}
? "wants to write and read"
: "wants to read");
$self->{timeout_watcher} = AE::timer $self->{timeout}, 0, weak_method_callback_cached($self, '_on_timeout')
if $self->{timeout};
return;
}
else {
$debug and $debug & 1 and $self->_debug("data available");
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
*_maybe_callback = \&AnyEvent::Pg::_maybe_callback;
};
our $debug;
# Emit a one-line diagnostic through warn(), prefixed with a pool snapshot.
#
# Arguments: the pool object followed by the message parts (joined with
# single spaces by string interpolation).
#
# Output format:
#   [<pool> c:<connecting>/i:<initializing>/-:<idle>/b:<busy>|t:<total>|d:<delayed>]@<pkg>::<sub>> <message> at <file> line <line>
# Each counter is the number of keys in the matching $pool->{...} hash,
# <total> counts $pool->{conns}, and <delayed> is 1 when
# $pool->{delay_watcher} is set, 0 otherwise.
sub _debug {
    my $pool = shift;

    my $connecting   = keys %{$pool->{connecting}};
    my $initializing = keys %{$pool->{initializing}};
    my $idle         = keys %{$pool->{idle}};
    my $busy         = keys %{$pool->{busy}};
    my $delayed      = ($pool->{delay_watcher} ? 1 : 0);
    my $total        = keys %{$pool->{conns}};

    # Disable any die handler while we are in here and restore the caller's
    # $@ on exit. (%SIG, not %ENV, is where the __DIE__ hook lives.)
    local ($SIG{__DIE__}, $@);

    # Frame 0 describes the call *to* _debug: it gives the call site, but its
    # sub name is always "_debug". The name of the sub that called us lives
    # in frame 1, which does not exist when called from file top level.
    my ($pkg, $file, $line) = caller 0;
    my $method = (caller 1)[3] // '(top-level)';
    $method =~ s/.*:://;

    warn "[$pool c:$connecting/i:$initializing/-:$idle/b:$busy|t:$total|d:$delayed]\@${pkg}::$method> @_ at $file line $line\n";
}
# Built-in option values for the pool.
# NOTE(review): presumably these back-fill options the caller of new() leaves
# out; the merge itself is not visible in this excerpt -- confirm.
my %default = (
    size               => 1,
    timeout            => 30,
    connection_delay   => 2,
    connection_retries => 3,
);
sub new {
my ($class, $conninfo, %opts) = @_;
$conninfo = { %$conninfo } if ref $conninfo;
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
on_connect_error => $on_connect_error,
on_transient_error => $on_transient_error,
# on_empty_queue => $on_empty_queue,
timeout => $timeout,
max_conn_retries => $connection_retries,
conn_retries => 0,
conn_delay => $connection_delay,
global_timeout => $global_timeout,
conns => {},
current => {},
busy => {},
idle => {},
connecting => {},
initializing => {},
init_queue_ix => {},
queue => [],
seq => 1,
query_seq => 1,
listener_by_channel => {},
listeners_by_conn => {},
};
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
$debug and $debug & 8 and $pool->_debug('on_error called for query $query');
next;
}
$debug and $debug & 8 and $pool->_debug('starting new connection');
$pool->_start_new_conn;
return;
}
keys %$idle;
my ($seq) = each %$idle;
delete $idle->{$seq};
$pool->{busy}{$seq} = 1;
my $query = shift @{$pool->{queue}};
$pool->_start_query($seq, $query);
}
$debug and $debug & 8 and $pool->_debug('queue is empty!');
}
# Lookup set holding the severity labels FATAL and PANIC (each maps to 1).
# The identifier's spelling is left untouched: uses outside this excerpt may rely on it.
my %error_severiry_fatal = (
    FATAL => 1,
    PANIC => 1,
);
sub _on_query_result {
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
if (my $query = delete $pool->{current}{$seq}) {
if ($query->{max_retries}-- > 0) {
$pool->_requeue_query($query);
}
else {
$pool->_maybe_callback($query, 'on_error', $conn);
}
}
if ($debug and $debug & 8) {
my @states = grep $pool->{$_}{$seq}, qw(busy idle connecting initializing);
$pool->_debug("removing broken connection in state(s!) @states, "
. "\$conn: $conn, \$pool->{conns}{$seq}: "
. ($pool->{conns}{$seq} // '<undef>'));
}
delete $pool->{busy}{$seq}
or delete $pool->{idle}{$seq}
or delete $pool->{initializing}{$seq}
or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);
delete $pool->{init_queue_ix}{$seq};
delete $pool->{conns}{$seq};
my $listeners = delete $pool->{listeners_by_conn}{$seq};
if ($pool->{dead}) {
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
sub _on_conn_connect_error {
my ($pool, $seq, $conn) = @_;
$debug and $debug & 8 and $pool->_debug("unable to connect to database");
$pool->_maybe_callback('on_transient_error');
# the connection object will be removed from the Pool on the
# on_error callback that will be called just after this one
# returns:
delete $pool->{connecting}{$seq};
$pool->{busy}{$seq} = 1;
if ($pool->{delay_watcher}) {
$debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
return;
}
my $now = time;
# This failed connection is not counted against the limit
# unless it is the only connection remaining. Effectively the
# module will keep going until all the connections become
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
my $query = { %{$init_queue->[$ix]} }; # clone
$pool->{initializing}{$seq} = 1;
$pool->_start_query($seq, $query);
1;
}
# Handler run when connection $conn (pool slot $seq) reports an empty queue.
#
# Steps:
#   1. Remove $seq from the first of the busy / connecting / initializing
#      sets that contains it (later sets are left alone once one matches).
#   2. If it was in none of them and debugging is on, dump the pool and die.
#      With debugging off the inconsistency is ignored.
#   3. If $pool->{init_queue} is defined, hand the slot to _check_init_queue
#      and stop when that returns true.
#   4. Otherwise mark the slot idle and run _check_queue, whose result is
#      this sub's return value.
sub _on_conn_empty_queue {
    my ($pool, $seq, $conn) = @_;

    $pool->_debug("conn $conn queue is now empty, seq: $seq")
        if $debug and $debug & 8;

    my $was_tracked = delete($pool->{busy}{$seq})
                   || delete($pool->{connecting}{$seq})
                   || delete($pool->{initializing}{$seq});

    if (!$was_tracked and $debug) {
        $pool->_debug("pool object: \n" . Dumper($pool));
        die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
    }

    if (defined $pool->{init_queue}) {
        return if $pool->_check_init_queue($seq);
    }

    $pool->{idle}{$seq} = 1;
    $pool->_check_queue;
}
( run in 0.378 second using v1.01-cache-2.11-cpan-87723dcf8b7 )