Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.c view on Meta::CPAN
*/
typedef struct {
coro_handle_t context; /**< OS-specific context handle (Windows fiber handle, or the ucontext used with getcontext/makecontext/swapcontext) */
#ifndef _WIN32
void * stack_p; /**< Pointer to dynamically allocated fiber stack (Unix only); obtained with posix_memalign in create_fiber */
size_t stack_sz; /**< Size of the allocated stack in bytes (Unix only) */
#endif
/*
* Perl Interpreter State Pointers.
* These must be saved and restored during every context switch
* (perform_switch calls swap_perl_state(from, to) before the OS-level switch).
*/
PERL_SI * si; /**< Current Stack Info (tracks recursion and eval frames) */
AV * curstack; /**< The active Argument Stack (AV*) */
SSize_t stack_sp_offset; /**< Stack Pointer offset from stack base */
I32 * markstack; /**< Base of the Mark Stack (tracks list start points) */
I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
I32 * markstack_max; /**< Limit of the Mark Stack */
I32 * scopestack; /**< Base of the Scope Stack (tracks block nesting) */
I32 scopestack_ix; /**< Current index in the Scope Stack */
I32 scopestack_max; /**< Limit of the Scope Stack */
ANY * savestack; /**< Base of the Save Stack (tracks local/my variables for cleanup) */
I32 savestack_ix; /**< Current index in the Save Stack */
I32 savestack_max; /**< Limit of the Save Stack */
SV ** tmps_stack; /**< Base of the Mortal Stack (tracks SVs needing refcnt decrement) */
I32 tmps_ix; /**< Current index in the Mortal Stack */
I32 tmps_floor; /**< Current floor of the Mortal Stack */
I32 tmps_max; /**< Limit of the Mortal Stack */
JMPENV * top_env; /**< Pointer to the top exception environment (eval/die buffers) */
COP * curcop; /**< Current Op Pointer (location in the source/bytecode) */
OP * op; /**< Current Operation being executed */
PAD * comppad; /**< Current lexical Pad (variable storage) */
SV ** curpad; /**< Array pointer to the current lexical Pad */
PMOP * curpm; /**< Current pattern match state */
PMOP * curpm_under; /**< Current pattern match state under */
PMOP * reg_curpm; /**< Current regex match state */
GV * defgv; /**< The $_ global */
GV * last_in_gv; /**< GV used in last <FH> */
SV * rs; /**< The $/ global */
GV * ofsgv; /**< The $, global */
SV * ors_sv; /**< The $\ global */
GV * defoutgv; /**< The default output filehandle */
HV * curstash; /**< Current package stash */
HV * defstash; /**< Default package stash */
SV * errors; /**< Outstanding queued errors */
/*
* Fiber bookkeeping.
* The SV pointers below are owned references: create_fiber and the
* call/yield/transfer functions increment the refcount when they store a value.
*/
SV * user_cv; /**< The Perl sub/coderef this fiber is running (refcount incremented by create_fiber) */
SV * self_ref; /**< The Acme::Parataxis Perl object wrapper (refcount incremented by create_fiber) */
SV * transfer_data; /**< Arguments or return values passed during yield/transfer; &PL_sv_undef marks an empty slot, and the receiver mortalizes the value on pickup */
int id; /**< Numeric ID of this fiber; equals its index in fibers[] */
int finished; /**< Flag: 1 if the fiber has completed its entry_point (set after the user coderef's call_sv returns) */
int parent_id; /**< ID of the fiber that 'called' this one (asymmetric); written by coro_call, -1 means Main */
int last_sender; /**< ID of the fiber that last switched control to this one; written by perform_switch, -1 means Main */
} para_fiber_t;
/** @name Job Status Constants
 *  Values stored in job_t.status.
 */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1 /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}
/** @name Task Type Constants
 *  Values stored in job_t.type.
 */
///@{
#define TASK_SLEEP 0 /**< Sleep for N milliseconds */
#define TASK_GET_CPU 1 /**< Retrieve current core ID */
#define TASK_READ 2 /**< Wait for read-readiness on a file descriptor */
#define TASK_WRITE 3 /**< Wait for write-readiness on a file descriptor */
///@}
/**
* @union value_t
* @brief Generic container for task input/output data.
*
* Used for both the input and the output field of job_t. Being a union,
* the members share storage, so only one of them is meaningful at a time.
*/
typedef union {
int64_t i; /**< Integer/Pointer storage */
double d; /**< Floating point storage */
char * s; /**< String storage */
} value_t;
/**
* @struct job_t
* @brief Represents a task in the background thread pool queue.
*
* Instances live in the fixed-size job_slots[] array.
*/
typedef struct {
int fiber_id; /**< ID of the Fiber that submitted this task */
int target_thread; /**< Index of the assigned worker thread */
int type; /**< Type of task to perform (TASK_*) */
value_t input; /**< Input data for the task */
value_t output; /**< Result data populated by the worker */
int timeout_ms; /**< Timeout duration for I/O tasks, in milliseconds */
int status; /**< Current lifecycle state (JOB_*) */
} job_t;
// Global Registry and State
/** @brief Maximum number of concurrent fibers allowed */
#define MAX_FIBERS 1024
/**
 * @brief Array of active fiber structures
 *
 * Indexed by fiber ID. A NULL entry is a free slot; create_fiber claims the
 * first NULL entry it finds.
 */
static para_fiber_t * fibers[MAX_FIBERS];
/** @brief The context representing the main Perl thread (what fiber ID -1 resolves to) */
static para_fiber_t main_context;
/** @brief ID of the currently executing fiber (-1 for Main); updated by perform_switch */
static int current_fiber_id = -1;
/** @brief Size of the background job queue */
#define MAX_JOBS 1024
/** @brief Fixed-size array for background tasks */
static job_t job_slots[MAX_JOBS];
/** @brief Mutex protecting access to the job queue */
static para_mutex_t queue_lock;
lib/Acme/Parataxis.c view on Meta::CPAN
main_context.defoutgv = PL_defoutgv;
main_context.curstash = PL_curstash;
main_context.defstash = PL_defstash;
main_context.errors = PL_errors;
system_initialized = 1;
#ifdef _WIN32
/* Convert the main thread into a fiber so it can be switched out */
if (!main_fiber_handle) {
main_fiber_handle = ConvertThreadToFiber(NULL);
if (!main_fiber_handle) {
if (GetLastError() == ERROR_ALREADY_FIBER)
main_fiber_handle = GetCurrentFiber();
}
}
#endif
init_threads();
return 0;
}
/**
 * @brief Switches execution from the running fiber to another one.
 *
 * Records the switch in the bookkeeping state (the target's last_sender and
 * current_fiber_id), exchanges the saved Perl interpreter state through
 * swap_perl_state(), and finally hands control over with the OS primitive
 * (SwitchToFiber on Windows, swapcontext elsewhere). Asking to switch to
 * the fiber that is already running does nothing.
 *
 * @param target_id ID of the fiber to run next (-1 selects Main).
 */
void perform_switch(int target_id) {
    dTHX;
    const int origin_id = current_fiber_id;
    if (origin_id == target_id)
        return;

    /* Resolve both ends of the switch; -1 always maps to the main context. */
    para_fiber_t * outgoing = &main_context;
    if (origin_id != -1)
        outgoing = fibers[origin_id];
    para_fiber_t * incoming = &main_context;
    if (target_id != -1)
        incoming = fibers[target_id];

    /* Bookkeeping and Perl state are updated before control leaves this stack. */
    incoming->last_sender = origin_id;
    current_fiber_id = target_id;
    swap_perl_state(outgoing, incoming);
#ifdef _WIN32
    SwitchToFiber(target_id == -1 ? main_fiber_handle : incoming->context);
#else
    swapcontext(&outgoing->context, &incoming->context);
#endif
}
/**
 * @brief Suspends the running fiber and hands a value back to its caller.
 *
 * The destination is the fiber's parent; when there is no parent, or the
 * parent is gone or finished, the fiber that last switched to this one is
 * used instead. If that fallback is gone or finished as well, control
 * returns to Main. Calling this from Main is a no-op.
 *
 * @param ret_val The Perl SV to "return" to the destination.
 * @return SV* The value handed to this fiber when it is resumed again
 *             (mortalized), or &PL_sv_undef when called from Main.
 */
DLLEXPORT SV * coro_yield(SV * ret_val) {
    dTHX;
    /* Main has nobody to yield to. */
    if (current_fiber_id == -1)
        return &PL_sv_undef;

    para_fiber_t * self = fibers[current_fiber_id];

    /* Choose the destination: parent first, then last sender, then Main. */
    int dest = self->parent_id;
    if (dest == -1 || !fibers[dest] || fibers[dest]->finished)
        dest = self->last_sender;
    if (dest >= 0 && (!fibers[dest] || fibers[dest]->finished))
        dest = -1;
    para_fiber_t * receiver = (dest == -1) ? &main_context : fibers[dest];

    /* Replace whatever the receiver was holding with our return value. */
    SV * stale = receiver->transfer_data;
    if (stale != ret_val) {
        if (stale && stale != &PL_sv_undef)
            SvREFCNT_dec(stale);
        receiver->transfer_data = ret_val;
        if (ret_val && ret_val != &PL_sv_undef)
            SvREFCNT_inc(ret_val);
    }

    perform_switch(dest);

    /* Execution continues here once somebody resumes this fiber. */
    SV * received = self->transfer_data;
    self->transfer_data = &PL_sv_undef;
    if (received && received != &PL_sv_undef)
        sv_2mortal(received);
    return received;
}
/**
* @brief Entry point function for all new fibers.
*
* Sets up the Perl environment (ENTER/SAVETMPS), unpacks arguments,
* calls the user coderef, handles results/errors, and manages the
* fiber's completion lifecycle.
*
* @param c Pointer to the fiber context being started.
*/
static void entry_point(para_fiber_t * c) {
dTHX;
ENTER;
SAVETMPS;
dSP;
PUSHMARK(SP);
/* Unpack arguments passed during coro_call */
if (c->transfer_data && SvROK(c->transfer_data) && SvTYPE(SvRV(c->transfer_data)) == SVt_PVAV) {
AV * args = (AV *)SvRV(c->transfer_data);
I32 len = av_top_index(args) + 1;
for (I32 i = 0; i < len; i++) {
SV ** svp = av_fetch(args, i, 0);
if (svp)
XPUSHs(*svp);
}
}
PUTBACK;
/* Execute the Perl sub */
int count = call_sv(c->user_cv, G_SCALAR | G_EVAL);
SPAGAIN;
SV * ret_val = &PL_sv_undef;
if (count == 1)
ret_val = POPs;
PUTBACK;
c->finished = true;
/* Cleanup transfer data and store result */
if (c->transfer_data && c->transfer_data != &PL_sv_undef) {
SvREFCNT_dec(c->transfer_data);
c->transfer_data = &PL_sv_undef;
}
if (ret_val && ret_val != &PL_sv_undef) {
SvREFCNT_inc(ret_val);
c->transfer_data = ret_val;
}
/* Update the Perl-level Acme::Parataxis object */
if (c->self_ref && SvROK(c->self_ref)) {
lib/Acme/Parataxis.c view on Meta::CPAN
XPUSHs(ERRSV);
PUTBACK;
call_method("set_error", G_DISCARD);
}
else {
XPUSHs(ret_val);
PUTBACK;
call_method("set_result", G_DISCARD);
}
FREETMPS;
LEAVE;
}
FREETMPS;
LEAVE;
/* Final yield back to caller */
coro_yield(c->transfer_data ? c->transfer_data : &PL_sv_undef);
/* Loop indefinitely if resumed after finish */
while (1)
coro_yield(&PL_sv_undef);
}
#ifdef _WIN32
/** @brief Windows fiber callback wrapper. */
static void WINAPI fiber_entry(void * param) { entry_point((para_fiber_t *)param); }
#else
/** @brief POSIX makecontext callback wrapper. */
static void posix_entry(int fiber_id) { entry_point(fibers[fiber_id]); }
#endif
/**
 * @brief Allocates and prepares a new Fiber context.
 *
 * Claims the first free slot in fibers[], takes a reference on the coderef
 * and the wrapper object, initializes the fiber's Perl stacks and creates
 * the OS-level execution context. The fiber does not start running until
 * something switches to it.
 *
 * @param user_code Coderef to execute in the fiber.
 * @param self_ref Acme::Parataxis object to notify on completion.
 * @return int Unique ID of the new fiber, or negative on error:
 *             -2 when every slot is in use, -3 when memory or the OS
 *             context could not be obtained.
 */
DLLEXPORT int create_fiber(SV * user_code, SV * self_ref) {
    dTHX;
    /* The index of the first unused registry slot becomes the fiber ID. */
    int idx = -1;
    for (int i = 0; i < MAX_FIBERS; i++) {
        if (fibers[i] == NULL) {
            idx = i;
            break;
        }
    }
    if (idx == -1)
        return -2;

    /* calloc hands back a zeroed struct, so every field starts out as 0/NULL. */
    para_fiber_t * c = calloc(1, sizeof *c);
    if (!c)
        return -3;

    c->user_cv = user_code;
    if (user_code && user_code != &PL_sv_undef)
        SvREFCNT_inc(user_code);
    c->self_ref = self_ref;
    if (self_ref && self_ref != &PL_sv_undef)
        SvREFCNT_inc(self_ref);
    c->id = idx;
    c->parent_id = -1;
    c->last_sender = -1;
    c->transfer_data = &PL_sv_undef;
    fibers[idx] = c;

    /* Initialize Perl stacks */
    init_perl_stacks(c);

#ifdef _WIN32
    c->context = CreateFiber(0, fiber_entry, c);
    if (!c->context) {
        /* Without a fiber handle the first switch would crash; release the
         * half-built fiber the same way the POSIX branch does. */
        destroy_coro(idx);
        return -3;
    }
#else
    c->stack_sz = 512 * 1024; // 512KB is plenty for Perl fibers
    if (posix_memalign(&c->stack_p, 16, c->stack_sz) != 0) {
        /* Make sure cleanup never sees a stale stack pointer. */
        c->stack_p = NULL;
        destroy_coro(idx);
        return -3;
    }
    if (getcontext(&c->context) != 0) {
        /* makecontext needs a context that getcontext filled in. */
        destroy_coro(idx);
        return -3;
    }
    c->context.uc_stack.ss_sp = c->stack_p;
    c->context.uc_stack.ss_size = c->stack_sz;
    c->context.uc_link = &main_context.context;
    /* The fiber ID is passed as an int because makecontext only forwards int arguments. */
    makecontext(&c->context, (void (*)())posix_entry, 1, c->id);
#endif
    return idx;
}
/**
 * @brief Resumes a fiber (asymmetric call).
 *
 * Stores @p args in the target fiber, records the current fiber as the
 * target's parent and switches to it. When control comes back, whatever
 * was handed to the caller is picked up and returned.
 *
 * @param fiber_id Fiber ID to call.
 * @param args Perl SV (usually arrayref) to pass as arguments to the fiber.
 * @return SV* Result yielded by the fiber (mortalized), or &PL_sv_undef if
 *             the ID is out of range, unused, or the fiber already finished.
 */
DLLEXPORT SV * coro_call(int fiber_id, SV * args) {
    dTHX;
    if (fiber_id < 0 || fiber_id >= MAX_FIBERS)
        return &PL_sv_undef;
    para_fiber_t * callee = fibers[fiber_id];
    if (!callee || callee->finished)
        return &PL_sv_undef;

    /* Hand the arguments over, dropping anything the callee still held. */
    SV * stale = callee->transfer_data;
    if (stale != args) {
        if (stale && stale != &PL_sv_undef)
            SvREFCNT_dec(stale);
        callee->transfer_data = args;
        if (args && args != &PL_sv_undef)
            SvREFCNT_inc(args);
    }
    callee->parent_id = current_fiber_id;

    perform_switch(fiber_id);

    /* Look the slot up again: the registry may have changed while we were away. */
    callee = fibers[fiber_id];
    if (callee && callee->finished && callee->transfer_data && callee->transfer_data != &PL_sv_undef) {
        SvREFCNT_dec(callee->transfer_data);
        callee->transfer_data = &PL_sv_undef;
    }

    /* Collect what was passed back to whoever is running now. */
    para_fiber_t * me = &main_context;
    if (current_fiber_id != -1)
        me = fibers[current_fiber_id];
    SV * received = me->transfer_data;
    me->transfer_data = &PL_sv_undef;
    if (received && received != &PL_sv_undef)
        sv_2mortal(received);
    return received;
}
/**
 * @brief Transfers control directly to another fiber (symmetric).
 *
 * Stores @p args in the target and switches to it without touching any
 * parent/child relationship. The target may be Main (-1).
 *
 * @param target_id Fiber ID to transfer to.
 * @param args Arguments to pass to the target.
 * @return SV* Data eventually transferred back to this fiber (mortalized),
 *             or &PL_sv_undef if the target is invalid or already finished.
 */
DLLEXPORT SV * coro_transfer(int target_id, SV * args) {
    dTHX;
    para_fiber_t * target;
    if (target_id == -1) {
        target = &main_context;
    }
    else {
        if (target_id < -1 || target_id >= MAX_FIBERS)
            return &PL_sv_undef;
        target = fibers[target_id];
        if (!target || target->finished)
            return &PL_sv_undef;
    }

    /* Hand the payload over, dropping anything the target still held. */
    SV * stale = target->transfer_data;
    if (stale != args) {
        if (stale && stale != &PL_sv_undef)
            SvREFCNT_dec(stale);
        target->transfer_data = args;
        if (args && args != &PL_sv_undef)
            SvREFCNT_inc(args);
    }

    perform_switch(target_id);

    /* Look the slot up again: the registry may have changed while we were away. */
    if (target_id >= 0) {
        para_fiber_t * after = fibers[target_id];
        if (after && after->finished && after->transfer_data && after->transfer_data != &PL_sv_undef) {
            SvREFCNT_dec(after->transfer_data);
            after->transfer_data = &PL_sv_undef;
        }
    }

    /* Collect what was passed back to whoever is running now. */
    para_fiber_t * me = &main_context;
    if (current_fiber_id != -1)
        me = fibers[current_fiber_id];
    SV * received = me->transfer_data;
    me->transfer_data = &PL_sv_undef;
    if (received && received != &PL_sv_undef)
        sv_2mortal(received);
    return received;
}
/** @brief Returns 1 if the fiber has finished execution. */
DLLEXPORT int is_finished(int fiber_id) {
if (fiber_id < 0)
return 0;
return (fibers[fiber_id] && fibers[fiber_id]->finished) ? 1 : 0;
}
/**
 * @brief Internal helper to reset subroutine depth for cleanup.
 *
 * Does nothing unless @p cv is a non-NULL code value whose recursion
 * depth is currently positive; in that case the depth is set to 0.
 *
 * @param cv The code value to reset (may be NULL).
 */
static void recursive_depth_reset(pTHX_ CV * cv) {
    if (cv && SvTYPE((SV *)cv) == SVt_PVCV && CvDEPTH(cv) > 0)
        CvDEPTH(cv) = 0;
}
/**
* @brief Clears active pads in the fiber stack.
*
* Internal helper used during fiber destruction to ensure all active lexical
* scopes are unwound and their variables freed.
*
* @param si The Stack Info structure of the fiber.
*/
static void _clear_pads_in_stack(pTHX_ PERL_SI * si) {
if (!si || !si->si_cxstack)
return;
( run in 0.685 second using v1.01-cache-2.11-cpan-2398b32b56e )