AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

has plugin             => (is => 'ro',   isa => Str,     default   => 'test_decoding');
has options            => (is => 'ro',   isa => HashRef, default   => sub { {} });
has startpos           => (is => 'rwp',  isa => $LSN, default => 0, coerce  => 1);
has received_lsn       => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn        => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);

has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error   => (is => 'ro', isa => CodeRef, default  => sub { \&croak });

has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);

=head1 CONSTRUCTOR

All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:

L</"received_lsn"> and L<"flushed_lsn"> are read-only and not accepted by the constructor.

L</"dbname">, L</"slot"> and L</"on_message"> are required.

Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

    my $self = shift;

    my $w = AE::io $self->dbh->{pg_socket}, 0, $self->curry::weak::_read_copydata;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        $w->priority($w->priority - 1);    # be a little less aggressive
    }

    return $w;
}

sub _build__timer {
    my $self = shift;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
        $w->priority(&EV::MAXPRI);
        return $w;
    } else {
        return AE::timer $self->heartbeat, $self->heartbeat, $self->curry::weak::_heartbeat;
    }
}

=head1 METHODS

All L</"ATTRIBUTES"> are also accesible via methods. They are all read-only.

=over

=item start

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

Initialize the logical replication process asyncronously and return immediately. This performs the following steps:

=over

=item 1. L</"identify_system">

=item 2. L</"create_slot"> (if requested)

=item 3. L</"start_replication">

=item 4. heartbeat timer

=back

This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.

Returns: L<Promises::Promise>

=cut

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

        }
    );
}

sub _post_init {
    my ($self, $d) = @_;

    return $d->then(
        sub {
            $self->_fh_watch;
            $self->_timer;
        },
        $self->on_error,
    );
}

=item identify_system

Issues the C<IDENTIFY_SYSTEM> command to the server to put the connection in repliction mode.

Returns: L<Promises::Promise>

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

Stop receiving replication payloads and disconnect from the PostgreSQL server.

=back

=cut

sub stop {
    my $self = shift;

    $self->_clear_fh_watch;
    $self->_clear_timer;
    $self->clear_dbh;
}

sub _handle_disconnect {
    my $self = shift;

    $self->stop;

    return unless $self->reconnect;

    if (    $self->has_reconnect_limit
        and $self->_reconnect_counter($self->_reconnect_counter + 1) > $self->reconnect_limit) {
        $self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
        return;
    }

    $self->_set_startpos($self->flushed_lsn);
    $self->clear_received_lsn;
    $self->clear_flushed_lsn;

    my $w; $w = AE::timer $self->reconnect_delay, 0, sub {
        undef $w;
        $self->_post_init(deferred { $self->start_replication });
    };
}

sub _heartbeat {
    my ($self, $req_reply) = @_;
    $req_reply = !!$req_reply || 0;    #uncoverable condition right

    my $status = pack STANDBY_HEARTBEAT, 'r',     # receiver status update

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

}

sub _async_await {
    my ($dbh) = @_;

    my $d = deferred;

    # no async operation in progress
    return $d->reject if $dbh->{pg_async_status} == 0;    # uncoverable branch true

    my $w; $w = AE::timer 0, AWAIT_INTERVAL, sub {
        return unless $dbh->pg_ready;
        try {
            $d->resolve($dbh->pg_result);
        } catch {
            $d->reject($_);
        };
        undef $w;
    };

    return $d->promise;

t/01_recv.t  view on Meta::CPAN

use Try::Tiny;

use AnyEvent::PgRecvlogical;

my $CV;
my $end_cv = AE::cv;

sub ae_sleep {
    my $t  = shift || 0;
    my $cv = AE::cv;
    $cv->begin; my $wt = AE::timer $t, 0, sub { $cv->end };
    $cv->recv;
}

my $t_dir       = File::Spec->rel2abs(dirname(__FILE__));
my $pg_hba_conf = File::Spec->join($t_dir, 'pg_hba.conf');

my $pg = eval {
    Test::PostgreSQL->new(extra_postmaster_args =>
          "-c hba_file='$pg_hba_conf' -c wal_level=logical -c max_wal_senders=1 -c max_replication_slots=1");
}



( run in 1.145 second using v1.01-cache-2.11-cpan-49f99fa48dc )