Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.c view on Meta::CPAN
*/
int get_current_cpu() {
    /* -1 is the "unknown" answer used on platforms without a query API. */
    int cpu = -1;
#if defined(_WIN32)
    cpu = GetCurrentProcessorNumber();
#elif defined(__linux__)
    /* sched_getcpu() itself reports -1 on failure, which is passed through. */
    cpu = sched_getcpu();
#endif
    return cpu;
}
/**
 * @brief Detects the number of logical cores available on the system.
 *
 * Platform back-ends:
 *  - Windows: GetSystemInfo() / dwNumberOfProcessors
 *  - macOS and FreeBSD: sysctl() with the { CTL_HW, HW_NCPU } MIB
 *  - everything else: sysconf(_SC_NPROCESSORS_ONLN)
 *
 * A failed query or a non-positive answer is reported as 1, so callers can
 * always use the result as a worker count.
 *
 * @return int CPU count (minimum 1).
 */
int get_cpu_count() {
#ifdef _WIN32
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    int count = (int)sysinfo.dwNumberOfProcessors;
    return (count > 0) ? count : 1;
#elif defined(__APPLE__) || defined(__FreeBSD__)
    int mib[2] = {CTL_HW, HW_NCPU};
    /* hw.ncpu is an int; start from 0 so a failed call can never leave
     * an indeterminate value behind. */
    int count = 0;
    size_t len = sizeof(count);
    if (sysctl(mib, 2, &count, &len, NULL, 0) != 0)
        return 1;
    return (count > 0) ? count : 1;
#else
    /* sysconf() returns -1 on error, which falls into the "minimum 1" path. */
    long count = sysconf(_SC_NPROCESSORS_ONLN);
    return (count > 0) ? (int)count : 1;
#endif
}
/**
 * @struct para_fiber_t
 * @brief The complete execution context of a Perl Fiber.
 *
 * This structure encapsulates both the OS-level register state (via context)
 * and the internal state of the Perl interpreter required to pause
 * and resume execution of Perl code.
 *
 * The members fall into three groups:
 *  - the OS context (plus, on non-Windows builds, the fiber's own stack),
 *  - saved Perl interpreter pointers and indices,
 *  - fiber bookkeeping (user code, wrapper object, transfer slot, IDs).
 */
typedef struct {
coro_handle_t context; /**< OS-specific context handle (handed to SwitchToFiber on Windows, swapcontext elsewhere) */
#ifndef _WIN32
void * stack_p; /**< Pointer to dynamically allocated fiber stack (Unix only) */
size_t stack_sz; /**< Size of the allocated stack (Unix only) */
#endif
/*
 * Perl Interpreter State Pointers.
 * These must be saved and restored during every context switch.
 */
PERL_SI * si; /**< Current Stack Info (tracks recursion and eval frames) */
AV * curstack; /**< The active Argument Stack (AV*) */
SSize_t stack_sp_offset; /**< Stack Pointer stored as an offset from the stack base rather than as a raw pointer */
I32 * markstack; /**< Base of the Mark Stack (tracks list start points) */
I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
I32 * markstack_max; /**< Limit of the Mark Stack */
I32 * scopestack; /**< Base of the Scope Stack (tracks block nesting) */
I32 scopestack_ix; /**< Current index in the Scope Stack */
I32 scopestack_max; /**< Limit of the Scope Stack */
ANY * savestack; /**< Base of the Save Stack (tracks local/my variables for cleanup) */
I32 savestack_ix; /**< Current index in the Save Stack */
I32 savestack_max; /**< Limit of the Save Stack */
SV ** tmps_stack; /**< Base of the Mortal Stack (tracks SVs needing refcnt decrement) */
I32 tmps_ix; /**< Current index in the Mortal Stack */
I32 tmps_floor; /**< Current floor of the Mortal Stack */
I32 tmps_max; /**< Limit of the Mortal Stack */
JMPENV * top_env; /**< Pointer to the top exception environment (eval/die buffers) */
COP * curcop; /**< Current control op (COP): the statement being executed, i.e. the source location */
OP * op; /**< Current Operation being executed */
PAD * comppad; /**< Current lexical Pad (variable storage) */
SV ** curpad; /**< Array pointer to the current lexical Pad */
PMOP * curpm; /**< Current pattern match state */
PMOP * curpm_under; /**< Current pattern match state under */
PMOP * reg_curpm; /**< Current regex match state */
GV * defgv; /**< The $_ global */
GV * last_in_gv; /**< GV used in last <FH> */
SV * rs; /**< The $/ global */
GV * ofsgv; /**< The $, global */
SV * ors_sv; /**< The $\ global */
GV * defoutgv; /**< The default output filehandle */
HV * curstash; /**< Current package stash */
HV * defstash; /**< Default package stash */
SV * errors; /**< Outstanding queued errors */
SV * user_cv; /**< The Perl sub/coderef this fiber is running (invoked through call_sv by entry_point) */
SV * self_ref; /**< The Acme::Parataxis Perl object wrapper; entry_point calls set_result/set_error on it */
SV * transfer_data; /**< Arguments or return values passed during yield/transfer; coro_yield takes a reference when storing here and resets the slot to &PL_sv_undef after reading */
int id; /**< Numeric ID of this fiber */
int finished; /**< Flag: set by entry_point once the user coderef has returned */
int parent_id; /**< ID of the fiber that 'called' this one (asymmetric); coro_yield treats -1 as "no parent" */
int last_sender; /**< ID of the fiber that last switched control to this one; coro_yield falls back to it when the parent is absent or finished */
} para_fiber_t;
/** @name Job Status Constants
 *
 * Values held in the `status` member of each `job_slots[]` entry.
 * free_job_slot() writes JOB_FREE, and get_job_result() reads a slot's
 * output when the status is JOB_DONE or JOB_BUSY.
 */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1 /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}
/** @name Task Type Constants
 *
 * Values held in the `type` member of each `job_slots[]` entry.
 * get_job_result() returns the integer `output.i` for all four of the
 * types listed here.
 */
///@{
#define TASK_SLEEP 0 /**< Sleep for N milliseconds */
#define TASK_GET_CPU 1 /**< Retrieve current core ID */
#define TASK_READ 2 /**< Wait for read-readiness on a file descriptor */
#define TASK_WRITE 3 /**< Wait for write-readiness on a file descriptor */
lib/Acme/Parataxis.c view on Meta::CPAN
if (job_slots[i].status == JOB_DONE) {
job_idx = i;
break;
}
}
UNLOCK(queue_lock);
return job_idx;
}
/**
 * @brief Fetches the integer result stored in a job slot as a Perl SV.
 *
 * The slot is inspected while holding queue_lock. A value is produced only
 * when the slot's status is JOB_DONE or JOB_BUSY and its type is one of
 * TASK_SLEEP, TASK_GET_CPU, TASK_READ or TASK_WRITE.
 *
 * NOTE(review): JOB_BUSY slots are read as well as JOB_DONE ones - confirm
 * this is intentional, since a busy slot may not hold its final output yet.
 *
 * @param idx The job index in the queue.
 * @return SV* A mortal IV holding output.i, or &PL_sv_undef when idx is out
 *         of range or the slot does not qualify.
 */
DLLEXPORT SV * get_job_result(int idx) {
    dTHX;
    if (idx < 0 || idx >= MAX_JOBS)
        return &PL_sv_undef;

    SV * result = &PL_sv_undef;

    LOCK(queue_lock);
    const int readable = (job_slots[idx].status == JOB_DONE || job_slots[idx].status == JOB_BUSY);
    if (readable) {
        const int integer_task = (job_slots[idx].type == TASK_SLEEP || job_slots[idx].type == TASK_GET_CPU ||
                                  job_slots[idx].type == TASK_READ || job_slots[idx].type == TASK_WRITE);
        if (integer_task)
            result = sv_2mortal(newSViv(job_slots[idx].output.i));
    }
    UNLOCK(queue_lock);

    return result;
}
/**
 * @brief Looks up which fiber a job slot belongs to.
 *
 * Reads the slot's fiber_id member directly; queue_lock is not taken.
 *
 * @param idx Job index.
 * @return int The slot's fiber ID, or -1 when idx is outside [0, MAX_JOBS).
 */
DLLEXPORT int get_job_coro_id(int idx) {
    const int in_range = (idx >= 0 && idx < MAX_JOBS);
    return in_range ? job_slots[idx].fiber_id : -1;
}
/**
 * @brief Marks a job slot as JOB_FREE so it can be reused.
 *
 * The status write happens under queue_lock. Out-of-range indices are
 * silently ignored.
 *
 * @param idx Job index.
 */
DLLEXPORT void free_job_slot(int idx) {
    const int in_range = (idx >= 0 && idx < MAX_JOBS);
    if (in_range) {
        LOCK(queue_lock);
        job_slots[idx].status = JOB_FREE;
        UNLOCK(queue_lock);
    }
}
/**
 * @brief Resets the call depth of a Perl CV to zero.
 *
 * Accepts either a reference to a code value or the code value itself.
 * The argument is dereferenced when it is a reference; if the resulting SV
 * is not of type SVt_PVCV the call does nothing.
 *
 * @param cv_ref SV reference to the coderef (or the CV itself).
 */
DLLEXPORT void force_depth_zero(SV * cv_ref) {
    dTHX;
    SV * candidate = SvROK(cv_ref) ? SvRV(cv_ref) : cv_ref;
    if (SvTYPE(candidate) != SVt_PVCV)
        return;
    CV * code = (CV *)candidate;
    /* Write the depth field in the CV body directly. */
    ((XPVCV *)MUTABLE_PTR(SvANY(code)))->xcv_depth = 0;
}
/**
 * @brief Reports which fiber is currently executing.
 *
 * @return int The value of current_fiber_id (coro_yield treats -1 as
 *         "not inside a fiber").
 */
DLLEXPORT int get_current_parataxis_id() {
    return current_fiber_id;
}
/**
 * @brief Exported wrapper around get_os_thread_id().
 *
 * NOTE(review): presumably this yields the OS thread ID of whichever thread
 * makes the call (normally the main interpreter thread) - confirm against
 * get_os_thread_id().
 *
 * @return int Whatever get_os_thread_id() returns.
 */
DLLEXPORT int get_os_thread_id_export() {
    return get_os_thread_id();
}
/**
 * @brief Reports how many worker threads the pool currently has.
 *
 * @return int The value of current_thread_count.
 */
DLLEXPORT int get_thread_pool_size() {
    return current_thread_count;
}
/**
 * @brief Reports the upper bound on worker threads in the pool.
 *
 * @return int The value of max_thread_pool_size.
 */
DLLEXPORT int get_max_thread_pool_size() {
    return max_thread_pool_size;
}
/**
 * @brief Sets the threshold for automatic yield-based preemption.
 *
 * maybe_yield() only yields while the threshold is greater than zero, so a
 * value of zero or below turns automatic preemption off.
 *
 * @param threshold Number of maybe_yield() calls between automatic yields.
 */
DLLEXPORT void set_preempt_threshold(int64_t threshold) {
    preempt_threshold = threshold;
}
/**
 * @brief Reports progress towards the next automatic yield.
 *
 * @return int64_t The value of preempt_count, which maybe_yield() increments
 *         on every call and resets to zero when it yields.
 */
DLLEXPORT int64_t get_preempt_count() {
    return preempt_count;
}
/**
 * @brief Cooperative preemption check.
 *
 * Every call bumps preempt_count. When a positive preempt_threshold is
 * configured and the counter has reached it, the counter is reset and the
 * current fiber yields undef through coro_yield().
 *
 * @return SV* The value coro_yield() returned, or &PL_sv_undef when no
 *         yield took place.
 */
DLLEXPORT SV * maybe_yield() {
    dTHX;
    ++preempt_count;

    const int due = (preempt_threshold > 0 && preempt_count >= preempt_threshold);
    if (!due)
        return &PL_sv_undef;

    preempt_count = 0;
    return coro_yield(&PL_sv_undef);
}
/**
* @brief Restores subroutine call depths and cleans argument pads.
*
* This function iterates the context stack and restores CvDEPTH for
* active subroutines in two passes to safely handle recursive calls.
*
* Pass 1: Restores CvDEPTH for all active frames.
* Pass 2: Surgically cleans Slot 0 of the *next* pad depth for each CV.
*
* @param to The fiber being resumed.
*/
static void _activate_current_depths(pTHX_ para_fiber_t * to) {
PERL_SI * si = to->si;
lib/Acme/Parataxis.c view on Meta::CPAN
swap_perl_state(from, to);
#ifdef _WIN32
if (target_id == -1)
SwitchToFiber(main_fiber_handle);
else
SwitchToFiber(to->context);
#else
swapcontext(&from->context, &to->context);
#endif
}
/**
 * @brief Yields execution back to the caller or the main context.
 *
 * Target selection:
 *  1. Start from the fiber's parent_id.
 *  2. If there is no parent (-1), or the parent slot is empty or finished,
 *     use last_sender instead.
 *  3. If that candidate is a fiber ID (>= 0) whose slot is empty or
 *     finished, fall back to the main context (-1).
 *
 * The yielded value is placed in the target's transfer_data slot. The slot
 * takes its own reference, and any previous occupant is released first.
 * After the fiber is resumed, its own transfer_data slot is emptied and
 * the value found there is returned as a mortal.
 *
 * Calling this outside of any fiber (current_fiber_id == -1) is a no-op
 * that returns undef.
 *
 * @param ret_val The Perl SV to "return" to the caller.
 * @return SV* The value passed in when this fiber is eventually resumed.
 */
DLLEXPORT SV * coro_yield(SV * ret_val) {
    dTHX;
    if (current_fiber_id == -1)
        return &PL_sv_undef;

    para_fiber_t * self = fibers[current_fiber_id];

    /* Steps 1 and 2: parent first, then last_sender. Short-circuiting keeps
     * fibers[] from being indexed with -1. */
    int target = self->parent_id;
    if (target == -1 || !fibers[target] || fibers[target]->finished)
        target = self->last_sender;
    /* Step 3: a dead fiber cannot be switched to; use the main context. */
    if (target >= 0 && (!fibers[target] || fibers[target]->finished))
        target = -1;

    para_fiber_t * dest = (target == -1) ? &main_context : fibers[target];

    /* Hand the value over, releasing the old one before taking the new reference */
    SV * previous = dest->transfer_data;
    if (previous != ret_val) {
        if (previous && previous != &PL_sv_undef)
            SvREFCNT_dec(previous);
        dest->transfer_data = ret_val;
        if (ret_val && ret_val != &PL_sv_undef)
            SvREFCNT_inc(ret_val);
    }

    perform_switch(target);

    /* Execution continues here once another context switches back to us */
    SV * resumed = self->transfer_data;
    self->transfer_data = &PL_sv_undef;
    if (!resumed || resumed == &PL_sv_undef)
        return resumed;
    return sv_2mortal(resumed);
}
/**
* @brief Entry point function for all new fibers.
*
* Sets up the Perl environment (ENTER/SAVETMPS), unpacks arguments,
* calls the user coderef, handles results/errors, and manages the
* fiber's completion lifecycle.
*
* @param c Pointer to the fiber context being started.
*/
static void entry_point(para_fiber_t * c) {
dTHX;
ENTER;
SAVETMPS;
dSP;
PUSHMARK(SP);
/* Unpack arguments passed during coro_call */
if (c->transfer_data && SvROK(c->transfer_data) && SvTYPE(SvRV(c->transfer_data)) == SVt_PVAV) {
AV * args = (AV *)SvRV(c->transfer_data);
I32 len = av_top_index(args) + 1;
for (I32 i = 0; i < len; i++) {
SV ** svp = av_fetch(args, i, 0);
if (svp)
XPUSHs(*svp);
}
}
PUTBACK;
/* Execute the Perl sub */
int count = call_sv(c->user_cv, G_SCALAR | G_EVAL);
SPAGAIN;
SV * ret_val = &PL_sv_undef;
if (count == 1)
ret_val = POPs;
PUTBACK;
c->finished = true;
/* Cleanup transfer data and store result */
if (c->transfer_data && c->transfer_data != &PL_sv_undef) {
SvREFCNT_dec(c->transfer_data);
c->transfer_data = &PL_sv_undef;
}
if (ret_val && ret_val != &PL_sv_undef) {
SvREFCNT_inc(ret_val);
c->transfer_data = ret_val;
}
/* Update the Perl-level Acme::Parataxis object */
if (c->self_ref && SvROK(c->self_ref)) {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(c->self_ref);
if (SvTRUE(ERRSV)) {
XPUSHs(ERRSV);
PUTBACK;
call_method("set_error", G_DISCARD);
}
else {
XPUSHs(ret_val);
PUTBACK;
call_method("set_result", G_DISCARD);
}
FREETMPS;
LEAVE;
( run in 0.622 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )