AnyEvent-Pg

 view release on metacpan or  search on metacpan

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

        if ref($retry_on_sqlstate) eq 'ARRAY';
    $query{retry_on_sqlstate} = $retry_on_sqlstate // {};
    $query{$_} = delete $opts{$_} for qw(on_result on_error on_done query args max_retries);
    $query{seq} = $pool->{query_seq}++;
    my $query = \%query;

    my $queue = ($opts{initialization} ? ($pool->{init_queue} //= []) : $pool->{queue});
    if (defined(my $priority = $opts{priority})) {
        $query{priority} = $priority;
        # FIXME: improve the search algorithm used here
        my $i;
        for ($i = 0; $i < @$queue; $i++) {
            my $p2 = $queue->[$i]{priority} // last;
            $p2 >= $priority or last;
        }
        splice @$queue, $i, 0, $query;
        $debug and $debug & 8 and $pool->_debug("query with priority $priority inserted into queue at position $i/$#$queue");
    }
    else {
        push @$queue, $query;
    }

    if ($opts{initialization}) {
        &AE::postpone(weak_method_callback($pool, '_check_init_queue_idle'));
        $debug and $debug & 8 and $pool->_debug('initialization query pushed into queue, queue size is now ' . scalar @$queue);
    }
    else {
        &AE::postpone(weak_method_callback($pool, '_check_queue'));
        $debug and $debug & 8 and $pool->_debug('query pushed into queue, raw queue size is now ' . scalar @$queue);
        return AnyEvent::Pg::Pool::QueryWatcher->_new($query)
            if defined wantarray;
    }
    ()
}

# Deferred delivery of the 'on_listener_started' hint to a callback set
# that was registered (see listen) on a listener already in state
# 'running'.
#
# Arguments: the pool, the callback set (hash ref holding on_notify and
# on_listener_started) and the channel name.
sub _postponed_on_listener_started_callback {
    my ($pool, $callback, $channel) = @_;
    # At this point, even if unlikely, the listener may not be in
    # state 'running' anymore, but we ignore that possibility as
    # on_listener_started is just a hint.
    #
    # The flag is spelled 'canceled' (single "l"): that is the key
    # _listener_check_callbacks and _run_listener_callbacks test, so the
    # same spelling must be used here for the guard to have any effect.
    $pool->_maybe_callback($callback, 'on_listener_started', $channel)
        unless $callback->{canceled};
}

# Subscribes to a notification channel.
#
# Accepts either ($channel, %opts) or a plain option list that contains
# a 'channel' entry. The options read here are on_notify and
# on_listener_started.
#
# Croaks when the channel is missing or is not a plain identifier.
#
# Unless called in void context, returns an
# AnyEvent::Pg::Pool::ListenerWatcher built from the new callback set.
sub listen {
    my $pool = shift;
    my %opts = (@_ & 1 ? (channel => @_) : @_);
    my $channel = delete $opts{channel} // croak "channel tag missing";

    # As the channel is passed unquoted into the SQL we ensure that it
    # is a valid identifier. \A and \z are used rather than ^ and $
    # because $ also matches just before a trailing newline: "name\n"
    # would be accepted and registered under a key that a notification
    # for "name" never matches.
    $channel =~ /\A[a-z]\w*\z/i or croak "invalid listen channel";

    my $lbc = $pool->{listener_by_channel};

    my $callback = { on_notify           => delete $opts{on_notify},
                     on_listener_started => delete $opts{on_listener_started} };

    if (my $listener = $lbc->{$channel}) {
        # A listener already exists for this channel: just attach the
        # new callback set to it.
        push @{$listener->{callbacks}}, $callback;
        if ($listener->{state} eq 'running') {
            # The subscription is already active, so the "started" hint
            # is delivered to the newcomer from a postponed callback.
            &AE::postpone(weak_method_callback($pool, '_postponed_on_listener_started_callback', $callback, $channel));
        }
    }
    else {
        $lbc->{$channel} = { seq       => $pool->{seq}++,
                             channel   => $channel,
                             callbacks => [$callback],
                             state     => 'new' };

        $pool->_start_listener($channel);
    }

    $debug and $debug & 8 and $pool->_debug("listener callback for channel $channel registered");
    return AnyEvent::Pg::Pool::ListenerWatcher->_new($callback)
        if defined wantarray;
    return;
}

# Prunes the canceled callback sets of the listener registered for
# $channel and returns how many callback sets remain.
#
# Dies with an internal error when the channel has no listener.
sub _listener_check_callbacks {
    my ($pool, $channel) = @_;

    my $listener = $pool->{listener_by_channel}{$channel};
    die "internal error: listener for channel $channel not found"
        unless $listener;

    # Assign back through the reference so the listener keeps using the
    # very same array.
    my $live = $listener->{callbacks};
    @$live = grep { not $_->{canceled} } @$live;

    my $count = @$live;
    $pool->_debug("there are " . $count . " watchers for listener $channel")
        if $debug and $debug & 8;
    return $count;
}

# Queues a "listen $channel" query on the pool for the listener
# registered under $channel, or forgets that listener when no live
# callback set remains. Does nothing when the pool is dead.
#
# The same method is installed as the on_error handler of the listen
# query, so a failed attempt goes through this code again.
sub _start_listener {
    my ($pool, $channel) = @_;

    if ($pool->{dead}) {
        $pool->_debug("ignoring listeners, the pool is dead")
            if $debug and $debug & 4;
        return;
    }

    # Nobody is watching anymore: just forget about this listener.
    return delete $pool->{listener_by_channel}{$channel}
        unless $pool->_listener_check_callbacks($channel);

    # The channel can not be passed in a placeholder, so it is
    # interpolated into the statement.
    my @listen_args = ( query     => "listen $channel",
                        on_result => weak_method_callback($pool, '_on_listen_query_result', $channel),
                        on_error  => weak_method_callback($pool, '_start_listener', $channel) );
    my $watcher = $pool->push_query(@listen_args);

    my $listener = $pool->{listener_by_channel}{$channel};
    die "internal error: listener for channel $channel not found"
        unless $listener;

    $listener->{state} = 'starting';
    $listener->{listen_query_watcher} = $watcher;
}

# on_result handler of the "listen" query queued by _start_listener.
#
# Records the seq of the connection that served the query on the
# listener, flags that connection in listeners_by_conn, switches the
# listener to state 'running' and then runs the on_listener_started
# callbacks. The third argument and anything after the connection are
# not used.
#
# Dies with an internal error when the channel has no listener.
sub _on_listen_query_result {
    my ($pool, $channel, undef, $conn) = @_;

    my $conn_seq = $conn->{seq};
    if ($debug and $debug & 8) {
        $pool->_debug("result for listen query is here, served by conn $conn_seq. Conn: " . Dumper($conn));
    }

    my $listener = $pool->{listener_by_channel}{$channel};
    die "internal error: listener for channel $channel not found"
        unless $listener;

    # The listen query is over, its watcher is not needed anymore.
    delete $listener->{listen_query_watcher};
    @$listener{qw(conn state)} = ($conn_seq, 'running');
    $pool->{listeners_by_conn}{$conn_seq}{$channel} = 1;

    if ($debug and $debug & 4) {
        $pool->_debug("listeners_by_conn is now: " . Dumper($pool->{listeners_by_conn}));
    }

    $pool->_run_listener_callbacks($channel, 'on_listener_started');
}

# Sends an "unlisten $channel" through the connection that is
# subscribed to the channel and marks the listener as 'stopping'.
#
# Does nothing when the channel has no listener, when the listener has
# no connection recorded, or when that connection is not in the pool
# anymore. _on_unlisten_query_done is installed as the on_done handler
# of the unlisten query.
sub _stop_listener {
    my ($pool, $channel) = @_;
    my $listener = $pool->{listener_by_channel}{$channel}
        or return;

    # The conn seq is only recorded by _on_listen_query_result; without
    # it there is no connection to send the unlisten to.
    my $seq = $listener->{conn};
    return unless defined $seq;

    # We have to push the unlisten through the same connection where we
    # did the listen, so we push the query directly there. Connections
    # live under $pool->{conns} (the table _start_query reads); looking
    # them up under 'conn' never finds anything and the unlisten would
    # never be sent.
    my $conn = $pool->{conns}{$seq}
        or return;

    $listener->{state} = 'stopping';
    my $qw = $conn->push_query(query   => "unlisten $channel",
                               on_done => weak_method_callback($pool, '_on_unlisten_query_done', $channel));
    $listener->{unlisten_query_watcher} = $qw;
    return;
}

# on_done handler of the "unlisten" query pushed by _stop_listener.
#
# When the channel still has a listener, drops the unlisten watcher,
# removes the channel from the listeners_by_conn entry of the listener
# connection and hands the channel back to _start_listener.
sub _on_unlisten_query_done {
    my ($pool, $channel) = @_;
    my $listener = $pool->{listener_by_channel}{$channel};
    if ($listener) {
        delete $listener->{unlisten_query_watcher};

        my $conn_seq = $listener->{conn};
        delete $pool->{listeners_by_conn}{$conn_seq}{$channel};

        $pool->_start_listener($channel)
    }
}

# Handler for an incoming notification: runs the on_notify callbacks of
# the listener registered for $channel, forwarding any extra arguments.
# The second and third arguments are accepted but not used.
sub _on_notify {
    my ($pool, undef, undef, $channel, @payload) = @_;
    $pool->_debug("notification for channel $channel received")
        if $debug and $debug & 4;
    $pool->_run_listener_callbacks($channel, 'on_notify', @payload);
}

# Invokes callback $cbname, through _maybe_callback, on every callback
# set of the listener for $channel that is not canceled, passing the
# channel and @more along.
#
# Nothing happens unless the channel has a listener in state 'running'.
# When a canceled callback set is seen, the list is pruned afterwards
# and the listener is stopped if no callback set is left.
sub _run_listener_callbacks {
    my ($pool, $channel, $cbname, @more) = @_;

    my $listener = $pool->{listener_by_channel}{$channel}
        or return;
    return unless $listener->{state} eq 'running';

    my $saw_canceled;
    for my $cb (@{ $listener->{callbacks} }) {
        if ($cb->{canceled}) {
            $saw_canceled = 1;
            next;
        }
        $pool->_maybe_callback($cb, $cbname, $channel, @more);
    }
    return unless $saw_canceled;

    $pool->_stop_listener($channel)
        unless $pool->_listener_check_callbacks($channel);
}

# Tells whether the pool queue holds no pending query.
#
# Canceled queries at the head of the queue are discarded on the way.
# Returns 1 when nothing is left and an empty return as soon as the
# head of the queue is a query that is not canceled.
sub _is_queue_empty {
    my $pool = shift;
    my $pending = $pool->{queue};
    $pool->_debug('raw queue size is ' . scalar @$pending)
        if $debug and $debug & 8;

    # Only the head is cleaned: canceled entries behind a live query
    # stay in the queue.
    shift @$pending while @$pending and $pending->[0]{canceled};
    return if @$pending;

    $pool->_debug('queue is empty')
        if $debug and $debug & 8;
    return 1;
}

# Pushes $query on the connection registered under $seq.
#
# The query is recorded as the current one for that connection and the
# watcher returned by the connection is stored on the query. The
# on_result and on_done handlers are weak callbacks to the pool methods
# _on_query_result and _on_query_done, both bound to $seq.
#
# Dies with a dump of the pool when no connection is registered under
# $seq.
sub _start_query {
    my ($pool, $seq, $query) = @_;

    my $conn = $pool->{conns}{$seq};
    die("internal error, pool is corrupted, seq: $seq:\n" . Dumper($pool))
        unless $conn;

    my @request = ( query     => $query->{query},
                    args      => $query->{args},
                    on_result => weak_method_callback($pool, '_on_query_result', $seq),
                    on_done   => weak_method_callback($pool, '_on_query_done',   $seq) );
    $query->{watcher} = $conn->push_query(@request);
    $pool->{current}{$seq} = $query;

    $debug and $debug & 8 and $pool->_debug("query $query started on conn $conn, seq: $seq");
}

sub _check_queue {
    my $pool = shift;
    my $idle = $pool->{idle};
    while (1) {
        $debug and $debug & 8 and $pool->_debug('checking queue, there are '
                                                . (scalar keys %$idle)
                                                . ' idle connections, queue size is '
                                                . (scalar @{$pool->{queue}}));
        if ($pool->_is_queue_empty) {
            $debug and $debug & 8 and $pool->_debug('queue is now empty');
            last;
        }
        $debug and $debug & 8 and $pool->_debug('processing first query from the queue');
        unless (%$idle) {
            if ($pool->{dead}) {
                my $query = shift @{$pool->{queue}};
                $pool->_maybe_callback($query, 'on_error');
                $debug and $debug & 8 and $pool->_debug('on_error called for query $query');
                next;
            }
            $debug and $debug & 8 and $pool->_debug('starting new connection');
            $pool->_start_new_conn;
            return;
        }
        keys %$idle;
        my ($seq) = each %$idle;
        delete $idle->{$seq};
        $pool->{busy}{$seq} = 1;

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

There is no guarantee about when this callback will be called and how
many times. It should be considered just a hint.

=back

=item $w = $pool->push_query(%opts)

Pushes a database query on the pool queue. It will be sent to the
database once any of the database connections becomes idle.

A watcher object is returned. If that watcher goes out of scope, the
query is canceled.

This method accepts all the options supported by the method of the
same name on L<AnyEvent::Pg> plus the following ones:

=over 4

=item retry_on_sqlstate => \@states

=item retry_on_sqlstate => \%states

A hash of sqlstate values that are retryable. When some error happens,
and the value of sqlstate from the result object has a value on this
hash, the query is reset and reintroduced on the queue.

=item max_retries => $n

Maximum number of times a query can be retried. When this limit is
reached, the on_error callback will be called.

Note that queries are not retried after partial success. For instance,
when a result object is returned, but then the server decides to abort
the transaction (this is rare, but can happen from time to time).

=item priority => $n

This option allows prioritizing queries. The pool dispatches first those
with the highest priority value.

The default priority is -inf.

Queries of equal priority are dispatched in FIFO order.

=item initialization => $bool

When this option is set, the query will be invoked for every database
connection (both currently existing or created in the future) before
any other query.

It can be used to set up session parameters. For instance:

  $pool->push_query(initialization => 1,
                    query => "set session time zone 'UTC'");

Pushing initialization queries does not return a watcher object. Also,
once pushed, the current API does not allow removing them.

=back

The callbacks for the C<push_query> method receive as arguments the
pool object, the underlying L<AnyEvent::Pg> object actually handling
the query and the result object when applicable. For instance:

    sub on_result_cb {
        my ($pool, $conn, $result) = @_;
        ...
    }

    sub on_done_cb {
        my ($pool, $conn) = @_;
    }

    my $watcher = $pool->push_query("select * from foo",
                                    on_result => \&on_result_cb,
                                    on_done   => \&on_done_cb);

=item $w = $pool->listen($channel, %opts)

This method allows subscribing to the given notification channel and
receiving an event every time another client sends a notification (see
PostgreSQL NOTIFY/LISTEN documentation).

The module will take care of keeping an active L<AnyEvent::Pg> connection
subscribed to the channel, recovering from errors automatically.

Currently, due to some limitations on the way the C<LISTEN> SQL
command is parsed, the channel selector has to match C</^[a-z]\w*$/i>.

The options accepted by the method are as follows:

=over 4

=item on_notify => $callback

The given callback will be called every time some client sends a
notification for the selected channel.

The arguments to the callback are the pool object, the channel
selector and any possible data load passed by the client sending the
notification.

=item on_listener_started => $callback

When the connection is started the first time, or when recovering from
some connection error, there may be a lapse of time where no
connection is subscribed to the channel and notifications sent by
other clients are lost.

This callback is called every time a connection is subscribed to the
channel. It is really a hint that allows checking in some application
specific way (e.g. performing a select) that no event has been lost.

The arguments passed to the callback are the pool object and the
channel selector.

=back

=item $bool = $pool->is_dead

Returns a true value if the pool object has been marked as dead.



( run in 0.789 second using v1.01-cache-2.11-cpan-39bf76dae61 )