Data-Queue-Shared
view release on metacpan or search on metacpan
double timeout = -1;
STRLEN len;
CODE:
if (items > 2) timeout = SvNV(ST(2));
const char *str = SvPV(value, len);
bool utf8 = SvUTF8(value) ? true : false;
int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
RETVAL = (r == 1);
OUTPUT:
RETVAL
SV *
pop_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    double timeout = -1;
    const char *data;
    uint32_t nbytes;
    bool is_utf8;
  CODE:
    /* Pop one string via queue_str_pop_wait().
     *
     * The optional second Perl argument is read as a number and passed
     * through as the timeout; when it is omitted, -1 is passed.
     * NOTE(review): -1 presumably means "no timeout" -- confirm against
     * queue_str_pop_wait().
     *
     * Result:
     *   status  1 -> new SV holding the popped bytes, with the UTF-8 flag
     *                turned on when the queue reported it;
     *   status -1 -> croaks with an out-of-memory message;
     *   otherwise -> undef. */
    if (items > 1)
        timeout = SvNV(ST(1));
    int status = queue_str_pop_wait(h, &data, &nbytes, &is_utf8, timeout);
    if (status == -1)
        croak("Data::Queue::Shared::Str: out of memory");
    if (status != 1) {
        RETVAL = &PL_sv_undef;
    } else {
        RETVAL = newSVpvn(data, nbytes);
        if (is_utf8)
            SvUTF8_on(RETVAL);
    }
  OUTPUT:
    RETVAL
UV
push_multi(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Push every argument after self, in order, under one hold of the
     * queue mutex. Returns the number of strings actually pushed; the loop
     * stops at the first push that does not report 1.
     *
     * Croaks with "string too long" when a push reports -2 or when an
     * argument's byte length does not fit in the uint32_t the queue API
     * takes. Strings pushed before the offending one stay in the queue and
     * consumers are woken for them before the croak. */
    uint32_t count = items - 1;
    uint32_t pushed = 0;
    bool too_long = false;
    /* Extract SV data BEFORE locking: SvPV can run magic (tied/overloaded
     * stringification) that longjmps; doing it under the process-shared
     * mutex would abandon the lock and deadlock peers. Newx+SAVEFREEPV so a
     * die during extraction (or the too-long croak) cannot leak args. */
    struct qsm_arg { const char *str; STRLEN len; bool utf8; };
    struct qsm_arg *args = NULL;
    if (count > 0) {
        Newx(args, count, struct qsm_arg);
        SAVEFREEPV(args);
        for (uint32_t i = 0; i < count; i++) {
            SV *sv = ST(i + 1);
            args[i].str = SvPV(sv, args[i].len);
            args[i].utf8 = SvUTF8(sv) ? true : false;
        }
    }
    queue_mutex_lock(h->hdr);
    for (uint32_t i = 0; i < count; i++) {
        /* STRLEN can be wider than uint32_t: reject here instead of letting
         * the cast below wrap and push a silently truncated string. */
        if (args[i].len > (STRLEN)UINT32_MAX) { too_long = true; break; }
        int r = queue_str_push_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
        if (r == -2) { too_long = true; break; }
        if (r != 1) break;
        pushed++;
    }
    queue_mutex_unlock(h->hdr);
    /* Wake consumers even on the error path: items pushed before the
     * failing one are already visible in the queue. */
    if (pushed) queue_wake_consumers_n(h->hdr, pushed);
    /* Croak only after the mutex has been released. */
    if (too_long) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    RETVAL = pushed;
  OUTPUT:
    RETVAL
void
pop_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *src;
    uint32_t src_len;
    bool src_utf8;
  PPCODE:
    /* Pop up to `count` strings under one hold of the queue mutex and
     * return them as a list of mortal SVs in pop order.
     *
     * Payloads are first copied into plain malloc() buffers while locked;
     * the Perl SVs are built only after unlocking, because newSVpvn can
     * longjmp on OOM and would otherwise deadlock peers on the futex.
     *
     * Croaks with an out-of-memory message if the staging array cannot be
     * allocated, or -- after the staged items were converted and freed --
     * if a pop reported -1 or a payload copy could not be allocated. */
    struct { char *buf; uint32_t len; bool utf8; } *staged = NULL;
    UV taken = 0;
    int rc = 0;
    int alloc_failed = 0;
    /* The queue never holds more than `capacity` items, so one call cannot
     * return more than that; clamping also keeps the size computation for
     * the staging array from overflowing on adversarial counts. */
    if (count > h->capacity)
        count = h->capacity;
    if (count > 0) {
        staged = (void *)malloc((size_t)count * sizeof(*staged));
        if (!staged)
            croak("Data::Queue::Shared::Str: out of memory");
    }
    queue_mutex_lock(h->hdr);
    while (taken < count) {
        rc = queue_str_pop_locked(h, &src, &src_len, &src_utf8);
        if (rc <= 0)
            break;
        /* Request at least one byte so an empty string still gets a buffer. */
        char *copy = (char *)malloc(src_len ? src_len : 1);
        if (!copy) {
            alloc_failed = 1;
            break;
        }
        if (src_len)
            memcpy(copy, src, src_len);
        staged[taken].buf = copy;
        staged[taken].len = src_len;
        staged[taken].utf8 = src_utf8;
        taken++;
    }
    queue_mutex_unlock(h->hdr);
    if (taken)
        queue_wake_producers_n(h->hdr, (uint32_t)taken);
    EXTEND(SP, (SSize_t)taken);
    for (UV k = 0; k < taken; k++) {
        SV *out = newSVpvn(staged[k].buf, staged[k].len);
        if (staged[k].utf8)
            SvUTF8_on(out);
        PUSHs(sv_2mortal(out));
        free(staged[k].buf);
    }
    free(staged);
    if (rc == -1 || alloc_failed)
        croak("Data::Queue::Shared::Str: out of memory");
UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Return whatever queue_str_size() reports for this handle, as a UV. */
    {
        const UV current = (UV)queue_str_size(h);
        RETVAL = current;
    }
  OUTPUT:
    RETVAL
UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* Return the capacity field stored on the handle, as a UV. */
    {
        const UV cap = h->capacity;
        RETVAL = cap;
    }
  OUTPUT:
    RETVAL
bool
is_empty(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* True exactly when queue_str_size() reports zero for this handle. */
    RETVAL = (queue_str_size(h) == 0) ? true : false;
  OUTPUT:
    RETVAL
bool
is_full(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
  CODE:
    /* True when the handle's capacity is less than or equal to what
     * queue_str_size() currently reports. */
    RETVAL = (h->capacity <= queue_str_size(h)) ? true : false;
  OUTPUT:
    RETVAL
} else if (r == -1) {
croak("Data::Queue::Shared::Str: out of memory");
} else {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
bool
push_front(self, value)
    SV *self
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    STRLEN len;
  CODE:
    /* Stringify `value` and hand it to queue_str_push_front() together
     * with its UTF-8 flag.
     *
     * Returns true when the push reports 1, false for any other
     * non-error status.
     * Croaks with "string too long" when the push reports -2, or when the
     * byte length does not fit in the uint32_t the queue API takes. */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* STRLEN can be wider than uint32_t: reject here instead of letting
     * the cast below wrap and push a silently truncated string. */
    if (len > (STRLEN)UINT32_MAX)
        croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    int r = queue_str_push_front(h, str, (uint32_t)len, utf8);
    if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL
bool
push_front_wait(self, value, ...)
    SV *self
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    double timeout = -1;
    STRLEN len;
  CODE:
    /* Stringify `value` and hand it to queue_str_push_front_wait()
     * together with its UTF-8 flag and a timeout.
     *
     * The optional third Perl argument is read as a number and passed
     * through as the timeout; when it is omitted, -1 is passed.
     * NOTE(review): -1 presumably means "no timeout" -- confirm against
     * queue_str_push_front_wait().
     *
     * Returns true when the push reports 1, false for any other
     * non-error status.
     * Croaks with "string too long" when the push reports -2, or when the
     * byte length does not fit in the uint32_t the queue API takes. */
    if (items > 2) timeout = SvNV(ST(2));
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* STRLEN can be wider than uint32_t: reject here instead of letting
     * the cast below wrap and push a silently truncated string. */
    if (len > (STRLEN)UINT32_MAX)
        croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    int r = queue_str_push_front_wait(h, str, (uint32_t)len, utf8, timeout);
    if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    /* Pop strings under one hold of the queue mutex until the queue stops
     * yielding items or the limit is reached, and return them as a list of
     * mortal SVs in pop order.
     *
     * The optional second Perl argument is the limit; without it the limit
     * is UINT32_MAX. A requested limit above UINT32_MAX is clamped to
     * UINT32_MAX rather than being truncated by a narrowing cast (which
     * would turn e.g. 2**32 into 0 and return nothing).
     *
     * Croaks with an out-of-memory message -- after the already-copied
     * items were converted and freed -- if a pop reported -1 or a malloc
     * failed. */
    max_count = UINT32_MAX;
    if (items > 1) {
        UV requested = SvUV(ST(1));
        if (requested < (UV)UINT32_MAX)
            max_count = (uint32_t)requested;
    }
    /* SV construction is kept out of the locked region: payloads are
     * copied into a malloc'd singly linked list while locked and turned
     * into SVs only after the mutex is released. */
    struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    queue_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = queue_str_pop_locked(h, &str, &len, &utf8);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        /* Request at least one byte so an empty string still gets a buffer. */
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
        /* Append at the tail so the list preserves pop order. */
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }
    queue_mutex_unlock(h->hdr);
    if (drained_n) queue_wake_producers_n(h->hdr, (uint32_t)drained_n);
    EXTEND(SP, (SSize_t)drained_n);
    while (drained_head) {
        struct drain_item *it = drained_head; drained_head = it->next;
        SV *sv = newSVpvn(it->buf, it->len);
        if (it->utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        free(it->buf);
        free(it);
    }
    if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
void
pop_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    bool utf8;
  PPCODE:
    /* Wait for one string with queue_str_pop_wait(), then collect up to
     * count-1 more with queue_str_try_pop(); returns them as a list of
     * mortal SVs in pop order.
     *
     * The optional third Perl argument is read as a number and passed
     * through as the timeout of the first pop; when omitted, -1 is passed.
     * NOTE(review): -1 presumably means "no timeout" -- confirm against
     * queue_str_pop_wait().
     *
     * Returns an empty list when count is 0 or when the first pop reports
     * anything other than 1. Croaks with an out-of-memory message when the
     * first pop reports -1. */
    if (items > 2) timeout = SvNV(ST(2));
    /* A request for zero items must not consume anything from the queue:
     * without this guard the blocking pop below would still take one item
     * and return it, exceeding the requested count. */
    if (count == 0) XSRETURN(0);
    /* Block until at least 1 */
    {
        int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
        if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
        if (r != 1) XSRETURN(0);
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
    }
    /* Grab up to count-1 more non-blocking; stop at the first pop that
     * does not report a positive status. */
    for (UV i = 1; i < count; i++) {
        int r = queue_str_try_pop(h, &str, &len, &utf8);
        if (r <= 0) break;
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
    }
UV
push_wait_multi(self, timeout, ...)
SV *self
double timeout
PREINIT:
EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
CODE:
( run in 0.564 second using v1.01-cache-2.11-cpan-e1769b4cff6 )