Future-XS

 view release on metacpan or  search on metacpan

src/future.c  view on Meta::CPAN


  AV *result = (is_done) ? self->result :
               (is_fail) ? self->failure :
               NULL;

  if(is_done && !(flags & CB_DONE))
    return;
  if(is_fail && !(flags & CB_FAIL))
    return;
  if(is_cancelled && !(flags & CB_CANCEL))
    return;

  if(flags & CB_IS_FUTURE) {
    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    XPUSHs(CB_NONSEQ_CODE(cb)); // really a Future RV
    if(result)
      XPUSHs_from_AV(result);

    PUTBACK;
    if(is_done)
      call_method("done", G_VOID);
    else if(is_fail)
      call_method("fail", G_VOID);
    else
      call_method("cancel", G_VOID);

    FREETMPS;
    LEAVE;
  }
  else if(flags & CB_SEQ_ANY) {
    SV *fseq  = cb->seq.f;

    if(!SvOK(fseq)) {
      warn("%" SVf " lost a sequence Future",
          SVfARG(future_mortal_selfstr(selfsv)));
      return;
    }

    SV *f2 = invoke_seq_callback(self, selfsv, cb);
    if(f2 == fseq)
      /* immediate fail */
      return;

    future_on_cancel(fseq, f2);

    if(future_is_ready(f2)) {
      if(!future_is_cancelled(f2))
        future_on_ready(f2, fseq);
      else if(flags & CB_CANCEL)
        future_cancel(fseq);
    }
    else {
      struct FutureXS *f2self = get_future(f2);
      struct FutureXSCallback cb2 = {
        .flags = CB_DONE|CB_FAIL|CB_IS_FUTURE,
        .code  = sv_rvweaken(newSVsv(fseq)),
      };
      push_callback(f2self, &cb2);
    }

    assert(SvREFCNT(f2) == 1);
    SvREFCNT_dec(f2);
  }
  else {
    SV *code = CB_NONSEQ_CODE(cb);

    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    if(flags & CB_SELF)
      XPUSHs(selfsv);
    if((flags & CB_RESULT) && result)
      XPUSHs_from_AV(result);

    PUTBACK;
    assert(SvOK(code));
    call_sv(code, G_VOID);

    FREETMPS;
    LEAVE;
  }
}

#define revoke_on_cancel(rev)  S_revoke_on_cancel(aTHX_ rev)
static void S_revoke_on_cancel(pTHX_ struct FutureXSRevocation *rev)
{
  if(rev->toclear_sv_at && SvROK(rev->toclear_sv_at)) {
    assert(SvTYPE(rev->toclear_sv_at) <= SVt_PVMG);
    assert(SvROK(rev->toclear_sv_at));
    sv_set_undef(SvRV(rev->toclear_sv_at));
    SvREFCNT_dec(rev->toclear_sv_at);
    rev->toclear_sv_at = NULL;
  }

  if(!SvOK(rev->precedent_f))
    return;

  struct FutureXS *self = get_future(rev->precedent_f);

  self->empty_revocation_slots++;

  AV *on_cancel = self->on_cancel;
  if(self->empty_revocation_slots >= 8 && on_cancel &&
      self->empty_revocation_slots >= AvFILL(on_cancel)/2) {

    // Squash up the array to contain only defined values
    SV **wrsv = AvARRAY(on_cancel),
       **rdsv = AvARRAY(on_cancel),
       **end  = AvARRAY(on_cancel) + AvFILL(on_cancel);

    while(rdsv <= end) {
      if(SvOK(*rdsv))
        // Keep this one

src/future.c  view on Meta::CPAN

  if(self->precedent_f) {
    SvREFCNT_dec(self->precedent_f);
    self->precedent_f = NULL;
  }

  clear_on_cancel(self);
  if(self->revoke_when_ready) {
    AV *revocations = self->revoke_when_ready;
    for(size_t i = 0; i < av_count(revocations); i++) {
      struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocations)[i];
      revoke_on_cancel(rev);

      SvREFCNT_dec(rev->precedent_f);
      Safefree(rev);
    }
    AvFILLp(revocations) = -1;
    SvREFCNT_dec(revocations);

    self->revoke_when_ready = NULL;
  }

  if(!self->callbacks)
    return;

  AV *callbacks = self->callbacks;

  struct FutureXSCallback **cbs = (struct FutureXSCallback **)AvARRAY(callbacks);
  size_t i, n = av_count(callbacks);
  for(i = 0; i < n; i++) {
    struct FutureXSCallback *cb = cbs[i];
    invoke_callback(self, selfsv, cb);
  }

  destroy_callbacks(self);
}

/* Attaches the sequencing callback `cb` to the future `f1` and returns an SV
 * for the future that stands for the outcome of the sequence.
 *
 *  - If f1 is already ready, the callback is run at once through
 *    invoke_seq_callback(), clear_callback() is applied to `cb`, and whatever
 *    invoke_seq_callback() returned is handed back to the caller.
 *  - Otherwise a new future is created by future_new_proto(f1), `cb` is
 *    completed in place (flags, wrapped code, weak back-reference) and passed
 *    to push_callback() on f1, and the new future is returned.
 *
 * `cb` is modified by this function in both cases.
 */
#define make_sequence(f1, cb)  S_make_sequence(aTHX_ f1, cb)
static SV *S_make_sequence(pTHX_ SV *f1, struct FutureXSCallback *cb)
{
  struct FutureXS *self = get_future(f1);

  if(self->ready) {
    // TODO: CB_SEQ_IM*

    SV *f2 = invoke_seq_callback(self, f1, cb);
    clear_callback(cb);
    return f2;
  }

  SV *fseq = future_new_proto(f1);
  /* Only link cancellation of the new future back to f1 when requested */
  if(cb->flags & CB_SEQ_CANCEL)
    future_on_cancel(fseq, f1);

  cb->flags |= CB_DONE|CB_FAIL;
  /* Each code slot that is set is replaced by the result of wrap_cb(); the
   * SV previously held there is mortalised as it is passed in */
  if(cb->seq.thencode)
    cb->seq.thencode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.thencode));
  if(cb->seq.elsecode)
    cb->seq.elsecode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.elsecode));
  /* The callback keeps only a weakened copy of the reference to fseq */
  cb->seq.f = sv_rvweaken(newSVsv(fseq));

  push_callback(self, cb);

  return fseq;
}

// TODO: move to a hax/ file
/* Returns a newly-created SV holding a human-readable description of `cv`.
 *
 *  - For a CV that is not flagged anonymous the result is "Package::name",
 *    taken from the CV's GV and that GV's stash. "__ANON__" stands in for a
 *    missing package name, and is the whole result when the CV has no GV.
 *  - For an anonymous CV the ops are walked from CvSTART() along op_next to
 *    the first COP, giving "__ANON__(FILE line N)"; when no COP is reached
 *    the result is plain "__ANON__".
 *
 * The caller owns the returned SV.
 */
#define CvNAME_FILE_LINE(cv)  S_CvNAME_FILE_LINE(aTHX_ cv)
static SV *S_CvNAME_FILE_LINE(pTHX_ CV *cv)
{
  if(!CvANON(cv)) {
    GV *gv = CvGV(cv);
    if(!gv)
      return newSVpvs("__ANON__");

    /* Guard each step: the GV may have no stash, and a stash may have no name */
    HV *stash = GvSTASH(gv);
    const char *pkgname = stash ? HvNAME(stash) : NULL;

    return newSVpvf("%s::%s", pkgname ? pkgname : "__ANON__", GvNAME(gv));
  }

  OP *cop = CvSTART(cv);
  while(cop && OP_CLASS(cop) != OA_COP)
    cop = cop->op_next;

  if(!cop)
    return newSVpvs("__ANON__");

  /* CopLINE() yields an unsigned line_t; cast it to match the %d conversion */
  return newSVpvf("__ANON__(%s line %d)", CopFILE((COP *)cop), (int)CopLINE((COP *)cop));
}

/* Maps the state fields of `self` onto a constant descriptive word:
 * "pending" when not ready; otherwise "cancelled", "failed" or "done",
 * tested in that order of precedence. The returned string is a literal and
 * must not be freed.
 */
static const char *statestr(struct FutureXS *self)
{
  const char *state;

  if(self->ready) {
    if(self->cancelled)
      state = "cancelled";
    else if(self->failure)
      state = "failed";
    else
      state = "done";
  }
  else
    state = "pending";

  return state;
}

/* Completes the future `f` successfully with the `n` values at `svp`.
 *
 * Does nothing if the future is flagged cancelled. Croaks if it is already
 * ready for any other reason. Otherwise the values are duplicated into a new
 * result array and the future is marked ready.
 */
void Future_donev(pTHX_ SV *f, SV **svp, size_t n)
{
  struct FutureXS *self = get_future(f);

  if(!self->cancelled) {
    if(self->ready)
      croak("%" SVf " is already %s and cannot be ->done",
          SVfARG(f), statestr(self));
    // TODO: test subs

    AV *values = newAV_svn_dup(svp, n);
    self->result = values;

    mark_ready(self, f, "done");
  }
}

void Future_failv(pTHX_ SV *f, SV **svp, size_t n)
{
  struct FutureXS *self = get_future(f);

  if(self->cancelled)
    return;

src/future.c  view on Meta::CPAN


      SSize_t count = call_method("details", G_LIST);

      SPAGAIN;

      SV **retp = SP - count + 1;

      for(SSize_t i = 0; i < count; i++)
        av_push(failure, SvREFCNT_inc(retp[i]));
      SP -= count;

      PUTBACK;
      FREETMPS;
      LEAVE;
    }
  }
  else {
    self->failure = newAV_svn_dup(svp, n);
  }

  mark_ready(self, f, "failed");
}

/* Fails the future `f` with a single failure value: a new string SV copied
 * from the NUL-terminated C string `s`.
 *
 * Does nothing if the future is flagged cancelled; croaks if it is already
 * ready for any other reason.
 */
#define future_failp(f, s)  Future_failp(aTHX_ f, s)
void Future_failp(pTHX_ SV *f, const char *s)
{
  struct FutureXS *self = get_future(f);

  if(!self->cancelled) {
    if(self->ready)
      croak("%" SVf " is already %s and cannot be ->fail'ed",
          SVfARG(f), statestr(self));

    /* The failure list holds exactly one element, the message */
    AV *failure = newAV();
    self->failure = failure;
    av_push(failure, newSVpv(s, strlen(s)));

    mark_ready(self, f, "failed");
  }
}

/* Registers `code` to be used when the future `f` is cancelled.
 *
 * Nothing is registered if `f` is already ready. Otherwise a copy of `code`
 * is appended to f's on_cancel array (created on first use).
 *
 * When `code` is itself a future, a revocation record is also appended to
 * that future's revoke_when_ready array. The record holds a weakened
 * reference to `f` and a weakened reference to the copy just stored in the
 * on_cancel array.
 */
void Future_on_cancel(pTHX_ SV *f, SV *code)
{
  struct FutureXS *self = get_future(f);

  if(self->ready)
    return;

  bool code_is_future = sv_is_future(code);
  // TODO: is_future or callable(code) or croak

  if(!self->on_cancel)
    self->on_cancel = newAV();

  SV *stored = newSVsv((SV *)code);
  av_push(self->on_cancel, stored);

  /* Plain callbacks need no revocation bookkeeping */
  if(!code_is_future)
    return;

  struct FutureXSRevocation *rev;
  Newx(rev, 1, struct FutureXSRevocation);

  rev->precedent_f   = sv_rvweaken(newSVsv(f));
  rev->toclear_sv_at = sv_rvweaken(newRV_inc(stored));

  struct FutureXS *codeself = get_future(code);
  if(!codeself->revoke_when_ready)
    codeself->revoke_when_ready = newAV();

  /* The array stores the raw record pointer, cast to SV * */
  av_push(codeself->revoke_when_ready, (SV *)rev);
}

/* Arranges for `code` to be invoked once the future `f` is ready, whatever
 * its outcome (CB_ALWAYS), passing the future itself (CB_SELF).
 *
 * `code` may be a future, in which case CB_IS_FUTURE is set as well.
 * If `f` is already ready the callback is invoked immediately; otherwise the
 * code is passed through wrap_cb() and the callback is queued on `f`.
 */
void Future_on_ready(pTHX_ SV *f, SV *code)
{
  struct FutureXS *self = get_future(f);

  // TODO: is_future or callable(code) or croak
  struct FutureXSCallback cb = {
    .flags = (CB_ALWAYS|CB_SELF) | (sv_is_future(code) ? CB_IS_FUTURE : 0),
    .code  = code,
  };

  if(self->ready) {
    invoke_callback(self, f, &cb);
    return;
  }

  cb.code = wrap_cb(f, "on_ready", cb.code);
  push_callback(self, &cb);
}

/* Arranges for `code` to be invoked when the future `f` completes
 * successfully (CB_DONE), passing the result values (CB_RESULT).
 *
 * `code` may be a future, in which case CB_IS_FUTURE is set as well.
 * If `f` is already ready the callback is handed to invoke_callback() at
 * once; otherwise the code is passed through wrap_cb() and the callback is
 * queued on `f`.
 */
void Future_on_done(pTHX_ SV *f, SV *code)
{
  struct FutureXS *self = get_future(f);

  // TODO: is_future or callable(code) or croak
  struct FutureXSCallback cb = {
    .flags = (CB_DONE|CB_RESULT) | (sv_is_future(code) ? CB_IS_FUTURE : 0),
    .code  = code,
  };

  if(self->ready) {
    invoke_callback(self, f, &cb);
    return;
  }

  cb.code = wrap_cb(f, "on_done", cb.code);
  push_callback(self, &cb);
}

void Future_on_fail(pTHX_ SV *f, SV *code)
{
  struct FutureXS *self = get_future(f);

src/future.c  view on Meta::CPAN

    return;

  for(Size_t i = 0; i < av_count(self->subs); i++) {
    SV *sub = AvARRAY(self->subs)[i];
    U8 flags = self->subflags[i];

    if(!(flags & SUBFLAG_NO_CANCEL) && !future_is_ready(sub))
      future_cancel(sub);
  }
}

XS_INTERNAL(sub_on_ready_waitall)
{
  dXSARGS;

  SV *f = XSANY_sv;
  if(!SvOK(f))
    return;

  /* Make sure self doesn't disappear during this function */
  SvREFCNT_inc(SvRV(f));
  SAVEFREESV(SvRV(f));

  struct FutureXS *self = get_future(f);

  self->pending_subs--;

  if(self->pending_subs)
    XSRETURN(0);

  /* TODO: This is really just newAVav() */
  self->result = newAV_svn_dup(AvARRAY(self->subs), av_count(self->subs));
  mark_ready(self, f, "wait_all");
}

/* Creates a wait_all future of class `cls` over the `n` subfutures at `subs`.
 *
 * If every subfuture is already ready, the new future is completed at once
 * with a duplicate of the subfuture list as its result. Otherwise an XSUB
 * callback (sub_on_ready_waitall) is attached to each subfuture that is not
 * yet ready, and the new future stays pending.
 *
 * Returns the new future.
 */
SV *Future_new_waitallv(pTHX_ const char *cls, SV **subs, size_t n)
{
  SV *f = future_new_subsv(cls, subs, n);
  struct FutureXS *self = get_future(f);

  /* Re-point subs and n at the array now held in self->subs */
  subs = AvARRAY(self->subs);
  n    = av_count(self->subs);

  /* Count how many subfutures are still outstanding */
  self->pending_subs = 0;
  for(Size_t idx = 0; idx < n; idx++) {
    /* TODO: This should probably use some API function to make it transparent */
    if(future_is_ready(subs[idx]))
      continue;

    self->pending_subs++;
  }

  if(self->pending_subs == 0) {
    self->result = newAV_svn_dup(subs, n);
    mark_ready(self, f, "wait_all");

    return f;
  }

  /* The callback's ANY slot carries a weakened copy of the reference f */
  CV *sub_on_ready = newXS(NULL, sub_on_ready_waitall, __FILE__);
  cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
  sv_rvweaken(CvXSUBANY_sv(sub_on_ready));

  /* Attach a named GV and clear the anonymous flag on the callback CV */
  GV *gv = gv_fetchpvs("Future::XS::(wait_all callback)", GV_ADDMULTI, SVt_PVCV);
  CvGV_set(sub_on_ready, gv);
  CvANON_off(sub_on_ready);

  for(Size_t idx = 0; idx < n; idx++) {
    SV *sub = subs[idx];
    if(future_is_ready(sub))
      continue;

    future_on_ready(sub, sv_2mortal(newRV_inc((SV *)sub_on_ready)));
  }

  /* Drop the reference created by newXS(); each mortal RV above took its own */
  SvREFCNT_dec(sub_on_ready);

  return f;
}

/* XSUB installed as the on_ready callback of the subfutures of a wait_any
 * future. ST(0) is the argument the callback was invoked with, used here as
 * the subfuture that became ready; the ANY slot holds the reference to the
 * wait_any future.
 *
 *  - Ignored if the wait_any future already has a result or failure.
 *  - A cancelled subfuture only matters when it was the last pending one, in
 *    which case the wait_any future fails.
 *  - Otherwise the subfuture's outcome is copied, the remaining subfutures
 *    are cancelled, and the wait_any future is marked ready.
 */
XS_INTERNAL(sub_on_ready_waitany)
{
  dXSARGS;
  SV *thissub = ST(0);

  SV *f = XSANY_sv;
  if(!SvOK(f))
    return;

  /* Hold an extra reference on the referent until scope exit, so self
   * doesn't disappear during this function */
  SV *referent = SvRV(f);
  SvREFCNT_inc(referent);
  SAVEFREESV(referent);

  struct FutureXS *self = get_future(f);

  if(self->result || self->failure)
    return;

  self->pending_subs--;

  if(future_is_cancelled(thissub)) {
    /* Others are still outstanding; keep waiting for them */
    if(self->pending_subs)
      return;

    future_failp(f, "All component futures were cancelled");
    return;
  }

  copy_result(self, thissub);

  cancel_pending_subs(self);

  mark_ready(self, f, "wait_any");
}

/* Creates a wait_any future of class `cls` over the `n` subfutures at `subs`.
 *
 *  - With no subfutures the new future fails immediately.
 *  - If some subfuture is already ready and not cancelled, the first such
 *    one supplies the outcome, the pending subfutures are cancelled, and the
 *    new future is marked ready.
 *  - Otherwise an XSUB callback (sub_on_ready_waitany) is attached to every
 *    subfuture that is not cancelled, counting each in pending_subs.
 *
 * Returns the new future.
 */
SV *Future_new_waitanyv(pTHX_ const char *cls, SV **subs, size_t n)
{
  SV *f = future_new_subsv(cls, subs, n);
  struct FutureXS *self = get_future(f);

  /* Re-point subs and n at the array now held in self->subs */
  subs = AvARRAY(self->subs);
  n    = av_count(self->subs);

  if(n == 0) {
    future_failp(f, "Cannot ->wait_any with no subfutures");
    return f;
  }

  /* Find the first subfuture that is ready without having been cancelled */
  SV *immediate_ready = NULL;
  for(Size_t idx = 0; idx < n && !immediate_ready; idx++) {
    /* TODO: This should probably use some API function to make it transparent */
    if(future_is_ready(subs[idx]) && !future_is_cancelled(subs[idx]))
      immediate_ready = subs[idx];
  }

  if(immediate_ready) {
    copy_result(self, immediate_ready);

    cancel_pending_subs(self);

    mark_ready(self, f, "wait_any");

    return f;
  }

  self->pending_subs = 0;

  /* The callback's ANY slot carries a weakened copy of the reference f */
  CV *sub_on_ready = newXS(NULL, sub_on_ready_waitany, __FILE__);
  cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
  sv_rvweaken(CvXSUBANY_sv(sub_on_ready));

  /* Attach a named GV and clear the anonymous flag on the callback CV */
  GV *gv = gv_fetchpvs("Future::XS::(wait_any callback)", GV_ADDMULTI, SVt_PVCV);
  CvGV_set(sub_on_ready, gv);
  CvANON_off(sub_on_ready);

  for(Size_t idx = 0; idx < n; idx++) {
    SV *sub = subs[idx];
    if(!future_is_cancelled(sub)) {
      future_on_ready(sub, sv_2mortal(newRV_inc((SV *)sub_on_ready)));
      self->pending_subs++;
    }
  }

  /* Drop the reference created by newXS(); each mortal RV above took its own */
  SvREFCNT_dec(sub_on_ready);

  return f;
}

/* Sets self->result to a new array containing the result values of every
 * subfuture, concatenated in subfuture order. Every subfuture is expected to
 * have a result array already (checked with assert()).
 */
#define compose_needsall_result(self)  S_compose_needsall_result(aTHX_ self)
static void S_compose_needsall_result(pTHX_ struct FutureXS *self)
{
  AV *combined = newAV();
  self->result = combined;

  Size_t idx = 0;
  while(idx < av_count(self->subs)) {
    struct FutureXS *subself = get_future(AvARRAY(self->subs)[idx]);
    assert(subself->result);

    /* Append all of this subfuture's values to the combined list */
    av_push_svn(combined, AvARRAY(subself->result), av_count(subself->result));

    idx++;
  }
}

/* XSUB installed as the on_ready callback of the subfutures of a needs_all
 * future. ST(0) is the argument the callback was invoked with, used here as
 * the subfuture that became ready; the ANY slot holds the reference to the
 * needs_all future.
 *
 *  - Ignored if the needs_all future already has a result or failure.
 *  - A cancelled subfuture fails the needs_all future and cancels the rest.
 *  - A failed subfuture has its outcome copied, the rest are cancelled, and
 *    the needs_all future is marked ready.
 *  - A successful subfuture decrements the pending count; when none remain
 *    the combined result is composed and the future is marked ready.
 */
XS_INTERNAL(sub_on_ready_needsall)
{
  dXSARGS;
  SV *thissub = ST(0);

  SV *f = XSANY_sv;
  if(!SvOK(f))
    return;

  /* Hold an extra reference on the referent until scope exit, so self
   * doesn't disappear during this function */
  SV *referent = SvRV(f);
  SvREFCNT_inc(referent);
  SAVEFREESV(referent);

  struct FutureXS *self = get_future(f);

  if(self->result || self->failure)
    return;

  if(future_is_cancelled(thissub)) {
    future_failp(f, "A component future was cancelled");
    cancel_pending_subs(self);
    return;
  }

  if(future_is_failed(thissub)) {
    copy_result(self, thissub);
    cancel_pending_subs(self);
  }
  else {
    /* Still waiting on at least one other subfuture */
    if(--self->pending_subs)
      return;

    compose_needsall_result(self);
  }

  mark_ready(self, f, "needs_all");
}

/* Creates a needs_all future of class `cls` over the `n` subfutures at `subs`.
 *
 *  - With no subfutures the new future is completed at once with no values.
 *  - Subfutures are scanned in order: the first one found cancelled fails the
 *    new future; the first one found failed has its outcome copied. Either
 *    way the pending subfutures are cancelled and the future is returned.
 *  - Otherwise an XSUB callback (sub_on_ready_needsall) is attached to every
 *    subfuture that is not yet ready. If none were pending, the combined
 *    result is composed and the new future is marked ready straight away.
 *
 * Returns the new future.
 */
SV *Future_new_needsallv(pTHX_ const char *cls, SV **subs, size_t n)
{
  SV *f = future_new_subsv(cls, subs, n);
  struct FutureXS *self = get_future(f);

  /* Re-point subs and n at the array now held in self->subs */
  subs = AvARRAY(self->subs);
  n    = av_count(self->subs);

  if(n == 0) {
    future_donev(f, NULL, 0);
    return f;
  }

  for(Size_t idx = 0; idx < n; idx++) {
    SV *sub = subs[idx];

    if(future_is_cancelled(sub)) {
      future_failp(f, "A component future was cancelled");
      cancel_pending_subs(self);
      return f;
    }

    if(future_is_failed(sub)) {
      /* Immediate failure: adopt this subfuture's outcome */
      copy_result(self, sub);
      cancel_pending_subs(self);
      mark_ready(self, f, "needs_all");
      return f;
    }
  }

  self->pending_subs = 0;

  /* The callback's ANY slot carries a weakened copy of the reference f */
  CV *sub_on_ready = newXS(NULL, sub_on_ready_needsall, __FILE__);
  cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
  sv_rvweaken(CvXSUBANY_sv(sub_on_ready));

  /* Attach a named GV and clear the anonymous flag on the callback CV */
  GV *gv = gv_fetchpvs("Future::XS::(needs_all callback)", GV_ADDMULTI, SVt_PVCV);
  CvGV_set(sub_on_ready, gv);
  CvANON_off(sub_on_ready);

  for(Size_t idx = 0; idx < n; idx++) {
    SV *sub = subs[idx];
    if(!future_is_ready(sub)) {
      future_on_ready(sub, sv_2mortal(newRV_inc((SV *)sub_on_ready)));
      self->pending_subs++;
    }
  }

  /* Nothing was left to wait for */
  if(self->pending_subs == 0) {
    compose_needsall_result(self);
    mark_ready(self, f, "needs_all");
  }

  /* Drop the reference created by newXS(); each mortal RV above took its own */
  SvREFCNT_dec(sub_on_ready);

  return f;
}

/* XSUB installed as the on_ready callback of the subfutures of a needs_any
 * future. ST(0) is the argument the callback was invoked with, used here as
 * the subfuture that became ready; the ANY slot holds the reference to the
 * needs_any future.
 *
 *  - Ignored if the needs_any future already has a result or failure.
 *  - A cancelled subfuture only matters when it was the last pending one, in
 *    which case the needs_any future fails.
 *  - A failed subfuture only matters when it was the last pending one, in
 *    which case its outcome is copied and the future is marked ready.
 *  - A successful subfuture has its outcome copied, the rest are cancelled,
 *    and the future is marked ready.
 */
XS_INTERNAL(sub_on_ready_needsany)
{
  dXSARGS;
  SV *thissub = ST(0);

  SV *f = XSANY_sv;
  if(!SvOK(f))
    return;

  /* Hold an extra reference on the referent until scope exit, so self
   * doesn't disappear during this function */
  SV *referent = SvRV(f);
  SvREFCNT_inc(referent);
  SAVEFREESV(referent);

  struct FutureXS *self = get_future(f);

  if(self->result || self->failure)
    return;

  self->pending_subs--;

  if(future_is_cancelled(thissub)) {
    if(!self->pending_subs)
      future_failp(f, "All component futures were cancelled");
    return;
  }

  bool this_failed = future_is_failed(thissub);

  /* A failure is only final once nothing else is outstanding */
  if(this_failed && self->pending_subs)
    return;

  copy_result(self, thissub);
  if(!this_failed)
    cancel_pending_subs(self);
  mark_ready(self, f, "needs_any");
}

/* Creates a needs_any future of class `cls` over the `n` subfutures at `subs`.
 *
 *  - With no subfutures the new future fails immediately.
 *  - If some subfuture is already done, the first such one supplies the
 *    outcome, the pending subfutures are cancelled, and the new future is
 *    marked ready.
 *  - Otherwise an XSUB callback (sub_on_ready_needsany) is attached to every
 *    subfuture that is not yet ready. If none were pending, the outcome of
 *    the last subfuture is copied and the new future is marked ready.
 *
 * Returns the new future.
 */
SV *Future_new_needsanyv(pTHX_ const char *cls, SV **subs, size_t n)
{
  SV *f = future_new_subsv(cls, subs, n);
  struct FutureXS *self = get_future(f);

  /* Re-point subs and n at the array now held in self->subs */
  subs = AvARRAY(self->subs);
  n    = av_count(self->subs);

  if(n == 0) {
    future_failp(f, "Cannot ->needs_any with no subfutures");
    return f;
  }

  for(Size_t idx = 0; idx < n; idx++) {
    SV *sub = subs[idx];
    if(!future_is_done(sub))
      continue;

    /* Immediate success: adopt the first done subfuture's outcome */
    copy_result(self, sub);
    cancel_pending_subs(self);
    mark_ready(self, f, "needs_any");
    return f;
  }

  self->pending_subs = 0;

  /* The callback's ANY slot carries a weakened copy of the reference f */
  CV *sub_on_ready = newXS(NULL, sub_on_ready_needsany, __FILE__);
  cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
  sv_rvweaken(CvXSUBANY_sv(sub_on_ready));

  /* Attach a named GV and clear the anonymous flag on the callback CV */
  GV *gv = gv_fetchpvs("Future::XS::(needs_any callback)", GV_ADDMULTI, SVt_PVCV);
  CvGV_set(sub_on_ready, gv);
  CvANON_off(sub_on_ready);

  for(Size_t idx = 0; idx < n; idx++) {
    SV *sub = subs[idx];
    if(!future_is_ready(sub)) {
      future_on_ready(sub, sv_2mortal(newRV_inc((SV *)sub_on_ready)));
      self->pending_subs++;
    }
  }

  /* Every subfuture was already ready yet none was done */
  if(self->pending_subs == 0) {
    copy_result(self, subs[n-1]);
    mark_ready(self, f, "needs_any");
  }

  /* Drop the reference created by newXS(); each mortal RV above took its own */
  SvREFCNT_dec(sub_on_ready);

  return f;
}

Size_t Future_mPUSH_subs(pTHX_ SV *f, enum FutureSubFilter filter)
{
  dSP;

  struct FutureXS *self = get_future(f);

  Size_t ret = 0;
  for(Size_t i = 0; self->subs && i < av_count(self->subs); i++) {
    SV *sub = AvARRAY(self->subs)[i];

    bool want;
    switch(filter) {
      case FUTURE_SUBS_PENDING:
        want = !future_is_ready(sub);
        break;

      case FUTURE_SUBS_READY:
        want = future_is_ready(sub);
        break;

      case FUTURE_SUBS_DONE:
        want = future_is_done(sub);
        break;

      case FUTURE_SUBS_FAILED:
        want = future_is_failed(sub);
        break;

      case FUTURE_SUBS_CANCELLED:
        want = future_is_cancelled(sub);
        break;
    }

    if(want) {
      XPUSHs(sv_mortalcopy(sub));
      ret++;
    }



( run in 1.436 second using v1.01-cache-2.11-cpan-39bf76dae61 )