Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.c view on Meta::CPAN
/**
* @brief Get the index of the CPU core currently executing this thread.
*
* @return int Core ID (0..N) or -1 if unsupported.
*/
int get_current_cpu() {
#ifdef _WIN32
return GetCurrentProcessorNumber();
#elif defined(__linux__)
return sched_getcpu();
#else
return -1;
#endif
}
/**
* @brief Detects the number of logical cores available on the system.
*
* @return int CPU count (minimum 1).
*/
int get_cpu_count() {
#ifdef _WIN32
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
int count = sysinfo.dwNumberOfProcessors;
return (count > 0) ? count : 1;
#elif defined(__APPLE__) || defined(__FreeBSD__)
int nm[2];
size_t len = 4;
uint32_t count;
nm[0] = CTL_HW;
nm[1] = HW_NCPU;
sysctl(nm, 2, &count, &len, NULL, 0);
return (count > 0) ? (int)count : 1;
#else
long count = sysconf(_SC_NPROCESSORS_ONLN);
return (count > 0) ? (int)count : 1;
#endif
}
/**
* @struct para_fiber_t
* @brief The complete execution context of a Perl Fiber.
*
* This structure encapsulates both the OS-level register state (via context)
* and the entire internal state of the Perl interpreter required to pause
* and resume execution of Perl code.
*/
typedef struct {
coro_handle_t context; /**< OS-specific context handle */
#ifndef _WIN32
void * stack_p; /**< Pointer to dynamically allocated fiber stack (Unix only) */
size_t stack_sz; /**< Size of the allocated stack (Unix only) */
#endif
/*
* Perl Interpreter State Pointers.
* These must be saved and restored during every context switch.
*/
PERL_SI * si; /**< Current Stack Info (tracks recursion and eval frames) */
AV * curstack; /**< The active Argument Stack (AV*) */
SSize_t stack_sp_offset; /**< Stack Pointer offset from stack base */
I32 * markstack; /**< Base of the Mark Stack (tracks list start points) */
I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
I32 * markstack_max; /**< Limit of the Mark Stack */
I32 * scopestack; /**< Base of the Scope Stack (tracks block nesting) */
I32 scopestack_ix; /**< Current index in the Scope Stack */
I32 scopestack_max; /**< Limit of the Scope Stack */
ANY * savestack; /**< Base of the Save Stack (tracks local/my variables for cleanup) */
I32 savestack_ix; /**< Current index in the Save Stack */
I32 savestack_max; /**< Limit of the Save Stack */
SV ** tmps_stack; /**< Base of the Mortal Stack (tracks SVs needing refcnt decrement) */
I32 tmps_ix; /**< Current index in the Mortal Stack */
I32 tmps_floor; /**< Current floor of the Mortal Stack */
I32 tmps_max; /**< Limit of the Mortal Stack */
JMPENV * top_env; /**< Pointer to the top exception environment (eval/die buffers) */
COP * curcop; /**< Current Op Pointer (location in the source/bytecode) */
OP * op; /**< Current Operation being executed */
PAD * comppad; /**< Current lexical Pad (variable storage) */
SV ** curpad; /**< Array pointer to the current lexical Pad */
PMOP * curpm; /**< Current pattern match state */
PMOP * curpm_under; /**< Current pattern match state under */
PMOP * reg_curpm; /**< Current regex match state */
GV * defgv; /**< The $_ global */
GV * last_in_gv; /**< GV used in last <FH> */
SV * rs; /**< The $/ global */
GV * ofsgv; /**< The $, global */
SV * ors_sv; /**< The $\ global */
GV * defoutgv; /**< The default output filehandle */
HV * curstash; /**< Current package stash */
HV * defstash; /**< Default package stash */
SV * errors; /**< Outstanding queued errors */
SV * user_cv; /**< The Perl sub/coderef this fiber is running */
SV * self_ref; /**< The Acme::Parataxis Perl object wrapper */
SV * transfer_data; /**< Arguments or return values passed during yield/transfer */
int id; /**< Numeric ID of this fiber */
int finished; /**< Flag: 1 if the fiber has completed its entry_point */
int parent_id; /**< ID of the fiber that 'called' this one (asymmetric) */
int last_sender; /**< ID of the fiber that last switched control to this one */
} para_fiber_t;
/** @name Job Status Constants */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1 /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}
/** @name Task Type Constants */
///@{
lib/Acme/Parataxis.c view on Meta::CPAN
UNLOCK(queue_lock);
}
/**
* @brief Resets the call depth of a Perl CV to zero.
*
* Used to ensure that a newly created fiber starts its coderef with a
* clean execution state.
*
* @param cv_ref SV reference to the coderef.
*/
DLLEXPORT void force_depth_zero(SV * cv_ref) {
dTHX;
CV * cv = NULL;
if (SvROK(cv_ref))
cv = (CV *)SvRV(cv_ref);
else if (SvTYPE(cv_ref) == SVt_PVCV)
cv = (CV *)cv_ref;
if (cv && SvTYPE((SV *)cv) == SVt_PVCV)
((XPVCV *)MUTABLE_PTR(SvANY(cv)))->xcv_depth = 0;
}
/** @brief Returns the ID of the currently executing fiber. */
DLLEXPORT int get_current_parataxis_id() { return current_fiber_id; }
/** @brief Returns the OS-level thread ID of the main interpreter thread. */
DLLEXPORT int get_os_thread_id_export() { return get_os_thread_id(); }
/** @brief Returns the number of worker threads currently running in the pool. */
DLLEXPORT int get_thread_pool_size() { return current_thread_count; }
/** @brief Returns the maximum number of worker threads allowed in the pool. */
DLLEXPORT int get_max_thread_pool_size() { return max_thread_pool_size; }
/** @brief Sets the threshold for automatic yield-based preemption. */
DLLEXPORT void set_preempt_threshold(int64_t threshold) { preempt_threshold = threshold; }
/** @brief Returns the current count towards the preemption threshold. */
DLLEXPORT int64_t get_preempt_count() { return preempt_count; }
/**
* @brief Checks if automatic preemption should occur.
*
* Increments the internal counter and triggers a `coro_yield` if the
* threshold is reached.
*
* @return SV* Result of the yield, or undef if no yield occurred.
*/
DLLEXPORT SV * maybe_yield() {
dTHX;
preempt_count++;
if (preempt_threshold > 0 && preempt_count >= preempt_threshold) {
preempt_count = 0;
return coro_yield(&PL_sv_undef);
}
return &PL_sv_undef;
}
/**
* @brief Restores subroutine call depths and cleans argument pads.
*
* This function iterates the context stack and restores CvDEPTH for
* active subroutines in two passes to safely handle recursive calls.
*
* Pass 1: Restores CvDEPTH for all active frames.
* Pass 2: Surgicaly cleans Slot 0 of the *next* pad depth for each CV.
*
* @param to The fiber being resumed.
*/
static void _activate_current_depths(pTHX_ para_fiber_t * to) {
PERL_SI * si = to->si;
if (!si || !si->si_cxstack)
return;
/* Pass 1: Restore CvDEPTH for all active frames */
for (I32 i = 0; i <= si->si_cxix; i++) {
PERL_CONTEXT * cx = &(si->si_cxstack[i]);
if (CxTYPE(cx) == CXt_SUB || CxTYPE(cx) == CXt_FORMAT) {
CV * cv = cx->blk_sub.cv;
if (cv && SvTYPE((SV *)cv) == SVt_PVCV)
CvDEPTH(cv) = cx->blk_sub.olddepth + 1;
}
}
/* Pass 2: Clean the landing pads for the NEXT call in each CV */
for (I32 i = 0; i <= si->si_cxix; i++) {
PERL_CONTEXT * cx = &(si->si_cxstack[i]);
if (CxTYPE(cx) == CXt_SUB || CxTYPE(cx) == CXt_FORMAT) {
CV * cv = cx->blk_sub.cv;
if (cv && SvTYPE((SV *)cv) == SVt_PVCV) {
PADLIST * pl = CvPADLIST(cv);
I32 next_depth = CvDEPTH(cv) + 1;
if (pl && next_depth <= PadlistMAX(pl)) {
AV * next_pad = (AV *)PadlistARRAY(pl)[next_depth];
if (next_pad && SvTYPE(next_pad) == SVt_PVAV) {
SV ** array = AvARRAY(next_pad);
if (array && AvMAX(next_pad) >= 0) {
SV * args = array[0];
if (args && SvTYPE(args) == SVt_PVAV) {
AvFILLp((AV *)args) = -1;
AvREAL_off((AV *)args);
}
}
}
}
}
}
}
}
/**
* @brief Swaps the internal Perl Interpreter state pointers.
*
* This is the core of the fiber implementation. It manually saves all
* global pointers that define the "state" of the Perl virtual machine for
* the current context and restores them for the target context.
*
* @param from Context being paused.
* @param to Context being resumed.
*/
void swap_perl_state(para_fiber_t * from, para_fiber_t * to) {
dTHX;
/* Save current state into 'from' context */
from->si = PL_curstackinfo;
// The Argument Stack (Main Perl stack)
from->curstack = PL_curstack;
from->stack_sp_offset = PL_stack_sp - PL_stack_base;
// The Mark Stack (Tracks where lists begin on the argument stack)
from->markstack = PL_markstack;
from->markstack_ptr = PL_markstack_ptr;
from->markstack_max = PL_markstack_max;
// The Scope Stack (Tracks block entry/exit for cleanup)
( run in 0.796 second using v1.01-cache-2.11-cpan-df04353d9ac )