IO-Async-Pg
view release on metacpan or search on metacpan
lib/IO/Async/Pg/Connection.pm view on Meta::CPAN
eval { $self->{dbh}->pg_cancel };
}
# Execute code within a transaction.
#
#   my $result = await $conn->transaction($code, isolation => 'read_committed');
#
# $code is invoked with this connection as its only argument and its awaited
# result is returned. When called while another transaction() callback is
# running on the same connection (tracked via _savepoint_depth), a SAVEPOINT
# is used instead of BEGIN/COMMIT; the 'isolation' option is only applied to
# the top-level BEGIN.
#
# If $code throws, the work is rolled back (to the savepoint when nested) and
# the original exception is re-thrown.
async sub transaction {
    my ($self, $code, %opts) = @_;

    my $isolation       = $opts{isolation};
    my $savepoint_depth = $self->{_savepoint_depth} // 0;

    if ($savepoint_depth > 0) {
        # Nested transaction - use savepoint
        my $savepoint = "sp_$savepoint_depth";
        await $self->query("SAVEPOINT $savepoint");
        $self->{_savepoint_depth} = $savepoint_depth + 1;

        # Detect failure through the eval's own return value rather than the
        # truthiness of $@, so an exception that evaluates to false is not
        # mistaken for success.
        my $result;
        my $ok  = eval { $result = await $code->($self); 1 };
        my $err = $@;
        $self->{_savepoint_depth} = $savepoint_depth;

        if (!$ok) {
            # Best-effort, like the top-level ROLLBACK below: a failure here
            # must not replace the callback's original error.
            eval { await $self->query("ROLLBACK TO SAVEPOINT $savepoint") };
            die $err;
        }

        await $self->query("RELEASE SAVEPOINT $savepoint");
        return $result;
    }
    else {
        # Top-level transaction
        my $begin = 'BEGIN';
        if ($isolation) {
            my $level = uc($isolation);
            $level =~ s/_/ /g;    # read_committed -> READ COMMITTED

            # The level is interpolated into SQL, so only allow keywords,
            # spaces and commas (e.g. "SERIALIZABLE, READ ONLY"). This keeps
            # quotes, semicolons and comments out of the statement.
            die "Invalid isolation level '$isolation'"
                unless $level =~ /\A[A-Z][A-Z ,]*\z/;

            $begin .= " ISOLATION LEVEL $level";
        }

        await $self->query($begin);
        $self->{in_transaction}   = 1;
        $self->{_savepoint_depth} = 1;

        my $result;
        my $ok  = eval { $result = await $code->($self); 1 };
        my $err = $@;
        $self->{_savepoint_depth} = 0;

        if (!$ok) {
            eval { await $self->query('ROLLBACK') };
            $self->{in_transaction} = 0;
            die $err;
        }

        # A failed COMMIT must not leave in_transaction stuck at 1: attempt a
        # best-effort ROLLBACK, reset the flag, then report the COMMIT error.
        my $committed  = eval { await $self->query('COMMIT'); 1 };
        my $commit_err = $@;
        if (!$committed) {
            eval { await $self->query('ROLLBACK') };
            $self->{in_transaction} = 0;
            die $commit_err;
        }

        $self->{in_transaction} = 0;
        return $result;
    }
}
# Create a streaming cursor for large result sets.
#
#   my $cursor = await $conn->cursor($sql, @bind_values, \%options);
#
# Bind values may be positional or a hashref of named placeholders (which is
# converted with convert_placeholders). Recognised options:
#   batch_size - passed to the cursor object (default 1000)
#   name       - cursor name (default: generated by IO::Async::Pg::Cursor)
#
# A cursor needs a transaction, so if the connection is not already flagged
# as being in one, BEGIN is issued here and the returned cursor is told it
# owns that transaction (_owns_transaction).
#
# Returns an IO::Async::Pg::Cursor. Throws if BEGIN or DECLARE fails.
async sub cursor {
    my ($self, $sql, @args) = @_;

    # Parse arguments: can be positional values, hashref for named, or options
    my ($bind, $opts) = $self->_parse_cursor_args(@args);

    # Convert named placeholders if hashref provided
    if (ref $bind eq 'HASH') {
        ($sql, $bind) = convert_placeholders($sql, $bind);
    }

    # Read the options without delete(): the hashref may be the caller's own,
    # and removing keys from it would change how a reused options hash is
    # interpreted on a later call.
    my $batch_size  = $opts->{batch_size} // 1000;
    my $cursor_name = $opts->{name} // IO::Async::Pg::Cursor::_generate_name();

    # Must be in a transaction for cursors
    my $was_in_transaction = $self->{in_transaction};
    if (!$was_in_transaction) {
        await $self->query('BEGIN');
        $self->{in_transaction} = 1;
    }

    # Build DECLARE CURSOR statement
    my $declare_sql = "DECLARE $cursor_name CURSOR FOR $sql";
    my @bind_values = (ref $bind eq 'ARRAY') ? @$bind : ();

    # Execute the DECLARE. If it fails and the transaction was opened above,
    # nothing else will ever close it (no cursor object exists yet), so roll
    # it back and reset the flag before re-throwing the original error.
    my $declared = eval { await $self->query($declare_sql, @bind_values); 1 };
    if (!$declared) {
        my $err = $@;
        if (!$was_in_transaction) {
            eval { await $self->query('ROLLBACK') };
            $self->{in_transaction} = 0;
        }
        die $err;
    }

    my $cursor = IO::Async::Pg::Cursor->new(
        name              => $cursor_name,
        batch_size        => $batch_size,
        conn              => $self,
        _owns_transaction => !$was_in_transaction,
    );
    return $cursor;
}
# Parse cursor arguments into bind values and options
sub _parse_cursor_args {
my ($self, @args) = @_;
my $opts = {};
my $bind = [];
# Check if last arg is options hash (contains cursor-specific keys)
if (@args && ref $args[-1] eq 'HASH') {
my $last = $args[-1];
# Distinguish between named placeholders and options
if (exists $last->{batch_size} || exists $last->{name}) {
$opts = pop @args;
}
}
# Remaining args are bind values
if (@args == 1 && ref $args[0] eq 'HASH') {
( run in 0.772 second using v1.01-cache-2.11-cpan-140bd7fdf52 )