DBIx-Class-Async

 view release on metacpan or  search on metacpan

lib/DBIx/Class/Async.pm  view on Meta::CPAN

        _is_connected      => 1,
        _worker_idx        => 0,
        _query_cache       => {},
        _stats             => {
            _queries       => 0,
            _errors        => 0,
            _cache_hits    => 0,
            _cache_misses  => 0,
            _deadlocks     => 0,
            _retries       => 0,
        },
        _debug             => $args{debug} || 0,
    };

    _init_metrics($async_db) if $async_db->{_enable_metrics};
    _init_workers($async_db);

    if (my $interval = $args{health_check} // HEALTH_CHECK_INTERVAL) {
        _start_health_checks($async_db, $interval);
    }

    return $async_db;
}

=head2 disconnect

Gracefully shuts down all background workers and clears timers. Always call
this before your application exits to prevent zombie processes.

    DBIx::Class::Async->disconnect($db);

Internally this performs cleanup in the correct order required by
L<IO::Async::Function>: each worker is first removed from the event loop
(C<< $loop->remove >>) and then stopped (C<< $function->stop >>). Reversing
this order causes C<remove()> to silently fail, leaving orphaned notifiers
in the loop's registry that trigger a C<SEGV> during Perl global destruction.

=cut

# disconnect($db)
# DBIx::Class::Async->disconnect($db)
#
# Shuts down an async database handle: removes and stops the health-check
# timer and every worker registered on the handle's loop, optionally resets
# process-global IO::Async state (only when the handle owns its loop), and
# finally marks the handle as disconnected.
#
# Both the plain function form and the class-method form are accepted. In
# the class-method form Perl passes the class name as the first argument;
# it is skipped so that the handle is picked up from the second argument.
#
# Parameters:
#   $db - the handle, a plain (unblessed) hashref containing a '_loop' entry.
#
# Returns:
#   1 once the shutdown sequence has run; nothing (undef / empty list) when
#   $db is not a plain hashref or carries no '_loop'.
#
# Every loop/notifier call is wrapped in eval: shutdown is best-effort and a
# failure in one step must not prevent the remaining steps from running.
sub disconnect {
    # A handle is always a reference, so a non-reference first argument can
    # only be the invocant of a Class->disconnect($db) call.
    my ($first, $second) = @_;
    my $db = ref $first ? $first : $second;

    return unless $db && ref $db eq 'HASH';
    return unless $db->{_loop};

    # 1. Clear the health check timer
    # remove() BEFORE stop() - same ordering rule as workers below.
    if ($db->{_health_check_timer}) {
        eval { $db->{_loop}->remove($db->{_health_check_timer}) };
        eval { $db->{_health_check_timer}->stop };
        delete $db->{_health_check_timer};
    }

    # 2. Remove and stop each worker.
    #
    # CRITICAL ORDER: remove() MUST be called BEFORE stop().
    #
    # IO::Async::Function::stop() sets an internal {stopping} flag and
    # begins tearing down IPC channels. Once that flag is set, remove()
    # silently fails - the notifier stays in the loop's internal registry,
    # keeping the loop (and its SIGCHLD handler) alive past the point where
    # we expect them to be freed. During Perl global destruction the loop's
    # XS destructor then accesses already-freed memory -> SEGV.
    #
    # Calling remove() first detaches the Function and all its child
    # notifiers (Streams, Handles, Processes) cleanly, then stop() signals
    # the worker processes to exit and returns a Future we can await.
    if ($db->{_workers}) {
        my @stop_futures;
        foreach my $worker_info (@{ $db->{_workers} }) {
            my $instance = $worker_info->{instance} or next;
            eval { $db->{_loop}->remove($instance) };
            my $f = eval { $instance->stop };
            # Only keep genuine Future objects; stop() may return nothing.
            push @stop_futures, $f if $f && ref $f;
        }
        $db->{_workers} = [];

        # Await the stop Futures so worker processes begin their shutdown
        # sequence before we proceed. stop() returns a Future that resolves
        # when the IPC channels are closed (not when the OS process exits,
        # but close enough to prevent the SEGV in the common case).
        if (@stop_futures) {
            eval { $db->{_loop}->await(Future->needs_all(@stop_futures)) };
        }
    }

    # 3. Clean up global IO::Async state that outlives the loop object.
    #
    # IO::Async::Loop maintains a $ONE_TRUE_LOOP singleton and installs a
    # SIGCHLD handler when workers are forked. Both hold strong references
    # to the loop object, preventing garbage collection even after all our
    # Perl-level references are dropped. In global destruction this causes
    # a SEGV because the loop's XS destructor runs after other XS objects
    # it depends on have already been freed.
    #
    # We only do this when we own the loop (i.e. we created it internally).
    # For user-supplied loops the caller is responsible for lifecycle.
    if ($db->{_owns_loop}) {
        undef $IO::Async::Loop::ONE_TRUE_LOOP;
        $SIG{CHLD} = $db->{_old_sigchld};
    }

    # 4. Final state update
    $db->{_is_connected} = 0;

    return 1;
}

#
#
# PRIVATE METHODS

sub _call_worker {
    my ($db, $operation, @args) = @_;

    warn "[PID $$] Bridge - sending '$operation' to worker." if ASYNC_TRACE;

    # ------------------------------------------------------------------
    # Derive result_class from the payload so it is available throughout
    # this method without changing the call signature at 13 call sites.
    #

lib/DBIx/Class/Async.pm  view on Meta::CPAN

                                    _resolve_placeholders($step, \%register);

                                    # my $rs = $schema->resultset($step->{resultset});
                                    my $action = $step->{action};
                                    my $result_data;

                                    if ($action eq 'raw') {
                                        # Raw SQL bypasses the Resultset layer
                                        my $dbh = $schema->storage->dbh;
                                        $dbh->do($step->{sql}, undef, @{$step->{bind} || []});
                                        $result_data = { success => 1 };
                                    }
                                    else {
                                        # CRUD operations require a Resultset
                                        my $rs_name = $step->{resultset}
                                            or die "txn_do: action '$action' requires a 'resultset' parameter";
                                        my $rs = $schema->resultset($rs_name);

                                        if ($action eq 'create') {
                                            my $row = $rs->create($step->{data});
                                            $result_data = { id => $row->id, data => { $row->get_columns } };
                                        }
                                        elsif ($action eq 'find') {
                                            my $row = $rs->find($step->{id});
                                            die "txn_do: record not found" unless $row;
                                            $result_data = { id => $row->id, data => { $row->get_columns } };
                                        }
                                        elsif ($action eq 'update') {
                                            my $row = $rs->find($step->{id});
                                            die "txn_do: record not found for update" unless $row;
                                            $row->update($step->{data});
                                            $result_data = { success => 1, id => $row->id };
                                        }
                                    }

                                    if ($step->{name} && $result_data->{id}) {
                                        $register{ '$' . $step->{name} . '.id' } = $result_data->{id};
                                    }
                                    push @step_results, $result_data;
                                }
                                return { results => \@step_results, success => 1 };
                            });
                        };

                        return $@ ? { error => "Transaction failed: $@", success => 0 }
                                  : $txn_result;
                    }
                    elsif ($operation eq 'txn_begin') {
                        $schema->storage->txn_begin;
                        return { success => 1 };
                    }
                    elsif ($operation eq 'txn_commit') {
                        $schema->storage->txn_commit;
                        return { success => 1 };
                    }
                    elsif ($operation eq 'txn_rollback') {
                        $schema->storage->txn_rollback;
                        return { success => 1 };
                    }
                    elsif ($operation eq 'ping') {
                        my $alive = eval { $schema->storage->dbh->do("SELECT 1") };
                        return { success => ($alive ? 1 : 0), status => "pong" };
                    }
                    else {
                        die "Unknown operation: $operation";
                    }
                }
                catch {
                    warn "[PID $$] Worker execution error: $_"
                        if ASYNC_TRACE;
                    return { error => "$_", success => 0 };
                };

                my $safe_result = $deflator->($result);
                if (ASYNC_TRACE) {
                    warn "[PID $$] Worker returning result type: " . ref($safe_result);
                    warn "[PID $$] Worker returning: $safe_result";
                }
                return $safe_result;
            },
            max_workers => 1,
        );

        $db->{_loop}->add($worker);

        push @{$db->{_workers}}, {
            instance => $worker,
            healthy  => 1,
            pid      => undef,
        };
    }
}

# _init_metrics($db)
#
# Best-effort metrics setup. Loads Metrics::Any at runtime, asks it to
# import '$METRICS', and then registers each metric listed below on
# $METRICS ($METRICS itself is declared outside this sub). If anything in
# that sequence dies, the error is swallowed: metrics are switched off on
# $db and $METRICS is cleared.
#
# Parameters:
#   $db - handle hashref; its '_enable_metrics' entry is set to 0 on failure.
sub _init_metrics {
    my ($db) = @_;

    # Registration calls, issued in exactly this order:
    #   [ collector method => metric name ]
    #
    # NOTE(review): confirm the installed Metrics::Any collector actually
    # provides make_histogram - if it does not, that call dies and the
    # handler below silently disables metrics.
    my @registrations = (
        [ make_counter   => 'db_async_queries_total' ],
        [ make_counter   => 'db_async_cache_hits_total' ],
        [ make_counter   => 'db_async_cache_misses_total' ],
        [ make_histogram => 'db_async_query_duration_seconds' ],
        [ make_gauge     => 'db_async_workers_active' ],
    );

    eval {
        require Metrics::Any;
        Metrics::Any->import('$METRICS');

        for my $registration (@registrations) {
            my ($maker, $metric_name) = @{$registration};
            $METRICS->$maker($metric_name);
        }
    };

    # Any failure above (module missing, registration error) turns metrics
    # off without reporting the error.
    if ($@) {
        $db->{_enable_metrics} = 0;
        undef $METRICS;
    }
}

sub _next_worker {
    my ($db) = @_;

    return unless $db->{_workers} && @{$db->{_workers}};



( run in 0.873 second using v1.01-cache-2.11-cpan-df04353d9ac )