DBIx-Class-Async
view release on metacpan or search on metacpan
lib/DBIx/Class/Async.pm view on Meta::CPAN
_is_connected => 1,
_worker_idx => 0,
_query_cache => {},
_stats => {
_queries => 0,
_errors => 0,
_cache_hits => 0,
_cache_misses => 0,
_deadlocks => 0,
_retries => 0,
},
_debug => $args{debug} || 0,
};
_init_metrics($async_db) if $async_db->{_enable_metrics};
_init_workers($async_db);
if (my $interval = $args{health_check} // HEALTH_CHECK_INTERVAL) {
_start_health_checks($async_db, $interval);
}
return $async_db;
}
=head2 disconnect
Gracefully shuts down all background workers and clears timers. Always call
this before your application exits to prevent zombie processes.
DBIx::Class::Async->disconnect($db);
Internally this performs cleanup in the correct order required by
L<IO::Async::Function>: each worker is first removed from the event loop
(C<< $loop->remove >>) and then stopped (C<< $function->stop >>). Reversing
this order causes C<remove()> to silently fail, leaving orphaned notifiers
in the loop's registry that trigger a C<SEGV> during Perl global destruction.
=cut
# disconnect - shut down the health-check timer and all workers of $db.
#
# Accepted call styles:
#   disconnect($db)                         - plain function call
#   DBIx::Class::Async->disconnect($db)     - class-method call
#
# Parameters:
#   $db - the plain hash reference holding the async state. The keys read
#         here are _loop, _health_check_timer, _workers, _owns_loop and
#         _old_sigchld.
#
# Returns:
#   1 after cleanup ran; nothing (empty list / undef) when $db is not a
#   plain hash reference or carries no _loop.
#
# Every loop/notifier call is wrapped in eval so that a failure while
# tearing down one component never prevents the remaining cleanup.
sub disconnect {
    # In the class-method form the first argument is the package name, a
    # plain string. Unpacking it as $db would fail the HASH check below and
    # turn the whole call into a silent no-op that leaves every worker
    # running, so drop a leading non-reference invocant first.
    my @args = @_;
    shift @args if @args > 1 && !ref $args[0];
    my ($db) = @args;

    return unless $db && ref $db eq 'HASH';
    return unless $db->{_loop};

    # 1. Clear the health check timer
    #    remove() BEFORE stop() - same ordering rule as workers below.
    if ($db->{_health_check_timer}) {
        eval { $db->{_loop}->remove($db->{_health_check_timer}) };
        eval { $db->{_health_check_timer}->stop };
        delete $db->{_health_check_timer};
    }

    # 2. Remove and stop each worker.
    #
    #    CRITICAL ORDER: remove() MUST be called BEFORE stop().
    #
    #    IO::Async::Function::stop() sets an internal {stopping} flag and
    #    begins tearing down IPC channels. Once that flag is set, remove()
    #    silently fails - the notifier stays in the loop's internal registry,
    #    keeping the loop (and its SIGCHLD handler) alive past the point where
    #    we expect them to be freed. During Perl global destruction the loop's
    #    XS destructor then accesses already-freed memory -> SEGV.
    #
    #    Calling remove() first detaches the Function and all its child
    #    notifiers (Streams, Handles, Processes) cleanly, then stop() signals
    #    the worker processes to exit and returns a Future we can await.
    if ($db->{_workers}) {
        my @stop_futures;

        foreach my $worker_info (@{ $db->{_workers} }) {
            my $instance = $worker_info->{instance} or next;

            eval { $db->{_loop}->remove($instance) };
            my $f = eval { $instance->stop };

            # Only keep values that can be awaited; stop() may have died
            # (eval yields undef) or returned a non-reference.
            push @stop_futures, $f if $f && ref $f;
        }

        $db->{_workers} = [];

        # Await the stop Futures so worker processes begin their shutdown
        # sequence before we proceed. stop() returns a Future that resolves
        # when the IPC channels are closed (not when the OS process exits,
        # but close enough to prevent the SEGV in the common case).
        if (@stop_futures) {
            eval { $db->{_loop}->await(Future->needs_all(@stop_futures)) };
        }
    }

    # 3. Clean up global IO::Async state that outlives the loop object.
    #
    #    IO::Async::Loop maintains a $ONE_TRUE_LOOP singleton and installs a
    #    SIGCHLD handler when workers are forked. Both hold strong references
    #    to the loop object, preventing garbage collection even after all our
    #    Perl-level references are dropped. In global destruction this causes
    #    a SEGV because the loop's XS destructor runs after other XS objects
    #    it depends on have already been freed.
    #
    #    We only do this when we own the loop (i.e. we created it internally).
    #    For user-supplied loops the caller is responsible for lifecycle.
    if ($db->{_owns_loop}) {
        undef $IO::Async::Loop::ONE_TRUE_LOOP;
        $SIG{CHLD} = $db->{_old_sigchld};
    }

    # 4. Final state update
    $db->{_is_connected} = 0;

    return 1;
}
#
#
# PRIVATE METHODS
sub _call_worker {
my ($db, $operation, @args) = @_;
warn "[PID $$] Bridge - sending '$operation' to worker." if ASYNC_TRACE;
# ------------------------------------------------------------------
# Derive result_class from the payload so it is available throughout
# this method without changing the call signature at 13 call sites.
#
lib/DBIx/Class/Async.pm view on Meta::CPAN
_resolve_placeholders($step, \%register);
# my $rs = $schema->resultset($step->{resultset});
my $action = $step->{action};
my $result_data;
if ($action eq 'raw') {
# Raw SQL bypasses the Resultset layer
my $dbh = $schema->storage->dbh;
$dbh->do($step->{sql}, undef, @{$step->{bind} || []});
$result_data = { success => 1 };
}
else {
# CRUD operations require a Resultset
my $rs_name = $step->{resultset}
or die "txn_do: action '$action' requires a 'resultset' parameter";
my $rs = $schema->resultset($rs_name);
if ($action eq 'create') {
my $row = $rs->create($step->{data});
$result_data = { id => $row->id, data => { $row->get_columns } };
}
elsif ($action eq 'find') {
my $row = $rs->find($step->{id});
die "txn_do: record not found" unless $row;
$result_data = { id => $row->id, data => { $row->get_columns } };
}
elsif ($action eq 'update') {
my $row = $rs->find($step->{id});
die "txn_do: record not found for update" unless $row;
$row->update($step->{data});
$result_data = { success => 1, id => $row->id };
}
}
if ($step->{name} && $result_data->{id}) {
$register{ '$' . $step->{name} . '.id' } = $result_data->{id};
}
push @step_results, $result_data;
}
return { results => \@step_results, success => 1 };
});
};
return $@ ? { error => "Transaction failed: $@", success => 0 }
: $txn_result;
}
elsif ($operation eq 'txn_begin') {
$schema->storage->txn_begin;
return { success => 1 };
}
elsif ($operation eq 'txn_commit') {
$schema->storage->txn_commit;
return { success => 1 };
}
elsif ($operation eq 'txn_rollback') {
$schema->storage->txn_rollback;
return { success => 1 };
}
elsif ($operation eq 'ping') {
my $alive = eval { $schema->storage->dbh->do("SELECT 1") };
return { success => ($alive ? 1 : 0), status => "pong" };
}
else {
die "Unknown operation: $operation";
}
}
catch {
warn "[PID $$] Worker execution error: $_"
if ASYNC_TRACE;
return { error => "$_", success => 0 };
};
my $safe_result = $deflator->($result);
if (ASYNC_TRACE) {
warn "[PID $$] Worker returning result type: " . ref($safe_result);
warn "[PID $$] Worker returning: $safe_result";
}
return $safe_result;
},
max_workers => 1,
);
$db->{_loop}->add($worker);
push @{$db->{_workers}}, {
instance => $worker,
healthy => 1,
pid => undef,
};
}
}
# _init_metrics - best-effort registration of the module's metrics.
#
# Loads Metrics::Any at runtime, imports its $METRICS handle and registers
# the counters, histogram and gauge listed below. If any step throws, the
# failure is swallowed: metrics are switched off on $db
# ($db->{_enable_metrics} = 0) and $METRICS is reset to undef.
#
# Parameters:
#   $db - hash reference holding the async state; only _enable_metrics is
#         written here.
sub _init_metrics {
    my ($db) = @_;

    # (factory method, metric name) pairs, registered in this order.
    my @metric_specs = (
        [ make_counter   => 'db_async_queries_total' ],
        [ make_counter   => 'db_async_cache_hits_total' ],
        [ make_counter   => 'db_async_cache_misses_total' ],
        [ make_histogram => 'db_async_query_duration_seconds' ],
        [ make_gauge     => 'db_async_workers_active' ],
    );

    # Metrics::Any is optional, so loading and registration share one eval.
    eval {
        require Metrics::Any;
        Metrics::Any->import('$METRICS');

        foreach my $spec (@metric_specs) {
            my ($factory, $metric_name) = @{$spec};
            $METRICS->$factory($metric_name);
        }
    };

    # Any failure above (module missing, registration error) disables
    # metrics silently rather than aborting start-up.
    if ($@) {
        $db->{_enable_metrics} = 0;
        undef $METRICS;
    }
}
sub _next_worker {
my ($db) = @_;
return unless $db->{_workers} && @{$db->{_workers}};
( run in 0.873 second using v1.01-cache-2.11-cpan-df04353d9ac )