DBIx-QuickORM
view release on metacpan or search on metacpan
lib/DBIx/QuickORM/Connection.pm view on Meta::CPAN
package DBIx::QuickORM::Connection;
use strict;
use warnings;
use feature qw/state/;
our $VERSION = '0.000019';
use Carp qw/confess croak cluck/;
use Scalar::Util qw/blessed weaken/;
use DBIx::QuickORM::Util qw/load_class/;
use DBIx::QuickORM::Handle;
use DBIx::QuickORM::Connection::Transaction;
use DBIx::QuickORM::Util::HashBase qw{
<orm
<dbh
<dialect
<pid
<schema
<transactions
+_savepoint_counter
+_txn_counter
<manager
<in_async
<asides
<forks
<default_sql_builder
<default_internal_txn
<default_handle_class
};
# Finish constructing a connection (presumably invoked by the
# HashBase-generated constructor -- confirm against DBIx::QuickORM::Util::HashBase).
#
# Reads:
#   orm                  - required; croaks with "An orm is required" if missing.
#   manager              - optional; either an already-blessed row manager or
#                          a class name to load and instantiate.
#   default_* attributes - optional; defaults are filled in when undefined.
#
# Populates: pid, dbh, dialect, asides, forks, transactions, manager, schema,
# plus the savepoint/transaction counters.
#
# Croaks if autofill is disabled and the orm has no schema, since there is
# then nothing to build the connection's schema from.
sub init {
    my $self = shift;

    my $orm = $self->{+ORM} or croak "An orm is required";
    my $db  = $orm->db;

    $self->{+_SAVEPOINT_COUNTER} //= 1;
    $self->{+_TXN_COUNTER}       //= 1;

    # Remember the creating process so pid_check() can detect use after fork.
    $self->{+PID} = $$;

    $self->{+DBH}     = $db->new_dbh;
    $self->{+DIALECT} = $db->dialect->new(dbh => $self->{+DBH}, db_name => $db->db_name);

    $self->{+DEFAULT_INTERNAL_TXN} //= 1;

    # Registries of in-flight aside/fork queries, always reset on init.
    $self->{+ASIDES} = {};
    $self->{+FORKS}  = {};

    $self->{+DEFAULT_HANDLE_CLASS} //= $orm->default_handle_class // 'DBIx::QuickORM::Handle';

    # Only load the SQL::Abstract based builder when no builder was supplied.
    $self->{+DEFAULT_SQL_BUILDER} //= do {
        require DBIx::QuickORM::SQLBuilder::SQLAbstract;
        DBIx::QuickORM::SQLBuilder::SQLAbstract->new();
    };

    # The transaction stack is shared (same array ref) with the row manager.
    my $txns = $self->{+TRANSACTIONS} = [];

    my $manager = $self->{+MANAGER} // 'DBIx::QuickORM::RowManager::Cached';
    if (blessed($manager)) {
        # Caller supplied a manager instance: attach it to this connection.
        $manager->set_connection($self);
        $manager->set_transactions($txns);
    }
    else {
        # Caller supplied a class name (or nothing): load and instantiate it.
        my $class = load_class($manager) or die $@;
        $self->{+MANAGER} = $class->new(transactions => $txns, connection => $self);
    }

    if (my $autofill = $orm->autofill) {
        # Schema is discovered from the database, then merged with any
        # schema the orm declares.
        my $schema = $self->{+DIALECT}->build_schema_from_db(autofill => $autofill);

        if (my $schema2 = $orm->schema) {
            $self->{+SCHEMA} = $schema->merge($schema2);
        }
        else {
            $self->{+SCHEMA} = $schema->clone;
        }
    }
    else {
        # Without autofill the orm's own schema is the only source; fail with
        # a clear message instead of calling ->clone on an undefined value.
        my $schema = $orm->schema
            or croak "A schema is required when autofill is not enabled";
        $self->{+SCHEMA} = $schema->clone;
    }
}
########################
# {{{ Async/Aside/Fork #
########################
# Register $async as the connection's current async query.
#
# Croaks if a previously registered async query exists and does not yet report
# done(). The connection keeps only a weak reference to the query object.
# Returns the $async object that was passed in.
sub set_async {
    my ($self, $async) = @_;

    my $current = $self->{+IN_ASYNC};
    if ($current && !$current->done) {
        croak "There is already an async query in progress";
    }

    # Store and weaken in one step so the connection never owns the query.
    weaken($self->{+IN_ASYNC} = $async);

    return $async;
}
# Track an aside query on this connection.
#
# The query is stored in the asides registry under its own stringified
# reference, and the stored copy is weakened so the registry does not keep the
# query alive. Returns the $aside object that was passed in.
sub add_aside {
    my ($self, $aside) = @_;

    # Grab the registry slot once, then fill and weaken it.
    my $slot = \$self->{+ASIDES}->{$aside};
    $$slot = $aside;
    weaken($$slot);

    return $aside;
}
# Track a forked query on this connection.
#
# The query is stored in the forks registry under its own stringified
# reference, and the stored copy is weakened so the registry does not keep the
# query alive. Returns the $fork object that was passed in.
sub add_fork {
    my ($self, $fork) = @_;

    # Grab the registry slot once, then fill and weaken it.
    my $slot = \$self->{+FORKS}->{$fork};
    $$slot = $fork;
    weaken($$slot);

    return $fork;
}
# Unregister the connection's current async query.
#
# Croaks when no async query is registered, or when $async is not the query
# that is currently registered. Returns the value removed from the connection.
sub clear_async {
    my ($self, $async) = @_;

    my $current = $self->{+IN_ASYNC};
    if (!$current) {
        croak "Not currently running an async query";
    }

    # Only the query that was registered may clear itself.
    unless ($async == $current) {
        croak "Mismatch, we are in an async query, but not the one we are trying to clear";
    }

    return delete $self->{+IN_ASYNC};
}
# Stop tracking an aside query.
#
# Croaks if the registry has no true entry for $aside (looked up by its
# stringified reference). Returns the value removed from the registry.
sub clear_aside {
    my ($self, $aside) = @_;

    if (!$self->{+ASIDES}->{$aside}) {
        croak "Not currently running that aside query";
    }

    return delete $self->{+ASIDES}->{$aside};
}
# Stop tracking a forked query.
#
# Croaks if the registry has no true entry for $fork (looked up by its
# stringified reference). Returns the value removed from the registry.
sub clear_fork {
    my ($self, $fork) = @_;

    if (!$self->{+FORKS}->{$fork}) {
        croak "Not currently running that fork query";
    }

    return delete $self->{+FORKS}->{$fork};
}
########################
# }}} Async/Aside/Fork #
########################
#####################
# {{{ SANITY CHECKS #
#####################
# Run the process-id check and then the async check.
#
# If pid_check() returns a false value that value is returned and
# async_check() is not called; otherwise the result of async_check() is
# returned.
sub pid_and_async_check {
    my ($self) = @_;

    my $pid_ok = $self->pid_check;
    return $pid_ok unless $pid_ok;

    return $self->async_check;
}
# Verify the connection is being used by the process recorded in its pid
# attribute.
#
# Returns 1 when the current process id matches; otherwise dies with a full
# stack trace (confess).
sub pid_check {
    my ($self) = @_;

    return 1 if $$ == $self->{+PID};

    confess "Connections cannot be used across multiple processes, you must reconnect post-fork";
}
sub async_check {
my $self = shift;
my $async = $self->{+IN_ASYNC} or return 1;
confess "There is currently an async query running, it must be completed before you run another query" unless $async->done;
delete $self->{+IN_ASYNC};
return 1;
lib/DBIx/QuickORM/Connection.pm view on Meta::CPAN
my $self = shift;
$self->pid_check;
my @caller = caller;
my $txns = $self->{+TRANSACTIONS};
my $cb = (@_ && ref($_[0]) eq 'CODE') ? shift : undef;
my %params = @_;
$cb //= $params{action};
croak "Cannot start a transaction while there is an active async query" if $self->{+IN_ASYNC} && !$self->{+IN_ASYNC}->done;
unless ($params{force}) {
unless ($params{ignore_aside}) {
my $count = grep { $_ && !$_->done } values %{$self->{+ASIDES} // {}};
croak "Cannot start a transaction while there is an active aside query (unless you use ignore_aside => 1, or force => 1)" if $count;
}
unless ($params{ignore_forks}) {
my $count = grep { $_ && !$_->done } values %{$self->{+FORKS} // {}};
croak "Cannot start a transaction while there is an active forked query (unless you use ignore_forked => 1, or force => 1)" if $count;
}
}
my $id = $self->{+_TXN_COUNTER}++;
my $dialect = $self->dialect;
my $sp;
if (@$txns) {
$sp = "SAVEPOINT_${$}_" . $self->{+_SAVEPOINT_COUNTER}++;
$dialect->create_savepoint(savepoint => $sp);
}
elsif ($self->dialect->in_txn) {
croak "A transaction is already open, but it is not controlled by DBIx::QuickORM";
}
else {
$dialect->start_txn;
}
my $txn = DBIx::QuickORM::Connection::Transaction->new(
id => $id,
savepoint => $sp,
trace => \@caller,
on_fail => $params{on_fail},
on_success => $params{on_success},
on_completion => $params{on_completion},
);
my ($root, $parent) = @$txns ? (@{$txns}[0,-1]) : ($txn, $txn);
$parent->add_fail_callback($params{'on_parent_fail'}) if $params{on_parent_fail};
$parent->add_success_callback($params{'on_parent_success'}) if $params{on_parent_success};
$parent->add_completion_callback($params{'on_parent_completion'}) if $params{on_parent_completion};
$root->add_fail_callback($params{'on_root_fail'}) if $params{on_root_fail};
$root->add_success_callback($params{'on_root_success'}) if $params{on_root_success};
$root->add_completion_callback($params{'on_root_completion'}) if $params{on_root_completion};
push @{$txns} => $txn;
weaken($txns->[-1]);
my $ran = 0;
my $finalize = sub {
my ($txnx, $ok, @errors) = @_;
return if $ran++;
$txnx->throw("Cannot stop a transaction while there is an active async query")
if $self->{+IN_ASYNC} && !$self->{+IN_ASYNC}->done;
$txnx->throw("Internal Error: Transaction stack mismatch")
unless @$txns && ($txnx->in_destroy && !$txns->[-1]) || $txns->[-1] == $txnx;
pop @$txns;
my $rolled_back = $txnx->rolled_back;
my $res = $ok && !$rolled_back;
if ($sp) {
if ($res) { $dialect->commit_savepoint(savepoint => $sp) }
else { $dialect->rollback_savepoint(savepoint => $sp) }
}
else {
if ($res) { $dialect->commit_txn }
else { $dialect->rollback_txn }
}
my ($ok2, $err2) = $txnx->terminate($res, \@errors);
unless ($ok2) {
$ok = 0;
push @errors => @$err2;
}
return if $ok;
$txnx->throw(join "\n" => @errors);
};
unless($cb) {
$txn->set_no_last(1);
$txn->set_finalize($finalize);
return $txn;
}
local $@;
my $ok = eval {
QORM_TRANSACTION: { $cb->($txn) };
1;
};
$finalize->($txn, $ok, $@);
return $txn;
}
# Install in_transaction() as an alias for in_txn(): the code slot of the
# in_transaction glob is pointed at the in_txn sub, so both names run the
# same code.
{
# Disable the "used only once" warning category for the glob assignment
# below; the bare block keeps that suppression scoped to this statement.
no warnings 'once';
*in_transaction = \&in_txn;
}
sub in_txn {
my $self = shift;
( run in 2.770 seconds using v1.01-cache-2.11-cpan-437f7b0c052 )