AnyEvent-PgRecvlogical
view release on metacpan or search on metacpan
lib/AnyEvent/PgRecvlogical.pm view on Meta::CPAN
}
);
$recv->start;
=head1 DESCRIPTION
C<AnyEvent::PgRecvlogical> provides perl bindings of similar functionality to that of
L<pg_recvlogical|https://www.postgresql.org/docs/current/static/app-pgrecvlogical.html>.
The reasoning being that C<pg_recvlogical> does afford the consuming process the opportunity to emit feedback to
PostgreSQL. This results is potentially being sent more data than you can handle in a timely fashion.
=cut
use Moo;
use DBI;
use DBD::Pg 3.7.0 ':async';
use AnyEvent;
use AnyEvent::Util 'guard';
use Promises 0.99 backend => ['AnyEvent'], qw(deferred);
use Types::Standard ':all';
use Try::Tiny;
use Carp 'croak';
use curry;
use constant {
AWAIT_INTERVAL => 1,
USECS => 1_000_000,
PG_MIN_VERSION => 9_04_00,
PG_MIN_NOEXPORT => 10_00_00,
PG_STATE_DUPEOBJ => '42710',
PG_EPOCH_DELTA => 946_684_800,
XLOGDATA => 'Aq>3a*',
PRIMARY_HEARTBEAT => 'Aq>2b',
STANDBY_HEARTBEAT => 'Aq>4b',
};
use namespace::clean;
my $DBH = (InstanceOf ['DBI::db'])->create_child_type(
constraint => sub {
$_->{Driver}->{Name} eq 'Pg'
and $_->{pg_server_version} >= PG_MIN_VERSION
and $_->{Name} =~ /replication=/;
},
message => sub {
my $parent_check = (InstanceOf ['DBI::db'])->validate($_);
return $parent_check if $parent_check;
return "$_ is not a DBD::Pg handle" unless $_->{Driver}->{Name} eq 'Pg';
return "$_ is connected to an old postgres version ($_->{pg_server_version} < 9.4.0)"
unless $_->{pg_server_version} >= PG_MIN_VERSION;
return "$_ is not a replication connection: $_->{Name}" unless $_->{Name} =~ /replication=/;
}
);
my $LSNStr = Str->where(sub { m{[0-9A-F]{1,8}/[0-9A-F]{1,8}} })
->plus_coercions(Int() => sub { sprintf '%X/%X', (($_ >> 32) & 0xffff_ffff), ($_ & 0xffff_ffff) });
my $LSN = Int->plus_coercions(
$LSNStr => sub {
my ($h, $l) = map { hex } split m{/}; ($h << 32) | $l;
}
);
=head1 ATTRIBUTES
=over
=item C<dbname>
=over
=item L<Str|Types::Standard/Str>, Required
=back
Database name to connect to.
=item C<slot>
=over
=item L<Str|Types::Standard/Str>, Required
=back
Name of the replication slot to use (and/or create, see L</do_create_slot> and L</slot_exists_ok>)
=item C<host>
=over
=item L<Str|Types::Standard/Str>
=back
=item C<port>
=over
=item L<Int|Types::Standard/SInt>
=back
=item C<username>
=over
=item L<Str|Types::Standard/Str>
=back
=item C<password>
=over
=item L<Str|Types::Standard/Str>
=back
Standard PostgreSQL connection parameters, see L<DBD::Pg/connect>.
( run in 2.424 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )