EV-Pg

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

0.03  2026-03-17
    - fix COPY skip/drain edge cases and idle ref accounting
    - hardening: skip_pending protocol safety, COPY drain, deferred PQfinish
    - fix notification memory leak when on_notify is unset
    - fix get_copy_data callback_depth and copy_mode lifecycle
    - cancel_async callback fired on teardown
    - socket migration in connect/cancel poll
    - Alpine Linux CI

0.02  2026-03-13
    - keep_alive flag for LISTEN/NOTIFY
    - guard send_flush_request for libpq < 17
    - tests skip without TEST_PG_CONNINFO

0.01  2026-03-06
    - initial release

Pg.xs  view on Meta::CPAN

    ev_pg_cb_t *delivering_cbt;
    PGconn     *conn_to_finish;

    SV *on_connect;
    SV *on_error;
    SV *on_notify;
    SV *on_notice;
    SV *on_drain;

    int callback_depth;
    int keep_alive;

    HV    *last_error_fields;
    HV    *last_result_meta;
    PGresult *meta_res;
    FILE  *trace_fp;

#ifdef LIBPQ_HAS_ASYNC_CANCEL
    PGcancelConn *cancel_conn;
    ev_io  cancel_rio, cancel_wio;
    int    cancel_reading, cancel_writing;

Pg.xs  view on Meta::CPAN


/* Re-sync the event loop's reference count with the connection state.
 * Call after pending_count changes, after a connection attempt
 * completes, or after keep_alive is toggled.
 *
 * While the read watcher is active but nothing is outstanding (not
 * connecting, no pending queries, no COPY, no drain in progress, and
 * keep_alive off) the loop is unref'd so EV::run can exit.  rio_unref
 * records whether that unref is currently in effect, so every
 * ev_unref is paired with exactly one later ev_ref. */
static void update_idle_ref(ev_pg_t *self) {
    int busy;
    int idle;

    if (NULL == self->loop) return;

    /* Any in-flight activity, or an explicit keep_alive, pins the loop. */
    busy = self->connecting
        || self->pending_count != 0
        || self->copy_mode
        || self->draining_single_row
        || self->draining_copy
        || self->keep_alive;
    idle = self->reading && !busy;

    if (idle) {
        /* Drop our hold on the loop, but only once. */
        if (!self->rio_unref) {
            ev_unref(self->loop);
            self->rio_unref = 1;
        }
        return;
    }

    /* No longer idle: give back the reference taken earlier, if any. */
    if (self->rio_unref) {
        ev_ref(self->loop);
        self->rio_unref = 0;
    }
}

Pg.xs  view on Meta::CPAN

int
pending_count(EV::Pg self)
CODE:
{
    /* Read-only accessor: returns the connection's pending_count field
     * unchanged. */
    RETVAL = self->pending_count;
}
OUTPUT:
    RETVAL

int
keep_alive(EV::Pg self, ...)
CODE:
{
    /* Combined getter/setter.  When called with an argument, its
     * truthiness is stored as a strict 0/1 flag and the idle loop
     * reference is re-evaluated immediately.  Both forms return the
     * flag's current value. */
    if (items >= 2) {
        int flag = SvTRUE(ST(1)) ? 1 : 0;
        self->keep_alive = flag;
        update_idle_ref(self);
    }
    RETVAL = self->keep_alive;
}
OUTPUT:
    RETVAL

void
skip_pending(EV::Pg self)
CODE:
{
    int to_skip = self->pending_count;
    if (self->delivering_cbt) to_skip--;  /* already consumed */

README  view on Meta::CPAN


    on_notice
        Callback invoked as "($message)" on PostgreSQL notice/warning
        messages.

    on_drain
        Callback invoked (with no arguments) when the send buffer has been
        flushed during COPY IN. Useful for resuming "put_copy_data" after it
        returns 0.

    keep_alive
        When true, the connection keeps "EV::run" alive even when no queries
        are pending. See the "keep_alive" method below for details.

    loop
        An EV loop object. Defaults to "EV::default_loop".

CONNECTION METHODS
  connect
        $pg->connect($conninfo);

    Initiates an asynchronous connection. The "on_connect" handler fires on
    success; "on_error" fires on failure.

README  view on Meta::CPAN

    Sends an asynchronous cancel request using the "PQcancelConn" API
    (requires libpq >= 17). The callback receives no arguments on success,
    or an error string on failure. Croaks if libpq was built without async
    cancel support ("LIBPQ_HAS_ASYNC_CANCEL").

  pending_count
        my $n = $pg->pending_count;

    Returns the number of callbacks in the queue.

  keep_alive
        $pg->keep_alive(1);
        my $bool = $pg->keep_alive;

    When true, the connection's read watcher keeps "EV::run" alive even when
    no queries are pending. Useful when waiting for server-side "NOTIFY"
    events via "on_notify" — without this flag the event loop would exit
    after the "LISTEN" query completes.

  skip_pending
        $pg->skip_pending;

    Cancels all pending callbacks, invoking each with "(undef, "skipped")".

PIPELINE METHODS

eg/notify.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use EV;
use EV::Pg;

my $conninfo = shift || $ENV{TEST_PG_CONNINFO} || 'dbname=postgres';

my $pg; $pg = EV::Pg->new(
    conninfo   => $conninfo,
    keep_alive => 1,
    on_error   => sub { die "connection error: $_[0]\n" },
    on_notify => sub {
        my ($channel, $payload, $pid) = @_;
        print "notification on '$channel': $payload (from pid $pid)\n";
        EV::break;
    },
    on_connect => sub {
        $pg->query("listen my_channel", sub {
            my (undef, $err) = @_;
            die $err if $err;

lib/EV/Pg.pm  view on Meta::CPAN


    my $loop = delete $args{loop} || EV::default_loop;
    my $self = $class->_new($loop);

    $self->on_error(delete $args{on_error} // sub { die @_ });
    $self->on_connect(delete $args{on_connect})   if exists $args{on_connect};
    $self->on_notify(delete $args{on_notify})     if exists $args{on_notify};
    $self->on_notice(delete $args{on_notice})     if exists $args{on_notice};
    $self->on_drain(delete $args{on_drain})       if exists $args{on_drain};

    my $keep_alive      = delete $args{keep_alive};
    my $conninfo        = delete $args{conninfo};
    my $conninfo_params = delete $args{conninfo_params};
    my $expand_dbname   = delete $args{expand_dbname};

    if (my @unknown = sort keys %args) {
        Carp::carp("EV::Pg->new: unknown argument(s): @unknown");
    }

    $self->keep_alive(1) if $keep_alive;

    if (defined $conninfo_params) {
        $self->connect_params($conninfo_params, $expand_dbname ? 1 : 0);
    } elsif (defined $conninfo) {
        $self->connect($conninfo);
    }

    $self;
}

lib/EV/Pg.pm  view on Meta::CPAN


Callback invoked as C<< ($message) >> on PostgreSQL notice/warning
messages.

=item on_drain

Callback invoked (with no arguments) when the send buffer has been
flushed during a COPY operation.  Useful for resuming C<put_copy_data> after
it returns 0.

=item keep_alive

When true, the connection keeps C<EV::run> alive even when no queries
are pending.  See L</keep_alive>.

=item loop

An L<EV> loop object.  Defaults to C<EV::default_loop>.

=back

=head1 CONNECTION METHODS

=head2 connect

lib/EV/Pg.pm  view on Meta::CPAN

Sends an asynchronous cancel request using the C<PQcancelConn> API
(requires libpq E<gt>= 17).  The callback receives no arguments on
success, or an error string on failure.  Croaks if libpq was built
without async cancel support (C<LIBPQ_HAS_ASYNC_CANCEL>).

=head2 pending_count

    my $n = $pg->pending_count;

Returns the number of callbacks in the queue.

=head2 keep_alive

    $pg->keep_alive(1);
    my $bool = $pg->keep_alive;

When true, the connection's read watcher keeps C<EV::run> alive even when
no queries are pending.  Useful when waiting for server-side C<NOTIFY>
events via C<on_notify> — without this flag the event loop would exit
after the C<LISTEN> query completes.

=head2 skip_pending

    $pg->skip_pending;

Cancels all pending callbacks, invoking each with
C<(undef, "skipped")>.

t/08_features.t  view on Meta::CPAN


# --- disconnected accessor tests for new methods ---
# The handle is created without any conninfo, so no connection is started;
# each accessor below is expected to report its "not connected" value
# (0 or undef).
{
    my $pg = EV::Pg->new(on_error => sub {});
    is($pg->connection_needs_password, 0, 'connection_needs_password: 0 when not connected');
    ok(!defined $pg->hostaddr, 'hostaddr: undef when not connected');
    ok(!defined $pg->ssl_attribute_names, 'ssl_attribute_names: undef when not connected');
    is($pg->protocol_version, 0, 'protocol_version: 0 when not connected');
}

# --- keep_alive ---
# Accessor round-trip on an unconnected handle: the flag starts cleared and
# must read back whatever value was last stored.
{
    my $pg = EV::Pg->new(on_error => sub {});
    is($pg->keep_alive, 0, 'keep_alive: default off');

    for my $step ([1, 'set on'], [0, 'set off']) {
        my ($value, $label) = @$step;
        $pg->keep_alive($value);
        is($pg->keep_alive, $value, "keep_alive: $label");
    }
}

# keep_alive via constructor
# End-to-end check against a live server: connect with keep_alive => 1,
# LISTEN on a channel, NOTIFY that channel from the same connection, and
# expect on_notify to fire before the event loop is stopped.
{
    my $notified;
    # Declared before assignment so the callbacks below can refer to $pg.
    my $pg;
    $pg = EV::Pg->new(
        conninfo   => $conninfo,
        keep_alive => 1,
        on_notify  => sub {
            # Success path: record the notification and stop the loop.
            $notified = 1;
            EV::break;
        },
        on_connect => sub {
            is($pg->keep_alive, 1, 'keep_alive: set via constructor');
            # NOTIFY is issued only from the LISTEN query's callback.
            $pg->query("listen ka_test", sub {
                $pg->query("notify ka_test", sub {});
            });
        },
        # Any connection error is reported and ends the loop early.
        on_error => sub { diag "Error: $_[0]"; EV::break },
    );
    # Safety net: stop the loop after 5 seconds if no notification arrives.
    my $t = EV::timer(5, 0, sub { EV::break });
    EV::run;
    ok($notified, 'keep_alive: notification received');
    # Close the connection if it is still open.
    $pg->finish if $pg && $pg->is_connected;
}



( run in 0.782 second using v1.01-cache-2.11-cpan-39bf76dae61 )