AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN signaled as handled by the client (see: L</on_message>)

=item C<on_error>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>

=back

Callback in the event of an error.

=item C<on_message>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Required

=back

Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.

The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."

=back

=cut

# Connection settings. host, port and dbname feed the DSN assembled by _dsn;
# username and password are handed to DBI->connect in _build_dbh. host and
# port are optional and expose has_host / has_port predicates.
has dbname   => (is => 'ro', isa => Str, required  => 1);
has host     => (is => 'ro', isa => Str, predicate => 1);
has port     => (is => 'ro', isa => Int, predicate => 1);
has username => (is => 'ro', isa => Str, default   => q{});
has password => (is => 'ro', isa => Str, default   => q{});
# name of the replication slot (quoted and used by create_slot)
has slot     => (is => 'ro', isa => Str, required  => 1);

# Database handle: built on first use by _build_dbh, not settable through the
# constructor (init_arg => undef), dropped again with clear_dbh.
has dbh                => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
# create_slot only issues CREATE_REPLICATION_SLOT when do_create_slot is true;
# slot_exists_ok lets it tolerate a "duplicate object" failure.
has do_create_slot     => (is => 'ro',   isa => Bool,    default   => 0);
has slot_exists_ok     => (is => 'ro',   isa => Bool,    default   => 0);
# Reconnect policy consulted by _handle_disconnect: whether to retry at all,
# the delay passed to the retry timer, and an optional cap on attempts
# (has_reconnect_limit tells whether a cap was given).
has reconnect          => (is => 'ro',   isa => Bool,    default   => 1);
has reconnect_delay    => (is => 'ro',   isa => Int,     default   => 5);
has reconnect_limit    => (is => 'ro',   isa => Int,     predicate => 1);
# attempts made so far; bumped by _handle_disconnect when a limit is set and
# reset to 0 once a server keepalive arrives
has _reconnect_counter => (is => 'rw',   isa => Int,     default   => 0);
# interval handed to the heartbeat timer in _build__timer
has heartbeat          => (is => 'ro',   isa => Int,     default   => 10);
# output plugin named in CREATE_REPLICATION_SLOT
has plugin             => (is => 'ro',   isa => Str,     default   => 'test_decoding');
# free-form hash, empty by default (each instance gets its own hash)
has options            => (is => 'ro',   isa => HashRef, default   => sub { {} });
# LSN bookkeeping. startpos is coerced to $LSN and is overwritten with
# flushed_lsn when reconnecting; received_lsn and flushed_lsn are not
# constructor arguments, start at 0 and are what _heartbeat reports.
has startpos           => (is => 'rwp',  isa => $LSN, default => 0, coerce  => 1);
has received_lsn       => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn        => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);

# Callbacks: on_message is mandatory; on_error falls back to Carp's croak.
has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error   => (is => 'ro', isa => CodeRef, default  => sub { \&croak });

# Event-loop watchers, created lazily by _build__fh_watch / _build__timer and
# released by stop via their clearers.
has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);

=head1 CONSTRUCTOR

All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:

L</"received_lsn"> and L</"flushed_lsn"> are read-only and not accepted by the constructor.

L</"dbname">, L</"slot"> and L</"on_message"> are required.

Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.

=cut

# Assemble the DBI data source name for the replication connection.
#
# The DSN always carries replication=database and client_encoding=sql_ascii.
# host, port and dbname are added on top; an attribute that has a matching
# has_<name> predicate is left out when that predicate reports false.
#
# Returns: a "dbi:Pg:" string whose key=value pairs are sorted by key and
# separated by ";".
sub _dsn {
    my $self = shift;

    my %param_for = (replication => 'database', client_encoding => 'sql_ascii');

    for my $attr (qw(host port dbname)) {
        my $predicate = "has_$attr";

        # attributes without a predicate method are always included
        if ($self->can($predicate)) {
            next unless $self->$predicate;
        }

        $param_for{$attr} = $self->$attr;
    }

    my @pairs = map { "$_=$param_for{$_}" } sort keys %param_for;

    return 'dbi:Pg:' . join q{;}, @pairs;
}

# Lazy builder for the dbh attribute.
#
# Connects through DBI using the DSN from _dsn plus the configured username
# and password, with PrintError switched off.
#
# Returns: the connected database handle.
# Croaks with $DBI::errstr when DBI->connect does not return a handle.
sub _build_dbh {
    my $self = shift;

    my %connect_attr = (PrintError => 0);
    my $handle = DBI->connect($self->_dsn, $self->username, $self->password, \%connect_attr);

    return $handle if $handle;

    croak $DBI::errstr;
}

# Lazy builder for the _fh_watch attribute.
#
# Registers a read watcher (AE::io with a false "write" flag) on the
# connection's pg_socket; the callback is a weakly-curried _read_copydata so
# the watcher does not keep the object alive.
#
# Returns: the watcher object.
sub _build__fh_watch {
    my $self = shift;

    my $socket  = $self->dbh->{pg_socket};
    my $watcher = AE::io($socket, 0, $self->curry::weak::_read_copydata);

    my $model = $AnyEvent::MODEL;
    return $watcher unless $model and $model eq 'AnyEvent::Impl::EV';

    # only the EV backend gets its priority adjusted:
    # be a little less aggressive
    $watcher->priority($watcher->priority - 1);

    return $watcher;
}

# Lazy builder for the _timer attribute: the recurring heartbeat.
#
# Outside the EV backend this is a plain AE::timer that first fires after one
# heartbeat interval and then repeats at that interval. Under EV it is an
# EV::periodic with the same interval, raised to EV's maximum priority.
# Either way the callback is a weakly-curried _heartbeat.
#
# NOTE(review): EV invokes watcher callbacks with arguments (watcher, event
# mask); those would be forwarded to _heartbeat and land in its $req_reply
# parameter on the EV path - confirm this is intended.
#
# Returns: the timer watcher.
sub _build__timer {
    my $self = shift;

    my $interval = $self->heartbeat;
    my $model    = $AnyEvent::MODEL;

    unless ($model and $model eq 'AnyEvent::Impl::EV') {
        return AE::timer($interval, $interval, $self->curry::weak::_heartbeat);
    }

    my $periodic = EV::periodic(0, $interval, 0, $self->curry::weak::_heartbeat);
    $periodic->priority(EV::MAXPRI());

    return $periodic;
}

=head1 METHODS

All L</"ATTRIBUTES"> are also accessible via methods. They are all read-only.

=over

=item start

Initialize the logical replication process asynchronously and return immediately. This performs the following steps:

=over

=item 1. L</"identify_system">

=item 2. L</"create_slot"> (if requested)

=item 3. L</"start_replication">

=item 4. heartbeat timer

=back

This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.

Returns: L<Promises::Promise>

=cut

# Kick off the startup sequence: identify_system, create_slot and
# start_replication are chained on a fresh deferred, and the result is handed
# to _post_init, which arms the watchers once the chain succeeds.
#
# Returns: whatever _post_init returns (the promise of the completed startup).
sub start {
    my $self = shift;

    return $self->_post_init(
        deferred {
            my $startup = shift;

            $startup->chain(
                $self->curry::identify_system,
                $self->curry::create_slot,
                $self->curry::start_replication,
            );
        }
    );
}

# Attach the post-startup handlers to a promise.
#
# $d - promise (or deferred) representing the startup steps.
#
# On success the lazy _fh_watch and _timer attributes are touched, which
# builds the socket watcher and the heartbeat timer. On failure the on_error
# callback receives the rejection.
#
# Returns: the promise produced by $d->then.
sub _post_init {
    my ($self, $d) = @_;

    my $arm_watchers = sub {
        $self->_fh_watch;
        $self->_timer;
    };

    return $d->then($arm_watchers, $self->on_error);
}

=item identify_system

Issues the C<IDENTIFY_SYSTEM> command to the server to put the connection in replication mode.

Returns: L<Promises::Promise>

=cut

# Send IDENTIFY_SYSTEM as an asynchronous statement and wait for its result.
#
# Returns: the promise from _async_await with a catch handler attached. The
# handler croaks with the rejection arguments, prefixed by $DBI::errstr when
# that is set.
sub identify_system {
    my $self = shift;

    my $dbh = $self->dbh;
    $dbh->do('IDENTIFY_SYSTEM', { pg_async => PG_ASYNC });

    my $rethrow = sub {
        # put the DBI error text (if any) in front of whatever was rejected
        my @error = ($DBI::errstr ? ($DBI::errstr) : (), @_);

        croak @error;
    };

    return _async_await($dbh)->catch($rethrow);
}

=item create_slot

Issues the appropriate C<CREATE_REPLICATION_SLOT> command to the server, if requested.

Returns: L<Promises::Promise>

=cut

# Create the replication slot when do_create_slot is set.
#
# Without do_create_slot an already-resolved deferred is returned and nothing
# is sent. Otherwise CREATE_REPLICATION_SLOT is issued asynchronously with the
# quoted slot and plugin names; NOEXPORT_SNAPSHOT is appended when the server
# version is at least PG_MIN_NOEXPORT.
#
# Returns: the promise from _async_await with a catch handler attached. The
# handler croaks with the rejection arguments unless the handle's state is
# PG_STATE_DUPEOBJ and slot_exists_ok is true.
sub create_slot {
    my $self = shift;

    return deferred->resolve unless $self->do_create_slot;

    my $dbh = $self->dbh;

    my @command_parts = (
        $dbh->quote_identifier($self->slot),
        $dbh->quote_identifier($self->plugin),
        ($dbh->{pg_server_version} >= PG_MIN_NOEXPORT ? ' NOEXPORT_SNAPSHOT' : ''),    # uncoverable branch true
    );
    my $command = sprintf 'CREATE_REPLICATION_SLOT %s LOGICAL %s%s', @command_parts;

    $dbh->do($command, { pg_async => PG_ASYNC });

    my $maybe_rethrow = sub {
        # an existing slot is acceptable only when the caller opted in
        my $tolerated = ($dbh->state eq PG_STATE_DUPEOBJ and $self->slot_exists_ok);

        croak @_ unless $tolerated;
    };

    return _async_await($dbh)->catch($maybe_rethrow);
}

sub _option_string {
    my $self = shift;

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

        # uncoverable statement
        $self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
    }

    my $type = substr $msg, 0, 1;

    if ('k' eq $type) {
        # server keepalive
        my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;

        $self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;

        # only interested in the request-reply bit
        # uncoverable branch true
        if ($reply) {
            # uncoverable statement
            AE::postpone { $self->_heartbeat };
        }

        # an inbound heartbeat is proof enough of successful reconnect
        $self->_reconnect_counter(0) if $self->_reconnect_counter;

        return;
    }

    # uncoverable branch true
    unless ('w' eq $type) {
        # uncoverable statement
        $self->on_error->("unrecognized streaming header: '$type'");
        return;
    }

    my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;

    $self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;

    my $guard = $self->$curry::weak(
        sub {
            my $self = shift;
            $self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
        }
    );

    $self->on_message->($record, guard(\&$guard));

    return;
}

=item stop

Stop receiving replication payloads and disconnect from the PostgreSQL server.

=back

=cut

# Tear down the running session: clear the socket watcher, clear the
# heartbeat timer, then clear the database handle (in that order).
#
# Returns: whatever clear_dbh returns.
sub stop {
    my $self = shift;

    # watchers go first, the handle they rely on goes last
    $self->$_ for qw(_clear_fh_watch _clear_timer);

    return $self->clear_dbh;
}

# React to a lost connection.
#
# Always stops the session first. When reconnect is enabled, and the optional
# reconnect_limit has not been exceeded, the start position is moved to the
# last flushed LSN, both LSN trackers are cleared, and a one-shot timer
# re-runs start_replication (via _post_init) after reconnect_delay.
#
# The attempt counter is only incremented when a reconnect_limit was given;
# going over the limit reports 'reconnect limit reached' through on_error and
# gives up.
sub _handle_disconnect {
    my $self = shift;

    $self->stop;

    return unless $self->reconnect;

    if ($self->has_reconnect_limit) {
        my $attempts = $self->_reconnect_counter($self->_reconnect_counter + 1);

        if ($attempts > $self->reconnect_limit) {
            $self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
            return;
        }
    }

    # resume from what the client confirmed, then forget the old positions
    $self->_set_startpos($self->flushed_lsn);
    $self->clear_received_lsn;
    $self->clear_flushed_lsn;

    # the callback releases its own watcher before restarting replication
    my $retry;
    $retry = AE::timer($self->reconnect_delay, 0, sub {
        undef $retry;
        $self->_post_init(deferred { $self->start_replication });
    });
}

# Send a status update to the server over the COPY stream.
#
# $req_reply - optional flag; normalised to 1 or 0 before being packed.
#
# The message is packed with the STANDBY_HEARTBEAT template and written with
# pg_putcopydata.
#
# Returns: the result of pg_putcopydata.
sub _heartbeat {
    my ($self, $req_reply) = @_;
    $req_reply = !!$req_reply || 0;    #uncoverable condition right

    my @fields = (
        'r',                    # receiver status update
        $self->received_lsn,    # last WAL received
        $self->flushed_lsn,     # last WAL flushed
        0,                      # last WAL applied
        # current loop time relative to PG_EPOCH_DELTA, scaled by USECS
        # (presumably microseconds since 2000-01-01 - confirm against the constants)
        int((AE::now() - PG_EPOCH_DELTA) * USECS),
        $req_reply,             # request heartbeat
    );

    return $self->dbh->pg_putcopydata(pack(STANDBY_HEARTBEAT, @fields));
}

# Turn an in-flight asynchronous statement on $dbh into a promise.
#
# $dbh - database handle on which an async statement was just issued.
#
# When the handle reports no async operation (pg_async_status == 0) a rejected
# deferred is returned straight away. Otherwise a repeating timer polls
# pg_ready every AWAIT_INTERVAL; once ready, the deferred is resolved with
# pg_result, or rejected with the exception if fetching the result dies, and
# the timer is released.
#
# NOTE(review): rejection here relies on pg_result throwing. If the handle
# does not raise exceptions, a failed statement would be resolved with
# pg_result's return value instead - confirm.
#
# Returns: the rejected deferred, or the promise of the pending result.
sub _async_await {
    my ($dbh) = @_;

    my $pending = deferred;

    # no async operation in progress
    return $pending->reject if $dbh->{pg_async_status} == 0;    # uncoverable branch true

    my $poller;
    my $check_ready = sub {
        return unless $dbh->pg_ready;

        try {
            $pending->resolve($dbh->pg_result);
        } catch {
            $pending->reject($_);
        };

        # settled either way: stop polling
        undef $poller;
    };
    $poller = AE::timer(0, AWAIT_INTERVAL, $check_ready);

    return $pending->promise;
}

=head1 AUTHOR

William Cox (cpan:MYDMNSN) <mydimension@gmail.com>

=head1 COPYRIGHT

Copyright (c) 2017-2018 William Cox

=head1 LICENSE

This library is free software and may be distributed under the same terms as perl itself.
See L<http://dev.perl.org/licenses/>.

=cut

1;



( run in 1.695 second using v1.01-cache-2.11-cpan-ceb78f64989 )