AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

    }
);

my $LSNStr = Str->where(sub { m{[0-9A-F]{1,8}/[0-9A-F]{1,8}} })
  ->plus_coercions(Int() => sub { sprintf '%X/%X', (($_ >> 32) & 0xffff_ffff), ($_ & 0xffff_ffff) });

my $LSN = Int->plus_coercions(
    $LSNStr => sub {
        my ($h, $l) = map { hex } split m{/}; ($h << 32) | $l;
    }
);

=head1 ATTRIBUTES

=over

=item C<dbname>

=over

=item L<Str|Types::Standard/Str>, Required

=back

Database name to connect to.

=item C<slot>

=over

=item L<Str|Types::Standard/Str>, Required

=back

Name of the replication slot to use (and/or create, see L</do_create_slot> and L</slot_exists_ok>)

=item C<host>

=over

=item L<Str|Types::Standard/Str>

=back

=item C<port>

=over

=item L<Int|Types::Standard/SInt>

=back

=item C<username>

=over

=item L<Str|Types::Standard/Str>

=back

=item C<password>

=over

=item L<Str|Types::Standard/Str>

=back

Standard PostgreSQL connection parameters, see L<DBD::Pg/connect>.

=item C<do_create_slot>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<0>

=back

If true, the L</slot> will be be created upon connection. Otherwise, it's assumed it already exists. If it does not,
PostgreSQL will raise an exception.

=item C<slot_exists_ok>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<0>

=back

If true, and if L</do_create_slot> is also true, then no exception will be raised if the L</slot> already exists.
Otherwise, one will be raised.

=item C<reconnect>

=over

=item L<Bool|Types::Standard/Bool>, Default: C<1>

=back

If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails.
Otherwise, the connection will gracefully be allowed to close.

=item C<reconnect_delay>

=over

=item L<Int|Types::Standard/Int>, Default: C<5>

=back

Time, in seconds, to wait before reconnecting.

=item C<reconnect_limit>

=over

=item L<Int|Types::Standard/Int>, Default: C<1>

=back

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN


=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>

=back

Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.

=item C<received_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN position received from the server.

=item C<flushed_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN signaled to handled by the client (see: L</on_message>)

=item C<on_error>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>

=back

Callback in the event of an error.

=item C<on_message>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Required

=back

Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.

The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."

=back

=cut

has dbname   => (is => 'ro', isa => Str, required  => 1);
has host     => (is => 'ro', isa => Str, predicate => 1);
has port     => (is => 'ro', isa => Int, predicate => 1);
has username => (is => 'ro', isa => Str, default   => q{});
has password => (is => 'ro', isa => Str, default   => q{});
has slot     => (is => 'ro', isa => Str, required  => 1);

has dbh                => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
has do_create_slot     => (is => 'ro',   isa => Bool,    default   => 0);
has slot_exists_ok     => (is => 'ro',   isa => Bool,    default   => 0);
has reconnect          => (is => 'ro',   isa => Bool,    default   => 1);
has reconnect_delay    => (is => 'ro',   isa => Int,     default   => 5);
has reconnect_limit    => (is => 'ro',   isa => Int,     predicate => 1);
has _reconnect_counter => (is => 'rw',   isa => Int,     default   => 0);
has heartbeat          => (is => 'ro',   isa => Int,     default   => 10);
has plugin             => (is => 'ro',   isa => Str,     default   => 'test_decoding');
has options            => (is => 'ro',   isa => HashRef, default   => sub { {} });
has startpos           => (is => 'rwp',  isa => $LSN, default => 0, coerce  => 1);
has received_lsn       => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn        => (is => 'rwp',  isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);

has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error   => (is => 'ro', isa => CodeRef, default  => sub { \&croak });

has _fh_watch => (is => 'lazy', isa => Ref, clearer => 1, predicate => 1);
has _timer => (is => 'lazy', isa => Ref, clearer => 1);

=head1 CONSTRUCTOR

All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:

L</"received_lsn"> and L<"flushed_lsn"> are read-only and not accepted by the constructor.

L</"dbname">, L</"slot"> and L</"on_message"> are required.

Note, that logical replication will not automatically commence upon construction. One must call L</"start"> first.

=cut

sub _dsn {
    my $self = shift;

    my %dsn = (replication => 'database', client_encoding => 'sql_ascii');
    foreach (qw(host port dbname)) {
        my $x = "has_$_";
        next if $self->can($x) and not $self->$x;

        $dsn{$_} = $self->$_;
    }

    return 'dbi:Pg:' . join q{;}, map { "$_=$dsn{$_}" } sort keys %dsn;
}

sub _build_dbh {
    my $self = shift;
    my $dbh  = DBI->connect($self->_dsn, $self->username, $self->password, { PrintError => 0 },);

    croak $DBI::errstr unless $dbh;

    return $dbh;
}

sub _build__fh_watch {
    my $self = shift;

    my $w = AE::io $self->dbh->{pg_socket}, 0, $self->curry::weak::_read_copydata;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        $w->priority($w->priority - 1);    # be a little less aggressive
    }

    return $w;
}

sub _build__timer {
    my $self = shift;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
        $w->priority(&EV::MAXPRI);
        return $w;
    } else {
        return AE::timer $self->heartbeat, $self->heartbeat, $self->curry::weak::_heartbeat;
    }
}

=head1 METHODS

All L</"ATTRIBUTES"> are also accesible via methods. They are all read-only.

=over

=item start

Initialize the logical replication process asyncronously and return immediately. This performs the following steps:

=over

=item 1. L</"identify_system">

=item 2. L</"create_slot"> (if requested)

=item 3. L</"start_replication">

=item 4. heartbeat timer

=back

This method wraps the above steps for convenience. Should you desire to modify the
L<replication startup protocol|https://www.postgresql.org/docs/current/static/protocol-replication.html> (which you
shouldn't), the methods are described in detail below.

Returns: L<Promises::Promise>

=cut

sub start {
    my $self = shift;



( run in 0.457 second using v1.01-cache-2.11-cpan-5735350b133 )