DBIx-Class-Async
view release on metacpan or search on metacpan
lib/DBIx/Class/Async.pm view on Meta::CPAN
_resolve_placeholders($step, \%register);
# my $rs = $schema->resultset($step->{resultset});
my $action = $step->{action};
my $result_data;
if ($action eq 'raw') {
# Raw SQL bypasses the Resultset layer
my $dbh = $schema->storage->dbh;
$dbh->do($step->{sql}, undef, @{$step->{bind} || []});
$result_data = { success => 1 };
}
else {
# CRUD operations require a Resultset
my $rs_name = $step->{resultset}
or die "txn_do: action '$action' requires a 'resultset' parameter";
my $rs = $schema->resultset($rs_name);
if ($action eq 'create') {
my $row = $rs->create($step->{data});
$result_data = { id => $row->id, data => { $row->get_columns } };
}
elsif ($action eq 'find') {
my $row = $rs->find($step->{id});
die "txn_do: record not found" unless $row;
$result_data = { id => $row->id, data => { $row->get_columns } };
}
elsif ($action eq 'update') {
my $row = $rs->find($step->{id});
die "txn_do: record not found for update" unless $row;
$row->update($step->{data});
$result_data = { success => 1, id => $row->id };
}
}
if ($step->{name} && $result_data->{id}) {
$register{ '$' . $step->{name} . '.id' } = $result_data->{id};
}
push @step_results, $result_data;
}
return { results => \@step_results, success => 1 };
});
};
return $@ ? { error => "Transaction failed: $@", success => 0 }
: $txn_result;
}
elsif ($operation eq 'txn_begin') {
$schema->storage->txn_begin;
return { success => 1 };
}
elsif ($operation eq 'txn_commit') {
$schema->storage->txn_commit;
return { success => 1 };
}
elsif ($operation eq 'txn_rollback') {
$schema->storage->txn_rollback;
return { success => 1 };
}
elsif ($operation eq 'ping') {
my $alive = eval { $schema->storage->dbh->do("SELECT 1") };
return { success => ($alive ? 1 : 0), status => "pong" };
}
else {
die "Unknown operation: $operation";
}
}
catch {
warn "[PID $$] Worker execution error: $_"
if ASYNC_TRACE;
return { error => "$_", success => 0 };
};
my $safe_result = $deflator->($result);
if (ASYNC_TRACE) {
warn "[PID $$] Worker returning result type: " . ref($safe_result);
warn "[PID $$] Worker returning: $safe_result";
}
return $safe_result;
},
max_workers => 1,
);
$db->{_loop}->add($worker);
push @{$db->{_workers}}, {
instance => $worker,
healthy => 1,
pid => undef,
};
}
}
# Set up the optional Metrics::Any instrumentation.
#
# Loads Metrics::Any at runtime, asks it to import '$METRICS', and registers
# the counters, histogram and gauge listed below on $METRICS. If any of those
# steps throws, metrics are switched off: $db->{_enable_metrics} is set to 0
# and $METRICS is cleared. The failure is intentionally not reported, because
# Metrics::Any is treated as an optional dependency.
#
# Parameters:
#   $db - hash reference whose '_enable_metrics' entry is reset on failure.
#
# Returns: nothing (bare return).
sub _init_metrics {
    my $db = shift;

    # eval resets $@ even when it succeeds; localise it so that probing for
    # an optional module never disturbs an error the caller is still holding.
    local $@;

    # The trailing 1 makes success visible through eval's return value.
    # Testing $@ instead would miss a failure whose exception is false in
    # boolean context (e.g. an object with a false 'bool' overload).
    my $loaded = eval {
        # Try to load Metrics::Any
        require Metrics::Any;
        Metrics::Any->import('$METRICS');

        # Initialise metrics
        $METRICS->make_counter('db_async_queries_total');
        $METRICS->make_counter('db_async_cache_hits_total');
        $METRICS->make_counter('db_async_cache_misses_total');
        $METRICS->make_histogram('db_async_query_duration_seconds');
        $METRICS->make_gauge('db_async_workers_active');

        1;
    };

    # Silently ignore if Metrics::Any is not available
    unless ($loaded) {
        $db->{_enable_metrics} = 0;
        undef $METRICS;
    }

    return;
}
sub _next_worker {
my ($db) = @_;
return unless $db->{_workers} && @{$db->{_workers}};
( run in 1.077 second using v1.01-cache-2.11-cpan-39bf76dae61 )