AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

package AnyEvent::Pg::Pool;

our $VERSION = '0.14';

use strict;
use warnings;
use 5.010;

use Carp qw(verbose croak);
use Data::Dumper;

use Method::WeakCallback qw(weak_method_callback);
use AnyEvent::Pg;
# Share the debug flag and the callback helper with AnyEvent::Pg.
BEGIN {
    # Alias this package's $debug to $AnyEvent::Pg::debug, so both
    # packages read and write the very same scalar.
    *debug = \$AnyEvent::Pg::debug;
    # Make AnyEvent::Pg::_maybe_callback callable as a method of this
    # package.
    *_maybe_callback = \&AnyEvent::Pg::_maybe_callback;
};

# Declared so that the aliased $debug can be used unqualified under
# "use strict". The code in this file tests it as a bit mask (bits 8
# and 4).
our $debug;

# Emit a debugging message through warn().
#
# Called as $pool->_debug(@message). The message is prefixed with the
# pool object and a summary of its state: number of connections that
# are (c)onnecting, (i)nitializing, idle (-) and (b)usy, the (t)otal
# number of connections and whether a (d)elayed reconnection timer is
# pending. The package, the name of this sub and the file/line it was
# called from are appended.
sub _debug {
    my $pool = shift;
    my $connecting   = keys %{$pool->{connecting}};
    my $initializing = keys %{$pool->{initializing}};
    my $idle         = keys %{$pool->{idle}};
    my $busy         = keys %{$pool->{busy}};
    my $delayed      = ($pool->{delay_watcher} ? 1 : 0);
    my $total        = keys %{$pool->{conns}};
    # Keep any user installed die handler and the current value of $@
    # untouched while the message is generated. (It is the %SIG entry
    # that has to be localized here, not an %ENV one.)
    local ($SIG{__DIE__}, $@);
    my ($pkg, $file, $line, $method) = (caller 0);
    # strip the package part from the fully qualified sub name
    $method =~ s/.*:://;
    warn "[$pool c:$connecting/i:$initializing/-:$idle/b:$busy|t:$total|d:$delayed]\@${pkg}::$method> @_ at $file line $line\n";
}

# Fallback values used by new() and set() for the options that are
# not given (or given as undef).
my %default = (
    connection_retries => 3,
    connection_delay   => 2,
    timeout            => 30,
    size               => 1,
);

# Constructor: AnyEvent::Pg::Pool->new($conninfo, %opts)
#
# $conninfo is stored in the object (shallow-copied first when it is a
# reference). Recognized options are size, connection_retries,
# connection_delay and timeout (which fall back to %default when
# undefined), plus global_timeout, on_error, on_connect_error and
# on_transient_error. Any other option is ignored.
#
# A call to the _on_start method is scheduled through AE::postpone
# before the new object is returned.
sub new {
    my $class    = shift;
    my $conninfo = shift;
    my %opts     = @_;

    # do not keep a reference to the structure owned by the caller
    $conninfo = { %$conninfo } if ref $conninfo;

    my %state = ( conninfo     => $conninfo,
                  conn_retries => 0,
                  queue        => [],
                  seq          => 1,
                  query_seq    => 1 );

    # options stored under their own name, with a default value
    for my $name (qw(size timeout)) {
        $state{$name} = delete($opts{$name}) // $default{$name};
    }

    # options stored under a different name, with a default value
    $state{max_conn_retries} = delete($opts{connection_retries}) // $default{connection_retries};
    $state{conn_delay}       = delete($opts{connection_delay})   // $default{connection_delay};

    # options without a default value (an 'on_empty_queue' option is
    # not handled here)
    for my $name (qw(global_timeout on_error on_connect_error on_transient_error)) {
        $state{$name} = delete $opts{$name};
    }

    # bookkeeping tables, all of them start empty
    for my $table (qw(conns current busy idle connecting initializing
                      init_queue_ix listener_by_channel listeners_by_conn)) {
        $state{$table} = {};
    }

    my $pool = bless \%state, $class;
    # the & form is required here in order to pass a code reference
    # that is the result of an expression
    &AE::postpone(weak_method_callback($pool, '_on_start'));
    return $pool;
}

# Returns the value of the pool 'dead' flag.
sub is_dead {
    my ($pool) = @_;
    return $pool->{dead};
}

# Change pool settings after construction: $pool->set($key => $value, ...)
#
# Arguments are consumed as key/value pairs. An undefined value is
# replaced by the entry for that key in %default, when there is one.
#
# Only 'global_timeout' is acted upon; pairs with any other key are
# silently ignored. When a connection deadline is currently active
# (max_conn_time is defined) it is shifted by the difference between
# the new and the old timeout. Setting an undefined timeout removes
# the deadline.
sub set {
    my $pool = shift;
    while (@_) {
        my $k = shift;
        my $v = shift // $default{$k};
        next unless $k eq 'global_timeout';
        # The new value is $v, already taken from the argument list.
        # Shifting once more here would eat the key of the next pair.
        if (defined $v) {
            $pool->{max_conn_time} += $v - ($pool->{global_timeout} // 0)
                if defined $pool->{max_conn_time};
        }
        else {
            delete $pool->{max_conn_time};
        }
        $pool->{$k} = $v;
    }
}

# Scheduled by new() through AE::postpone. Does nothing in this class.
sub _on_start { return }

sub push_query {
    my ($pool, %opts) = @_;
    my %query;
    my $retry_on_sqlstate = delete $opts{retry_on_sqlstate};
    $retry_on_sqlstate = { map { $_ => 1 } @$retry_on_sqlstate }
        if ref($retry_on_sqlstate) eq 'ARRAY';
    $query{retry_on_sqlstate} = $retry_on_sqlstate // {};
    $query{$_} = delete $opts{$_} for qw(on_result on_error on_done query args max_retries);
    $query{seq} = $pool->{query_seq}++;
    my $query = \%query;

    my $queue = ($opts{initialization} ? ($pool->{init_queue} //= []) : $pool->{queue});
    if (defined(my $priority = $opts{priority})) {
        $query{priority} = $priority;
        # FIXME: improve the search algorithm used here
        my $i;
        for ($i = 0; $i < @$queue; $i++) {
            my $p2 = $queue->[$i]{priority} // last;
            $p2 >= $priority or last;
        }
        splice @$queue, $i, 0, $query;
        $debug and $debug & 8 and $pool->_debug("query with priority $priority inserted into queue at position $i/$#$queue");
    }

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

                unless ($pool->_listener_check_callbacks($channel)) {
                    $pool->_stop_listener($channel);
                }
            }
        }
    }
}

# Tell whether there is any query left to run.
#
# Canceled queries found at the head of the queue are discarded on the
# way. Returns 1 when nothing remains in the queue, and nothing (a
# bare return) as soon as a live query is found at its head.
sub _is_queue_empty {
    my ($pool) = @_;
    my $pending = $pool->{queue};
    $debug and $debug & 8 and $pool->_debug('raw queue size is ' . scalar @$pending);

    # drop the canceled queries sitting at the front
    shift @$pending while @$pending and $pending->[0]{canceled};

    return if @$pending;

    $debug and $debug & 8 and $pool->_debug('queue is empty');
    return 1;
}

# Send $query to the connection registered in the pool under $seq.
#
# Dies when no such connection exists. The query is passed to the
# connection push_query method with the pool's _on_query_result and
# _on_query_done methods as callbacks. Afterwards the query is recorded
# as the current one for $seq and the watcher returned by the
# connection is saved inside the query.
sub _start_query {
    my ($pool, $seq, $query) = @_;

    my $conn = $pool->{conns}{$seq};
    die("internal error, pool is corrupted, seq: $seq:\n" . Dumper($pool))
        unless $conn;

    my @request = ( query     => $query->{query},
                    args      => $query->{args},
                    on_result => weak_method_callback($pool, '_on_query_result', $seq),
                    on_done   => weak_method_callback($pool, '_on_query_done',   $seq) );
    my $watcher = $conn->push_query(@request);

    $pool->{current}{$seq} = $query;
    $query->{watcher} = $watcher;
    $debug and $debug & 8 and $pool->_debug("query $query started on conn $conn, seq: $seq");
}

# Hand queued queries over to idle connections.
#
# Loops until the queue is empty (see _is_queue_empty). On every
# iteration, when there is an idle connection, it is marked as busy
# and the query at the head of the queue is started on it.
#
# When there are no idle connections:
#   - if the pool is dead, the query at the head of the queue is
#     dropped after invoking its on_error callback and the loop goes
#     on with the next one;
#   - otherwise _start_new_conn is called and the sub returns, leaving
#     the remaining queries in the queue.
sub _check_queue {
    my $pool = shift;
    my $idle = $pool->{idle};
    while (1) {
        $debug and $debug & 8 and $pool->_debug('checking queue, there are '
                                                . (scalar keys %$idle)
                                                . ' idle connections, queue size is '
                                                . (scalar @{$pool->{queue}}));
        if ($pool->_is_queue_empty) {
            $debug and $debug & 8 and $pool->_debug('queue is now empty');
            last;
        }
        $debug and $debug & 8 and $pool->_debug('processing first query from the queue');
        unless (%$idle) {
            if ($pool->{dead}) {
                my $query = shift @{$pool->{queue}};
                $pool->_maybe_callback($query, 'on_error');
                # double quotes are required for $query to be interpolated
                $debug and $debug & 8 and $pool->_debug("on_error called for query $query");
                next;
            }
            $debug and $debug & 8 and $pool->_debug('starting new connection');
            $pool->_start_new_conn;
            return;
        }
        # reset the hash iterator so that each() starts from the beginning
        keys %$idle;
        my ($seq) = each %$idle;
        # move the connection from idle to busy
        delete $idle->{$seq};
        $pool->{busy}{$seq} = 1;

        my $query = shift @{$pool->{queue}};
        $pool->_start_query($seq, $query);
    }
    $debug and $debug & 8 and $pool->_debug('queue is empty!');
}

# Values of the 'severity' error field that make a failed query
# eligible for a retry (see _on_query_result).
my %error_severity_fatal = map { $_ => 1 } qw(FATAL PANIC);

# on_result callback for the queries sent to the connections (it is
# installed by _start_query).
#
# $seq identifies the connection, $conn is the connection object and
# $result the result object just received.
#
# Results arriving for a query already marked for retry are ignored.
# Otherwise, when the query still has retries left and the result
# status is PGRES_FATAL_ERROR, the query is marked for retry without
# calling the user on_result callback if either its sqlstate is one of
# those listed in the query retry_on_sqlstate table or its severity is
# FATAL or PANIC.
#
# In any other case the result is passed to the query on_result
# callback and max_retries is reset to 0.
sub _on_query_result {
    my ($pool, $seq, $conn, $result) = @_;
    my $query = $pool->{current}{$seq};
    if ($debug and $debug & 8) {
        $pool->_debug("query result $result received for query $query on connection $conn, seq: $seq");
        $result->status == Pg::PQ::PGRES_FATAL_ERROR and
            $pool->_debug("errorDescription:\n" . Dumper [$result->errorDescription]);
    }
    if ($query->{retry}) {
        $debug and $debug & 8 and $pool->_debug("retry is set, ignoring later on_result");
    }
    else {
        if ($query->{max_retries} and $result->status == Pg::PQ::PGRES_FATAL_ERROR) {
            if ($query->{retry_on_sqlstate}{$result->errorField('sqlstate')}) {
                # debugging output must be emitted only when debugging is on
                $debug and $debug & 8 and $pool->_debug("this is a retry-able error, skipping the on_result callback");
                $query->{retry} = 1;
                return;
            }
            if ($error_severity_fatal{$result->errorField('severity')}) {
                $debug and $debug & 8 and $pool->_debug("this is a real FATAL error, skipping the on_result callback");
                $query->{retry} = 1;
                return;
            }
        }
        $query->{max_retries} = 0;
        $pool->_maybe_callback($query, 'on_result', $conn, $result);
    }
}

# Put $query back at the front of the pool queue, setting its priority
# to infinity.
sub _requeue_query {
    my $pool  = shift;
    my $query = shift;
    my $infinity = 0 + 'inf';
    $query->{priority} = $infinity;
    return unshift @{$pool->{queue}}, $query;
}

# on_done callback for the queries sent to the connections (it is
# installed by _start_query).
#
# The query stops being the current one for connection $seq. When it
# was marked for retry, the mark is removed, its max_retries counter
# is decremented and the query is requeued. Otherwise, the query
# on_done callback is invoked.
sub _on_query_done {
    my ($pool, $seq, $conn) = @_;
    my $finished = delete $pool->{current}{$seq};

    return $pool->_maybe_callback($finished, 'on_done', $conn)
        unless delete $finished->{retry};

    $debug and $debug & 8 and $pool->_debug("unshifting failed query into queue");
    $finished->{max_retries}--;
    return $pool->_requeue_query($finished);
}

# Start a new database connection when it is allowed.
#
# A connection is started only when all the following hold: the pool
# has less connections than its size, no other connection is in the
# connecting state, the connection retries counter does not exceed
# its maximum and there is no delayed reconnection pending.
#
# The new AnyEvent::Pg object gets the next sequence number and is
# registered in the pool in the connecting state.
sub _start_new_conn {
    my $pool = shift;

    my $may_connect = (    keys(%{$pool->{conns}}) < $pool->{size}
                        && !%{$pool->{connecting}}
                        && $pool->{conn_retries} <= $pool->{max_conn_retries}
                        && !$pool->{delay_watcher} );

    if ($may_connect) {
        my $seq = $pool->{seq}++;
        $debug and $debug & 8 and $pool->_debug("starting new connection, seq: $seq");

        # every callback carries the sequence number of the connection
        my @ctor_args = ( timeout          => $pool->{timeout},
                          on_connect       => weak_method_callback($pool, '_on_conn_connect', $seq),
                          on_connect_error => weak_method_callback($pool, '_on_conn_connect_error', $seq),
                          on_empty_queue   => weak_method_callback($pool, '_on_conn_empty_queue', $seq),
                          on_error         => weak_method_callback($pool, '_on_conn_error', $seq),
                          on_notify        => weak_method_callback($pool, '_on_notify', $seq),
                          seq              => $seq );
        my $conn = AnyEvent::Pg->new($pool->{conninfo}, @ctor_args);

        $debug and $debug & 8 and $pool->_debug("new connection started, seq: $seq, conn: $conn");
        $pool->{conns}{$seq} = $conn;
        $pool->{connecting}{$seq} = 1;
    }
    else {
        $debug and $debug & 8 and $pool->_debug('not starting new connection, conns: '
                                                . (scalar keys %{$pool->{conns}})
                                                . ", retries: $pool->{conn_retries}, connecting: "
                                                . (scalar keys %{$pool->{connecting}}));
    }
}

# on_error callback for the connections (it is installed by
# _start_new_conn).
#
# Removes the connection identified by $seq from the pool and then
# runs _check_queue. $conn is the failed connection object.
#
# Dies when the connection is not found in any of the busy, idle or
# initializing states.
sub _on_conn_error {
    my ($pool, $seq, $conn) = @_;

    # note that failed initialization queries also come over here
    if (my $query = delete $pool->{current}{$seq}) {
        # the query that was running on this connection is requeued
        # while it has retries left; otherwise its on_error callback
        # is invoked
        if ($query->{max_retries}-- > 0) {
            $pool->_requeue_query($query);
        }
        else {
            $pool->_maybe_callback($query, 'on_error', $conn);
        }
    }

    if ($debug and $debug & 8) {
        my @states = grep $pool->{$_}{$seq}, qw(busy idle connecting initializing);
        $pool->_debug("removing broken connection in state(s!) @states, "
                      . "\$conn: $conn, \$pool->{conns}{$seq}: "
                      . ($pool->{conns}{$seq} // '<undef>'));
    }
    # The connection is removed from the first state table where it is
    # found. Connections failing while connecting are expected under
    # busy here: _on_conn_connect_error moves them there.
    delete $pool->{busy}{$seq}
        or delete $pool->{idle}{$seq}
            or delete $pool->{initializing}{$seq}
                or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);

    delete $pool->{init_queue_ix}{$seq};
    delete $pool->{conns}{$seq};

    # listeners that were attached to the connection being removed
    my $listeners = delete $pool->{listeners_by_conn}{$seq};

    if ($pool->{dead}) {
        $pool->_maybe_callback('on_connect_error', $conn);
    }
    else {
        $pool->_maybe_callback('on_transient_error');

        if ($listeners) {
            # start again the listeners of the connection just removed
            $pool->_start_listener($_) for keys %$listeners;
        }
        else {
            $debug and $debug & 4 and $pool->_debug("connection $seq had no listeners attached: " .
                                                    Dumper($pool->{listeners_by_conn}));
        }
    }

    $pool->_check_queue;
}

# on_connect callback for the connections (it is installed by
# _start_new_conn).
#
# A successful connection resets the retries counter and removes the
# connection deadline (max_conn_time).
sub _on_conn_connect {
    my $pool = shift;
    my ($seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn is now connected, seq: $seq");
    $pool->{conn_retries} = 0;
    # _on_conn_empty_queue is called afterwards by the $conn object
    delete $pool->{max_conn_time};
}

# on_connect_error callback for the connections (it is installed by
# _start_new_conn).
#
# Invokes the pool on_transient_error callback, moves the connection
# from the connecting to the busy state and then either schedules a
# delayed reconnection or marks the pool as dead. A reconnection is
# scheduled only while the retries counter does not exceed its maximum
# and the deadline (max_conn_time), if any, has not passed.
sub _on_conn_connect_error {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("unable to connect to database");

    $pool->_maybe_callback('on_transient_error');

    # the connection object will be removed from the Pool on the
    # on_error callback that will be called just after this one
    # returns:
    delete $pool->{connecting}{$seq};
    $pool->{busy}{$seq} = 1;

    if ($pool->{delay_watcher}) {
        $debug and $debug & 8 and $pool->_debug("a delayed reconnection is already queued");
        return;
    }

    my $now = time;
    # This failed connection is not counted against the limit
    # unless it is the only connection remaining. Effectively the
    # module will keep going until all the connections become
    # broken and no more connections can be established.
    unless (keys(%{$pool->{conns}}) > 1) {
        $pool->{conn_retries}++;
        if ($pool->{global_timeout}) {
            # the deadline is set just once (on the first failure) and
            # is kept until it gets deleted
            $pool->{max_conn_time} ||= $now + $pool->{global_timeout} - $pool->{conn_delay};
        }
    }

    if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
        if (not $pool->{max_conn_time} or $pool->{max_conn_time} >= $now) {
            $debug and $debug & 8 and $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s");
            # one-shot timer, _on_delayed_reconnect runs when it fires
            $pool->{delay_watcher} = AE::timer $pool->{conn_delay}, 0, weak_method_callback($pool, '_on_delayed_reconnect');
            return
        }
        $debug and $debug & 8 and $pool->_debug("global_timeout expired");
    }

    # giving up!
    $debug and $debug & 8 and $pool->_debug("it has been impossible to connect to the database, giving up!!!");
    $pool->{dead} = 1;

    # processing continues on the on_conn_error callback
}

# Mark the pool as dead.
#
# This error is fatal. After it happens, everything is going to fail.
# The $conn argument is accepted but not used.
sub _on_fatal_connect_error {
    my $pool = shift;
    $pool->{dead} = 1;
}

# Timer callback scheduled by _on_conn_connect_error: clears the
# delay watcher and tries to start a new connection.
sub _on_delayed_reconnect {
    my ($pool) = @_;
    $debug and $debug & 8 and $pool->_debug("_on_delayed_reconnect called");
    # the watcher has to be cleared first, _start_new_conn does nothing
    # while it is set
    $pool->{delay_watcher} = undef;
    return $pool->_start_new_conn;
}

# Take every idle connection out of the idle set and run
# _check_init_queue for it.
sub _check_init_queue_idle {
    my ($pool) = @_;
    my $idle_conns = $pool->{idle};
    my @seqs = keys %$idle_conns;
    foreach my $seq (@seqs) {
        delete $idle_conns->{$seq};
        $pool->_check_init_queue($seq);
    }
}

# Run the next pending initialization query on connection $seq.
#
# The pool keeps, for every connection, the index of the next entry of
# the initialization queue to be run on it. When entries remain, a
# copy of the next one is started on the connection, which is marked
# as initializing, and 1 is returned. Otherwise nothing is returned.
sub _check_init_queue {
    my $pool = shift;
    my $seq  = shift;
    # the index of a connection that has not run any initialization
    # query yet is undefined
    no warnings 'uninitialized';
    my $init_queries = $pool->{init_queue};
    return unless $pool->{init_queue_ix}{$seq} < @$init_queries;

    my $position = $pool->{init_queue_ix}{$seq}++;
    # every connection gets its own shallow copy of the query
    my %copy = %{$init_queries->[$position]};
    $pool->{initializing}{$seq} = 1;
    $pool->_start_query($seq, \%copy);
    return 1;
}

# on_empty_queue callback for the connections (it is installed by
# _start_new_conn).
#
# The connection leaves whichever of the busy, connecting or
# initializing states it was in. Then, if the pool has an
# initialization queue and some of its queries are still pending for
# this connection, the next one is started. Otherwise the connection
# becomes idle and the pool queue is checked.
sub _on_conn_empty_queue {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn queue is now empty, seq: $seq");

    my $was_tracked = (delete $pool->{busy}{$seq}
                       or delete $pool->{connecting}{$seq}
                       or delete $pool->{initializing}{$seq});

    # an unexpected state is reported only when debugging is enabled
    if ($debug and not $was_tracked) {
        $pool->_debug("pool object: \n" . Dumper($pool));
        die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
    }

    return if defined $pool->{init_queue} and $pool->_check_init_queue($seq);

    $pool->{idle}{$seq} = 1;
    $pool->_check_queue;
}


package AnyEvent::Pg::Pool::Watcher;

# Base class for the watcher objects. A watcher is a blessed reference
# to a scalar holding the watched object (a hash reference).

# Wrap $obj into a new watcher of class $class.
sub _new {
    my $class  = shift;
    my $target = shift;
    return bless \$target, $class;
}

# Mark the watched object as canceled when the watcher goes away.
sub DESTROY {
    my $self = shift;
    my $target = $$self;
    # the watched object may be missing already
    $target = {} unless defined $target;
    $target->{canceled} = 1;
}

package AnyEvent::Pg::Pool::QueryWatcher;
our @ISA = qw(AnyEvent::Pg::Pool::Watcher);

# Mark the watched query as canceled when the watcher goes away.
sub DESTROY {
    my $self = shift;
    my $query = $$self;
    # the watched query may be missing already
    $query = {} unless defined $query;
    $query->{canceled} = 1;
    # delete also the watcher for the slave query sent to the conn
    # object:
    delete $query->{watcher};
}

package AnyEvent::Pg::Pool::ListenerWatcher;
# everything is inherited from the base watcher class
our @ISA = qw(AnyEvent::Pg::Pool::Watcher);


1;

=head1 NAME

AnyEvent::Pg::Pool - pool of AnyEvent::Pg database connections

=head1 SYNOPSIS

  my $pool = AnyEvent::Pg::Pool->new($conninfo,
                                     on_connect_error => \&on_db_is_dead);

  my $qw = $pool->push_query(query => 'select * from foo',
                             on_result => sub { ... });

  my $lw = $pool->listen('bar',
                         on_notify => sub { ... });

=head1 DESCRIPTION



( run in 0.986 second using v1.01-cache-2.11-cpan-39bf76dae61 )