Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.c view on Meta::CPAN
PMOP * curpm_under; /**< Current pattern match state under */
PMOP * reg_curpm; /**< Current regex match state */
GV * defgv; /**< The $_ global */
GV * last_in_gv; /**< GV used in last <FH> */
SV * rs; /**< The $/ global */
GV * ofsgv; /**< The $, global */
SV * ors_sv; /**< The $\ global */
GV * defoutgv; /**< The default output filehandle */
HV * curstash; /**< Current package stash */
HV * defstash; /**< Default package stash */
SV * errors; /**< Outstanding queued errors */
SV * user_cv; /**< The Perl sub/coderef this fiber is running */
SV * self_ref; /**< The Acme::Parataxis Perl object wrapper */
SV * transfer_data; /**< Arguments or return values passed during yield/transfer */
int id; /**< Numeric ID of this fiber */
int finished; /**< Flag: 1 if the fiber has completed its entry_point */
int parent_id; /**< ID of the fiber that 'called' this one (asymmetric) */
int last_sender; /**< ID of the fiber that last switched control to this one */
} para_fiber_t;
/** @name Job Status Constants */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1 /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}
/** @name Task Type Constants */
///@{
#define TASK_SLEEP 0 /**< Sleep for N milliseconds */
#define TASK_GET_CPU 1 /**< Retrieve current core ID */
#define TASK_READ 2 /**< Wait for read-readiness on a file descriptor */
#define TASK_WRITE 3 /**< Wait for write-readiness on a file descriptor */
///@}
/**
* @union value_t
* @brief Generic container for task input/output data.
*/
typedef union {
int64_t i; /**< Integer/Pointer storage */
double d; /**< Floating point storage */
char * s; /**< String storage */
} value_t;
/**
* @struct job_t
* @brief Represents a task in the background thread pool queue.
*/
typedef struct {
int fiber_id; /**< ID of the Fiber that submitted this task */
int target_thread; /**< Index of the assigned worker thread */
int type; /**< Type of task to perform (TASK_*) */
value_t input; /**< Input data for the task */
value_t output; /**< Result data populated by the worker */
int timeout_ms; /**< Timeout duration for I/O tasks */
int status; /**< Current lifecycle state (JOB_*) */
} job_t;
// Global Registry and State
/** @brief Maximum number of concurrent fibers allowed */
#define MAX_FIBERS 1024
/** @brief Array of active fiber structures */
static para_fiber_t * fibers[MAX_FIBERS];
/** @brief The context representing the main Perl thread */
static para_fiber_t main_context;
/** @brief ID of the currently executing fiber (-1 for Main) */
static int current_fiber_id = -1;
/** @brief Size of the background job queue */
#define MAX_JOBS 1024
/** @brief Fixed-size array for background tasks */
static job_t job_slots[MAX_JOBS];
/** @brief Mutex protecting access to the job queue */
static para_mutex_t queue_lock;
#ifdef _WIN32
static CONDITION_VARIABLE queue_cond;
#else
static pthread_cond_t queue_cond;
#endif
static int threads_initialized = 0;
static int system_initialized = 0;
// Forward declarations for thread safety wrappers
#ifdef _WIN32
#define PARA_COND_WAIT(c, m) SleepConditionVariableCS(&c, &m, INFINITE)
#define PARA_COND_SIGNAL(c) WakeConditionVariable(&c)
#define PARA_COND_BROADCAST(c) WakeAllConditionVariable(&c)
#define PARA_COND_INIT(c) InitializeConditionVariable(&c)
#else
#define PARA_COND_WAIT(c, m) pthread_cond_wait(&c, &m)
#define PARA_COND_SIGNAL(c) pthread_cond_signal(&c)
#define PARA_COND_BROADCAST(c) pthread_cond_broadcast(&c)
#define PARA_COND_INIT(c) pthread_cond_init(&c, NULL)
#endif
/** @brief Threshold for automatic preemption (0 to disable) */
static long long preempt_threshold = 0;
/** @brief Count of operations since last preemption yield */
static long long preempt_count = 0;
/** @brief Maximum worker threads allowed in the pool */
#define MAX_THREADS 64
/** @brief Native OS handles for pool threads */
static para_thread_t thread_handles[MAX_THREADS];
/** @brief Maximum allowed threads in the pool */
static int max_thread_pool_size = 0;
/** @brief Number of currently running worker threads */
static int current_thread_count = 0;
/** @brief Flag to signal worker threads to terminate */
static volatile int threads_keep_running = 1;
#ifdef _WIN32
lib/Acme/Parataxis.c view on Meta::CPAN
res = select(fd + 1, NULL, &work_fds, NULL, &tv);
#endif
if (res != 0)
break;
elapsed_ms += 10;
if (elapsed_ms >= timeout)
break;
}
job->output.i = (res > 0) ? 1 : -1;
}
LOCK(queue_lock);
job->status = JOB_DONE;
UNLOCK(queue_lock);
}
else {
#ifdef _WIN32
Sleep(1);
#else
usleep(1000);
#endif
}
}
return 0;
}
/**
* @brief Initializes the background thread pool.
*
* Automatically detects the CPU count and spawns worker threads. This function
* is called automatically by `init_system` and `submit_c_job`.
*/
DLLEXPORT void init_threads() {
dTHX;
if (threads_initialized)
return;
LOCK_INIT(queue_lock);
PARA_COND_INIT(queue_cond);
for (int i = 0; i < MAX_JOBS; i++)
job_slots[i].status = JOB_FREE;
if (max_thread_pool_size == 0) {
max_thread_pool_size = get_cpu_count();
if (max_thread_pool_size > MAX_THREADS)
max_thread_pool_size = MAX_THREADS;
}
/* Start with a small "seed" pool of 2 threads */
_spawn_workers(2);
threads_initialized = 1;
}
/**
* @brief Submits a C-level task to the background pool.
*
* @param type The task type constant (TASK_*).
* @param arg Input integer or pointer data.
* @param timeout_ms Timeout for I/O operations.
* @return int The index of the submitted job, or -1 if the queue is full.
*/
DLLEXPORT int submit_c_job(int type, int64_t arg, int timeout_ms) {
if (!threads_initialized)
init_threads();
int idx = -1;
LOCK(queue_lock);
/* Dynamic Scaling: If we have pending jobs and space in the pool, grow! */
int pending_count = 0;
for (int i = 0; i < MAX_JOBS; i++)
if (job_slots[i].status == JOB_NEW)
pending_count++;
if (pending_count > 0 && current_thread_count < max_thread_pool_size)
_spawn_workers(1); /* Grow by 1 on demand */
for (int i = 0; i < MAX_JOBS; i++) {
if (job_slots[i].status == JOB_FREE) {
idx = i;
break;
}
}
if (idx != -1) {
job_slots[idx].fiber_id = current_fiber_id;
job_slots[idx].type = type;
job_slots[idx].input.i = arg;
job_slots[idx].timeout_ms = timeout_ms;
job_slots[idx].status = JOB_NEW;
PARA_COND_SIGNAL(queue_cond);
}
UNLOCK(queue_lock);
return idx;
}
/**
* @brief Polls the queue for any completed background jobs.
*
* @return int Index of a finished job, or -1 if none are ready.
*/
DLLEXPORT int check_for_completion() {
if (!threads_initialized)
init_threads();
int job_idx = -1;
LOCK(queue_lock);
for (int i = 0; i < MAX_JOBS; i++) {
if (job_slots[i].status == JOB_DONE) {
job_idx = i;
break;
}
}
UNLOCK(queue_lock);
return job_idx;
}
/**
* @brief Retrieves the result of a completed job as a Perl SV.
*
* @param idx The job index in the queue.
* @return SV* A mortalized Perl SV containing the result (IV).
*/
( run in 0.897 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )