AnyEvent-Pg

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

          request by Yoran Heling)
        - use Test::PostgreSQL instead of Test::postgresql for testing
        - set Test::PostgreSQL as required when
          $ENV{AUTOMATED_TESTING} is set

0.14  May 6, 2014
       - remove usage of given/when construction that is experimental
         now

0.13  Aug 28, 2013
       - on_error callbacks were not being called on fatal errors
         breaking the connection (bug report and patch by Pavel
         Shaydo)

0.12  Jul 16, 2013
       - in some cases, on_connect_error callback was being called
         from the constructor
       - use weak callbacks for all the AE::postpone() invocations
       - better debugging

0.11  Jul 2, 2013
       - add support for prioritizing queries
       - add initialization queries

0.10  Mar 25, 2013
       - add support for set method that allows to change pool
         parameters after creation
       - resolve some problems with Test::postgresql

Changes  view on Meta::CPAN


0.08  Feb 27, 2013
        - AnyEvent::Pg objects were not calling on_empty_queue after
          establishing the connection
        - more tests added

0.07  Feb 25, 2013
        - remove dependency on unused Devel::FindRef 

0.06  Feb 20, 2013
        - use Method::WeakCallback to create callbacks to itself in
          order to remove some memory leaks
        - add tests based on Test::postgresql
        - empty_query callback could be called too many times due to
          stacked calls from AE::postpone

0.05  Jan 18, 2013
        - add experimental support for listeners

0.04  Oct 22, 2012
        - add support for the on_transient_error feature

lib/AnyEvent/Pg.pm  view on Meta::CPAN

=item query => $sql

SQL code for the prepared query.

=item on_error => sub { ... }

=item on_result => sub { ... }

=item on_done => sub { ... }

These callbacks perform in the same fashion as on the C<push_query>
method.

=back

=item $w = $adb->push_query_prepared(%opts)

Queues a prepared query for execution.

The accepted options are:

lib/AnyEvent/Pg.pm  view on Meta::CPAN

=item args => \@args

Arguments for the query.

=item on_result => sub { ... }

=item on_done => sub { ... }

=item on_error => sub { ... }

These callbacks work as on the C<push_query> method.

=back

=item $w = $adb->unshift_query(%opts)

=item $w = $adb->unshift_query_prepared(%opts)

These methods work in the same way as their C<push> counterparts, but
instead of pushing the query at the end of the queue they push
(unshift) it at the beginning to be executed just after the current
one is done.

These methods can be used as a way to run transactions composed of
several queries.

=item $adb->abort_all

Marks the connection as dead and aborts any queued queries calling the
C<on_error> callbacks.

=item $adb->queue_size

Returns the number of queries queued for execution.

=item $adb->finish

Closes the connection to the database and frees the associated
resources.

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    # As the channel is passed unquoted into the SQL we ensure that
    # it is a valid identifier:
    $channel =~ /^[a-z]\w*$/i or croak "invalid listen channel";

    my $lbc = $pool->{listener_by_channel};

    my $callback = { on_notify           => delete $opts{on_notify},
                     on_listener_started => delete $opts{on_listener_started} };

    if (my $listener = $lbc->{$channel}) {
        push @{$listener->{callbacks}}, $callback;
        if ($listener->{state} eq 'running') {
            &AE::postpone(weak_method_callback($pool, '_postponed_on_listener_started_callback', $callback, $channel));
        }
    }
    else {
        $lbc->{$channel} = { seq       => $pool->{seq}++,
                             channel   => $channel,
                             callbacks => [$callback],
                             state     => 'new' };

        $pool->_start_listener($channel);
    }

    $debug and $debug & 8 and $pool->_debug("listener callback for channel $channel registered");
    return AnyEvent::Pg::Pool::ListenerWatcher->_new($callback)
        if defined wantarray;
}

# Drop canceled watchers from the listener registered for $channel and
# report how many watchers remain.
#
# Arguments:
#   $pool    - the pool object holding the listener_by_channel registry
#   $channel - name of the channel whose watcher list is inspected
#
# Returns the number of watchers still registered after the purge.
# Dies with an "internal error" message when no listener entry exists
# for the channel.
sub _listener_check_callbacks {
    my ($pool, $channel) = @_;

    my $entry = $pool->{listener_by_channel}{$channel};
    die "internal error: listener for channel $channel not found"
        unless $entry;

    # The array is filtered in place because the listener entry keeps
    # referring to this very same array.
    my $watchers = $entry->{callbacks};
    @$watchers = grep { not $_->{canceled} } @$watchers;

    if ($debug and $debug & 8) {
        my $count = @$watchers;
        $pool->_debug("there are $count watchers for listener $channel");
    }

    return scalar @$watchers;
}

sub _start_listener {
    my ($pool, $channel) = @_;

    if ($pool->{dead}) {
        $debug and $debug & 4 and $pool->_debug("ignoring listeners, the pool is dead");
        return;
    }
    if ($pool->_listener_check_callbacks($channel)) {
        my $qw = $pool->push_query( query => "listen $channel", # the channel can not be passed in a placeholder!
                                    on_result => weak_method_callback($pool, '_on_listen_query_result', $channel),
                                    on_error  => weak_method_callback($pool, '_start_listener', $channel) );

        my $listener = $pool->{listener_by_channel}{$channel}
            or die "internal error: listener for channel $channel not found";
        $listener->{state} = 'starting';
        $listener->{listen_query_watcher} = $qw;
    }
    else {

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    my $listener = $pool->{listener_by_channel}{$channel}
        or die "internal error: listener for channel $channel not found";

    delete $listener->{listen_query_watcher};
    $pool->{listeners_by_conn}{$seq}{$channel} = 1;
    $listener->{conn} = $seq;
    $listener->{state} = 'running';

    $debug and $debug & 4 and $pool->_debug("listeners_by_conn is now: " . Dumper($pool->{listeners_by_conn}));

    $pool->_run_listener_callbacks($channel, 'on_listener_started');
}

sub _stop_listener {
    my ($pool, $channel) = @_;
    if (my $listener = $pool->{listener_by_channel}{$channel}) {
        # We have to push the unlisten through the same connection where we do
        # the listen so we push the query directly there.
        if (my $conn = $pool->{conn}{$listener->{conn}}) {
            $listener->{state} = 'stopping';
            my $qw = $conn->push_query(query   => "unlisten $channel",

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

    if (my $listener = $pool->{listener_by_channel}{$channel}) {
        delete $listener->{unlisten_query_watcher};
        delete $pool->{listeners_by_conn}{$listener->{conn}}{$channel};
        $pool->_start_listener($channel)
    }
}

# Handler for a notification received on $channel: forwards it to every
# watcher registered for that channel through their 'on_notify'
# callback.
#
# Arguments:
#   $pool    - the pool object
#   (2nd, 3rd) - received but unused here; presumably the connection
#              object and its sequence number -- verify against caller
#   $channel - name of the channel the notification arrived on
#   @payload - any remaining arguments, handed over untouched to the
#              callbacks
sub _on_notify {
    my ($pool, undef, undef, $channel, @payload) = @_;

    if ($debug and $debug & 4) {
        $pool->_debug("notification for channel $channel received");
    }

    return $pool->_run_listener_callbacks($channel, 'on_notify', @payload);
}

# Dispatch the callback named $cbname to every non-canceled watcher
# registered on $channel.
#
# Arguments:
#   $pool    - the pool object holding the listener_by_channel registry
#   $channel - name of the channel whose watchers are to be called
#   $cbname  - key of the callback to run (the visible callers use
#              'on_notify' and 'on_listener_started')
#   @more    - extra arguments appended after $channel in the dispatch
#
# Nothing is done unless a listener entry exists for the channel and
# its state is 'running'.
sub _run_listener_callbacks {
    my ($pool, $channel, $cbname, @more) = @_;

    my $listener = $pool->{listener_by_channel}{$channel};
    return unless $listener and $listener->{state} eq 'running';

    my $saw_canceled = 0;
    for my $watcher (@{ $listener->{callbacks} }) {
        if ($watcher->{canceled}) {
            # Remember that the list needs purging; the actual removal
            # is deferred until the iteration is over.
            $saw_canceled = 1;
            next;
        }
        $pool->_maybe_callback($watcher, $cbname, $channel, @more);
    }

    # The purge is only attempted when a canceled watcher was seen.
    return unless $saw_canceled;

    # With no watcher left there is nobody to notify any more, so the
    # listener for this channel is stopped.
    $pool->_stop_listener($channel)
        unless $pool->_listener_check_callbacks($channel);
}

sub _is_queue_empty {
    my $pool = shift;
    my $queue = $pool->{queue};

lib/AnyEvent/Pg/Pool.pm  view on Meta::CPAN

It can be used to set up session parameters. For instance:

  $pool->push_query(initialization => 1,
                    query => "set session time zone 'UTC'");

Pushing initialization queries does not return a watcher object. Also,
once pushed, the current API does not allow removing them.

=back

The callbacks for the C<push_query> method receive as arguments the
pool object, the underlying L<AnyEvent::Pg> object actually handling
the query and the result object when applicable. For instance:

    sub on_result_cb {
        my ($pool, $conn, $result) = @_;
        ...
    }

    sub on_done_cb {
        my ($pool, $conn) = @_;



( run in 0.424 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )