EV-Pg
view release on metacpan or search on metacpan
0.03 2026-03-17
- fix COPY skip/drain edge cases and idle ref accounting
- hardening: skip_pending protocol safety, COPY drain, deferred PQfinish
- fix notification memory leak when on_notify is unset
- fix get_copy_data callback_depth and copy_mode lifecycle
- cancel_async callback fired on teardown
- socket migration in connect/cancel poll
- Alpine Linux CI
0.02 2026-03-13
- keep_alive flag for LISTEN/NOTIFY
- guard send_flush_request for libpq < 17
- tests skip without TEST_PG_CONNINFO
0.01 2026-03-06
- initial release
ev_pg_cb_t *delivering_cbt;
PGconn *conn_to_finish;
SV *on_connect;
SV *on_error;
SV *on_notify;
SV *on_notice;
SV *on_drain;
int callback_depth;
int keep_alive;
HV *last_error_fields;
HV *last_result_meta;
PGresult *meta_res;
FILE *trace_fp;
#ifdef LIBPQ_HAS_ASYNC_CANCEL
PGcancelConn *cancel_conn;
ev_io cancel_rio, cancel_wio;
int cancel_reading, cancel_writing;
/* Call after pending_count changes or connection completes.
* Keeps the read watcher unref'd when idle (no pending queries,
* not connecting) so EV::run can exit. */
static void update_idle_ref(ev_pg_t *self) {
int want_unref;
if (NULL == self->loop) return;
want_unref = self->reading && !self->connecting
&& self->pending_count == 0 && !self->copy_mode
&& !self->draining_single_row && !self->draining_copy
&& !self->keep_alive;
if (want_unref && !self->rio_unref) {
ev_unref(self->loop);
self->rio_unref = 1;
}
else if (!want_unref && self->rio_unref) {
ev_ref(self->loop);
self->rio_unref = 0;
}
}
int
pending_count(EV::Pg self)
CODE:
{
RETVAL = self->pending_count;
}
OUTPUT:
RETVAL
int
keep_alive(EV::Pg self, ...)
CODE:
{
if (items > 1) {
self->keep_alive = SvTRUE(ST(1)) ? 1 : 0;
update_idle_ref(self);
}
RETVAL = self->keep_alive;
}
OUTPUT:
RETVAL
void
skip_pending(EV::Pg self)
CODE:
{
int to_skip = self->pending_count;
if (self->delivering_cbt) to_skip--; /* already consumed */
expand_dbname
When true together with "conninfo_params", the "dbname" value is
itself parsed as a connection string -- so "dbname =>
'postgresql://host/db?sslmode=require'" works.
on_connect
Fires once with no arguments when the handshake completes.
on_error
Fires as "($error_message)" on connection-level errors. Defaults to
"sub { die @_ }"; pass an explicit handler to keep the loop alive.
on_notify
Fires as "($channel, $payload, $backend_pid)" for LISTEN/NOTIFY
messages.
on_notice
Fires as "($message)" for server NOTICE/WARNING messages.
on_drain
Fires with no arguments when the libpq send buffer has been fully
flushed during a COPY -- use it to resume sending after
"put_copy_data" returned 0.
keep_alive
When true, the connection keeps "EV::run" alive even with an empty
callback queue. See "keep_alive".
loop
An EV loop object. Defaults to "EV::default_loop".
Unknown arguments produce a "carp" warning and are otherwise ignored.
CONNECTION METHODS
connect
$pg->connect($conninfo);
(requires libpq >= 17). The callback receives "(1)" on success or
"(undef, $errmsg)" on failure. Croaks if a cancel is already in
progress.
pending_count
my $n = $pg->pending_count;
Number of callbacks currently in the queue (queries sent but not yet
delivered).
keep_alive
$pg->keep_alive(1);
my $bool = $pg->keep_alive;
When true, the read watcher keeps "EV::run" alive even when the callback
queue is empty. Required when waiting for server-side "NOTIFY" events
via "on_notify" -- without this flag the loop would exit as soon as the
"LISTEN" query completes. Getter/setter.
skip_pending
$pg->skip_pending;
Drops every queued callback, invoking each with "(undef, "skipped")".
Any in-flight server results are drained and discarded; the connection
remains usable for new queries.
eg/notify.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use EV;
use EV::Pg;
my $conninfo = shift || $ENV{TEST_PG_CONNINFO} || 'dbname=postgres';
my $pg; $pg = EV::Pg->new(
conninfo => $conninfo,
keep_alive => 1,
on_error => sub { die "connection error: $_[0]\n" },
on_notify => sub {
my ($channel, $payload, $pid) = @_;
print "notification on '$channel': $payload (from pid $pid)\n";
EV::break;
},
on_connect => sub {
$pg->query("listen my_channel", sub {
my (undef, $err) = @_;
die $err if $err;
lib/EV/Pg.pm view on Meta::CPAN
my $loop = delete $args{loop} || EV::default_loop;
my $self = $class->_new($loop);
$self->on_error(delete $args{on_error} // sub { die @_ });
$self->on_connect(delete $args{on_connect}) if exists $args{on_connect};
$self->on_notify(delete $args{on_notify}) if exists $args{on_notify};
$self->on_notice(delete $args{on_notice}) if exists $args{on_notice};
$self->on_drain(delete $args{on_drain}) if exists $args{on_drain};
my $keep_alive = delete $args{keep_alive};
my $conninfo = delete $args{conninfo};
my $conninfo_params = delete $args{conninfo_params};
my $expand_dbname = delete $args{expand_dbname};
if (my @unknown = sort keys %args) {
Carp::carp("EV::Pg->new: unknown argument(s): @unknown");
}
$self->keep_alive(1) if $keep_alive;
if (defined $conninfo_params) {
$self->connect_params($conninfo_params, $expand_dbname ? 1 : 0);
} elsif (defined $conninfo) {
$self->connect($conninfo);
}
$self;
}
lib/EV/Pg.pm view on Meta::CPAN
itself parsed as a connection string -- so
C<< dbname => 'postgresql://host/db?sslmode=require' >> works.
=item on_connect
Fires once with no arguments when the handshake completes.
=item on_error
Fires as C<($error_message)> on connection-level errors. Defaults to
C<sub { die @_ }>; pass an explicit handler to keep the loop alive.
=item on_notify
Fires as C<($channel, $payload, $backend_pid)> for LISTEN/NOTIFY
messages.
=item on_notice
Fires as C<($message)> for server NOTICE/WARNING messages.
=item on_drain
Fires with no arguments when the libpq send buffer has been fully
flushed during a COPY -- use it to resume sending after
C<put_copy_data> returned 0.
=item keep_alive
When true, the connection keeps C<EV::run> alive even with an empty
callback queue. See L</keep_alive>.
=item loop
An L<EV> loop object. Defaults to C<EV::default_loop>.
=back
Unknown arguments produce a C<carp> warning and are otherwise ignored.
=head1 CONNECTION METHODS
lib/EV/Pg.pm view on Meta::CPAN
or C<(undef, $errmsg)> on failure. Croaks if a cancel is already in
progress.
=head2 pending_count
my $n = $pg->pending_count;
Number of callbacks currently in the queue (queries sent but not yet
delivered).
=head2 keep_alive
$pg->keep_alive(1);
my $bool = $pg->keep_alive;
When true, the read watcher keeps C<EV::run> alive even when the
callback queue is empty. Required when waiting for server-side
C<NOTIFY> events via C<on_notify> -- without this flag the loop would
exit as soon as the C<LISTEN> query completes. Getter/setter.
=head2 skip_pending
$pg->skip_pending;
Drops every queued callback, invoking each with
C<(undef, "skipped")>. Any in-flight server results are drained and
t/05_leak.t view on Meta::CPAN
$pg = EV::Pg->new(
conninfo => $conninfo,
on_notify => sub { $got++; EV::break },
on_connect => sub {
$pg->query("listen leak_chan", sub {
$pg->query("notify leak_chan, 'payload'", sub {});
});
},
on_error => sub { diag "err: $_[0]"; EV::break },
);
$pg->keep_alive(1);
my $t = EV::timer(5, 0, sub { EV::break });
EV::run;
$pg->on_notify(undef); # release handler before destroy
ok($got >= 1, 'notify round-trip without leak');
}
# Notice receiver: reset role with a notice-emitting plpgsql block
{
my $pg;
my $notices = 0;
t/08_features.t view on Meta::CPAN
# --- disconnected accessor tests for new methods ---
{
my $pg = EV::Pg->new(on_error => sub {});
is($pg->connection_needs_password, 0, 'connection_needs_password: 0 when not connected');
ok(!defined $pg->hostaddr, 'hostaddr: undef when not connected');
ok(!defined $pg->ssl_attribute_names, 'ssl_attribute_names: undef when not connected');
is($pg->protocol_version, 0, 'protocol_version: 0 when not connected');
}
# --- keep_alive ---
{
my $pg = EV::Pg->new(on_error => sub {});
is($pg->keep_alive, 0, 'keep_alive: default off');
$pg->keep_alive(1);
is($pg->keep_alive, 1, 'keep_alive: set on');
$pg->keep_alive(0);
is($pg->keep_alive, 0, 'keep_alive: set off');
}
# keep_alive via constructor
{
my $notified;
my $pg;
$pg = EV::Pg->new(
conninfo => $conninfo,
keep_alive => 1,
on_notify => sub {
$notified = 1;
EV::break;
},
on_connect => sub {
is($pg->keep_alive, 1, 'keep_alive: set via constructor');
$pg->query("listen ka_test", sub {
$pg->query("notify ka_test", sub {});
});
},
on_error => sub { diag "Error: $_[0]"; EV::break },
);
my $t = EV::timer(5, 0, sub { EV::break });
EV::run;
ok($notified, 'keep_alive: notification received');
$pg->finish if $pg && $pg->is_connected;
}
( run in 1.607 second using v1.01-cache-2.11-cpan-99c4e6809bf )