AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN


=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN position received from the server.

=item C<flushed_lsn>

=over

=item L<LSN|https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html>, Default: C<0/0>, Read Only

=back

Holds the last LSN position signaled as handled by the client (see: L</on_message>)

=item C<on_error>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Default: L<croak|Carp/croak>

=back

Callback in the event of an error.

=item C<on_message>

=over

=item L<CodeRef|Types::Standard/CodeRef>, Required

=back

Callback to receive the replication payload from the server. This is the raw output from the L</plugin>.

The callback is passed the C<$payload> received and a C<$guard> object. Hang onto the C<$guard> until you have handled
the payload. Once it is released, the server will be informed that the WAL position has been "flushed."

=back

=cut

# Connection parameters. dbname and slot must be supplied to the constructor.
# host and port are optional; their generated has_host/has_port predicates are
# consulted by _dsn so that unset values are left out of the DSN. username and
# password default to the empty string and are handed to DBI->connect as-is.
has dbname   => (is => 'ro', isa => Str, required  => 1);
has host     => (is => 'ro', isa => Str, predicate => 1);
has port     => (is => 'ro', isa => Int, predicate => 1);
has username => (is => 'ro', isa => Str, default => q{});
has password => (is => 'ro', isa => Str, default => q{});
has slot     => (is => 'ro', isa => Str, required  => 1);

# Database handle, built on first access by _build_dbh. It cannot be passed to
# the constructor (init_arg => undef) and can be dropped again via its clearer.
has dbh => (is => 'lazy', isa => $DBH, clearer => 1, init_arg => undef);
# Behavioural switches and tunables with their defaults; their semantics are
# described in the ATTRIBUTES POD. reconnect_limit has no default, only a
# predicate. _reconnect_counter is the sole read-write attribute in this group.
# heartbeat is the value passed to the periodic timer set up in _build__timer.
has do_create_slot     => (is => 'ro', isa => Bool, default   => 0);
has slot_exists_ok     => (is => 'ro', isa => Bool, default   => 0);
has reconnect          => (is => 'ro', isa => Bool, default   => 1);
has reconnect_delay    => (is => 'ro', isa => Int,  default   => 5);
has reconnect_limit    => (is => 'ro', isa => Int,  predicate => 1);
has _reconnect_counter => (is => 'rw', isa => Int,  default   => 0);
has heartbeat          => (is => 'ro', isa => Int,  default   => 10);
has plugin             => (is => 'ro', isa => Str,  default   => 'test_decoding');
# The default is wrapped in a sub so each instance gets its own fresh hashref.
has options => (is => 'ro', isa => HashRef, default => sub { {} });
# LSN bookkeeping. All three are 'rwp': a public reader plus a private writer.
# startpos may be given to the constructor and is coerced to the $LSN type.
# received_lsn and flushed_lsn are not accepted by the constructor; they are
# lazily defaulted to 0 and can be reset through their clearers.
has startpos     => (is => 'rwp', isa => $LSN, default => 0, coerce  => 1);
has received_lsn => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);
has flushed_lsn  => (is => 'rwp', isa => $LSN, default => 0, clearer => 1, init_arg => undef, lazy => 1);

# Callbacks. on_message is required. on_error falls back to Carp's croak; the
# default sub returns a reference to croak rather than calling it.
has on_message => (is => 'ro', isa => CodeRef, required => 1);
has on_error => (is => 'ro', isa => CodeRef, default => sub { \&croak });

# Event-loop watchers, built on first access by _build__fh_watch and
# _build__timer respectively. Both can be cleared; only _fh_watch has a
# predicate.
has _fh_watch => (is => 'lazy', isa => Ref,  clearer => 1, predicate => 1);
has _timer    => (is => 'lazy', isa => Ref,  clearer => 1);

=head1 CONSTRUCTOR

All the L</"ATTRIBUTES"> above are accepted by the constructor, with a few exceptions:

L</"received_lsn"> and L</"flushed_lsn"> are read-only and not accepted by the constructor.

L</"dbname">, L</"slot"> and L</"on_message"> are required.

Note that logical replication will not automatically commence upon construction. One must call L</"start"> first.

=cut

# Assemble the DBI data source string for the replication connection.
#
# The DSN always carries replication=database and client_encoding=sql_ascii.
# host, port and dbname are added from the matching accessors, except that an
# attribute with a "has_<name>" predicate is omitted when that predicate
# reports false. Parameters are emitted in sorted key order, joined with ';'.
#
# Returns the string 'dbi:Pg:<key>=<value>;...'.
sub _dsn {
    my ($self) = @_;

    my %param_for = (
        client_encoding => 'sql_ascii',
        replication     => 'database',
    );

    for my $attr (qw(host port dbname)) {
        my $predicate = "has_$attr";

        # Attributes without a predicate (e.g. dbname) are always included.
        my $is_unset = $self->can($predicate) && !$self->$predicate;
        next if $is_unset;

        $param_for{$attr} = $self->$attr;
    }

    my @pairs = map { "$_=$param_for{$_}" } sort keys %param_for;

    return 'dbi:Pg:' . join(q{;}, @pairs);
}

# Builder for the lazy "dbh" attribute.
#
# Connects through DBI using the DSN from _dsn together with the username and
# password attributes. PrintError is switched off, so a failed connect is
# reported only through the croak below, carrying $DBI::errstr.
#
# Returns the connected database handle; croaks if the connection fails.
sub _build_dbh {
    my ($self) = @_;

    my %connect_attr = (PrintError => 0);

    my $dbh = DBI->connect($self->_dsn, $self->username, $self->password, \%connect_attr)
        or croak $DBI::errstr;

    return $dbh;
}

# Builder for the lazy "_fh_watch" attribute.
#
# Creates an AE::io watcher on the database handle's pg_socket. The second
# argument (0) is AE::io's watch-for-write flag, so this watches for
# readability. The callback is _read_copydata, bound through curry::weak
# (which, per the curry module, holds the invocant weakly).
#
# Returns the watcher object produced by AE::io.
sub _build__fh_watch {
    my ($self) = @_;

    my $socket  = $self->dbh->{pg_socket};
    my $on_read = $self->curry::weak::_read_copydata;

    return AE::io($socket, 0, $on_read);
}

sub _build__timer {
    my $self = shift;
    if ($AnyEvent::MODEL and $AnyEvent::MODEL eq 'AnyEvent::Impl::EV') {
        my $w = EV::periodic(0, $self->heartbeat, 0, $self->curry::weak::_heartbeat);
        $w->priority(&EV::MAXPRI);

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 2.048 seconds using v1.00-cache-2.02-grep-82fe00e-cpan-72ae3ad1e6da )