Data-Queue-Shared
view release on metacpan or search on metacpan
OUTPUT:
RETVAL
void
pop_multi(self, count)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
int64_t value;
PPCODE:
for (UV i = 0; i < count; i++) {
if (!queue_int_try_pop(h, &value)) break;
mXPUSHi((IV)value);
}
UV
size(self)
SV *self
PREINIT:
/* Handle lookup for a Data::Queue::Shared::Int object.
 * NOTE(review): no statement assigning RETVAL is visible for this XSUB,
 * so how the returned UV is computed cannot be documented from here. */
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
OUTPUT:
RETVAL
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    int64_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Pop integers with queue_int_try_pop() until a pop fails or the
     * optional limit (second argument) is reached, returning them as a
     * list. Without a limit, at most UINT32_MAX items are returned. */
    if (items > 1) {
        UV requested = SvUV(ST(1));
        /* Clamp instead of casting: on a 64-bit UV a plain (uint32_t) cast
         * wraps, so e.g. a limit of 2**32 would silently become 0 and
         * drain nothing. */
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count > 0 && queue_int_try_pop(h, &value)) {
        mXPUSHi((IV)value);
        max_count--;
    }
void
pop_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    double timeout = -1;
    int64_t value;
  PPCODE:
    /* Return between 0 and `count` integers as a list. The first item is
     * fetched with the blocking queue_int_pop_wait(); the rest are taken
     * opportunistically with queue_int_try_pop(). The optional third
     * argument overrides the default timeout of -1.
     * NOTE(review): -1 presumably means "wait without a deadline" --
     * confirm against queue_int_pop_wait(). */

    /* A request for zero items must not block or consume an element;
     * without this guard one item would be popped and returned anyway. */
    if (count == 0) XSRETURN(0);
    if (items > 2) timeout = SvNV(ST(2));
    /* Block until at least 1 */
    if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
    mXPUSHi((IV)value);
    /* Grab up to count-1 more non-blocking */
    for (UV i = 1; i < count; i++) {
        if (!queue_int_try_pop(h, &value)) break;
        mXPUSHi((IV)value);
    }
void
pop_multi(self, count)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
/* Scratch for one item: data pointer, byte length and UTF-8 flag.
 * NOTE(review): roles inferred from the names; confirm against the
 * pop helper that fills them. */
const char *str;
uint32_t len;
bool utf8;
PPCODE:
/* Hoist Perl SV construction out of the process-shared mutex:
 * newSVpvn can longjmp on OOM and deadlock peers on the futex. */
/* Staging array, NULL until allocated; each slot carries a buffer
 * pointer, its length and a UTF-8 flag. */
struct { char *buf; uint32_t len; bool utf8; } *items_buf = NULL;
/* NOTE(review): presumably the number of staged items -- confirm. */
UV n = 0;
/* NOTE(review): presumably the latest pop result and an allocation-failure
 * flag -- confirm where they are set. */
int last_r = 0;
int oom = 0;
/* Cap count at capacity: the queue can't hold more than capacity items,
 * so a single pop_multi can't return more than that. This also prevents
 * size_t overflow in the items_buf allocation for adversarial inputs. */
if (count > h->capacity) count = h->capacity;
void
drain(self, ...)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
/* Out-parameters filled by queue_str_pop_locked() for each item. */
const char *str;
uint32_t len;
bool utf8;
uint32_t max_count;
PPCODE:
/* The optional second argument limits how many items are drained; without
 * it the limit is UINT32_MAX.
 * NOTE(review): the (uint32_t) cast wraps values above UINT32_MAX, so a
 * limit of 2**32 becomes 0 -- confirm whether clamping is wanted. */
max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
/* Hoist SV construction out of the mutex (see pop_multi). */
/* Singly linked list (head and tail pointers) of drained entries; each node
 * holds a buffer pointer, its length, a UTF-8 flag and the next link. */
struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
/* NOTE(review): presumably counts the nodes appended to the list -- confirm. */
UV drained_n = 0;
/* Most recent queue_str_pop_locked() result. */
int last_r = 0;
/* NOTE(review): presumably flags an allocation failure -- confirm. */
int oom = 0;
/* Pop while holding the lock taken on h->hdr; the loop ends when the limit
 * is exhausted or the pop helper returns a value <= 0. */
queue_mutex_lock(h->hdr);
while (max_count-- > 0) {
last_r = queue_str_pop_locked(h, &str, &len, &utf8);
if (last_r <= 0) break;
void
pop_wait_multi(self, count, ...)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
/* Default timeout of -1 unless a third argument is supplied.
 * NOTE(review): -1 presumably means "no deadline" -- confirm against
 * queue_str_pop_wait(). */
double timeout = -1;
/* Out-parameters filled by queue_str_pop_wait(). */
const char *str;
uint32_t len;
bool utf8;
PPCODE:
/* The optional third argument overrides the default timeout. */
if (items > 2) timeout = SvNV(ST(2));
/* NOTE(review): count is not consulted before the blocking pop, so a
 * count of 0 still waits for and returns one item -- confirm intended. */
/* Block until at least 1 */
{
int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
/* -1 is reported as out-of-memory; any result other than 1 returns an
 * empty list. */
if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
if (r != 1) XSRETURN(0);
/* Copy the popped bytes into a new SV, flag it as UTF-8 when requested,
 * and push it mortalized onto the return stack. */
SV *sv = newSVpvn(str, len);
if (utf8) SvUTF8_on(sv);
mXPUSHs(sv);
}
( run in 0.604 second using v1.01-cache-2.11-cpan-5511b514fd6 )