Promise-XS

 view release on metacpan or  search on metacpan

XS.xs  view on Meta::CPAN


    SV* caught = exs_call_method_scalar(
        promise_sv,
        "catch",
        catch_args
    );

    SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };

    exs_call_method_void(
        caught,
        "finally",
        finally_args
    );

    sv_2mortal(caught);

    exs_call_method_void(
        sv_2mortal( newSVpvs("Mojo::IOLoop") ),
        "start",
        NULL
    );
}

//----------------------------------------------------------------------

MODULE = Promise::XS     PACKAGE = Promise::XS

BOOT:
{
    MY_CXT_INIT;
#ifdef USE_ITHREADS
    MY_CXT.owner = aTHX;
#endif
    MY_CXT.queue_head = NULL;
    MY_CXT.queue_tail = NULL;
    MY_CXT.in_flush = 0;
    MY_CXT.backend_scheduled = 0;
    MY_CXT.callback_depth = 0;

    MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
    MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
    MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);

    MY_CXT.deferral_cr = NULL;
    MY_CXT.deferral_arg = NULL;
    MY_CXT.event_system = _DEFER_NONE;
    MY_CXT.stop_cr = NULL;
    MY_CXT.pxs_flush_cr = NULL;
}

# In some old thread-multi perls sv_dup_inc() wasn’t defined.

#if defined(USE_ITHREADS) && defined(sv_dup_inc)

# ithreads would seem to be a very bad idea in Promise-based code,
# but anyway ..

void
CLONE(...)
    PPCODE:

        SV* pxs_flush_cr = NULL;
        SV* deferral_cr = NULL;
        event_system_t event_system;
        SV* deferral_arg = NULL;
        SV* stop_cr = NULL;

        {
            dMY_CXT;

            CLONE_PARAMS params = {NULL, 0, MY_CXT.owner};

            if ( MY_CXT.pxs_flush_cr ) {
                pxs_flush_cr = sv_dup_inc( MY_CXT.pxs_flush_cr, &params );
            }

            if ( MY_CXT.deferral_cr ) {
                deferral_cr = sv_dup_inc( MY_CXT.deferral_cr, &params );
            }

            if ( MY_CXT.deferral_arg ) {
                deferral_arg = sv_dup_inc( MY_CXT.deferral_arg, &params );
            }

            event_system = MY_CXT.event_system;

            if ( MY_CXT.stop_cr ) {
                stop_cr = sv_dup_inc( MY_CXT.stop_cr, &params );
            }
        }

        {
            MY_CXT_CLONE;
            MY_CXT.owner = aTHX;

            // Clone SVs
            MY_CXT.pxs_flush_cr = pxs_flush_cr;
            MY_CXT.deferral_cr = deferral_cr;
            MY_CXT.deferral_arg = deferral_arg;
            MY_CXT.event_system = event_system;
            MY_CXT.stop_cr      = stop_cr;

            // Clone HVs
            MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
            MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
            MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
        }

        XSRETURN_UNDEF;

#endif /* USE_ITHREADS && defined(sv_dup_inc) */

SV *
resolved(...)
    CODE:
        RETVAL = _create_preresolved_promise(aTHX_ &(ST(0)), items, false);
    OUTPUT:
        RETVAL

SV *

XS.xs  view on Meta::CPAN

            Newxz(state, 1, pxs_all_state_t);
            *guard = (pxs_all_guard_t) {
                .output = output,
                .state = state,
            };
            *state = (pxs_all_state_t) {
                .output = output,
                .total = count,
                .remaining = count,
                .refs = 1,
            };
            xspr_promise_incref(aTHX_ output);  /* state holds one ref */
            Newxz(state->results, count, SV*);

            unsigned i;
            for (i = 0; i < count; i++) {
                SV* input_sv = ST(i + 1);
                xspr_promise_t* input_promise = xspr_promise_from_sv(aTHX_ input_sv);

                if (input_promise == NULL) {
                    /* Plain scalar - treat as already resolved */
                    AV* av = newAV();
                    av_push(av, newSVsv(input_sv));
                    state->results[i] = newRV_noinc((SV*)av);

                    state->remaining--;

                    if (state->remaining == 0 && !state->done) {
                        state->done = true;
                        pxs_all_state_finish_resolved(aTHX_ state);
                    }
                } else {
                    /* Keep the callback guarded until xspr_promise_then()
                       successfully takes ownership. */
                    pxs_callback_guard_t* callback_guard;
                    Newxz(callback_guard, 1, pxs_callback_guard_t);
                    SAVEDESTRUCTOR_X(_pxs_callback_guard_cleanup, callback_guard);

                    xspr_callback_t* callback = xspr_callback_new_all(aTHX_ state, i);
                    callback_guard->callback = callback;
                    state->refs++;  /* callback now owns this ref */
                    xspr_promise_then(aTHX_ input_promise, callback);
                    callback_guard->callback = NULL;
                    xspr_promise_decref(aTHX_ input_promise);
                }
            }

            /* Release our initial ref; state is kept alive by callbacks */
            pxs_all_state_decref(aTHX_ state);
            guard->state = NULL;  /* ref-counting owns state now */

            /* _promise_to_sv takes ownership of the refs=1 from create_promise */
            RETVAL = _promise_to_sv(aTHX_ output);
            guard->output = NULL;  /* RETVAL's DESTROY owns output now */
        }
    OUTPUT:
        RETVAL

void
then(SV* self_sv, SV* on_resolve = NULL, SV* on_reject = NULL)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;

        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        xspr_promise_t* next;

        if (on_resolve == NULL) on_resolve = &PL_sv_undef;
        if (on_reject == NULL) on_reject = &PL_sv_undef;

        next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));

        xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ on_resolve, on_reject, next);
        xspr_promise_then(aTHX_ self->promise, callback);

        XSRETURN(next ? 1 : 0);

void
catch(SV* self_sv, SV* on_reject)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;

        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));

        xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ &PL_sv_undef, on_reject, next);
        xspr_promise_then(aTHX_ self->promise, callback);

        XSRETURN(next ? 1 : 0);

void
finally(SV* self_sv, SV* on_finally)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;

        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));

        xspr_callback_t* callback = xspr_callback_new_finally(aTHX_ on_finally, next);
        xspr_promise_then(aTHX_ self->promise, callback);

        XSRETURN(next ? 1 : 0);

void
DESTROY(SV* self_sv)
    CODE:
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        /* fprintf(stderr, "DESTROYing sv=%p, p=%p\n", self_sv, self->promise); */

        _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);

        xspr_promise_decref(aTHX_ self->promise);
        Safefree(self);

# ----------------------------------------------------------------------
# Future::AsyncAwait interface:
# ----------------------------------------------------------------------

SV*
AWAIT_NEW_DONE(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        RETVAL = _create_preresolved_promise(aTHX_ &(ST(1)), items - 1, true);
    OUTPUT:
        RETVAL

SV*
AWAIT_NEW_FAIL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        RETVAL = _create_prerejected_promise(aTHX_ &(ST(1)), items - 1, true);
    OUTPUT:
        RETVAL

SV*
AWAIT_CLONE(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);

        xspr_promise_t* promise_p = create_promise(aTHX);

        RETVAL = _promise_to_sv(aTHX_ promise_p);

        _IMMORTALIZE_PROMISE_SV(RETVAL, promise_p);

        if (DEBUG_AWAITABLE) {
            fprintf(stderr, "#   SvREFCNT(RETVAL)=%d\n", SvREFCNT(RETVAL));
            fprintf(stderr, "#   SvREFCNT(SvRV(RETVAL))=%d\n", SvREFCNT(SvRV(RETVAL)));
            sv_dump(RETVAL);
        }
    OUTPUT:
        RETVAL

void
AWAIT_DONE(SV* self_sv, ...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        _resolve_promise(aTHX_ self->promise, &ST(1), items - 1);

void
AWAIT_FAIL(SV* self_sv, ...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        _reject_promise(aTHX_ NULL, self->promise, &ST(1), items - 1);

bool
AWAIT_IS_READY(SV *self_sv)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        RETVAL = (self->promise->state != XSPR_STATE_PENDING);
    OUTPUT:
        RETVAL

void
AWAIT_GET(SV *self_sv)
    PPCODE:
        _DO_DEBUG_AWAITABLE();
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        ASSUME(self->promise->state == XSPR_STATE_FINISHED);

        SV** results = self->promise->finished.result->results;
        int result_count = self->promise->finished.result->count;

        if (RESULT_IS_RESOLVED(self->promise->finished.result)) {
            int i;

            if (!result_count) XSRETURN_EMPTY;

            switch (GIMME_V) {

                case G_ARRAY:
                    EXTEND(SP, result_count);

                    for (i=0; i<result_count; i++) {
                        PUSHs( sv_2mortal( newSVsv(results[i]) ) );
                    }

                    XSRETURN(result_count);

                case G_SCALAR:
                    EXTEND(SP, 1);
                    PUSHs( sv_2mortal( newSVsv(results[0]) ) );
                    XSRETURN(1);

                case G_VOID:
                    XSRETURN_EMPTY;

                default:
                    ASSUME(0);
            }
        }
        else {
            SV* err;
            if (result_count) {
                err = sv_2mortal( newSVsv( results[0] ) );
            }
            else {
                err = &PL_sv_undef;
            }

            croak_sv(err);
        }

void
AWAIT_CHAIN_CANCEL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);

void
AWAIT_ON_CANCEL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);

UV
AWAIT_IS_CANCELLED(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        RETVAL = 0;
    OUTPUT:
        RETVAL

void
AWAIT_ON_READY(SV *self_sv, SV* coderef)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        self->promise->on_ready_immediate = coderef;
        SvREFCNT_inc(coderef);
        SvREFCNT_inc(SvRV(coderef));

void
AWAIT_WAIT(SV* self_sv)
    PPCODE:
        _DO_DEBUG_AWAITABLE();
        dMY_CXT;

        switch (MY_CXT.event_system) {
            case _DEFER_ANYEVENT:
                _anyevent_wait_promise(aTHX_ self_sv);
                break;

            case _DEFER_IOASYNC:
                _ioasync_wait_promise(aTHX_ self_sv, MY_CXT.deferral_arg, MY_CXT.stop_cr);
                break;

            case _DEFER_MOJO:
                _mojo_wait_promise(aTHX_ self_sv, MY_CXT.stop_cr);
                break;

            default:
                croak(BASE_CLASS ": No event loop set up! Did you forget to call use_event()?");
        }

        PUSHMARK(SP);

        int count = call_method("AWAIT_GET", GIMME_V);
        XSRETURN(count);



( run in 1.144 second using v1.01-cache-2.11-cpan-5511b514fd6 )