EV-Pg
view release on metacpan or search on metacpan
0.03 2026-03-17
- fix COPY skip/drain edge cases and idle ref accounting
- hardening: skip_pending protocol safety, COPY drain, deferred PQfinish
- fix notification memory leak when on_notify is unset
- fix get_copy_data callback_depth and copy_mode lifecycle
- cancel_async callback fired on teardown
- socket migration in connect/cancel poll
- Alpine Linux CI
0.02 2026-03-13
- keep_alive flag for LISTEN/NOTIFY
- guard send_flush_request for libpq < 17
- tests skip without TEST_PG_CONNINFO
0.01 2026-03-06
- initial release
ev_pg_cb_t *delivering_cbt;
PGconn *conn_to_finish;
SV *on_connect;
SV *on_error;
SV *on_notify;
SV *on_notice;
SV *on_drain;
int callback_depth;
int keep_alive;
HV *last_error_fields;
HV *last_result_meta;
PGresult *meta_res;
FILE *trace_fp;
#ifdef LIBPQ_HAS_ASYNC_CANCEL
PGcancelConn *cancel_conn;
ev_io cancel_rio, cancel_wio;
int cancel_reading, cancel_writing;
/* Call after pending_count changes or connection completes.
* Keeps the read watcher unref'd when idle (no pending queries,
* not connecting) so EV::run can exit. */
static void update_idle_ref(ev_pg_t *self) {
int want_unref;
if (NULL == self->loop) return;
want_unref = self->reading && !self->connecting
&& self->pending_count == 0 && !self->copy_mode
&& !self->draining_single_row && !self->draining_copy
&& !self->keep_alive;
if (want_unref && !self->rio_unref) {
ev_unref(self->loop);
self->rio_unref = 1;
}
else if (!want_unref && self->rio_unref) {
ev_ref(self->loop);
self->rio_unref = 0;
}
}
int
pending_count(EV::Pg self)
CODE:
{
RETVAL = self->pending_count;
}
OUTPUT:
RETVAL
int
keep_alive(EV::Pg self, ...)
CODE:
{
if (items > 1) {
self->keep_alive = SvTRUE(ST(1)) ? 1 : 0;
update_idle_ref(self);
}
RETVAL = self->keep_alive;
}
OUTPUT:
RETVAL
void
skip_pending(EV::Pg self)
CODE:
{
int to_skip = self->pending_count;
if (self->delivering_cbt) to_skip--; /* already consumed */
on_notice
Callback invoked as "($message)" on PostgreSQL notice/warning
messages.
on_drain
Callback invoked (with no arguments) when the send buffer has been
flushed during COPY IN. Useful for resuming "put_copy_data" after it
returns 0.
keep_alive
When true, the connection keeps "EV::run" alive even when no queries
are pending. See "keep_alive".
loop
An EV loop object. Defaults to "EV::default_loop".
CONNECTION METHODS
connect
$pg->connect($conninfo);
Initiates an asynchronous connection. The "on_connect" handler fires on
success; "on_error" fires on failure.
Sends an asynchronous cancel request using the "PQcancelConn" API
(requires libpq >= 17). The callback receives no arguments on success,
or an error string on failure. Croaks if libpq was built without async
cancel support ("LIBPQ_HAS_ASYNC_CANCEL").
pending_count
my $n = $pg->pending_count;
Returns the number of callbacks in the queue.
keep_alive
$pg->keep_alive(1);
my $bool = $pg->keep_alive;
When true, the connection's read watcher keeps "EV::run" alive even when
no queries are pending. Useful when waiting for server-side "NOTIFY"
events via "on_notify" â without this flag the event loop would exit
after the "LISTEN" query completes.
skip_pending
$pg->skip_pending;
Cancels all pending callbacks, invoking each with "(undef, "skipped")".
PIPELINE METHODS
eg/notify.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use EV;
use EV::Pg;
my $conninfo = shift || $ENV{TEST_PG_CONNINFO} || 'dbname=postgres';
my $pg; $pg = EV::Pg->new(
conninfo => $conninfo,
keep_alive => 1,
on_error => sub { die "connection error: $_[0]\n" },
on_notify => sub {
my ($channel, $payload, $pid) = @_;
print "notification on '$channel': $payload (from pid $pid)\n";
EV::break;
},
on_connect => sub {
$pg->query("listen my_channel", sub {
my (undef, $err) = @_;
die $err if $err;
lib/EV/Pg.pm view on Meta::CPAN
my $loop = delete $args{loop} || EV::default_loop;
my $self = $class->_new($loop);
$self->on_error(delete $args{on_error} // sub { die @_ });
$self->on_connect(delete $args{on_connect}) if exists $args{on_connect};
$self->on_notify(delete $args{on_notify}) if exists $args{on_notify};
$self->on_notice(delete $args{on_notice}) if exists $args{on_notice};
$self->on_drain(delete $args{on_drain}) if exists $args{on_drain};
my $keep_alive = delete $args{keep_alive};
my $conninfo = delete $args{conninfo};
my $conninfo_params = delete $args{conninfo_params};
my $expand_dbname = delete $args{expand_dbname};
if (my @unknown = sort keys %args) {
Carp::carp("EV::Pg->new: unknown argument(s): @unknown");
}
$self->keep_alive(1) if $keep_alive;
if (defined $conninfo_params) {
$self->connect_params($conninfo_params, $expand_dbname ? 1 : 0);
} elsif (defined $conninfo) {
$self->connect($conninfo);
}
$self;
}
lib/EV/Pg.pm view on Meta::CPAN
Callback invoked as C<< ($message) >> on PostgreSQL notice/warning
messages.
=item on_drain
Callback invoked (with no arguments) when the send buffer has been
flushed during a COPY operation. Useful for resuming C<put_copy_data> after
it returns 0.
=item keep_alive
When true, the connection keeps C<EV::run> alive even when no queries
are pending. See L</keep_alive>.
=item loop
An L<EV> loop object. Defaults to C<EV::default_loop>.
=back
=head1 CONNECTION METHODS
=head2 connect
lib/EV/Pg.pm view on Meta::CPAN
Sends an asynchronous cancel request using the C<PQcancelConn> API
(requires libpq E<gt>= 17). The callback receives no arguments on
success, or an error string on failure.
=head2 pending_count
my $n = $pg->pending_count;
Returns the number of callbacks in the queue.
=head2 keep_alive
$pg->keep_alive(1);
my $bool = $pg->keep_alive;
When true, the connection's read watcher keeps C<EV::run> alive even when
no queries are pending. Useful when waiting for server-side C<NOTIFY>
events via C<on_notify> â without this flag the event loop would exit
after the C<LISTEN> query completes.
=head2 skip_pending
$pg->skip_pending;
Cancels all pending callbacks, invoking each with
C<(undef, "skipped")>.
t/08_features.t view on Meta::CPAN
# --- disconnected accessor tests for new methods ---
{
my $pg = EV::Pg->new(on_error => sub {});
is($pg->connection_needs_password, 0, 'connection_needs_password: 0 when not connected');
ok(!defined $pg->hostaddr, 'hostaddr: undef when not connected');
ok(!defined $pg->ssl_attribute_names, 'ssl_attribute_names: undef when not connected');
is($pg->protocol_version, 0, 'protocol_version: 0 when not connected');
}
# --- keep_alive ---
{
my $pg = EV::Pg->new(on_error => sub {});
is($pg->keep_alive, 0, 'keep_alive: default off');
$pg->keep_alive(1);
is($pg->keep_alive, 1, 'keep_alive: set on');
$pg->keep_alive(0);
is($pg->keep_alive, 0, 'keep_alive: set off');
}
# keep_alive via constructor
{
my $notified;
my $pg;
$pg = EV::Pg->new(
conninfo => $conninfo,
keep_alive => 1,
on_notify => sub {
$notified = 1;
EV::break;
},
on_connect => sub {
is($pg->keep_alive, 1, 'keep_alive: set via constructor');
$pg->query("listen ka_test", sub {
$pg->query("notify ka_test", sub {});
});
},
on_error => sub { diag "Error: $_[0]"; EV::break },
);
my $t = EV::timer(5, 0, sub { EV::break });
EV::run;
ok($notified, 'keep_alive: notification received');
$pg->finish if $pg && $pg->is_connected;
}
( run in 0.782 second using v1.01-cache-2.11-cpan-39bf76dae61 )