AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

=cut

sub is_paused { return !shift->_has_fh_watch }

sub _read_copydata {
    my $self = shift;

    my ($n, $msg);
    my $ok = try {
        $n = $self->dbh->pg_getcopydata_async($msg);
        1;
    } catch {
        # uncoverable statement count:2
        AE::postpone { $self->_handle_disconnect };
        0;
    };

    # exception thrown, going to reconnect
    return unless $ok;    # uncoverable branch true

    # nothing waiting
    # watcher will re-enter until $n == 0
    return if $n == 0;

    if ($n == -1) {
        AE::postpone { $self->_handle_disconnect };
        return;
    }

    # uncoverable branch true
    if ($n == -2) {
        # error reading
        # uncoverable statement
        $self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
    }

    my $type = substr $msg, 0, 1;

    if ('k' eq $type) {
        # server keepalive
        my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;

        $self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;

        # only interested in the request-reply bit
        # uncoverable branch true
        if ($reply) {
            # uncoverable statement
            AE::postpone { $self->_heartbeat };
        }

        # an inbound heartbeat is proof enough of successful reconnect
        $self->_reconnect_counter(0) if $self->_reconnect_counter;

        return;
    }

    # uncoverable branch true
    unless ('w' eq $type) {
        # uncoverable statement
        $self->on_error->("unrecognized streaming header: '$type'");
        return;
    }

    my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;

    $self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;

    my $guard = $self->$curry::weak(
        sub {
            my $self = shift;
            $self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
        }
    );

    $self->on_message->($record, guard(\&$guard));

    return;
}

=item stop

Stop receiving replication payloads and disconnect from the PostgreSQL server.

=back

=cut

sub stop {
    my $self = shift;

    $self->_clear_fh_watch;
    $self->_clear_timer;
    $self->clear_dbh;
}

sub _handle_disconnect {
    my $self = shift;

    $self->stop;

    return unless $self->reconnect;

    if (    $self->has_reconnect_limit
        and $self->_reconnect_counter($self->_reconnect_counter + 1) > $self->reconnect_limit) {
        $self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
        return;
    }

    $self->_set_startpos($self->flushed_lsn);
    $self->clear_received_lsn;
    $self->clear_flushed_lsn;

    my $w; $w = AE::timer $self->reconnect_delay, 0, sub {
        undef $w;
        $self->_post_init(deferred { $self->start_replication });
    };
}

sub _heartbeat {
    my ($self, $req_reply) = @_;



( run in 0.700 second using v1.01-cache-2.11-cpan-97f6503c9c8 )