Data-Queue-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    if (queue_int_try_pop(h, &value))
        RETVAL = newSViv((IV)value);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL

bool
push_wait(self, value, ...)
    SV *self
    IV value
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    double timeout;
  CODE:
    /* Push one integer through queue_int_push_wait(). The optional third
     * argument is the timeout passed to that call; -1 is used when it is
     * omitted (presumably "no timeout" -- confirm in queue_int_push_wait).
     * The boolean result of the call is returned to Perl. */
    timeout = (items > 2) ? SvNV(ST(2)) : -1;
    RETVAL = queue_int_push_wait(h, (int64_t)value, timeout);
  OUTPUT:
    RETVAL

SV *
pop_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t popped;
    double timeout;
  CODE:
    /* Pop one integer through queue_int_pop_wait(). The optional second
     * argument is the timeout passed to that call (-1 when omitted).
     * Returns the value as a new IV, or undef when the call reports
     * failure. */
    timeout = (items > 1) ? SvNV(ST(1)) : -1;
    RETVAL = queue_int_pop_wait(h, &popped, timeout)
        ? newSViv((IV)popped)
        : &PL_sv_undef;
  OUTPUT:
    RETVAL

UV
push_multi(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Bulk push: every argument after self is converted with SvIV and
     * offered to queue_int_try_push() in order. The loop stops at the
     * first value the queue rejects; the number accepted is returned. */
    const uint32_t nvalues = items - 1;
    uint32_t done = 0;
    while (done < nvalues
           && queue_int_try_push(h, (int64_t)SvIV(ST(done + 1))))
        done++;
    RETVAL = done;
  OUTPUT:
    RETVAL

void
pop_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t popped;
  PPCODE:
    /* Return up to `count` integers as a list. Each successful
     * queue_int_try_pop() pushes one mortal IV; the loop ends early as
     * soon as a pop fails. */
    UV remaining = count;
    while (remaining > 0 && queue_int_try_pop(h, &popped)) {
        mXPUSHi((IV)popped);
        remaining--;
    }

UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Report whatever queue_int_size() returns for this handle. */
    const UV nelems = (UV)queue_int_size(h);
    RETVAL = nelems;
  OUTPUT:
    RETVAL

UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Expose the capacity field stored in the handle. */
    const UV cap = h->capacity;
    RETVAL = cap;
  OUTPUT:
    RETVAL

bool
is_empty(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* True exactly when queue_int_size() reports zero. */
    RETVAL = !queue_int_size(h);
  OUTPUT:
    RETVAL

bool
is_full(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* True once the reported size is no longer below the handle's
     * capacity. */
    RETVAL = !(queue_int_size(h) < h->capacity);
  OUTPUT:
    RETVAL

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Delegates to queue_int_clear() on this object's handle; nothing is
     * returned to Perl. */
    queue_int_clear(h);

void
unlink(self_or_class, ...)
    SV *self_or_class
  CODE:
    const char *path;
    if (sv_isobject(self_or_class)) {
        QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));

Shared.xs  view on Meta::CPAN

        if (items < 2) croak("Usage: Data::Queue::Shared::Int->unlink($path)");
        path = SvPV_nolen(ST(1));
    }
    if (!path) croak("cannot unlink anonymous or memfd queue");
    if (unlink(path) != 0)
        croak("unlink(%s): %s", path, strerror(errno));

SV *
path(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Return the handle's path as a new string SV (length 0 asks newSVpv
     * to measure the NUL-terminated string), or undef when the handle
     * has no path. */
    if (h->path)
        RETVAL = newSVpv(h->path, 0);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL

SV *
stats(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Build and return a reference to a fresh hash of queue metrics.
     * size/capacity/mmap_size come from the handle; the remaining
     * counters are read from the queue header with relaxed atomic loads.
     * Each counter is loaded independently, so the fields are not
     * guaranteed to be mutually consistent.
     * hv_stores() derives the key length from the literal, so the
     * lengths cannot drift from the key text. */
    QueueHeader *header = h->hdr;
    HV *report = newHV();
    hv_stores(report, "size", newSVuv((UV)queue_int_size(h)));
    hv_stores(report, "capacity", newSVuv(h->capacity));
    hv_stores(report, "mmap_size", newSVuv((UV)h->mmap_size));
    hv_stores(report, "push_ok",
              newSVuv((UV)__atomic_load_n(&header->stat_push_ok, __ATOMIC_RELAXED)));
    hv_stores(report, "pop_ok",
              newSVuv((UV)__atomic_load_n(&header->stat_pop_ok, __ATOMIC_RELAXED)));
    hv_stores(report, "push_full",
              newSVuv((UV)__atomic_load_n(&header->stat_push_full, __ATOMIC_RELAXED)));
    hv_stores(report, "pop_empty",
              newSVuv((UV)__atomic_load_n(&header->stat_pop_empty, __ATOMIC_RELAXED)));
    hv_stores(report, "recoveries",
              newSVuv((UV)__atomic_load_n(&header->stat_recoveries, __ATOMIC_RELAXED)));
    hv_stores(report, "push_waiters",
              newSVuv((UV)__atomic_load_n(&header->push_waiters, __ATOMIC_RELAXED)));
    hv_stores(report, "pop_waiters",
              newSVuv((UV)__atomic_load_n(&header->pop_waiters, __ATOMIC_RELAXED)));
    /* newRV_noinc: the reference takes over the only count on the hash. */
    RETVAL = newRV_noinc((SV *)report);
  OUTPUT:
    RETVAL

SV *
peek(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t front;
  CODE:
    /* Ask queue_int_peek() for a value; return it as a new IV on success
     * and undef otherwise. */
    RETVAL = queue_int_peek(h, &front)
        ? newSViv((IV)front)
        : &PL_sv_undef;
  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t value;
    uint32_t max_count;
  PPCODE:
    /* Pop integers with queue_int_try_pop() until a pop fails or the
     * optional limit (second argument) is reached, returning them as a
     * list. Without a limit, at most UINT32_MAX items are taken.
     *
     * The limit is clamped rather than cast: a plain (uint32_t) cast of a
     * 64-bit UV wraps, so drain(2**32) used to return nothing and
     * drain(2**32 + 1) a single item. */
    max_count = UINT32_MAX;
    if (items > 1) {
        UV requested = SvUV(ST(1));
        if (requested < UINT32_MAX) max_count = (uint32_t)requested;
    }
    while (max_count-- > 0 && queue_int_try_pop(h, &value))
        mXPUSHi((IV)value);

void
pop_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    double timeout = -1;
    int64_t value;
  PPCODE:
    /* Return up to `count` integers as a list: wait (via
     * queue_int_pop_wait, with the optional third argument as timeout,
     * -1 when omitted) for the first one, then take further items
     * without blocking. An empty list is returned when the wait fails. */
    if (items > 2) timeout = SvNV(ST(2));
    /* A request for zero items must yield zero items, matching
     * pop_multi(0); previously this still blocked and returned one. */
    if (count == 0) XSRETURN(0);
    /* Block until at least 1 */
    if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
    mXPUSHi((IV)value);
    /* Grab up to count-1 more non-blocking */
    for (UV i = 1; i < count; i++) {
        if (!queue_int_try_pop(h, &value)) break;
        mXPUSHi((IV)value);
    }

UV
push_wait_multi(self, timeout, ...)
    SV *self
    double timeout
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Push every argument after `timeout` with queue_int_push_wait(),
     * in order. The same timeout value is handed to each individual
     * call. Stops at the first failed push and returns the number of
     * values pushed. */
    const uint32_t nvalues = items - 2;
    uint32_t accepted = 0;
    while (accepted < nvalues
           && queue_int_push_wait(h, (int64_t)SvIV(ST(accepted + 2)), timeout))
        accepted++;
    RETVAL = accepted;
  OUTPUT:
    RETVAL

void
sync(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Run queue_sync() on the handle; any non-zero result is reported
     * as a Perl exception carrying the current errno text. */
    if (queue_sync(h)) {
        croak("msync: %s", strerror(errno));
    }

IV
eventfd(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Return the value produced by queue_eventfd_create(); a negative
     * result is turned into a Perl exception with the errno text. */
    const IV created = queue_eventfd_create(h);
    if (created < 0) {
        croak("eventfd: %s", strerror(errno));
    }
    RETVAL = created;
  OUTPUT:
    RETVAL

void
eventfd_set(self, fd)
    SV *self
    int fd
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
  CODE:
    /* Passes the caller-supplied descriptor straight to
     * queue_eventfd_set(); no validation happens here.
     * NOTE(review): presumably this installs fd as the queue's
     * notification eventfd -- confirm ownership/closing rules in
     * queue_eventfd_set. */
    queue_eventfd_set(h, fd);

IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);

Shared.xs  view on Meta::CPAN

    bool utf8;
  CODE:
    if (items > 1) timeout = SvNV(ST(1));
    int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
    if (r == 1) {
        RETVAL = newSVpvn(str, len);
        if (utf8) SvUTF8_on(RETVAL);
    } else if (r == -1) {
        croak("Data::Queue::Shared::Str: out of memory");
    } else {
        RETVAL = &PL_sv_undef;
    }
  OUTPUT:
    RETVAL

UV
push_multi(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Bulk push of strings under a single lock acquisition. Every
     * argument after self is pushed in order with
     * queue_str_push_locked(); the loop stops at the first push that
     * does not return 1. Returns the number of strings pushed. Croaks
     * when a string is too long; strings pushed before the oversized
     * one stay in the queue. */
    uint32_t count = items - 1;
    uint32_t pushed = 0;
    bool too_long = false;
    /* Extract SV data BEFORE locking: SvPV can run magic (tied/overloaded
     * stringification) that longjmps; doing it under the process-shared
     * mutex would abandon the lock and deadlock peers. Newx+SAVEFREEPV so a
     * die during extraction (or the too-long croak) cannot leak args. */
    struct qsm_arg { const char *str; STRLEN len; bool utf8; };
    struct qsm_arg *args = NULL;
    if (count > 0) {
        Newx(args, count, struct qsm_arg);
        SAVEFREEPV(args);
        for (uint32_t i = 0; i < count; i++) {
            SV *sv = ST(i + 1);
            args[i].str = SvPV(sv, args[i].len);
            args[i].utf8 = SvUTF8(sv) ? true : false;
        }
    }
    queue_mutex_lock(h->hdr);
    for (uint32_t i = 0; i < count; i++) {
        int r;
        /* STRLEN can be wider than uint32_t: reject before the cast so a
         * length of 4GB or more cannot wrap to a small value and slip
         * past the callee's own length check. */
        if (args[i].len > UINT32_MAX) { too_long = true; break; }
        r = queue_str_push_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
        if (r == -2) { too_long = true; break; }
        if (r != 1) break;
        pushed++;
    }
    queue_mutex_unlock(h->hdr);
    /* Wake consumers before any croak: items pushed ahead of an oversized
     * string are already in the queue, and skipping the wake would leave
     * waiting consumers without a notification for them. */
    if (pushed) queue_wake_consumers_n(h->hdr, pushed);
    if (too_long) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    RETVAL = pushed;
  OUTPUT:
    RETVAL

void
pop_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *src;
    uint32_t src_len;
    bool src_utf8;
  PPCODE:
    /* Return up to `count` strings as a list.
     * Phase 1 (mutex held): pop items and copy each payload into a
     * malloc'd staging slot. No Perl allocation happens here because
     * newSVpvn can longjmp on OOM, which would abandon the
     * process-shared mutex and deadlock peers.
     * Phase 2 (mutex released): wake producers, turn the staged copies
     * into mortal SVs, free the staging memory.
     * If a pop returned -1 or a staging malloc failed, the items already
     * staged are still pushed onto the stack before croaking. */
    struct { char *buf; uint32_t len; bool utf8; } *staged = NULL;
    UV taken = 0;
    int rc = 0;
    int alloc_failed = 0;
    /* The queue never holds more than capacity items, so a larger request
     * cannot be satisfied anyway; clamping also keeps the staging
     * allocation size from overflowing size_t on hostile input. */
    if (count > h->capacity)
        count = h->capacity;
    if (count > 0) {
        staged = (void *)malloc((size_t)count * sizeof(*staged));
        if (!staged)
            croak("Data::Queue::Shared::Str: out of memory");
    }
    queue_mutex_lock(h->hdr);
    while (taken < count) {
        char *copy;
        rc = queue_str_pop_locked(h, &src, &src_len, &src_utf8);
        if (rc <= 0)
            break;
        /* Request at least one byte so an empty string still gets a
         * buffer. */
        copy = (char *)malloc(src_len ? src_len : 1);
        if (!copy) {
            alloc_failed = 1;
            break;
        }
        if (src_len)
            memcpy(copy, src, src_len);
        staged[taken].buf = copy;
        staged[taken].len = src_len;
        staged[taken].utf8 = src_utf8;
        taken++;
    }
    queue_mutex_unlock(h->hdr);
    if (taken)
        queue_wake_producers_n(h->hdr, (uint32_t)taken);
    EXTEND(SP, (SSize_t)taken);
    for (UV k = 0; k < taken; k++) {
        SV *out = sv_2mortal(newSVpvn(staged[k].buf, staged[k].len));
        if (staged[k].utf8)
            SvUTF8_on(out);
        PUSHs(out);
        free(staged[k].buf);
    }
    free(staged);
    if (alloc_failed || rc == -1)
        croak("Data::Queue::Shared::Str: out of memory");

UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Report whatever queue_str_size() returns for this handle. */
    const UV nelems = (UV)queue_str_size(h);
    RETVAL = nelems;
  OUTPUT:
    RETVAL

UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Expose the capacity field stored in the handle. */
    const UV cap = h->capacity;
    RETVAL = cap;
  OUTPUT:
    RETVAL

bool
is_empty(self)

Shared.xs  view on Meta::CPAN

    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *str;
    uint32_t len;
    bool utf8;
  CODE:
    int r = queue_str_peek(h, &str, &len, &utf8);
    if (r == 1) {
        RETVAL = newSVpvn(str, len);
        if (utf8) SvUTF8_on(RETVAL);
    } else if (r == -1) {
        croak("Data::Queue::Shared::Str: out of memory");
    } else {
        RETVAL = &PL_sv_undef;
    }
  OUTPUT:
    RETVAL

bool
push_front(self, value)
    SV *self
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    STRLEN len;
  CODE:
    /* Push one string via queue_str_push_front(), passing along its
     * UTF-8 flag. Returns true when the call returns 1, false for any
     * other non-error result. Croaks when the string is too long. */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    int r;
    /* STRLEN can be wider than uint32_t: reject before the cast so a
     * length of 4GB or more cannot wrap to a small value and be stored
     * truncated. */
    if (len > UINT32_MAX) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    r = queue_str_push_front(h, str, (uint32_t)len, utf8);
    if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL

bool
push_front_wait(self, value, ...)
    SV *self
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    double timeout = -1;
    STRLEN len;
  CODE:
    /* Push one string via queue_str_push_front_wait(). The optional
     * third argument is the timeout passed to that call (-1 when
     * omitted). Returns true when the call returns 1. Croaks when the
     * string is too long. */
    if (items > 2) timeout = SvNV(ST(2));
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    int r;
    /* STRLEN can be wider than uint32_t: reject before the cast so a
     * length of 4GB or more cannot wrap to a small value and be stored
     * truncated. */
    if (len > UINT32_MAX) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    r = queue_str_push_front_wait(h, str, (uint32_t)len, utf8, timeout);
    if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    /* Pop strings until a pop stops succeeding or the optional limit
     * (second argument) is reached, and return them as a list. Without
     * a limit, at most UINT32_MAX items are taken. Items are copied into
     * a malloc'd linked list while the mutex is held and converted to
     * SVs only after it is released. If a pop returned -1 or a malloc
     * failed, the items already collected are still pushed onto the
     * stack before croaking.
     *
     * The limit is clamped rather than cast: a plain (uint32_t) cast of a
     * 64-bit UV wraps, so drain(2**32) used to return nothing. */
    max_count = UINT32_MAX;
    if (items > 1) {
        UV requested = SvUV(ST(1));
        if (requested < UINT32_MAX) max_count = (uint32_t)requested;
    }
    /* Hoist SV construction out of the process-shared mutex: newSVpvn can
     * longjmp on OOM, which would leave the lock held and deadlock
     * peers. */
    struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    queue_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = queue_str_pop_locked(h, &str, &len, &utf8);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        /* Request at least one byte so an empty string still gets a
         * buffer. */
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
        /* Append at the tail so the list keeps pop order. */
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }
    queue_mutex_unlock(h->hdr);
    if (drained_n) queue_wake_producers_n(h->hdr, (uint32_t)drained_n);
    EXTEND(SP, (SSize_t)drained_n);
    while (drained_head) {
        struct drain_item *it = drained_head; drained_head = it->next;
        SV *sv = newSVpvn(it->buf, it->len);
        if (it->utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        free(it->buf);
        free(it);
    }
    if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");

void
pop_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    bool utf8;
  PPCODE:
    /* Return up to `count` strings as a list: wait (via
     * queue_str_pop_wait, with the optional third argument as timeout,
     * -1 when omitted) for the first one, then take further items with
     * queue_str_try_pop(). Returns an empty list when the wait yields no
     * item; croaks when the wait reports -1. */
    if (items > 2) timeout = SvNV(ST(2));
    /* A request for zero items must yield zero items, matching
     * pop_multi(0); previously this still blocked and returned one. */
    if (count == 0) XSRETURN(0);
    /* Block until at least 1 */
    {
        int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
        if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
        if (r != 1) XSRETURN(0);
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
    }
    /* Grab up to count-1 more non-blocking. Any non-positive result
     * (including an error code) just ends the batch; what was collected
     * so far is returned. */
    for (UV i = 1; i < count; i++) {
        int r = queue_str_try_pop(h, &str, &len, &utf8);
        if (r <= 0) break;
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
    }

UV
push_wait_multi(self, timeout, ...)
    SV *self
    double timeout
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Push every argument after `timeout` with queue_str_push_wait(),
     * in order. The same timeout value is handed to each individual
     * call. Stops at the first push that does not return 1 and returns
     * the number of strings pushed. Croaks when a string is too long;
     * strings pushed before it stay in the queue. */
    uint32_t nvalues = items - 2;
    RETVAL = 0;
    for (uint32_t i = 0; i < nvalues; i++) {
        SV *sv = ST(i + 2);
        STRLEN len;
        const char *str = SvPV(sv, len);
        bool utf8 = SvUTF8(sv) ? true : false;
        int r;
        /* STRLEN can be wider than uint32_t: reject before the cast so a
         * length of 4GB or more cannot wrap to a small value and be
         * stored truncated. */
        if (len > UINT32_MAX) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
        r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
        if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
        if (r != 1) break;
        RETVAL++;
    }
  OUTPUT:
    RETVAL

SV *
pop_back(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *str;
    uint32_t len;
    bool utf8;
  CODE:
    int r = queue_str_pop_back(h, &str, &len, &utf8);
    if (r == 1) {
        RETVAL = newSVpvn(str, len);
        if (utf8) SvUTF8_on(RETVAL);
    } else if (r == -1) {
        croak("Data::Queue::Shared::Str: out of memory");
    } else {
        RETVAL = &PL_sv_undef;
    }
  OUTPUT:



( run in 0.728 second using v1.01-cache-2.11-cpan-5511b514fd6 )