Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.c view on Meta::CPAN
/**
* @file Parataxis.c
* @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
*
* @section Overview
* This file implements a cooperative multitasking system (Fibers) integrated
* with a preemptive native thread pool. It allows Perl to run thousands of
* user-mode fibers that can offload blocking C-level tasks to background
* OS threads without stalling the main interpreter.
*
* @section Architecture
* - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
* and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
* - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
* pass control.
* - **Thread Pool**: A fixed pool of worker threads that poll a job queue for
* blocking operations like sleep, I/O, or heavy computation.
* - **Context Switching**: The `swap_perl_state` function manually saves and restores
* the global state of the Perl interpreter (`PL_*` variables) to allow disjoint
* execution flows.
*
* @section Caveats
* Shared subroutines (CVs) with re-entrant yielding calls are handled by a
* specialized pad-clearing mechanism in `_activate_current_depths` to satisfy
* Perl's internal `AvFILLp` assertions in debug builds.
*/
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601
#endif
#else
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif
#ifdef __APPLE__
#ifndef _DARWIN_C_SOURCE
#define _DARWIN_C_SOURCE
#endif
#endif
#endif
#define PERL_NO_GET_CONTEXT
#define NO_XSLOCKS
#include "EXTERN.h"
#include "XSUB.h"
#include "perl.h"
#ifdef _WIN32
/** @brief Export macro for Windows DLLs */
#define DLLEXPORT __declspec(dllexport)
/** @brief Handle for the underlying OS fiber context */
typedef LPVOID coro_handle_t;
/** @brief Handle for a native OS thread */
typedef HANDLE para_thread_t;
/** @brief Mutex type for queue synchronization */
typedef CRITICAL_SECTION para_mutex_t;
/**
 * @name Mutex wrappers (Windows)
 * Each macro takes the mutex object itself (an lvalue), not a pointer to it.
 * The argument is parenthesized so that any lvalue expression (e.g. `*p` or
 * `arr[i]`) has its address taken as a whole.
 */
///@{
/** @brief Acquire the critical section `m` */
#define LOCK(m) EnterCriticalSection(&(m))
/** @brief Release the critical section `m` */
#define UNLOCK(m) LeaveCriticalSection(&(m))
/** @brief Initialize the critical section `m` before first use */
#define LOCK_INIT(m) InitializeCriticalSection(&(m))
///@}
#else
#include <pthread.h>
#include <sched.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <ucontext.h>
#include <unistd.h>
#if defined(__APPLE__) || defined(__FreeBSD__)
#include <sys/sysctl.h>
#include <sys/types.h>
#endif
/**
 * @brief Export macro for Unix systems.
 *
 * Expands to the compiler's `visibility("default")` attribute so that a
 * function marked DLLEXPORT is given default symbol visibility.
 */
#define DLLEXPORT __attribute__((visibility("default")))
/** @brief Handle for the underlying OS fiber context (a `ucontext_t` stored by value) */
typedef ucontext_t coro_handle_t;
/** @brief Handle for a native OS thread (`pthread_t`) */
typedef pthread_t para_thread_t;
/** @brief Mutex type for queue synchronization (`pthread_mutex_t`) */
typedef pthread_mutex_t para_mutex_t;
lib/Acme/Parataxis.c view on Meta::CPAN
AV * curstack; /**< The active Argument Stack (AV*) */
SSize_t stack_sp_offset; /**< Stack Pointer offset from stack base */
I32 * markstack; /**< Base of the Mark Stack (tracks list start points) */
I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
I32 * markstack_max; /**< Limit of the Mark Stack */
I32 * scopestack; /**< Base of the Scope Stack (tracks block nesting) */
I32 scopestack_ix; /**< Current index in the Scope Stack */
I32 scopestack_max; /**< Limit of the Scope Stack */
ANY * savestack; /**< Base of the Save Stack (tracks local/my variables for cleanup) */
I32 savestack_ix; /**< Current index in the Save Stack */
I32 savestack_max; /**< Limit of the Save Stack */
SV ** tmps_stack; /**< Base of the Mortal Stack (tracks SVs needing refcnt decrement) */
I32 tmps_ix; /**< Current index in the Mortal Stack */
I32 tmps_floor; /**< Current floor of the Mortal Stack */
I32 tmps_max; /**< Limit of the Mortal Stack */
JMPENV * top_env; /**< Pointer to the top exception environment (eval/die buffers) */
COP * curcop; /**< Current Op Pointer (location in the source/bytecode) */
OP * op; /**< Current Operation being executed */
PAD * comppad; /**< Current lexical Pad (variable storage) */
SV ** curpad; /**< Array pointer to the current lexical Pad */
PMOP * curpm; /**< Current pattern match state */
PMOP * curpm_under; /**< Current pattern match state under */
PMOP * reg_curpm; /**< Current regex match state */
GV * defgv; /**< The $_ global */
GV * last_in_gv; /**< GV used in last <FH> */
SV * rs; /**< The $/ global */
GV * ofsgv; /**< The $, global */
SV * ors_sv; /**< The $\ global */
GV * defoutgv; /**< The default output filehandle */
HV * curstash; /**< Current package stash */
HV * defstash; /**< Default package stash */
SV * errors; /**< Outstanding queued errors */
SV * user_cv; /**< The Perl sub/coderef this fiber is running */
SV * self_ref; /**< The Acme::Parataxis Perl object wrapper */
SV * transfer_data; /**< Arguments or return values passed during yield/transfer */
int id; /**< Numeric ID of this fiber */
int finished; /**< Flag: 1 if the fiber has completed its entry_point */
int parent_id; /**< ID of the fiber that 'called' this one (asymmetric) */
int last_sender; /**< ID of the fiber that last switched control to this one */
} para_fiber_t;
/**
 * @name Job Status Constants
 * Values stored in `job_t.status`. `init_threads` resets every slot to
 * JOB_FREE. In `worker_thread`, a worker moves a slot from JOB_NEW to JOB_BUSY
 * while holding `queue_lock`, runs the task without the lock, and then sets
 * JOB_DONE under the lock again.
 */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1 /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task has been claimed and is being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and `output` holds its result */
///@}
/**
 * @name Task Type Constants
 * Values stored in `job_t.type`; they select the branch taken in
 * `worker_thread`. All of the branches below exchange data through the `i`
 * member of `job_t.input` / `job_t.output`.
 */
///@{
#define TASK_SLEEP 0 /**< Sleep for `input.i` milliseconds; `output.i` echoes that duration */
#define TASK_GET_CPU 1 /**< Store the result of `get_current_cpu()`, called on the worker, in `output.i` */
#define TASK_READ 2 /**< Wait for read-readiness on the descriptor in `input.i`; `output.i` is 1 if ready, else -1 */
#define TASK_WRITE 3 /**< Wait for write-readiness on the descriptor in `input.i`; `output.i` is 1 if ready, else -1 */
///@}
/**
* @union value_t
* @brief Generic container for task input/output data.
*
* The members share storage, so only one is meaningful at a time. The task
* handlers visible in `worker_thread` (sleep, CPU query, read/write wait) use
* only `i`, both for their input and for their result.
*/
typedef union {
int64_t i; /**< Integer/Pointer storage (e.g. milliseconds, descriptor, CPU id, 1/-1 readiness result) */
double d; /**< Floating point storage */
char * s; /**< String storage. NOTE(review): ownership of the pointed-to buffer is not visible here - confirm */
} value_t;
/**
* @struct job_t
* @brief Represents a task in the background thread pool queue.
*
* Jobs live in the fixed `job_slots` array. A worker claims a slot by changing
* `status` from JOB_NEW to JOB_BUSY under `queue_lock`, fills in `output`, and
* finally sets `status` to JOB_DONE under the same lock.
*/
typedef struct {
int fiber_id; /**< ID of the Fiber that submitted this task */
int target_thread; /**< Index of the assigned worker thread. NOTE(review): the claim loop in `worker_thread` does not consult this field - confirm how it is used */
int type; /**< Type of task to perform (TASK_*) */
value_t input; /**< Input data for the task (meaning depends on `type`) */
value_t output; /**< Result data populated by the worker */
int timeout_ms; /**< Timeout duration for I/O tasks in milliseconds; values <= 0 make the worker fall back to 5000 ms */
int status; /**< Current lifecycle state (JOB_*) */
} job_t;
// Global Registry and State
/** @brief Maximum number of concurrent fibers allowed (capacity of `fibers`) */
#define MAX_FIBERS 1024
/** @brief Array of active fiber structures */
static para_fiber_t * fibers[MAX_FIBERS];
/** @brief The context representing the main Perl thread */
static para_fiber_t main_context;
/** @brief ID of the currently executing fiber (-1 for Main) */
static int current_fiber_id = -1;
/** @brief Size of the background job queue (capacity of `job_slots`) */
#define MAX_JOBS 1024
/** @brief Fixed-size array for background tasks; workers scan it linearly for JOB_NEW entries */
static job_t job_slots[MAX_JOBS];
/** @brief Mutex protecting access to the job queue */
static para_mutex_t queue_lock;
/**
 * @brief Condition variable that idle workers wait on, paired with `queue_lock`.
 *
 * `worker_thread` blocks on it (via PARA_COND_WAIT) whenever a scan of
 * `job_slots` finds no JOB_NEW entry.
 */
#ifdef _WIN32
static CONDITION_VARIABLE queue_cond;
#else
static pthread_cond_t queue_cond;
#endif
/** @brief Non-zero makes `init_threads` return immediately; presumably set once the pool has been started - confirm */
static int threads_initialized = 0;
/** @brief NOTE(review): presumably set once `init_system` has run - its use is not visible in this excerpt, confirm */
static int system_initialized = 0;
// Forward declarations for thread safety wrappers
#ifdef _WIN32
#define PARA_COND_WAIT(c, m) SleepConditionVariableCS(&c, &m, INFINITE)
#define PARA_COND_SIGNAL(c) WakeConditionVariable(&c)
lib/Acme/Parataxis.c view on Meta::CPAN
#ifdef _WIN32
DWORD WINAPI worker_thread(LPVOID arg) {
#else
void * worker_thread(void * arg) {
#endif
    /*
     * Body of every pool worker.
     *
     * arg carries the worker's index as an integer stored in the pointer
     * value. The thread pins itself to core (index % cpu_count) and then, for
     * as long as threads_keep_running is set:
     *   1. claims the first JOB_NEW slot under queue_lock (marking it
     *      JOB_BUSY), waiting on queue_cond while there is none;
     *   2. runs the task without holding the lock, writing job->output;
     *   3. publishes JOB_DONE under queue_lock.
     * Always returns 0.
     */
    int thread_id = (int)(intptr_t)arg;
    int cpu_count = get_cpu_count();
    if (cpu_count < 1)
        cpu_count = 1; /* a failed CPU detection must not become a modulo by zero */
    pin_to_core(thread_id % cpu_count);

    while (threads_keep_running) {
        int found_idx = -1;

        LOCK(queue_lock);
        while (threads_keep_running) {
            for (int i = 0; i < MAX_JOBS; i++) {
                if (job_slots[i].status == JOB_NEW) {
                    /* Claim the slot while the lock is held so no other worker takes it. */
                    job_slots[i].status = JOB_BUSY;
                    found_idx = i;
                    break;
                }
            }
            if (found_idx != -1 || !threads_keep_running)
                break;
            PARA_COND_WAIT(queue_cond, queue_lock);
        }
        UNLOCK(queue_lock);

        if (found_idx != -1 && threads_keep_running) {
            job_t * job = &job_slots[found_idx];

            if (job->type == TASK_SLEEP) {
                int ms = (int)job->input.i;
                /* A negative duration would otherwise turn into a huge unsigned sleep. */
                int remaining = ms > 0 ? ms : 0;
#ifdef _WIN32
                Sleep((DWORD)remaining);
#else
                /*
                 * usleep() is only required to accept values below 1,000,000
                 * microseconds, and ms * 1000 can overflow an int, so sleep in
                 * slices of less than one second.
                 */
                while (remaining > 0) {
                    int chunk = remaining < 999 ? remaining : 999;
                    usleep(chunk * 1000);
                    remaining -= chunk;
                }
#endif
                job->output.i = ms;
            }
            else if (job->type == TASK_GET_CPU) {
                job->output.i = get_current_cpu();
            }
            else if (job->type == TASK_READ || job->type == TASK_WRITE) {
                /*
                 * res starts at 0 ("not ready") so that a shutdown before the
                 * first poll yields -1 instead of reading an indeterminate value.
                 */
                int res = 0;
                int fd_valid = 1;
                int nfds = 0; /* first select() argument; ignored by Winsock */
                fd_set fds;
                FD_ZERO(&fds);
#ifdef _WIN32
                SOCKET s = (SOCKET)job->input.i;
                FD_SET(s, &fds);
#else
                int fd = (int)job->input.i;
                /* FD_SET is undefined for descriptors outside [0, FD_SETSIZE). */
                if (fd < 0 || fd >= FD_SETSIZE) {
                    fd_valid = 0;
                    res = -1;
                }
                else {
                    FD_SET(fd, &fds);
                    nfds = fd + 1;
                }
#endif
                int timeout = job->timeout_ms > 0 ? job->timeout_ms : 5000;
                int elapsed_ms = 0;

                /* Poll in 10 ms slices so a pool shutdown is noticed promptly. */
                while (fd_valid && threads_keep_running) {
                    struct timeval tv;
                    tv.tv_sec = 0;
                    tv.tv_usec = 10000;
                    fd_set work_fds = fds; /* select() modifies its sets, so poll a copy */
                    if (job->type == TASK_READ) {
                        res = select(nfds, &work_fds, NULL, NULL, &tv);
                    }
                    else {
                        res = select(nfds, NULL, &work_fds, NULL, &tv);
                    }
                    if (res != 0)
                        break; /* ready (> 0) or error (< 0) */
                    /* Same test as "elapsed_ms + 10 >= timeout", written so it cannot overflow. */
                    if (elapsed_ms >= timeout - 10)
                        break;
                    elapsed_ms += 10;
                }
                job->output.i = (res > 0) ? 1 : -1;
            }

            LOCK(queue_lock);
            job->status = JOB_DONE;
            UNLOCK(queue_lock);
        }
        else {
            /* Reached only when shutdown was requested; back off briefly before the outer check. */
#ifdef _WIN32
            Sleep(1);
#else
            usleep(1000);
#endif
        }
    }
    return 0;
}
/**
* @brief Initializes the background thread pool.
*
* Automatically detects the CPU count and spawns worker threads. This function
* is called automatically by `init_system` and `submit_c_job`.
*/
DLLEXPORT void init_threads() {
dTHX;
if (threads_initialized)
return;
LOCK_INIT(queue_lock);
PARA_COND_INIT(queue_cond);
for (int i = 0; i < MAX_JOBS; i++)
job_slots[i].status = JOB_FREE;
if (max_thread_pool_size == 0) {
max_thread_pool_size = get_cpu_count();
if (max_thread_pool_size > MAX_THREADS)
max_thread_pool_size = MAX_THREADS;
}
( run in 3.576 seconds using v1.01-cache-2.11-cpan-437f7b0c052 )