AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
Holds the last LSN signaled to handled by the client (see: L</on_message>)
=item C<on_error>
=over
=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>
=back
Callback in the event of an error.
=item C<on_message>
=over
=item L<CodeRef|Types::Standard/CodeRef>, Required
=back
Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.
The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."
=back
=cut
has dbname => (is => 'ro', isa => Str, required => 1);
has host => (is => 'ro', isa => Str, predicate => 1);
has port => (is => 'ro', isa => Int, predicate => 1);
has username => (is => 'ro', isa => Str, default => q{});
has password => (is => 'ro', isa => Str, default => q{});
has slot => (is => 'ro', isa => Str, required => 1);
has dbh => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
has do_create_slot => (is => 'ro', isa => Bool, default => 0);
has slot_exists_ok => (is => 'ro', isa => Bool, default => 0);
has reconnect => (is => 'ro', isa => Bool, default => 1);
has reconnect_delay => (is => 'ro', isa => Int, default => 5);
has reconnect_limit => (is => 'ro', isa => Int, predicate => 1);
has _reconnect_counter => (is => 'rw', isa => Int, default => 0);
has heartbeat => (is => 'ro', isa => Int, default => 10);
has plugin => (is => 'ro', isa => Str, default => 'test_decoding');
has options => (is => 'ro', isa => HashRef, default => sub { {} });
has startpos => (is => 'rwp', isa => $LSN, default => 0, coerce => 1);
has received_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error => (is => 'ro', isa => CodeRef, default => sub { \&croak });
has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);
=head1 CONSTRUCTOR
All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:
L</"received_lsn"> and L<"flushed_lsn"> are read-only and not accepted by the constructor.
L</"dbname">, L</"slot"> and L</"on_message"> are required.
Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.
=cut
sub _dsn {
my $self = shift;
my %dsn = (replication => 'database', client_encoding => 'sql_ascii');
foreach (qw(host port dbname)) {
my $x = "has_$_";
next if $self->can($x) and not $self->$x;
$dsn{$_} = $self->$_;
}
return 'dbi:Pg:' . join q{;}, map { "$_=$dsn{$_}" } sort keys %dsn;
}
sub _build_dbh {
my $self = shift;
my $dbh = DBI->connect($self->_dsn, $self->username, $self->password, { PrintError => 0 },);
croak $DBI::errstr unless $dbh;
return $dbh;
}
sub _build__fh_watch {
my $self = shift;
my $w = AE::io $self->dbh->{pg_socket}, 0, $self->curry::weak::_read_copydata;
if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
$w->priority($w->priority - 1); # be a little less aggressive
}
return $w;
}
sub _build__timer {
my $self = shift;
if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
$w->priority(&EV::MAXPRI);
return $w;
} else {
return AE::timer $self->heartbeat, $self->heartbeat, $self->curry::weak::_heartbeat;
}
}
=head1 METHODS
All L</"ATTRIBUTES"> are also accesible via methods. They are all read-only.
=over
=item start
Initialize the logical replication process asyncronously and return immediately. This performs the following steps:
=over
=item 1. L</"identify_system">
=item 2. L</"create_slot"> (if requested)
=item 3. L</"start_replication">
=item 4. heartbeat timer
=back
This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.
Returns: L<Promises::Promise>
=cut
sub start {
my $self = shift;
$self->_post_init(
deferred {
shift->chain($self->curry::identify_system, $self->curry::create_slot, $self->curry::start_replication);
}
);
}
sub _post_init {
my ($self, $d) = @_;
return $d->then(
sub {
$self->_fh_watch;
$self->_timer;
},
$self->on_error,
);
}
=item identify_system
Issues the C<IDENTIFY_SYSTEM> command to the server to put the connection in repliction mode.
Returns: L<Promises::Promise>
=cut
sub identify_system {
my $self = shift;
$self->dbh->do('IDENTIFY_SYSTEM', { pg_async => PG_ASYNC });
return _async_await($self->dbh)->catch(
sub {
my @error = @_;
unshift @error, $DBI::errstr if $DBI::errstr;
croak @error;
}
);
}
=item create_slot
Issues the appropriate C<CREATE_REPLICATION_SLOT> command to the server, if requested.
Returns: L<Promises::Promise>
=cut
sub create_slot {
my $self = shift;
return deferred->resolve unless $self->do_create_slot;
my $dbh = $self->dbh;
$dbh->do(
sprintf(
'CREATE_REPLICATION_SLOT %s LOGICAL %s%s',
$dbh->quote_identifier($self->slot),
$dbh->quote_identifier($self->plugin),
($dbh->{pg_server_version} >= PG_MIN_NOEXPORT ? ' NOEXPORT_SNAPSHOT' : '') # uncoverable branch true
),
{ pg_async => PG_ASYNC }
);
return _async_await($dbh)->catch(
sub {
croak @_ unless $dbh->state eq PG_STATE_DUPEOBJ and $self->slot_exists_ok;
}
);
}
sub _option_string {
my $self = shift;
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
# uncoverable statement
$self->on_error->('could not read COPY data: ' . $self->dbh->errstr);
}
my $type = substr $msg, 0, 1;
if ('k' eq $type) {
# server keepalive
my (undef, $lsnpos, $ts, $reply) = unpack PRIMARY_HEARTBEAT, $msg;
$self->_set_received_lsn($lsnpos) if $lsnpos > $self->received_lsn;
# only interested in the request-reply bit
# uncoverable branch true
if ($reply) {
# uncoverable statement
AE::postpone { $self->_heartbeat };
}
# an inbound heartbeat is proof enough of successful reconnect
$self->_reconnect_counter(0) if $self->_reconnect_counter;
return;
}
# uncoverable branch true
unless ('w' eq $type) {
# uncoverable statement
$self->on_error->("unrecognized streaming header: '$type'");
return;
}
my (undef, $startlsn, $endlsn, $ts, $record) = unpack XLOGDATA, $msg;
$self->_set_received_lsn($startlsn) if $startlsn > $self->received_lsn;
my $guard = $self->$curry::weak(
sub {
my $self = shift;
$self->_set_flushed_lsn($startlsn) if $startlsn > $self->flushed_lsn;
}
);
$self->on_message->($record, guard(\&$guard));
return;
}
=item stop
Stop receiving replication payloads and disconnect from the PostgreSQL server.
=back
=cut
sub stop {
my $self = shift;
$self->_clear_fh_watch;
$self->_clear_timer;
$self->clear_dbh;
}
sub _handle_disconnect {
my $self = shift;
$self->stop;
return unless $self->reconnect;
if ( $self->has_reconnect_limit
and $self->_reconnect_counter($self->_reconnect_counter + 1) > $self->reconnect_limit) {
$self->on_error->('reconnect limit reached: ' . $self->reconnect_limit);
return;
}
$self->_set_startpos($self->flushed_lsn);
$self->clear_received_lsn;
$self->clear_flushed_lsn;
my $w; $w = AE::timer $self->reconnect_delay, 0, sub {
undef $w;
$self->_post_init(deferred { $self->start_replication });
};
}
sub _heartbeat {
my ($self, $req_reply) = @_;
$req_reply = !!$req_reply || 0; #uncoverable condition right
my $status = pack STANDBY_HEARTBEAT, 'r', # receiver status update
$self->received_lsn, # last WAL received
$self->flushed_lsn, # last WAL flushed
0, # last WAL applied
int((AE::now - PG_EPOCH_DELTA) * USECS), # ms since 2000-01-01
$req_reply; # request heartbeat
$self->dbh->pg_putcopydata($status);
}
sub _async_await {
my ($dbh) = @_;
my $d = deferred;
# no async operation in progress
return $d->reject if $dbh->{pg_async_status} == 0; # uncoverable branch true
my $w; $w = AE::timer 0, AWAIT_INTERVAL, sub {
return unless $dbh->pg_ready;
try {
$d->resolve($dbh->pg_result);
} catch {
$d->reject($_);
};
undef $w;
};
return $d->promise;
}
=head1 AUTHOR
William Cox (cpan:MYDMNSN) <mydimension@gmail.com>
=head1 COPYRIGHT
Copyright (c) 2017-2018 William Cox
=head1 LICENSE
This library is free software and may be distributed under the same terms as perl itself.
See L<http://dev.perl.org/licenses/>.
=cut
1;
( run in 1.695 second using v1.01-cache-2.11-cpan-ceb78f64989 )