AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

lib/AnyEvent/PgRecvlogical.pm  view on Meta::CPAN

package AnyEvent::PgRecvlogical;
$AnyEvent::PgRecvlogical::VERSION = '1.04';
# ABSTRACT: perl port of pg_recvlogical

=pod

=head1 NAME

AnyEvent::PgRecvlogical - perl port of pg_recvlogical

=for html
<a href="https://travis-ci.org/mydimension/AnyEvent-PgRecvlogical"><img src="https://travis-ci.org/mydimension/AnyEvent-PgRecvlogical.svg?branch=master" /></a>
<a href='https://coveralls.io/github/mydimension/AnyEvent-PgRecvlogical?branch=master'><img src='https://coveralls.io/repos/github/mydimension/AnyEvent-PgRecvlogical/badge.svg?branch=master' alt='Coverage Status' /></a>
<a href="https://badge.fury.io/pl/AnyEvent-PgRecvlogical"><img src="https://badge.fury.io/pl/AnyEvent-PgRecvlogical.svg" alt="CPAN version" height="18"></a>

=head1 SYNOPSIS

    use AnyEvent::PgRecvlogical;

    my $recv = AnyEvent::PgRecvlogical->new(
        dbname     => 'mydb',
        slot       => 'myreplslot',
        on_message => sub {
            my ($record, $guard) = @_;

            process($record);

            undef $guard; # declare done with $record
        }
    );

    $recv->start;

=head1 DESCRIPTION

C<AnyEvent::PgRecvlogical> provides perl bindings of similar functionality to that of
L<pg_recvlogical|https://www.postgresql.org/docs/current/static/app-pgrecvlogical.html>.
The reasoning being that C<pg_recvlogical> does afford the consuming process the opportunity to emit feedback to
PostgreSQL. This results is potentially being sent more data than you can handle in a timely fashion.

=cut

use Moo;
use DBI;
use DBD::Pg 3.7.0 ':async';
use AnyEvent;
use AnyEvent::Util 'guard';
use Promises 0.99 backend => ['AnyEvent'], qw(deferred);
use Types::Standard ':all';
use Try::Tiny;
use Carp 'croak';
use curry;

use constant {
    AWAIT_INTERVAL    => 1,
    USECS             => 1_000_000,
    PG_MIN_VERSION    => 9_04_00,
    PG_MIN_NOEXPORT   => 10_00_00,
    PG_STATE_DUPEOBJ  => '42710',
    PG_EPOCH_DELTA    => 946_684_800,
    XLOGDATA          => 'Aq>3a*',
    PRIMARY_HEARTBEAT => 'Aq>2b',
    STANDBY_HEARTBEAT => 'Aq>4b',
};

use namespace::clean;

my $DBH = (InstanceOf ['DBI::db'])->create_child_type(
    constraint => sub {
        $_->{Driver}->{Name} eq 'Pg'
          and $_->{pg_server_version} >= PG_MIN_VERSION
          and $_->{Name} =~ /replication=/;
    },
    message => sub {
        my $parent_check = (InstanceOf ['DBI::db'])->validate($_);
        return $parent_check if $parent_check;
        return "$_ is not a DBD::Pg handle" unless $_->{Driver}->{Name} eq 'Pg';
        return "$_ is connected to an old postgres version ($_->{pg_server_version} < 9.4.0)"
          unless $_->{pg_server_version} >= PG_MIN_VERSION;
        return "$_ is not a replication connection: $_->{Name}" unless $_->{Name} =~ /replication=/;
    }
);

my $LSNStr = Str->where(sub { m{[0-9A-F]{1,8}/[0-9A-F]{1,8}} })
  ->plus_coercions(Int() => sub { sprintf '%X/%X', (($_ >> 32) & 0xffff_ffff), ($_ & 0xffff_ffff) });

my $LSN = Int->plus_coercions(
    $LSNStr => sub {
        my ($h, $l) = map { hex } split m{/}; ($h << 32) | $l;
    }
);

=head1 ATTRIBUTES

=over

=item C<dbname>

=over

=item L<Str|Types::Standard/Str>, Required

=back

Database name to connect to.

=item C<slot>

=over



( run in 0.823 second using v1.01-cache-2.11-cpan-d8267643d1d )