IO-Async-Pg

 view release on metacpan or  search on metacpan

lib/IO/Async/Pg/Connection.pm  view on Meta::CPAN

    eval { $self->{dbh}->pg_cancel };
}

# Execute code within a transaction
async sub transaction {
    my ($self, $code, %opts) = @_;

    my $isolation = $opts{isolation};
    my $savepoint_depth = $self->{_savepoint_depth} // 0;

    if ($savepoint_depth > 0) {
        # Nested transaction - use savepoint
        my $savepoint = "sp_$savepoint_depth";
        await $self->query("SAVEPOINT $savepoint");

        $self->{_savepoint_depth} = $savepoint_depth + 1;

        my $result = eval { await $code->($self) };
        my $err = $@;

        $self->{_savepoint_depth} = $savepoint_depth;

        if ($err) {
            await $self->query("ROLLBACK TO SAVEPOINT $savepoint");
            die $err;
        }

        await $self->query("RELEASE SAVEPOINT $savepoint");
        return $result;
    }
    else {
        # Top-level transaction
        my $begin = 'BEGIN';
        if ($isolation) {
            my $level = uc($isolation);
            $level =~ s/_/ /g;  # read_committed -> READ COMMITTED
            $begin .= " ISOLATION LEVEL $level";
        }
        await $self->query($begin);
        $self->{in_transaction} = 1;

        $self->{_savepoint_depth} = 1;

        my $result = eval { await $code->($self) };
        my $err = $@;

        $self->{_savepoint_depth} = 0;

        if ($err) {
            eval { await $self->query('ROLLBACK') };
            $self->{in_transaction} = 0;
            die $err;
        }

        await $self->query('COMMIT');
        $self->{in_transaction} = 0;
        return $result;
    }
}

# Create a streaming cursor for large result sets
async sub cursor {
    my ($self, $sql, @args) = @_;

    # Parse arguments: can be positional values, hashref for named, or options
    my ($bind, $opts) = $self->_parse_cursor_args(@args);

    # Convert named placeholders if hashref provided
    if (ref $bind eq 'HASH') {
        ($sql, $bind) = convert_placeholders($sql, $bind);
    }

    my $batch_size = delete $opts->{batch_size} // 1000;
    my $cursor_name = delete $opts->{name} // IO::Async::Pg::Cursor::_generate_name();

    # Must be in a transaction for cursors
    my $was_in_transaction = $self->{in_transaction};
    if (!$was_in_transaction) {
        await $self->query('BEGIN');
        $self->{in_transaction} = 1;
    }

    # Build DECLARE CURSOR statement
    my $declare_sql = "DECLARE $cursor_name CURSOR FOR $sql";

    # Execute the DECLARE
    if (ref $bind eq 'ARRAY' && @$bind) {
        await $self->query($declare_sql, @$bind);
    }
    else {
        await $self->query($declare_sql);
    }

    my $cursor = IO::Async::Pg::Cursor->new(
        name       => $cursor_name,
        batch_size => $batch_size,
        conn       => $self,
        _owns_transaction => !$was_in_transaction,
    );

    return $cursor;
}

# Parse cursor arguments into bind values and options
sub _parse_cursor_args {
    my ($self, @args) = @_;

    my $opts = {};
    my $bind = [];

    # Check if last arg is options hash (contains cursor-specific keys)
    if (@args && ref $args[-1] eq 'HASH') {
        my $last = $args[-1];
        # Distinguish between named placeholders and options
        if (exists $last->{batch_size} || exists $last->{name}) {
            $opts = pop @args;
        }
    }

    # Remaining args are bind values
    if (@args == 1 && ref $args[0] eq 'HASH') {



( run in 0.772 second using v1.01-cache-2.11-cpan-140bd7fdf52 )