DBIx-QuickORM

 view release on metacpan or  search on metacpan

lib/DBIx/QuickORM/Connection.pm  view on Meta::CPAN

package DBIx::QuickORM::Connection;
use strict;
use warnings;
use feature qw/state/;

our $VERSION = '0.000019';

use Carp qw/confess croak cluck/;
use Scalar::Util qw/blessed weaken/;
use DBIx::QuickORM::Util qw/load_class/;

use DBIx::QuickORM::Handle;
use DBIx::QuickORM::Connection::Transaction;

use DBIx::QuickORM::Util::HashBase qw{
    <orm
    <dbh
    <dialect
    <pid
    <schema
    <transactions
    +_savepoint_counter
    +_txn_counter
    <manager
    <in_async
    <asides
    <forks
    <default_sql_builder
    <default_internal_txn
    <default_handle_class
};

sub init {
    my $self = shift;

    my $orm = $self->{+ORM} or croak "An orm is required";
    my $db = $orm->db;

    $self->{+_SAVEPOINT_COUNTER} //= 1;
    $self->{+_TXN_COUNTER} //= 1;

    $self->{+PID} = $$;

    $self->{+DBH} = $db->new_dbh;

    $self->{+DIALECT} = $db->dialect->new(dbh => $self->{+DBH}, db_name => $db->db_name);

    $self->{+DEFAULT_INTERNAL_TXN} //= 1;

    $self->{+ASIDES} = {};
    $self->{+FORKS}  = {};

    $self->{+DEFAULT_HANDLE_CLASS} //= $orm->default_handle_class // 'DBIx::QuickORM::Handle';

    $self->{+DEFAULT_SQL_BUILDER} //= do {
        require DBIx::QuickORM::SQLBuilder::SQLAbstract;
        DBIx::QuickORM::SQLBuilder::SQLAbstract->new();
    };

    my $txns = $self->{+TRANSACTIONS} = [];
    my $manager = $self->{+MANAGER} // 'DBIx::QuickORM::RowManager::Cached';
    if (blessed($manager)) {
        $manager->set_connection($self);
        $manager->set_transactions($txns);
    }
    else {
        my $class = load_class($manager) or die $@;
        $self->{+MANAGER} = $class->new(transactions => $txns, connection => $self);
    }

    if (my $autofill = $orm->autofill) {
        my $schema = $self->{+DIALECT}->build_schema_from_db(autofill => $autofill);

        if (my $schema2 = $orm->schema) {
            $self->{+SCHEMA} = $schema->merge($schema2);
        }
        else {
            $self->{+SCHEMA} = $schema->clone;
        }
    }
    else {
        $self->{+SCHEMA} = $orm->schema->clone;
    }
}

########################
# {{{ Async/Aside/Fork #
########################

sub set_async {
    my $self = shift;
    my ($async) = @_;

    croak "There is already an async query in progress" if $self->{+IN_ASYNC} && !$self->{+IN_ASYNC}->done;

    $self->{+IN_ASYNC} = $async;
    weaken($self->{+IN_ASYNC});

    return $async;
}

sub add_aside {
    my $self = shift;
    my ($aside) = @_;

    $self->{+ASIDES}->{$aside} = $aside;
    weaken($self->{+ASIDES}->{$aside});

    return $aside;
}

sub add_fork {
    my $self = shift;
    my ($fork) = @_;

    $self->{+FORKS}->{$fork} = $fork;
    weaken($self->{+FORKS}->{$fork});

    return $fork;
}

sub clear_async {
    my $self = shift;
    my ($async) = @_;

    croak "Not currently running an async query" unless $self->{+IN_ASYNC};

    croak "Mismatch, we are in an async query, but not the one we are trying to clear"
        unless $async == $self->{+IN_ASYNC};

    delete $self->{+IN_ASYNC};
}

sub clear_aside {
    my $self = shift;
    my ($aside) = @_;

    croak "Not currently running that aside query" unless $self->{+ASIDES}->{$aside};

    delete $self->{+ASIDES}->{$aside};
}

sub clear_fork {
    my $self = shift;
    my ($fork) = @_;

    croak "Not currently running that fork query" unless $self->{+FORKS}->{$fork};

    delete $self->{+FORKS}->{$fork};
}

########################
# }}} Async/Aside/Fork #
########################

#####################
# {{{ SANITY CHECKS #
#####################

sub pid_and_async_check {
    my $self = shift;
    return $self->pid_check && $self->async_check;
}

sub pid_check {
    my $self = shift;
    confess "Connections cannot be used across multiple processes, you must reconnect post-fork" unless $$ == $self->{+PID};
    return 1;
}

sub async_check {
    my $self = shift;

    my $async = $self->{+IN_ASYNC} or return 1;
    confess "There is currently an async query running, it must be completed before you run another query" unless $async->done;
    delete $self->{+IN_ASYNC};
    return 1;

lib/DBIx/QuickORM/Connection.pm  view on Meta::CPAN

    my $self = shift;
    $self->pid_check;

    my @caller = caller;

    my $txns = $self->{+TRANSACTIONS};

    my $cb = (@_ && ref($_[0]) eq 'CODE') ? shift : undef;
    my %params = @_;
    $cb //= $params{action};

    croak "Cannot start a transaction while there is an active async query" if $self->{+IN_ASYNC} && !$self->{+IN_ASYNC}->done;

    unless ($params{force}) {
        unless ($params{ignore_aside}) {
            my $count = grep { $_ && !$_->done } values %{$self->{+ASIDES} // {}};
            croak "Cannot start a transaction while there is an active aside query (unless you use ignore_aside => 1, or force => 1)" if $count;
        }

        unless ($params{ignore_forks}) {
            my $count = grep { $_ && !$_->done } values %{$self->{+FORKS} // {}};
            croak "Cannot start a transaction while there is an active forked query (unless you use ignore_forked => 1, or force => 1)" if $count;
        }
    }

    my $id = $self->{+_TXN_COUNTER}++;

    my $dialect = $self->dialect;

    my $sp;
    if (@$txns) {
        $sp = "SAVEPOINT_${$}_" . $self->{+_SAVEPOINT_COUNTER}++;
        $dialect->create_savepoint(savepoint => $sp);
    }
    elsif ($self->dialect->in_txn) {
        croak "A transaction is already open, but it is not controlled by DBIx::QuickORM";
    }
    else {
        $dialect->start_txn;
    }

    my $txn = DBIx::QuickORM::Connection::Transaction->new(
        id            => $id,
        savepoint     => $sp,
        trace         => \@caller,
        on_fail       => $params{on_fail},
        on_success    => $params{on_success},
        on_completion => $params{on_completion},
    );

    my ($root, $parent) = @$txns ? (@{$txns}[0,-1]) : ($txn, $txn);

    $parent->add_fail_callback($params{'on_parent_fail'})             if $params{on_parent_fail};
    $parent->add_success_callback($params{'on_parent_success'})       if $params{on_parent_success};
    $parent->add_completion_callback($params{'on_parent_completion'}) if $params{on_parent_completion};
    $root->add_fail_callback($params{'on_root_fail'})                 if $params{on_root_fail};
    $root->add_success_callback($params{'on_root_success'})           if $params{on_root_success};
    $root->add_completion_callback($params{'on_root_completion'})     if $params{on_root_completion};

    push @{$txns} => $txn;
    weaken($txns->[-1]);

    my $ran = 0;
    my $finalize = sub {
        my ($txnx, $ok, @errors) = @_;

        return if $ran++;

        $txnx->throw("Cannot stop a transaction while there is an active async query")
            if $self->{+IN_ASYNC} && !$self->{+IN_ASYNC}->done;

        $txnx->throw("Internal Error: Transaction stack mismatch")
            unless @$txns && ($txnx->in_destroy && !$txns->[-1]) || $txns->[-1] == $txnx;

        pop @$txns;

        my $rolled_back = $txnx->rolled_back;
        my $res         = $ok && !$rolled_back;

        if ($sp) {
            if   ($res) { $dialect->commit_savepoint(savepoint => $sp) }
            else        { $dialect->rollback_savepoint(savepoint => $sp) }
        }
        else {
            if   ($res) { $dialect->commit_txn }
            else        { $dialect->rollback_txn }
        }

        my ($ok2, $err2) = $txnx->terminate($res, \@errors);
        unless ($ok2) {
            $ok = 0;
            push @errors => @$err2;
        }

        return if $ok;
        $txnx->throw(join "\n" => @errors);
    };

    unless($cb) {
        $txn->set_no_last(1);
        $txn->set_finalize($finalize);
        return $txn;
    }

    local $@;
    my $ok = eval {
        QORM_TRANSACTION: { $cb->($txn) };
        1;
    };

    $finalize->($txn, $ok, $@);

    return $txn;
}

{
    no warnings 'once';
    *in_transaction = \&in_txn;
}
sub in_txn {
    my $self = shift;



( run in 2.770 seconds using v1.01-cache-2.11-cpan-437f7b0c052 )