AnyEvent-Pg
view release on metacpan or search on metacpan
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
package AnyEvent::Pg::Pool;
our $VERSION = '0.14';
use strict;
use warnings;
use 5.010;
use Carp qw(verbose croak);
use Data::Dumper;
use Method::WeakCallback qw(weak_method_callback);
use AnyEvent::Pg;
# Share state and helpers with AnyEvent::Pg. After these glob assignments the
# package variable $debug here is the very same scalar as
# $AnyEvent::Pg::debug, and _maybe_callback resolves to
# AnyEvent::Pg::_maybe_callback, so it can be invoked as a method on the pool.
BEGIN {
*debug = \$AnyEvent::Pg::debug;
*_maybe_callback = \&AnyEvent::Pg::_maybe_callback;
};
# Makes the unqualified name $debug usable under "use strict". The code in
# this file tests it as a bitmask ($debug & 8, $debug & 4).
our $debug;
# Emit one diagnostic line through warn() describing the pool state.
#
# Arguments: the pool object followed by the message fragments (joined with
# spaces by string interpolation of @_).
#
# The line is prefixed with the number of connections in each state
# (c: connecting, i: initializing, -: idle, b: busy, t: total) and with d:1
# when a delayed-reconnection timer is pending. It is followed by the package
# and the name of the sub that called _debug, the message, and the file/line
# of the call site.
sub _debug {
    my $pool = shift;
    my $connecting   = keys %{$pool->{connecting}};
    my $initializing = keys %{$pool->{initializing}};
    my $idle         = keys %{$pool->{idle}};
    my $busy         = keys %{$pool->{busy}};
    my $delayed      = ($pool->{delay_watcher} ? 1 : 0);
    my $total        = keys %{$pool->{conns}};

    # Suspend user-installed die handlers and protect the caller's $@ while
    # the message is built and printed. (The handler lives in %SIG, not %ENV.)
    local ($SIG{__DIE__}, $@);

    # Frame 0 describes the call to _debug itself: where it was called from.
    my ($pkg, $file, $line, $self_name) = (caller 0);
    # Frame 1 names the sub that called _debug. Fall back to our own name
    # when _debug is invoked from outside any sub.
    my $method = (caller 1)[3] // $self_name;
    $method =~ s/.*:://;

    warn "[$pool c:$connecting/i:$initializing/-:$idle/b:$busy|t:$total|d:$delayed]\@${pkg}::$method> @_ at $file line $line\n";
}
# Fallback values used when the corresponding option is left undefined.
# connection_delay ends up as the AE::timer delay for reconnections (the
# debug message prints it with an "s" suffix); timeout is forwarded to
# AnyEvent::Pg->new -- presumably also seconds, TODO confirm.
my %default = ( connection_retries => 3,
connection_delay => 2,
timeout => 30,
size => 1 );
# Constructor.
#
#   AnyEvent::Pg::Pool->new($conninfo, %opts)
#
# $conninfo is shallow-copied when it is a reference. Recognised options:
# size, connection_retries, connection_delay, timeout (these fall back to
# %default), global_timeout, on_error, on_connect_error, on_transient_error.
# The new pool starts with no connections; its _on_start hook is scheduled
# through AE::postpone using a weak method callback.
sub new {
    my ($class, $conninfo, %opts) = @_;

    # Keep a private copy so later changes by the caller do not leak in.
    if (ref $conninfo) {
        $conninfo = { %$conninfo };
    }

    # Options that have a default value.
    my %cfg;
    for my $name (qw(size connection_retries connection_delay timeout)) {
        $cfg{$name} = delete $opts{$name} // $default{$name};
    }

    # Options that simply stay undefined when not given.
    for my $name (qw(global_timeout on_error on_connect_error on_transient_error)) {
        $cfg{$name} = delete $opts{$name};
    }

    my $pool = bless {
        # configuration
        conninfo            => $conninfo,
        size                => $cfg{size},
        on_error            => $cfg{on_error},
        on_connect_error    => $cfg{on_connect_error},
        on_transient_error  => $cfg{on_transient_error},
        timeout             => $cfg{timeout},
        max_conn_retries    => $cfg{connection_retries},
        conn_retries        => 0,
        conn_delay          => $cfg{connection_delay},
        global_timeout      => $cfg{global_timeout},
        # connection bookkeeping, all keyed by connection sequence number
        conns               => {},
        current             => {},
        busy                => {},
        idle                => {},
        connecting          => {},
        initializing        => {},
        init_queue_ix       => {},
        # pending queries and id generators
        queue               => [],
        seq                 => 1,
        query_seq           => 1,
        # listeners
        listener_by_channel => {},
        listeners_by_conn   => {},
    }, $class;

    # The & call form is kept from the original; presumably it is there to
    # bypass AE::postpone's prototype so a code-ref expression can be passed.
    &AE::postpone(weak_method_callback($pool, '_on_start'));

    return $pool;
}
# Returns the pool's "dead" flag, which is set once the pool has given up
# trying to connect to the database.
sub is_dead {
    my $pool = shift;
    return $pool->{dead};
}
# Change pool settings after construction.
#
#   $pool->set(name => $value, ...)
#
# Arguments are consumed as key/value pairs; an undefined value falls back to
# the entry in %default, if there is one. Only 'global_timeout' is acted
# upon here; any other key is ignored.
#
# For 'global_timeout':
#   - a defined value shifts an already running deadline (max_conn_time) by
#     the difference between the new and the old timeout;
#   - an undefined value removes the running deadline.
sub set {
    my $pool = shift;
    while (@_) {
        my $k = shift;
        my $v = shift // $default{$k};
        if ($k eq 'global_timeout') {
            # Use the value already taken from the argument list. Shifting
            # again here would swallow the name of the next option.
            if (defined $v) {
                $pool->{max_conn_time} += $v - ($pool->{global_timeout} // 0)
                    if defined $pool->{max_conn_time};
            }
            else {
                delete $pool->{max_conn_time};
            }
            $pool->{$k} = $v;
        }
    }
}
# Start-up hook scheduled by the constructor through AE::postpone.
# It deliberately does nothing here.
sub _on_start { return }
sub push_query {
my ($pool, %opts) = @_;
my %query;
my $retry_on_sqlstate = delete $opts{retry_on_sqlstate};
$retry_on_sqlstate = { map { $_ => 1 } @$retry_on_sqlstate }
if ref($retry_on_sqlstate) eq 'ARRAY';
$query{retry_on_sqlstate} = $retry_on_sqlstate // {};
$query{$_} = delete $opts{$_} for qw(on_result on_error on_done query args max_retries);
$query{seq} = $pool->{query_seq}++;
my $query = \%query;
my $queue = ($opts{initialization} ? ($pool->{init_queue} //= []) : $pool->{queue});
if (defined(my $priority = $opts{priority})) {
$query{priority} = $priority;
# FIXME: improve the search algorithm used here
my $i;
for ($i = 0; $i < @$queue; $i++) {
my $p2 = $queue->[$i]{priority} // last;
$p2 >= $priority or last;
}
splice @$queue, $i, 0, $query;
$debug and $debug & 8 and $pool->_debug("query with priority $priority inserted into queue at position $i/$#$queue");
}
lib/AnyEvent/Pg/Pool.pm view on Meta::CPAN
unless ($pool->_listener_check_callbacks($channel)) {
$pool->_stop_listener($channel);
}
}
}
}
}
# Drop canceled queries from the head of the queue and report whether
# anything is left.
#
# Returns 1 when the queue is empty after the cleanup, and nothing (empty
# list / undef) when the first remaining query is still live.
sub _is_queue_empty {
    my $pool    = shift;
    my $pending = $pool->{queue};
    $debug and $debug & 8 and $pool->_debug('raw queue size is ' . scalar @$pending);

    # Canceled entries are only discarded lazily, from the front.
    shift @$pending while @$pending and $pending->[0]{canceled};
    return if @$pending;

    $debug and $debug & 8 and $pool->_debug('queue is empty');
    return 1;
}
# Hand a query over to the connection identified by $seq.
#
# The query is pushed on the connection object with pool-level result/done
# callbacks, recorded as the current query of that connection, and the
# watcher returned by the connection is stored inside the query record.
# Dies with a dump of the pool when no connection is registered under $seq.
sub _start_query {
    my ($pool, $seq, $query) = @_;

    my $conn = $pool->{conns}{$seq};
    $conn or die("internal error, pool is corrupted, seq: $seq:\n" . Dumper($pool));

    my @request = (
        query     => $query->{query},
        args      => $query->{args},
        on_result => weak_method_callback($pool, '_on_query_result', $seq),
        on_done   => weak_method_callback($pool, '_on_query_done', $seq),
    );
    my $conn_watcher = $conn->push_query(@request);

    $pool->{current}{$seq} = $query;
    $query->{watcher}      = $conn_watcher;
    $debug and $debug & 8 and $pool->_debug("query $query started on conn $conn, seq: $seq");
}
# Dispatch queued queries to idle connections.
#
# Loops while there are live queries in the queue:
#   - when an idle connection exists, it is moved to the busy set and the
#     first queued query is started on it;
#   - when there is none and the pool is dead, the first query is removed
#     and its on_error callback is invoked;
#   - when there is none and the pool is alive, a new connection is
#     requested and the sub returns (the query stays queued).
sub _check_queue {
    my $pool = shift;
    my $idle = $pool->{idle};
    while (1) {
        $debug and $debug & 8 and $pool->_debug('checking queue, there are '
                                                . (scalar keys %$idle)
                                                . ' idle connections, queue size is '
                                                . (scalar @{$pool->{queue}}));
        if ($pool->_is_queue_empty) {
            $debug and $debug & 8 and $pool->_debug('queue is now empty');
            last;
        }
        $debug and $debug & 8 and $pool->_debug('processing first query from the queue');
        unless (%$idle) {
            if ($pool->{dead}) {
                my $query = shift @{$pool->{queue}};
                $pool->_maybe_callback($query, 'on_error');
                # Double quotes are required so that the query is actually
                # shown in the log.
                $debug and $debug & 8 and $pool->_debug("on_error called for query $query");
                next;
            }
            $debug and $debug & 8 and $pool->_debug('starting new connection');
            $pool->_start_new_conn;
            return;
        }
        # keys() in void context resets the hash iterator, so each() below
        # reliably returns an entry.
        keys %$idle;
        my ($seq) = each %$idle;
        delete $idle->{$seq};
        $pool->{busy}{$seq} = 1;
        my $query = shift @{$pool->{queue}};
        $pool->_start_query($seq, $query);
    }
    $debug and $debug & 8 and $pool->_debug('queue is empty!');
}
# Error severities after which a failed query is retried instead of being
# reported to the user (see _on_query_result).
my %error_severiry_fatal = map { $_ => 1 } qw(FATAL PANIC);

# on_result handler installed on the connection for every pooled query.
#
# Arguments: pool, connection sequence number, connection object, result.
#
# While the query still has retries left (max_retries is true) a fatal
# result is swallowed and the query is flagged for retry when either its
# sqlstate is listed in retry_on_sqlstate or its severity is FATAL/PANIC.
# Once a query is flagged, any later result of the same run is ignored.
# Otherwise retries are disabled for the query and the result is forwarded
# to the query's on_result callback.
sub _on_query_result {
    my ($pool, $seq, $conn, $result) = @_;
    my $query = $pool->{current}{$seq};
    if ($debug and $debug & 8) {
        $pool->_debug("query result $result received for query $query on connection $conn, seq: $seq");
        $result->status == Pg::PQ::PGRES_FATAL_ERROR and
            $pool->_debug("errorDescription:\n" . Dumper [$result->errorDescription]);
    }
    if ($query->{retry}) {
        $debug and $debug & 8 and $pool->_debug("retry is set, ignoring later on_result");
    }
    else {
        if ($query->{max_retries} and $result->status == Pg::PQ::PGRES_FATAL_ERROR) {
            if ($query->{retry_on_sqlstate}{$result->errorField('sqlstate')}) {
                # Only log when debugging is enabled, like every other call
                # site; otherwise this writes to STDERR in production.
                $debug and $debug & 8 and $pool->_debug("this is a retry-able error, skipping the on_result callback");
                $query->{retry} = 1;
                return;
            }
            if ($error_severiry_fatal{$result->errorField('severity')}) {
                $debug and $debug & 8 and $pool->_debug("this is a real FATAL error, skipping the on_result callback");
                $query->{retry} = 1;
                return;
            }
        }
        # A result is being delivered to the user, so the query must not be
        # re-run from now on.
        $query->{max_retries} = 0;
        $pool->_maybe_callback($query, 'on_result', $conn, $result);
    }
}
# Put a query back at the very front of the queue, giving it an infinite
# priority. Returns the new queue length (the value of unshift).
sub _requeue_query {
    my $pool  = shift;
    my $query = shift;
    $query->{priority} = 0 + 'inf';
    unshift @{ $pool->{queue} }, $query;
}
# on_done handler installed on the connection for every pooled query.
#
# The query stops being the current one for connection $seq. When it was
# flagged for retry, one retry is consumed and the query goes back to the
# front of the queue; otherwise the query's on_done callback is invoked.
sub _on_query_done {
    my ($pool, $seq, $conn) = @_;
    my $finished = delete $pool->{current}{$seq};

    unless (delete $finished->{retry}) {
        return $pool->_maybe_callback($finished, 'on_done', $conn);
    }

    $debug and $debug & 8 and $pool->_debug("unshifting failed query into queue");
    $finished->{max_retries}--;
    $pool->_requeue_query($finished);
}
# Open one more database connection if the pool is allowed to.
#
# A connection is started only when all of these hold: the pool is below
# its configured size, no other connection attempt is in progress, the
# retry budget is not exhausted, and no delayed reconnection is pending.
# The new connection is registered under a fresh sequence number and marked
# as connecting.
sub _start_new_conn {
    my $pool = shift;

    my $may_connect = (keys %{$pool->{conns}} < $pool->{size}
                       and !%{$pool->{connecting}}
                       and $pool->{conn_retries} <= $pool->{max_conn_retries}
                       and !$pool->{delay_watcher});

    if ($may_connect) {
        my $seq = $pool->{seq}++;
        $debug and $debug & 8 and $pool->_debug("starting new connection, seq: $seq");

        # Every connection event is routed back to the pool together with
        # the connection's sequence number.
        my @conn_opts = (
            timeout          => $pool->{timeout},
            on_connect       => weak_method_callback($pool, '_on_conn_connect', $seq),
            on_connect_error => weak_method_callback($pool, '_on_conn_connect_error', $seq),
            on_empty_queue   => weak_method_callback($pool, '_on_conn_empty_queue', $seq),
            on_error         => weak_method_callback($pool, '_on_conn_error', $seq),
            on_notify        => weak_method_callback($pool, '_on_notify', $seq),
            seq              => $seq,
        );
        my $conn = AnyEvent::Pg->new($pool->{conninfo}, @conn_opts);

        $debug and $debug & 8 and $pool->_debug("new connection started, seq: $seq, conn: $conn");
        $pool->{conns}{$seq}      = $conn;
        $pool->{connecting}{$seq} = 1;
    }
    else {
        $debug and $debug & 8 and $pool->_debug('not starting new connection, conns: '
                                                . (scalar keys %{$pool->{conns}})
                                                . ", retries: $pool->{conn_retries}, connecting: "
                                                . (scalar keys %{$pool->{connecting}}));
    }
}
# on_error handler installed on every connection: removes the broken
# connection identified by $seq from the pool.
#
# Steps, in order:
#   1. a query that was running on the connection is either put back on the
#      queue (while it has retries left) or its on_error callback is invoked;
#   2. the connection is removed from whichever state set holds it
#      (busy, idle or initializing) -- being in none of them is fatal;
#   3. its remaining bookkeeping entries are dropped;
#   4. the pool-level callback is invoked: on_connect_error when the pool is
#      dead, on_transient_error otherwise, in which case the listeners that
#      were attached to the connection are started again;
#   5. the queue is re-checked.
sub _on_conn_error {
my ($pool, $seq, $conn) = @_;
# note that failed initialization queries also come over here
if (my $query = delete $pool->{current}{$seq}) {
# post-decrement: the retry budget is consumed even when it was already 0
if ($query->{max_retries}-- > 0) {
$pool->_requeue_query($query);
}
else {
$pool->_maybe_callback($query, 'on_error', $conn);
}
}
if ($debug and $debug & 8) {
my @states = grep $pool->{$_}{$seq}, qw(busy idle connecting initializing);
$pool->_debug("removing broken connection in state(s!) @states, "
. "\$conn: $conn, \$pool->{conns}{$seq}: "
. ($pool->{conns}{$seq} // '<undef>'));
}
# "connecting" is not tried here: _on_conn_connect_error moves a connection
# from connecting to busy before this handler runs.
delete $pool->{busy}{$seq}
or delete $pool->{idle}{$seq}
or delete $pool->{initializing}{$seq}
or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);
delete $pool->{init_queue_ix}{$seq};
delete $pool->{conns}{$seq};
my $listeners = delete $pool->{listeners_by_conn}{$seq};
if ($pool->{dead}) {
$pool->_maybe_callback('on_connect_error', $conn);
}
else {
$pool->_maybe_callback('on_transient_error');
if ($listeners) {
# re-establish each channel that was served by the lost connection
$pool->_start_listener($_) for keys %$listeners;
}
else {
$debug and $debug & 4 and $pool->_debug("connection $seq had no listeners attached: " .
Dumper($pool->{listeners_by_conn}));
}
}
$pool->_check_queue;
}
# on_connect handler installed on every connection. A successful connection
# resets the retry counter and cancels any running global-timeout deadline.
sub _on_conn_connect {
    my $pool = shift;
    my ($seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn is now connected, seq: $seq");

    $pool->{conn_retries} = 0;
    # The connection object calls _on_conn_empty_queue right after this
    # handler, which is where the connection becomes usable.
    delete $pool->{max_conn_time};
}
# on_connect_error handler installed on every connection: decides whether
# another connection attempt is scheduled or the pool is declared dead.
#
# The pool-level on_transient_error callback is invoked first. The failed
# connection is moved from "connecting" to "busy" so that the on_error
# handler can remove it. Unless a reconnection timer is already pending, the
# retry counter and (when global_timeout is set) the overall deadline are
# updated, and either a timer for _on_delayed_reconnect is started or the
# pool is flagged as dead.
sub _on_conn_connect_error {
my ($pool, $seq, $conn) = @_;
$debug and $debug & 8 and $pool->_debug("unable to connect to database");
$pool->_maybe_callback('on_transient_error');
# the connection object will be removed from the Pool on the
# on_error callback that will be called just after this one
# returns:
delete $pool->{connecting}{$seq};
$pool->{busy}{$seq} = 1;
if ($pool->{delay_watcher}) {
$debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
return;
}
my $now = time;
# This failed connection is not counted against the limit
# unless it is the only connection remaining. Effectively the
# module will keep going until all the connections become
# broken and no more connections can be established.
unless (keys(%{$pool->{conns}}) > 1) {
$pool->{conn_retries}++;
if ($pool->{global_timeout}) {
# the deadline is fixed by the first failure only (||=); conn_delay is
# subtracted up front from the allowed time
$pool->{max_conn_time} ||= $now + $pool->{global_timeout} - $pool->{conn_delay};
}
}
if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
if (not $pool->{max_conn_time} or $pool->{max_conn_time} >= $now) {
$debug and $debug & 8 and $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s");
# one-shot timer (interval 0); kept in the pool so it stays alive
$pool->{delay_watcher} = AE::timer $pool->{conn_delay}, 0, weak_method_callback($pool, '_on_delayed_reconnect');
return
}
$debug and $debug & 8 and $pool->_debug("global_timeout expired");
}
# giving up!
$debug and $debug & 8 and $pool->_debug("it has been impossible to connect to the database, giving up!!!");
$pool->{dead} = 1;
# processing continues on the on_conn_error callback
}
# Mark the pool as dead. Once this error has happened every later operation
# is going to fail. The connection argument is accepted but not used.
sub _on_fatal_connect_error {
    my $pool = shift;
    $pool->{dead} = 1;
}
# Timer callback for a delayed reconnection: releases the timer and tries to
# open a new connection.
sub _on_delayed_reconnect {
    my ($pool) = @_;
    $debug and $debug & 8 and $pool->_debug("_on_delayed_reconnect called");

    # Clearing the slot first matters: _start_new_conn refuses to connect
    # while a delay watcher is set.
    $pool->{delay_watcher} = undef;
    $pool->_start_new_conn;
}
# Give every idle connection the chance to run pending initialization
# queries.
#
# Each idle connection is taken out of the idle set and offered to
# _check_init_queue. When that starts a query the connection is tracked as
# initializing; when there is nothing to run for it, the connection is put
# back into the idle set so that it never ends up in no state set at all.
sub _check_init_queue_idle {
    my $pool = shift;
    my $idle = $pool->{idle};
    # keys() returns a snapshot, so changing the hash inside the loop is safe
    for my $seq (keys %$idle) {
        delete $idle->{$seq};
        $pool->_check_init_queue($seq)
            or $idle->{$seq} = 1;
    }
}
# Start the next pending initialization query on connection $seq, if any.
#
# Every connection keeps its own position (init_queue_ix) in the shared list
# of initialization queries. Returns nothing when the connection has already
# run all of them; otherwise a copy of the next query is started, the
# connection is marked as initializing, and 1 is returned.
sub _check_init_queue {
    my ($pool, $seq) = @_;
    my $pending = $pool->{init_queue};

    # A connection that has not run any initialization query yet has no
    # position recorded; it starts at 0.
    my $position = $pool->{init_queue_ix}{$seq} // 0;
    return unless $position < @$pending;
    $pool->{init_queue_ix}{$seq} = $position + 1;

    # Each connection works on its own shallow copy of the query record.
    my %copy = %{ $pending->[$position] };
    $pool->{initializing}{$seq} = 1;
    $pool->_start_query($seq, \%copy);
    return 1;
}
# on_empty_queue handler installed on every connection: the connection has
# nothing left to do.
#
# The connection leaves its previous state set (busy, connecting or
# initializing). Pending initialization queries take precedence; when one is
# started the sub returns. Otherwise the connection becomes idle and the
# query queue is checked. A connection found in none of the expected state
# sets is reported (dump + die) only when debugging is enabled.
sub _on_conn_empty_queue {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn queue is now empty, seq: $seq");

    my $was_tracked = delete($pool->{busy}{$seq})
                   || delete($pool->{connecting}{$seq})
                   || delete($pool->{initializing}{$seq});
    if (!$was_tracked and $debug) {
        $pool->_debug("pool object: \n" . Dumper($pool));
        die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
    }

    if (defined $pool->{init_queue}) {
        return if $pool->_check_init_queue($seq);
    }

    $pool->{idle}{$seq} = 1;
    $pool->_check_queue;
}
package AnyEvent::Pg::Pool::Watcher;

# A watcher is a blessed reference to a scalar that holds the watched record
# (a hash reference).
sub _new {
    my $class  = shift;
    my $record = shift;
    return bless \$record, $class;
}

# Destroying the watcher flags the watched record as canceled. When the
# record is already gone, a throw-away hash takes the flag instead.
sub DESTROY {
    my $self   = shift;
    my $record = defined $$self ? $$self : {};
    $record->{canceled} = 1;
}
package AnyEvent::Pg::Pool::QueryWatcher;
our @ISA = ('AnyEvent::Pg::Pool::Watcher');

# Destroying a query watcher cancels the query record and also releases the
# watcher of the slave query that was sent to the connection object.
sub DESTROY {
    my $self  = shift;
    my $query = defined $$self ? $$self : {};
    $query->{canceled} = 1;
    delete $query->{watcher};
}
# Watcher subclass that defines nothing of its own; all behaviour comes from
# AnyEvent::Pg::Pool::Watcher. Presumably this is the class of the objects
# handed out for listeners -- TODO confirm against the listen code.
package AnyEvent::Pg::Pool::ListenerWatcher;
our @ISA = ('AnyEvent::Pg::Pool::Watcher');
1;
=head1 NAME
AnyEvent::Pg::Pool
=head1 SYNOPSIS
my $pool = AnyEvent::Pg::Pool->new($conninfo,
on_connect_error => \&on_db_is_dead);
my $qw = $pool->push_query(query => 'select * from foo',
on_result => sub { ... });
my $lw = $pool->listen('bar',
on_notify => sub { ... });
=head1 DESCRIPTION
( run in 0.986 second using v1.01-cache-2.11-cpan-39bf76dae61 )