AnyEvent-PgRecvlogical

 view release on metacpan or  search on metacpan

t/01_recv.t  view on Meta::CPAN

#!/usr/bin/env perl

use strict;
use warnings;
use Test::More;
use Test::PostgreSQL;
use AnyEvent;
use File::Basename;
use File::Spec;
use Promises backend => ['AnyEvent'], qw(deferred);
use Try::Tiny;

use AnyEvent::PgRecvlogical;

# Condvar that the on_message callback calls ->end on after each message.
# It is only declared here, without a value.
my $CV;
# Overall completion condvar: on_message sends to it once @expected has been
# drained, and on_error croaks it.
my $end_cv = AE::cv;

# Block the caller for about $seconds (default 0) by waiting on a private
# condvar that a one-shot timer releases, so other AnyEvent watchers keep
# being serviced during the wait.
sub ae_sleep {
    my ($seconds) = @_;
    $seconds ||= 0;

    my $done = AE::cv;
    $done->begin;

    # Hold the watcher in a lexical so it stays alive until recv returns;
    # AnyEvent cancels a watcher as soon as it is destroyed.
    my $timer = AE::timer($seconds, 0, sub { $done->end });

    $done->recv;
}

# Absolute path of the directory containing this test file; the pg_hba.conf
# handed to the server is looked up beside it.
my $t_dir       = File::Spec->rel2abs(dirname(__FILE__));
my $pg_hba_conf = File::Spec->join($t_dir, 'pg_hba.conf');

# Start a throwaway PostgreSQL instance with the settings this test needs:
# the custom hba file, wal_level=logical, one WAL sender and one replication
# slot. The eval keeps a dying constructor from aborting the script.
my $pg = eval {
    Test::PostgreSQL->new(extra_postmaster_args =>
          "-c hba_file='$pg_hba_conf' -c wal_level=logical -c max_wal_senders=1 -c max_replication_slots=1");
};

# Skip the whole file when no server could be started. The reason may be in
# $Test::PostgreSQL::errstr (constructor returned false) or in $@ (constructor
# died), so report whichever is set rather than interpolating an empty value.
unless ($pg) {
    my $reason = $Test::PostgreSQL::errstr || $@ || 'unknown error';
    plan skip_all => "cannot create test postgres database: $reason";
}

#<<<
# Messages the on_message callback expects, in arrival order: one
# BEGIN / INSERT / COMMIT triple per (id, payload) row listed below.
my @expected = map {
    my ($id, $payload) = @$_;
    (
        'BEGIN',
        sprintf("table public.test_tbl: INSERT: id[integer]:%d payload[text]:'%s'", $id, $payload),
        'COMMIT',
    );
} (
    [ 1, 'qwerty1' ],
    [ 2, 'asdfgh'  ],
    [ 3, 'qwerty2' ],
    [ 4, 'qwerty3' ],
);
#>>>

# Plain DBI connection used to drive the database from the test side.
# DBI->connect returns undef on failure, so abort the run with the DBI error
# instead of failing later with a method call on an undefined value.
my $control = DBI->connect($pg->dsn, 'postgres')
  or BAIL_OUT("cannot connect to test postgres database: $DBI::errstr");

# Table whose inserts the replication stream is expected to report.
$control->do('create table test_tbl (id int primary key, payload text)')
  or BAIL_OUT('cannot create test_tbl: ' . $control->errstr);

# Construct the receiver under test against the temporary server. new_ok
# both builds the object and records a test that construction succeeded.
my $recv = new_ok(
    'AnyEvent::PgRecvlogical' => [
        dbname          => 'test',
        host            => '127.0.0.1',
        port            => $pg->port,
        username        => 'postgres',
        slot            => 'test',
        options         => { 'skip-empty-xacts' => 1, 'include-xids' => 0 },
        do_create_slot  => 1,
        slot_exists_ok  => 1,
        heartbeat       => 1,
        reconnect_delay => 1,
        on_message      => sub {
            my ($message) = @_;

            # Each message must equal the next expected entry; the message
            # text doubles as the test name.
            is $message, shift @expected, $message;

            # Nothing left to expect: release the overall completion condvar.
            $end_cv->send(1) unless @expected;

            # $CV may still be undefined when a message arrives. Calling
            # ->end on undef would die inside this callback, so only report it.
            if ($CV) {
                $CV->end;
            }
            else {
                diag "no CV";
            }
        },
        # Any receiver error fails the test and aborts whoever waits on $end_cv.
        on_error => sub { fail $_[0]; $end_cv->croak(@_) },
    ],
    'pg_recvlogical'
);

# The receiver must expose a database handle before replication is started.
ok $recv->dbh, 'connected';

# Kick off replication; the promise outcome is recorded as a pass or a fail.
$recv->start->done(
    sub {
        pass 'replication started';
    },
    sub {
        fail 'replication started';
        diag @_;
    },
);

# Spin the event loop in 0.1s slices until the receiver reports an LSN.
# NOTE(review): this loop has no other exit, so it waits forever if an LSN
# never arrives.
until ($recv->received_lsn) {
    ae_sleep(0.1);
}



( run in 2.427 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )