Hypersonic
view release on metacpan or search on metacpan
lib/Hypersonic/Future.pm view on Meta::CPAN
# xs_future_on_fail -- XS body for $future->on_fail($code).
#
# * Future already FAILED: $code is called at once with the stored failure
#   message and category (each is pushed only when it is set); its return
#   value is discarded.
# * Future still PENDING: $code is appended to the future's callback table
#   as a FUTURE_CB_FAIL entry with no target slot.
# * Any other state: nothing is done.
#
# ST(0) is never overwritten, so the one value handed back is the invocant.
$builder->xs_function('xs_future_on_fail')
    ->xs_preamble
    ->line('if (items != 2) croak("Usage: $future->on_fail($code)");')
    # SvRV() on something that is not a reference is undefined behaviour, so
    # reject a non-object invocant up front (the same sv_isobject() test that
    # needs_all applies to its arguments).
    ->line('if (!sv_isobject(ST(0))) croak("on_fail: invocant is not a Future object");')
    # NOTE(review): slot is not range-checked against the registry size; the
    # name of that bound is not visible in this part of the file.
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('FutureContext *ctx = &future_registry[slot];')
    ->line('SV *code = ST(1);')
    ->line('if (ctx->state == FUTURE_STATE_FAILED) {')
    ->line('    dSP;')
    ->line('    ENTER; SAVETMPS;')
    ->line('    PUSHMARK(SP);')
    ->line('    if (ctx->fail_message) XPUSHs(sv_2mortal(newSVpv(ctx->fail_message, 0)));')
    ->line('    if (ctx->fail_category) XPUSHs(sv_2mortal(newSVpv(ctx->fail_category, 0)));')
    ->line('    PUTBACK;')
    ->line('    call_sv(code, G_DISCARD);')
    ->line('    FREETMPS; LEAVE;')
    ->line('} else if (ctx->state == FUTURE_STATE_PENDING) {')
    ->line('    if (ctx->callback_count < MAX_CALLBACKS) {')
    ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        cb->type = FUTURE_CB_FAIL;')
    # The table keeps its own reference to the code SV.
    ->line('        cb->code = SvREFCNT_inc(code);')
    ->line('        cb->target_slot = -1;')
    ->line('    } else {')
    # Previously the handler was dropped without any trace; still best-effort
    # (no exception), but the loss is now reported.
    ->line('        warn("on_fail: callback table full, callback not registered");')
    ->line('    }')
    ->line('}')
    # ST(0) still holds the invocant here, so no assignment is needed before
    # returning one value.
    ->xs_return('1')
    ->xs_end;
# xs_future_on_cancel -- XS body for $future->on_cancel($code).
#
# * Future already CANCELLED: $code is called at once with no arguments and
#   its return value is discarded.
# * Future still PENDING: $code is appended to the future's callback table
#   as a FUTURE_CB_CANCEL entry with no target slot.
# * Any other state: nothing is done.
#
# ST(0) is never overwritten, so the one value handed back is the invocant.
$builder->xs_function('xs_future_on_cancel')
    ->xs_preamble
    ->line('if (items != 2) croak("Usage: $future->on_cancel($code)");')
    # SvRV() on something that is not a reference is undefined behaviour, so
    # reject a non-object invocant up front (the same sv_isobject() test that
    # needs_all applies to its arguments).
    ->line('if (!sv_isobject(ST(0))) croak("on_cancel: invocant is not a Future object");')
    # NOTE(review): slot is not range-checked against the registry size; the
    # name of that bound is not visible in this part of the file.
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('FutureContext *ctx = &future_registry[slot];')
    ->line('SV *code = ST(1);')
    ->line('if (ctx->state == FUTURE_STATE_CANCELLED) {')
    ->line('    dSP;')
    ->line('    ENTER; SAVETMPS;')
    ->line('    PUSHMARK(SP);')
    ->line('    PUTBACK;')
    ->line('    call_sv(code, G_DISCARD);')
    ->line('    FREETMPS; LEAVE;')
    ->line('} else if (ctx->state == FUTURE_STATE_PENDING) {')
    ->line('    if (ctx->callback_count < MAX_CALLBACKS) {')
    ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        cb->type = FUTURE_CB_CANCEL;')
    # The table keeps its own reference to the code SV.
    ->line('        cb->code = SvREFCNT_inc(code);')
    ->line('        cb->target_slot = -1;')
    ->line('    } else {')
    # Previously the handler was dropped without any trace; still best-effort
    # (no exception), but the loss is now reported.
    ->line('        warn("on_cancel: callback table full, callback not registered");')
    ->line('    }')
    ->line('}')
    # ST(0) still holds the invocant here, so no assignment is needed before
    # returning one value.
    ->xs_return('1')
    ->xs_end;
# Sequencing: then
# xs_future_then -- XS body for $future->then($code).
#
# Allocates a child future in a fresh registry slot and returns a new
# Hypersonic::Future object blessed around that slot number.  What happens
# to the child depends on the parent's current state:
#
# * DONE      - $code is called in list context with the parent's result
#               values; whatever it returns becomes the child's results and
#               the child is marked DONE.
# * FAILED    - the child is marked FAILED with copies of the parent's
#               failure message and category; $code is not called.
# * CANCELLED - the child is marked CANCELLED; $code is not called.
# * otherwise - two entries are added to the parent's callback table, both
#               targeting the child: a FUTURE_CB_DONE entry carrying $code and
#               a code-less FUTURE_CB_FAIL|FUTURE_CB_CANCEL entry.  How those
#               entries are consumed is decided by code outside this block.
#
# Croaks on a wrong argument count, a non-object invocant, a full callback
# table on a pending parent, or a full registry.
$builder->xs_function('xs_future_then')
    ->xs_preamble
    ->line('int i;')
    ->line('if (items != 2) croak("Usage: $future->then($code)");')
    # SvRV() on something that is not a reference is undefined behaviour, so
    # reject a non-object invocant up front (the same sv_isobject() test that
    # needs_all applies to its arguments).
    ->line('if (!sv_isobject(ST(0))) croak("then: invocant is not a Future object");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('FutureContext *ctx = &future_registry[slot];')
    ->line('SV *code = ST(1);')
    # Without this check a pending parent with fewer than two free entries
    # would still get a child slot and a refcount bump, but no callback would
    # link the two, so the returned future could never resolve.  Refuse
    # before anything is allocated, like the registry-full case below.
    ->line('/* A pending parent needs two free callback entries */')
    ->line('if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count >= MAX_CALLBACKS - 1) {')
    ->line('    croak("then: callback table full");')
    ->line('}')
    ->line('int new_slot = future_alloc_slot();')
    ->line('if (new_slot < 0) croak("Future registry full");')
    ->line('FutureContext *new_ctx = &future_registry[new_slot];')
    ->line('new_ctx->cancel_target = slot;')
    ->line('/* Keep parent alive while child exists (for chaining) */')
    ->line('ctx->refcount++;')
    ->line('if (ctx->state == FUTURE_STATE_DONE) {')
    ->line('    dSP;')
    ->line('    ENTER; SAVETMPS;')
    ->line('    PUSHMARK(SP);')
    ->line('    for (i = 0; i < ctx->result_count; i++) XPUSHs(ctx->result_values[i]);')
    ->line('    PUTBACK;')
    ->line('    int count = call_sv(code, G_ARRAY);')
    ->line('    SPAGAIN;')
    ->line('    if (count > 0) {')
    ->line('        new_ctx->result_values = (SV **)malloc(count * sizeof(SV *));')
    # Values are popped last-to-first, so fill the array from the end to keep
    # the callback's return order; each one gets its own reference so it
    # outlives FREETMPS.
    ->line('        for (i = count - 1; i >= 0; i--) new_ctx->result_values[i] = SvREFCNT_inc(POPs);')
    ->line('        new_ctx->result_count = count;')
    ->line('    }')
    ->line('    new_ctx->state = FUTURE_STATE_DONE;')
    ->line('    PUTBACK;')
    ->line('    FREETMPS; LEAVE;')
    ->line('} else if (ctx->state == FUTURE_STATE_FAILED) {')
    ->line('    new_ctx->state = FUTURE_STATE_FAILED;')
    ->line('    if (ctx->fail_message) new_ctx->fail_message = strdup(ctx->fail_message);')
    ->line('    if (ctx->fail_category) new_ctx->fail_category = strdup(ctx->fail_category);')
    ->line('} else if (ctx->state == FUTURE_STATE_CANCELLED) {')
    ->line('    new_ctx->state = FUTURE_STATE_CANCELLED;')
    ->line('} else {')
    # The bound is kept as a guard against writing past the table.
    ->line('    if (ctx->callback_count < MAX_CALLBACKS - 1) {')
    ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        cb->type = FUTURE_CB_DONE;')
    ->line('        cb->code = SvREFCNT_inc(code);')
    ->line('        cb->target_slot = new_slot;')
    ->line('        FutureCallback *fail_cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        fail_cb->type = FUTURE_CB_FAIL | FUTURE_CB_CANCEL;')
    ->line('        fail_cb->code = NULL;')
    ->line('        fail_cb->target_slot = new_slot;')
    ->line('        /* Keep child alive until parent resolves */')
    ->line('        new_ctx->refcount++;')
    ->line('    }')
    ->line('}')
    ->line('SV *new_slot_sv = newSViv(new_slot);')
    ->line('SV *new_ref = newRV_noinc(new_slot_sv);')
    ->line('sv_bless(new_ref, gv_stashpv("Hypersonic::Future", GV_ADD));')
    ->line('/* refcount already 1 from alloc_slot for Perl ref */')
    ->line('ST(0) = sv_2mortal(new_ref);')
    ->xs_return('1')
    ->xs_end;
# Sequencing: catch
# xs_future_catch -- XS body for $future->catch($code).
#
# Allocates a child future in a fresh registry slot and returns a new
# Hypersonic::Future object blessed around that slot number.  What happens
# to the child depends on the parent's current state:
#
# * FAILED    - $code is called in list context with the failure message and
#               category (each pushed only when set); whatever it returns
#               becomes the child's results and the child is marked DONE.
# * DONE      - the child is marked DONE and shares the parent's result
#               values (one extra reference each); $code is not called.
# * CANCELLED - the child is marked CANCELLED; $code is not called.
# * otherwise - two entries are added to the parent's callback table, both
#               targeting the child: a FUTURE_CB_FAIL entry carrying $code and
#               a code-less FUTURE_CB_DONE|FUTURE_CB_CANCEL entry.  How those
#               entries are consumed is decided by code outside this block.
#
# Croaks on a wrong argument count, a non-object invocant, a full callback
# table on a pending parent, or a full registry.
$builder->xs_function('xs_future_catch')
    ->xs_preamble
    ->line('int i;')
    ->line('if (items != 2) croak("Usage: $future->catch($code)");')
    # SvRV() on something that is not a reference is undefined behaviour, so
    # reject a non-object invocant up front (the same sv_isobject() test that
    # needs_all applies to its arguments).
    ->line('if (!sv_isobject(ST(0))) croak("catch: invocant is not a Future object");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('FutureContext *ctx = &future_registry[slot];')
    ->line('SV *code = ST(1);')
    # Without this check a pending parent with fewer than two free entries
    # would still get a child slot and a refcount bump, but no callback would
    # link the two, so the returned future could never resolve.  Refuse
    # before anything is allocated, like the registry-full case below.
    ->line('/* A pending parent needs two free callback entries */')
    ->line('if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count >= MAX_CALLBACKS - 1) {')
    ->line('    croak("catch: callback table full");')
    ->line('}')
    ->line('int new_slot = future_alloc_slot();')
    ->line('if (new_slot < 0) croak("Future registry full");')
    ->line('FutureContext *new_ctx = &future_registry[new_slot];')
    ->line('new_ctx->cancel_target = slot;')
    ->line('/* Keep parent alive while child exists (for chaining) */')
    ->line('ctx->refcount++;')
    ->line('if (ctx->state == FUTURE_STATE_FAILED) {')
    ->line('    dSP;')
    ->line('    ENTER; SAVETMPS;')
    ->line('    PUSHMARK(SP);')
    ->line('    if (ctx->fail_message) XPUSHs(sv_2mortal(newSVpv(ctx->fail_message, 0)));')
    ->line('    if (ctx->fail_category) XPUSHs(sv_2mortal(newSVpv(ctx->fail_category, 0)));')
    ->line('    PUTBACK;')
    ->line('    int count = call_sv(code, G_ARRAY);')
    ->line('    SPAGAIN;')
    ->line('    if (count > 0) {')
    ->line('        new_ctx->result_values = (SV **)malloc(count * sizeof(SV *));')
    # Values are popped last-to-first, so fill the array from the end to keep
    # the callback's return order; each one gets its own reference so it
    # outlives FREETMPS.
    ->line('        for (i = count - 1; i >= 0; i--) new_ctx->result_values[i] = SvREFCNT_inc(POPs);')
    ->line('        new_ctx->result_count = count;')
    ->line('    }')
    ->line('    new_ctx->state = FUTURE_STATE_DONE;')
    ->line('    PUTBACK;')
    ->line('    FREETMPS; LEAVE;')
    ->line('} else if (ctx->state == FUTURE_STATE_DONE) {')
    ->line('    new_ctx->state = FUTURE_STATE_DONE;')
    ->line('    if (ctx->result_count > 0) {')
    ->line('        new_ctx->result_values = (SV **)malloc(ctx->result_count * sizeof(SV *));')
    ->line('        for (i = 0; i < ctx->result_count; i++) new_ctx->result_values[i] = SvREFCNT_inc(ctx->result_values[i]);')
    ->line('        new_ctx->result_count = ctx->result_count;')
    ->line('    }')
    ->line('} else if (ctx->state == FUTURE_STATE_CANCELLED) {')
    ->line('    new_ctx->state = FUTURE_STATE_CANCELLED;')
    ->line('} else {')
    # The bound is kept as a guard against writing past the table.
    ->line('    if (ctx->callback_count < MAX_CALLBACKS - 1) {')
    ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        cb->type = FUTURE_CB_FAIL;')
    ->line('        cb->code = SvREFCNT_inc(code);')
    ->line('        cb->target_slot = new_slot;')
    ->line('        FutureCallback *done_cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        done_cb->type = FUTURE_CB_DONE | FUTURE_CB_CANCEL;')
    ->line('        done_cb->code = NULL;')
    ->line('        done_cb->target_slot = new_slot;')
    ->line('        /* Keep child alive until parent resolves */')
    ->line('        new_ctx->refcount++;')
    ->line('    }')
    ->line('}')
    ->line('SV *new_slot_sv = newSViv(new_slot);')
    ->line('SV *new_ref = newRV_noinc(new_slot_sv);')
    ->line('sv_bless(new_ref, gv_stashpv("Hypersonic::Future", GV_ADD));')
    ->line('/* refcount already 1 from alloc_slot for Perl ref */')
    ->line('ST(0) = sv_2mortal(new_ref);')
    ->xs_return('1')
    ->xs_end;
# Sequencing: finally
# xs_future_finally -- XS body for $future->finally($code).
#
# Allocates a child future in a fresh registry slot and returns a new
# Hypersonic::Future object blessed around that slot number.
#
# * Parent not PENDING - $code is called at once with no arguments and its
#   return value is discarded; the child then takes the parent's state, plus
#   its result values (DONE, one extra reference each) or copies of its
#   failure message and category (FAILED).
# * Parent PENDING     - one FUTURE_CB_READY entry carrying $code and
#   targeting the child is added to the parent's callback table.  How that
#   entry is consumed is decided by code outside this block.
#
# Croaks on a wrong argument count, a non-object invocant, a full callback
# table on a pending parent, or a full registry.
$builder->xs_function('xs_future_finally')
    ->xs_preamble
    ->line('int i;')
    ->line('if (items != 2) croak("Usage: $future->finally($code)");')
    # SvRV() on something that is not a reference is undefined behaviour, so
    # reject a non-object invocant up front (the same sv_isobject() test that
    # needs_all applies to its arguments).
    ->line('if (!sv_isobject(ST(0))) croak("finally: invocant is not a Future object");')
    ->line('int slot = SvIV(SvRV(ST(0)));')
    ->line('FutureContext *ctx = &future_registry[slot];')
    ->line('SV *code = ST(1);')
    # Without this check a pending parent with a full table would still get
    # a child slot and a refcount bump, but no callback would link the two,
    # so the returned future could never resolve and $code would never run.
    # Refuse before anything is allocated, like the registry-full case below.
    ->line('/* A pending parent needs one free callback entry */')
    ->line('if (ctx->state == FUTURE_STATE_PENDING && ctx->callback_count >= MAX_CALLBACKS) {')
    ->line('    croak("finally: callback table full");')
    ->line('}')
    ->line('int new_slot = future_alloc_slot();')
    ->line('if (new_slot < 0) croak("Future registry full");')
    ->line('FutureContext *new_ctx = &future_registry[new_slot];')
    ->line('new_ctx->cancel_target = slot;')
    ->line('/* Keep parent alive while child exists (for chaining) */')
    ->line('ctx->refcount++;')
    ->line('if (ctx->state != FUTURE_STATE_PENDING) {')
    ->line('    dSP;')
    ->line('    ENTER; SAVETMPS;')
    ->line('    PUSHMARK(SP);')
    ->line('    PUTBACK;')
    ->line('    call_sv(code, G_DISCARD);')
    ->line('    FREETMPS; LEAVE;')
    # The outcome is copied only after the callback has returned.
    ->line('    new_ctx->state = ctx->state;')
    ->line('    if (ctx->state == FUTURE_STATE_DONE && ctx->result_count > 0) {')
    ->line('        new_ctx->result_values = (SV **)malloc(ctx->result_count * sizeof(SV *));')
    ->line('        for (i = 0; i < ctx->result_count; i++) new_ctx->result_values[i] = SvREFCNT_inc(ctx->result_values[i]);')
    ->line('        new_ctx->result_count = ctx->result_count;')
    ->line('    } else if (ctx->state == FUTURE_STATE_FAILED) {')
    ->line('        if (ctx->fail_message) new_ctx->fail_message = strdup(ctx->fail_message);')
    ->line('        if (ctx->fail_category) new_ctx->fail_category = strdup(ctx->fail_category);')
    ->line('    }')
    ->line('} else {')
    # The bound is kept as a guard against writing past the table.
    ->line('    if (ctx->callback_count < MAX_CALLBACKS) {')
    ->line('        FutureCallback *cb = &ctx->callbacks[ctx->callback_count++];')
    ->line('        cb->type = FUTURE_CB_READY;')
    ->line('        cb->code = SvREFCNT_inc(code);')
    ->line('        cb->target_slot = new_slot;')
    ->line('        /* Keep child alive until parent resolves */')
    ->line('        new_ctx->refcount++;')
    ->line('    }')
    ->line('}')
    ->line('SV *new_slot_sv = newSViv(new_slot);')
    ->line('SV *new_ref = newRV_noinc(new_slot_sv);')
    ->line('sv_bless(new_ref, gv_stashpv("Hypersonic::Future", GV_ADD));')
    ->line('/* refcount already 1 from alloc_slot for Perl ref */')
    ->line('ST(0) = sv_2mortal(new_ref);')
    ->xs_return('1')
    ->xs_end;
# Convergent: needs_all - all must succeed
$builder->xs_function('xs_future_needs_all')
->xs_preamble
->line('int i;')
->line('if (items < 2) croak("Usage: Hypersonic::Future->needs_all(@futures)");')
->line('int new_slot = future_alloc_slot();')
->line('if (new_slot < 0) croak("Future registry full");')
->line('FutureContext *ctx = &future_registry[new_slot];')
->line('int count = items - 1;')
->line('ctx->subfuture_slots = (int*)malloc(count * sizeof(int));')
->line('ctx->subfuture_count = count;')
->line('ctx->subfutures_pending = count;')
->line('ctx->convergent_mode = 1;')
->line('int all_done = 1;')
->line('for (i = 0; i < count; i++) {')
->line(' SV *sub_sv = ST(i + 1);')
->line(' if (!sv_isobject(sub_sv)) croak("needs_all: argument %d is not a Future", i+1);')
->line(' int sub_slot = SvIV(SvRV(sub_sv));')
->line(' ctx->subfuture_slots[i] = sub_slot;')
->line(' FutureContext *sub = &future_registry[sub_slot];')
->line(' if (sub->state == FUTURE_STATE_PENDING) {')
->line(' all_done = 0;')
->line(' /* Register callback on subfuture */')
->line(' if (sub->callback_count < MAX_CALLBACKS) {')
->line(' FutureCallback *cb = &sub->callbacks[sub->callback_count++];')
->line(' cb->type = FUTURE_CB_READY;')
->line(' cb->code = NULL;')
->line(' cb->target_slot = new_slot;')
->line(' ctx->refcount++;')
->line(' }')
->line(' } else if (sub->state == FUTURE_STATE_FAILED) {')
->line(' ctx->state = FUTURE_STATE_FAILED;')
->line(' if (sub->fail_message) ctx->fail_message = strdup(sub->fail_message);')
->line(' if (sub->fail_category) ctx->fail_category = strdup(sub->fail_category);')
->line(' break;')
->line(' } else if (sub->state == FUTURE_STATE_CANCELLED) {')
->line(' ctx->state = FUTURE_STATE_CANCELLED;')
->line(' break;')
->line(' } else {')
->line(' ctx->subfutures_pending--;')
->line(' }')
->line('}')
->line('if (ctx->state == FUTURE_STATE_PENDING && all_done) {')
->line(' /* All already done - collect results as flat list */')
->line(' ctx->result_values = (SV**)malloc(count * sizeof(SV*));')
->line(' ctx->result_count = 0;')
->line(' for (i = 0; i < count; i++) {')
->line(' FutureContext *sub = &future_registry[ctx->subfuture_slots[i]];')
->line(' if (sub->result_count > 0) {')
( run in 1.280 second using v1.01-cache-2.11-cpan-140bd7fdf52 )