Data-ReqRep-Shared
view release on metacpan or search on metacpan
SV *self
PREINIT:
EXTRACT_HANDLE("Data::ReqRep::Shared", self);
const char *str;
uint32_t len;
uint64_t id;
bool utf8;
PPCODE:
int r = reqrep_try_recv(h, &str, &len, &utf8, &id);
if (r == -1) croak("Data::ReqRep::Shared: out of memory");
if (r == 1) {
SV *sv = newSVpvn(str, len);
if (utf8) SvUTF8_on(sv);
mXPUSHs(sv);
mXPUSHu((UV)id);
}
void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double wait_for = -1;
    const char *payload;
    uint32_t payload_len;
    uint64_t req_id;
    bool is_utf8;
  PPCODE:
    /* Receive one request via reqrep_recv_wait() and return it to Perl as
     * the two-element list (payload, id).  An optional second argument
     * overrides the default timeout of -1 (presumably "no timeout" --
     * confirm against reqrep_recv_wait).  Returns the empty list when the
     * call reports anything other than success (1); croaks when it
     * reports -1. */
    if (items > 1)
        wait_for = SvNV(ST(1));
    const int status = reqrep_recv_wait(h, &payload, &payload_len, &is_utf8, &req_id, wait_for);
    if (status == -1)
        croak("Data::ReqRep::Shared: out of memory");
    if (status != 1)
        XSRETURN(0);
    SV *body = newSVpvn(payload, payload_len);
    if (is_utf8)
        SvUTF8_on(body);
    /* Make room for both return values at once, then push them as mortals. */
    EXTEND(SP, 2);
    PUSHs(sv_2mortal(body));
    PUSHs(sv_2mortal(newSVuv((UV)req_id)));
void
recv_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /* Non-blocking batch receive: dequeue up to `count` requests and return
     * them as a flat list (payload, id, payload, id, ...).
     *
     * Perl SV construction is hoisted out of the process-shared futex mutex:
     * while the lock is held each payload is only copied into a plain
     * malloc'ed staging buffer; the SVs are built after unlocking.
     *
     * Croaks with "out of memory" if the staging array cannot be allocated,
     * if a payload copy cannot be allocated, or if reqrep_recv_locked()
     * reports -1.  In the latter two cases the croak happens after the
     * already-dequeued items were pushed as mortals, so they are not
     * delivered to the caller. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    if (count > 0) {
        /* Refuse counts whose byte size cannot be represented in size_t.
         * Without this guard the multiplication below wraps (or the cast
         * truncates where size_t is narrower than UV), malloc succeeds with
         * a too-small buffer, and the receive loop writes past its end. */
        if (count > (UV)(SIZE_MAX / sizeof(*items_buf)))
            croak("Data::ReqRep::Shared: out of memory");
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 0; i < count; i++) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        /* malloc(0) may legally return NULL, so always ask for >= 1 byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Lock released: now it is safe to call into the Perl allocator. */
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    /* Errors are raised only after all staging memory has been released. */
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
void
recv_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /* Blocking batch receive: wait (via reqrep_recv_wait) for one request,
     * then opportunistically dequeue up to count-1 more without blocking.
     * Returns a flat list (payload, id, payload, id, ...), or the empty
     * list if the initial wait does not yield an item.  An optional third
     * argument overrides the default timeout of -1.
     *
     * NOTE(review): with count == 0 this still waits for and returns one
     * item, because the initial wait does not look at count -- confirm
     * whether that is intended. */
    if (items > 2) timeout = SvNV(ST(2));
    /* Block until at least 1 */
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r != 1) XSRETURN(0);
    {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }
    /* Grab up to count-1 more non-blocking; SV construction is hoisted out
     * of the lock by staging payload copies in malloc'ed buffers. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r2 = 0;
    int oom = 0;
    if (count > 1) {
        /* Refuse counts whose byte size cannot be represented in size_t.
         * Without this guard the multiplication below wraps (or the cast
         * truncates where size_t is narrower than UV), malloc succeeds with
         * a too-small buffer, and the receive loop writes past its end. */
        if (count - 1 > (UV)(SIZE_MAX / sizeof(*items_buf)))
            croak("Data::ReqRep::Shared: out of memory");
        items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 1; i < count; i++) {
        last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r2 <= 0) break;
        /* malloc(0) may legally return NULL, so always ask for >= 1 byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Lock released: now it is safe to call into the Perl allocator. */
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    /* Errors are raised only after all staging memory has been released. */
    if (last_r2 == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    /* Dequeue pending requests without blocking and return them as a flat
     * list (payload, id, payload, id, ...).  An optional second argument
     * limits how many items are taken; without it the limit is UINT32_MAX.
     *
     * Items are staged in a malloc'ed singly linked list while the mutex is
     * held so that no Perl SV is constructed under the lock.
     *
     * Croaks with "out of memory" if a staging allocation fails or
     * reqrep_recv_locked() reports -1; the croak happens after the staged
     * items have been released. */
    max_count = UINT32_MAX;
    if (items > 1) {
        UV wanted = SvUV(ST(1));
        /* Clamp rather than truncate: a plain (uint32_t) cast would turn a
         * limit such as 2**32 into 0 and silently drain nothing. */
        max_count = (wanted > (UV)UINT32_MAX) ? UINT32_MAX : (uint32_t)wanted;
    }
    struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    reqrep_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        /* malloc(0) may legally return NULL, so always ask for >= 1 byte. */
        char *c = (char *)malloc(len ? len : 1);
        /* free(NULL) is a no-op, so both can be released unconditionally. */
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
        /* Append at the tail so items come back in dequeue order. */
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Lock released: now it is safe to call into the Perl allocator. */
    EXTEND(SP, (SSize_t)(2 * drained_n));
    while (drained_head) {
        struct drain_item *it = drained_head; drained_head = it->next;
        SV *sv = newSVpvn(it->buf, it->len);
        if (it->utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)it->id)));
        free(it->buf);
        free(it);
    }
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
bool
reply(self, id, value)
    SV *self
    UV id
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    STRLEN len;
  CODE:
    /* Send `value` as the response for request `id` via reqrep_reply().
     * The string's UTF-8 flag is forwarded alongside the bytes.
     *
     * Returns true when reqrep_reply() reports 1, false for any other
     * non-fatal result.  Croaks on -1 ("invalid slot index") and when the
     * response is too long. */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* reqrep_reply() takes a 32-bit length.  Reject anything that does not
     * fit before the cast below, otherwise a huge value would be truncated
     * to a short prefix and sent instead of being refused. */
    if (len > (STRLEN)UINT32_MAX)
        croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    int r = reqrep_reply(h, (uint64_t)id, str, (uint32_t)len, utf8);
    if (r == -1) croak("Data::ReqRep::Shared: invalid slot index");
    if (r == -3) croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL
UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
  CODE:
    /* Return whatever reqrep_size() reports for this handle, widened to a
     * Perl unsigned integer. */
    const UV current = (UV)reqrep_size(h);
    RETVAL = current;
  OUTPUT:
    RETVAL
UV
capacity(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::ReqRep::Shared", self);
CODE:
RETVAL = h->req_cap;
OUTPUT:
( run in 1.704 second using v1.01-cache-2.11-cpan-39bf76dae61 )