Future-Batch-XS
view release on metacpan or search on metacpan
lib/Future/Batch/XS.xs view on Meta::CPAN
SV **svp = hv_fetch(self_hv, "loop", 4, 0);
loop_sv = (svp && SvOK(*svp)) ? *svp : NULL;
}
total = av_len(items_av) + 1;
/* Allocate BatchState */
Newxz(state, 1, BatchState);
state->items = (AV*)SvREFCNT_inc((SV*)items_av);
state->concurrent = concurrent;
state->fail_fast = fail_fast ? TRUE : FALSE;
state->total = total;
state->in_flight = 0;
state->finished = 0;
state->aborted = FALSE;
/* Create results array */
state->results = newAV();
av_extend(state->results, total - 1);
for (i = 0; i < total; i++) {
av_push(state->results, newSV(0));
}
/* Create queue */
state->queue = newAV();
av_extend(state->queue, total - 1);
for (i = 0; i < total; i++) {
av_push(state->queue, newSViv(i));
}
/* Create errors array */
state->errors = newAV();
/* Store callbacks */
if (worker && SvOK(worker)) {
state->worker = newSVsv(worker);
}
if (on_progress && SvOK(on_progress)) {
state->on_progress = newSVsv(on_progress);
}
if (loop_sv && SvOK(loop_sv)) {
state->loop = newSVsv(loop_sv);
}
/* Create result Future */
ENTER; SAVETMPS; PUSHMARK(SP);
mXPUSHs(newSVpv("Future", 0));
PUTBACK;
call_method("new", G_SCALAR);
SPAGAIN;
state->result_future = newSVsv(POPs);
PUTBACK; FREETMPS; LEAVE;
/* Create state object */
state_sv = new_state_sv(aTHX_ state);
/* Run the batch - fully in XS! */
run_loop(aTHX_ state_sv);
RETVAL = newSVsv(state->result_future);
/* Don't dec state_sv - callbacks hold copies that keep it alive */
/* It will be freed when all callbacks complete and are GC'd */
}
OUTPUT:
RETVAL
SV *
batch(...)
PREINIT:
dSP;
SV *obj;
IV concurrent = 10;
IV fail_fast = 0;
SV *on_progress = &PL_sv_undef;
SV *loop_sv = &PL_sv_undef;
AV *items_av = NULL;
SV *worker = NULL;
IV i;
CODE:
for (i = 0; i < items; i += 2) {
if (i + 1 < items) {
const char *key = SvPV_nolen(ST(i));
SV *val = ST(i + 1);
if (strEQ(key, "concurrent") && SvOK(val)) {
concurrent = SvIV(val);
} else if (strEQ(key, "fail_fast") && SvOK(val)) {
fail_fast = SvTRUE(val) ? 1 : 0;
} else if (strEQ(key, "on_progress") && SvOK(val)) {
on_progress = val;
} else if (strEQ(key, "loop") && SvOK(val)) {
loop_sv = val;
} else if (strEQ(key, "items") && SvROK(val)) {
items_av = (AV*)SvRV(val);
} else if (strEQ(key, "worker") && SvROK(val)) {
worker = val;
}
}
}
ENTER; SAVETMPS; PUSHMARK(SP);
mXPUSHs(newSVpv("Future::Batch::XS", 0));
mXPUSHs(newSVpv("concurrent", 0));
mXPUSHi(concurrent);
mXPUSHs(newSVpv("fail_fast", 0));
mXPUSHi(fail_fast);
mXPUSHs(newSVpv("on_progress", 0));
XPUSHs(on_progress);
mXPUSHs(newSVpv("loop", 0));
XPUSHs(loop_sv);
PUTBACK;
call_method("new", G_SCALAR);
SPAGAIN;
obj = newSVsv(POPs);
PUTBACK; FREETMPS; LEAVE;
ENTER; SAVETMPS; PUSHMARK(SP);
XPUSHs(obj);
mXPUSHs(newSVpv("items", 0));
if (items_av) {
XPUSHs(sv_2mortal(newRV_inc((SV*)items_av)));
} else {
( run in 1.403 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )