AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
=over
=item L<Str|Types::Standard/Str>
=back
Standard PostgreSQL connection parameters, see L<DBD::Pg/connect>.
=item C<do_create_slot>
=over
=item L<Bool|Types::Standard/Bool>, Default: C<0>
=back
If true, the L</slot> will be be created upon connection. Otherwise, it's assumed it already exists. If it does not,
PostgreSQL will raise an exception.
=item C<slot_exists_ok>
=over
=item L<Bool|Types::Standard/Bool>, Default: C<0>
=back
If true, and if L</do_create_slot> is also true, then no exception will be raised if the L</slot> already exists.
Otherwise, one will be raised.
=item C<reconnect>
=over
=item L<Bool|Types::Standard/Bool>, Default: C<1>
=back
If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails.
Otherwise, the connection will gracefully be allowed to close.
=item C<reconnect_delay>
=over
=item L<Int|Types::Standard/Int>, Default: C<5>
=back
Time, in seconds, to wait before reconnecting.
=item C<reconnect_limit>
=over
=item L<Int|Types::Standard/Int>, Default: C<1>
=back
Number of times to attempt reconnecting. If this limit is exceded, an exception will be thrown.
=item C<heartbeat>
=over
=item L<Int|Types::Standard/Int>, Default: C<10>
=back
Interval, in seconds, to report our progress to the PostgreSQL server.
=item C<plugin>
=over
=item L<Str|Types::Standard/Str>, Default: L<test_decoding|https://www.postgresql.org/docs/current/static/test-decoding.html>
=back
The server-sider plugin used to decode the WAL file before being sent to this connection. Only required when
L</create_slot> is true.
=item C<options>
=over
=item L<HashRef|Types::Standard/HashRef>, Default: C<{}>
=back
Key-value pairs sent to the server-side L</plugin>. Keys with a value of C<undef> are sent as the keyword only.
=item C<startpos>
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>
=back
Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.
=item C<received_lsn>
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
Holds the last LSN position received from the server.
=item C<flushed_lsn>
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
Issues the C<START_REPLICATION SLOT> command and immediately returns. The connection will then start receiving
logical replication payloads.
=cut
sub start_replication {
my $self = shift;
$self->dbh->do(
sprintf(
'START_REPLICATION SLOT %s LOGICAL %s%s',
$self->dbh->quote_identifier($self->slot),
$LSNStr->coerce($self->startpos),
$self->_option_string
)
);
}
=item pause
Pauses reading from the database. Useful for throttling the inbound flow of data so as to not overwhelm your
application. It is safe, albeit redundant, to call this method multiple time in a row without unpausing.
=cut
sub pause { shift->_clear_fh_watch; return; }
=item unpause
Resume reading from the database. After a successful L</pause>, this will pick right back reciving data and sending it
to the provided L</callback>. It is safe, albeit redundant, to call this method multiple time in a row without pausing.
=cut
sub unpause { shift->_fh_watch; return; }
=item is_paused
Returns the current pause state.
Returns: boolean
=cut
sub is_paused { return !shift->_has_fh_watch }
sub _read_copydata {
my $self = shift;
my ($n, $msg);
my $ok = try {
$n = $self->dbh->pg_getcopydata_async($msg);
1;
} catch {
# uncoverable statement count:2
AE::postpone { $self->_handle_disconnect };
0;
};
# exception thrown, going to reconnect
return unless $ok; # uncoverable branch true
# nothing waiting
# watcher will re-enter until $n == 0
return if $n == 0;
if ($n == -1) {
AE::postpone { $self->_handle_disconnect };
return;
}
# uncoverable branch true
if ($n == -2) {
# error reading
# uncoverable statement
$self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
}
my $type = substr $msg, 0, 1;
if ('k' eq $type) {
# server keepalive
my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;
$self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;
# only interested in the request-reply bit
# uncoverable branch true
if ($reply) {
# uncoverable statement
AE::postpone { $self->_heartbeat };
}
# an inbound heartbeat is proof enough of successful reconnect
$self->_reconnect_counter(0) if $self->_reconnect_counter;
return;
}
# uncoverable branch true
unless ('w' eq $type) {
# uncoverable statement
$self->on_error->("unrecognized streaming header: '$type'");
return;
}
my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;
$self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;
my $guard = $self->$curry::weak(
sub {
my $self = shift;
$self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
}
);
$self->on_message->($record, guard(\&$guard));
return;
( run in 1.426 second using v1.01-cache-2.11-cpan-5735350b133 )