DBIx-TransactionManager-Distributed
view release on metacpan or search on metacpan
lib/DBIx/TransactionManager/Distributed.pm view on Meta::CPAN
package DBIx::TransactionManager::Distributed;
use strict;
use warnings;
use Exporter qw(import export_to_level);
=head1 NAME
DBIx::TransactionManager::Distributed;
=head1 VERSION
0.02
=cut
our $VERSION = "0.02";
=head1 DESCRIPTION
Generic database handling utilities.
Currently provides a minimal database handle tracking facility, allowing code
to request a transaction against all active database handles.
=head1 SYNOPSIS
use DBIx::TransactionManager::Distributed qw(register_dbh release_dbh txn);
my $dbh1 = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
my $dbh2 = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
my $dbh3 = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
register_dbh(category1 => $dbh1);
register_dbh(category1 => $dbh2);
register_dbh(category2 => $dbh2);
register_dbh(category2 => $dbh3);
txn { $dbh1->do('update ta set name = "a"'); $dbh2->do('insert into tb values (1)') } 'category1';
txn { $dbh2->do('update tc set name = "b"'); $dbh3->do('insert into td values (2)') } 'category2';
release_dbh(category1 => $dbh1);
release_dbh(category1 => $dbh1);
release_dbh(category2 => $dbh2);
release_dbh(category3 => $dbh3);
=cut
use Scalar::Util qw(weaken refaddr);
use List::UtilsBy qw(extract_by);
our @EXPORT_OK = qw(register_dbh release_dbh dbh_is_registered txn register_cached_dbh);
# List of all retained handles by category. Since we don't expect to update
# the list often, and the usual action is to iterate through them all in
# sequence, we're using an array rather than a hash.
# Each $dbh will be stored as a weakref: all calls to register_dbh should
# be matched with a release_dbh or global destruction, but we can recover
# (and complain) if that doesn't happen.
my %DBH;
# Where we registered the dbh originally - top level key is category, second
# level is refaddr.
my %DBH_SOURCE;
# Last PID we saw - used for invalidating stale DBH on fork
my $PID = $$;
our $IN_TRANSACTION = 0;
=head2 register_dbh
Records the given database handle as being active and available for running transactions against.
Expects a category (string value) and L<DBI::db> instance.
Returns the database handle.
Example:
sub _dbh {
my $dbh = DBI->connect('dbi:Pg', '', '', { RaiseError => 1});
return DBIx::TransactionManager::Distributaed::register_dbh(category => $dbh);
}
=cut
sub register_dbh {
my ($category, $dbh) = @_;
die "too many parameters to register_dbh: @_" if @_ > 2;
my $addr = refaddr $dbh;
if (dbh_is_registered($category, $dbh)) {
warn "already registered this database handle at " . $DBH_SOURCE{$category}{$addr};
return;
}
push @{$DBH{$category}}, $dbh;
weaken($DBH{$category}[-1]);
# filename:line (package::sub)
$DBH_SOURCE{$category}{$addr} = sprintf "%s:%d (%s::%s)", (caller 1)[1, 2, 0, 3];
# We may be connecting partway through a transaction - if so, we want to join this handle onto the list of
# active transactions
$dbh->begin_work if $IN_TRANSACTION && $dbh->{AutoCommit};
$dbh;
}
=head2 release_dbh
Marks the given database handle as no longer active - it will not be used for any further transaction requests
via L</txn>.
Returns the database handle.
Example:
sub DESTROY {
my $self = shift;
return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
DBIx::TransactionManager::Distributaed::release_dbh($self->dbh)->disconnect;
}
=cut
sub release_dbh {
my ($category, $dbh) = @_;
die "too many parameters to release_dbh: @_" if @_ > 2;
# At destruction we may have an invalid handle
my $addr = refaddr $dbh or return $dbh;
unless (dbh_is_registered($category, $dbh)) {
my @other_categories = grep exists $DBH_SOURCE{$_}{$addr}, sort keys %DBH_SOURCE;
warn "releasing unregistered dbh $dbh for category $category"
. (@other_categories ? " (but found it in these categories instead: " . join ', ', @other_categories, ')' : '');
# If we did find it elsewhere, make sure we do cleanup to reduce confusion
_remove_dbh_from_category($_ => $dbh) for @other_categories;
}
_remove_dbh_from_category($category => $dbh);
return $dbh;
}
=head2 _remove_dbh_from_category
Helper function to reduce common code - removes the given C<$dbh> from a single category.
Used internally.
=cut
sub _remove_dbh_from_category {
my ($category, $dbh) = @_;
my $addr = refaddr $dbh or return $dbh;
delete $DBH_SOURCE{$category}{$addr};
# avoiding grep here because these are weakrefs and we want them to stay that way.
# since they're weakrefs, some of these may be undef
extract_by { $addr == (defined($_) ? refaddr($_) : 0) } @{$DBH{$category}};
return $dbh;
}
=head2 dbh_is_registered
( run in 0.542 second using v1.01-cache-2.11-cpan-39bf76dae61 )