AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
has plugin => (is => 'ro', isa => Str, default => 'test_decoding');
has options => (is => 'ro', isa => HashRef, default => sub { {} });
has startpos => (is => 'rwp', isa => $LSN, default => 0, coerce => 1);
has received_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error => (is => 'ro', isa => CodeRef, default => sub { \&croak });
has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);
=head1 CONSTRUCTOR
All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:
L</"received_lsn"> and L<"flushed_lsn"> are read-only and not accepted by the constructor.
L</"dbname">, L</"slot"> and L</"on_message"> are required.
Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
my $self = shift;
my $w = AE::io $self->dbh->{pg_socket}, 0, $self->curry::weak::_read_copydata;
if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
$w->priority($w->priority - 1); # be a little less aggressive
}
return $w;
}
sub _build__timer {
my $self = shift;
if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
$w->priority(&EV::MAXPRI);
return $w;
} else {
return AE::timer $self->heartbeat, $self->heartbeat, $self->curry::weak::_heartbeat;
}
}
=head1 METHODS
All L</"ATTRIBUTES"> are also accesible via methods. They are all read-only.
=over
=item start
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
Initialize the logical replication process asyncronously and return immediately. This performs the following steps:
=over
=item 1. L</"identify_system">
=item 2. L</"create_slot"> (if requested)
=item 3. L</"start_replication">
=item 4. heartbeat timer
=back
This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.
Returns: L<Promises::Promise>
=cut
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
}
);
}
sub _post_init {
my ($self, $d) = @_;
return $d->then(
sub {
$self->_fh_watch;
$self->_timer;
},
$self->on_error,
);
}
=item identify_system
Issues the C<IDENTIFY_SYSTEM> command to the server to put the connection in repliction mode.
Returns: L<Promises::Promise>
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
Stop receiving replication payloads and disconnect from the PostgreSQL server.
=back
=cut
sub stop {
my $self = shift;
$self->_clear_fh_watch;
$self->_clear_timer;
$self->clear_dbh;
}
sub _handle_disconnect {
my $self = shift;
$self->stop;
return unless $self->reconnect;
if ( $self->has_reconnect_limit
and $self->_reconnect_counter($self->_reconnect_counter + 1) > $self->reconnect_limit) {
$self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
return;
}
$self->_set_startpos($self->flushed_lsn);
$self->clear_received_lsn;
$self->clear_flushed_lsn;
my $w; $w = AE::timer $self->reconnect_delay, 0, sub {
undef $w;
$self->_post_init(deferred { $self->start_replication });
};
}
sub _heartbeat {
my ($self, $req_reply) = @_;
$req_reply = !!$req_reply || 0; #uncoverable condition right
my $status = pack STANDBY_HEARTBEAT, 'r', # receiver status update
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
}
sub _async_await {
my ($dbh) = @_;
my $d = deferred;
# no async operation in progress
return $d->reject if $dbh->{pg_async_status} == 0; # uncoverable branch true
my $w; $w = AE::timer 0, AWAIT_INTERVAL, sub {
return unless $dbh->pg_ready;
try {
$d->resolve($dbh->pg_result);
} catch {
$d->reject($_);
};
undef $w;
};
return $d->promise;
t/01_recv.t view on Meta::CPAN
use Try::Tiny;
use AnyEvent::PgRecvlogical;
my $CV;
my $end_cv = AE::cv;
sub ae_sleep {
my $t = shift || 0;
my $cv = AE::cv;
$cv->begin; my $wt = AE::timer $t, 0, sub { $cv->end };
$cv->recv;
}
my $t_dir = File::Spec->rel2abs(dirname(__FILE__));
my $pg_hba_conf = File::Spec->join($t_dir, 'pg_hba.conf');
my $pg = eval {
Test::PostgreSQL->new(extra_postmaster_args =>
"-c hba_file='$pg_hba_conf' -c wal_level=logical -c max_wal_senders=1 -c max_replication_slots=1");
}
( run in 1.145 second using v1.01-cache-2.11-cpan-49f99fa48dc )