DBIx-Class-Async

 view release on metacpan or  search on metacpan

lib/DBIx/Class/Async.pm  view on Meta::CPAN

                                    _resolve_placeholders($step, \%register);

                                    # my $rs = $schema->resultset($step->{resultset});
                                    my $action = $step->{action};
                                    my $result_data;

                                    if ($action eq 'raw') {
                                        # Raw SQL bypasses the Resultset layer
                                        my $dbh = $schema->storage->dbh;
                                        $dbh->do($step->{sql}, undef, @{$step->{bind} || []});
                                        $result_data = { success => 1 };
                                    }
                                    else {
                                        # CRUD operations require a Resultset
                                        my $rs_name = $step->{resultset}
                                            or die "txn_do: action '$action' requires a 'resultset' parameter";
                                        my $rs = $schema->resultset($rs_name);

                                        if ($action eq 'create') {
                                            my $row = $rs->create($step->{data});
                                            $result_data = { id => $row->id, data => { $row->get_columns } };
                                        }
                                        elsif ($action eq 'find') {
                                            my $row = $rs->find($step->{id});
                                            die "txn_do: record not found" unless $row;
                                            $result_data = { id => $row->id, data => { $row->get_columns } };
                                        }
                                        elsif ($action eq 'update') {
                                            my $row = $rs->find($step->{id});
                                            die "txn_do: record not found for update" unless $row;
                                            $row->update($step->{data});
                                            $result_data = { success => 1, id => $row->id };
                                        }
                                    }

                                    if ($step->{name} && $result_data->{id}) {
                                        $register{ '$' . $step->{name} . '.id' } = $result_data->{id};
                                    }
                                    push @step_results, $result_data;
                                }
                                return { results => \@step_results, success => 1 };
                            });
                        };

                        return $@ ? { error => "Transaction failed: $@", success => 0 }
                                  : $txn_result;
                    }
                    elsif ($operation eq 'txn_begin') {
                        $schema->storage->txn_begin;
                        return { success => 1 };
                    }
                    elsif ($operation eq 'txn_commit') {
                        $schema->storage->txn_commit;
                        return { success => 1 };
                    }
                    elsif ($operation eq 'txn_rollback') {
                        $schema->storage->txn_rollback;
                        return { success => 1 };
                    }
                    elsif ($operation eq 'ping') {
                        my $alive = eval { $schema->storage->dbh->do("SELECT 1") };
                        return { success => ($alive ? 1 : 0), status => "pong" };
                    }
                    else {
                        die "Unknown operation: $operation";
                    }
                }
                catch {
                    warn "[PID $$] Worker execution error: $_"
                        if ASYNC_TRACE;
                    return { error => "$_", success => 0 };
                };

                my $safe_result = $deflator->($result);
                if (ASYNC_TRACE) {
                    warn "[PID $$] Worker returning result type: " . ref($safe_result);
                    warn "[PID $$] Worker returning: $safe_result";
                }
                return $safe_result;
            },
            max_workers => 1,
        );

        $db->{_loop}->add($worker);

        push @{$db->{_workers}}, {
            instance => $worker,
            healthy  => 1,
            pid      => undef,
        };
    }
}

sub _init_metrics {
    my $db = shift;

    # Try to load Metrics::Any
    eval {
        require Metrics::Any;
        Metrics::Any->import('$METRICS');

        # Initialise metrics
        $METRICS->make_counter('db_async_queries_total');
        $METRICS->make_counter('db_async_cache_hits_total');
        $METRICS->make_counter('db_async_cache_misses_total');
        $METRICS->make_histogram('db_async_query_duration_seconds');
        $METRICS->make_gauge('db_async_workers_active');

    };

    # Silently ignore if Metrics::Any is not available
    if ($@) {
        $db->{_enable_metrics} = 0;
        undef $METRICS;
    }
}

sub _next_worker {
    my ($db) = @_;

    return unless $db->{_workers} && @{$db->{_workers}};



( run in 1.077 second using v1.01-cache-2.11-cpan-39bf76dae61 )