Promise-XS
view release on metacpan or search on metacpan
SV* caught = exs_call_method_scalar(
promise_sv,
"catch",
catch_args
);
SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };
exs_call_method_void(
caught,
"finally",
finally_args
);
sv_2mortal(caught);
exs_call_method_void(
sv_2mortal( newSVpvs("Mojo::IOLoop") ),
"start",
NULL
);
}
//----------------------------------------------------------------------
MODULE = Promise::XS PACKAGE = Promise::XS
BOOT:
{
    /* Set up the per-interpreter context before any field is written. */
    MY_CXT_INIT;
#ifdef USE_ITHREADS
    /* Remember which interpreter owns this context; CLONE reads it later. */
    MY_CXT.owner = aTHX;
#endif
    /* Stash lookups for the three classes; FALSE means "do not create". */
    MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
    MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
    MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
    /* Callback queue bookkeeping starts out empty and idle. */
    MY_CXT.queue_tail = NULL;
    MY_CXT.queue_head = NULL;
    MY_CXT.callback_depth = 0;
    MY_CXT.backend_scheduled = 0;
    MY_CXT.in_flush = 0;
    /* No event system is selected yet, so no related SVs are held. */
    MY_CXT.event_system = _DEFER_NONE;
    MY_CXT.pxs_flush_cr = NULL;
    MY_CXT.stop_cr = NULL;
    MY_CXT.deferral_arg = NULL;
    MY_CXT.deferral_cr = NULL;
}
# In some old thread-multi perls sv_dup_inc() wasn't defined.
#if defined(USE_ITHREADS) && defined(sv_dup_inc)
# ithreads would seem to be a very bad idea in Promise-based code,
# but anyway ..
# CLONE(...): gives a newly cloned interpreter its own context.
# Phase 1 duplicates the SVs held by the current context, using the
# recorded owner interpreter as the clone source. Phase 2 switches to a
# private copy of the context struct and stores the duplicates plus freshly
# looked-up stashes in it. Always returns undef.
# NOTE(review): MY_CXT_CLONE takes a byte-for-byte copy of the struct, so
# fields not reassigned below (queue_head, queue_tail, in_flush, ...) keep
# the values they had in the source context -- confirm that is intended.
void
CLONE(...)
    PPCODE:
        SV* new_flush_cr = NULL;
        SV* new_deferral_cr = NULL;
        SV* new_deferral_arg = NULL;
        SV* new_stop_cr = NULL;
        event_system_t new_event_system;
        {
            /* Phase 1: read the existing context and duplicate what it holds. */
            dMY_CXT;
            CLONE_PARAMS params = {NULL, 0, MY_CXT.owner};
            if ( MY_CXT.pxs_flush_cr ) {
                new_flush_cr = sv_dup_inc( MY_CXT.pxs_flush_cr, &params );
            }
            if ( MY_CXT.deferral_cr ) {
                new_deferral_cr = sv_dup_inc( MY_CXT.deferral_cr, &params );
            }
            if ( MY_CXT.deferral_arg ) {
                new_deferral_arg = sv_dup_inc( MY_CXT.deferral_arg, &params );
            }
            if ( MY_CXT.stop_cr ) {
                new_stop_cr = sv_dup_inc( MY_CXT.stop_cr, &params );
            }
            /* Plain enum value: copied, not duplicated. */
            new_event_system = MY_CXT.event_system;
        }
        {
            /* Phase 2: from here on MY_CXT refers to this interpreter's copy. */
            MY_CXT_CLONE;
            MY_CXT.owner = aTHX;
            /* Cloned SVs (NULL where the source held nothing). */
            MY_CXT.pxs_flush_cr = new_flush_cr;
            MY_CXT.deferral_cr = new_deferral_cr;
            MY_CXT.deferral_arg = new_deferral_arg;
            MY_CXT.event_system = new_event_system;
            MY_CXT.stop_cr = new_stop_cr;
            /* Stashes are looked up again in this interpreter. */
            MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
            MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
            MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
        }
        XSRETURN_UNDEF;
#endif /* USE_ITHREADS && defined(sv_dup_inc) */
# resolved(...): passes every argument on the Perl stack, together with the
# flag value false, to _create_preresolved_promise() and returns its result.
SV *
resolved(...)
    CODE:
        /* Address of the first stack argument; "items" gives the count. */
        SV** first_arg = &(ST(0));
        RETVAL = _create_preresolved_promise(aTHX_ first_arg, items, false);
    OUTPUT:
        RETVAL
SV *
Newxz(state, 1, pxs_all_state_t);
*guard = (pxs_all_guard_t) {
.output = output,
.state = state,
};
*state = (pxs_all_state_t) {
.output = output,
.total = count,
.remaining = count,
.refs = 1,
};
xspr_promise_incref(aTHX_ output); /* state holds one ref */
Newxz(state->results, count, SV*);
unsigned i;
for (i = 0; i < count; i++) {
SV* input_sv = ST(i + 1);
xspr_promise_t* input_promise = xspr_promise_from_sv(aTHX_ input_sv);
if (input_promise == NULL) {
/* Plain scalar - treat as already resolved */
AV* av = newAV();
av_push(av, newSVsv(input_sv));
state->results[i] = newRV_noinc((SV*)av);
state->remaining--;
if (state->remaining == 0 && !state->done) {
state->done = true;
pxs_all_state_finish_resolved(aTHX_ state);
}
} else {
/* Keep the callback guarded until xspr_promise_then()
successfully takes ownership. */
pxs_callback_guard_t* callback_guard;
Newxz(callback_guard, 1, pxs_callback_guard_t);
SAVEDESTRUCTOR_X(_pxs_callback_guard_cleanup, callback_guard);
xspr_callback_t* callback = xspr_callback_new_all(aTHX_ state, i);
callback_guard->callback = callback;
state->refs++; /* callback now owns this ref */
xspr_promise_then(aTHX_ input_promise, callback);
callback_guard->callback = NULL;
xspr_promise_decref(aTHX_ input_promise);
}
}
/* Release our initial ref; state is kept alive by callbacks */
pxs_all_state_decref(aTHX_ state);
guard->state = NULL; /* ref-counting owns state now */
/* _promise_to_sv takes ownership of the refs=1 from create_promise */
RETVAL = _promise_to_sv(aTHX_ output);
guard->output = NULL; /* RETVAL's DESTROY owns output now */
}
OUTPUT:
RETVAL
# then(self_sv, on_resolve = NULL, on_reject = NULL)
# Attaches a Perl-level callback pair to the promise behind self_sv.
# An omitted handler is passed on as undef. Returns one value when
# create_next_promise_if_needed() yields a follow-up promise, otherwise
# returns nothing.
void
then(SV* self_sv, SV* on_resolve = NULL, SV* on_reject = NULL)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        SV* resolve_handler = on_resolve ? on_resolve : &PL_sv_undef;
        SV* reject_handler = on_reject ? on_reject : &PL_sv_undef;
        /* NOTE(review): the helper receives &ST(0), presumably to place the
           follow-up promise's SV in the first return slot -- confirm. */
        xspr_promise_t* chained = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
        xspr_promise_then(
            aTHX_ self->promise,
            xspr_callback_new_perl(aTHX_ resolve_handler, reject_handler, chained)
        );
        if (chained) {
            XSRETURN(1);
        }
        XSRETURN(0);
# catch(self_sv, on_reject)
# Same wiring as then(), with undef in the resolve slot so that only
# on_reject is supplied. Returns one value when a follow-up promise was
# created, otherwise returns nothing.
void
catch(SV* self_sv, SV* on_reject)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        xspr_promise_t* chained = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
        xspr_promise_then(
            aTHX_ self->promise,
            xspr_callback_new_perl(aTHX_ &PL_sv_undef, on_reject, chained)
        );
        if (chained) {
            XSRETURN(1);
        }
        XSRETURN(0);
# finally(self_sv, on_finally)
# Attaches a "finally" callback, built by xspr_callback_new_finally(), to
# the promise behind self_sv. Returns one value when a follow-up promise
# was created, otherwise returns nothing.
void
finally(SV* self_sv, SV* on_finally)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        xspr_promise_t* chained = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));
        xspr_promise_then(
            aTHX_ self->promise,
            xspr_callback_new_finally(aTHX_ on_finally, chained)
        );
        if (chained) {
            XSRETURN(1);
        }
        XSRETURN(0);
# DESTROY(self_sv)
# Tears down the wrapper struct behind a promise object: runs the
# destroy-time warning check, drops the wrapper's reference on the
# underlying promise, then frees the wrapper itself.
void
DESTROY(SV* self_sv)
    CODE:
        PROMISE_CLASS_TYPE* wrapper = _get_promise_from_sv(aTHX_ self_sv);
        /* fprintf(stderr, "DESTROYing sv=%p, p=%p\n", self_sv, wrapper->promise); */
        /* The warning check must see the promise before the reference is dropped. */
        _warn_on_destroy_if_needed(aTHX_ wrapper->promise, self_sv);
        xspr_promise_decref(aTHX_ wrapper->promise);
        Safefree(wrapper);
# ----------------------------------------------------------------------
# Future::AsyncAwait interface:
# ----------------------------------------------------------------------
# AWAIT_NEW_DONE(...)
# Skips the first stack argument and hands the remaining ones, with the
# flag value true, to _create_preresolved_promise(); returns its result.
SV*
AWAIT_NEW_DONE(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        /* Values start at ST(1); ST(0) is not forwarded. */
        SV** first_value = &(ST(1));
        RETVAL = _create_preresolved_promise(aTHX_ first_value, items - 1, true);
    OUTPUT:
        RETVAL
# AWAIT_NEW_FAIL(...)
# Skips the first stack argument and hands the remaining ones, with the
# flag value true, to _create_prerejected_promise(); returns its result.
SV*
AWAIT_NEW_FAIL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        /* Reasons start at ST(1); ST(0) is not forwarded. */
        SV** first_reason = &(ST(1));
        RETVAL = _create_prerejected_promise(aTHX_ first_reason, items - 1, true);
    OUTPUT:
        RETVAL
# AWAIT_CLONE(...)
# Builds a promise with create_promise(), wraps it in an SV via
# _promise_to_sv(), applies _IMMORTALIZE_PROMISE_SV to the pair, and
# returns the SV. No argument is read.
# When DEBUG_AWAITABLE is true, the reference counts of the returned SV and
# of its referent are written to stderr, followed by an sv_dump().
SV*
AWAIT_CLONE(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        xspr_promise_t* promise_p = create_promise(aTHX);
        RETVAL = _promise_to_sv(aTHX_ promise_p);
        /* NOTE(review): macro defined elsewhere; presumably adjusts
           reference counts so the object outlives the caller -- confirm. */
        _IMMORTALIZE_PROMISE_SV(RETVAL, promise_p);
        if (DEBUG_AWAITABLE) {
            /* SvREFCNT is unsigned: print it as a UV, not with %d. */
            fprintf(stderr, "# SvREFCNT(RETVAL)=%" UVuf "\n", (UV) SvREFCNT(RETVAL));
            fprintf(stderr, "# SvREFCNT(SvRV(RETVAL))=%" UVuf "\n", (UV) SvREFCNT(SvRV(RETVAL)));
            sv_dump(RETVAL);
        }
    OUTPUT:
        RETVAL
# AWAIT_DONE(self_sv, ...)
# Passes every argument after the invocant to _resolve_promise() for the
# promise behind self_sv.
void
AWAIT_DONE(SV* self_sv, ...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* awaitable = _get_promise_from_sv(aTHX_ self_sv);
        /* ST(0) is the invocant, so the values begin at ST(1). */
        SV** first_value = &ST(1);
        _resolve_promise(aTHX_ awaitable->promise, first_value, items - 1);
# AWAIT_FAIL(self_sv, ...)
# Passes every argument after the invocant to _reject_promise() for the
# promise behind self_sv. The helper's first non-context argument is NULL.
void
AWAIT_FAIL(SV* self_sv, ...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* awaitable = _get_promise_from_sv(aTHX_ self_sv);
        /* ST(0) is the invocant, so the reasons begin at ST(1). */
        SV** first_reason = &ST(1);
        _reject_promise(aTHX_ NULL, awaitable->promise, first_reason, items - 1);
# AWAIT_IS_READY(self_sv)
# Returns true when the promise's state is anything other than
# XSPR_STATE_PENDING.
bool
AWAIT_IS_READY(SV *self_sv)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* awaitable = _get_promise_from_sv(aTHX_ self_sv);
        bool still_pending = (awaitable->promise->state == XSPR_STATE_PENDING);
        RETVAL = !still_pending;
    OUTPUT:
        RETVAL
# AWAIT_GET(self_sv)
# Returns the outcome of a finished promise.
# Resolved: list context returns copies of all result values, scalar
# context returns a copy of the first one, void context returns nothing;
# with zero result values nothing is returned in any context.
# Rejected: croaks with a copy of the first result value, or with undef
# when there is none.
# Croaks when the promise has not finished yet.
void
AWAIT_GET(SV *self_sv)
    PPCODE:
        _DO_DEBUG_AWAITABLE();
        /* Unwrap with the promise accessor, like every other AWAIT_* method. */
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        /* This method is reachable from Perl, so check the state at run
           time instead of merely telling the compiler to assume it. */
        if (self->promise->state != XSPR_STATE_FINISHED) {
            croak(BASE_CLASS ": AWAIT_GET() called on a promise that has not finished");
        }
        SV** results = self->promise->finished.result->results;
        int result_count = self->promise->finished.result->count;
        if (RESULT_IS_RESOLVED(self->promise->finished.result)) {
            int i;
            if (!result_count) XSRETURN_EMPTY;
            switch (GIMME_V) {
                case G_ARRAY:
                    EXTEND(SP, result_count);
                    for (i=0; i<result_count; i++) {
                        /* Hand out mortal copies; the stored results stay untouched. */
                        PUSHs( sv_2mortal( newSVsv(results[i]) ) );
                    }
                    XSRETURN(result_count);
                case G_SCALAR:
                    EXTEND(SP, 1);
                    PUSHs( sv_2mortal( newSVsv(results[0]) ) );
                    XSRETURN(1);
                case G_VOID:
                    XSRETURN_EMPTY;
                default:
                    ASSUME(0);
            }
        }
        else {
            SV* err;
            if (result_count) {
                err = sv_2mortal( newSVsv( results[0] ) );
            }
            else {
                err = &PL_sv_undef;
            }
            croak_sv(err);
        }
# AWAIT_CHAIN_CANCEL(...)
# Accepts any arguments and does nothing with them.
void
AWAIT_CHAIN_CANCEL(...)
    CODE:
        /* Arguments are deliberately ignored. */
        UNUSED(items);
        _DO_DEBUG_AWAITABLE();
# AWAIT_ON_CANCEL(...)
# Accepts any arguments and does nothing with them.
void
AWAIT_ON_CANCEL(...)
    CODE:
        /* Arguments are deliberately ignored. */
        UNUSED(items);
        _DO_DEBUG_AWAITABLE();
# AWAIT_IS_CANCELLED(...)
# Always returns 0, whatever arguments are passed.
UV
AWAIT_IS_CANCELLED(...)
    CODE:
        /* Arguments are deliberately ignored. */
        UNUSED(items);
        _DO_DEBUG_AWAITABLE();
        RETVAL = (UV) 0;
    OUTPUT:
        RETVAL
# AWAIT_ON_READY(self_sv, coderef)
# Stores coderef in the promise's on_ready_immediate slot. The reference
# count of coderef and of the value it refers to are each raised by one.
# NOTE(review): coderef is dereferenced with SvRV() without an SvROK()
# check, and any previously stored value is overwritten without being
# released here -- confirm both are acceptable for the callers.
void
AWAIT_ON_READY(SV *self_sv, SV* coderef)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* awaitable = _get_promise_from_sv(aTHX_ self_sv);
        /* Take both references first, then publish the pointer. */
        SvREFCNT_inc(SvRV(coderef));
        SvREFCNT_inc(coderef);
        awaitable->promise->on_ready_immediate = coderef;
# AWAIT_WAIT(self_sv)
# Dispatches to the wait helper that matches the configured event system
# (AnyEvent, IO::Async or Mojo), then calls the AWAIT_GET method on
# self_sv in the caller's context and returns whatever that call returned.
# Croaks when no event system has been configured.
# NOTE(review): the helpers are defined elsewhere; presumably each one runs
# its event loop until the promise settles -- confirm.
void
AWAIT_WAIT(SV* self_sv)
    PPCODE:
        _DO_DEBUG_AWAITABLE();
        dMY_CXT;
        switch (MY_CXT.event_system) {
            case _DEFER_ANYEVENT:
                _anyevent_wait_promise(aTHX_ self_sv);
                break;
            case _DEFER_IOASYNC:
                _ioasync_wait_promise(aTHX_ self_sv, MY_CXT.deferral_arg, MY_CXT.stop_cr);
                break;
            case _DEFER_MOJO:
                _mojo_wait_promise(aTHX_ self_sv, MY_CXT.stop_cr);
                break;
            default:
                croak(BASE_CLASS ": No event loop set up! Did you forget to call use_event()?");
        }
        /* The helpers call back into Perl, which may reallocate the argument
           stack and leave the local SP stale. Rebuild it from ax so that it
           points just below ST(0). */
        SP = PL_stack_base + ax - 1;
        /* Standard method-call protocol: mark, push the invocant, publish SP. */
        PUSHMARK(SP);
        XPUSHs(self_sv);
        PUTBACK;
        /* Return values land at ST(0) onward, so they can be passed straight through. */
        int count = call_method("AWAIT_GET", GIMME_V);
        XSRETURN(count);
( run in 1.144 second using v1.01-cache-2.11-cpan-5511b514fd6 )