Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN


void
recv(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    int r = reqrep_try_recv(h, &str, &len, &utf8, &id);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }

void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    if (items > 1) timeout = SvNV(ST(1));
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }

void
recv_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /* Hoist Perl SV construction out of process-shared futex mutex. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    if (count > 0) {
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);

Shared.xs  view on Meta::CPAN

recv_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    if (items > 2) timeout = SvNV(ST(2));
    /* Block until at least 1 */
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r != 1) XSRETURN(0);
    {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);

Shared.xs  view on Meta::CPAN

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
    /* Hoist SV construction out of the mutex (see recv_multi). */
    struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    reqrep_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;

Shared.xs  view on Meta::CPAN

    sv_setiv(SvRV(self), 0);
    reqrep_destroy(h);

void
recv(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    int64_t value;
    uint64_t id;
  PPCODE:
    if (reqrep_int_try_recv(h, &value, &id)) {
        mXPUSHi((IV)value);
        mXPUSHu((UV)id);
    }

void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    double timeout = -1;
    int64_t value;
    uint64_t id;
  PPCODE:
    if (items > 1) timeout = SvNV(ST(1));
    if (reqrep_int_recv_wait(h, &value, &id, timeout)) {
        mXPUSHi((IV)value);
        mXPUSHu((UV)id);
    }

bool
reply(self, id, value)
    SV *self
    UV id



( run in 1.084 second using v1.01-cache-2.11-cpan-5511b514fd6 )