AnyEvent-PgRecvlogical


lib/AnyEvent/PgRecvlogical.pm


=over

=item L<Str|Types::Standard/Str>

=back

Standard PostgreSQL connection parameters, see L<DBD::Pg/connect>.

=item C<do_create_slot>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<0>

=back

If true, the L</slot> will be created upon connection. Otherwise, it is assumed to already exist; if it does not,
PostgreSQL will raise an exception.

=item C<slot_exists_ok>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<0>

=back

If true, and if L</do_create_slot> is also true, then no exception will be raised if the L</slot> already exists.
Otherwise, one will be raised.

=item C<reconnect>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<1>

=back

If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails.
Otherwise, the connection will gracefully be allowed to close.

=item C<reconnect_delay>

=over

=item L<Int|Types::Standard/Int>, Default: C<5>

=back

Time, in seconds, to wait before reconnecting.

=item C<reconnect_limit>

=over

=item L<Int|Types::Standard/Int>, Default: C<1>

=back

Number of times to attempt reconnecting. If this limit is exceeded, an exception will be thrown.
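
As a rough illustration of how C<reconnect>, C<reconnect_delay> and C<reconnect_limit> interact, the sketch below
models the decision taken after a connection failure. C<next_reconnect_action> and its C<attempts> argument are
hypothetical names used only for this example; the module's internal bookkeeping, and the exact boundary at which it
throws, may differ.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::PgRecvlogical. It decides what
# should happen after a connection failure, given the documented settings
# and the number of reconnect attempts already made.
sub next_reconnect_action {
    my (%arg) = @_;

    # reconnect disabled: let the connection close
    return { action => 'close' } unless $arg{reconnect};

    # assumed boundary: throw once the configured number of attempts is used up
    die "reconnect limit ($arg{reconnect_limit}) exceeded\n"
        if $arg{attempts} >= $arg{reconnect_limit};

    # otherwise retry after the configured delay (seconds)
    return { action => 'retry', after => $arg{reconnect_delay} };
}

my $next = next_reconnect_action(
    reconnect       => 1,
    reconnect_delay => 5,
    reconnect_limit => 1,
    attempts        => 0,
);
print "$next->{action} after $next->{after}s\n";
```

With the defaults listed above, this model allows a single retry five seconds after the first failure.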

=item C<heartbeat>

=over

=item L<Int|Types::Standard/Int>, Default: C<10>

=back

Interval, in seconds, to report our progress to the PostgreSQL server.

=item C<plugin>

=over

=item L<Str|Types::Standard/Str>, Default: L<test_decoding|https://www.postgresql.org/docs/current/static/test-decoding.html>

=back

The server-side plugin used to decode the WAL before it is sent to this connection. Only required when
L</do_create_slot> is true.

=item C<options>

=over

=item L<HashRef|Types::Standard/HashRef>, Default: C<{}>

=back

Key-value pairs sent to the server-side L</plugin>. Keys with a value of C<undef> are sent as the keyword only.
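
The replication protocol accepts plugin options as a parenthesized list after the start position, where each entry
is an option name optionally followed by a quoted value. The sketch below shows one plausible way a hash such as
C<options> could be rendered into that form. C<render_options> is a hypothetical helper written for this example; it
is not the module's own C<_option_string>, whose exact output is not shown here.

```perl
use strict;
use warnings;

# Hypothetical helper (not the module's _option_string): renders plugin
# options as the parenthesized list accepted by START_REPLICATION.
sub render_options {
    my (%opts) = @_;
    return '' unless %opts;

    my @parts;
    for my $key (sort keys %opts) {    # sorted only to make the output stable
        my $val = $opts{$key};
        if (defined $val) {
            (my $quoted = $val) =~ s/'/''/g;    # double embedded single quotes
            push @parts, qq{"$key" '$quoted'};
        }
        else {
            push @parts, qq{"$key"};            # undef value: keyword only
        }
    }
    return ' (' . join(', ', @parts) . ')';
}

print render_options('include-xids' => 0, 'skip-empty-xacts' => undef), "\n";
```

An empty hash yields an empty string, so nothing is appended to the command when no options are set.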

=item C<startpos>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>

=back

Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.
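
For reference, the textual LSN form is two hexadecimal numbers separated by a slash, holding the upper and lower 32
bits of a 64-bit position. The following sketch converts between the two forms. C<lsn_to_int> and C<int_to_lsn> are
hypothetical helpers for illustration (the module uses its own type coercions), and the code assumes a Perl built
with 64-bit integers.

```perl
use strict;
use warnings;

# Hypothetical helpers, not part of the module. Requires 64-bit integers.

# 'XXXXXXXX/XXXXXXXX' -> integer
sub lsn_to_int {
    my ($lsn) = @_;
    my ($hi, $lo) = $lsn =~ m{\A([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})\z}
        or die "invalid LSN: $lsn\n";
    return (hex($hi) << 32) | hex($lo);
}

# integer -> 'X/X' (upper 32 bits, slash, lower 32 bits)
sub int_to_lsn {
    my ($int) = @_;
    return sprintf '%X/%X', $int >> 32, $int & 0xFFFFFFFF;
}

my $int = lsn_to_int('16/B374D848');
print int_to_lsn($int), "\n";
```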

=item C<received_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN position received from the server.

=item C<flushed_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back



Issues the C<START_REPLICATION SLOT> command and immediately returns. The connection will then start receiving
logical replication payloads.

=cut

sub start_replication {
    my $self = shift;

    $self->dbh->do(
        sprintf(
            'START_REPLICATION SLOT %s LOGICAL %s%s',
            $self->dbh->quote_identifier($self->slot),
            $LSNStr->coerce($self->startpos),
            $self->_option_string
        )
    );
}

=item pause

Pauses reading from the database. Useful for throttling the inbound flow of data so as not to overwhelm your
application. It is safe, albeit redundant, to call this method multiple times in a row without unpausing.

=cut

sub pause { shift->_clear_fh_watch; return; }

=item unpause

Resumes reading from the database. After a successful L</pause>, this will pick right back up receiving data and
sending it to the provided L</callback>. It is safe, albeit redundant, to call this method multiple times in a row
without pausing.
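
One way to use L</pause> and C<unpause> together is simple high/low water-mark throttling around a work queue. The
sketch below runs without a database by using a stand-in object: C<FakeRecv>, C<enqueue>, C<drain_one> and the two
thresholds are hypothetical names invented for this example, and only the three methods documented here are
modelled.

```perl
use strict;
use warnings;

# Stand-in for an AnyEvent::PgRecvlogical object so the sketch is runnable
# on its own. Only pause/unpause/is_paused are modelled.
package FakeRecv {
    sub new       { return bless { paused => 0 }, shift }
    sub pause     { $_[0]{paused} = 1; return }
    sub unpause   { $_[0]{paused} = 0; return }
    sub is_paused { return $_[0]{paused} }
}

package main;

my ($HIGH_WATER, $LOW_WATER) = (3, 1);    # hypothetical thresholds
my @queue;

# Would be called from the message callback: buffer the record and stop
# reading once the backlog reaches the high-water mark.
sub enqueue {
    my ($recv, $record) = @_;
    push @queue, $record;
    $recv->pause if @queue >= $HIGH_WATER;
    return;
}

# Would be called by the consumer: take one record and resume reading once
# the backlog has drained to the low-water mark.
sub drain_one {
    my ($recv) = @_;
    my $record = shift @queue;
    $recv->unpause if $recv->is_paused && @queue <= $LOW_WATER;
    return $record;
}

my $recv = FakeRecv->new;
enqueue($recv, "record $_") for 1 .. 3;
print $recv->is_paused ? "paused\n" : "reading\n";
drain_one($recv) for 1 .. 2;
print $recv->is_paused ? "paused\n" : "reading\n";
```

Using two thresholds rather than one avoids toggling the watcher on every record when the consumer runs at roughly
the same speed as the producer.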

=cut

sub unpause { shift->_fh_watch; return; }

=item is_paused

Returns the current pause state.

Returns: boolean

=cut

sub is_paused { return !shift->_has_fh_watch }

sub _read_copydata {
    my $self = shift;

    my ($n, $msg);
    my $ok = try {
        $n = $self->dbh->pg_getcopydata_async($msg);
        1;
    } catch {
        # uncoverable statement count:2
        AE::postpone { $self->_handle_disconnect };
        0;
    };

    # exception thrown, going to reconnect
    return unless $ok;    # uncoverable branch true

    # nothing waiting
    # watcher will re-enter until $n == 0
    return if $n == 0;

    if ($n == -1) {
        AE::postpone { $self->_handle_disconnect };
        return;
    }

    # uncoverable branch true
    if ($n == -2) {
        # error reading
        # uncoverable statement
        $self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
        # $msg is not valid after a read error, so do not try to parse it
        return;
    }

    my $type = substr $msg, 0, 1;

    if ('k' eq $type) {
        # server keepalive
        my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;

        $self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;

        # only interested in the request-reply bit
        # uncoverable branch true
        if ($reply) {
            # uncoverable statement
            AE::postpone { $self->_heartbeat };
        }

        # an inbound heartbeat is proof enough of successful reconnect
        $self->_reconnect_counter(0) if $self->_reconnect_counter;

        return;
    }

    # uncoverable branch true
    unless ('w' eq $type) {
        # uncoverable statement
        $self->on_error->("unrecognized streaming header: '$type'");
        return;
    }

    my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;

    $self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;

    my $guard = $self->$curry::weak(
        sub {
            my $self = shift;
            $self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
        }
    );

    $self->on_message->($record, guard(\&$guard));

    return;
}


