Future-Batch-XS

 view release on metacpan or  search on metacpan

lib/Future/Batch/XS.xs  view on Meta::CPAN

            SV **svp = hv_fetch(self_hv, "loop", 4, 0);
            loop_sv = (svp && SvOK(*svp)) ? *svp : NULL;
        }
        
        total = av_len(items_av) + 1;
        
        /* Allocate BatchState */
        Newxz(state, 1, BatchState);
        state->items = (AV*)SvREFCNT_inc((SV*)items_av);
        state->concurrent = concurrent;
        state->fail_fast = fail_fast ? TRUE : FALSE;
        state->total = total;
        state->in_flight = 0;
        state->finished = 0;
        state->aborted = FALSE;
        
        /* Create results array */
        state->results = newAV();
        av_extend(state->results, total - 1);
        for (i = 0; i < total; i++) {
            av_push(state->results, newSV(0));
        }
        
        /* Create queue */
        state->queue = newAV();
        av_extend(state->queue, total - 1);
        for (i = 0; i < total; i++) {
            av_push(state->queue, newSViv(i));
        }
        
        /* Create errors array */
        state->errors = newAV();
        
        /* Store callbacks */
        if (worker && SvOK(worker)) {
            state->worker = newSVsv(worker);
        }
        if (on_progress && SvOK(on_progress)) {
            state->on_progress = newSVsv(on_progress);
        }
        if (loop_sv && SvOK(loop_sv)) {
            state->loop = newSVsv(loop_sv);
        }
        
        /* Create result Future */
        ENTER; SAVETMPS; PUSHMARK(SP);
        mXPUSHs(newSVpv("Future", 0));
        PUTBACK;
        call_method("new", G_SCALAR);
        SPAGAIN;
        state->result_future = newSVsv(POPs);
        PUTBACK; FREETMPS; LEAVE;
        
        /* Create state object */
        state_sv = new_state_sv(aTHX_ state);
        
        /* Run the batch - fully in XS! */
        run_loop(aTHX_ state_sv);
        
        RETVAL = newSVsv(state->result_future);
        /* Don't dec state_sv - callbacks hold copies that keep it alive */
        /* It will be freed when all callbacks complete and are GC'd */
    }
OUTPUT:
    RETVAL

SV *
batch(...)
PREINIT:
    dSP;
    SV *obj;
    IV concurrent = 10;
    IV fail_fast = 0;
    SV *on_progress = &PL_sv_undef;
    SV *loop_sv = &PL_sv_undef;
    AV *items_av = NULL;
    SV *worker = NULL;
    IV i;
CODE:
    for (i = 0; i < items; i += 2) {
        if (i + 1 < items) {
            const char *key = SvPV_nolen(ST(i));
            SV *val = ST(i + 1);
            if (strEQ(key, "concurrent") && SvOK(val)) {
                concurrent = SvIV(val);
            } else if (strEQ(key, "fail_fast") && SvOK(val)) {
                fail_fast = SvTRUE(val) ? 1 : 0;
            } else if (strEQ(key, "on_progress") && SvOK(val)) {
                on_progress = val;
            } else if (strEQ(key, "loop") && SvOK(val)) {
                loop_sv = val;
            } else if (strEQ(key, "items") && SvROK(val)) {
                items_av = (AV*)SvRV(val);
            } else if (strEQ(key, "worker") && SvROK(val)) {
                worker = val;
            }
        }
    }
    
    ENTER; SAVETMPS; PUSHMARK(SP);
    mXPUSHs(newSVpv("Future::Batch::XS", 0));
    mXPUSHs(newSVpv("concurrent", 0));
    mXPUSHi(concurrent);
    mXPUSHs(newSVpv("fail_fast", 0));
    mXPUSHi(fail_fast);
    mXPUSHs(newSVpv("on_progress", 0));
    XPUSHs(on_progress);
    mXPUSHs(newSVpv("loop", 0));
    XPUSHs(loop_sv);
    PUTBACK;
    call_method("new", G_SCALAR);
    SPAGAIN;
    obj = newSVsv(POPs);
    PUTBACK; FREETMPS; LEAVE;
    
    ENTER; SAVETMPS; PUSHMARK(SP);
    XPUSHs(obj);
    mXPUSHs(newSVpv("items", 0));
    if (items_av) {
        XPUSHs(sv_2mortal(newRV_inc((SV*)items_av)));
    } else {



( run in 1.403 second using v1.01-cache-2.11-cpan-5a3173703d6 )