Acme-Parataxis

 view release on metacpan or  search on metacpan

lib/Acme/Parataxis.c  view on Meta::CPAN

/**
 * @file Parataxis.c
 * @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
 *
 * @section Overview
 * This file implements a cooperative multitasking system (Fibers) integrated
 * with a preemptive native thread pool. It allows Perl to run many user-mode
 * fibers (up to MAX_FIBERS at once) that can offload blocking C-level tasks
 * to background OS threads without stalling the main interpreter.
 *
 * @section Architecture
 * - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
 *   and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
 * - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
 *   pass control.
 * - **Thread Pool**: A fixed pool of worker threads that poll a job queue for
 *   blocking operations like sleep, I/O, or heavy computation.
 * - **Context Switching**: The `swap_perl_state` function manually saves and restores
 *   the global state of the Perl interpreter (`PL_*` variables) to allow disjoint
 *   execution flows.
 *
 * @section Caveats
 * Shared subroutines (CVs) with re-entrant yielding calls are handled by a
 * specialized pad-clearing mechanism in `_activate_current_depths` to satisfy
 * Perl's internal `AvFILLp` assertions in debug builds.
 */

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601
#endif
#else
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#ifdef __APPLE__
#ifndef _DARWIN_C_SOURCE
#define _DARWIN_C_SOURCE
#endif
#endif
#endif

#define PERL_NO_GET_CONTEXT
#define NO_XSLOCKS
#include "EXTERN.h"
#include "XSUB.h"
#include "perl.h"

#ifdef _WIN32
/** @brief Export macro for Windows DLLs */
#define DLLEXPORT __declspec(dllexport)
/** @brief Handle for the underlying OS fiber context */
typedef LPVOID coro_handle_t;
/** @brief Handle for a native OS thread */
typedef HANDLE para_thread_t;
/** @brief Mutex type for queue synchronization */
typedef CRITICAL_SECTION para_mutex_t;
#define LOCK(m) EnterCriticalSection(&m)
#define UNLOCK(m) LeaveCriticalSection(&m)
#define LOCK_INIT(m) InitializeCriticalSection(&m)
#else
#include <pthread.h>
#include <sched.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <ucontext.h>
#include <unistd.h>
#if defined(__APPLE__) || defined(__FreeBSD__)
#include <sys/sysctl.h>
#include <sys/types.h>
#endif
/** @brief Export macro for Unix systems */
#define DLLEXPORT __attribute__((visibility("default")))
/** @brief Handle for the underlying OS fiber context (ucontext_t) */
typedef ucontext_t coro_handle_t;
/** @brief Handle for a native OS thread (pthread_t) */

lib/Acme/Parataxis.c  view on Meta::CPAN

    nm[0] = CTL_HW;
    nm[1] = HW_NCPU;
    sysctl(nm, 2, &count, &len, NULL, 0);
    return (count > 0) ? (int)count : 1;
#else
    long count = sysconf(_SC_NPROCESSORS_ONLN);
    return (count > 0) ? (int)count : 1;
#endif
}

/**
 * @struct para_fiber_t
 * @brief The complete execution context of a Perl Fiber.
 *
 * Holds the OS-level execution context (via `context`) plus a private copy of
 * every Perl interpreter global (PL_*) that swap_perl_state() saves when a
 * fiber is switched out and restores when it is switched back in. The global
 * main_context uses this same structure to park the main interpreter's state
 * while a fiber runs (its id is -1).
 */
typedef struct {
    coro_handle_t context; /**< OS context: ucontext_t on Unix, fiber handle (LPVOID) on Windows */

#ifndef _WIN32
    void * stack_p;  /**< Native C stack for this fiber, released with free() (Unix only) */
    size_t stack_sz; /**< Size in bytes of stack_p (Unix only) */
#endif

    /*
     * Perl Interpreter State Pointers.
     * Each field mirrors the PL_* global of the same name; swap_perl_state()
     * copies them out of the interpreter for the fiber being paused and back
     * in for the fiber being resumed.
     */
    PERL_SI * si;            /**< PL_curstackinfo: owns this fiber's context (cx) stack */
    AV * curstack;           /**< PL_curstack: the argument stack AV (also si->si_stack) */
    SSize_t stack_sp_offset; /**< PL_stack_sp - PL_stack_base, saved as an offset rather than a pointer */

    I32 * markstack;     /**< PL_markstack: base of the mark stack (argument-list start offsets) */
    I32 * markstack_ptr; /**< PL_markstack_ptr: current top of the mark stack */
    I32 * markstack_max; /**< PL_markstack_max: limit at which PUSHMARK grows the mark stack */

    I32 * scopestack;   /**< PL_scopestack: savestack index recorded at each ENTER */
    I32 scopestack_ix;  /**< PL_scopestack_ix: current depth of the scope stack */
    I32 scopestack_max; /**< PL_scopestack_max: capacity of the scope stack */

    ANY * savestack;   /**< PL_savestack: undo records for local(), SAVE* and scope exit */
    I32 savestack_ix;  /**< PL_savestack_ix: current index in the save stack */
    I32 savestack_max; /**< PL_savestack_max: limit checked before the save stack is grown */

    SV ** tmps_stack; /**< PL_tmps_stack: mortal SVs awaiting FREETMPS */
    I32 tmps_ix;      /**< PL_tmps_ix: index of the last mortal (-1 when empty) */
    I32 tmps_floor;   /**< PL_tmps_floor: FREETMPS releases only entries above this */
    I32 tmps_max;     /**< PL_tmps_max: capacity of the mortal stack */

    JMPENV * top_env;   /**< PL_top_env: innermost setjmp environment used by die/eval */
    COP * curcop;       /**< PL_curcop: COP of the current statement (file/line info) */
    OP * op;            /**< PL_op: the op currently being executed */
    PAD * comppad;      /**< PL_comppad: the active lexical pad (AV) */
    SV ** curpad;       /**< PL_curpad: AvARRAY of comppad */
    PMOP * curpm;       /**< PL_curpm: current pattern match (source of $1, $& ...) */
    PMOP * curpm_under; /**< PL_curpm_under: NOTE(review) regex-related state, exact role not visible here */
    PMOP * reg_curpm;   /**< PL_reg_curpm: regex engine state; new fibers start with NULL */

    GV * defgv;      /**< PL_defgv: the *_ glob ($_ / @_) */
    GV * last_in_gv; /**< PL_last_in_gv: GV of the last filehandle read with <FH> */
    SV * rs;         /**< PL_rs: $/ (input record separator) */
    GV * ofsgv;      /**< PL_ofsgv: GV holding $, (output field separator) */
    SV * ors_sv;     /**< PL_ors_sv: $\ (output record separator) */
    GV * defoutgv;   /**< PL_defoutgv: the currently selected output filehandle */
    HV * curstash;   /**< PL_curstash: current package stash */
    HV * defstash;   /**< PL_defstash: the main:: stash */
    SV * errors;     /**< PL_errors: outstanding queued errors */

    SV * user_cv;  /**< The Perl sub/coderef this fiber is running */
    SV * self_ref; /**< The Acme::Parataxis Perl object; destroy_coro drops it unless it is &PL_sv_undef */

    SV * transfer_data; /**< Value passed during yield/transfer; destroy_coro releases it unless it is &PL_sv_undef */

    int id;          /**< Numeric ID of this fiber (-1 for main_context) */
    int finished;    /**< Flag: 1 once the fiber has completed its entry point */
    int parent_id;   /**< ID of the fiber that 'called' this one (asymmetric) */
    int last_sender; /**< ID of the fiber that last switched control here (-1 = none/main) */
} para_fiber_t;

/** @name Job Status Constants */
///@{
/*
 * Values stored in job_t.status. Judging by the descriptions, a queue slot
 * presumably cycles FREE -> NEW -> BUSY -> DONE and back to FREE once the
 * submitting fiber has collected the result.
 * NOTE(review): confirm against the submit/worker code.
 */
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1  /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}

/** @name Task Type Constants */
///@{
/* Values stored in job_t.type, selecting the work a pool thread performs. */
#define TASK_SLEEP 0   /**< Sleep for N milliseconds */
#define TASK_GET_CPU 1 /**< Retrieve current core ID (presumably of the worker thread - confirm) */
#define TASK_READ 2    /**< Wait for read-readiness on a file descriptor */
#define TASK_WRITE 3   /**< Wait for write-readiness on a file descriptor */
///@}

/**
 * @union value_t
 * @brief Generic container for task input/output data.
 *
 * Only one member is meaningful at a time; which one depends on the task
 * type (job_t.type).
 */
typedef union {
    int64_t i; /**< 64-bit integer (may also be used to carry pointer-sized values - confirm) */
    double d;  /**< Floating point storage */
    char * s;  /**< String pointer; NOTE(review): ownership/freeing is not visible here */
} value_t;

/**
 * @struct job_t
 * @brief Represents a task in the background thread pool queue.
 *
 * A job is filled in by a fiber, processed by a worker thread and then read
 * back by the fiber, so its fields are touched from more than one OS thread.
 * NOTE(review): presumably accessed under queue_lock - confirm.
 */
typedef struct {
    int fiber_id;      /**< ID of the Fiber that submitted this task */
    int target_thread; /**< Index of the assigned worker thread */
    int type;          /**< Type of task to perform (TASK_*) */
    value_t input;     /**< Input data for the task */
    value_t output;    /**< Result data populated by the worker */
    int timeout_ms;    /**< Timeout in milliseconds for I/O tasks (presumably TASK_READ / TASK_WRITE) */
    int status;        /**< Current lifecycle state (JOB_*) */
} job_t;

// Global Registry and State

/**
 * @brief Maximum number of concurrent fibers allowed.
 *
 * cleanup() walks fibers[0 .. MAX_FIBERS-1] and destroys every non-NULL
 * entry, so this is presumably also the length of the fibers[] registry.
 */
#define MAX_FIBERS 1024

lib/Acme/Parataxis.c  view on Meta::CPAN

    }
    return &PL_sv_undef;
}

/**
 * @brief Restores subroutine call depths and cleans argument pads.
 *
 * CvDEPTH lives in the CV itself and is therefore shared by every fiber.
 * After a switch it reflects whichever fiber last ran the sub. This function
 * walks the resumed fiber's context stack (to->si) and re-derives the depths
 * from that fiber's own frames, in two passes:
 *
 * Pass 1: For every CXt_SUB frame, sets CvDEPTH(cv) to the frame's
 *         olddepth + 1. Frames are visited outermost-first, so when a CV
 *         appears several times (recursion) the innermost frame wins.
 *         CXt_FORMAT frames are skipped: a format context has no olddepth
 *         member (the bytes at blk_sub.olddepth belong to other blk_format
 *         fields), so their CvDEPTH is left unchanged.
 * Pass 2: For every CXt_SUB / CXt_FORMAT frame, surgically resets slot 0 of
 *         the pad one level deeper than CvDEPTH(cv). That is the pad the
 *         next call would land on; slot 0 is reset to an empty, non-REAL AV,
 *         the state pp_entersub asserts for @_ in debugging builds.
 *
 * NOTE(review): Pass 2 sets AvFILLp to -1 without releasing elements, so the
 * elements of an AvREAL array would leak. Also, if another suspended fiber is
 * using that deeper pad, its @_ is cleared too. Confirm both are acceptable.
 *
 * @param to The fiber being resumed.
 */
static void _activate_current_depths(pTHX_ para_fiber_t * to) {
    PERL_SI * si = to->si;
    if (!si || !si->si_cxstack)
        return;

    /* Pass 1: Restore CvDEPTH for all active sub frames */
    for (I32 i = 0; i <= si->si_cxix; i++) {
        PERL_CONTEXT * cx = &(si->si_cxstack[i]);
        /* Only sub frames record olddepth; reading it from a format frame
         * would reinterpret part of blk_format as a depth. */
        if (CxTYPE(cx) != CXt_SUB)
            continue;
        CV * cv = cx->blk_sub.cv;
        if (cv && SvTYPE((SV *)cv) == SVt_PVCV)
            CvDEPTH(cv) = cx->blk_sub.olddepth + 1;
    }

    /* Pass 2: Clean the landing pads for the NEXT call in each CV */
    for (I32 i = 0; i <= si->si_cxix; i++) {
        PERL_CONTEXT * cx = &(si->si_cxstack[i]);
        if (CxTYPE(cx) == CXt_SUB || CxTYPE(cx) == CXt_FORMAT) {
            /* blk_sub.cv and blk_format.cv occupy the same position, so this
             * read is valid for both frame types. */
            CV * cv = cx->blk_sub.cv;
            if (cv && SvTYPE((SV *)cv) == SVt_PVCV) {
                PADLIST * pl = CvPADLIST(cv);
                I32 next_depth = CvDEPTH(cv) + 1;
                if (pl && next_depth <= PadlistMAX(pl)) {
                    AV * next_pad = (AV *)PadlistARRAY(pl)[next_depth];
                    if (next_pad && SvTYPE(next_pad) == SVt_PVAV) {
                        SV ** array = AvARRAY(next_pad);
                        if (array && AvMAX(next_pad) >= 0) {
                            /* Slot 0 of a sub pad holds @_ */
                            SV * args = array[0];
                            if (args && SvTYPE(args) == SVt_PVAV) {
                                AvFILLp((AV *)args) = -1;
                                AvREAL_off((AV *)args);
                            }
                        }
                    }
                }
            }
        }
    }
}

/**
 * @brief Swaps the internal Perl Interpreter state pointers.
 *
 * This is the core of the fiber implementation. It manually saves all
 * global pointers that define the "state" of the Perl virtual machine for
 * the current context and restores them for the target context.
 *
 * @param from Context being paused.
 * @param to Context being resumed.
 */
void swap_perl_state(para_fiber_t * from, para_fiber_t * to) {
    dTHX;
    /* Save current state into 'from' context */
    from->si = PL_curstackinfo;

    // The Argument Stack (Main Perl stack)
    from->curstack = PL_curstack;
    from->stack_sp_offset = PL_stack_sp - PL_stack_base;

    // The Mark Stack (Tracks where lists begin on the argument stack)
    from->markstack = PL_markstack;
    from->markstack_ptr = PL_markstack_ptr;
    from->markstack_max = PL_markstack_max;

    // The Scope Stack (Tracks block entry/exit for cleanup)
    from->scopestack = PL_scopestack;
    from->scopestack_ix = PL_scopestack_ix;
    from->scopestack_max = PL_scopestack_max;

    // The Save Stack (Tracks 'local' variables and destructors)
    from->savestack = PL_savestack;
    from->savestack_ix = PL_savestack_ix;
    from->savestack_max = PL_savestack_max;

    // The Mortal Stack (Tracks temporary SVs that need decrementing)
    from->tmps_stack = PL_tmps_stack;
    from->tmps_ix = PL_tmps_ix;
    from->tmps_floor = PL_tmps_floor;
    from->tmps_max = PL_tmps_max;

    // Exception Environment (setjmp/longjmp buffers for eval/die)
    from->top_env = PL_top_env;

    // Op and Pad pointers (Where we are in the bytecode)
    from->curcop = PL_curcop;
    from->op = PL_op;
    from->comppad = PL_comppad;
    from->curpad = PL_curpad;
    from->curpm = PL_curpm;
    from->curpm_under = PL_curpm_under;
    from->reg_curpm = PL_reg_curpm;
    from->defgv = PL_defgv;
    from->last_in_gv = PL_last_in_gv;
    from->rs = PL_rs;
    from->ofsgv = PL_ofsgv;
    from->ors_sv = PL_ors_sv;
    from->defoutgv = PL_defoutgv;
    from->curstash = PL_curstash;
    from->defstash = PL_defstash;
    from->errors = PL_errors;

    /* Load target state from 'to' context */
    PL_curstackinfo = to->si;
    PL_curstack = to->curstack;

lib/Acme/Parataxis.c  view on Meta::CPAN

        PL_curpad = to->curpad;

    // Restore CvDEPTH and clean landing pads
    _activate_current_depths(aTHX_ to);
}

/**
 * @brief Allocates and initializes new Perl stacks for a fiber.
 *
 * Each fiber needs a complete set of independent stacks (Argument, Mark,
 * Scope, Save, Mortal) so that swap_perl_state() can install them as the
 * interpreter's live stacks. Size/limit conventions follow the ones Perl
 * uses for its own stacks:
 *  - si_cxmax is the highest valid index into si_cxstack. CXINC may advance
 *    si_cxix all the way to si_cxmax, so the array needs si_cxmax + 1
 *    entries (Perl's new_stackinfo()/cxinc() size it the same way).
 *  - On perls that define SS_MAXPUSH, savestack_max is kept SS_MAXPUSH
 *    slots below the real allocation. The SS_ADD_* macros write up to
 *    SS_MAXPUSH entries before SS_ADD_END compares the index with the limit.
 *
 * Interpreter globals ($/, $\, the default output handle, stashes, ...) are
 * copied from the current interpreter state; the lexical pad starts NULL.
 *
 * NOTE(review): Newxz leaves si_cxsubix (perl >= 5.32) at 0, whereas Perl's
 * new_stackinfo() initializes it to -1. Confirm this is harmless.
 * NOTE(review): DEBUGGING perls keep a parallel PL_scopestack_name array
 * that is not allocated per fiber here. Confirm it is handled elsewhere.
 *
 * @param c The fiber context to initialize.
 */
void init_perl_stacks(para_fiber_t * c) {
    dTHX;

    // Allocate Stack Info (SI)
    Newxz(c->si, 1, PERL_SI);
    c->si->si_cxmax = 64;

    // si_cxmax is an index, not a count: allocate one extra (zeroed) frame
    // so that cxstack[si_cxmax] is valid before cxinc() must grow the array.
    Newxz(c->si->si_cxstack, c->si->si_cxmax + 1, PERL_CONTEXT);
    c->si->si_cxix = -1;
    c->si->si_type = PERLSI_MAIN;

    // Allocate Argument Stack (AV)
    c->curstack = newAV();
    AvREAL_off(c->curstack);  // Stacks do not 'own' their elements in the refcnt sense
    av_extend(c->curstack, 128);

    // Initialize stack with a dummy undef at index 0, matching Perl's main stack
    AvARRAY(c->curstack)[0] = &PL_sv_undef;
    AvFILLp(c->curstack) = 0;
    c->stack_sp_offset = 0;

    // Link the SI to the AV. Perl uses this linkage during stack unwinding.
    c->si->si_stack = c->curstack;

    // Initial capacity (in entries) of each control stack; Perl grows them on demand.
    I32 sz = 2048;

    // Mark stack: markstack_max sits one entry below the end of the buffer,
    // which is merely conservative since PUSHMARK grows the stack on reaching it.
    Newx(c->markstack, sz, I32);
    c->markstack_ptr = c->markstack;
    *c->markstack_ptr = 0;
    c->markstack_max = c->markstack + sz - 1;

    Newx(c->scopestack, sz, I32);
    c->scopestack_ix = 0;
    c->scopestack_max = sz;

    Newx(c->savestack, sz, ANY);
    c->savestack_ix = 0;
#ifdef SS_MAXPUSH
    // Reserve SS_MAXPUSH slots of headroom past the advertised limit, as
    // Perl's init_stacks()/savestack_grow() do, so SS_ADD_* writes that land
    // before SS_ADD_END's limit check stay inside the buffer.
    c->savestack_max = sz - SS_MAXPUSH;
#else
    c->savestack_max = sz;
#endif

    Newx(c->tmps_stack, sz, SV *);
    c->tmps_ix = -1;
    c->tmps_floor = -1;
    c->tmps_max = sz;

    // Inherit initial globals from current interpreter state
    c->curcop = PL_curcop;
    c->op = PL_op;
    c->top_env = PL_top_env;
    c->curpm = PL_curpm;
    c->curpm_under = PL_curpm_under;
    c->reg_curpm = NULL;
    c->defgv = PL_defgv;
    c->last_in_gv = PL_last_in_gv;
    c->rs = PL_rs;
    c->ofsgv = PL_ofsgv;
    c->ors_sv = PL_ors_sv;
    c->defoutgv = PL_defoutgv;
    c->curstash = PL_curstash;
    c->defstash = PL_defstash;
    c->errors = PL_errors;

    // Start with fresh pads to avoid interfering with caller.
    c->comppad = NULL;
    c->curpad = NULL;
}

/**
 * @brief Initializes the fiber system and converts the main thread.
 *
 * This function must be called once before any other fiber operations.
 * It captures the state of the main Perl interpreter thread.
 *
 * @return int 0 on success.
 */
DLLEXPORT int init_system() {
    dTHX;
    if (system_initialized)
        return 0;
    if (max_thread_pool_size == 0) {
        max_thread_pool_size = get_cpu_count();
        if (max_thread_pool_size > MAX_THREADS)
            max_thread_pool_size = MAX_THREADS;
    }
    main_context.si = PL_curstackinfo;
    main_context.transfer_data = &PL_sv_undef;
    main_context.id = -1;
    main_context.finished = 0;
    main_context.last_sender = -1;
    main_context.curpm = PL_curpm;
    main_context.curpm_under = PL_curpm_under;
    main_context.reg_curpm = PL_reg_curpm;
    main_context.defgv = PL_defgv;
    main_context.last_in_gv = PL_last_in_gv;
    main_context.rs = PL_rs;
    main_context.ofsgv = PL_ofsgv;
    main_context.ors_sv = PL_ors_sv;
    main_context.defoutgv = PL_defoutgv;
    main_context.curstash = PL_curstash;
    main_context.defstash = PL_defstash;
    main_context.errors = PL_errors;
    system_initialized = 1;
#ifdef _WIN32
    /* Convert the main thread into a fiber so it can be switched out */
    if (!main_fiber_handle) {
        main_fiber_handle = ConvertThreadToFiber(NULL);

lib/Acme/Parataxis.c  view on Meta::CPAN

    }
    if (c->self_ref && c->self_ref != &PL_sv_undef) {
        SvREFCNT_dec(c->self_ref);
        c->self_ref = NULL;
    }
    if (c->transfer_data && c->transfer_data != &PL_sv_undef) {
        SvREFCNT_dec(c->transfer_data);
        c->transfer_data = NULL;
    }

    /* Early exit if Perl is already shutting down */
    if (PL_dirty) {
#ifndef _WIN32
        if (c->stack_p)
            free(c->stack_p);
#endif
        free(c);
        return;
    }

#ifdef _WIN32
    if (c->context)
        DeleteFiber(c->context);
#else
    if (c->stack_p)
        free(c->stack_p);
#endif

    /* Safely free Perl-allocated stacks */
    if (c->si) {
        if (c->si->si_cxstack)
            Safefree(c->si->si_cxstack);
        Safefree(c->si);
    }
    if (c->curstack) {
        av_clear(c->curstack);
        SvREFCNT_dec((SV *)c->curstack);
        c->curstack = NULL;
    }
    if (c->markstack)
        Safefree(c->markstack);
    if (c->scopestack)
        Safefree(c->scopestack);
    if (c->savestack)
        Safefree(c->savestack);
    if (c->tmps_stack) {
        for (I32 i = 0; i <= c->tmps_ix; i++) {
            SV * sv = c->tmps_stack[i];
            if (sv && sv != &PL_sv_undef)
                SvREFCNT_dec(sv);
        }
        Safefree(c->tmps_stack);
    }
    free(c);
}

/**
 * @brief Global cleanup function for the fiber and thread pool system.
 *
 * Signals all worker threads to terminate and destroys all remaining
 * fibers. Should be called during global destruction or system shutdown.
 */
DLLEXPORT void cleanup() {
    dTHX;
    if (threads_initialized) {
        LOCK(queue_lock);
        threads_keep_running = 0;
        PARA_COND_BROADCAST(queue_cond);
        UNLOCK(queue_lock);

#ifdef _WIN32
        /* Wait for threads to finish and close handles */
        for (int i = 0; i < current_thread_count; i++) {
            if (thread_handles[i]) {
                WaitForSingleObject(thread_handles[i], 100);
                CloseHandle(thread_handles[i]);
                thread_handles[i] = NULL;
            }
        }
#else
        /* Give threads a moment to notice threads_keep_running = 0 */
        usleep(10000);
#endif
    }

    if (current_fiber_id != -1) {
        swap_perl_state(fibers[current_fiber_id], &main_context);
        current_fiber_id = -1;
    }
    for (int i = 0; i < MAX_FIBERS; i++)
        if (fibers[i])
            destroy_coro(i);
    if (main_context.transfer_data && main_context.transfer_data != &PL_sv_undef) {
        SvREFCNT_dec(main_context.transfer_data);
        main_context.transfer_data = &PL_sv_undef;
    }
}



( run in 0.978 second using v1.01-cache-2.11-cpan-df04353d9ac )