Data-Queue-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

  OUTPUT:
    RETVAL

void
pop_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t value;
  PPCODE:
    for (UV i = 0; i < count; i++) {
        if (!queue_int_try_pop(h, &value)) break;
        mXPUSHi((IV)value);
    }

UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);

Shared.xs  view on Meta::CPAN

  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t value;
    uint32_t max_count;
  PPCODE:
    max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
    while (max_count-- > 0 && queue_int_try_pop(h, &value))
        mXPUSHi((IV)value);

void
pop_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    double timeout = -1;
    int64_t value;
  PPCODE:
    if (items > 2) timeout = SvNV(ST(2));
    /* Block until at least 1 */
    if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
    mXPUSHi((IV)value);
    /* Grab up to count-1 more non-blocking */
    for (UV i = 1; i < count; i++) {
        if (!queue_int_try_pop(h, &value)) break;
        mXPUSHi((IV)value);
    }

Shared.xs  view on Meta::CPAN


void
pop_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *str;
    uint32_t len;
    bool utf8;
  PPCODE:
    /* Hoist Perl SV construction out of the process-shared mutex:
     * newSVpvn can longjmp on OOM and deadlock peers on the futex. */
    struct { char *buf; uint32_t len; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    /* Cap count at capacity: the queue can't hold more than capacity items,
     * so a single pop_multi can't return more than that. This also prevents
     * size_t overflow in the items_buf allocation for adversarial inputs. */
    if (count > h->capacity) count = h->capacity;

Shared.xs  view on Meta::CPAN


void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
    /* Hoist SV construction out of the mutex (see pop_multi). */
    struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    queue_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = queue_str_pop_locked(h, &str, &len, &utf8);
        if (last_r <= 0) break;

Shared.xs  view on Meta::CPAN

void
pop_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    bool utf8;
  PPCODE:
    if (items > 2) timeout = SvNV(ST(2));
    /* Block until at least 1 */
    {
        int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
        if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
        if (r != 1) XSRETURN(0);
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
    }



( run in 0.671 second using v1.01-cache-2.11-cpan-5511b514fd6 )