AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

application. It is safe, albeit redundant, to call this method multiple time in a row without unpausing.

=cut

sub pause { shift->_clear_fh_watch; return; }

=item unpause

Resume reading from the database. After a successful L</pause>, this will pick right back reciving data and sending it
to the provided L</callback>. It is safe, albeit redundant, to call this method multiple time in a row without pausing.

=cut

sub unpause { shift->_fh_watch; return; }

=item is_paused

Returns the current pause state.

Returns: boolean

=cut

sub is_paused { return !shift->_has_fh_watch }

sub _read_copydata {
    my $self = shift;

    my ($n, $msg);
    my $ok = try {
        $n = $self->dbh->pg_getcopydata_async($msg);
        1;
    } catch {
        # uncoverable statement count:2
        AE::postpone { $self->_handle_disconnect };
        0;
    };

    # exception thrown, going to reconnect
    return unless $ok;    # uncoverable branch true

    # nothing waiting
    # watcher will re-enter until $n == 0
    return if $n == 0;

    if ($n == -1) {
        AE::postpone { $self->_handle_disconnect };
        return;
    }

    # uncoverable branch true
    if ($n == -2) {
        # error reading
        # uncoverable statement
        $self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
    }

    my $type = substr $msg, 0, 1;

    if ('k' eq $type) {
        # server keepalive
        my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;

        $self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;

        # only interested in the request-reply bit
        # uncoverable branch true
        if ($reply) {
            # uncoverable statement
            AE::postpone { $self->_heartbeat };
        }

        # an inbound heartbeat is proof enough of successful reconnect
        $self->_reconnect_counter(0) if $self->_reconnect_counter;

        return;
    }

    # uncoverable branch true
    unless ('w' eq $type) {
        # uncoverable statement
        $self->on_error->("unrecognized streaming header: '$type'");
        return;
    }

    my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;

    $self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;

    my $guard = $self->$curry::weak(
        sub {
            my $self = shift;
            $self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
        }
    );

    $self->on_message->($record, guard(\&$guard));

    return;
}

=item stop

Stop receiving replication payloads and disconnect from the PostgreSQL server.

=back

=cut

sub stop {
    my $self = shift;

    $self->_clear_fh_watch;
    $self->_clear_timer;
    $self->clear_dbh;
}

sub _handle_disconnect {
    my $self = shift;

    $self->stop;



( run in 1.167 second using v1.01-cache-2.11-cpan-df04353d9ac )