EV-Pg

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

0.03  2026-03-17
    - fix COPY skip/drain edge cases and idle ref accounting
    - hardening: skip_pending protocol safety, COPY drain, deferred PQfinish
    - fix notification memory leak when on_notify is unset
    - fix get_copy_data callback_depth and copy_mode lifecycle
    - cancel_async callback fired on teardown
    - socket migration in connect/cancel poll
    - Alpine Linux CI

0.02  2026-03-13
    - keep_alive flag for LISTEN/NOTIFY
    - guard send_flush_request for libpq < 17
    - tests skip without TEST_PG_CONNINFO

0.01  2026-03-06
    - initial release

Pg.xs  view on Meta::CPAN

    ev_pg_cb_t *delivering_cbt;
    PGconn     *conn_to_finish;

    SV *on_connect;
    SV *on_error;
    SV *on_notify;
    SV *on_notice;
    SV *on_drain;

    int callback_depth;
    int keep_alive;

    HV    *last_error_fields;
    HV    *last_result_meta;
    PGresult *meta_res;
    FILE  *trace_fp;

#ifdef LIBPQ_HAS_ASYNC_CANCEL
    PGcancelConn *cancel_conn;
    ev_io  cancel_rio, cancel_wio;
    int    cancel_reading, cancel_writing;

Pg.xs  view on Meta::CPAN


/* Call after pending_count changes or connection completes.
 * Keeps the read watcher unref'd when idle (no pending queries,
 * not connecting) so EV::run can exit. */
static void update_idle_ref(ev_pg_t *self) {
    int want_unref;
    if (NULL == self->loop) return;
    want_unref = self->reading && !self->connecting
                 && self->pending_count == 0 && !self->copy_mode
                 && !self->draining_single_row && !self->draining_copy
                 && !self->keep_alive;
    if (want_unref && !self->rio_unref) {
        ev_unref(self->loop);
        self->rio_unref = 1;
    }
    else if (!want_unref && self->rio_unref) {
        ev_ref(self->loop);
        self->rio_unref = 0;
    }
}

Pg.xs  view on Meta::CPAN

int
pending_count(EV::Pg self)
CODE:
{
    RETVAL = self->pending_count;
}
OUTPUT:
    RETVAL

int
keep_alive(EV::Pg self, ...)
CODE:
{
    if (items > 1) {
        self->keep_alive = SvTRUE(ST(1)) ? 1 : 0;
        update_idle_ref(self);
    }
    RETVAL = self->keep_alive;
}
OUTPUT:
    RETVAL

void
skip_pending(EV::Pg self)
CODE:
{
    int to_skip = self->pending_count;
    if (self->delivering_cbt) to_skip--;  /* already consumed */

README  view on Meta::CPAN

    expand_dbname
        When true together with "conninfo_params", the "dbname" value is
        itself parsed as a connection string -- so "dbname =>
        'postgresql://host/db?sslmode=require'" works.

    on_connect
        Fires once with no arguments when the handshake completes.

    on_error
        Fires as "($error_message)" on connection-level errors. Defaults to
        "sub { die @_ }"; pass an explicit handler to keep the loop alive.

    on_notify
        Fires as "($channel, $payload, $backend_pid)" for LISTEN/NOTIFY
        messages.

    on_notice
        Fires as "($message)" for server NOTICE/WARNING messages.

    on_drain
        Fires with no arguments when the libpq send buffer has been fully
        flushed during a COPY -- use it to resume sending after
        "put_copy_data" returned 0.

    keep_alive
        When true, the connection keeps "EV::run" alive even with an empty
        callback queue. See "keep_alive".

    loop
        An EV loop object. Defaults to "EV::default_loop".

    Unknown arguments produce a "carp" warning and are otherwise ignored.

CONNECTION METHODS
  connect
        $pg->connect($conninfo);

README  view on Meta::CPAN

    (requires libpq >= 17). The callback receives "(1)" on success or
    "(undef, $errmsg)" on failure. Croaks if a cancel is already in
    progress.

  pending_count
        my $n = $pg->pending_count;

    Number of callbacks currently in the queue (queries sent but not yet
    delivered).

  keep_alive
        $pg->keep_alive(1);
        my $bool = $pg->keep_alive;

    When true, the read watcher keeps "EV::run" alive even when the callback
    queue is empty. Required when waiting for server-side "NOTIFY" events
    via "on_notify" -- without this flag the loop would exit as soon as the
    "LISTEN" query completes. Getter/setter.

  skip_pending
        $pg->skip_pending;

    Drops every queued callback, invoking each with "(undef, "skipped")".
    Any in-flight server results are drained and discarded; the connection
    remains usable for new queries.

eg/notify.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use EV;
use EV::Pg;

my $conninfo = shift || $ENV{TEST_PG_CONNINFO} || 'dbname=postgres';

my $pg; $pg = EV::Pg->new(
    conninfo   => $conninfo,
    keep_alive => 1,
    on_error   => sub { die "connection error: $_[0]\n" },
    on_notify => sub {
        my ($channel, $payload, $pid) = @_;
        print "notification on '$channel': $payload (from pid $pid)\n";
        EV::break;
    },
    on_connect => sub {
        $pg->query("listen my_channel", sub {
            my (undef, $err) = @_;
            die $err if $err;

lib/EV/Pg.pm  view on Meta::CPAN


    my $loop = delete $args{loop} || EV::default_loop;
    my $self = $class->_new($loop);

    $self->on_error(delete $args{on_error} // sub { die @_ });
    $self->on_connect(delete $args{on_connect})   if exists $args{on_connect};
    $self->on_notify(delete $args{on_notify})     if exists $args{on_notify};
    $self->on_notice(delete $args{on_notice})     if exists $args{on_notice};
    $self->on_drain(delete $args{on_drain})       if exists $args{on_drain};

    my $keep_alive      = delete $args{keep_alive};
    my $conninfo        = delete $args{conninfo};
    my $conninfo_params = delete $args{conninfo_params};
    my $expand_dbname   = delete $args{expand_dbname};

    if (my @unknown = sort keys %args) {
        Carp::carp("EV::Pg->new: unknown argument(s): @unknown");
    }

    $self->keep_alive(1) if $keep_alive;

    if (defined $conninfo_params) {
        $self->connect_params($conninfo_params, $expand_dbname ? 1 : 0);
    } elsif (defined $conninfo) {
        $self->connect($conninfo);
    }

    $self;
}

lib/EV/Pg.pm  view on Meta::CPAN

itself parsed as a connection string -- so
C<< dbname => 'postgresql://host/db?sslmode=require' >> works.

=item on_connect

Fires once with no arguments when the handshake completes.

=item on_error

Fires as C<($error_message)> on connection-level errors.  Defaults to
C<sub { die @_ }>; pass an explicit handler to keep the loop alive.

=item on_notify

Fires as C<($channel, $payload, $backend_pid)> for LISTEN/NOTIFY
messages.

=item on_notice

Fires as C<($message)> for server NOTICE/WARNING messages.

=item on_drain

Fires with no arguments when the libpq send buffer has been fully
flushed during a COPY -- use it to resume sending after
C<put_copy_data> returned 0.

=item keep_alive

When true, the connection keeps C<EV::run> alive even with an empty
callback queue.  See L</keep_alive>.

=item loop

An L<EV> loop object.  Defaults to C<EV::default_loop>.

=back

Unknown arguments produce a C<carp> warning and are otherwise ignored.

=head1 CONNECTION METHODS

lib/EV/Pg.pm  view on Meta::CPAN

or C<(undef, $errmsg)> on failure.  Croaks if a cancel is already in
progress.

=head2 pending_count

    my $n = $pg->pending_count;

Number of callbacks currently in the queue (queries sent but not yet
delivered).

=head2 keep_alive

    $pg->keep_alive(1);
    my $bool = $pg->keep_alive;

When true, the read watcher keeps C<EV::run> alive even when the
callback queue is empty.  Required when waiting for server-side
C<NOTIFY> events via C<on_notify> -- without this flag the loop would
exit as soon as the C<LISTEN> query completes.  Getter/setter.

=head2 skip_pending

    $pg->skip_pending;

Drops every queued callback, invoking each with
C<(undef, "skipped")>.  Any in-flight server results are drained and

t/05_leak.t  view on Meta::CPAN

    $pg = EV::Pg->new(
        conninfo   => $conninfo,
        on_notify  => sub { $got++; EV::break },
        on_connect => sub {
            $pg->query("listen leak_chan", sub {
                $pg->query("notify leak_chan, 'payload'", sub {});
            });
        },
        on_error => sub { diag "err: $_[0]"; EV::break },
    );
    $pg->keep_alive(1);
    my $t = EV::timer(5, 0, sub { EV::break });
    EV::run;
    $pg->on_notify(undef);  # release handler before destroy
    ok($got >= 1, 'notify round-trip without leak');
}

# Notice receiver: reset role with a notice-emitting plpgsql block
{
    my $pg;
    my $notices = 0;

t/08_features.t  view on Meta::CPAN


# --- disconnected accessor tests for new methods ---
{
    my $pg = EV::Pg->new(on_error => sub {});
    is($pg->connection_needs_password, 0, 'connection_needs_password: 0 when not connected');
    ok(!defined $pg->hostaddr, 'hostaddr: undef when not connected');
    ok(!defined $pg->ssl_attribute_names, 'ssl_attribute_names: undef when not connected');
    is($pg->protocol_version, 0, 'protocol_version: 0 when not connected');
}

# --- keep_alive ---
{
    my $pg = EV::Pg->new(on_error => sub {});
    is($pg->keep_alive, 0, 'keep_alive: default off');
    $pg->keep_alive(1);
    is($pg->keep_alive, 1, 'keep_alive: set on');
    $pg->keep_alive(0);
    is($pg->keep_alive, 0, 'keep_alive: set off');
}

# keep_alive via constructor
{
    my $notified;
    my $pg;
    $pg = EV::Pg->new(
        conninfo   => $conninfo,
        keep_alive => 1,
        on_notify  => sub {
            $notified = 1;
            EV::break;
        },
        on_connect => sub {
            is($pg->keep_alive, 1, 'keep_alive: set via constructor');
            $pg->query("listen ka_test", sub {
                $pg->query("notify ka_test", sub {});
            });
        },
        on_error => sub { diag "Error: $_[0]"; EV::break },
    );
    my $t = EV::timer(5, 0, sub { EV::break });
    EV::run;
    ok($notified, 'keep_alive: notification received');
    $pg->finish if $pg && $pg->is_connected;
}



( run in 1.607 second using v1.01-cache-2.11-cpan-99c4e6809bf )