Acme-Parataxis

 view release on metacpan or  search on metacpan

lib/Acme/Parataxis.c  view on Meta::CPAN

/**
 * @file Parataxis.c
 * @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
 *
 * @section Overview
 * This file implements a cooperative multitasking system (Fibers) integrated
 * with a preemptive native thread pool. It allows Perl to run thousands of
 * user-mode fibers that can offload blocking C-level tasks to background
 * OS threads without stalling the main interpreter.
 *
 * @section Architecture
 * - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
 *   and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
 * - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
 *   pass control.
 * - **Thread Pool**: A fixed pool of worker threads that poll a job queue for
 *   blocking operations like sleep, I/O, or heavy computation.
 * - **Context Switching**: The `swap_perl_state` function manually saves and restores
 *   the global state of the Perl interpreter (`PL_*` variables) to allow disjoint
 *   execution flows.
 *
 * @section Caveats
 * Shared subroutines (CVs) with re-entrant yielding calls are handled by a
 * specialized pad-clearing mechanism in `_activate_current_depths` to satisfy
 * Perl's internal `AvFILLp` assertions in debug builds.
 */

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601
#endif
#else
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#ifdef __APPLE__
#ifndef _DARWIN_C_SOURCE
#define _DARWIN_C_SOURCE
#endif
#endif
#endif

#define PERL_NO_GET_CONTEXT
#define NO_XSLOCKS
#include "EXTERN.h"
#include "XSUB.h"
#include "perl.h"

#ifdef _WIN32
/** @brief Export macro for Windows DLLs */
#define DLLEXPORT __declspec(dllexport)
/** @brief Handle for the underlying OS fiber context */
typedef LPVOID coro_handle_t;
/** @brief Handle for a native OS thread */
typedef HANDLE para_thread_t;
/** @brief Mutex type for queue synchronization */
typedef CRITICAL_SECTION para_mutex_t;
#define LOCK(m) EnterCriticalSection(&m)
#define UNLOCK(m) LeaveCriticalSection(&m)
#define LOCK_INIT(m) InitializeCriticalSection(&m)
#else
#include <pthread.h>
#include <sched.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <ucontext.h>
#include <unistd.h>
#if defined(__APPLE__) || defined(__FreeBSD__)
#include <sys/sysctl.h>
#include <sys/types.h>
#endif
/** @brief Export macro for Unix systems */
#define DLLEXPORT __attribute__((visibility("default")))
/** @brief Handle for the underlying OS fiber context (ucontext_t) */
typedef ucontext_t coro_handle_t;
/** @brief Handle for a native OS thread (pthread_t) */
typedef pthread_t para_thread_t;
/** @brief Mutex type for queue synchronization (pthread_mutex_t) */
typedef pthread_mutex_t para_mutex_t;
#define LOCK(m) pthread_mutex_lock(&m)
#define UNLOCK(m) pthread_mutex_unlock(&m)
#define LOCK_INIT(m) pthread_mutex_init(&m, NULL)
#endif

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

// Forward declarations
DLLEXPORT SV * coro_yield(SV * ret_val);
DLLEXPORT SV * coro_transfer(int fiber_id, SV * args);
DLLEXPORT void destroy_coro(int fiber_id);

/**
 * @brief Return the operating system's numeric identifier for the calling thread.
 *
 * Handy when debugging: it shows that work handed to the pool really runs on
 * a different OS thread than the Perl interpreter.
 *
 * Source of the ID, by platform:
 *   - Windows: GetCurrentThreadId()
 *   - macOS:   pthread_threadid_np() (64-bit value, truncated to int)
 *   - platforms defining SYS_gettid (e.g. Linux): syscall(SYS_gettid)
 *   - anything else: pthread_self() reinterpreted as an integer
 *
 * @return int The thread identifier, truncated to int width where necessary.
 */
int get_os_thread_id() {
#if defined(_WIN32)
    DWORD win_tid = GetCurrentThreadId();
    return (int)win_tid;
#elif defined(__APPLE__)
    uint64_t mach_tid;
    pthread_threadid_np(NULL, &mach_tid);
    return (int)mach_tid;
#elif defined(SYS_gettid)
    long kernel_tid = syscall(SYS_gettid);
    return (int)kernel_tid;
#else
    intptr_t self_bits = (intptr_t)pthread_self();
    return (int)self_bits;
#endif
}

/**
 * @brief Pin the current thread to a specific CPU core (best effort).
 *
 * Used by the Thread Pool to spread worker threads across hardware cores.
 * Core indices outside what the platform's affinity mask can express are
 * ignored. Without that check they would trigger undefined behaviour: an
 * over-wide shift on Windows, or an out-of-bounds CPU_SET() write on Linux.
 * On platforms without an affinity API this is a no-op.
 *
 * @param core_id The zero-based index of the CPU core.
 */
void pin_to_core(int core_id) {
    if (core_id < 0)
        return;
#ifdef _WIN32
    /* DWORD_PTR is 32 bits on Win32 and 64 bits on Win64; shifting by >= its width is UB. */
    if ((size_t)core_id >= sizeof(DWORD_PTR) * 8)
        return;
    DWORD_PTR mask = (DWORD_PTR)1 << core_id;
    SetThreadAffinityMask(GetCurrentThread(), mask);
#elif defined(__linux__) && defined(CPU_SETSIZE)
    /* cpu_set_t only holds CPU_SETSIZE bits; CPU_SET beyond that writes out of bounds. */
    if (core_id >= CPU_SETSIZE)
        return;
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
    /* No standard affinity API (macOS/BSD), or glibc headers without _GNU_SOURCE. */
    (void)core_id;
#endif
}

/**
 * @brief Report which CPU core is executing the calling thread right now.
 *
 * @return int Zero-based core index, or -1 when the platform has no way to
 *             query it (sched_getcpu() itself also returns -1 on error).
 */
int get_current_cpu() {
    int cpu = -1;
#if defined(_WIN32)
    cpu = (int)GetCurrentProcessorNumber();
#elif defined(__linux__)
    cpu = sched_getcpu();
#endif
    return cpu;
}

/**
 * @brief Detects the number of logical cores available on the system.
 *
 * Uses GetSystemInfo() on Windows, the hw.ncpu sysctl on macOS/FreeBSD,
 * and sysconf(_SC_NPROCESSORS_ONLN) elsewhere. Any failure or non-positive
 * answer falls back to 1.
 *
 * @return int CPU count (minimum 1).
 */
int get_cpu_count() {
#ifdef _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    int count = (int)sysinfo.dwNumberOfProcessors;
    return (count > 0) ? count : 1;
#elif defined(__APPLE__) || defined(__FreeBSD__)
    int mib[2] = {CTL_HW, HW_NCPU};
    /* Zero-initialised and length taken from the object itself, so a failed or
     * short sysctl can never leave `count` uninitialised. */
    int count = 0;
    size_t len = sizeof count;
    if (sysctl(mib, 2, &count, &len, NULL, 0) != 0)
        return 1;
    return (count > 0) ? count : 1;
#else
    long count = sysconf(_SC_NPROCESSORS_ONLN);
    return (count > 0) ? (int)count : 1;
#endif
}

/**
 * @struct para_fiber_t
 * @brief The complete execution context of a Perl Fiber.
 *
 * Holds the OS-level execution context (`context`) together with a private
 * copy of the Perl interpreter state that must be swapped in and out whenever
 * control moves between fibers. A global instance (`main_context`) stands for
 * the main interpreter. Every other fiber is heap-allocated by create_fiber()
 * and released by destroy_coro().
 *
 * Ownership: `user_cv`, `self_ref` and `transfer_data` each hold a counted
 * reference (SvREFCNT_inc when stored) unless they point at &PL_sv_undef.
 */
typedef struct {
    coro_handle_t context; /**< OS context: a Win32 fiber handle (LPVOID) or a POSIX ucontext_t */

#ifndef _WIN32
    void * stack_p;  /**< C stack for this fiber, from posix_memalign() in create_fiber() (Unix only) */
    size_t stack_sz; /**< Size of stack_p in bytes (Unix only) */
#endif

    /*
     * Perl Interpreter State Pointers.
     * These must be saved and restored during every context switch.
     */
    PERL_SI * si;            /**< Stack Info: this fiber's context stack (si_cxstack / si_cxix: sub, eval, loop frames) */
    AV * curstack;           /**< The active Argument Stack (AV*) */
    SSize_t stack_sp_offset; /**< Stack Pointer stored as an offset from the stack base (presumably so it survives reallocation of the stack AV — TODO confirm) */

    I32 * markstack;     /**< Base of the Mark Stack (tracks list start points) */
    I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
    I32 * markstack_max; /**< Limit of the Mark Stack */

    I32 * scopestack;   /**< Base of the Scope Stack (tracks block nesting) */
    I32 scopestack_ix;  /**< Current index in the Scope Stack */
    I32 scopestack_max; /**< Limit of the Scope Stack */

    ANY * savestack;   /**< Base of the Save Stack (tracks local/my variables for cleanup) */
    I32 savestack_ix;  /**< Current index in the Save Stack */
    I32 savestack_max; /**< Limit of the Save Stack */

    SV ** tmps_stack; /**< Base of the Mortal Stack (SVs whose refcount is dropped at FREETMPS) */
    I32 tmps_ix;      /**< Index of the top entry in the Mortal Stack */
    I32 tmps_floor;   /**< Current floor of the Mortal Stack */
    I32 tmps_max;     /**< Limit of the Mortal Stack */

    JMPENV * top_env;   /**< Pointer to the top exception environment (eval/die buffers) */
    COP * curcop;       /**< Current control op (carries the current file/line) */
    OP * op;            /**< Current Operation being executed */
    PAD * comppad;      /**< Current lexical Pad (variable storage) */
    SV ** curpad;       /**< Array pointer to the current lexical Pad */
    PMOP * curpm;       /**< Current pattern match state */
    PMOP * curpm_under; /**< Saved PL_curpm_under (match state used around regex code blocks — TODO confirm) */
    PMOP * reg_curpm;   /**< Saved PL_reg_curpm (regex engine's match op — TODO confirm) */

    GV * defgv;      /**< The $_ global */
    GV * last_in_gv; /**< GV used in last <FH> */
    SV * rs;         /**< The $/ global */
    GV * ofsgv;      /**< The $, global */
    SV * ors_sv;     /**< The $\ global */
    GV * defoutgv;   /**< The default output filehandle */
    HV * curstash;   /**< Current package stash */
    HV * defstash;   /**< Default package stash */
    SV * errors;     /**< Outstanding queued errors */

    SV * user_cv;  /**< The Perl sub/coderef this fiber runs (counted reference) */
    SV * self_ref; /**< The Acme::Parataxis Perl object; receives set_result/set_error when the sub ends (counted reference) */

    SV * transfer_data; /**< Value handed over on call/yield/transfer; also the final result once finished (counted reference) */

    int id;          /**< Numeric ID of this fiber (its index in the fibers table) */
    int finished;    /**< Set by entry_point() once the user sub has returned; finished fibers reject call/transfer */
    int parent_id;   /**< Fiber that coro_call()'d this one (-1 = main); the preferred coro_yield() target */
    int last_sender; /**< Fiber that last switched to this one (set in perform_switch); the fallback yield target */
} para_fiber_t;

lib/Acme/Parataxis.c  view on Meta::CPAN

        job_slots[idx].fiber_id = current_fiber_id;
        job_slots[idx].type = type;
        job_slots[idx].input.i = arg;
        job_slots[idx].timeout_ms = timeout_ms;
        job_slots[idx].status = JOB_NEW;
        PARA_COND_SIGNAL(queue_cond);
    }
    UNLOCK(queue_lock);
    return idx;
}

/**
 * @brief Look for a background job that has finished.
 *
 * Starts the worker pool on first use if it has not been started yet. The
 * job table is scanned under queue_lock, lowest index first.
 *
 * @return int Index of the first slot in JOB_DONE state, or -1 if none is.
 */
DLLEXPORT int check_for_completion() {
    /* Lazily start the worker pool the first time anyone polls. */
    if (!threads_initialized)
        init_threads();

    int found = -1;
    LOCK(queue_lock);
    for (int slot = 0; slot < MAX_JOBS && found == -1; slot++)
        if (job_slots[slot].status == JOB_DONE)
            found = slot;
    UNLOCK(queue_lock);
    return found;
}

/**
 * @brief Fetch a job's integer output as a Perl value.
 *
 * Only SLEEP, GET_CPU, READ and WRITE jobs produce an integer result, and it
 * is only read while the slot is DONE or BUSY. The slot is read under
 * queue_lock.
 *
 * @param idx The job index in the queue.
 * @return SV* A new mortal IV with the job's output, or &PL_sv_undef when
 *             the index is out of range or there is no integer result.
 */
DLLEXPORT SV * get_job_result(int idx) {
    dTHX;
    SV * result = &PL_sv_undef;
    if (idx < 0 || idx >= MAX_JOBS)
        return result;

    bool has_int_output = false;
    LOCK(queue_lock);
    if (job_slots[idx].status == JOB_DONE || job_slots[idx].status == JOB_BUSY)
        has_int_output = job_slots[idx].type == TASK_SLEEP || job_slots[idx].type == TASK_GET_CPU ||
                         job_slots[idx].type == TASK_READ || job_slots[idx].type == TASK_WRITE;
    if (has_int_output)
        result = sv_2mortal(newSViv(job_slots[idx].output.i));
    UNLOCK(queue_lock);
    return result;
}

/**
 * @brief Report which fiber submitted the job in a given slot.
 *
 * @param idx Job index.
 * @return int The submitting fiber's ID, or -1 if `idx` is out of range.
 */
DLLEXPORT int get_job_coro_id(int idx) {
    const bool in_range = idx >= 0 && idx < MAX_JOBS;
    return in_range ? job_slots[idx].fiber_id : -1;
}

/**
 * @brief Return a job slot to the free pool once its result has been consumed.
 *
 * Out-of-range indices are ignored. The status change is made under queue_lock.
 *
 * @param idx Job index.
 */
DLLEXPORT void free_job_slot(int idx) {
    if (idx >= 0 && idx < MAX_JOBS) {
        LOCK(queue_lock);
        job_slots[idx].status = JOB_FREE;
        UNLOCK(queue_lock);
    }
}

/**
 * @brief Resets the call depth of a Perl CV to zero.
 *
 * Used to ensure that a newly created fiber starts its coderef with a
 * clean execution state. Accepts either a code reference or a bare CV.
 * NULL and anything that is not (a reference to) a CV are ignored.
 *
 * @param cv_ref SV reference to the coderef (or the CV itself).
 */
DLLEXPORT void force_depth_zero(SV * cv_ref) {
    dTHX;
    if (!cv_ref)
        return;
    SV * target = SvROK(cv_ref) ? SvRV(cv_ref) : cv_ref;
    if (SvTYPE(target) != SVt_PVCV)
        return;
    /* Write the depth field directly so this works regardless of whether the
     * perl headers expose CvDEPTH() as an lvalue. */
    ((XPVCV *)MUTABLE_PTR(SvANY(target)))->xcv_depth = 0;
}

/** @brief Returns the ID of the running fiber, or -1 while the main context is running. */
DLLEXPORT int get_current_parataxis_id() {
    return current_fiber_id;
}

/** @brief Returns the OS thread ID of the calling thread (normally the main interpreter thread). */
DLLEXPORT int get_os_thread_id_export() {
    return get_os_thread_id();
}

/** @brief Returns how many worker threads the pool currently has. */
DLLEXPORT int get_thread_pool_size() {
    return current_thread_count;
}

/** @brief Returns the upper limit on the number of pool worker threads. */
DLLEXPORT int get_max_thread_pool_size() {
    return max_thread_pool_size;
}

/** @brief Sets how many maybe_yield() calls trigger an automatic yield (a value <= 0 disables it). */
DLLEXPORT void set_preempt_threshold(int64_t threshold) {
    preempt_threshold = threshold;
}

/** @brief Returns the number of maybe_yield() calls counted since the last automatic yield. */
DLLEXPORT int64_t get_preempt_count() {
    return preempt_count;
}

/**
 * @brief Count a preemption checkpoint and yield once the threshold is hit.
 *
 * Every call bumps preempt_count. When preempt_threshold is positive and the
 * count has reached it, the count is reset and the current fiber yields undef.
 *
 * @return SV* The value received on resume if a yield happened, otherwise undef.
 */
DLLEXPORT SV * maybe_yield() {
    dTHX;
    ++preempt_count;
    const bool preemption_enabled = preempt_threshold > 0;
    if (!preemption_enabled || preempt_count < preempt_threshold)
        return &PL_sv_undef;
    preempt_count = 0;
    return coro_yield(&PL_sv_undef);
}

/**
 * @brief Restores subroutine call depths and cleans argument pads.
 *
 * Walks the resumed fiber's context stack from the bottom up, in two passes.
 *
 * Pass 1: sets CvDEPTH of every active sub frame's CV to that frame's depth
 *         (blk_sub.olddepth + 1). Frames are visited bottom-up, so a CV that
 *         appears several times (recursion) ends with its innermost depth.
 * Pass 2: for each active CV, surgically resets slot 0 (the @_ AV) of the pad
 *         at depth CvDEPTH + 1 to an empty, non-REAL array.
 *
 * Format frames (CXt_FORMAT) are skipped in Pass 1. Perl's `struct
 * block_format` shares only the fields up to `cv` with `struct block_sub` and
 * has no `olddepth`. Reading blk_sub.olddepth from a format frame would
 * therefore read an unrelated field (the format's GV pointer) and write a
 * garbage depth.
 *
 * @param to The fiber being resumed.
 */
static void _activate_current_depths(pTHX_ para_fiber_t * to) {
    PERL_SI * si = to->si;
    if (!si || !si->si_cxstack)
        return;

    /* Pass 1: Restore CvDEPTH for all active sub frames (formats have no olddepth). */
    for (I32 i = 0; i <= si->si_cxix; i++) {
        PERL_CONTEXT * cx = &(si->si_cxstack[i]);
        if (CxTYPE(cx) != CXt_SUB)
            continue;
        CV * cv = cx->blk_sub.cv;
        if (cv && SvTYPE((SV *)cv) == SVt_PVCV)
            CvDEPTH(cv) = cx->blk_sub.olddepth + 1;
    }

    /* Pass 2: Clean the landing pads for the NEXT call in each CV.
     * Only `cv` is read from the context here, which sits at the same offset in
     * block_sub and block_format, so format frames are safe in this pass. */
    for (I32 i = 0; i <= si->si_cxix; i++) {
        PERL_CONTEXT * cx = &(si->si_cxstack[i]);
        if (CxTYPE(cx) == CXt_SUB || CxTYPE(cx) == CXt_FORMAT) {
            CV * cv = cx->blk_sub.cv;
            if (cv && SvTYPE((SV *)cv) == SVt_PVCV) {
                PADLIST * pl = CvPADLIST(cv);
                I32 next_depth = CvDEPTH(cv) + 1;
                if (pl && next_depth <= PadlistMAX(pl)) {
                    AV * next_pad = (AV *)PadlistARRAY(pl)[next_depth];
                    if (next_pad && SvTYPE(next_pad) == SVt_PVAV) {
                        SV ** array = AvARRAY(next_pad);
                        if (array && AvMAX(next_pad) >= 0) {
                            SV * args = array[0];
                            if (args && SvTYPE(args) == SVt_PVAV) {
                                /* Empty it without touching elements; AvREAL_off marks
                                 * the array as not owning refcounts. */
                                AvFILLp((AV *)args) = -1;
                                AvREAL_off((AV *)args);
                            }
                        }
                    }
                }
            }
        }
    }
}

/**
 * @brief Swaps the internal Perl Interpreter state pointers.
 *
 * This is the core of the fiber implementation. It manually saves all

lib/Acme/Parataxis.c  view on Meta::CPAN

    main_context.defgv = PL_defgv;
    main_context.last_in_gv = PL_last_in_gv;
    main_context.rs = PL_rs;
    main_context.ofsgv = PL_ofsgv;
    main_context.ors_sv = PL_ors_sv;
    main_context.defoutgv = PL_defoutgv;
    main_context.curstash = PL_curstash;
    main_context.defstash = PL_defstash;
    main_context.errors = PL_errors;
    system_initialized = 1;
#ifdef _WIN32
    /* Convert the main thread into a fiber so it can be switched out */
    if (!main_fiber_handle) {
        main_fiber_handle = ConvertThreadToFiber(NULL);
        if (!main_fiber_handle) {
            if (GetLastError() == ERROR_ALREADY_FIBER)
                main_fiber_handle = GetCurrentFiber();
        }
    }
#endif
    init_threads();
    return 0;
}

/**
 * @brief Performs the low-level OS context switch.
 *
 * Records the caller in `to->last_sender`, makes `target_id` the current
 * fiber, and swaps the Perl interpreter state via swap_perl_state(). Only then
 * does it transfer the CPU with SwitchToFiber() (Windows) or swapcontext()
 * (POSIX). When something later switches back to `from`, execution resumes
 * right after the switch call and this function returns.
 *
 * Switching to the already-current fiber is a no-op. No bounds or NULL check
 * is done on `target_id`; callers must pass -1 or the ID of an existing fiber.
 *
 * @param target_id ID of the target fiber (-1 for Main).
 */
void perform_switch(int target_id) {
    dTHX;
    if (target_id == current_fiber_id)
        return;
    para_fiber_t * from = (current_fiber_id == -1) ? &main_context : fibers[current_fiber_id];
    para_fiber_t * to = (target_id == -1) ? &main_context : fibers[target_id];
    /* Must be recorded before current_fiber_id is overwritten on the next line. */
    to->last_sender = current_fiber_id;
    current_fiber_id = target_id;
    /* Perl globals are swapped before the OS switch so the target resumes with its own interpreter state. */
    swap_perl_state(from, to);
#ifdef _WIN32
    /* The main thread's fiber handle is kept separately (obtained via ConvertThreadToFiber). */
    if (target_id == -1)
        SwitchToFiber(main_fiber_handle);
    else
        SwitchToFiber(to->context);
#else
    /* Saves the current registers into from->context and resumes to->context. */
    swapcontext(&from->context, &to->context);
#endif
}

/**
 * @brief Suspend the running fiber and hand a value back to whoever resumed it.
 *
 * The yield target is chosen in order of preference:
 *   1. the fiber's parent (recorded by coro_call), if it still exists and has
 *      not finished;
 *   2. otherwise the fiber that last switched control to this one;
 *   3. otherwise, or if that one is gone or finished too, the main context.
 *
 * Called from the main context this does nothing and returns undef.
 *
 * @param ret_val SV delivered to the target; a reference is taken on it
 *                unless it is &PL_sv_undef.
 * @return SV* Whatever is handed to this fiber when it is next resumed
 *             (mortalised), or &PL_sv_undef.
 */
DLLEXPORT SV * coro_yield(SV * ret_val) {
    dTHX;
    if (current_fiber_id == -1)
        return &PL_sv_undef;

    para_fiber_t * self = fibers[current_fiber_id];

    int target = self->parent_id;
    const bool parent_usable = target != -1 && fibers[target] && !fibers[target]->finished;
    if (!parent_usable)
        target = self->last_sender;
    if (target >= 0 && (!fibers[target] || fibers[target]->finished))
        target = -1;

    para_fiber_t * receiver = (target == -1) ? &main_context : fibers[target];

    /* Hand ret_val over, releasing whatever value was still parked there. */
    SV * parked = receiver->transfer_data;
    if (parked != ret_val) {
        if (parked && parked != &PL_sv_undef)
            SvREFCNT_dec(parked);
        receiver->transfer_data = ret_val;
        if (ret_val && ret_val != &PL_sv_undef)
            SvREFCNT_inc(ret_val);
    }

    perform_switch(target);

    /* Execution continues here once something switches back to this fiber. */
    SV * resumed_with = self->transfer_data;
    self->transfer_data = &PL_sv_undef;
    if (resumed_with && resumed_with != &PL_sv_undef)
        sv_2mortal(resumed_with);
    return resumed_with;
}

/**
 * @brief Entry point function for all new fibers.
 *
 * Runs on the fiber's own context the first time the fiber is switched to.
 * Steps:
 *   1. If `c->transfer_data` is an array reference, push its elements as
 *      arguments.
 *   2. Call `c->user_cv` in scalar context under G_EVAL and mark the fiber
 *      finished.
 *   3. Replace the argument value held in `transfer_data` with the result.
 *   4. Report the outcome to the Perl object in `c->self_ref`:
 *      `set_error($@)` if $@ is true, otherwise `set_result($ret)`.
 *   5. Yield the result back.
 *
 * This function never returns. If the finished fiber is resumed again, it
 * keeps yielding undef.
 *
 * @param c Pointer to the fiber context being started.
 */
static void entry_point(para_fiber_t * c) {
    dTHX;
    ENTER;
    SAVETMPS;
    dSP;
    PUSHMARK(SP);

    /* Unpack arguments passed during coro_call */
    /* Only an array reference is unpacked; any other value means "no arguments".
     * Elements are pushed as-is (not copied). */
    if (c->transfer_data && SvROK(c->transfer_data) && SvTYPE(SvRV(c->transfer_data)) == SVt_PVAV) {
        AV * args = (AV *)SvRV(c->transfer_data);
        I32 len = av_top_index(args) + 1;
        for (I32 i = 0; i < len; i++) {
            SV ** svp = av_fetch(args, i, 0);
            if (svp)
                XPUSHs(*svp);
        }
    }
    PUTBACK;

    /* Execute the Perl sub */
    /* G_EVAL: a die inside the sub is trapped and left in $@ (checked via ERRSV below). */
    int count = call_sv(c->user_cv, G_SCALAR | G_EVAL);

    SPAGAIN;
    SV * ret_val = &PL_sv_undef;
    if (count == 1)
        ret_val = POPs;
    PUTBACK;

    c->finished = true;

    /* Cleanup transfer data and store result */
    /* Drop the reference on the argument value handed in by the first call... */
    if (c->transfer_data && c->transfer_data != &PL_sv_undef) {
        SvREFCNT_dec(c->transfer_data);
        c->transfer_data = &PL_sv_undef;
    }
    /* ...and keep a counted reference to the result so it outlives the FREETMPS/LEAVE below. */
    if (ret_val && ret_val != &PL_sv_undef) {
        SvREFCNT_inc(ret_val);
        c->transfer_data = ret_val;
    }

    /* Update the Perl-level Acme::Parataxis object */
    if (c->self_ref && SvROK(c->self_ref)) {
        /* Fresh stack pointer and temp scope for the nested method call. */
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        XPUSHs(c->self_ref);
        if (SvTRUE(ERRSV)) {
            XPUSHs(ERRSV);
            PUTBACK;
            call_method("set_error", G_DISCARD);
        }
        else {
            XPUSHs(ret_val);
            PUTBACK;
            call_method("set_result", G_DISCARD);
        }
        FREETMPS;
        LEAVE;
    }
    FREETMPS;
    LEAVE;

    /* Final yield back to caller */
    /* transfer_data now holds the result (or undef); coro_yield hands it to the resumer. */
    coro_yield(c->transfer_data ? c->transfer_data : &PL_sv_undef);

    /* Loop indefinitely if resumed after finish */
    /* The fiber's start routine must never return, so any later resume just yields undef again. */
    while (1)
        coro_yield(&PL_sv_undef);
}

#ifdef _WIN32
/** @brief Windows fiber callback wrapper. */
static void WINAPI fiber_entry(void * param) { entry_point((para_fiber_t *)param); }
#else
/** @brief POSIX makecontext callback wrapper. */
static void posix_entry(int fiber_id) { entry_point(fibers[fiber_id]); }
#endif

/**
 * @brief Allocates and prepares a new Fiber context.
 *
 * What it sets up:
 *   - a counted reference on `user_code` and `self_ref` (unless they are
 *     &PL_sv_undef);
 *   - the fiber's own Perl stacks, via init_perl_stacks();
 *   - an OS context that starts in entry_point() on its first switch.
 *
 * The fiber does not run until it is called or transferred to.
 *
 * @param user_code Coderef to execute in the fiber.
 * @param self_ref Acme::Parataxis object to notify on completion.
 * @return int ID of the new fiber (>= 0), -2 if every fiber slot is in use,
 *             or -3 if allocating or initialising the fiber failed. Any
 *             partially built fiber is destroyed before -3 is returned.
 */
DLLEXPORT int create_fiber(SV * user_code, SV * self_ref) {
    dTHX;
    int idx = -1;
    for (int i = 0; i < MAX_FIBERS; i++) {
        if (fibers[i] == NULL) {
            idx = i;
            break;
        }
    }
    if (idx == -1)
        return -2;
    para_fiber_t * c = (para_fiber_t *)malloc(sizeof(para_fiber_t));
    if (!c)
        return -3;
    memset(c, 0, sizeof(para_fiber_t));
    c->user_cv = user_code;
    if (user_code && user_code != &PL_sv_undef)
        SvREFCNT_inc(user_code);
    c->self_ref = self_ref;
    if (self_ref && self_ref != &PL_sv_undef)
        SvREFCNT_inc(self_ref);
    c->id = idx;
    c->parent_id = -1;
    c->last_sender = -1;
    c->transfer_data = &PL_sv_undef;
    fibers[idx] = c;

    /* Initialize Perl stacks */
    init_perl_stacks(c);

#ifdef _WIN32
    c->context = CreateFiber(0, fiber_entry, c);
    if (!c->context) {
        /* Without a fiber handle a later SwitchToFiber() would crash; unwind instead. */
        destroy_coro(idx);
        return -3;
    }
#else
    c->stack_sz = 512 * 1024; /* 512KB is plenty for Perl fibers */
    if (posix_memalign(&c->stack_p, 16, c->stack_sz) != 0) {
        destroy_coro(idx);
        return -3;
    }
    if (getcontext(&c->context) != 0) {
        /* An uninitialised ucontext_t must never reach makecontext()/swapcontext(). */
        destroy_coro(idx);
        return -3;
    }
    c->context.uc_stack.ss_sp = c->stack_p;
    c->context.uc_stack.ss_size = c->stack_sz;
    /* Safety net only: entry_point() never returns. */
    c->context.uc_link = &main_context.context;
    makecontext(&c->context, (void (*)())posix_entry, 1, c->id);
#endif
    return idx;
}

/**
 * @brief Resumes a fiber (asymmetric call).
 *
 * Hands `args` to the fiber and records the caller as its parent, so the
 * fiber's next coro_yield() returns here. Then switches to it. When control
 * comes back, this returns whatever value was handed to the caller.
 *
 * @param fiber_id Fiber ID to call.
 * @param args Perl SV (usually arrayref) to pass as arguments to the fiber.
 * @return SV* Result yielded by the fiber (mortalised), or &PL_sv_undef if
 *             `fiber_id` is invalid or finished, or names the calling fiber itself.
 */
DLLEXPORT SV * coro_call(int fiber_id, SV * args) {
    dTHX;
    if (fiber_id < 0 || fiber_id >= MAX_FIBERS || !fibers[fiber_id] || fibers[fiber_id]->finished)
        return &PL_sv_undef;
    /* A fiber cannot call itself. perform_switch() would be a no-op, and
     * recording the fiber as its own parent would make every later
     * coro_yield() from it a no-op as well. */
    if (fiber_id == current_fiber_id)
        return &PL_sv_undef;

    para_fiber_t * callee = fibers[fiber_id];
    if (callee->transfer_data != args) {
        if (callee->transfer_data && callee->transfer_data != &PL_sv_undef)
            SvREFCNT_dec(callee->transfer_data);
        callee->transfer_data = args;
        if (args && args != &PL_sv_undef)
            SvREFCNT_inc(args);
    }
    callee->parent_id = current_fiber_id;
    perform_switch(fiber_id);

    /* Re-read the slot: the callee may have been destroyed while we were away.
     * If it ran to completion, drop the final value it still holds. */
    if (fibers[fiber_id] && fibers[fiber_id]->finished) {
        if (fibers[fiber_id]->transfer_data && fibers[fiber_id]->transfer_data != &PL_sv_undef) {
            SvREFCNT_dec(fibers[fiber_id]->transfer_data);
            fibers[fiber_id]->transfer_data = &PL_sv_undef;
        }
    }
    para_fiber_t * me = (current_fiber_id == -1) ? &main_context : fibers[current_fiber_id];
    SV * res = me->transfer_data;
    me->transfer_data = &PL_sv_undef;
    if (res && res != &PL_sv_undef)
        sv_2mortal(res);
    return res;
}

/**
 * @brief Hand control straight to another fiber or to main (symmetric switch).
 *
 * No parent link is recorded. The target receives `args` (a reference is
 * taken unless it is &PL_sv_undef) and this fiber stays suspended until
 * something transfers or yields back to it.
 *
 * @param target_id Fiber ID to transfer to, or -1 for the main context.
 * @param args Value handed to the target.
 * @return SV* Value eventually handed back to this fiber (mortalised), or
 *             &PL_sv_undef if `target_id` is invalid or already finished.
 */
DLLEXPORT SV * coro_transfer(int target_id, SV * args) {
    dTHX;
    /* Accept only the main context (-1) or a live, unfinished fiber. */
    if (target_id < -1)
        return &PL_sv_undef;
    if (target_id >= 0) {
        if (target_id >= MAX_FIBERS || !fibers[target_id] || fibers[target_id]->finished)
            return &PL_sv_undef;
    }

    para_fiber_t * dest = (target_id >= 0) ? fibers[target_id] : &main_context;
    SV * previous = dest->transfer_data;
    if (previous != args) {
        if (previous && previous != &PL_sv_undef)
            SvREFCNT_dec(previous);
        dest->transfer_data = args;
        if (args && args != &PL_sv_undef)
            SvREFCNT_inc(args);
    }

    perform_switch(target_id);

    /* If the target ran to completion meanwhile, drop the final value it still holds. */
    if (target_id >= 0) {
        para_fiber_t * done = fibers[target_id];
        if (done && done->finished && done->transfer_data && done->transfer_data != &PL_sv_undef) {
            SvREFCNT_dec(done->transfer_data);
            done->transfer_data = &PL_sv_undef;
        }
    }

    para_fiber_t * self = (current_fiber_id != -1) ? fibers[current_fiber_id] : &main_context;
    SV * received = self->transfer_data;
    self->transfer_data = &PL_sv_undef;
    if (received && received != &PL_sv_undef)
        sv_2mortal(received);
    return received;
}

/** @brief Returns 1 if the fiber has finished execution. */
DLLEXPORT int is_finished(int fiber_id) {
    if (fiber_id < 0)
        return 0;
    return (fibers[fiber_id] && fibers[fiber_id]->finished) ? 1 : 0;
}

/**
 * @brief Internal helper: clamp a CV's positive call depth back to zero.
 *
 * Non-CV arguments and NULL are ignored; depths that are already <= 0 are left as-is.
 * Despite the name, this does not recurse.
 */
static void recursive_depth_reset(pTHX_ CV * cv) {
    const bool is_code = cv && SvTYPE((SV *)cv) == SVt_PVCV;
    if (is_code && CvDEPTH(cv) > 0)
        CvDEPTH(cv) = 0;
}

/**
 * @brief Clears active pads in the fiber stack.
 *
 * Internal helper used during fiber destruction. It walks the fiber's
 * context stack from the innermost frame outwards. For each sub frame it
 * av_clear()s the pad at that frame's depth. For every sub or format frame
 * it decrements the CV's CvDEPTH (never below zero).
 *
 * Format frames get the depth decrement but no pad clear. Perl's
 * `struct block_format` has no `olddepth` field, so `blk_sub.olddepth` on a
 * format frame would read an unrelated field. Only `cv` is shared between
 * the two layouts.
 *
 * NOTE(review): av_clear() empties the pad in place. Perl reuses an existing
 * pad at the same depth on the next call, and depth 1 is the CV's primary pad.
 * A CV re-entered after this runs may therefore see emptied pad entries —
 * TODO confirm this cannot happen for fibers destroyed while suspended.
 *
 * @param si The Stack Info structure of the fiber.
 */
static void _clear_pads_in_stack(pTHX_ PERL_SI * si) {
    if (!si || !si->si_cxstack)
        return;
    for (I32 i = si->si_cxix; i >= 0; i--) {
        PERL_CONTEXT * cx = &(si->si_cxstack[i]);
        const U8 type = CxTYPE(cx);
        if (type != CXt_SUB && type != CXt_FORMAT)
            continue;
        /* `cv` sits at the same offset in block_sub and block_format. */
        CV * cv = cx->blk_sub.cv;
        if (!cv || SvTYPE((SV *)cv) != SVt_PVCV)
            continue;
        PADLIST * padlist = CvPADLIST(cv);
        if (type == CXt_SUB && padlist) {
            I32 depth = cx->blk_sub.olddepth + 1;
            if (depth > 0 && depth <= PadlistMAX(padlist)) {
                AV * pad = (AV *)PadlistARRAY(padlist)[depth];
                if (pad && SvTYPE((SV *)pad) == SVt_PVAV)
                    av_clear(pad);
            }
        }
        if (CvDEPTH(cv) > 0)
            CvDEPTH(cv)--;
    }
}

/**
 * @brief Destroys a fiber and releases all associated memory.
 *
 * This includes freeing OS-level stacks and context, but also carefully
 * decrementing refcounts of Perl SVs stored within the fiber.
 *
 * @param fiber_id Fiber ID to destroy.
 */
DLLEXPORT void destroy_coro(int fiber_id) {
    dTHX;
    if (fiber_id < 0 || fiber_id >= MAX_FIBERS)
        return;
    para_fiber_t * c = fibers[fiber_id];
    if (!c)
        return;
    fibers[fiber_id] = NULL;

    /* Unwind pads */
    if (c->si)
        _clear_pads_in_stack(aTHX_ c->si);

    /* Release Perl references */
    if (c->user_cv && c->user_cv != &PL_sv_undef) {
        SvREFCNT_dec(c->user_cv);
        c->user_cv = NULL;
    }
    if (c->self_ref && c->self_ref != &PL_sv_undef) {
        SvREFCNT_dec(c->self_ref);
        c->self_ref = NULL;
    }
    if (c->transfer_data && c->transfer_data != &PL_sv_undef) {
        SvREFCNT_dec(c->transfer_data);
        c->transfer_data = NULL;
    }

    /* Early exit if Perl is already shutting down */
    if (PL_dirty) {
#ifndef _WIN32
        if (c->stack_p)
            free(c->stack_p);
#endif
        free(c);
        return;
    }

#ifdef _WIN32
    if (c->context)
        DeleteFiber(c->context);
#else
    if (c->stack_p)
        free(c->stack_p);
#endif

    /* Safely free Perl-allocated stacks */
    if (c->si) {
        if (c->si->si_cxstack)
            Safefree(c->si->si_cxstack);
        Safefree(c->si);
    }
    if (c->curstack) {
        av_clear(c->curstack);
        SvREFCNT_dec((SV *)c->curstack);
        c->curstack = NULL;
    }
    if (c->markstack)
        Safefree(c->markstack);
    if (c->scopestack)
        Safefree(c->scopestack);
    if (c->savestack)
        Safefree(c->savestack);
    if (c->tmps_stack) {
        for (I32 i = 0; i <= c->tmps_ix; i++) {
            SV * sv = c->tmps_stack[i];
            if (sv && sv != &PL_sv_undef)
                SvREFCNT_dec(sv);
        }
        Safefree(c->tmps_stack);
    }
    free(c);
}

/**
 * @brief Global cleanup function for the fiber and thread pool system.
 *
 * Steps:
 *   1. Tell the worker threads to stop: clear threads_keep_running and
 *      broadcast queue_cond.
 *   2. If a fiber is current, restore the main interpreter state.
 *   3. Destroy the remaining fibers and release any value parked in the main
 *      context.
 *
 * Should be called during global destruction or system shutdown.
 *
 * If a fiber is current when this runs, the C code is still executing on that
 * fiber's stack, because swap_perl_state() only swaps Perl globals and does
 * not switch OS context. That one fiber is therefore left allocated instead of
 * having its stack freed underneath the running code.
 */
DLLEXPORT void cleanup() {
    dTHX;
    if (threads_initialized) {
        LOCK(queue_lock);
        threads_keep_running = 0;
        PARA_COND_BROADCAST(queue_cond);
        UNLOCK(queue_lock);

#ifdef _WIN32
        /* Wait (at most 100 ms per thread) for threads to finish, then close handles */
        for (int i = 0; i < current_thread_count; i++) {
            if (thread_handles[i]) {
                WaitForSingleObject(thread_handles[i], 100);
                CloseHandle(thread_handles[i]);
                thread_handles[i] = NULL;
            }
        }
#else
        /* Give threads a moment to notice threads_keep_running = 0.
         * NOTE(review): workers are not joined, so one may still be running
         * after this returns — confirm that is acceptable (e.g. on library unload). */
        usleep(10000);
#endif
    }

    /* Fiber whose C stack we are executing on right now (-1 = the main stack). */
    const int running_fiber = current_fiber_id;
    if (running_fiber != -1) {
        swap_perl_state(fibers[running_fiber], &main_context);
        current_fiber_id = -1;
    }
    for (int i = 0; i < MAX_FIBERS; i++)
        if (fibers[i] && i != running_fiber)
            destroy_coro(i);
    if (main_context.transfer_data && main_context.transfer_data != &PL_sv_undef) {
        SvREFCNT_dec(main_context.transfer_data);
        main_context.transfer_data = &PL_sv_undef;
    }
}



( run in 0.685 second using v1.01-cache-2.11-cpan-2398b32b56e )