Data-Queue-Shared
view release on metacpan or search on metacpan
if (queue_int_try_pop(h, &value))
RETVAL = newSViv((IV)value);
else
RETVAL = &PL_sv_undef;
OUTPUT:
RETVAL
bool
push_wait(self, value, ...)
SV *self
IV value
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
double timeout = -1;
CODE:
if (items > 2) timeout = SvNV(ST(2));
RETVAL = queue_int_push_wait(h, (int64_t)value, timeout);
OUTPUT:
RETVAL
SV *
pop_wait(self, ...)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
double timeout = -1;
int64_t value;
CODE:
if (items > 1) timeout = SvNV(ST(1));
if (queue_int_pop_wait(h, &value, timeout))
RETVAL = newSViv((IV)value);
else
RETVAL = &PL_sv_undef;
OUTPUT:
RETVAL
UV
push_multi(self, ...)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
uint32_t count = items - 1;
uint32_t pushed = 0;
for (uint32_t i = 0; i < count; i++) {
if (!queue_int_try_push(h, (int64_t)SvIV(ST(i + 1)))) break;
pushed++;
}
RETVAL = pushed;
OUTPUT:
RETVAL
void
pop_multi(self, count)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
int64_t value;
PPCODE:
for (UV i = 0; i < count; i++) {
if (!queue_int_try_pop(h, &value)) break;
mXPUSHi((IV)value);
}
UV
size(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
RETVAL = (UV)queue_int_size(h);
OUTPUT:
RETVAL
UV
capacity(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
RETVAL = h->capacity;
OUTPUT:
RETVAL
bool
is_empty(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
RETVAL = (queue_int_size(h) == 0);
OUTPUT:
RETVAL
bool
is_full(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
RETVAL = (queue_int_size(h) >= h->capacity);
OUTPUT:
RETVAL
void
clear(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
queue_int_clear(h);
void
unlink(self_or_class, ...)
SV *self_or_class
CODE:
const char *path;
if (sv_isobject(self_or_class)) {
QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
if (items < 2) croak("Usage: Data::Queue::Shared::Int->unlink($path)");
path = SvPV_nolen(ST(1));
}
if (!path) croak("cannot unlink anonymous or memfd queue");
if (unlink(path) != 0)
croak("unlink(%s): %s", path, strerror(errno));
SV *
path(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
OUTPUT:
RETVAL
SV *
stats(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
HV *hv = newHV();
QueueHeader *hdr = h->hdr;
hv_store(hv, "size", 4, newSVuv((UV)queue_int_size(h)), 0);
hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
hv_store(hv, "push_ok", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_push_ok, __ATOMIC_RELAXED)), 0);
hv_store(hv, "pop_ok", 6, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_ok, __ATOMIC_RELAXED)), 0);
hv_store(hv, "push_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_push_full, __ATOMIC_RELAXED)), 0);
hv_store(hv, "pop_empty", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_empty, __ATOMIC_RELAXED)), 0);
hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
hv_store(hv, "push_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED)), 0);
hv_store(hv, "pop_waiters", 11, newSVuv((UV)__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED)), 0);
RETVAL = newRV_noinc((SV *)hv);
OUTPUT:
RETVAL
SV *
peek(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
int64_t value;
CODE:
if (queue_int_peek(h, &value))
RETVAL = newSViv((IV)value);
else
RETVAL = &PL_sv_undef;
OUTPUT:
RETVAL
void
drain(self, ...)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
int64_t value;
uint32_t max_count;
PPCODE:
max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
while (max_count-- > 0 && queue_int_try_pop(h, &value))
mXPUSHi((IV)value);
void
pop_wait_multi(self, count, ...)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
double timeout = -1;
int64_t value;
PPCODE:
if (items > 2) timeout = SvNV(ST(2));
/* Block until at least 1 */
if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
mXPUSHi((IV)value);
/* Grab up to count-1 more non-blocking */
for (UV i = 1; i < count; i++) {
if (!queue_int_try_pop(h, &value)) break;
mXPUSHi((IV)value);
}
UV
push_wait_multi(self, timeout, ...)
SV *self
double timeout
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
uint32_t nvalues = items - 2;
RETVAL = 0;
for (uint32_t i = 0; i < nvalues; i++) {
if (!queue_int_push_wait(h, (int64_t)SvIV(ST(i + 2)), timeout)) break;
RETVAL++;
}
OUTPUT:
RETVAL
void
sync(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
if (queue_sync(h) != 0)
croak("msync: %s", strerror(errno));
IV
eventfd(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
RETVAL = queue_eventfd_create(h);
if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
OUTPUT:
RETVAL
void
eventfd_set(self, fd)
SV *self
int fd
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
CODE:
queue_eventfd_set(h, fd);
IV
fileno(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
bool utf8;
CODE:
if (items > 1) timeout = SvNV(ST(1));
int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
if (r == 1) {
RETVAL = newSVpvn(str, len);
if (utf8) SvUTF8_on(RETVAL);
} else if (r == -1) {
croak("Data::Queue::Shared::Str: out of memory");
} else {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
UV
push_multi(self, ...)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
CODE:
uint32_t count = items - 1;
uint32_t pushed = 0;
/* Extract SV data BEFORE locking: SvPV can run magic (tied/overloaded
* stringification) that longjmps; doing it under the process-shared
* mutex would abandon the lock and deadlock peers. Newx+SAVEFREEPV so a
* die during extraction (or the too-long croak) cannot leak args. */
struct qsm_arg { const char *str; STRLEN len; bool utf8; };
struct qsm_arg *args = NULL;
if (count > 0) {
Newx(args, count, struct qsm_arg);
SAVEFREEPV(args);
for (uint32_t i = 0; i < count; i++) {
SV *sv = ST(i + 1);
args[i].str = SvPV(sv, args[i].len);
args[i].utf8 = SvUTF8(sv) ? true : false;
}
}
queue_mutex_lock(h->hdr);
for (uint32_t i = 0; i < count; i++) {
int r = queue_str_push_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
if (r == -2) { queue_mutex_unlock(h->hdr); croak("Data::Queue::Shared::Str: string too long (max 2GB)"); }
if (r != 1) break;
pushed++;
}
queue_mutex_unlock(h->hdr);
if (pushed) queue_wake_consumers_n(h->hdr, pushed);
RETVAL = pushed;
OUTPUT:
RETVAL
void
pop_multi(self, count)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
const char *str;
uint32_t len;
bool utf8;
PPCODE:
/* Hoist Perl SV construction out of the process-shared mutex:
* newSVpvn can longjmp on OOM and deadlock peers on the futex. */
struct { char *buf; uint32_t len; bool utf8; } *items_buf = NULL;
UV n = 0;
int last_r = 0;
int oom = 0;
/* Cap count at capacity: the queue can't hold more than capacity items,
* so a single pop_multi can't return more than that. This also prevents
* size_t overflow in the items_buf allocation for adversarial inputs. */
if (count > h->capacity) count = h->capacity;
if (count > 0) {
items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
if (!items_buf) croak("Data::Queue::Shared::Str: out of memory");
}
queue_mutex_lock(h->hdr);
for (UV i = 0; i < count; i++) {
last_r = queue_str_pop_locked(h, &str, &len, &utf8);
if (last_r <= 0) break;
char *c = (char *)malloc(len ? len : 1);
if (!c) { oom = 1; break; }
if (len) memcpy(c, str, len);
items_buf[n].buf = c;
items_buf[n].len = len;
items_buf[n].utf8 = utf8;
n++;
}
queue_mutex_unlock(h->hdr);
if (n) queue_wake_producers_n(h->hdr, (uint32_t)n);
EXTEND(SP, (SSize_t)n);
for (UV j = 0; j < n; j++) {
SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
if (items_buf[j].utf8) SvUTF8_on(sv);
PUSHs(sv_2mortal(sv));
free(items_buf[j].buf);
}
free(items_buf);
if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
UV
size(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
CODE:
RETVAL = (UV)queue_str_size(h);
OUTPUT:
RETVAL
UV
capacity(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
CODE:
RETVAL = h->capacity;
OUTPUT:
RETVAL
bool
is_empty(self)
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
const char *str;
uint32_t len;
bool utf8;
CODE:
int r = queue_str_peek(h, &str, &len, &utf8);
if (r == 1) {
RETVAL = newSVpvn(str, len);
if (utf8) SvUTF8_on(RETVAL);
} else if (r == -1) {
croak("Data::Queue::Shared::Str: out of memory");
} else {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
bool
push_front(self, value)
SV *self
SV *value
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
STRLEN len;
CODE:
const char *str = SvPV(value, len);
bool utf8 = SvUTF8(value) ? true : false;
int r = queue_str_push_front(h, str, (uint32_t)len, utf8);
if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
RETVAL = (r == 1);
OUTPUT:
RETVAL
bool
push_front_wait(self, value, ...)
SV *self
SV *value
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
double timeout = -1;
STRLEN len;
CODE:
if (items > 2) timeout = SvNV(ST(2));
const char *str = SvPV(value, len);
bool utf8 = SvUTF8(value) ? true : false;
int r = queue_str_push_front_wait(h, str, (uint32_t)len, utf8, timeout);
if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
RETVAL = (r == 1);
OUTPUT:
RETVAL
void
drain(self, ...)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
const char *str;
uint32_t len;
bool utf8;
uint32_t max_count;
PPCODE:
max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
/* Hoist SV construction out of the mutex (see pop_multi). */
struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
UV drained_n = 0;
int last_r = 0;
int oom = 0;
queue_mutex_lock(h->hdr);
while (max_count-- > 0) {
last_r = queue_str_pop_locked(h, &str, &len, &utf8);
if (last_r <= 0) break;
struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
char *c = (char *)malloc(len ? len : 1);
if (!it || !c) { free(it); free(c); oom = 1; break; }
if (len) memcpy(c, str, len);
it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
if (drained_tail) drained_tail->next = it; else drained_head = it;
drained_tail = it;
drained_n++;
}
queue_mutex_unlock(h->hdr);
if (drained_n) queue_wake_producers_n(h->hdr, (uint32_t)drained_n);
EXTEND(SP, (SSize_t)drained_n);
while (drained_head) {
struct drain_item *it = drained_head; drained_head = it->next;
SV *sv = newSVpvn(it->buf, it->len);
if (it->utf8) SvUTF8_on(sv);
PUSHs(sv_2mortal(sv));
free(it->buf);
free(it);
}
if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
void
pop_wait_multi(self, count, ...)
SV *self
UV count
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
double timeout = -1;
const char *str;
uint32_t len;
bool utf8;
PPCODE:
if (items > 2) timeout = SvNV(ST(2));
/* Block until at least 1 */
{
int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
if (r != 1) XSRETURN(0);
SV *sv = newSVpvn(str, len);
if (utf8) SvUTF8_on(sv);
mXPUSHs(sv);
}
/* Grab up to count-1 more non-blocking */
for (UV i = 1; i < count; i++) {
int r = queue_str_try_pop(h, &str, &len, &utf8);
if (r <= 0) break;
SV *sv = newSVpvn(str, len);
if (utf8) SvUTF8_on(sv);
mXPUSHs(sv);
}
UV
push_wait_multi(self, timeout, ...)
SV *self
double timeout
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
CODE:
uint32_t nvalues = items - 2;
RETVAL = 0;
for (uint32_t i = 0; i < nvalues; i++) {
SV *sv = ST(i + 2);
STRLEN len;
const char *str = SvPV(sv, len);
bool utf8 = SvUTF8(sv) ? true : false;
int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
if (r != 1) break;
RETVAL++;
}
OUTPUT:
RETVAL
SV *
pop_back(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
const char *str;
uint32_t len;
bool utf8;
CODE:
int r = queue_str_pop_back(h, &str, &len, &utf8);
if (r == 1) {
RETVAL = newSVpvn(str, len);
if (utf8) SvUTF8_on(RETVAL);
} else if (r == -1) {
croak("Data::Queue::Shared::Str: out of memory");
} else {
RETVAL = &PL_sv_undef;
}
OUTPUT:
( run in 0.728 second using v1.01-cache-2.11-cpan-5511b514fd6 )