Future-XS
view release on metacpan or search on metacpan
src/future.c view on Meta::CPAN
AV *result = (is_done) ? self->result :
(is_fail) ? self->failure :
NULL;
if(is_done && !(flags & CB_DONE))
return;
if(is_fail && !(flags & CB_FAIL))
return;
if(is_cancelled && !(flags & CB_CANCEL))
return;
if(flags & CB_IS_FUTURE) {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(CB_NONSEQ_CODE(cb)); // really a Future RV
if(result)
XPUSHs_from_AV(result);
PUTBACK;
if(is_done)
call_method("done", G_VOID);
else if(is_fail)
call_method("fail", G_VOID);
else
call_method("cancel", G_VOID);
FREETMPS;
LEAVE;
}
else if(flags & CB_SEQ_ANY) {
SV *fseq = cb->seq.f;
if(!SvOK(fseq)) {
warn("%" SVf " lost a sequence Future",
SVfARG(future_mortal_selfstr(selfsv)));
return;
}
SV *f2 = invoke_seq_callback(self, selfsv, cb);
if(f2 == fseq)
/* immediate fail */
return;
future_on_cancel(fseq, f2);
if(future_is_ready(f2)) {
if(!future_is_cancelled(f2))
future_on_ready(f2, fseq);
else if(flags & CB_CANCEL)
future_cancel(fseq);
}
else {
struct FutureXS *f2self = get_future(f2);
struct FutureXSCallback cb2 = {
.flags = CB_DONE|CB_FAIL|CB_IS_FUTURE,
.code = sv_rvweaken(newSVsv(fseq)),
};
push_callback(f2self, &cb2);
}
assert(SvREFCNT(f2) == 1);
SvREFCNT_dec(f2);
}
else {
SV *code = CB_NONSEQ_CODE(cb);
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
if(flags & CB_SELF)
XPUSHs(selfsv);
if((flags & CB_RESULT) && result)
XPUSHs_from_AV(result);
PUTBACK;
assert(SvOK(code));
call_sv(code, G_VOID);
FREETMPS;
LEAVE;
}
}
#define revoke_on_cancel(rev) S_revoke_on_cancel(aTHX_ rev)
static void S_revoke_on_cancel(pTHX_ struct FutureXSRevocation *rev)
{
if(rev->toclear_sv_at && SvROK(rev->toclear_sv_at)) {
assert(SvTYPE(rev->toclear_sv_at) <= SVt_PVMG);
assert(SvROK(rev->toclear_sv_at));
sv_set_undef(SvRV(rev->toclear_sv_at));
SvREFCNT_dec(rev->toclear_sv_at);
rev->toclear_sv_at = NULL;
}
if(!SvOK(rev->precedent_f))
return;
struct FutureXS *self = get_future(rev->precedent_f);
self->empty_revocation_slots++;
AV *on_cancel = self->on_cancel;
if(self->empty_revocation_slots >= 8 && on_cancel &&
self->empty_revocation_slots >= AvFILL(on_cancel)/2) {
// Squash up the array to contain only defined values
SV **wrsv = AvARRAY(on_cancel),
**rdsv = AvARRAY(on_cancel),
**end = AvARRAY(on_cancel) + AvFILL(on_cancel);
while(rdsv <= end) {
if(SvOK(*rdsv))
// Keep this one
src/future.c view on Meta::CPAN
if(self->precedent_f) {
SvREFCNT_dec(self->precedent_f);
self->precedent_f = NULL;
}
clear_on_cancel(self);
if(self->revoke_when_ready) {
AV *revocations = self->revoke_when_ready;
for(size_t i = 0; i < av_count(revocations); i++) {
struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocations)[i];
revoke_on_cancel(rev);
SvREFCNT_dec(rev->precedent_f);
Safefree(rev);
}
AvFILLp(revocations) = -1;
SvREFCNT_dec(revocations);
self->revoke_when_ready = NULL;
}
if(!self->callbacks)
return;
AV *callbacks = self->callbacks;
struct FutureXSCallback **cbs = (struct FutureXSCallback **)AvARRAY(callbacks);
size_t i, n = av_count(callbacks);
for(i = 0; i < n; i++) {
struct FutureXSCallback *cb = cbs[i];
invoke_callback(self, selfsv, cb);
}
destroy_callbacks(self);
}
#define make_sequence(f1, cb) S_make_sequence(aTHX_ f1, cb)
static SV *S_make_sequence(pTHX_ SV *f1, struct FutureXSCallback *cb)
{
struct FutureXS *self = get_future(f1);
int flags = cb->flags;
if(self->ready) {
// TODO: CB_SEQ_IM*
SV *f2 = invoke_seq_callback(self, f1, cb);
clear_callback(cb);
return f2;
}
SV *fseq = future_new_proto(f1);
if(cb->flags & CB_SEQ_CANCEL)
future_on_cancel(fseq, f1);
cb->flags |= CB_DONE|CB_FAIL;
if(cb->seq.thencode)
cb->seq.thencode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.thencode));
if(cb->seq.elsecode)
cb->seq.elsecode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.elsecode));
cb->seq.f = sv_rvweaken(newSVsv(fseq));
push_callback(self, cb);
return fseq;
}
// TODO: move to a hax/ file
#define CvNAME_FILE_LINE(cv) S_CvNAME_FILE_LINE(aTHX_ cv)
static SV *S_CvNAME_FILE_LINE(pTHX_ CV *cv)
{
if(!CvANON(cv)) {
SV *ret = newSVpvf("HvNAME::GvNAME");
return ret;
}
OP *cop = CvSTART(cv);
while(cop && OP_CLASS(cop) != OA_COP)
cop = cop->op_next;
if(!cop)
return newSVpvs("__ANON__");
return newSVpvf("__ANON__(%s line %d)", CopFILE((COP *)cop), CopLINE((COP *)cop));
}
static const char *statestr(struct FutureXS *self)
{
if(!self->ready)
return "pending";
if(self->cancelled)
return "cancelled";
if(self->failure)
return "failed";
return "done";
}
void Future_donev(pTHX_ SV *f, SV **svp, size_t n)
{
struct FutureXS *self = get_future(f);
if(self->cancelled)
return;
if(self->ready)
croak("%" SVf " is already %s and cannot be ->done",
SVfARG(f), statestr(self));
// TODO: test subs
self->result = newAV_svn_dup(svp, n);
mark_ready(self, f, "done");
}
void Future_failv(pTHX_ SV *f, SV **svp, size_t n)
{
struct FutureXS *self = get_future(f);
if(self->cancelled)
return;
src/future.c view on Meta::CPAN
SSize_t count = call_method("details", G_LIST);
SPAGAIN;
SV **retp = SP - count + 1;
for(SSize_t i = 0; i < count; i++)
av_push(failure, SvREFCNT_inc(retp[i]));
SP -= count;
PUTBACK;
FREETMPS;
LEAVE;
}
}
else {
self->failure = newAV_svn_dup(svp, n);
}
mark_ready(self, f, "failed");
}
#define future_failp(f, s) Future_failp(aTHX_ f, s)
void Future_failp(pTHX_ SV *f, const char *s)
{
struct FutureXS *self = get_future(f);
if(self->cancelled)
return;
if(self->ready)
croak("%" SVf " is already %s and cannot be ->fail'ed",
SVfARG(f), statestr(self));
self->failure = newAV();
av_push(self->failure, newSVpv(s, strlen(s)));
mark_ready(self, f, "failed");
}
void Future_on_cancel(pTHX_ SV *f, SV *code)
{
struct FutureXS *self = get_future(f);
if(self->ready)
return;
bool is_future = sv_is_future(code);
// TODO: is_future or callable(code) or croak
if(!self->on_cancel)
self->on_cancel = newAV();
SV *rv = newSVsv((SV *)code);
av_push(self->on_cancel, rv);
if(is_future) {
struct FutureXSRevocation *rev;
Newx(rev, 1, struct FutureXSRevocation);
rev->precedent_f = sv_rvweaken(newSVsv(f));
rev->toclear_sv_at = sv_rvweaken(newRV_inc(rv));
struct FutureXS *codeself = get_future(code);
if(!codeself->revoke_when_ready)
codeself->revoke_when_ready = newAV();
av_push(codeself->revoke_when_ready, (SV *)rev);
}
}
void Future_on_ready(pTHX_ SV *f, SV *code)
{
struct FutureXS *self = get_future(f);
bool is_future = sv_is_future(code);
// TODO: is_future or callable(code) or croak
int flags = CB_ALWAYS|CB_SELF;
if(is_future)
flags |= CB_IS_FUTURE;
struct FutureXSCallback cb = {
.flags = flags,
.code = code,
};
if(self->ready)
invoke_callback(self, f, &cb);
else {
cb.code = wrap_cb(f, "on_ready", cb.code);
push_callback(self, &cb);
}
}
void Future_on_done(pTHX_ SV *f, SV *code)
{
struct FutureXS *self = get_future(f);
bool is_future = sv_is_future(code);
// TODO: is_future or callable(code) or croak
int flags = CB_DONE|CB_RESULT;
if(is_future)
flags |= CB_IS_FUTURE;
struct FutureXSCallback cb = {
.flags = flags,
.code = code,
};
if(self->ready)
invoke_callback(self, f, &cb);
else {
cb.code = wrap_cb(f, "on_done", cb.code);
push_callback(self, &cb);
}
}
void Future_on_fail(pTHX_ SV *f, SV *code)
{
struct FutureXS *self = get_future(f);
src/future.c view on Meta::CPAN
return;
for(Size_t i = 0; i < av_count(self->subs); i++) {
SV *sub = AvARRAY(self->subs)[i];
U8 flags = self->subflags[i];
if(!(flags & SUBFLAG_NO_CANCEL) && !future_is_ready(sub))
future_cancel(sub);
}
}
XS_INTERNAL(sub_on_ready_waitall)
{
dXSARGS;
SV *f = XSANY_sv;
if(!SvOK(f))
return;
/* Make sure self doesn't disappear during this function */
SvREFCNT_inc(SvRV(f));
SAVEFREESV(SvRV(f));
struct FutureXS *self = get_future(f);
self->pending_subs--;
if(self->pending_subs)
XSRETURN(0);
/* TODO: This is really just newAVav() */
self->result = newAV_svn_dup(AvARRAY(self->subs), av_count(self->subs));
mark_ready(self, f, "wait_all");
}
SV *Future_new_waitallv(pTHX_ const char *cls, SV **subs, size_t n)
{
SV *f = future_new_subsv(cls, subs, n);
struct FutureXS *self = get_future(f);
/* Reïnit subs + n */
subs = AvARRAY(self->subs);
n = av_count(self->subs);
self->pending_subs = 0;
for(Size_t i = 0; i < n; i++) {
/* TODO: This should probably use some API function to make it transparent */
if(!future_is_ready(subs[i]))
self->pending_subs++;
}
if(!self->pending_subs) {
self->result = newAV_svn_dup(subs, n);
mark_ready(self, f, "wait_all");
return f;
}
CV *sub_on_ready = newXS(NULL, sub_on_ready_waitall, __FILE__);
cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
GV *gv = gv_fetchpvs("Future::XS::(wait_all callback)", GV_ADDMULTI, SVt_PVCV);
CvGV_set(sub_on_ready, gv);
CvANON_off(sub_on_ready);
for(Size_t i = 0; i < n; i++) {
if(!future_is_ready(subs[i]))
future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
}
SvREFCNT_dec(sub_on_ready);
return f;
}
XS_INTERNAL(sub_on_ready_waitany)
{
dXSARGS;
SV *thissub = ST(0);
SV *f = XSANY_sv;
if(!SvOK(f))
return;
/* Make sure self doesn't disappear during this function */
SvREFCNT_inc(SvRV(f));
SAVEFREESV(SvRV(f));
struct FutureXS *self = get_future(f);
if(self->result || self->failure)
return;
self->pending_subs--;
bool this_cancelled = future_is_cancelled(thissub);
if(self->pending_subs && this_cancelled)
return;
if(this_cancelled) {
future_failp(f, "All component futures were cancelled");
return;
}
else
copy_result(self, thissub);
cancel_pending_subs(self);
mark_ready(self, f, "wait_any");
}
SV *Future_new_waitanyv(pTHX_ const char *cls, SV **subs, size_t n)
{
SV *f = future_new_subsv(cls, subs, n);
struct FutureXS *self = get_future(f);
/* Reïnit subs + n */
subs = AvARRAY(self->subs);
n = av_count(self->subs);
if(!n) {
future_failp(f, "Cannot ->wait_any with no subfutures");
return f;
}
SV *immediate_ready = NULL;
for(Size_t i = 0; i < n; i++) {
/* TODO: This should probably use some API function to make it transparent */
if(future_is_ready(subs[i]) && !future_is_cancelled(subs[i])) {
immediate_ready = subs[i];
break;
}
}
if(immediate_ready) {
copy_result(self, immediate_ready);
cancel_pending_subs(self);
mark_ready(self, f, "wait_any");
return f;
}
self->pending_subs = 0;
CV *sub_on_ready = newXS(NULL, sub_on_ready_waitany, __FILE__);
cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
GV *gv = gv_fetchpvs("Future::XS::(wait_any callback)", GV_ADDMULTI, SVt_PVCV);
CvGV_set(sub_on_ready, gv);
CvANON_off(sub_on_ready);
for(Size_t i = 0; i < n; i++) {
if(future_is_cancelled(subs[i]))
continue;
future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
self->pending_subs++;
}
SvREFCNT_dec(sub_on_ready);
return f;
}
#define compose_needsall_result(self) S_compose_needsall_result(aTHX_ self)
static void S_compose_needsall_result(pTHX_ struct FutureXS *self)
{
AV *result = self->result = newAV();
for(Size_t i = 0; i < av_count(self->subs); i++) {
SV *sub = AvARRAY(self->subs)[i];
struct FutureXS *subself = get_future(sub);
assert(subself->result);
av_push_svn(result, AvARRAY(subself->result), av_count(subself->result));
}
}
XS_INTERNAL(sub_on_ready_needsall)
{
dXSARGS;
SV *thissub = ST(0);
SV *f = XSANY_sv;
if(!SvOK(f))
return;
/* Make sure self doesn't disappear during this function */
SvREFCNT_inc(SvRV(f));
SAVEFREESV(SvRV(f));
struct FutureXS *self = get_future(f);
if(self->result || self->failure)
return;
if(future_is_cancelled(thissub)) {
future_failp(f, "A component future was cancelled");
cancel_pending_subs(self);
return;
}
else if(future_is_failed(thissub)) {
copy_result(self, thissub);
cancel_pending_subs(self);
mark_ready(self, f, "needs_all");
}
else {
self->pending_subs--;
if(self->pending_subs)
return;
compose_needsall_result(self);
mark_ready(self, f, "needs_all");
}
}
SV *Future_new_needsallv(pTHX_ const char *cls, SV **subs, size_t n)
{
SV *f = future_new_subsv(cls, subs, n);
struct FutureXS *self = get_future(f);
/* Reïnit subs + n */
subs = AvARRAY(self->subs);
n = av_count(self->subs);
if(!n) {
future_donev(f, NULL, 0);
return f;
}
SV *immediate_fail = NULL;
for(Size_t i = 0; i < n; i++) {
if(future_is_cancelled(subs[i])) {
future_failp(f, "A component future was cancelled");
cancel_pending_subs(self);
return f;
}
if(future_is_failed(subs[i])) {
immediate_fail = subs[i];
break;
}
}
if(immediate_fail) {
copy_result(self, immediate_fail);
cancel_pending_subs(self);
mark_ready(self, f, "needs_all");
return f;
}
self->pending_subs = 0;
CV *sub_on_ready = newXS(NULL, sub_on_ready_needsall, __FILE__);
cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
GV *gv = gv_fetchpvs("Future::XS::(needs_all callback)", GV_ADDMULTI, SVt_PVCV);
CvGV_set(sub_on_ready, gv);
CvANON_off(sub_on_ready);
for(Size_t i = 0; i < n; i++) {
if(future_is_ready(subs[i]))
continue;
future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
self->pending_subs++;
}
if(!self->pending_subs) {
compose_needsall_result(self);
mark_ready(self, f, "needs_all");
}
SvREFCNT_dec(sub_on_ready);
return f;
}
XS_INTERNAL(sub_on_ready_needsany)
{
dXSARGS;
SV *thissub = ST(0);
SV *f = XSANY_sv;
if(!SvOK(f))
return;
/* Make sure self doesn't disappear during this function */
SvREFCNT_inc(SvRV(f));
SAVEFREESV(SvRV(f));
struct FutureXS *self = get_future(f);
if(self->result || self->failure)
return;
self->pending_subs--;
bool this_cancelled = future_is_cancelled(thissub);
if(self->pending_subs && this_cancelled)
return;
if(this_cancelled) {
future_failp(f, "All component futures were cancelled");
}
else if(future_is_failed(thissub)) {
if(self->pending_subs)
return;
copy_result(self, thissub);
mark_ready(self, f, "needs_any");
}
else {
copy_result(self, thissub);
cancel_pending_subs(self);
mark_ready(self, f, "needs_any");
}
}
SV *Future_new_needsanyv(pTHX_ const char *cls, SV **subs, size_t n)
{
SV *f = future_new_subsv(cls, subs, n);
struct FutureXS *self = get_future(f);
/* Reïnit subs + n */
subs = AvARRAY(self->subs);
n = av_count(self->subs);
if(!n) {
future_failp(f, "Cannot ->needs_any with no subfutures");
return f;
}
SV *immediate_done = NULL;
for(Size_t i = 0; i < n; i++) {
if(future_is_done(subs[i])) {
immediate_done = subs[i];
break;
}
}
if(immediate_done) {
copy_result(self, immediate_done);
cancel_pending_subs(self);
mark_ready(self, f, "needs_any");
return f;
}
self->pending_subs = 0;
CV *sub_on_ready = newXS(NULL, sub_on_ready_needsany, __FILE__);
cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
GV *gv = gv_fetchpvs("Future::XS::(needs_any callback)", GV_ADDMULTI, SVt_PVCV);
CvGV_set(sub_on_ready, gv);
CvANON_off(sub_on_ready);
for(Size_t i = 0; i < n; i++) {
if(future_is_ready(subs[i]))
continue;
future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
self->pending_subs++;
}
if(!self->pending_subs) {
copy_result(self, subs[n-1]);
mark_ready(self, f, "needs_any");
}
SvREFCNT_dec(sub_on_ready);
return f;
}
Size_t Future_mPUSH_subs(pTHX_ SV *f, enum FutureSubFilter filter)
{
dSP;
struct FutureXS *self = get_future(f);
Size_t ret = 0;
for(Size_t i = 0; self->subs && i < av_count(self->subs); i++) {
SV *sub = AvARRAY(self->subs)[i];
bool want;
switch(filter) {
case FUTURE_SUBS_PENDING:
want = !future_is_ready(sub);
break;
case FUTURE_SUBS_READY:
want = future_is_ready(sub);
break;
case FUTURE_SUBS_DONE:
want = future_is_done(sub);
break;
case FUTURE_SUBS_FAILED:
want = future_is_failed(sub);
break;
case FUTURE_SUBS_CANCELLED:
want = future_is_cancelled(sub);
break;
}
if(want) {
XPUSHs(sv_mortalcopy(sub));
ret++;
}
( run in 1.436 second using v1.01-cache-2.11-cpan-39bf76dae61 )