AnyEvent-XSPromises
view release on metacpan or search on metacpan
XSPromises.xs view on Meta::CPAN
void xspr_queue_add(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
{
dMY_CXT;
xspr_callback_queue_t* entry;
Newxz(entry, 1, xspr_callback_queue_t);
entry->origin = origin;
xspr_promise_incref(aTHX_ entry->origin);
entry->callback = callback;
if (MY_CXT.queue_head == NULL) {
assert(MY_CXT.queue_tail == NULL);
/* Empty queue, so now it's just us */
MY_CXT.queue_head = entry;
MY_CXT.queue_tail = entry;
} else {
assert(MY_CXT.queue_tail != NULL);
/* Existing queue, add to the tail */
MY_CXT.queue_tail->next = entry;
MY_CXT.queue_tail = entry;
}
}
void xspr_queue_maybe_schedule(pTHX)
{
dMY_CXT;
if (MY_CXT.queue_head == NULL || MY_CXT.backend_scheduled || MY_CXT.in_flush) {
return;
}
MY_CXT.backend_scheduled = 1;
/* We trust our backends to be sane, so little guarding against errors here */
dSP;
PUSHMARK(SP);
call_sv(MY_CXT.backend_fn, G_DISCARD|G_NOARGS);
}
/* Invoke the user's perl code. We need to be really sure this doesn't return early via croak/next/etc. */
xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** input, int input_count)
{
dSP;
int count, i;
SV* error;
xspr_result_t* result;
if (!SvROK(perl_fn)) {
return xspr_result_from_error(aTHX_ "promise callbacks need to be a CODE reference");
}
ENTER;
SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, input_count);
for (i = 0; i < input_count; i++) {
PUSHs(input[i]);
}
PUTBACK;
/* Clear $_ so that callbacks don't end up talking to each other by accident */
SAVE_DEFSV;
DEFSV_set(sv_newmortal());
count = call_sv(perl_fn, G_EVAL|G_ARRAY);
SPAGAIN;
error = ERRSV;
if (SvTRUE(error)) {
result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
result->result[0] = newSVsv(error);
} else {
result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, count);
for (i = 0; i < count; i++) {
result->result[count-i-1] = SvREFCNT_inc(POPs);
}
}
PUTBACK;
FREETMPS;
LEAVE;
return result;
}
/* Increments the ref count for xspr_result_t */
void xspr_result_incref(pTHX_ xspr_result_t* result)
{
result->refs++;
}
/* Decrements the ref count for the xspr_result_t, freeing the structure if needed */
void xspr_result_decref(pTHX_ xspr_result_t* result)
{
if (--(result->refs) == 0) {
int i;
for (i = 0; i < result->count; i++) {
SvREFCNT_dec(result->result[i]);
}
Safefree(result->result);
Safefree(result);
}
}
/* Transitions a promise from pending to finished, using the given result */
void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t* result)
{
assert(promise->state == XSPR_STATE_PENDING);
xspr_callback_t** pending_callbacks = promise->pending.callbacks;
int count = promise->pending.callbacks_count;
promise->state = XSPR_STATE_FINISHED;
promise->finished.result = result;
xspr_result_incref(aTHX_ promise->finished.result);
int i;
for (i = 0; i < count; i++) {
xspr_queue_add(aTHX_ pending_callbacks[i], promise);
}
Safefree(pending_callbacks);
}
( run in 2.536 seconds using v1.01-cache-2.11-cpan-0d23b851a93 )