AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
}
);
# Textual LSN type: two upper-case hexadecimal halves of at most 8 digits each,
# separated by a slash (for example "16/B374D848").
#
# The pattern is anchored with \A and \z so the WHOLE string has to be an LSN;
# without the anchors any string that merely contains "HEX/HEX" somewhere would
# be accepted as valid.
#
# An integer is coerced to this form by printing its upper and lower 32 bits
# as hexadecimal.
my $LSNStr = Str->where(sub { m{\A[0-9A-F]{1,8}/[0-9A-F]{1,8}\z} })
    ->plus_coercions(
        Int() => sub {
            my $high = ($_ >> 32) & 0xffff_ffff;
            my $low  = $_ & 0xffff_ffff;
            return sprintf '%X/%X', $high, $low;
        }
    );
# Integer LSN type. A textual LSN ("HIGH/LOW", both hexadecimal) is coerced by
# placing the first half in the upper 32 bits and the second half in the lower
# 32 bits of a single integer.
my $LSN = Int->plus_coercions(
    $LSNStr => sub {
        my ($high_hex, $low_hex) = split m{/}, $_;
        return (hex($high_hex) << 32) | hex($low_hex);
    }
);
=head1 ATTRIBUTES
=over
=item C<dbname>
=over
=item L<Str|Types::Standard/Str>, Required
=back
Database name to connect to.
=item C<slot>
=over
=item L<Str|Types::Standard/Str>, Required
=back
Name of the replication slot to use (and/or create, see L</do_create_slot> and L</slot_exists_ok>).
=item C<host>
=over
=item L<Str|Types::Standard/Str>
=back
=item C<port>
=over
=item L<Int|Types::Standard/Int>
=back
=item C<username>
=over
=item L<Str|Types::Standard/Str>
=back
=item C<password>
=over
=item L<Str|Types::Standard/Str>
=back
Standard PostgreSQL connection parameters, see L<DBD::Pg/connect>.
=item C<do_create_slot>
=over
=item L<Bool|Types::Standard/Bool>, Default: C<0>
=back
If true, the L</slot> will be created upon connection. Otherwise, it's assumed it already exists. If it does not,
PostgreSQL will raise an exception.
=item C<slot_exists_ok>
=over
=item L<Bool|Types::Standard/Bool>, Default: C<0>
=back
If true, and if L</do_create_slot> is also true, then no exception will be raised if the L</slot> already exists.
Otherwise, one will be raised.
=item C<reconnect>
=over
=item L<Bool|Types::Standard/Bool>, Default: C<1>
=back
If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails.
Otherwise, the connection will gracefully be allowed to close.
=item C<reconnect_delay>
=over
=item L<Int|Types::Standard/Int>, Default: C<5>
=back
Time, in seconds, to wait before reconnecting.
=item C<reconnect_limit>
=over
=item L<Int|Types::Standard/Int>, Default: C<1>
=back
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>
=back
Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.
=item C<received_lsn>
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
Holds the last LSN position received from the server.
=item C<flushed_lsn>
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
Holds the last LSN signaled as handled by the client (see: L</on_message>)
=item C<on_error>
=over
=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>
=back
Callback in the event of an error.
=item C<on_message>
=over
=item L<CodeRef|Types::Standard/CodeRef>, Required
=back
Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.
The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."
=back
=cut
# Connection parameters. host and port are optional and carry predicates
# (has_host / has_port) so that _dsn can leave them out when unset.
has dbname => (is => 'ro', isa => Str, required => 1);
has host => (is => 'ro', isa => Str, predicate => 1);
has port => (is => 'ro', isa => Int, predicate => 1);
# Credentials default to the empty string; _build_dbh passes them to DBI->connect.
has username => (is => 'ro', isa => Str, default => q{});
has password => (is => 'ro', isa => Str, default => q{});
# Name of the replication slot (required).
has slot => (is => 'ro', isa => Str, required => 1);
# Database handle, built on first use by _build_dbh. init_arg => undef keeps it
# out of the constructor; the clearer allows it to be dropped and rebuilt.
has dbh => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
# Slot creation behaviour, both off by default.
has do_create_slot => (is => 'ro', isa => Bool, default => 0);
has slot_exists_ok => (is => 'ro', isa => Bool, default => 0);
# Reconnection behaviour: enabled by default, with a 5 second delay.
has reconnect => (is => 'ro', isa => Bool, default => 1);
has reconnect_delay => (is => 'ro', isa => Int, default => 5);
# No default is declared for the limit; the predicate (has_reconnect_limit)
# reports whether one was supplied.
# NOTE(review): the POD for reconnect_limit advertises a default of 1, which
# does not match this declaration - confirm which is intended.
has reconnect_limit => (is => 'ro', isa => Int, predicate => 1);
# Internal read-write counter, starting at 0.
# NOTE(review): presumably counts reconnect attempts against reconnect_limit;
# its use is not visible in this part of the file.
has _reconnect_counter => (is => 'rw', isa => Int, default => 0);
# Interval handed to the heartbeat timer created by _build__timer.
has heartbeat => (is => 'ro', isa => Int, default => 10);
# Plugin whose raw output is delivered to on_message; defaults to test_decoding.
has plugin => (is => 'ro', isa => Str, default => 'test_decoding');
# NOTE(review): presumably options for the plugin - how this hash is consumed
# is not visible in this part of the file.
has options => (is => 'ro', isa => HashRef, default => sub { {} });
# LSN positions, stored as integers. Only startpos is accepted by the
# constructor and coerced (so the "X/Y" string form may be given); the other
# two are lazy, clearable, settable only through their private writers, and
# declare no coercion.
has startpos => (is => 'rwp', isa => $LSN, default => 0, coerce => 1);
has received_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
# Callbacks. on_error defaults to a reference to croak.
has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error => (is => 'ro', isa => CodeRef, default => sub { \&croak });
# Event-loop watchers, built lazily by _build__fh_watch and _build__timer.
has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);
=head1 CONSTRUCTOR
All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:
L</"received_lsn"> and L</"flushed_lsn"> are read-only and not accepted by the constructor.
L</"dbname">, L</"slot"> and L</"on_message"> are required.
Note that logical replication will not automatically commence upon construction. One must call L</"start"> first.
=cut
# Assemble the DBI data source string for a replication connection.
#
# The string always carries replication=database and client_encoding=sql_ascii.
# host, port and dbname are added from the object's attributes; an attribute
# that has a has_<name> predicate is left out when that predicate is false.
# Parameters are emitted in sorted key order, joined with ';'.
#
# Returns: the DSN string, prefixed with 'dbi:Pg:'.
sub _dsn {
    my ($self) = @_;

    my %param_for = (
        client_encoding => 'sql_ascii',
        replication     => 'database',
    );

    for my $attr (qw(host port dbname)) {
        my $predicate = "has_$attr";

        # Attributes without a predicate method are always included.
        next if $self->can($predicate) && !$self->$predicate;

        $param_for{$attr} = $self->$attr;
    }

    my @pairs = map { "$_=$param_for{$_}" } sort keys %param_for;

    return 'dbi:Pg:' . join(q{;}, @pairs);
}
# Builder for the lazy dbh attribute: connect through DBI using the DSN from
# _dsn together with the configured username and password.
#
# PrintError is switched off, so a failed connection is reported only through
# the croak below, which carries $DBI::errstr.
#
# Returns: the connected database handle.
sub _build_dbh {
    my ($self) = @_;

    my %connect_attr = (PrintError => 0);

    my $handle = DBI->connect($self->_dsn, $self->username, $self->password, \%connect_attr)
        or croak $DBI::errstr;

    return $handle;
}
# Builder for the lazy _fh_watch attribute: create an I/O watcher on the
# database handle's pg_socket that invokes _read_copydata through a weak
# curried callback.
#
# When the event loop in use is EV, the watcher's priority is lowered by one
# so that it is a little less aggressive.
#
# Returns: the watcher object.
sub _build__fh_watch {
    my ($self) = @_;

    my $socket  = $self->dbh->{pg_socket};
    my $on_read = $self->curry::weak::_read_copydata;

    # Second argument 0 asks AE::io to watch for readability, not writability.
    my $watcher = AE::io($socket, 0, $on_read);

    my $using_ev = ($AnyEvent::MODEL || q{}) eq 'AnyEvent::Impl::EV';
    return $watcher unless $using_ev;

    $watcher->priority($watcher->priority - 1);

    return $watcher;
}
# Builder for the lazy _timer attribute: a repeating timer that calls
# _heartbeat (through a weak curried callback) every `heartbeat` interval.
#
# Under the EV event loop an EV::periodic watcher raised to EV::MAXPRI is
# used; under any other loop a plain AE::timer is used, with its first firing
# after one `heartbeat` interval.
#
# Returns: the timer watcher object.
sub _build__timer {
    my ($self) = @_;

    my $using_ev = ($AnyEvent::MODEL || q{}) eq 'AnyEvent::Impl::EV';

    unless ($using_ev) {
        return AE::timer($self->heartbeat, $self->heartbeat, $self->curry::weak::_heartbeat);
    }

    my $periodic = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
    $periodic->priority(EV::MAXPRI());

    return $periodic;
}
=head1 METHODS
All L</"ATTRIBUTES"> are also accessible via methods. They are all read-only.
=over
=item start
Initialize the logical replication process asynchronously and return immediately. This performs the following steps:
=over
=item 1. L</"identify_system">
=item 2. L</"create_slot"> (if requested)
=item 3. L</"start_replication">
=item 4. heartbeat timer
=back
This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.
Returns: L<Promises::Promise>
=cut
sub start {
my $self = shift;
( run in 0.457 second using v1.01-cache-2.11-cpan-5735350b133 )