Data-ReqRep-Shared
view release on metacpan or search on metacpan
new_memfd(class, name, req_cap, resp_slots, resp_size, ...)
const char *class
const char *name
UV req_cap
UV resp_slots
UV resp_size
PREINIT:
char errbuf[REQREP_ERR_BUFLEN];
uint64_t arena_cap;
CODE:
arena_cap = (items > 5) ? (uint64_t)SvUV(ST(5)) : 0;
ReqRepHandle *h = reqrep_create_memfd(name, (uint32_t)req_cap, (uint32_t)resp_slots,
(uint32_t)resp_size, arena_cap, errbuf);
if (!h) croak("Data::ReqRep::Shared->new_memfd: %s", errbuf);
MAKE_OBJ(class, h);
OUTPUT:
RETVAL
SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
  CODE:
    /* Constructor: open a string-mode handle (REQREP_MODE_STR) on the file
     * descriptor `fd` via reqrep_open_fd().
     * Croaks with the text left in errbuf when the open returns NULL.
     * MAKE_OBJ is defined elsewhere; presumably it wraps `h` in an object
     * of `class` and stores it in RETVAL -- confirm against the macro. */
    ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_STR, errbuf);
    if (h == NULL) {
        croak("Data::ReqRep::Shared->new_from_fd: %s", errbuf);
    }
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
IV
memfd(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
  CODE:
    /* Accessor: returns the handle's backing_fd field as an integer. */
    const IV backing = h->backing_fd;
    RETVAL = backing;
  OUTPUT:
    RETVAL
void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: releases the ReqRepHandle whose address is stored as an
     * integer in the referenced scalar.
     * Early exits use XSRETURN_EMPTY rather than a bare `return`, so the
     * Perl stack pointer is reset and the XSUB returns an empty list even
     * when DESTROY is invoked explicitly. */
    if (!SvROK(self)) {
        XSRETURN_EMPTY;
    }
    ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
    if (!h) {
        XSRETURN_EMPTY;
    }
    /* Zero the stored pointer first so a second DESTROY call sees NULL and
     * becomes a no-op instead of destroying the handle twice. */
    sv_setiv(SvRV(self), 0);
    reqrep_destroy(h);
void
recv(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *payload;
    uint32_t payload_len;
    uint64_t req_id;
    bool is_utf8;
  PPCODE:
    /* Single receive attempt via reqrep_try_recv().
     * Returns the two-element list (payload, id) when it reports 1, croaks
     * when it reports -1, and returns an empty list for any other result. */
    const int rc = reqrep_try_recv(h, &payload, &payload_len, &is_utf8, &req_id);
    if (rc == -1) {
        croak("Data::ReqRep::Shared: out of memory");
    }
    if (rc == 1) {
        SV *payload_sv = newSVpvn(payload, payload_len);
        if (is_utf8) {
            SvUTF8_on(payload_sv);
        }
        /* Reserve both stack slots up front, then push mortal copies. */
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(payload_sv));
        PUSHs(sv_2mortal(newSVuv((UV)req_id)));
    }
void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *payload;
    uint32_t payload_len;
    uint64_t req_id;
    bool is_utf8;
  PPCODE:
    /* Receive via reqrep_recv_wait() with an optional timeout.
     * The optional second argument is the timeout; -1 is passed when it is
     * omitted. Returns (payload, id) when the call reports 1, croaks on -1,
     * and returns an empty list otherwise. */
    const double timeout = (items > 1) ? SvNV(ST(1)) : -1;
    const int rc = reqrep_recv_wait(h, &payload, &payload_len, &is_utf8, &req_id, timeout);
    if (rc == -1) {
        croak("Data::ReqRep::Shared: out of memory");
    }
    if (rc == 1) {
        SV *payload_sv = newSVpvn(payload, payload_len);
        if (is_utf8) {
            SvUTF8_on(payload_sv);
        }
        /* Reserve both stack slots up front, then push mortal copies. */
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(payload_sv));
        PUSHs(sv_2mortal(newSVuv((UV)req_id)));
    }
void
recv_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /* Receive up to `count` requests in one locked pass and return them as
     * a flat list of (payload, id) pairs.
     *
     * Perl SV construction is hoisted out of the process-shared futex
     * mutex: while the lock is held each payload is only copied into a
     * malloc'd staging buffer; SVs are built after the lock is released.
     *
     * Croaks "out of memory" when a staging allocation fails or when
     * reqrep_recv_locked() reports -1. The croak happens after the staged
     * entries were converted, so they are not returned to the caller. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    if (count > 0) {
        /* A caller-supplied count large enough to wrap the size
         * computation would yield an undersized array that the loop below
         * could overrun; treat it as an allocation failure instead. */
        if (count > (UV)(SIZE_MAX / sizeof(*items_buf)))
            croak("Data::ReqRep::Shared: out of memory");
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 0; i < count; i++) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        /* malloc(0) may return NULL, so always request at least one byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Lock released: convert staged entries to mortal SVs and free them. */
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    /* Deferred error report: all staging memory is released by now. */
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
void
recv_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    double timeout = -1;
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
  PPCODE:
    /* Wait (optional third argument: timeout, -1 when omitted) for one
     * request, then collect up to count-1 more without blocking. Returns a
     * flat list of (payload, id) pairs, or an empty list when the blocking
     * receive does not report 1.
     *
     * Croaks "out of memory" when either receive call reports -1 or a
     * staging allocation fails.
     * NOTE(review): with count == 0 the blocking receive still runs and
     * one pair is returned -- confirm that this is intended. */
    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r2 = 0;
    int oom = 0;
    if (items > 2) timeout = SvNV(ST(2));
    /* Reject a count whose staging-array size would wrap size_t. This is
     * checked before the blocking receive so that no request is consumed
     * when the argument is unusable. */
    if (count > 1 && (count - 1) > (UV)(SIZE_MAX / sizeof(*items_buf)))
        croak("Data::ReqRep::Shared: out of memory");
    /* Block until at least 1 */
    int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
    if (r == -1) croak("Data::ReqRep::Shared: out of memory");
    if (r != 1) XSRETURN(0);
    {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        mXPUSHu((UV)id);
    }
    /* Grab up to count-1 more non-blocking; SV construction is hoisted out
     * of the lock by staging payload copies in malloc'd buffers. */
    if (count > 1) {
        items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 1; i < count; i++) {
        last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r2 <= 0) break;
        /* malloc(0) may return NULL, so always request at least one byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Lock released: convert staged entries to mortal SVs and free them. */
    EXTEND(SP, (SSize_t)(2 * n));
    for (UV j = 0; j < n; j++) {
        SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
        if (items_buf[j].utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
        free(items_buf[j].buf);
    }
    free(items_buf);
    /* Deferred error report: all staging memory is released by now. */
    if (last_r2 == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    const char *str;
    uint32_t len;
    uint64_t id;
    bool utf8;
    uint32_t max_count;
  PPCODE:
    /* Receive queued requests in one locked pass until
     * reqrep_recv_locked() stops reporting success or the optional limit
     * (second argument) is reached. Returns a flat list of (payload, id)
     * pairs.
     *
     * Croaks "out of memory" when a staging allocation fails or when
     * reqrep_recv_locked() reports -1; entries staged before the failure
     * are freed and not returned. */
    max_count = UINT32_MAX;
    if (items > 1) {
        /* Clamp instead of truncating: a plain (uint32_t) cast would turn
         * a limit such as 2**32 into 0 and drain nothing. */
        UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX : (uint32_t)requested;
    }
    /* Hoist SV construction out of the mutex: payloads are copied into a
     * malloc'd singly linked list while locked and converted afterwards. */
    struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    reqrep_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        /* malloc(0) may return NULL, so always request at least one byte. */
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
        /* Append at the tail so the returned order matches receive order. */
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }
    reqrep_mutex_unlock(h->hdr);
    reqrep_wake_producers(h->hdr);
    /* Lock released: convert staged entries to mortal SVs and free them. */
    EXTEND(SP, (SSize_t)(2 * drained_n));
    while (drained_head) {
        struct drain_item *it = drained_head; drained_head = it->next;
        SV *sv = newSVpvn(it->buf, it->len);
        if (it->utf8) SvUTF8_on(sv);
        PUSHs(sv_2mortal(sv));
        PUSHs(sv_2mortal(newSVuv((UV)it->id)));
        free(it->buf);
        free(it);
    }
    /* Deferred error report: all staging memory is released by now. */
    if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
bool
reply(self, id, value)
    SV *self
    UV id
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    STRLEN len;
  CODE:
    /* Send `value` as the response for request `id` via reqrep_reply().
     * Returns true when reqrep_reply() reports 1 and false for any other
     * non-croaking result.
     * Croaks "invalid slot index" on -1 and "response too long" on -3, or
     * when the payload length does not fit the 32-bit length argument. */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* reqrep_reply() receives the length as uint32_t; without this check a
     * payload above UINT32_MAX bytes would be silently truncated by the
     * cast instead of being rejected. */
    if ((uint64_t)len > (uint64_t)UINT32_MAX)
        croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    int r = reqrep_reply(h, (uint64_t)id, str, (uint32_t)len, utf8);
    if (r == -1) croak("Data::ReqRep::Shared: invalid slot index");
    if (r == -3) croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
    RETVAL = (r == 1);
  OUTPUT:
    RETVAL
UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared", self);
  CODE:
    /* Returns the result of reqrep_size() for this handle, as unsigned. */
    const UV current = (UV)reqrep_size(h);
    RETVAL = current;
  OUTPUT:
    RETVAL
if (!h) croak("Data::ReqRep::Shared::Int->new: %s", errbuf);
MAKE_OBJ(class, h);
OUTPUT:
RETVAL
SV *
new_memfd(class, name, req_cap, resp_slots)
    const char *class
    const char *name
    UV req_cap
    UV resp_slots
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
  CODE:
    /* Constructor: create an integer-mode handle through
     * reqrep_create_int_memfd(name, req_cap, resp_slots).
     * Croaks when an argument does not fit in 32 bits, or with the text
     * left in errbuf when creation returns NULL.
     * MAKE_OBJ is defined elsewhere; presumably it wraps `h` in an object
     * of `class` and stores it in RETVAL -- confirm against the macro. */
    /* The creation call takes 32-bit sizes; reject larger values rather
     * than letting the casts below wrap them to a different size. */
    if (req_cap > (UV)UINT32_MAX || resp_slots > (UV)UINT32_MAX)
        croak("Data::ReqRep::Shared::Int->new_memfd: req_cap/resp_slots out of range (max %u)",
              (unsigned)UINT32_MAX);
    ReqRepHandle *h = reqrep_create_int_memfd(name, (uint32_t)req_cap, (uint32_t)resp_slots, errbuf);
    if (!h) croak("Data::ReqRep::Shared::Int->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:
    char errbuf[REQREP_ERR_BUFLEN];
  CODE:
    /* Constructor: open an integer-mode handle (REQREP_MODE_INT) on the
     * file descriptor `fd` via reqrep_open_fd().
     * Croaks with the text left in errbuf when the open returns NULL.
     * MAKE_OBJ is defined elsewhere; presumably it wraps `h` in an object
     * of `class` and stores it in RETVAL -- confirm against the macro. */
    ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_INT, errbuf);
    if (h == NULL) {
        croak("Data::ReqRep::Shared::Int->new_from_fd: %s", errbuf);
    }
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
IV
memfd(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    /* Accessor: returns the handle's backing_fd field as an integer. */
    const IV backing = h->backing_fd;
    RETVAL = backing;
  OUTPUT:
    RETVAL
void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: releases the ReqRepHandle whose address is stored as an
     * integer in the referenced scalar.
     * Early exits use XSRETURN_EMPTY rather than a bare `return`, so the
     * Perl stack pointer is reset and the XSUB returns an empty list even
     * when DESTROY is invoked explicitly. */
    if (!SvROK(self)) {
        XSRETURN_EMPTY;
    }
    ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
    if (!h) {
        XSRETURN_EMPTY;
    }
    /* Zero the stored pointer first so a second DESTROY call sees NULL and
     * becomes a no-op instead of destroying the handle twice. */
    sv_setiv(SvRV(self), 0);
    reqrep_destroy(h);
void
recv(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    int64_t payload;
    uint64_t req_id;
  PPCODE:
    /* Single receive attempt via reqrep_int_try_recv().
     * Returns the two-element list (value, id) when the call reports a
     * non-zero result, and an empty list otherwise. */
    const int got = reqrep_int_try_recv(h, &payload, &req_id) ? 1 : 0;
    if (got) {
        /* Reserve both stack slots up front, then push mortal copies. */
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv((IV)payload)));
        PUSHs(sv_2mortal(newSVuv((UV)req_id)));
    }
void
recv_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    int64_t payload;
    uint64_t req_id;
  PPCODE:
    /* Receive via reqrep_int_recv_wait() with an optional timeout.
     * The optional second argument is the timeout; -1 is passed when it is
     * omitted. Returns (value, id) when the call reports a non-zero
     * result, and an empty list otherwise. */
    const double timeout = (items > 1) ? SvNV(ST(1)) : -1;
    const int got = reqrep_int_recv_wait(h, &payload, &req_id, timeout) ? 1 : 0;
    if (got) {
        /* Reserve both stack slots up front, then push mortal copies. */
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv((IV)payload)));
        PUSHs(sv_2mortal(newSVuv((UV)req_id)));
    }
bool
reply(self, id, value)
    SV *self
    UV id
    IV value
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    /* Send the integer `value` as the response for request `id` via
     * reqrep_int_reply(). Returns true when it reports 1, croaks when it
     * reports -1, and returns false for any other result. */
    const int rc = reqrep_int_reply(h, (uint64_t)id, (int64_t)value);
    if (rc == -1) {
        croak("Data::ReqRep::Shared::Int: invalid slot index");
    }
    RETVAL = (rc == 1);
  OUTPUT:
    RETVAL
UV
size(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    /* Returns the result of reqrep_int_size() for this handle, as unsigned. */
    const UV current = (UV)reqrep_int_size(h);
    RETVAL = current;
  OUTPUT:
    RETVAL
UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    /* Accessor: returns the handle's req_cap field. */
    const UV cap = h->req_cap;
    RETVAL = cap;
  OUTPUT:
    RETVAL
UV
resp_slots(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    /* Accessor: returns the handle's resp_slots field. */
    const UV slots = h->resp_slots;
    RETVAL = slots;
  OUTPUT:
    RETVAL
SV *
path(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
  CODE:
    /* Accessor: returns the handle's path as a new string scalar, or undef
     * when the path pointer is NULL. */
    if (h->path) {
        /* A length of 0 makes newSVpv() measure the string with strlen(). */
        RETVAL = newSVpv(h->path, 0);
    } else {
        RETVAL = &PL_sv_undef;
    }
  OUTPUT:
    RETVAL
( run in 1.662 second using v1.01-cache-2.11-cpan-71847e10f99 )