Hypersonic

 view release on metacpan or  search on metacpan

lib/Hypersonic/Future.pm  view on Meta::CPAN

    # on_fail($code): attach a failure handler to a future.
    #
    # Generated XS behaviour:
    #   * future already FAILED -> $code is called immediately with the
    #     failure message and the failure category (each pushed only if set);
    #     its return value is discarded.
    #   * future still PENDING and the callback table has room -> $code is
    #     stored (refcount incremented) as a FUTURE_CB_FAIL callback.
    #   * anything else -> nothing is registered.
    # Returns the invocant: ST(0) is never overwritten, so calls can be chained.
    #
    # NOTE(review): when the callback table already holds MAX_CALLBACKS entries
    # the handler is dropped without any diagnostic.
    # NOTE(review): the slot number read from the invocant is not range-checked
    # here; the registry bound is not visible in this part of the file.
    $builder->xs_function('xs_future_on_fail')
            ->xs_preamble
            ->line('if (items != 2) croak("Usage: $future->on_fail($code)");')
            # SvRV() on a non-reference is undefined behaviour, so reject a bad
            # invocant (e.g. a class-method call) before dereferencing it.
            ->line('if (!SvROK(ST(0))) croak("on_fail: invocant is not a Future reference");')
            ->line('int slot = SvIV(SvRV(ST(0)));')
            ->line('FutureContext *ctx = &future_registry[slot];')
            ->line('SV *code = ST(1);')
            ->line('if (ctx->state == FUTURE_STATE_FAILED) {')
            ->line('    dSP;')
            ->line('    ENTER; SAVETMPS;')
            ->line('    PUSHMARK(SP);')
            ->line('    if (ctx->fail_message) XPUSHs(sv_2mortal(newSVpv(ctx->fail_message, 0)));')
            ->line('    if (ctx->fail_category) XPUSHs(sv_2mortal(newSVpv(ctx->fail_category, 0)));')
            ->line('    PUTBACK;')
            ->line('    call_sv(code, G_DISCARD);')
            ->line('    FREETMPS; LEAVE;')
            ->line('} else if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count < MAX_CALLBACKS) {')
            ->line('    FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('    cb->type = FUTURE_CB_FAIL;')
            ->line('    cb->code = SvREFCNT_inc(code);')
            ->line('    cb->target_slot = -1;')
            ->line('}')
            # ST(0) still holds the invocant, so returning one value hands the
            # same future back to the caller.
            ->xs_return('1')
            ->xs_end;

    # on_cancel($code): attach a cancellation handler to a future.
    #
    # Generated XS behaviour:
    #   * future already CANCELLED -> $code is called immediately with no
    #     arguments; its return value is discarded.
    #   * future still PENDING and the callback table has room -> $code is
    #     stored (refcount incremented) as a FUTURE_CB_CANCEL callback.
    #   * anything else -> nothing is registered.
    # Returns the invocant: ST(0) is never overwritten, so calls can be chained.
    #
    # NOTE(review): when the callback table already holds MAX_CALLBACKS entries
    # the handler is dropped without any diagnostic.
    # NOTE(review): the slot number read from the invocant is not range-checked
    # here; the registry bound is not visible in this part of the file.
    $builder->xs_function('xs_future_on_cancel')
            ->xs_preamble
            ->line('if (items != 2) croak("Usage: $future->on_cancel($code)");')
            # SvRV() on a non-reference is undefined behaviour, so reject a bad
            # invocant (e.g. a class-method call) before dereferencing it.
            ->line('if (!SvROK(ST(0))) croak("on_cancel: invocant is not a Future reference");')
            ->line('int slot = SvIV(SvRV(ST(0)));')
            ->line('FutureContext *ctx = &future_registry[slot];')
            ->line('SV *code = ST(1);')
            ->line('if (ctx->state == FUTURE_STATE_CANCELLED) {')
            ->line('    dSP;')
            ->line('    ENTER; SAVETMPS;')
            ->line('    PUSHMARK(SP);')
            ->line('    PUTBACK;')
            ->line('    call_sv(code, G_DISCARD);')
            ->line('    FREETMPS; LEAVE;')
            ->line('} else if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count < MAX_CALLBACKS) {')
            ->line('    FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('    cb->type = FUTURE_CB_CANCEL;')
            ->line('    cb->code = SvREFCNT_inc(code);')
            ->line('    cb->target_slot = -1;')
            ->line('}')
            # ST(0) still holds the invocant, so returning one value hands the
            # same future back to the caller.
            ->xs_return('1')
            ->xs_end;

    # Sequencing: then
    #
    # then($code): returns a NEW Hypersonic::Future (a blessed reference to the
    # child slot number) derived from the invocant ("parent").
    #
    # The child records the parent in cancel_target and the parent refcount is
    # incremented. What happens next depends on the parent state:
    #   * DONE      -> $code is called right away in list context with the
    #                  parent results; whatever it returns becomes the child
    #                  results and the child is marked DONE.
    #   * FAILED    -> the child is marked FAILED with copies of the parent
    #                  failure message / category; $code is not called.
    #   * CANCELLED -> the child is marked CANCELLED; $code is not called.
    #   * otherwise -> two callback entries are added to the parent: a
    #                  FUTURE_CB_DONE entry carrying $code and a code-less
    #                  FAIL|CANCEL entry, both targeting the child slot
    #                  (presumably the code-less entry forwards the outcome to
    #                  the child - the dispatcher is not visible here).
    #
    # Croaks on wrong usage, on a non-reference invocant, when the registry has
    # no free slot, when a pending parent has no room for the two callback
    # entries, and when the result array cannot be allocated.
    #
    # NOTE(review): call_sv() is used without G_EVAL, so if $code dies in the
    # immediate branch the exception propagates with the child slot allocated
    # and the parent refcount already incremented - confirm cleanup elsewhere.
    $builder->xs_function('xs_future_then')
            ->xs_preamble
            ->line('int i;')
            ->line('if (items != 2) croak("Usage: $future->then($code)");')
            # SvRV() on a non-reference is undefined behaviour.
            ->line('if (!SvROK(ST(0))) croak("then: invocant is not a Future reference");')
            ->line('int slot = SvIV(SvRV(ST(0)));')
            ->line('FutureContext *ctx = &future_registry[slot];')
            ->line('SV *code = ST(1);')
            # Fail fast BEFORE allocating the child: previously a full callback
            # table still returned a child that was never wired to the parent
            # and therefore could never resolve.
            ->line('/* A pending parent needs two free callback entries for the child */')
            ->line('if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count >= MAX_CALLBACKS - 1)')
            ->line('    croak("Future callback table full");')
            ->line('int new_slot = future_alloc_slot();')
            ->line('if (new_slot < 0) croak("Future registry full");')
            ->line('FutureContext *new_ctx = &future_registry[new_slot];')
            ->line('new_ctx->cancel_target = slot;')
            ->line('/* Keep parent alive while child exists (for chaining) */')
            ->line('ctx->refcount++;')
            ->line('if (ctx->state == FUTURE_STATE_DONE) {')
            ->line('    dSP;')
            ->line('    ENTER; SAVETMPS;')
            ->line('    PUSHMARK(SP);')
            ->line('    for (i = 0; i < ctx->result_count; i++) XPUSHs(ctx->result_values[i]);')
            ->line('    PUTBACK;')
            ->line('    int count = call_sv(code, G_ARRAY);')
            ->line('    SPAGAIN;')
            ->line('    if (count > 0) {')
            ->line('        new_ctx->result_values = (SV **)malloc(count * sizeof(SV *));')
            ->line('        if (!new_ctx->result_values) croak("Out of memory");')
            # Values are popped last-to-first, so fill the array from the end;
            # the refcount bump keeps them alive past FREETMPS.
            ->line('        for (i = count - 1; i >= 0; i--) new_ctx->result_values[i] = SvREFCNT_inc(POPs);')
            ->line('        new_ctx->result_count = count;')
            ->line('    }')
            ->line('    new_ctx->state = FUTURE_STATE_DONE;')
            ->line('    PUTBACK;')
            ->line('    FREETMPS; LEAVE;')
            ->line('} else if (ctx->state == FUTURE_STATE_FAILED) {')
            ->line('    new_ctx->state = FUTURE_STATE_FAILED;')
            ->line('    if (ctx->fail_message) new_ctx->fail_message = strdup(ctx->fail_message);')
            ->line('    if (ctx->fail_category) new_ctx->fail_category = strdup(ctx->fail_category);')
            ->line('} else if (ctx->state == FUTURE_STATE_CANCELLED) {')
            ->line('    new_ctx->state = FUTURE_STATE_CANCELLED;')
            ->line('} else {')
            # The capacity guard is kept as a second line of defence for any
            # state other than PENDING that might reach this branch.
            ->line('    if (ctx->callback_count < MAX_CALLBACKS - 1) {')
            ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('        cb->type = FUTURE_CB_DONE;')
            ->line('        cb->code = SvREFCNT_inc(code);')
            ->line('        cb->target_slot = new_slot;')
            ->line('        FutureCallback *fail_cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('        fail_cb->type = FUTURE_CB_FAIL | FUTURE_CB_CANCEL;')
            ->line('        fail_cb->code = NULL;')
            ->line('        fail_cb->target_slot = new_slot;')
            ->line('        /* Keep child alive until parent resolves */')
            ->line('        new_ctx->refcount++;')
            ->line('    }')
            ->line('}')
            ->line('SV *new_slot_sv = newSViv(new_slot);')
            ->line('SV *new_ref = newRV_noinc(new_slot_sv);')
            ->line('sv_bless(new_ref, gv_stashpv("Hypersonic::Future", GV_ADD));')
            ->line('/* refcount already 1 from alloc_slot for Perl ref */')
            ->line('ST(0) = sv_2mortal(new_ref);')
            ->xs_return('1')
            ->xs_end;

    # Sequencing: catch
    #
    # catch($code): returns a NEW Hypersonic::Future (a blessed reference to
    # the child slot number) derived from the invocant ("parent").
    #
    # The child records the parent in cancel_target and the parent refcount is
    # incremented. What happens next depends on the parent state:
    #   * FAILED    -> $code is called right away in list context with the
    #                  failure message and category (each pushed only if set);
    #                  whatever it returns becomes the child results and the
    #                  child is marked DONE.
    #   * DONE      -> the child is marked DONE and shares the parent result
    #                  values (their refcounts are incremented); $code is not
    #                  called.
    #   * CANCELLED -> the child is marked CANCELLED; $code is not called.
    #   * otherwise -> two callback entries are added to the parent: a
    #                  FUTURE_CB_FAIL entry carrying $code and a code-less
    #                  DONE|CANCEL entry, both targeting the child slot
    #                  (presumably the code-less entry forwards the outcome to
    #                  the child - the dispatcher is not visible here).
    #
    # Croaks on wrong usage, on a non-reference invocant, when the registry has
    # no free slot, when a pending parent has no room for the two callback
    # entries, and when a result array cannot be allocated.
    #
    # NOTE(review): call_sv() is used without G_EVAL, so if $code dies in the
    # immediate branch the exception propagates with the child slot allocated
    # and the parent refcount already incremented - confirm cleanup elsewhere.
    $builder->xs_function('xs_future_catch')
            ->xs_preamble
            ->line('int i;')
            ->line('if (items != 2) croak("Usage: $future->catch($code)");')
            # SvRV() on a non-reference is undefined behaviour.
            ->line('if (!SvROK(ST(0))) croak("catch: invocant is not a Future reference");')
            ->line('int slot = SvIV(SvRV(ST(0)));')
            ->line('FutureContext *ctx = &future_registry[slot];')
            ->line('SV *code = ST(1);')
            # Fail fast BEFORE allocating the child: previously a full callback
            # table still returned a child that was never wired to the parent
            # and therefore could never resolve.
            ->line('/* A pending parent needs two free callback entries for the child */')
            ->line('if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count >= MAX_CALLBACKS - 1)')
            ->line('    croak("Future callback table full");')
            ->line('int new_slot = future_alloc_slot();')
            ->line('if (new_slot < 0) croak("Future registry full");')
            ->line('FutureContext *new_ctx = &future_registry[new_slot];')
            ->line('new_ctx->cancel_target = slot;')
            ->line('/* Keep parent alive while child exists (for chaining) */')
            ->line('ctx->refcount++;')
            ->line('if (ctx->state == FUTURE_STATE_FAILED) {')
            ->line('    dSP;')
            ->line('    ENTER; SAVETMPS;')
            ->line('    PUSHMARK(SP);')
            ->line('    if (ctx->fail_message) XPUSHs(sv_2mortal(newSVpv(ctx->fail_message, 0)));')
            ->line('    if (ctx->fail_category) XPUSHs(sv_2mortal(newSVpv(ctx->fail_category, 0)));')
            ->line('    PUTBACK;')
            ->line('    int count = call_sv(code, G_ARRAY);')
            ->line('    SPAGAIN;')
            ->line('    if (count > 0) {')
            ->line('        new_ctx->result_values = (SV **)malloc(count * sizeof(SV *));')
            ->line('        if (!new_ctx->result_values) croak("Out of memory");')
            # Values are popped last-to-first, so fill the array from the end;
            # the refcount bump keeps them alive past FREETMPS.
            ->line('        for (i = count - 1; i >= 0; i--) new_ctx->result_values[i] = SvREFCNT_inc(POPs);')
            ->line('        new_ctx->result_count = count;')
            ->line('    }')
            ->line('    new_ctx->state = FUTURE_STATE_DONE;')
            ->line('    PUTBACK;')
            ->line('    FREETMPS; LEAVE;')
            ->line('} else if (ctx->state == FUTURE_STATE_DONE) {')
            ->line('    new_ctx->state = FUTURE_STATE_DONE;')
            ->line('    if (ctx->result_count > 0) {')
            ->line('        new_ctx->result_values = (SV **)malloc(ctx->result_count * sizeof(SV *));')
            ->line('        if (!new_ctx->result_values) croak("Out of memory");')
            ->line('        for (i = 0; i < ctx->result_count; i++) new_ctx->result_values[i] = SvREFCNT_inc(ctx->result_values[i]);')
            ->line('        new_ctx->result_count = ctx->result_count;')
            ->line('    }')
            ->line('} else if (ctx->state == FUTURE_STATE_CANCELLED) {')
            ->line('    new_ctx->state = FUTURE_STATE_CANCELLED;')
            ->line('} else {')
            # The capacity guard is kept as a second line of defence for any
            # state other than PENDING that might reach this branch.
            ->line('    if (ctx->callback_count < MAX_CALLBACKS - 1) {')
            ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('        cb->type = FUTURE_CB_FAIL;')
            ->line('        cb->code = SvREFCNT_inc(code);')
            ->line('        cb->target_slot = new_slot;')
            ->line('        FutureCallback *done_cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('        done_cb->type = FUTURE_CB_DONE | FUTURE_CB_CANCEL;')
            ->line('        done_cb->code = NULL;')
            ->line('        done_cb->target_slot = new_slot;')
            ->line('        /* Keep child alive until parent resolves */')
            ->line('        new_ctx->refcount++;')
            ->line('    }')
            ->line('}')
            ->line('SV *new_slot_sv = newSViv(new_slot);')
            ->line('SV *new_ref = newRV_noinc(new_slot_sv);')
            ->line('sv_bless(new_ref, gv_stashpv("Hypersonic::Future", GV_ADD));')
            ->line('/* refcount already 1 from alloc_slot for Perl ref */')
            ->line('ST(0) = sv_2mortal(new_ref);')
            ->xs_return('1')
            ->xs_end;

    # Sequencing: finally
    #
    # finally($code): returns a NEW Hypersonic::Future (a blessed reference to
    # the child slot number) that mirrors the outcome of the invocant
    # ("parent").
    #
    # The child records the parent in cancel_target and the parent refcount is
    # incremented. Then:
    #   * parent not PENDING -> $code is called right away with no arguments
    #     and its return value is discarded; afterwards the child copies the
    #     parent state, sharing the result values when DONE (refcounts
    #     incremented) or duplicating the failure message / category when
    #     FAILED.
    #   * parent PENDING     -> $code is stored on the parent as a
    #     FUTURE_CB_READY callback targeting the child slot, and the child
    #     refcount is incremented.
    #
    # Croaks on wrong usage, on a non-reference invocant, when the registry has
    # no free slot, when a pending parent has no free callback entry, and when
    # the result array cannot be allocated.
    #
    # NOTE(review): call_sv() is used without G_EVAL, so if $code dies in the
    # immediate branch the exception propagates with the child slot allocated
    # and the parent refcount already incremented - confirm cleanup elsewhere.
    $builder->xs_function('xs_future_finally')
            ->xs_preamble
            ->line('int i;')
            ->line('if (items != 2) croak("Usage: $future->finally($code)");')
            # SvRV() on a non-reference is undefined behaviour.
            ->line('if (!SvROK(ST(0))) croak("finally: invocant is not a Future reference");')
            ->line('int slot = SvIV(SvRV(ST(0)));')
            ->line('FutureContext *ctx = &future_registry[slot];')
            ->line('SV *code = ST(1);')
            # Fail fast BEFORE allocating the child: previously a full callback
            # table still returned a child that was never wired to the parent
            # and therefore could never resolve.
            ->line('/* A pending parent needs one free callback entry for the child */')
            ->line('if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count >= MAX_CALLBACKS)')
            ->line('    croak("Future callback table full");')
            ->line('int new_slot = future_alloc_slot();')
            ->line('if (new_slot < 0) croak("Future registry full");')
            ->line('FutureContext *new_ctx = &future_registry[new_slot];')
            ->line('new_ctx->cancel_target = slot;')
            ->line('/* Keep parent alive while child exists (for chaining) */')
            ->line('ctx->refcount++;')
            ->line('if (ctx->state != FUTURE_STATE_PENDING) {')
            ->line('    dSP;')
            ->line('    ENTER; SAVETMPS;')
            ->line('    PUSHMARK(SP);')
            ->line('    PUTBACK;')
            ->line('    call_sv(code, G_DISCARD);')
            ->line('    FREETMPS; LEAVE;')
            # The parent outcome is copied only after the callback has run.
            ->line('    new_ctx->state = ctx->state;')
            ->line('    if (ctx->state == FUTURE_STATE_DONE && ctx->result_count > 0) {')
            ->line('        new_ctx->result_values = (SV **)malloc(ctx->result_count * sizeof(SV *));')
            ->line('        if (!new_ctx->result_values) croak("Out of memory");')
            ->line('        for (i = 0; i < ctx->result_count; i++) new_ctx->result_values[i] = SvREFCNT_inc(ctx->result_values[i]);')
            ->line('        new_ctx->result_count = ctx->result_count;')
            ->line('    } else if (ctx->state == FUTURE_STATE_FAILED) {')
            ->line('        if (ctx->fail_message) new_ctx->fail_message = strdup(ctx->fail_message);')
            ->line('        if (ctx->fail_category) new_ctx->fail_category = strdup(ctx->fail_category);')
            ->line('    }')
            ->line('} else {')
            # The capacity guard is kept as a second line of defence; the
            # check above already croaked if the table was full.
            ->line('    if (ctx->callback_count < MAX_CALLBACKS) {')
            ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
            ->line('        cb->type = FUTURE_CB_READY;')
            ->line('        cb->code = SvREFCNT_inc(code);')
            ->line('        cb->target_slot = new_slot;')
            ->line('        /* Keep child alive until parent resolves */')
            ->line('        new_ctx->refcount++;')
            ->line('    }')
            ->line('}')
            ->line('SV *new_slot_sv = newSViv(new_slot);')
            ->line('SV *new_ref = newRV_noinc(new_slot_sv);')
            ->line('sv_bless(new_ref, gv_stashpv("Hypersonic::Future", GV_ADD));')
            ->line('/* refcount already 1 from alloc_slot for Perl ref */')
            ->line('ST(0) = sv_2mortal(new_ref);')
            ->xs_return('1')
            ->xs_end;

    # Convergent: needs_all - all must succeed
    $builder->xs_function('xs_future_needs_all')
            ->xs_preamble
            ->line('int i;')
            ->line('if (items < 2) croak("Usage: Hypersonic::Future->needs_all(@futures)");')
            ->line('int new_slot = future_alloc_slot();')
            ->line('if (new_slot < 0) croak("Future registry full");')
            ->line('FutureContext *ctx = &future_registry[new_slot];')
            ->line('int count = items - 1;')
            ->line('ctx->subfuture_slots = (int*)malloc(count * sizeof(int));')
            ->line('ctx->subfuture_count = count;')
            ->line('ctx->subfutures_pending = count;')
            ->line('ctx->convergent_mode = 1;')
            ->line('int all_done = 1;')
            ->line('for (i = 0; i < count; i++) {')
            ->line('    SV *sub_sv = ST(i + 1);')
            ->line('    if (!sv_isobject(sub_sv)) croak("needs_all: argument %d is not a Future", i+1);')
            ->line('    int sub_slot = SvIV(SvRV(sub_sv));')
            ->line('    ctx->subfuture_slots[i] = sub_slot;')
            ->line('    FutureContext *sub = &future_registry[sub_slot];')
            ->line('    if (sub->state == FUTURE_STATE_PENDING) {')
            ->line('        all_done = 0;')
            ->line('        /* Register callback on subfuture */')
            ->line('        if (sub->callback_count < MAX_CALLBACKS) {')
            ->line('            FutureCallback *cb = &sub->callbacks[sub->callback_count++];')
            ->line('            cb->type = FUTURE_CB_READY;')
            ->line('            cb->code = NULL;')
            ->line('            cb->target_slot = new_slot;')
            ->line('            ctx->refcount++;')
            ->line('        }')
            ->line('    } else if (sub->state == FUTURE_STATE_FAILED) {')
            ->line('        ctx->state = FUTURE_STATE_FAILED;')
            ->line('        if (sub->fail_message) ctx->fail_message = strdup(sub->fail_message);')
            ->line('        if (sub->fail_category) ctx->fail_category = strdup(sub->fail_category);')
            ->line('        break;')
            ->line('    } else if (sub->state == FUTURE_STATE_CANCELLED) {')
            ->line('        ctx->state = FUTURE_STATE_CANCELLED;')
            ->line('        break;')
            ->line('    } else {')
            ->line('        ctx->subfutures_pending--;')
            ->line('    }')
            ->line('}')
            ->line('if (ctx->state == FUTURE_STATE_PENDING && all_done) {')
            ->line('    /* All already done - collect results as flat list */')
            ->line('    ctx->result_values = (SV**)malloc(count * sizeof(SV*));')
            ->line('    ctx->result_count = 0;')
            ->line('    for (i = 0; i < count; i++) {')
            ->line('        FutureContext *sub = &future_registry[ctx->subfuture_slots[i]];')
            ->line('        if (sub->result_count > 0) {')



( run in 1.280 second using v1.01-cache-2.11-cpan-140bd7fdf52 )