Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

new_memfd(class, name, req_cap, resp_slots, resp_size, ...)
    const char *class
    const char *name
    UV req_cap
    UV resp_slots
    UV resp_size
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
    uint64_t arena_cap;
  CODE:
    arena_cap = (items > 5) ? (uint64_t)SvUV(ST(5)) : 0;
    ReqRepHandle *h = reqrep_create_memfd(name, (uint32_t)req_cap, (uint32_t)resp_slots,
                                           (uint32_t)resp_size, arena_cap, errbuf);
    if (!h) croak("Data::ReqRep::Shared->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
  CODE:
    ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_STR, errbuf);
    if (!h) croak("Data::ReqRep::Shared->new_from_fd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

IV
memfd(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
  CODE:
    RETVAL = h->backing_fd;
  OUTPUT:
    RETVAL

void
DESTROY(self)
    SV *self
  CODE:
    if (!SvROK(self)) return;
    ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
    if (!h) return;
    sv_setiv(SvRV(self), 0);
    reqrep_destroy(h);

void
recv(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    int r = reqrep_try_recv(h, &str, &len, &utf8, &id);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }

void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    if (items > 1) timeout = SvNV(ST(1));
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }

void
recv_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /* Hoist Perl SV construction out of process-shared futex mutex. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    if (count > 0) {
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 0; i < count; i++) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");

void
recv_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    if (items > 2) timeout = SvNV(ST(2));
    /* Block until at least 1 */
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r != 1) XSRETURN(0);
    {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }
    /* Grab up to count-1 more non-blocking — hoist SV construction out of lock. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r2 = 0;
    int oom = 0;
    if (count > 1) {
        items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 1; i < count; i++) {
        last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r2 <= 0) break;
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    if (last_r2 == -1 || oom) croak("Data::ReqRep::Shared: out of memory");

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
    /* Hoist SV construction out of the mutex (see recv_multi). */
    struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    reqrep_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    EXTEND(SP, (SSize_t)(2 * drained_n));
    while (drained_head) {
        struct drain_item *it = drained_head; drained_head = it->next;
        SV *sv = newSVpvn(it->buf, it->len);
        if (it->utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)it->id)));
        free(it->buf);
        free(it);
    }
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");

bool
reply(self, id, value)
    SV *self
    UV id
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    STRLEN len;
  CODE:
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    int r = reqrep_reply(h, (uint64_t)id, str, (uint32_t)len, utf8);
    if (r == -1) croak("Data::ReqRep::Shared: invalid slot index");
    if (r == -3) croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL

UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
  CODE:
    RETVAL = (UV)reqrep_size(h);
  OUTPUT:
    RETVAL

Shared.xs  view on Meta::CPAN

    if (!h) croak("Data::ReqRep::Shared::Int->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_memfd(class, name, req_cap, resp_slots)
    const char *class
    const char *name
    UV req_cap
    UV resp_slots
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
  CODE:
    ReqRepHandle *h = reqrep_create_int_memfd(name, (uint32_t)req_cap, (uint32_t)resp_slots, errbuf);
    if (!h) croak("Data::ReqRep::Shared::Int->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
  CODE:
    ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_INT, errbuf);
    if (!h) croak("Data::ReqRep::Shared::Int->new_from_fd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

IV
memfd(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    RETVAL = h->backing_fd;
  OUTPUT:
    RETVAL

void
DESTROY(self)
    SV *self
  CODE:
    if (!SvROK(self)) return;
    ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
    if (!h) return;
    sv_setiv(SvRV(self), 0);
    reqrep_destroy(h);

void
recv(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    int64_t value;
    uint64_t id;
  PPCODE:
    if (reqrep_int_try_recv(h, &value, &id)) {
        mXPUSHi((IV)value);
        mXPUSHu((UV)id);
    }

void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    double timeout = -1;
    int64_t value;
    uint64_t id;
  PPCODE:
    if (items > 1) timeout = SvNV(ST(1));
    if (reqrep_int_recv_wait(h, &value, &id, timeout)) {
        mXPUSHi((IV)value);
        mXPUSHu((UV)id);
    }

bool
reply(self, id, value)
    SV *self
    UV id
    IV value
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    int r = reqrep_int_reply(h, (uint64_t)id, (int64_t)value);
    if (r == -1) croak("Data::ReqRep::Shared::Int: invalid slot index");
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL

UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    RETVAL = (UV)reqrep_int_size(h);
  OUTPUT:
    RETVAL

UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    RETVAL = h->req_cap;
  OUTPUT:
    RETVAL

UV
resp_slots(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    RETVAL = h->resp_slots;
  OUTPUT:
    RETVAL

SV *
path(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
  OUTPUT:
    RETVAL



( run in 1.662 second using v1.01-cache-2.11-cpan-71847e10f99 )