EV-Pg

lib/EV/Pg.pm  view on Meta::CPAN

    PGRES_NONFATAL_ERROR PGRES_FATAL_ERROR PGRES_COPY_BOTH
    PGRES_SINGLE_TUPLE PGRES_PIPELINE_SYNC PGRES_PIPELINE_ABORTED
    PGRES_TUPLES_CHUNK
)];

$EXPORT_TAGS{conn} = [qw(CONNECTION_OK CONNECTION_BAD)];

$EXPORT_TAGS{transaction} = [qw(
    PQTRANS_IDLE PQTRANS_ACTIVE PQTRANS_INTRANS
    PQTRANS_INERROR PQTRANS_UNKNOWN
)];

$EXPORT_TAGS{pipeline} = [qw(
    PQ_PIPELINE_OFF PQ_PIPELINE_ON PQ_PIPELINE_ABORTED
)];

$EXPORT_TAGS{verbosity} = [qw(
    PQERRORS_TERSE PQERRORS_DEFAULT PQERRORS_VERBOSE PQERRORS_SQLSTATE
)];

$EXPORT_TAGS{context} = [qw(
    PQSHOW_CONTEXT_NEVER PQSHOW_CONTEXT_ERRORS PQSHOW_CONTEXT_ALWAYS
)];

$EXPORT_TAGS{trace} = [qw(
    PQTRACE_SUPPRESS_TIMESTAMPS PQTRACE_REGRESS_MODE
)];

{
    my %seen;
    @EXPORT_OK = grep { !$seen{$_}++ } map { @$_ } values %EXPORT_TAGS;
    $EXPORT_TAGS{all} = \@EXPORT_OK;
}

*q          = \&query;
*qp         = \&query_params;
*qx         = \&query_prepared;
*prep       = \&prepare;
*reconnect  = \&reset;
*disconnect = \&finish;
*flush      = \&send_flush_request if defined &send_flush_request;
*sync       = \&pipeline_sync;
*quote      = \&escape_literal;
*quote_id   = \&escape_identifier;
*errstr     = \&error_message;
*txn_status = \&transaction_status;
*pid        = \&backend_pid;

sub new {
    my ($class, %args) = @_;

    my $loop = delete $args{loop} || EV::default_loop;
    my $self = $class->_new($loop);

    $self->on_error(delete $args{on_error} // sub { die @_ });
    $self->on_connect(delete $args{on_connect})   if exists $args{on_connect};
    $self->on_notify(delete $args{on_notify})     if exists $args{on_notify};
    $self->on_notice(delete $args{on_notice})     if exists $args{on_notice};
    $self->on_drain(delete $args{on_drain})       if exists $args{on_drain};

    my $keep_alive      = delete $args{keep_alive};
    my $conninfo        = delete $args{conninfo};
    my $conninfo_params = delete $args{conninfo_params};
    my $expand_dbname   = delete $args{expand_dbname};

    if (my @unknown = sort keys %args) {
        Carp::carp("EV::Pg->new: unknown argument(s): @unknown");
    }

    $self->keep_alive(1) if $keep_alive;

    if (defined $conninfo_params) {
        $self->connect_params($conninfo_params, $expand_dbname ? 1 : 0);
    } elsif (defined $conninfo) {
        $self->connect($conninfo);
    }

    $self;
}

1;

__END__

=head1 NAME

EV::Pg - asynchronous PostgreSQL client using libpq and EV

=head1 SYNOPSIS

    use v5.10;
    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        conninfo   => 'dbname=mydb',
        on_error   => sub { die "PG error: $_[0]\n" },
    );
    $pg->on_connect(sub {
        $pg->query_params(
            'select $1::int + $2::int', [10, 20],
            sub {
                my ($rows, $err) = @_;
                die $err if $err;
                say $rows->[0][0];  # 30
                EV::break;
            },
        );
    });
    EV::run;

=head1 DESCRIPTION

EV::Pg is a non-blocking PostgreSQL client that integrates with the L<EV>
event loop.  It drives the libpq async API (C<PQsendQuery>,
C<PQconsumeInput>, C<PQgetResult>) via C<ev_io> watchers on the libpq
socket, so the event loop never blocks on database I/O.

Features: parameterized queries, prepared statements, pipeline mode,
single-row mode, chunked rows (libpq E<gt>= 17), COPY IN/OUT,
LISTEN/NOTIFY, async cancel (libpq E<gt>= 17), structured error fields,
protocol tracing, and notice handling.

=head1 CALLBACKS

Query callbacks receive C<($result)> on success and C<(undef, $error)> on
error.  The shape of C<$result> depends on the kind of operation:

=over

=item B<SELECT> / B<single-row mode>

lib/EV/Pg.pm  view on Meta::CPAN

C<(1)>.

=item B<Error>

C<(undef, $error_message)>.

=back

Exceptions thrown inside callbacks are caught and emitted as warnings.

=head1 CONSTRUCTOR

=head2 new

    my $pg = EV::Pg->new(%args);

Arguments:

=over

=item conninfo

libpq connection string.  If provided, C<connect> is called immediately.

=item conninfo_params

Hashref of connection parameters (e.g. C<< { host => 'localhost',
dbname => 'mydb', port => '5432' } >>).  Alternative to C<conninfo>.
If provided, C<connect_params> is called immediately.

=item expand_dbname

If true and C<conninfo_params> is used, the C<dbname> value is parsed
as a connection string (allowing C<< dbname => 'postgresql://...' >>).

=item on_connect

Callback invoked (with no arguments) when the connection is established.

=item on_error

Callback invoked as C<< ($error_message) >> on connection-level errors.
Defaults to C<sub { die @_ }>.

=item on_notify

Callback invoked as C<< ($channel, $payload, $backend_pid) >> on
LISTEN/NOTIFY messages.

=item on_notice

Callback invoked as C<< ($message) >> on PostgreSQL notice/warning
messages.

=item on_drain

Callback invoked (with no arguments) when the send buffer has been
flushed during a COPY operation.  Useful for resuming C<put_copy_data> after
it returns 0.

=item keep_alive

When true, the connection keeps C<EV::run> alive even when no queries
are pending.  See L</keep_alive>.

=item loop

An L<EV> loop object.  Defaults to C<EV::default_loop>.

=back
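
As a minimal sketch of the arguments above, the following builds a client
from a C<conninfo_params> hashref rather than a connection string.  The
host, port and database name are placeholder values, not defaults of this
module.

    use EV;
    use EV::Pg;

    # Placeholder connection values; adjust them for your server.
    my $pg = EV::Pg->new(
        conninfo_params => {
            host   => 'localhost',
            port   => '5432',
            dbname => 'mydb',
        },
        on_connect => sub { print "connected\n"; EV::break },
        on_error   => sub { warn "connection failed: $_[0]\n"; EV::break },
    );

    EV::run;

Passing C<on_error> explicitly avoids the default handler, which dies.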

=head1 CONNECTION METHODS

=head2 connect

    $pg->connect($conninfo);

Initiates an asynchronous connection.  The C<on_connect> handler fires
on success; C<on_error> fires on failure.

=head2 connect_params

    $pg->connect_params(\%params);
    $pg->connect_params(\%params, $expand_dbname);

Initiates an asynchronous connection using keyword/value parameters
instead of a connection string.  C<$expand_dbname> allows the C<dbname>
parameter to contain a full connection URI.
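
A short sketch of a deferred connection, assuming a server reachable at the
placeholder URI below.  The object is created without C<conninfo>, so
nothing connects until C<connect_params> is called; the true second
argument asks for C<dbname> to be expanded as a URI.

    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        on_connect => sub { print "connected\n"; EV::break },
        on_error   => sub { warn "connection failed: $_[0]\n"; EV::break },
    );

    # Placeholder URI; with expand_dbname true it is parsed as a
    # full connection string rather than a plain database name.
    $pg->connect_params({ dbname => 'postgresql://localhost:5432/mydb' }, 1);

    EV::run;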

=head2 reset

    $pg->reset;

Drops the current connection and reconnects using the original conninfo.
Pending callbacks receive C<(undef, "connection reset")>.
Alias: C<reconnect>.

=head2 finish

    $pg->finish;

Closes the connection.  Pending callbacks receive
C<(undef, "connection finished")>.  Alias: C<disconnect>.

=head2 is_connected

    my $bool = $pg->is_connected;

Returns 1 if connected and ready for queries.

=head2 status

    my $st = $pg->status;

Returns the libpq connection status (C<CONNECTION_OK> or
C<CONNECTION_BAD>).
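
A sketch of checking the status against the exported constants.  It
assumes the C<:conn> export tag is imported in the usual L<Exporter> way
and that the constants compare numerically, as the corresponding libpq
enum values do.

    use EV;
    use EV::Pg qw(:conn);

    my $pg = EV::Pg->new(
        conninfo => 'dbname=mydb',    # placeholder database name
        on_error => sub { warn "PG error: $_[0]\n"; EV::break },
    );
    $pg->on_connect(sub {
        if ($pg->status == CONNECTION_OK) {
            print "connection is OK\n";
        } else {
            print "connection is bad\n";
        }
        EV::break;
    });

    EV::run;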

=head1 QUERY METHODS

=head2 query

    $pg->query($sql, sub { my ($result, $err) = @_; });

Sends a simple query.  B<Not allowed in pipeline mode> -- use

lib/EV/Pg.pm  view on Meta::CPAN

Describes a portal.  The callback receives the same hashref structure
as C<describe_prepared>.

=head2 set_single_row_mode

    my $ok = $pg->set_single_row_mode;

Switches the most recently sent query to single-row mode.  Returns 1
on success, 0 on failure (e.g. no query pending).  The callback fires
once per row with C<($rows)>, where C<$rows> is an arrayref containing
a single row (e.g. C<[[$col1, $col2, ...]]>), and then one final time
with an empty arrayref to signal completion.
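
A sketch of consuming a result row by row.  The query text and database
name are placeholders; the callback tells data rows apart from the final
completion call by checking whether the arrayref is empty.

    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        conninfo => 'dbname=mydb',    # placeholder database name
        on_error => sub { warn "PG error: $_[0]\n"; EV::break },
    );
    $pg->on_connect(sub {
        $pg->query('select generate_series(1, 3)', sub {
            my ($rows, $err) = @_;
            if (defined $err) {
                warn "query failed: $err\n";
                EV::break;
            } elsif (@$rows) {
                # One row per call: $rows->[0] is the row's column list.
                print "row: @{ $rows->[0] }\n";
            } else {
                # Empty arrayref marks the end of the result set.
                print "done\n";
                EV::break;
            }
        });
        # Must follow the query it applies to.
        $pg->set_single_row_mode
            or warn "could not enable single-row mode\n";
    });

    EV::run;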

=head2 set_chunked_rows_mode

    my $ok = $pg->set_chunked_rows_mode($chunk_size);

Switches the most recently sent query to chunked rows mode, delivering
up to C<$chunk_size> rows at a time (requires libpq E<gt>= 17).
Like single-row mode, but with lower per-callback overhead for large
result sets.  Returns 1 on success, 0 on failure.
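
A sketch of counting rows delivered in chunks, assuming libpq 17 or later.
It also assumes that, as in single-row mode, completion is signalled by a
call with an empty arrayref; the text above does not state this
explicitly.

    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        conninfo => 'dbname=mydb',    # placeholder database name
        on_error => sub { warn "PG error: $_[0]\n"; EV::break },
    );
    my $total = 0;
    $pg->on_connect(sub {
        $pg->query('select generate_series(1, 1000)', sub {
            my ($rows, $err) = @_;
            if (defined $err) {
                warn "query failed: $err\n";
                EV::break;
            } elsif (@$rows) {
                $total += @$rows;    # up to 100 rows per call
            } else {
                # Assumed completion marker (empty chunk).
                print "received $total rows\n";
                EV::break;
            }
        });
        $pg->set_chunked_rows_mode(100)
            or warn "could not enable chunked rows mode\n";
    });

    EV::run;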

=head2 close_prepared

    $pg->close_prepared($name, sub { my ($result, $err) = @_; });

Closes (deallocates) a prepared statement at protocol level (requires
libpq E<gt>= 17).  The callback receives an empty string (C<"">) on
success.  Works in pipeline mode, unlike C<DEALLOCATE> SQL.

=head2 close_portal

    $pg->close_portal($name, sub { my ($result, $err) = @_; });

Closes a portal at protocol level (requires libpq E<gt>= 17).
The callback receives an empty string (C<"">) on success.

=head2 cancel

    my $err = $pg->cancel;

Sends a cancel request using the legacy C<PQcancel> API.  This is a
B<blocking> call.  Returns C<undef> on success, an error string on
failure.

=head2 cancel_async

    $pg->cancel_async(sub { my ($err) = @_; });

Sends an asynchronous cancel request using the C<PQcancelConn> API
(requires libpq E<gt>= 17).  The callback receives no arguments on
success, or an error string on failure.
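
A sketch of cancelling a long-running query after a timeout, assuming
libpq 17 or later.  The one-second C<EV::timer> and the C<pg_sleep> query
are illustrative choices; the cancelled query's own callback is expected
to receive an error from the server.

    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        conninfo => 'dbname=mydb',    # placeholder database name
        on_error => sub { warn "PG error: $_[0]\n"; EV::break },
    );
    my $timer;
    $pg->on_connect(sub {
        $pg->query('select pg_sleep(30)', sub {
            my ($result, $err) = @_;
            print defined $err
                ? "query ended with error: $err\n"
                : "query finished\n";
            EV::break;
        });
        # Fire once after one second and request cancellation.
        $timer = EV::timer 1, 0, sub {
            $pg->cancel_async(sub {
                my ($cancel_err) = @_;
                warn "cancel failed: $cancel_err\n" if defined $cancel_err;
            });
        };
    });

    EV::run;

Unlike C<cancel>, this does not block the event loop while the cancel
request is in flight.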

=head2 pending_count

    my $n = $pg->pending_count;

Returns the number of callbacks in the queue.

=head2 keep_alive

    $pg->keep_alive(1);
    my $bool = $pg->keep_alive;

When true, the connection's read watcher keeps C<EV::run> alive even when
no queries are pending.  Useful when waiting for server-side C<NOTIFY>
events via C<on_notify>: without this flag the event loop would exit
after the C<LISTEN> query completes.
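
A sketch of a long-lived listener.  The channel name C<my_channel> and the
database name are placeholders.  The loop keeps running after the
C<LISTEN> query completes because C<keep_alive> is set.

    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        conninfo   => 'dbname=mydb',    # placeholder database name
        keep_alive => 1,
        on_error   => sub { warn "PG error: $_[0]\n"; EV::break },
        on_notify  => sub {
            my ($channel, $payload, $backend_pid) = @_;
            print "[$channel] $payload (from backend $backend_pid)\n";
        },
    );
    $pg->on_connect(sub {
        $pg->query('listen my_channel', sub {
            my ($result, $err) = @_;
            warn "LISTEN failed: $err\n" if defined $err;
        });
    });

    EV::run;    # runs until EV::break is called elsewhere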

=head2 skip_pending

    $pg->skip_pending;

Cancels all pending callbacks, invoking each with
C<(undef, "skipped")>.

=head1 PIPELINE METHODS

=head2 enter_pipeline

    $pg->enter_pipeline;

Enters pipeline mode.  Queries are batched and sent without waiting
for individual results.

=head2 exit_pipeline

    $pg->exit_pipeline;

Exits pipeline mode.  Croaks if the pipeline is not idle (has
pending queries).

=head2 pipeline_sync

    $pg->pipeline_sync(sub { my ($ok) = @_; });

Sends a pipeline sync point.  The callback fires with C<(1)> when
all preceding queries have completed.  Alias: C<sync>.
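
A sketch of batching three queries in one pipeline.  It uses
C<query_params> because C<query> is not allowed in pipeline mode, and it
assumes the pipeline is already idle when the sync callback fires; the
C<eval> guards against C<exit_pipeline> croaking if that assumption does
not hold.

    use EV;
    use EV::Pg;

    my $pg = EV::Pg->new(
        conninfo => 'dbname=mydb',    # placeholder database name
        on_error => sub { warn "PG error: $_[0]\n"; EV::break },
    );
    $pg->on_connect(sub {
        $pg->enter_pipeline;
        for my $n (1 .. 3) {
            $pg->query_params('select $1::int * 2', [$n], sub {
                my ($rows, $err) = @_;
                if (defined $err) {
                    warn "query $n failed: $err\n";
                    return;
                }
                print "$n * 2 = $rows->[0][0]\n";
            });
        }
        $pg->pipeline_sync(sub {
            # All three queries above have completed at this point.
            eval { $pg->exit_pipeline; 1 }
                or warn "could not leave pipeline mode: $@";
            EV::break;
        });
    });

    EV::run;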

=head2 send_pipeline_sync

    $pg->send_pipeline_sync(sub { my ($ok) = @_; });

Like C<pipeline_sync> but does B<not> flush the send buffer (requires
libpq E<gt>= 17).  Useful for batching multiple sync points before a
single manual flush via C<send_flush_request>.

=head2 send_flush_request

    $pg->send_flush_request;

Sends a flush request, asking the server to deliver results for
queries sent so far.  Alias: C<flush>.

=head2 pipeline_status

    my $st = $pg->pipeline_status;

Returns C<PQ_PIPELINE_OFF>, C<PQ_PIPELINE_ON>, or
C<PQ_PIPELINE_ABORTED>.
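
A sketch of inspecting the pipeline state around C<enter_pipeline> and
C<exit_pipeline>.  It assumes the C<:pipeline> export tag is imported in
the usual L<Exporter> way and that the constants compare numerically.

    use EV;
    use EV::Pg qw(:pipeline);

    my $pg = EV::Pg->new(
        conninfo => 'dbname=mydb',    # placeholder database name
        on_error => sub { warn "PG error: $_[0]\n"; EV::break },
    );
    $pg->on_connect(sub {
        print "pipeline is off\n"
            if $pg->pipeline_status == PQ_PIPELINE_OFF;

        $pg->enter_pipeline;
        print "pipeline is on\n"
            if $pg->pipeline_status == PQ_PIPELINE_ON;

        # No queries were queued, so the pipeline is idle here.
        $pg->exit_pipeline;
        EV::break;
    });

    EV::run;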

=head1 COPY METHODS

=head2 put_copy_data


