EV-Pg
view release on metacpan or search on metacpan
lib/EV/Pg.pm view on Meta::CPAN
PGRES_NONFATAL_ERROR PGRES_FATAL_ERROR PGRES_COPY_BOTH
PGRES_SINGLE_TUPLE PGRES_PIPELINE_SYNC PGRES_PIPELINE_ABORTED
PGRES_TUPLES_CHUNK
)];
$EXPORT_TAGS{conn} = [qw(CONNECTION_OK CONNECTION_BAD)];
$EXPORT_TAGS{transaction} = [qw(
PQTRANS_IDLE PQTRANS_ACTIVE PQTRANS_INTRANS
PQTRANS_INERROR PQTRANS_UNKNOWN
)];
$EXPORT_TAGS{pipeline} = [qw(
PQ_PIPELINE_OFF PQ_PIPELINE_ON PQ_PIPELINE_ABORTED
)];
$EXPORT_TAGS{verbosity} = [qw(
PQERRORS_TERSE PQERRORS_DEFAULT PQERRORS_VERBOSE PQERRORS_SQLSTATE
)];
$EXPORT_TAGS{context} = [qw(
PQSHOW_CONTEXT_NEVER PQSHOW_CONTEXT_ERRORS PQSHOW_CONTEXT_ALWAYS
)];
$EXPORT_TAGS{trace} = [qw(
PQTRACE_SUPPRESS_TIMESTAMPS PQTRACE_REGRESS_MODE
)];
{
my %seen;
@EXPORT_OK = grep { !$seen{$_}++ } map { @$_ } values %EXPORT_TAGS;
$EXPORT_TAGS{all} = \@EXPORT_OK;
}
*q = \&query;
*qp = \&query_params;
*qx = \&query_prepared;
*prep = \&prepare;
*reconnect = \&reset;
*disconnect = \&finish;
*flush = \&send_flush_request if defined &send_flush_request;
*sync = \&pipeline_sync;
*quote = \&escape_literal;
*quote_id = \&escape_identifier;
*errstr = \&error_message;
*txn_status = \&transaction_status;
*pid = \&backend_pid;
sub new {
my ($class, %args) = @_;
my $loop = delete $args{loop} || EV::default_loop;
my $self = $class->_new($loop);
$self->on_error(delete $args{on_error} // sub { die @_ });
$self->on_connect(delete $args{on_connect}) if exists $args{on_connect};
$self->on_notify(delete $args{on_notify}) if exists $args{on_notify};
$self->on_notice(delete $args{on_notice}) if exists $args{on_notice};
$self->on_drain(delete $args{on_drain}) if exists $args{on_drain};
my $keep_alive = delete $args{keep_alive};
my $conninfo = delete $args{conninfo};
my $conninfo_params = delete $args{conninfo_params};
my $expand_dbname = delete $args{expand_dbname};
if (my @unknown = sort keys %args) {
Carp::carp("EV::Pg->new: unknown argument(s): @unknown");
}
$self->keep_alive(1) if $keep_alive;
if (defined $conninfo_params) {
$self->connect_params($conninfo_params, $expand_dbname ? 1 : 0);
} elsif (defined $conninfo) {
$self->connect($conninfo);
}
$self;
}
1;
__END__
=head1 NAME
EV::Pg - asynchronous PostgreSQL client using libpq and EV
=head1 SYNOPSIS
use v5.10;
use EV;
use EV::Pg;
my $pg = EV::Pg->new(
conninfo => 'dbname=mydb',
on_error => sub { die "PG error: $_[0]\n" },
);
$pg->on_connect(sub {
$pg->query_params(
'select $1::int + $2::int', [10, 20],
sub {
my ($rows, $err) = @_;
die $err if $err;
say $rows->[0][0]; # 30
EV::break;
},
);
});
EV::run;
=head1 DESCRIPTION
EV::Pg is a non-blocking PostgreSQL client that integrates with the L<EV>
event loop. It drives the libpq async API (C<PQsendQuery>,
C<PQconsumeInput>, C<PQgetResult>) via C<ev_io> watchers on the libpq
socket, so the event loop never blocks on database I/O.
Features: parameterized queries, prepared statements, pipeline mode,
single-row mode, chunked rows (libpq E<gt>= 17), COPY IN/OUT,
LISTEN/NOTIFY, async cancel (libpq E<gt>= 17), structured error fields,
protocol tracing, and notice handling.
=head1 CALLBACKS
Query callbacks receive C<($result)> on success, C<(undef, $error)> on error:
=over
=item B<SELECT> / B<single-row mode>
lib/EV/Pg.pm view on Meta::CPAN
C<(1)>.
=item B<Error>
C<(undef, $error_message)>.
=back
Exceptions thrown inside callbacks are caught and emitted as warnings.
=head1 CONSTRUCTOR
=head2 new
my $pg = EV::Pg->new(%args);
Arguments:
=over
=item conninfo
libpq connection string. If provided, C<connect> is called immediately.
=item conninfo_params
Hashref of connection parameters (e.g. C<< { host => 'localhost',
dbname => 'mydb', port => '5432' } >>). Alternative to C<conninfo>.
If provided, C<connect_params> is called immediately.
=item expand_dbname
If true and C<conninfo_params> is used, the C<dbname> value is parsed
as a connection string (allowing C<< dbname => 'postgresql://...' >>).
=item on_connect
Callback invoked (with no arguments) when the connection is established.
=item on_error
Callback invoked as C<< ($error_message) >> on connection-level errors.
Defaults to C<sub { die @_ }>.
=item on_notify
Callback invoked as C<< ($channel, $payload, $backend_pid) >> on
LISTEN/NOTIFY messages.
=item on_notice
Callback invoked as C<< ($message) >> on PostgreSQL notice/warning
messages.
=item on_drain
Callback invoked (with no arguments) when the send buffer has been
flushed during a COPY operation. Useful for resuming C<put_copy_data> after
it returns 0.
=item keep_alive
When true, the connection keeps C<EV::run> alive even when no queries
are pending. See L</keep_alive>.
=item loop
An L<EV> loop object. Defaults to C<EV::default_loop>.
=back
=head1 CONNECTION METHODS
=head2 connect
$pg->connect($conninfo);
Initiates an asynchronous connection. The C<on_connect> handler fires
on success; C<on_error> fires on failure.
=head2 connect_params
$pg->connect_params(\%params);
$pg->connect_params(\%params, $expand_dbname);
Initiates an asynchronous connection using keyword/value parameters
instead of a connection string. C<$expand_dbname> allows the C<dbname>
parameter to contain a full connection URI.
=head2 reset
$pg->reset;
Drops the current connection and reconnects using the original conninfo.
Pending callbacks receive C<(undef, "connection reset")>.
Alias: C<reconnect>.
=head2 finish
$pg->finish;
Closes the connection. Pending callbacks receive
C<(undef, "connection finished")>. Alias: C<disconnect>.
=head2 is_connected
my $bool = $pg->is_connected;
Returns 1 if connected and ready for queries.
=head2 status
my $st = $pg->status;
Returns the libpq connection status (C<CONNECTION_OK> or
C<CONNECTION_BAD>).
=head1 QUERY METHODS
=head2 query
$pg->query($sql, sub { my ($result, $err) = @_; });
Sends a simple query. B<Not allowed in pipeline mode> -- use
lib/EV/Pg.pm view on Meta::CPAN
Describes a portal. The callback receives the same hashref structure
as C<describe_prepared>.
=head2 set_single_row_mode
my $ok = $pg->set_single_row_mode;
Switches the most recently sent query to single-row mode. Returns 1
on success, 0 on failure (e.g. no query pending). The callback fires
once per row with C<(\@rows)> where C<@rows> is an arrayref
containing a single row (e.g. C<[[$col1, $col2, ...]]>), then a
final empty C<(\@rows)> (where C<@rows> has zero elements) for the
completion.
=head2 set_chunked_rows_mode
my $ok = $pg->set_chunked_rows_mode($chunk_size);
Switches the most recently sent query to chunked rows mode, delivering
up to C<$chunk_size> rows at a time (requires libpq E<gt>= 17).
Like single-row mode, but with lower per-callback overhead for large
result sets. Returns 1 on success, 0 on failure.
=head2 close_prepared
$pg->close_prepared($name, sub { my ($result, $err) = @_; });
Closes (deallocates) a prepared statement at protocol level (requires
libpq E<gt>= 17). The callback receives an empty string (C<"">) on
success. Works in pipeline mode, unlike C<DEALLOCATE> SQL.
=head2 close_portal
$pg->close_portal($name, sub { my ($result, $err) = @_; });
Closes a portal at protocol level (requires libpq E<gt>= 17).
The callback receives an empty string (C<"">) on success.
=head2 cancel
my $err = $pg->cancel;
Sends a cancel request using the legacy C<PQcancel> API. This is a
B<blocking> call. Returns C<undef> on success, an error string on
failure.
=head2 cancel_async
$pg->cancel_async(sub { my ($err) = @_; });
Sends an asynchronous cancel request using the C<PQcancelConn> API
(requires libpq E<gt>= 17). The callback receives no arguments on
success, or an error string on failure.
=head2 pending_count
my $n = $pg->pending_count;
Returns the number of callbacks in the queue.
=head2 keep_alive
$pg->keep_alive(1);
my $bool = $pg->keep_alive;
When true, the connection's read watcher keeps C<EV::run> alive even when
no queries are pending. Useful when waiting for server-side C<NOTIFY>
events via C<on_notify> â without this flag the event loop would exit
after the C<LISTEN> query completes.
=head2 skip_pending
$pg->skip_pending;
Cancels all pending callbacks, invoking each with
C<(undef, "skipped")>.
=head1 PIPELINE METHODS
=head2 enter_pipeline
$pg->enter_pipeline;
Enters pipeline mode. Queries are batched and sent without waiting
for individual results.
=head2 exit_pipeline
$pg->exit_pipeline;
Exits pipeline mode. Croaks if the pipeline is not idle (has
pending queries).
=head2 pipeline_sync
$pg->pipeline_sync(sub { my ($ok) = @_; });
Sends a pipeline sync point. The callback fires with C<(1)> when
all preceding queries have completed. Alias: C<sync>.
=head2 send_pipeline_sync
$pg->send_pipeline_sync(sub { my ($ok) = @_; });
Like C<pipeline_sync> but does B<not> flush the send buffer (requires
libpq E<gt>= 17). Useful for batching multiple sync points before a
single manual flush via C<send_flush_request>.
=head2 send_flush_request
$pg->send_flush_request;
Sends a flush request, asking the server to deliver results for
queries sent so far. Alias: C<flush>.
=head2 pipeline_status
my $st = $pg->pipeline_status;
Returns C<PQ_PIPELINE_OFF>, C<PQ_PIPELINE_ON>, or
C<PQ_PIPELINE_ABORTED>.
=head1 COPY METHODS
=head2 put_copy_data
( run in 0.787 second using v1.01-cache-2.11-cpan-39bf76dae61 )