Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    int r = reqrep_try_recv(h, &str, &len, &utf8, &id);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }

void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *payload;
    uint32_t payload_len;
    uint64_t req_id;
    bool is_utf8;
    double wait_for;
    int status;
  PPCODE:
    /*
     * Receive a single request through reqrep_recv_wait().
     *
     * Perl arguments: self, then an optional timeout taken from ST(1).
     * When no timeout is supplied, -1 is handed to reqrep_recv_wait()
     * (presumably meaning "no timeout" -- confirm against that function).
     *
     * Returns the two-element list (payload, id) when reqrep_recv_wait()
     * reports 1, and an empty list for any other non-error status.
     * Croaks when it reports -1.
     */
    wait_for = (items > 1) ? SvNV(ST(1)) : -1.0;
    status = reqrep_recv_wait(h, &payload, &payload_len, &is_utf8, &req_id, wait_for);
    if (status == -1) {
        croak("Data::ReqRep::Shared: out of memory");
    }
    if (status == 1) {
        SV *payload_sv = newSVpvn(payload, payload_len);
        if (is_utf8) {
            /* Carry the sender's UTF-8 flag over to the new scalar. */
            SvUTF8_on(payload_sv);
        }
        /* Reserve both stack slots at once, then push mortal copies. */
        EXTEND(SP, 2);
        mPUSHs(payload_sv);
        mPUSHu((UV)req_id);
    }

void
recv_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /*
     * Dequeue up to `count` requests without blocking and return them as a
     * flat list of (payload, id) pairs.
     *
     * Items are pulled with reqrep_recv_locked() while the header mutex is
     * held; each payload is copied into a private malloc'd buffer so that
     * Perl SV construction happens only after the process-shared futex
     * mutex has been released.
     *
     * Croaks with "count too large" when the scratch array size would
     * overflow size_t, and with "out of memory" when an allocation fails or
     * reqrep_recv_locked() returns -1. The out-of-memory croak after the
     * loop is raised only once the scratch buffers have been freed.
     */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    if (count > 0) {
        /* Guard the multiplication below: a wrapped (or truncated) size
         * would yield a buffer smaller than the loop is allowed to fill. */
        if (count > SIZE_MAX / sizeof(*items_buf))
            croak("Data::ReqRep::Shared: count too large");
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 0; i < count; i++) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        /* malloc(0) may return NULL, so always request at least 1 byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Two stack slots per item: payload followed by its id. */
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    /* Deferred until here so the scratch buffers are not leaked. */
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");

void
recv_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /*
     * Wait for one request via reqrep_recv_wait(), then grab up to
     * count-1 more without blocking. Returns a flat list of (payload, id)
     * pairs, or an empty list when the wait yields no item.
     *
     * Perl arguments: self, count, then an optional timeout taken from
     * ST(2); -1 is passed to reqrep_recv_wait() when it is omitted.
     *
     * A count of 0 returns an empty list immediately without dequeuing
     * anything. Argument validation and the scratch-array allocation are
     * done before the blocking receive so that those failures cannot
     * discard a request that was already taken off the queue.
     *
     * Croaks with "count too large" when the scratch array size would
     * overflow size_t, and with "out of memory" when an allocation fails or
     * a receive call returns -1.
     */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r2 = 0;
    int oom = 0;
    if (items > 2) timeout = SvNV(ST(2));
    /* Asking for at most zero items must not consume a request. */
    if (count == 0) XSRETURN(0);
    if (count > 1) {
        /* Guard the multiplication below against wrap-around/truncation. */
        if (count - 1 > SIZE_MAX / sizeof(*items_buf))
            croak("Data::ReqRep::Shared: count too large");
        items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    /* Block until at least 1 */
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r != 1) {
        /* Nothing received: release the scratch array before leaving. */
        free(items_buf);
        if (r == -1) croak("Data::ReqRep::Shared: out of memory");
        XSRETURN(0);
    }
    {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }
    /* Grab up to count-1 more non-blocking -- SV construction is hoisted
     * out of the lock by copying payloads into private buffers first. */
    reqrep_mutex_lock(h->hdr);
    for (UV i = 1; i < count; i++) {
        last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r2 <= 0) break;
        /* malloc(0) may return NULL, so always request at least 1 byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Two stack slots per additional item: payload followed by its id. */
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    /* Deferred until here so the scratch buffers are not leaked. */
    if (last_r2 == -1 || oom) croak("Data::ReqRep::Shared: out of memory");

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    /*
     * Dequeue requests without blocking until reqrep_recv_locked() stops
     * yielding items or the optional limit in ST(1) is reached, and return
     * them as a flat list of (payload, id) pairs.
     *
     * Without a limit argument, up to UINT32_MAX items are taken. A limit
     * larger than UINT32_MAX is clamped to UINT32_MAX rather than being
     * truncated to its low 32 bits.
     *
     * Payloads are copied into a malloc'd linked list while the header
     * mutex is held, and Perl SVs are built only after it is released.
     *
     * Croaks with "out of memory" when an allocation fails or
     * reqrep_recv_locked() returns -1; the croak is raised after the
     * temporary list has been freed.
     */
    if (items > 1) {
        UV wanted = SvUV(ST(1));
        max_count = (wanted > UINT32_MAX) ? UINT32_MAX : (uint32_t)wanted;
    } else {
        max_count = UINT32_MAX;
    }
    struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    reqrep_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        /* malloc(0) may return NULL, so always request at least 1 byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
        /* Append at the tail so items come back in dequeue order. */
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Two stack slots per item: payload followed by its id. */
    EXTEND(SP, (SSize_t)(2 * drained_n));
    while (drained_head) {
        struct drain_item *it = drained_head; drained_head = it->next;
        SV *sv = newSVpvn(it->buf, it->len);
        if (it->utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)it->id)));
        free(it->buf);
        free(it);
    }
    /* Deferred until here so the temporary list is not leaked. */
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");

bool
reply(self, id, value)
    SV *self
    UV id
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    STRLEN len;
  CODE:
    /*
     * Send `value` as the reply for request `id` via reqrep_reply().
     *
     * The string bytes and the UTF-8 flag of `value` are both forwarded.
     * Returns true when reqrep_reply() reports 1 and false for any other
     * non-croaking status.
     *
     * Croaks with "invalid slot index" on status -1 and with
     * "response too long" on status -3, or when the byte length does not
     * fit the 32-bit length parameter at all.
     */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* Reject lengths that the uint32_t cast below would silently truncate;
     * otherwise an oversized value could pass the size check shortened. */
    if (len > UINT32_MAX)
        croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    int r = reqrep_reply(h, (uint64_t)id, str, (uint32_t)len, utf8);
    if (r == -1) croak("Data::ReqRep::Shared: invalid slot index");
    if (r == -3) croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL

UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    UV reported;
  CODE:
    /* Return whatever reqrep_size() reports for this handle, as a UV. */
    reported = (UV)reqrep_size(h);
    RETVAL = reported;
  OUTPUT:
    RETVAL

UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
  CODE:
    RETVAL = h->req_cap;
  OUTPUT:



( run in 1.704 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )