Acme-Parataxis

 view release on metacpan or  search on metacpan

lib/Acme/Parataxis.c  view on Meta::CPAN

/**
 * @file Parataxis.c
 * @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
 *
 * @section Overview
 * This file implements a cooperative multitasking system (Fibers) integrated
 * with a preemptive native thread pool. It allows Perl to run thousands of
 * user-mode fibers that can offload blocking C-level tasks to background
 * OS threads without stalling the main interpreter.
 *
 * @section Architecture
 * - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
 *   and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
 * - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
 *   pass control.
 * - **Thread Pool**: A bounded pool of worker threads (seeded small and grown on
 *   demand up to a configurable maximum) that poll a job queue for blocking
 *   operations like sleep, I/O, or heavy computation.
 * - **Context Switching**: The `swap_perl_state` function manually saves and restores
 *   the global state of the Perl interpreter (`PL_*` variables) to allow disjoint
 *   execution flows.
 *
 * @section Caveats
 * Shared subroutines (CVs) with re-entrant yielding calls are handled by a
 * specialized pad-clearing mechanism in `_activate_current_depths` to satisfy
 * Perl's internal `AvFILLp` assertions in debug builds.
 */

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601
#endif
#else
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#ifdef __APPLE__
#ifndef _DARWIN_C_SOURCE
#define _DARWIN_C_SOURCE
#endif
#endif
#endif

#define PERL_NO_GET_CONTEXT
#define NO_XSLOCKS
#include "EXTERN.h"
#include "XSUB.h"
#include "perl.h"

#ifdef _WIN32
/** @brief Export macro for Windows DLLs */
#define DLLEXPORT __declspec(dllexport)
/** @brief Handle for the underlying OS fiber context */
typedef LPVOID coro_handle_t;
/** @brief Handle for a native OS thread */
typedef HANDLE para_thread_t;
/** @brief Mutex type for queue synchronization */
typedef CRITICAL_SECTION para_mutex_t;
/*
 * Mutex wrappers. Each macro takes the mutex OBJECT (an lvalue of type
 * para_mutex_t), not a pointer to it: the address is taken inside the macro.
 */
#define LOCK(m) EnterCriticalSection(&m)
#define UNLOCK(m) LeaveCriticalSection(&m)
#define LOCK_INIT(m) InitializeCriticalSection(&m)
#else
/* POSIX build: threads, scheduling, select() and ucontext come from here. */
#include <pthread.h>
#include <sched.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <ucontext.h>
#include <unistd.h>
#if defined(__APPLE__) || defined(__FreeBSD__)
/* NOTE(review): presumably needed for sysctl-based CPU detection; the
 * consumer is not part of this excerpt - confirm. */
#include <sys/sysctl.h>
#include <sys/types.h>
#endif
/** @brief Export macro for Unix systems */
#define DLLEXPORT __attribute__((visibility("default")))
/** @brief Handle for the underlying OS fiber context (ucontext_t) */
typedef ucontext_t coro_handle_t;
/** @brief Handle for a native OS thread (pthread_t) */
typedef pthread_t para_thread_t;
/** @brief Mutex type for queue synchronization (pthread_mutex_t) */
typedef pthread_mutex_t para_mutex_t;
/*
 * Mutex wrappers. Each macro takes the mutex OBJECT (an lvalue of type
 * para_mutex_t), not a pointer to it: the address is taken inside the macro.
 * LOCK_INIT creates the mutex with default attributes (NULL attr argument).
 */
#define LOCK(m) pthread_mutex_lock(&m)
#define UNLOCK(m) pthread_mutex_unlock(&m)
#define LOCK_INIT(m) pthread_mutex_init(&m, NULL)
#endif

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

// Forward declarations
DLLEXPORT SV * coro_yield(SV * ret_val);
DLLEXPORT SV * coro_transfer(int fiber_id, SV * args);
DLLEXPORT void destroy_coro(int fiber_id);

/**
 * @brief Report the OS-level identifier of the calling thread.
 *
 * Intended as a debugging aid: comparing the value observed inside a
 * background task with the one observed by the main interpreter shows
 * whether the two really run on different OS threads.
 *
 * The identifier is obtained from, in order of preference:
 * GetCurrentThreadId() on Windows, pthread_threadid_np() on macOS, the
 * gettid system call wherever SYS_gettid is defined, and finally the
 * pthread_self() handle converted to an integer.
 *
 * @return int The thread identifier, narrowed to int.
 */
int get_os_thread_id() {
    int os_tid;
#ifdef _WIN32
    os_tid = (int)GetCurrentThreadId();
#elif defined(__APPLE__)
    uint64_t wide_tid;
    pthread_threadid_np(NULL, &wide_tid); /* NULL selects the calling thread */
    os_tid = (int)wide_tid;
#elif defined(SYS_gettid)
    os_tid = (int)syscall(SYS_gettid);
#else
    os_tid = (int)(intptr_t)pthread_self();
#endif
    return os_tid;
}

/**
 * @brief Pin the current thread to a specific CPU core.
 *
 * Used by the Thread Pool to ensure worker threads are distributed
 * across available hardware cores for maximum parallelism.
 *
 * Pinning is best-effort: a core index that cannot be represented in the
 * platform's affinity mask is ignored, and failures reported by the OS are
 * deliberately not propagated. On platforms other than Windows and Linux
 * the call is a no-op.
 *
 * @param core_id The zero-based index of the CPU core.
 */
void pin_to_core(int core_id) {
    /* A negative index can never name a core; it would also make the
     * Windows shift below undefined behaviour. */
    if (core_id < 0)
        return;
#ifdef _WIN32
    /* The affinity mask has one bit per core; shifting by the mask width or
     * more is undefined, so reject indices that do not fit. */
    if ((size_t)core_id >= sizeof(DWORD_PTR) * 8)
        return;
    DWORD_PTR mask = (DWORD_PTR)1 << core_id;
    (void)SetThreadAffinityMask(GetCurrentThread(), mask);
#elif defined(__linux__)
    /* cpu_set_t only has room for CPU_SETSIZE cores. */
    if (core_id >= CPU_SETSIZE)
        return;
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);
    (void)pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
    (void)core_id; /* Not supported on macOS/BSD standard APIs */
#endif
}

/**
 * @brief Report which CPU core is running the calling thread right now.
 *
 * Windows uses GetCurrentProcessorNumber() and Linux uses sched_getcpu();
 * every other platform reports "unknown".
 *
 * @return int Core ID (0..N) or -1 if unsupported.
 */
int get_current_cpu() {
    int core = -1; /* stays -1 when no platform branch below applies */
#ifdef _WIN32
    core = GetCurrentProcessorNumber();
#elif defined(__linux__)
    core = sched_getcpu();
#endif
    return core;
}

/**
 * @brief Detects the number of logical cores available on the system.
 *
 * @return int CPU count (minimum 1).
 */

lib/Acme/Parataxis.c  view on Meta::CPAN

    I32 tmps_max;     /**< Limit of the Mortal Stack */

    JMPENV * top_env;   /**< Pointer to the top exception environment (eval/die buffers) */
    COP * curcop;       /**< Current Op Pointer (location in the source/bytecode) */
    OP * op;            /**< Current Operation being executed */
    PAD * comppad;      /**< Current lexical Pad (variable storage) */
    SV ** curpad;       /**< Array pointer to the current lexical Pad */
    PMOP * curpm;       /**< Current pattern match state */
    PMOP * curpm_under; /**< Current pattern match state under */
    PMOP * reg_curpm;   /**< Current regex match state */

    GV * defgv;      /**< The $_ global */
    GV * last_in_gv; /**< GV used in last <FH> */
    SV * rs;         /**< The $/ global */
    GV * ofsgv;      /**< The $, global */
    SV * ors_sv;     /**< The $\ global */
    GV * defoutgv;   /**< The default output filehandle */
    HV * curstash;   /**< Current package stash */
    HV * defstash;   /**< Default package stash */
    SV * errors;     /**< Outstanding queued errors */

    SV * user_cv;  /**< The Perl sub/coderef this fiber is running */
    SV * self_ref; /**< The Acme::Parataxis Perl object wrapper */

    SV * transfer_data; /**< Arguments or return values passed during yield/transfer */

    int id;          /**< Numeric ID of this fiber */
    int finished;    /**< Flag: 1 if the fiber has completed its entry_point */
    int parent_id;   /**< ID of the fiber that 'called' this one (asymmetric) */
    int last_sender; /**< ID of the fiber that last switched control to this one */
} para_fiber_t;

/**
 * @name Job Status Constants
 * Values stored in job_t::status. A slot starts out FREE, becomes NEW when
 * submit_c_job() fills it, and is marked DONE by the worker once the task
 * has run.
 */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks (every slot is reset to this by init_threads) */
#define JOB_NEW 1  /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready (what check_for_completion looks for) */
///@}

/**
 * @name Task Type Constants
 * Values stored in job_t::type; they select what the worker does with the job.
 */
///@{
#define TASK_SLEEP 0   /**< Sleep for N milliseconds */
#define TASK_GET_CPU 1 /**< Retrieve current core ID */
#define TASK_READ 2    /**< Wait for read-readiness on a file descriptor (a SOCKET on Windows) */
#define TASK_WRITE 3   /**< Wait for write-readiness on a file descriptor (a SOCKET on Windows) */
///@}

/**
 * @union value_t
 * @brief Generic container for task input/output data.
 *
 * The members share storage, so only the one most recently written is
 * meaningful. submit_c_job() stores its argument in the integer member of
 * job_t::input, and get_job_result() reads the integer member of
 * job_t::output.
 */
typedef union {
    int64_t i; /**< Integer/Pointer storage (e.g. a descriptor, socket handle or result code) */
    double d;  /**< Floating point storage */
    char * s;  /**< String storage */
} value_t;

/**
 * @struct job_t
 * @brief Represents a task in the background thread pool queue.
 *
 * One job_t occupies one slot of the fixed-size job_slots array. A slot is
 * claimed and filled by submit_c_job() and its state is tracked through the
 * JOB_* values held in `status`.
 */
typedef struct {
    int fiber_id;      /**< ID of the Fiber that submitted this task (copied from current_fiber_id, so -1 means Main) */
    int target_thread; /**< Index of the assigned worker thread (not written by submit_c_job) */
    int type;          /**< Type of task to perform (TASK_*) */
    value_t input;     /**< Input data for the task; submit_c_job stores its `arg` in input.i */
    value_t output;    /**< Result data populated by the worker; get_job_result reads output.i */
    int timeout_ms;    /**< Timeout duration for I/O tasks in ms; the READ/WRITE wait uses 5000 when this is <= 0 */
    int status;        /**< Current lifecycle state (JOB_*) */
} job_t;

// Global Registry and State

/** @brief Maximum number of concurrent fibers allowed */
#define MAX_FIBERS 1024
/** @brief Array of active fiber structures (NOTE(review): presumably indexed by fiber ID - confirm) */
static para_fiber_t * fibers[MAX_FIBERS];
/** @brief The context representing the main Perl thread */
static para_fiber_t main_context;
/**
 * @brief ID of the currently executing fiber (-1 for Main)
 *
 * submit_c_job() copies this value into job_t::fiber_id so a finished job
 * can be matched back to its submitter.
 */
static int current_fiber_id = -1;

/** @brief Size of the background job queue */
#define MAX_JOBS 1024
/** @brief Fixed-size array for background tasks */
static job_t job_slots[MAX_JOBS];
/** @brief Mutex protecting access to the job queue (created by init_threads) */
static para_mutex_t queue_lock;

/** @brief Condition variable paired with queue_lock; signalled by submit_c_job when a job is queued */
#ifdef _WIN32
static CONDITION_VARIABLE queue_cond;
#else
static pthread_cond_t queue_cond;
#endif

/** @brief Set to 1 at the end of init_threads(); guards against repeated initialization */
static int threads_initialized = 0;
/** @brief NOTE(review): presumably the equivalent guard for init_system, which is not shown here - confirm */
static int system_initialized = 0;

// Condition-variable wrappers (platform abstraction).
// These are macros, not declarations: `c` is the condition-variable object and
// `m` the mutex object themselves (not pointers); the macros take the addresses.
// PARA_COND_WAIT blocks without a timeout on Windows (INFINITE), matching the
// untimed pthread_cond_wait used on POSIX.
#ifdef _WIN32
#define PARA_COND_WAIT(c, m) SleepConditionVariableCS(&c, &m, INFINITE)
#define PARA_COND_SIGNAL(c) WakeConditionVariable(&c)
#define PARA_COND_BROADCAST(c) WakeAllConditionVariable(&c)
#define PARA_COND_INIT(c) InitializeConditionVariable(&c)
#else
#define PARA_COND_WAIT(c, m) pthread_cond_wait(&c, &m)
#define PARA_COND_SIGNAL(c) pthread_cond_signal(&c)
#define PARA_COND_BROADCAST(c) pthread_cond_broadcast(&c)
#define PARA_COND_INIT(c) pthread_cond_init(&c, NULL)
#endif

/** @brief Threshold for automatic preemption (0 to disable) */
static long long preempt_threshold = 0;
/** @brief Count of operations since last preemption yield */
static long long preempt_count = 0;

/** @brief Compile-time ceiling on worker threads; also the size of thread_handles */
#define MAX_THREADS 64
/** @brief Native OS handles for pool threads */
static para_thread_t thread_handles[MAX_THREADS];
/**
 * @brief Configured upper bound on the pool size (1..MAX_THREADS)
 *
 * 0 means "not chosen yet": init_threads() then derives the bound from
 * get_cpu_count(), capped at MAX_THREADS. set_max_threads() sets it explicitly.
 */
static int max_thread_pool_size = 0;
/** @brief Number of currently running worker threads */
static int current_thread_count = 0;
/**
 * @brief Flag to signal worker threads to terminate
 *
 * Worker loops keep running while this is non-zero.
 * NOTE(review): `volatile` alone is not an inter-thread synchronization
 * primitive in C11; confirm how the shutdown path publishes the write.
 */
static volatile int threads_keep_running = 1;

#ifdef _WIN32
/** @brief Windows-only handle for the main thread converted to fiber */
static void * main_fiber_handle = NULL;
#endif

/**
 * @brief Configure the upper bound on worker threads in the pool.
 *
 * Requests outside the range 1..MAX_THREADS are ignored and leave the
 * current setting untouched.
 *
 * @param max Desired maximum number of worker threads.
 */
DLLEXPORT void set_max_threads(int max) {
    if (max <= 0 || max > MAX_THREADS)
        return;
    max_thread_pool_size = max;
}

/** @brief Forward declaration of worker_thread */
#ifdef _WIN32
DWORD WINAPI worker_thread(LPVOID arg);
#else
void * worker_thread(void * arg);
#endif

lib/Acme/Parataxis.c  view on Meta::CPAN

            }
            else if (job->type == TASK_READ || job->type == TASK_WRITE) {
                fd_set fds;
                FD_ZERO(&fds);
#ifdef _WIN32
                SOCKET s = (SOCKET)job->input.i;
                FD_SET(s, &fds);
#else
                int fd = (int)job->input.i;
                FD_SET(fd, &fds);
#endif
                struct timeval tv;
                int res;
                int elapsed_ms = 0;
                int timeout = job->timeout_ms > 0 ? job->timeout_ms : 5000;

                while (threads_keep_running) {
                    tv.tv_sec = 0;
                    tv.tv_usec = 10000;

                    fd_set work_fds = fds;
                    if (job->type == TASK_READ)
#ifdef _WIN32
                        res = select(0, &work_fds, NULL, NULL, &tv);
#else
                        res = select(fd + 1, &work_fds, NULL, NULL, &tv);
#endif
                    else
#ifdef _WIN32
                        res = select(0, NULL, &work_fds, NULL, &tv);
#else
                        res = select(fd + 1, NULL, &work_fds, NULL, &tv);
#endif

                    if (res != 0)
                        break;

                    elapsed_ms += 10;
                    if (elapsed_ms >= timeout)
                        break;
                }
                job->output.i = (res > 0) ? 1 : -1;
            }

            LOCK(queue_lock);
            job->status = JOB_DONE;
            UNLOCK(queue_lock);
        }
        else {
#ifdef _WIN32
            Sleep(1);
#else
            usleep(1000);
#endif
        }
    }
    return 0;
}

/**
 * @brief Initializes the background thread pool.
 *
 * Creates the queue mutex and condition variable, marks every job slot as
 * free, settles the pool's upper bound and starts a small seed set of
 * workers. If no bound was configured through set_max_threads(), the CPU
 * count is used, capped at MAX_THREADS.
 *
 * The function is idempotent: once it has completed, later calls return
 * immediately. It is invoked on demand by submit_c_job() and
 * check_for_completion() (and, per the file's design notes, by init_system).
 */
DLLEXPORT void init_threads() {
    if (threads_initialized)
        return;

    /* Synchronization primitives must exist before any worker is started. */
    LOCK_INIT(queue_lock);
    PARA_COND_INIT(queue_cond);
    for (int i = 0; i < MAX_JOBS; i++)
        job_slots[i].status = JOB_FREE;

    if (max_thread_pool_size == 0) {
        max_thread_pool_size = get_cpu_count();
        if (max_thread_pool_size > MAX_THREADS)
            max_thread_pool_size = MAX_THREADS;
    }

    /* Start with a small "seed" pool of at most 2 threads. The request is
     * clamped here so a configured maximum of 1 is honoured regardless of
     * how _spawn_workers treats oversized requests. */
    int seed = max_thread_pool_size < 2 ? max_thread_pool_size : 2;
    _spawn_workers(seed);

    threads_initialized = 1;
}

/**
 * @brief Submits a C-level task to the background pool.
 *
 * @param type The task type constant (TASK_*).
 * @param arg Input integer or pointer data.
 * @param timeout_ms Timeout for I/O operations.
 * @return int The index of the submitted job, or -1 if the queue is full.
 */
DLLEXPORT int submit_c_job(int type, int64_t arg, int timeout_ms) {
    if (!threads_initialized)
        init_threads();
    int idx = -1;
    LOCK(queue_lock);

    /* Dynamic Scaling: If we have pending jobs and space in the pool, grow! */
    int pending_count = 0;
    for (int i = 0; i < MAX_JOBS; i++)
        if (job_slots[i].status == JOB_NEW)
            pending_count++;
    if (pending_count > 0 && current_thread_count < max_thread_pool_size)
        _spawn_workers(1); /* Grow by 1 on demand */

    for (int i = 0; i < MAX_JOBS; i++) {
        if (job_slots[i].status == JOB_FREE) {
            idx = i;
            break;
        }
    }
    if (idx != -1) {
        job_slots[idx].fiber_id = current_fiber_id;
        job_slots[idx].type = type;
        job_slots[idx].input.i = arg;
        job_slots[idx].timeout_ms = timeout_ms;
        job_slots[idx].status = JOB_NEW;
        PARA_COND_SIGNAL(queue_cond);
    }
    UNLOCK(queue_lock);
    return idx;
}

/**
 * @brief Looks for a background job that has finished.
 *
 * Initializes the pool first if that has not happened yet, then scans the
 * slots in index order under the queue lock and reports the lowest-indexed
 * one whose status is JOB_DONE. The slot itself is left unchanged.
 *
 * @return int Index of a finished job, or -1 if none are ready.
 */
DLLEXPORT int check_for_completion() {
    if (!threads_initialized)
        init_threads();

    int found = -1;
    LOCK(queue_lock);
    for (int slot = 0; found < 0 && slot < MAX_JOBS; slot++) {
        if (job_slots[slot].status == JOB_DONE)
            found = slot;
    }
    UNLOCK(queue_lock);
    return found;
}

/**
 * @brief Retrieves the result of a job as a Perl SV.
 *
 * The integer output of the slot is copied while the queue lock is held;
 * the Perl scalar is created only after the lock has been released, so the
 * Perl allocator is never entered with queue_lock held.
 *
 * A value is produced when the slot is JOB_DONE or JOB_BUSY and its type is
 * one of TASK_SLEEP, TASK_GET_CPU, TASK_READ or TASK_WRITE.
 * NOTE(review): accepting JOB_BUSY means a result may be read before the
 * worker has stored it - confirm this is intended.
 *
 * @param idx The job index in the queue.
 * @return SV* A mortalized Perl SV containing the result (IV), or
 *         &PL_sv_undef when idx is out of range or no value is available.
 */
DLLEXPORT SV * get_job_result(int idx) {
    dTHX;
    if (idx < 0 || idx >= MAX_JOBS)
        return &PL_sv_undef;

    bool have_value = false;
    int64_t value = 0;

    LOCK(queue_lock);
    const job_t * job = &job_slots[idx];
    if (job->status == JOB_DONE || job->status == JOB_BUSY) {
        switch (job->type) {
        case TASK_SLEEP:
        case TASK_GET_CPU:
        case TASK_READ:
        case TASK_WRITE:
            value = job->output.i;
            have_value = true;
            break;
        default:
            break; /* unknown task type: report undef */
        }
    }
    UNLOCK(queue_lock);

    if (!have_value)
        return &PL_sv_undef;
    return sv_2mortal(newSViv(value));
}

/**
 * @brief Reports which Fiber submitted the job held in a given slot.
 *
 * The stored value is whatever current_fiber_id was at submission time, so
 * a job submitted from the main context also yields -1.
 *
 * @param idx Job index.
 * @return int Fiber ID, or -1 when idx is outside the queue.
 */
DLLEXPORT int get_job_coro_id(int idx) {
    const bool in_range = (idx >= 0 && idx < MAX_JOBS);
    return in_range ? job_slots[idx].fiber_id : -1;
}

/**
 * @brief Frees a job slot in the queue after the result has been retrieved.
 *
 * @param idx Job index.
 */
DLLEXPORT void free_job_slot(int idx) {
    if (idx < 0 || idx >= MAX_JOBS)



( run in 0.905 second using v1.01-cache-2.11-cpan-f56aa216473 )