EV-Pg

 view release on metacpan or  search on metacpan

t/08_features.t  view on Meta::CPAN

        conninfo => $conninfo,
        on_connect => sub {
            $pg->query("select generate_series(1,100) as n", sub {
                my ($data, $err) = @_;
                if (ref $data eq 'ARRAY' && @$data > 0) {
                    push @rows, $data->[0][0];
                    $pg->finish;
                    return;
                }
                $cb_after_finish++;
            });
            $pg->set_single_row_mode;
        },
        on_error => sub { diag "Error: $_[0]"; EV::break },
    );
    my $t = EV::timer(3, 0, sub { EV::break });
    EV::run;
    is(scalar @rows, 1, 'single_row finish: got only 1 row before abort');
    is($cb_after_finish, 0, 'single_row finish: no callback after finish');
}

# Single-row mode: abort the stream by calling reset() from the first row
# callback, then check that a query issued from the next on_connect works.
{
    my $pg;
    my @streamed;
    my $reset_issued = 0;
    my $followup_ran = 0;

    $pg = EV::Pg->new(
        conninfo   => $conninfo,
        on_connect => sub {
            if (!$reset_issued) {
                # First pass: start streaming and reset on the first row.
                $pg->query("select generate_series(1,100) as n", sub {
                    my ($rows, $error) = @_;
                    my $has_row = ref($rows) eq 'ARRAY' && @$rows > 0;
                    if ($has_row) {
                        push @streamed, $rows->[0][0];
                        $reset_issued = 1;
                        $pg->reset;
                        return;
                    }
                });
                $pg->set_single_row_mode;
            }
            else {
                # Later pass (reset already issued): run an ordinary query
                # and stop the loop once its result has been checked.
                $pg->query("select 'after_reset'", sub {
                    my ($rows, $error) = @_;
                    ok(!$error, 'single_row reset: followup query ok');
                    is($rows->[0][0], 'after_reset', 'single_row reset: followup result');
                    $followup_ran = 1;
                    EV::break;
                });
                return;
            }
        },
        on_error => sub {
            diag "Error: $_[0]";
            EV::break;
        },
    );

    # Upper bound on the run: break out after 5 seconds regardless.
    my $watchdog = EV::timer(5, 0, sub { EV::break });
    EV::run;

    is(scalar @streamed, 1, 'single_row reset: got only 1 row before abort');
    ok($reset_issued, 'single_row reset: reconnected');
    ok($followup_ran, 'single_row reset: post-reconnect query succeeded');
}

# Single-row mode: undef the handle from inside the first row callback
# (which triggers DESTROY) while the result stream is still in progress.
# The test passes if execution simply carries on afterwards.
{
    my @streamed;
    my $handle_dropped = 0;
    my $pg;

    $pg = EV::Pg->new(
        conninfo   => $conninfo,
        on_connect => sub {
            $pg->query("select generate_series(1,100) as n", sub {
                my ($rows, $error) = @_;
                my $has_row = ref($rows) eq 'ARRAY' && @$rows > 0;
                if ($has_row) {
                    push @streamed, $rows->[0][0];
                    undef $pg;    # triggers DESTROY
                    # Reaching the next line means DESTROY returned normally.
                    $handle_dropped = 1;
                    EV::break;
                    return;
                }
            });
            $pg->set_single_row_mode;
        },
        on_error => sub {
            diag "Error: $_[0]";
            EV::break;
        },
    );

    # Upper bound on the run: break out after 3 seconds regardless.
    my $watchdog = EV::timer(3, 0, sub { EV::break });
    EV::run;

    is(scalar @streamed, 1, 'single_row destroy: got only 1 row');
    ok($handle_dropped, 'single_row destroy: DESTROY did not crash');
}

# ssl_attribute smoke test. The returned value is deliberately not
# inspected (it may be undef when the connection is not using SSL);
# getting past the call is the whole check.
with_pg(cb => sub {
    my $conn = $_[0];

    my $ssl_library = $conn->ssl_attribute("library");
    ok(1, 'ssl_attribute: did not crash');

    EV::break;
});

# set_error_verbosity: the test expects each call to hand back the level
# that was active before it. Switch DEFAULT -> VERBOSE -> DEFAULT and check
# the value returned at each step.
with_pg(cb => sub {
    my $conn = $_[0];

    my $level_before_verbose = $conn->set_error_verbosity(PQERRORS_VERBOSE);
    is($level_before_verbose, PQERRORS_DEFAULT, 'set_error_verbosity: was DEFAULT');

    my $level_before_restore = $conn->set_error_verbosity(PQERRORS_DEFAULT);
    is($level_before_restore, PQERRORS_VERBOSE, 'set_error_verbosity: was VERBOSE');

    EV::break;
});

# cancel_async (libpq >= 17)
SKIP: {
    skip 'requires libpq >= 17', 2 unless EV::Pg->lib_version >= 170000;
    my $cancel_timer;
    my ($got_query_err, $got_cancel_ok);
    with_pg(cb => sub {
        my ($pg) = @_;
        $pg->query("select pg_sleep(30)", sub {
            my ($data, $err) = @_;
            ok($err, 'cancel_async: query received error');
            $got_query_err = 1;
            EV::break if $got_cancel_ok;
        });
        $cancel_timer = EV::timer(0.5, 0, sub {
            $pg->cancel_async(sub {

t/08_features.t  view on Meta::CPAN

        is($m->{fields}[0]{name}, 'col_a', 'result_meta: field 0 name');
        is($m->{fields}[1]{name}, 'col_b', 'result_meta: field 1 name');
        ok(defined $m->{fields}[0]{type}, 'result_meta: field 0 has type OID');

        # result_meta cache must drop on reset (fresh conn = no prior result)
        $pg->reset;
        $pg->on_connect(sub {
            ok(!defined $pg->result_meta, 'result_meta: cleared on reset');
            EV::break;
        });
    });
});

# result_meta must NOT be refreshed by an error result (POD contract).
# When an error is the very first result, result_meta stays undef rather
# than returning a hash built from the error PGresult.
#
# Sequence exercised: failing query (meta stays undef) -> successful
# two-column query (meta refreshed) -> failing query (previous meta kept).
with_pg(cb => sub {
    my ($pg) = @_;

    # Read nfields without dereferencing a possibly-undef result_meta.
    # This very test shows result_meta can be undef, and dereferencing an
    # undef method return dies. A die inside the callback would skip the
    # EV::break below, so on a regression we want is() to report a plain
    # failure (got undef) rather than have the callback blow up.
    my $meta_nfields = sub {
        my $meta = $pg->result_meta;
        return ref($meta) eq 'HASH' ? $meta->{nfields} : undef;
    };

    $pg->query("select * from no_such_table_rm_xyz", sub {
        my (undef, $err) = @_;
        ok($err, 'result_meta error-first: query errored');
        ok(!defined $pg->result_meta,
            'result_meta: undef when first result is an error (not refreshed)');
        # a successful query refreshes it...
        $pg->query("select 1 as a, 2 as b", sub {
            is($meta_nfields->(), 2, 'result_meta: refreshed by success');
            # ...and a following error leaves the previous meta in place
            $pg->query("select * from no_such_table_rm_xyz", sub {
                my (undef, $err2) = @_;
                ok($err2, 'result_meta error-after-success: query errored');
                is($meta_nfields->(), 2,
                    'result_meta: error keeps last successful meta');
                EV::break;
            });
        });
    });
});

# result_meta after INSERT: once the insert completes without error, the
# cmd_status entry of result_meta is expected to match /INSERT/.
with_pg(cb => sub {
    my $conn = $_[0];

    # Runs when the INSERT finishes; ends the event loop afterwards.
    my $check_insert = sub {
        my ($rows, $error) = @_;
        ok(!$error, 'result_meta insert: no error');
        my $meta = $conn->result_meta;
        like($meta->{cmd_status}, qr/INSERT/, 'result_meta insert: cmd_status');
        EV::break;
    };

    # The temp table has to exist first, so the INSERT is chained from the
    # CREATE callback.
    $conn->query("create temp table rm_test (id int)", sub {
        $conn->query("insert into rm_test values (1), (2)", $check_insert);
    });
});

# result_meta during single-row mode: the metadata must already be readable
# from inside the callback that delivers a row, not only once the query has
# finished.
with_pg(cb => sub {
    my $conn = $_[0];
    my $saw_meta = 0;

    $conn->query("select 42 as val, 'x' as tag", sub {
        my ($rows, $error) = @_;
        my $is_row_batch = ref($rows) eq 'ARRAY' && @$rows > 0;

        if (!$is_row_batch) {
            # Any invocation without row data ends the test; by then the
            # row-carrying invocation should already have run.
            ok($saw_meta, 'result_meta single_row: meta was available during streaming');
            EV::break;
        }
        else {
            my $meta = $conn->result_meta;
            is(ref($meta), 'HASH', 'result_meta single_row: hashref during streaming');
            is($meta->{nfields}, 2, 'result_meta single_row: nfields = 2');
            is($meta->{fields}[0]{name}, 'val', 'result_meta single_row: field 0 name');
            $saw_meta = 1;
            return;
        }
    });
    $conn->set_single_row_mode;
});

# --- set_chunked_rows_mode (libpq >= 17) ---
# Stream a 10-row result with chunked-rows mode set to 3 and check that the
# chunk sizes seen by the callback add up to all 10 rows.
SKIP: {
    unless (EV::Pg->lib_version >= 170000) {
        skip 'requires libpq >= 17', 3;
    }

    with_pg(cb => sub {
        my $conn = $_[0];
        my @chunk_sizes;

        $conn->query("select generate_series(1,10) as n", sub {
            my ($rows, $error) = @_;
            my $is_chunk = ref($rows) eq 'ARRAY' && @$rows > 0;

            if (!$is_chunk) {
                # Invocation without row data: verify what was collected.
                ok(scalar(@chunk_sizes) >= 1, 'chunked_rows: got at least 1 chunk');
                my $row_total = 0;
                foreach my $size (@chunk_sizes) {
                    $row_total += $size;
                }
                is($row_total, 10, 'chunked_rows: total 10 rows');
                ok(!$error, 'chunked_rows: no error');
                EV::break;
            }
            else {
                # Record only how many rows this chunk carried.
                push @chunk_sizes, scalar @$rows;
                return;
            }
        });
        $conn->set_chunked_rows_mode(3);
    });
}

# Chunked-rows mode (libpq >= 17): abort the stream by calling finish()
# from the first chunk callback, then check that exactly one chunk was seen
# and that the handle reports itself as disconnected.
SKIP: {
    unless (EV::Pg->lib_version >= 170000) {
        skip 'requires libpq >= 17', 2;
    }

    my $pg;
    my @chunk_sizes;

    $pg = EV::Pg->new(
        conninfo   => $conninfo,
        on_connect => sub {
            $pg->query("select generate_series(1,1000) as n", sub {
                my ($rows, $error) = @_;
                my $is_chunk = ref($rows) eq 'ARRAY' && @$rows > 0;
                if ($is_chunk) {
                    push @chunk_sizes, scalar @$rows;
                    $pg->finish;
                    return;
                }
            });
            $pg->set_chunked_rows_mode(5);
        },
        on_error => sub {
            diag "Error: $_[0]";
            EV::break;
        },
    );

    # None of the query callbacks call EV::break here, so apart from
    # on_error this 3-second timer is what ends the run.
    my $watchdog = EV::timer(3, 0, sub { EV::break });
    EV::run;

    is(scalar @chunk_sizes, 1, 'chunked finish: only 1 chunk before abort');
    ok(!$pg->is_connected, 'chunked finish: disconnected after finish');
}

# --- close_prepared (libpq >= 17) ---
SKIP: {
    skip 'requires libpq >= 17', 2 unless EV::Pg->lib_version >= 170000;
    with_pg(cb => sub {
        my ($pg) = @_;
        $pg->prepare("cp_test", "select 1", sub {



( run in 0.630 second using v1.01-cache-2.11-cpan-140bd7fdf52 )