AnyEvent-PgRecvlogical
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
Holds the last LSN position received from the server.
=item C<flushed_lsn>
=over
=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only
=back
Holds the last LSN signaled to handled by the client (see: L</on_message>)
=item C<on_error>
=over
=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>
=back
Callback in the event of an error.
=item C<on_message>
=over
=item L<CodeRef|Types::Standard/CodeRef>, Required
=back
Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.
The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."
=back
=cut
has dbname => (is => 'ro', isa => Str, required => 1);
has host => (is => 'ro', isa => Str, predicate => 1);
has port => (is => 'ro', isa => Int, predicate => 1);
has username => (is => 'ro', isa => Str, default => q{});
has password => (is => 'ro', isa => Str, default => q{});
has slot => (is => 'ro', isa => Str, required => 1);
has dbh => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
has do_create_slot => (is => 'ro', isa => Bool, default => 0);
has slot_exists_ok => (is => 'ro', isa => Bool, default => 0);
has reconnect => (is => 'ro', isa => Bool, default => 1);
has reconnect_delay => (is => 'ro', isa => Int, default => 5);
has reconnect_limit => (is => 'ro', isa => Int, predicate => 1);
has _reconnect_counter => (is => 'rw', isa => Int, default => 0);
has heartbeat => (is => 'ro', isa => Int, default => 10);
has plugin => (is => 'ro', isa => Str, default => 'test_decoding');
has options => (is => 'ro', isa => HashRef, default => sub { {} });
has startpos => (is => 'rwp', isa => $LSN, default => 0, coerce => 1);
has received_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error => (is => 'ro', isa => CodeRef, default => sub { \&croak });
has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);
=head1 CONSTRUCTOR
All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:
L</"received_lsn"> and L<"flushed_lsn"> are read-only and not accepted by the constructor.
L</"dbname">, L</"slot"> and L</"on_message"> are required.
Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.
=cut
sub _dsn {
my $self = shift;
my %dsn = (replication => 'database', client_encoding => 'sql_ascii');
foreach (qw(host port dbname)) {
my $x = "has_$_";
next if $self->can($x) and not $self->$x;
$dsn{$_} = $self->$_;
}
return 'dbi:Pg:' . join q{;}, map { "$_=$dsn{$_}" } sort keys %dsn;
}
sub _build_dbh {
my $self = shift;
my $dbh = DBI->connect(
$self->_dsn,
$self->username,
$self->password,
{ PrintError => 0 },
);
croak $DBI::errstr unless $dbh;
return $dbh;
}
sub _build__fh_watch {
my $self = shift;
return AE::io $self->dbh->{pg_socket}, 0, $self->curry::weak::_read_copydata;
}
sub _build__timer {
my $self = shift;
if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
$w->priority(&EV::MAXPRI);
view all matches for this distributionview release on metacpan - search on metacpan
( run in 2.048 seconds using v1.00-cache-2.02-grep-82fe00e-cpan-72ae3ad1e6da )