AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

package AnyEvent::Pg::Pool;

our $VERSION = '0.14';

use strict;
use warnings;
use 5.010;

use Carp qw(verbose croak);
use Data::Dumper;

use Method::WeakCallback qw(weak_method_callback);
use AnyEvent::Pg;
# Share two symbols with AnyEvent::Pg at compile time: the package
# variable $debug becomes an alias of $AnyEvent::Pg::debug, and
# _maybe_callback is installed in this package so that it can be invoked
# as a method on pool objects.
BEGIN {
    *debug = \$AnyEvent::Pg::debug;
    *_maybe_callback = \&AnyEvent::Pg::_maybe_callback;
};

# Used as a bit mask: the code in this file tests bits 4 and 8 before
# emitting debug traces.
our $debug;

# Emits a debug trace through warn.
#
# The line starts with a summary of the pool state: the number of
# connections that are connecting (c), initializing (i), idle (-) and
# busy (b), the total number of connections (t) and whether a delayed
# reconnection timer is pending (d). It is followed by the package and
# the subroutine that requested the trace, the message (all remaining
# arguments) and the file and line of the call.
sub _debug {
    my $pool = shift;
    my $connecting   = keys %{$pool->{connecting}};
    my $initializing = keys %{$pool->{initializing}};
    my $idle         = keys %{$pool->{idle}};
    my $busy         = keys %{$pool->{busy}};
    my $delayed      = ($pool->{delay_watcher} ? 1 : 0);
    my $total        = keys %{$pool->{conns}};
    # Tracing must not trigger user die handlers nor clobber the caller's
    # $@ (the handler lives in %SIG, not in %ENV).
    local ($SIG{__DIE__}, $@);
    my ($pkg, $file, $line) = caller 0;
    # (caller 0)[3] is always this very sub; the name of the sub that
    # requested the trace is one frame up. There is no such frame when
    # _debug is called from file level.
    my $method = (caller 1)[3] // '(main)';
    $method =~ s/.*:://;
    warn "[$pool c:$connecting/i:$initializing/-:$idle/b:$busy|t:$total|d:$delayed]\@${pkg}::$method> @_ at $file line $line\n";
}

# Values used by new for the options the caller does not supply, and by
# set as the fallback when an undefined value is passed for a key.
my %default = ( connection_retries => 3,
                connection_delay => 2,
                timeout => 30,
                size => 1 );

# Constructor: AnyEvent::Pg::Pool->new($conninfo, %opts)
#
# Recognized options: size, connection_retries, connection_delay and
# timeout (falling back to %default when missing or undef),
# global_timeout, on_error, on_connect_error and on_transient_error.
# Any other option is ignored.
#
# Returns the new pool object; a call to its _on_start method is
# scheduled through AE::postpone.
sub new {
    my ($class, $conninfo, %opts) = @_;

    # a conninfo passed by reference is shallow-copied
    $conninfo = { %$conninfo } if ref $conninfo;

    my %setting;
    $setting{$_} = delete($opts{$_}) // $default{$_}
        for qw(size connection_retries connection_delay timeout);

    my %handler;
    $handler{$_} = delete $opts{$_}
        for qw(on_error on_connect_error on_transient_error);

    my $global_timeout = delete $opts{global_timeout};
    # my $on_empty_queue = delete $opts{on_empty_queue};

    my $pool = bless {
        # configuration
        conninfo            => $conninfo,
        size                => $setting{size},
        timeout             => $setting{timeout},
        global_timeout      => $global_timeout,
        max_conn_retries    => $setting{connection_retries},
        conn_delay          => $setting{connection_delay},
        # user callbacks
        on_error            => $handler{on_error},
        on_connect_error    => $handler{on_connect_error},
        on_transient_error  => $handler{on_transient_error},
        # on_empty_queue    => $on_empty_queue,
        # connection bookkeeping, indexed by connection seq
        conn_retries        => 0,
        conns               => {},
        current             => {},
        busy                => {},
        idle                => {},
        connecting          => {},
        initializing        => {},
        init_queue_ix       => {},
        # pending queries and counters
        queue               => [],
        seq                 => 1,
        query_seq           => 1,
        # listeners
        listener_by_channel => {},
        listeners_by_conn   => {},
    }, $class;

    &AE::postpone(weak_method_callback($pool, '_on_start'));
    return $pool;
}

# Returns the pool's 'dead' flag.
sub is_dead {
    my $pool = shift;
    return $pool->{dead};
}

# Changes pool settings after construction: $pool->set($key => $value, ...)
#
# The arguments are consumed as key/value pairs. An undefined value is
# replaced by the entry for that key in %default, when there is one.
# Only 'global_timeout' is acted upon; any other pair is consumed and
# ignored.
#
# When a reconnection deadline (max_conn_time) is already running it is
# shifted by the difference between the new and the old timeout. Setting
# the timeout to undef removes the deadline.
sub set {
    my $pool = shift;
    while (@_) {
        my $k = shift;
        my $v = shift // $default{$k};
        next unless $k eq 'global_timeout';

        # The decision is taken on the value of this very pair; shifting
        # again here would swallow the key of the following pair.
        if (defined $v) {
            $pool->{max_conn_time} += $v - $pool->{global_timeout}
                if defined $pool->{max_conn_time};
        }
        else {
            delete $pool->{max_conn_time};
        }
        $pool->{$k} = $v;
    }
}

# Invoked once through the AE::postpone call scheduled by new. It does
# nothing in this class.
sub _on_start {}

# Queues a query: $watcher = $pool->push_query(%opts)
#
# Options read here:
#   retry_on_sqlstate - array or hash ref; an array is converted into a
#                       hash whose keys are the array elements
#   on_result, on_error, on_done, query, args, max_retries
#                     - copied verbatim into the query record
#   initialization    - when true the query goes into the initialization
#                       queue instead of the regular one
#   priority          - when defined, the query is inserted after the
#                       leading run of queued queries whose priority is
#                       greater than or equal to this one; otherwise it
#                       is appended
#
# For regular queries called in non-void context an
# AnyEvent::Pg::Pool::QueryWatcher object is returned. In every other
# case the empty list is returned.
sub push_query {
    my ($pool, %opts) = @_;

    my $retryable = delete $opts{retry_on_sqlstate};
    if (ref($retryable) eq 'ARRAY') {
        $retryable = { map { $_ => 1 } @$retryable };
    }

    my $query = { retry_on_sqlstate => $retryable // {} };
    for my $key (qw(on_result on_error on_done query args max_retries)) {
        $query->{$key} = delete $opts{$key};
    }
    $query->{seq} = $pool->{query_seq}++;

    my $is_init = $opts{initialization};
    my $queue = $is_init ? ($pool->{init_queue} //= []) : $pool->{queue};

    my $priority = $opts{priority};
    if (defined $priority) {
        $query->{priority} = $priority;
        # FIXME: improve the search algorithm used here
        my $pos = 0;
        while ($pos < @$queue) {
            my $other = $queue->[$pos]{priority};
            # stop at the first entry without priority or with a lower one
            last unless defined $other;
            last unless $other >= $priority;
            $pos++;
        }
        splice @$queue, $pos, 0, $query;
        $pool->_debug("query with priority $priority inserted into queue at position $pos/$#$queue")
            if $debug and $debug & 8;
    }
    else {
        push @$queue, $query;
    }

    if ($is_init) {
        &AE::postpone(weak_method_callback($pool, '_check_init_queue_idle'));
        $pool->_debug('initialization query pushed into queue, queue size is now ' . scalar @$queue)
            if $debug and $debug & 8;
        return ();
    }

    &AE::postpone(weak_method_callback($pool, '_check_queue'));
    $pool->_debug('query pushed into queue, raw queue size is now ' . scalar @$queue)
        if $debug and $debug & 8;

    # the watcher is only built when somebody is going to keep it
    return AnyEvent::Pg::Pool::QueryWatcher->_new($query)
        if defined wantarray;
    return ();
}

sub _postponed_on_listener_started_callback {
    my ($pool, $callback, $channel) = @_;
    # at this point, even if unlikey, the listener may be
    # not in state 'running' anymore, but we ignore that
    # possibility as the on_listener_started is just a
    # hint.
    $pool->_maybe_callback($callback, 'on_listener_started', $channel)
        unless $callback->{cancelled};

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    delete $pool->{busy}{$seq}
        or delete $pool->{idle}{$seq}
            or delete $pool->{initializing}{$seq}
                or die "internal error, pool is corrupted, seq: $seq\n" . Dumper($pool);

    delete $pool->{init_queue_ix}{$seq};
    delete $pool->{conns}{$seq};

    my $listeners = delete $pool->{listeners_by_conn}{$seq};

    if ($pool->{dead}) {
        $pool->_maybe_callback('on_connect_error', $conn);
    }
    else {
        $pool->_maybe_callback('on_transient_error');

        if ($listeners) {
            $pool->_start_listener($_) for keys %$listeners;
        }
        else {
            $debug and $debug & 4 and $pool->_debug("connection $seq had no listeners attached: " .
                                                    Dumper($pool->{listeners_by_conn}));
        }
    }

    $pool->_check_queue;
}

# Connection callback: the connection with the given seq is now
# connected. Resets the failed-connection counter and discards any
# running reconnection deadline.
sub _on_conn_connect {
    my ($pool, $seq, $conn) = @_;
    if ($debug and $debug & 8) {
        $pool->_debug("conn $conn is now connected, seq: $seq");
    }

    $pool->{conn_retries} = 0;
    # _on_conn_empty_queue is called afterwards by the $conn object
    delete $pool->{max_conn_time};
}

# Connection callback: a connection attempt has failed.
#
# Reports a transient error, moves the connection from 'connecting' to
# 'busy' and then either arms a timer for a delayed reconnection or,
# when the retry limit or the global timeout has been exceeded, marks
# the pool as dead.
sub _on_conn_connect_error {
    my ($pool, $seq, $conn) = @_;
    $pool->_debug("unable to connect to database")
        if $debug and $debug & 8;

    $pool->_maybe_callback('on_transient_error');

    # the connection object will be removed from the Pool on the
    # on_error callback that will be called just after this one
    # returns:
    delete $pool->{connecting}{$seq};
    $pool->{busy}{$seq} = 1;

    if ($pool->{delay_watcher}) {
        $pool->_debug("a delayed reconnection is already queued")
            if $debug and $debug & 8;
        return;
    }

    my $now = time;

    # This failed connection is not counted against the limit
    # unless it is the only connection remaining. Effectively the
    # module will keep going until all the connections become
    # broken and no more connections can be established.
    my $conn_count = keys %{$pool->{conns}};
    if ($conn_count <= 1) {
        $pool->{conn_retries}++;
        # start the global deadline unless one is already running
        $pool->{max_conn_time} ||= $now + $pool->{global_timeout} - $pool->{conn_delay}
            if $pool->{global_timeout};
    }

    if ($pool->{conn_retries} <= $pool->{max_conn_retries}) {
        my $deadline = $pool->{max_conn_time};
        if (!$deadline or $deadline >= $now) {
            $pool->_debug("starting timer for delayed reconnection $pool->{conn_delay}s")
                if $debug and $debug & 8;
            $pool->{delay_watcher} = AE::timer $pool->{conn_delay}, 0, weak_method_callback($pool, '_on_delayed_reconnect');
            return;
        }
        $pool->_debug("global_timeout expired")
            if $debug and $debug & 8;
    }

    # giving up!
    $pool->_debug("it has been impossible to connect to the database, giving up!!!")
        if $debug and $debug & 8;

    # processing continues on the on_conn_error callback
    $pool->{dead} = 1;
}

# Connection callback for fatal connection errors: flags the pool as
# dead. The connection object passed as second argument is not used.
sub _on_fatal_connect_error {
    my $pool = shift;

    # This error is fatal. After it happens, everything is going to
    # fail.
    $pool->{dead} = 1;
}

# Timer callback armed by _on_conn_connect_error: clears the delay
# watcher slot and starts a new connection.
sub _on_delayed_reconnect {
    my ($pool) = @_;
    if ($debug and $debug & 8) {
        $pool->_debug("_on_delayed_reconnect called");
    }
    $pool->{delay_watcher} = undef;
    $pool->_start_new_conn;
}

# Postponed from push_query after an initialization query is queued:
# every connection currently registered as idle is removed from the idle
# set and handed to _check_init_queue.
sub _check_init_queue_idle {
    my ($pool) = @_;
    my $idle = $pool->{idle};
    # snapshot the keys, the hash is modified inside the loop
    my @idle_seqs = keys %$idle;
    foreach my $seq (@idle_seqs) {
        delete $idle->{$seq};
        $pool->_check_init_queue($seq);
    }
}

# Starts the next pending initialization query on connection $seq.
#
# init_queue_ix holds, per connection, the index of the next entry of
# the initialization queue to run (a missing entry counts as 0).
# Returns nothing when the connection has already gone through the whole
# queue. Otherwise a shallow copy of the next entry is started on the
# connection, which is flagged as initializing, and 1 is returned.
sub _check_init_queue {
    my ($pool, $seq) = @_;
    my $init_queue = $pool->{init_queue};
    my $ix = $pool->{init_queue_ix}{$seq} // 0;
    return if $ix >= @$init_queue;

    $pool->{init_queue_ix}{$seq} = $ix + 1;
    my %copy = %{$init_queue->[$ix]}; # clone
    $pool->{initializing}{$seq} = 1;
    $pool->_start_query($seq, \%copy);
    return 1;
}

sub _on_conn_empty_queue {
    my ($pool, $seq, $conn) = @_;
    $debug and $debug & 8 and $pool->_debug("conn $conn queue is now empty, seq: $seq");

    unless (delete $pool->{busy}{$seq} or
            delete $pool->{connecting}{$seq} or
            delete $pool->{initializing}{$seq}) {
        if ($debug) {
            $pool->_debug("pool object: \n" . Dumper($pool));
            die "internal error: empty_queue callback invoked by object not in state busy, connecting or initializing, seq: $seq";
        }
    }

    if (defined ($pool->{init_queue})) {
        $pool->_check_init_queue($seq) and return;

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

=head1 SYNOPSIS

  my $pool = AnyEvent::Pg::Pool->new($conninfo,
                                     on_connect_error => \&on_db_is_dead);

  my $qw = $pool->push_query(query => 'select * from foo',
                             on_result => sub { ... });

  my $lw = $pool->listen('bar',
                         on_notify => sub { ... });

=head1 DESCRIPTION

  *******************************************************************
  ***                                                             ***
  *** NOTE: This is a very early release that may contain lots of ***
  *** bugs. The API is not stable and may change between releases ***
  ***                                                             ***
  *******************************************************************

This module handles a pool of database connections, and transparently
handles reconnection and reposting of queries when network and server
errors occur.

=head2 API

The following methods are provided:

=over 4

=item $pool = AnyEvent::Pg::Pool->new($conninfo, %opts)

Creates a new object.

Accepts the following options:

=over 4

=item size => $size

Maximum number of database connections that can be simultaneously
established with the server.

=item connection_retries => $n

Maximum number of attempts to establish a new database connection
before calling the C<on_connect_error> callback when there is no other
connection alive on the pool.

=item connection_delay => $seconds

When establishing a new connection fails, this setting configures the
number of seconds to wait before trying to connect again.

=item timeout => $seconds

When some active connection does not report activity for the given
number of seconds, it is considered dead and closed.

=item global_timeout => $seconds

When all the connections to the database become broken and it is not
possible to establish a new connection for the given time period the
pool is considered dead and the C<on_error> callback will be called.

Note that this timeout is approximate. It is checked every time a new
connection attempt fails but its expiration will not cause the
abortion of an in-progress connection.

=item on_error => $callback

When some error happens that can not be automatically handled by the
module (for instance, by requeuing the current query), this callback
is invoked.

=item on_connect_error => $callback

When the number of failed reconnection attempts goes over the limit,
this callback is called. The pool object and the L<AnyEvent::Pg>
object representing the last failed attempt are passed as arguments.

=item on_transient_error => $callback

The given callback is invoked every time an internal recoverable error
happens (for instance, one of the pool connections fails or times out).

There is no guarantee about when this callback will be called and how
many times. It should be considered just a hint.

=back

=item $w = $pool->push_query(%opts)

Pushes a database query on the pool queue. It will be sent to the
database once any of the database connections becomes idle.

A watcher object is returned. If that watcher goes out of scope, the
query is canceled.

This method accepts all the options supported by the method of the
same name on L<AnyEvent::Pg> plus the following ones:

=over 4

=item retry_on_sqlstate => \@states

=item retry_on_sqlstate => \%states

A hash of sqlstate values that are retryable (an array of values is
also accepted and converted into a hash). When some error happens, and
the value of sqlstate from the result object has a value on this hash,
the query is reset and reintroduced on the queue.

=item max_retries => $n

Maximum number of times a query can be retried. When this limit is
reached, the on_error callback will be called.

Note that queries are not retried after partial success. For instance,
when a result object is returned, but then the server decides to abort
the transaction (this is rare, but can happen from time to time).



( run in 1.811 second using v1.01-cache-2.11-cpan-13bb782fe5a )