AnyEvent-XSPromises

 view release on metacpan or  search on metacpan

XSPromises.xs  view on Meta::CPAN

/* Add a callback invocation into the queue for the given origin promise.
 * Takes ownership of the callback structure */
void xspr_queue_add(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
{
    dMY_CXT;

    xspr_callback_queue_t* entry;
    Newxz(entry, 1, xspr_callback_queue_t);
    entry->origin = origin;
    xspr_promise_incref(aTHX_ entry->origin);
    entry->callback = callback;

    if (MY_CXT.queue_head == NULL) {
        assert(MY_CXT.queue_tail == NULL);
        /* Empty queue, so now it's just us */
        MY_CXT.queue_head = entry;
        MY_CXT.queue_tail = entry;

    } else {
        assert(MY_CXT.queue_tail != NULL);
        /* Existing queue, add to the tail */
        MY_CXT.queue_tail->next = entry;
        MY_CXT.queue_tail = entry;
    }
}

void xspr_queue_maybe_schedule(pTHX)
{
    dMY_CXT;
    if (MY_CXT.queue_head == NULL || MY_CXT.backend_scheduled || MY_CXT.in_flush) {
        return;
    }

    MY_CXT.backend_scheduled = 1;
    /* We trust our backends to be sane, so little guarding against errors here */
    dSP;
    PUSHMARK(SP);
    call_sv(MY_CXT.backend_fn, G_DISCARD|G_NOARGS);
}

/* Invoke the user's perl code. We need to be really sure this doesn't return early via croak/next/etc. */
xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** input, int input_count)
{
    dSP;
    int count, i;
    SV* error;
    xspr_result_t* result;

    if (!SvROK(perl_fn)) {
        return xspr_result_from_error(aTHX_ "promise callbacks need to be a CODE reference");
    }

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    EXTEND(SP, input_count);
    for (i = 0; i < input_count; i++) {
        PUSHs(input[i]);
    }
    PUTBACK;

    /* Clear $_ so that callbacks don't end up talking to each other by accident */
    SAVE_DEFSV;
    DEFSV_set(sv_newmortal());

    count = call_sv(perl_fn, G_EVAL|G_ARRAY);

    SPAGAIN;
    error = ERRSV;
    if (SvTRUE(error)) {
        result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
        result->result[0] = newSVsv(error);
    } else {
        result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, count);
        for (i = 0; i < count; i++) {
            result->result[count-i-1] = SvREFCNT_inc(POPs);
        }
    }
    PUTBACK;

    FREETMPS;
    LEAVE;

    return result;
}

/* Increments the ref count for xspr_result_t */
void xspr_result_incref(pTHX_ xspr_result_t* result)
{
    result->refs++;
}

/* Decrements the ref count for the xspr_result_t, freeing the structure if needed */
void xspr_result_decref(pTHX_ xspr_result_t* result)
{
    if (--(result->refs) == 0) {
        int i;
        for (i = 0; i < result->count; i++) {
            SvREFCNT_dec(result->result[i]);
        }
        Safefree(result->result);
        Safefree(result);
    }
}

/* Transitions a promise from pending to finished, using the given result */
void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t* result)
{
    assert(promise->state == XSPR_STATE_PENDING);
    xspr_callback_t** pending_callbacks = promise->pending.callbacks;
    int count = promise->pending.callbacks_count;

    promise->state = XSPR_STATE_FINISHED;
    promise->finished.result = result;
    xspr_result_incref(aTHX_ promise->finished.result);

    int i;
    for (i = 0; i < count; i++) {
        xspr_queue_add(aTHX_ pending_callbacks[i], promise);
    }
    Safefree(pending_callbacks);
}

/* Create a new xspr_result_t object with the given number of item slots */
xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, int count)
{
    xspr_result_t* result;
    Newxz(result, 1, xspr_result_t);
    Newxz(result->result, count, SV*);
    result->state = state;
    result->refs = 1;
    result->count = count;
    return result;
}

xspr_result_t* xspr_result_from_error(pTHX_ const char *error)
{
    xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
    result->result[0] = newSVpv(error, 0);

XSPromises.xs  view on Meta::CPAN

        dMY_CXT;

        xspr_result_t* new_result = xspr_invoke_perl(aTHX_ MY_CXT.conversion_helper, &input, 1);
        if (new_result->state == XSPR_RESULT_RESOLVED &&
            new_result->count == 1 &&
            new_result->result[0] != NULL &&
            SvROK(new_result->result[0]) &&
            sv_derived_from(new_result->result[0], "AnyEvent::XSPromises::PromisePtr")) {
            /* This is expected: our conversion function returned us one of our own promises */
            IV tmp = SvIV((SV*)SvRV(new_result->result[0]));
            AnyEvent__XSPromises__Promise* new_promise = INT2PTR(AnyEvent__XSPromises__Promise*, tmp);

            xspr_promise_t* promise = new_promise->promise;
            xspr_promise_incref(aTHX_ promise);

            xspr_result_decref(aTHX_ new_result);
            return promise;

        } else {
            xspr_promise_t* promise = xspr_promise_new(aTHX);
            xspr_promise_finish(aTHX_ promise, new_result);
            xspr_result_decref(aTHX_ new_result);
            return promise;
        }
    }

    /* We didn't get a promise. */
    return NULL;
}


MODULE = AnyEvent::XSPromises     PACKAGE = AnyEvent::XSPromises

PROTOTYPES: ENABLE

TYPEMAP: <<EOT
TYPEMAP
AnyEvent::XSPromises::Deferred* T_PTROBJ
AnyEvent::XSPromises::Promise* T_PTROBJ
EOT

BOOT:
{
    /* XXX: do we need a CLONE? */

    MY_CXT_INIT;
    MY_CXT.queue_head = NULL;
    MY_CXT.queue_tail = NULL;
    MY_CXT.in_flush = 0;
    MY_CXT.backend_scheduled = 0;
    MY_CXT.conversion_helper = NULL;
    MY_CXT.backend_fn = NULL;
}

AnyEvent::XSPromises::Deferred*
deferred()
    CODE:
        Newxz(RETVAL, 1, AnyEvent__XSPromises__Deferred);
        xspr_promise_t* promise = xspr_promise_new(aTHX);
        RETVAL->promise = promise;
    OUTPUT:
        RETVAL

void
___flush()
    CODE:
        xspr_queue_flush(aTHX);

void
___set_conversion_helper(helper)
        SV* helper
    CODE:
        dMY_CXT;
        if (MY_CXT.conversion_helper != NULL)
            croak("Refusing to set a conversion helper twice");
        MY_CXT.conversion_helper = newSVsv(helper);

void
___set_backend(backend)
        SV* backend
    CODE:
        dMY_CXT;
        if (MY_CXT.backend_fn != NULL)
            croak("Refusing to set a backend twice");
        MY_CXT.backend_fn = newSVsv(backend);


MODULE = AnyEvent::XSPromises     PACKAGE = AnyEvent::XSPromises::DeferredPtr

AnyEvent::XSPromises::Promise*
promise(self)
        AnyEvent::XSPromises::Deferred* self
    CODE:
        Newxz(RETVAL, 1, AnyEvent__XSPromises__Promise);
        RETVAL->promise = self->promise;
        xspr_promise_incref(aTHX_ RETVAL->promise);
    OUTPUT:
        RETVAL

void
resolve(self, ...)
        AnyEvent::XSPromises::Deferred* self
    CODE:
        if (self->promise->state != XSPR_STATE_PENDING) {
            croak("Cannot resolve deferred: not pending");
        }

        xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, items-1);
        int i;
        for (i = 0; i < items-1; i++) {
            result->result[i] = newSVsv(ST(1+i));
        }
        xspr_promise_finish(aTHX_ self->promise, result);
        xspr_result_decref(aTHX_ result);
        xspr_queue_maybe_schedule(aTHX);

void
reject(self, ...)
        AnyEvent::XSPromises::Deferred* self
    CODE:
        if (self->promise->state != XSPR_STATE_PENDING) {
            croak("Cannot reject deferred: not pending");
        }

        xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, items-1);
        int i;
        for (i = 0; i < items-1; i++) {
            result->result[i] = newSVsv(ST(1+i));
        }
        xspr_promise_finish(aTHX_ self->promise, result);
        xspr_result_decref(aTHX_ result);
        xspr_queue_maybe_schedule(aTHX);

bool
is_in_progress(self)
        AnyEvent::XSPromises::Deferred* self
    CODE:
        RETVAL = (self->promise->state == XSPR_STATE_PENDING);
    OUTPUT:
        RETVAL

void
DESTROY(self)
        AnyEvent::XSPromises::Deferred* self
    CODE:
        xspr_promise_decref(aTHX_ self->promise);
        Safefree(self);


MODULE = AnyEvent::XSPromises     PACKAGE = AnyEvent::XSPromises::PromisePtr

void
then(self, ...)
        AnyEvent::XSPromises::Promise* self
    PPCODE:
        SV* on_resolve;
        SV* on_reject;
        xspr_promise_t* next = NULL;

        if (items > 3) {
            croak_xs_usage(cv, "self, on_resolve, on_reject");
        }

        on_resolve = (items > 1) ? ST(1) : &PL_sv_undef;
        on_reject  = (items > 2) ? ST(2) : &PL_sv_undef;

        /* Many promises are just thrown away after the final callback, no need to allocate a next promise for those */
        if (GIMME_V != G_VOID) {
            AnyEvent__XSPromises__Promise* next_promise;
            Newxz(next_promise, 1, AnyEvent__XSPromises__Promise);

            next = xspr_promise_new(aTHX);
            next_promise->promise = next;

            ST(0) = sv_newmortal();
            sv_setref_pv(ST(0), "AnyEvent::XSPromises::PromisePtr", (void*)next_promise);
        }

        xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ on_resolve, on_reject, next);
        xspr_promise_then(aTHX_ self->promise, callback);
        xspr_queue_maybe_schedule(aTHX);

        XSRETURN(1);

void
catch(self, on_reject)
        AnyEvent::XSPromises::Promise* self
        SV* on_reject
    PPCODE:
        xspr_promise_t* next = NULL;

        /* Many promises are just thrown away after the final callback, no need to allocate a next promise for those */
        if (GIMME_V != G_VOID) {
            AnyEvent__XSPromises__Promise* next_promise;
            Newxz(next_promise, 1, AnyEvent__XSPromises__Promise);

            next = xspr_promise_new(aTHX);
            next_promise->promise = next;



( run in 0.784 second using v1.01-cache-2.11-cpan-13bb782fe5a )