AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
=cut
# True when the object reports no fh_watch (per its _has_fh_watch predicate).
sub is_paused {
    my ($self) = @_;
    return !$self->_has_fh_watch;
}
# Read one message from the COPY stream via pg_getcopydata_async and
# dispatch it.
#
# Outcome by result:
#   exception -> _handle_disconnect is postponed
#   0         -> nothing waiting; return (the watcher re-enters later)
#   -1        -> _handle_disconnect is postponed
#   -2        -> the read error is reported through on_error
#   otherwise -> the first byte of the message selects the handler:
#                'k' server keepalive, 'w' data record; any other type is
#                reported through on_error
#
# Every path returns nothing.
sub _read_copydata {
    my $self = shift;

    my ($n, $msg);
    my $ok = try {
        $n = $self->dbh->pg_getcopydata_async($msg);
        1;
    } catch {
        # uncoverable statement count:2
        AE::postpone { $self->_handle_disconnect };
        0;
    };

    # exception thrown, going to reconnect
    return unless $ok;    # uncoverable branch true

    # nothing waiting
    # watcher will re-enter until $n == 0
    return if $n == 0;

    # NOTE(review): -1 presumably signals that the COPY stream has ended;
    # confirm against the DBD::Pg documentation
    if ($n == -1) {
        AE::postpone { $self->_handle_disconnect };
        return;
    }

    # uncoverable branch true
    if ($n == -2) {
        # error reading
        # uncoverable statement
        $self->on_error->('could not read COPY data: ' . $self->dbh->errstr);

        # the read failed, so there is no message to decode; stop here
        # instead of falling through to the header parsing below, which
        # would report a second, misleading error
        # uncoverable statement
        return;
    }

    my $type = substr $msg, 0, 1;

    if ('k' eq $type) {
        # server keepalive
        my (undef, $lsnpos, undef, $reply) = unpack PRIMARY_HEARTBEAT, $msg;

        # the received position only ever moves forward
        $self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;

        # only interested in the request-reply bit
        # uncoverable branch true
        if ($reply) {
            # uncoverable statement
            AE::postpone { $self->_heartbeat };
        }

        # an inbound heartbeat is proof enough of successful reconnect
        $self->_reconnect_counter(0) if $self->_reconnect_counter;

        return;
    }

    # uncoverable branch true
    unless ('w' eq $type) {
        # uncoverable statement
        $self->on_error->("unrecognized streaming header: '$type'");
        return;
    }

    my (undef, $startlsn, $endlsn, undef, $record) = unpack XLOGDATA, $msg;

    $self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;

    # Callback that advances flushed_lsn to this record's start position.
    # It is curried weakly, so presumably it does not keep $self alive.
    my $guard = $self->$curry::weak(
        sub {
            my $self = shift;
            $self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
        }
    );

    # The consumer receives the record together with guard(...) wrapping the
    # callback above. NOTE(review): guard() looks like a scope guard that
    # runs the callback once the consumer releases it; confirm.
    $self->on_message->($record, guard(\&$guard));

    return;
}
=item stop
Stop receiving replication payloads and disconnect from the PostgreSQL server.
=back
=cut
# Clears the fh_watch and the timer, then clears dbh; the result of the
# final clear_dbh call is what the caller gets back.
sub stop {
    my ($self) = @_;

    $self->_clear_fh_watch;
    $self->_clear_timer;

    return $self->clear_dbh;
}
# Stop the current session and, when reconnect is enabled, arm a one-shot
# timer that calls _post_init with a deferred start_replication after
# reconnect_delay.
#
# When a reconnect limit is configured, the reconnect counter is bumped
# first; going over the limit is reported through on_error and no timer is
# armed.
sub _handle_disconnect {
    my ($self) = @_;

    $self->stop;

    # reconnecting is switched off: stay stopped
    $self->reconnect or return;

    if ($self->has_reconnect_limit) {
        # relies on the counter accessor handing back the value just stored
        my $attempt = $self->_reconnect_counter($self->_reconnect_counter + 1);

        if ($attempt > $self->reconnect_limit) {
            $self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
            return;
        }
    }

    # the next start position is the last flushed LSN; both LSN trackers
    # are then cleared
    $self->_set_startpos($self->flushed_lsn);
    $self->clear_received_lsn;
    $self->clear_flushed_lsn;

    # the callback drops its own watcher before kicking off the restart
    my $timer;
    $timer = AE::timer(
        $self->reconnect_delay,
        0,
        sub {
            undef $timer;
            $self->_post_init(deferred { $self->start_replication });
        }
    );
}
sub _heartbeat {
my ($self, $req_reply) = @_;
( run in 0.700 second using v1.01-cache-2.11-cpan-97f6503c9c8 )