AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
application. It is safe, albeit redundant, to call this method multiple time in a row without unpausing.
=cut
sub pause { shift->_clear_fh_watch; return; }
=item unpause
Resume reading from the database. After a successful L</pause>, this will pick right back reciving data and sending it
to the provided L</callback>. It is safe, albeit redundant, to call this method multiple time in a row without pausing.
=cut
sub unpause { shift->_fh_watch; return; }
=item is_paused
Returns the current pause state.
Returns: boolean
=cut
sub is_paused { return !shift->_has_fh_watch }
sub _read_copydata {
my $self = shift;
my ($n, $msg);
my $ok = try {
$n = $self->dbh->pg_getcopydata_async($msg);
1;
} catch {
# uncoverable statement count:2
AE::postpone { $self->_handle_disconnect };
0;
};
# exception thrown, going to reconnect
return unless $ok; # uncoverable branch true
# nothing waiting
# watcher will re-enter until $n == 0
return if $n == 0;
if ($n == -1) {
AE::postpone { $self->_handle_disconnect };
return;
}
# uncoverable branch true
if ($n == -2) {
# error reading
# uncoverable statement
$self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
}
my $type = substr $msg, 0, 1;
if ('k' eq $type) {
# server keepalive
my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;
$self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;
# only interested in the request-reply bit
# uncoverable branch true
if ($reply) {
# uncoverable statement
AE::postpone { $self->_heartbeat };
}
# an inbound heartbeat is proof enough of successful reconnect
$self->_reconnect_counter(0) if $self->_reconnect_counter;
return;
}
# uncoverable branch true
unless ('w' eq $type) {
# uncoverable statement
$self->on_error->("unrecognized streaming header: '$type'");
return;
}
my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;
$self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;
my $guard = $self->$curry::weak(
sub {
my $self = shift;
$self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
}
);
$self->on_message->($record, guard(\&$guard));
return;
}
=item stop
Stop receiving replication payloads and disconnect from the PostgreSQL server.
=back
=cut
sub stop {
my $self = shift;
$self->_clear_fh_watch;
$self->_clear_timer;
$self->clear_dbh;
}
sub _handle_disconnect {
my $self = shift;
$self->stop;
( run in 1.167 second using v1.01-cache-2.11-cpan-df04353d9ac )