Data-PubSub-Shared
view release on metacpan or search on metacpan
EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
CODE:
RETVAL = h->notify_fd;
OUTPUT:
RETVAL
SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
  CODE:
    /* Call pubsub_eventfd_consume() on the handle and return its result to
     * Perl: the value itself when it is non-negative, undef when it is
     * negative. */
    const int64_t drained = pubsub_eventfd_consume(h);
    if (drained < 0)
        RETVAL = &PL_sv_undef;
    else
        RETVAL = newSViv((IV)drained);
  OUTPUT:
    RETVAL
void
notify(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
CODE:
/* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE provides
 * for self.  Nothing is returned to Perl. */
pubsub_notify(h);
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int::Sub
void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor for a subscriber object.  The PubSubSub pointer is stored as
     * an IV in the scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`, so the XSUB
     * always leaves the Perl stack in the state the interpreter expects. */
    if (!SvROK(self))
        XSRETURN_EMPTY;
    {
        SV *inner = SvRV(self);
        PubSubSub *sub = INT2PTR(PubSubSub *, SvIV(inner));
        if (!sub)
            XSRETURN_EMPTY;
        /* Zero the stored pointer first: a repeated DESTROY then sees NULL
         * and does nothing. */
        sv_setiv(inner, 0);
        /* userdata, when set, is an SV whose reference this object holds. */
        if (sub->userdata)
            SvREFCNT_dec((SV *)sub->userdata);
        pubsub_sub_destroy(sub);
    }
SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t item;
  CODE:
    /* Ask pubsub_int_poll() for the next value.  Returns that value as an
     * integer when the poll result is exactly 1, undef for any other
     * result. */
    RETVAL = (pubsub_int_poll(sub, &item) == 1)
           ? newSViv((IV)item)
           : &PL_sv_undef;
  OUTPUT:
    RETVAL
void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t item;
  PPCODE:
    /* Return a list of at most `count` values.  Polling stops at the first
     * pubsub_int_poll() call whose result is not 1; no poll is attempted
     * once `count` values have been pushed. */
    UV remaining = count;
    while (remaining > 0 && pubsub_int_poll(sub, &item) == 1) {
        mXPUSHi((IV)item);
        remaining--;
    }
SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t item;
  CODE:
    /* Like poll(), but goes through pubsub_int_poll_wait() with a timeout.
     * The optional second argument is the timeout (numeric); when omitted,
     * -1 is passed.  Returns the value on a result of 1, undef otherwise. */
    const double timeout = (items > 1) ? SvNV(ST(1)) : -1;
    if (pubsub_int_poll_wait(sub, &item, timeout) == 1)
        RETVAL = newSViv((IV)item);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return a list of polled values.  The optional second argument caps the
     * number of values; without it the cap is UINT32_MAX.
     *
     * The cap is clamped instead of cast: a plain (uint32_t) cast of a
     * 64-bit UV wraps, so e.g. 2**32 would silently become 0.
     *
     * A value is pushed only when pubsub_int_poll() returns exactly 1,
     * matching poll() and poll_multi(); any other result ends the drain so
     * `value` is never pushed without having been filled in. */
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_int_poll(sub, &value) == 1)
        mXPUSHi((IV)value);
void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    double timeout = -1;
    int64_t value;
  PPCODE:
    /* Wait (via pubsub_int_poll_wait) for a first value, then collect up to
     * count-1 more with plain polls.  Returns an empty list when count is 0
     * or the wait does not produce a value.  The optional third argument is
     * the timeout; -1 is passed when it is omitted.
     *
     * Success is tested as `== 1`, the same test poll() and poll_wait() use,
     * so any other non-zero result is not mistaken for a delivered value. */
    if (count == 0)
        XSRETURN(0);
    if (items > 2)
        timeout = SvNV(ST(2));
    if (pubsub_int_poll_wait(sub, &value, timeout) != 1)
        XSRETURN(0);
    mXPUSHi((IV)value);
    for (UV i = 1; i < count; i++) {
        if (pubsub_int_poll(sub, &value) != 1)
            break;
        mXPUSHi((IV)value);
    }
UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Report whatever pubsub_lag() computes for this subscriber, converted
     * to an unsigned Perl integer. */
    const UV behind = (UV)pubsub_lag(sub);
    RETVAL = behind;
  OUTPUT:
    RETVAL
UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's overflow_count field. */
    const UV n_overflow = (UV)sub->overflow_count;
    RETVAL = n_overflow;
  OUTPUT:
    RETVAL
UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Snapshot of hdr->write_pos, read with a relaxed atomic load. */
    const UV head = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = head;
  OUTPUT:
    RETVAL
bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* True when the cursor is behind the current write position by more
     * than `capacity` entries; false otherwise (including when the cursor is
     * at or ahead of the write position). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = 0;
    if (sub->cursor < head)
        RETVAL = (head - sub->cursor > sub->capacity);
  OUTPUT:
    RETVAL
UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Combined getter/setter.  With a second argument the cursor is set to
     * that unsigned value first; the (possibly just updated) cursor is
     * always returned. */
    if (items >= 2) {
        SV *requested = ST(1);
        sub->cursor = (uint64_t)SvUV(requested);
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL
void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Move the cursor to the current write position (acquire load of
     * hdr->write_pos). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    sub->cursor = head;
void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Move the cursor `capacity` entries behind the current write position,
     * or to 0 when the write position has not yet exceeded `capacity`. */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    if (head > sub->capacity)
        sub->cursor = head - sub->capacity;
    else
        sub->cursor = 0;
UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t value;
  CODE:
    /* Poll repeatedly and invoke `cb` once per value, passing the value as
     * the only argument and discarding whatever the callback returns.
     * Returns the number of callback invocations.
     *
     * A value is dispatched only when pubsub_int_poll() returns exactly 1
     * (the test used by poll()/poll_multi()); any other result ends the
     * loop, so the callback never sees a `value` that was not filled in. */
    RETVAL = 0;
    while (pubsub_int_poll(sub, &value) == 1) {
        /* Fresh stack pointer per iteration: the callback may have caused
         * the Perl stack to be reallocated. */
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        mXPUSHi((IV)value);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS;
        LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL
void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() on the subscriber, then return a
     * list of polled values exactly as drain() does.  The optional second
     * argument caps the number of values (default UINT32_MAX).
     *
     * The cap is clamped rather than cast so that a UV above UINT32_MAX does
     * not wrap to a small number, and a value is pushed only when
     * pubsub_int_poll() returns exactly 1, matching poll()/poll_multi(). */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_int_poll(sub, &value) == 1)
        mXPUSHi((IV)value);
void
eventfd_set(self, fd)
SV *self
int fd
PREINIT:
EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
CODE:
/* Store `fd` in the subscriber's notify_fd field.  The previous value is
 * simply overwritten here; nothing is closed or validated in this XSUB. */
sub->notify_fd = fd;
IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's notify_fd field. */
    RETVAL = (IV)sub->notify_fd;
  OUTPUT:
    RETVAL
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str
SV *
new(class, path, capacity, ...)
    const char *class
    SV *path
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
    uint32_t msg_size = 0;
  CODE:
    /* Constructor: create a string-mode handle via pubsub_create() and wrap
     * it with MAKE_OBJ.
     *   path      - backing path; an undefined value is passed on as NULL
     *   capacity  - forwarded as a 32-bit value
     *   msg_size  - optional 4th argument, forwarded as a 32-bit value
     *               (0 when omitted)
     * Croaks with the message pubsub_create() left in errbuf on failure.
     *
     * capacity and msg_size are range-checked before narrowing: a plain
     * (uint32_t) cast of a 64-bit UV wraps silently, which would create a
     * handle with a different size than the caller asked for. */
    if (capacity > (UV)UINT32_MAX)
        croak("Data::PubSub::Shared::Str->new: capacity %" UVuf " is out of range", capacity);
    if (items > 3) {
        const UV requested = SvUV(ST(3));
        if (requested > (UV)UINT32_MAX)
            croak("Data::PubSub::Shared::Str->new: msg_size %" UVuf " is out of range", requested);
        msg_size = (uint32_t)requested;
    }
    /* Defensive: never print indeterminate bytes if the failure path leaves
     * errbuf untouched. */
    errbuf[0] = '\0';
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
    if (!h)
        croak("Data::PubSub::Shared::Str->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
new_memfd(class, name, capacity, ...)
    const char *class
    const char *name
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
    uint32_t msg_size = 0;
  CODE:
    /* Constructor: create a string-mode handle via pubsub_create_memfd() and
     * wrap it with MAKE_OBJ.
     *   name      - passed through to pubsub_create_memfd()
     *   capacity  - forwarded as a 32-bit value
     *   msg_size  - optional 4th argument, forwarded as a 32-bit value
     *               (0 when omitted)
     * Croaks with the message left in errbuf on failure.
     *
     * capacity and msg_size are range-checked before narrowing so that a UV
     * above UINT32_MAX is rejected instead of silently wrapping. */
    if (capacity > (UV)UINT32_MAX)
        croak("Data::PubSub::Shared::Str->new_memfd: capacity %" UVuf " is out of range", capacity);
    if (items > 3) {
        const UV requested = SvUV(ST(3));
        if (requested > (UV)UINT32_MAX)
            croak("Data::PubSub::Shared::Str->new_memfd: msg_size %" UVuf " is out of range", requested);
        msg_size = (uint32_t)requested;
    }
    /* Defensive: never print indeterminate bytes if the failure path leaves
     * errbuf untouched. */
    errbuf[0] = '\0';
    PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
    if (!h)
        croak("Data::PubSub::Shared::Str->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Call pubsub_eventfd_consume() on the handle and return its result to
     * Perl: the value itself when it is non-negative, undef when it is
     * negative. */
    const int64_t drained = pubsub_eventfd_consume(h);
    if (drained < 0)
        RETVAL = &PL_sv_undef;
    else
        RETVAL = newSViv((IV)drained);
  OUTPUT:
    RETVAL
void
notify(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
CODE:
/* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE provides
 * for self.  Nothing is returned to Perl. */
pubsub_notify(h);
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str::Sub
void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor for a subscriber object.  The PubSubSub pointer is stored as
     * an IV in the scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`, so the XSUB
     * always leaves the Perl stack in the state the interpreter expects. */
    if (!SvROK(self))
        XSRETURN_EMPTY;
    {
        SV *inner = SvRV(self);
        PubSubSub *sub = INT2PTR(PubSubSub *, SvIV(inner));
        if (!sub)
            XSRETURN_EMPTY;
        /* Zero the stored pointer first: a repeated DESTROY then sees NULL
         * and does nothing. */
        sv_setiv(inner, 0);
        /* userdata, when set, is an SV whose reference this object holds. */
        if (sub->userdata)
            SvREFCNT_dec((SV *)sub->userdata);
        pubsub_sub_destroy(sub);
    }
SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *buf;
    uint32_t buf_len;
    bool is_utf8;
  CODE:
    /* Ask pubsub_str_poll() for the next message.  On a result of 1 the
     * bytes are copied into a new Perl string, flagged UTF-8 when the poll
     * says so; any other result yields undef. */
    if (pubsub_str_poll(sub, &buf, &buf_len, &is_utf8) == 1)
        RETVAL = newSVpvn_flags(buf, buf_len, is_utf8 ? SVf_UTF8 : 0);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL
void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *buf;
    uint32_t buf_len;
    bool is_utf8;
  PPCODE:
    /* Return a list of at most `count` messages.  Polling stops at the first
     * pubsub_str_poll() call whose result is not 1.  Each message is copied
     * into a mortal string, flagged UTF-8 when the poll says so. */
    UV remaining = count;
    while (remaining > 0
           && pubsub_str_poll(sub, &buf, &buf_len, &is_utf8) == 1) {
        SV *msg = newSVpvn_flags(buf, buf_len,
                                 SVs_TEMP | (is_utf8 ? SVf_UTF8 : 0));
        XPUSHs(msg);
        remaining--;
    }
SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *buf;
    uint32_t buf_len;
    bool is_utf8;
  CODE:
    /* Like poll(), but goes through pubsub_str_poll_wait() with a timeout.
     * The optional second argument is the timeout (numeric); when omitted,
     * -1 is passed.  Returns the message on a result of 1, undef
     * otherwise. */
    const double timeout = (items > 1) ? SvNV(ST(1)) : -1;
    if (pubsub_str_poll_wait(sub, &buf, &buf_len, &is_utf8, timeout) == 1)
        RETVAL = newSVpvn_flags(buf, buf_len, is_utf8 ? SVf_UTF8 : 0);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return a list of polled messages.  The optional second argument caps
     * the number of messages; without it the cap is UINT32_MAX.  Polling
     * stops at the first pubsub_str_poll() result that is not 1.
     *
     * The cap is clamped instead of cast: a plain (uint32_t) cast of a
     * 64-bit UV wraps, so e.g. 2**32 would silently become 0. */
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8)
            SvUTF8_on(sv);
        mXPUSHs(sv);
    }
void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    double timeout = -1;
    const char *buf;
    uint32_t buf_len;
    bool is_utf8;
  PPCODE:
    /* Wait (via pubsub_str_poll_wait) for a first message, then collect up
     * to count-1 more with plain polls.  Returns an empty list when count is
     * 0 or the wait does not return 1.  The optional third argument is the
     * timeout; -1 is passed when it is omitted. */
    if (count == 0)
        XSRETURN(0);
    if (items > 2)
        timeout = SvNV(ST(2));
    if (pubsub_str_poll_wait(sub, &buf, &buf_len, &is_utf8, timeout) != 1)
        XSRETURN(0);
    {
        /* `pushed` counts messages already on the stack; the first one comes
         * from the wait above, the rest from non-waiting polls. */
        UV pushed = 0;
        do {
            SV *msg = newSVpvn_flags(buf, buf_len,
                                     SVs_TEMP | (is_utf8 ? SVf_UTF8 : 0));
            XPUSHs(msg);
            pushed++;
        } while (pushed < count
                 && pubsub_str_poll(sub, &buf, &buf_len, &is_utf8) == 1);
    }
UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Report whatever pubsub_lag() computes for this subscriber, converted
     * to an unsigned Perl integer. */
    const UV behind = (UV)pubsub_lag(sub);
    RETVAL = behind;
  OUTPUT:
    RETVAL
UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's overflow_count field. */
    const UV n_overflow = (UV)sub->overflow_count;
    RETVAL = n_overflow;
  OUTPUT:
    RETVAL
UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Snapshot of hdr->write_pos, read with a relaxed atomic load. */
    const UV head = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = head;
  OUTPUT:
    RETVAL
bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* True when the cursor is behind the current write position by more
     * than `capacity` entries; false otherwise (including when the cursor is
     * at or ahead of the write position). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = 0;
    if (sub->cursor < head)
        RETVAL = (head - sub->cursor > sub->capacity);
  OUTPUT:
    RETVAL
UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Combined getter/setter.  With a second argument the cursor is set to
     * that unsigned value first; the (possibly just updated) cursor is
     * always returned. */
    if (items >= 2) {
        SV *requested = ST(1);
        sub->cursor = (uint64_t)SvUV(requested);
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL
void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Move the cursor to the current write position (acquire load of
     * hdr->write_pos). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    sub->cursor = head;
void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Move the cursor `capacity` entries behind the current write position,
     * or to 0 when the write position has not yet exceeded `capacity`. */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    if (head > sub->capacity)
        sub->cursor = head - sub->capacity;
    else
        sub->cursor = 0;
UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *buf;
    uint32_t buf_len;
    bool is_utf8;
  CODE:
    /* Poll repeatedly and invoke `cb` once per message, passing a copy of
     * the message (flagged UTF-8 when the poll says so) as the only
     * argument and discarding whatever the callback returns.  The loop ends
     * at the first poll result that is not 1.  Returns the number of
     * callback invocations. */
    RETVAL = 0;
    for (;;) {
        if (pubsub_str_poll(sub, &buf, &buf_len, &is_utf8) != 1)
            break;
        /* Fresh stack pointer per iteration: the previous callback may have
         * caused the Perl stack to be reallocated. */
        dSP;
        ENTER;
        SAVETMPS;
        /* The copy is mortal, so FREETMPS below releases it. */
        SV *msg = newSVpvn_flags(buf, buf_len,
                                 SVs_TEMP | (is_utf8 ? SVf_UTF8 : 0));
        PUSHMARK(SP);
        XPUSHs(msg);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS;
        LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL
void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() on the subscriber, then return a
     * list of polled messages exactly as drain() does.  The optional second
     * argument caps the number of messages (default UINT32_MAX).
     *
     * The cap is clamped rather than cast so that a UV above UINT32_MAX does
     * not wrap to a small number. */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8)
            SvUTF8_on(sv);
        mXPUSHs(sv);
    }
void
eventfd_set(self, fd)
SV *self
int fd
PREINIT:
EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
CODE:
/* Store `fd` in the subscriber's notify_fd field.  The previous value is
 * simply overwritten here; nothing is closed or validated in this XSUB. */
sub->notify_fd = fd;
IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's notify_fd field. */
    RETVAL = (IV)sub->notify_fd;
  OUTPUT:
    RETVAL
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32
SV *
new(class, path, capacity)
    const char *class
    SV *path
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create an int32-mode handle via pubsub_create() and wrap
     * it with MAKE_OBJ.
     *   path      - backing path; an undefined value is passed on as NULL
     *   capacity  - forwarded as a 32-bit value
     * Croaks with the message pubsub_create() left in errbuf on failure.
     *
     * capacity is range-checked before narrowing: a plain (uint32_t) cast of
     * a 64-bit UV wraps silently, which would create a handle with a
     * different size than the caller asked for. */
    if (capacity > (UV)UINT32_MAX)
        croak("Data::PubSub::Shared::Int32->new: capacity %" UVuf " is out of range", capacity);
    /* Defensive: never print indeterminate bytes if the failure path leaves
     * errbuf untouched. */
    errbuf[0] = '\0';
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
    if (!h)
        croak("Data::PubSub::Shared::Int32->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
new_memfd(class, name, capacity)
    const char *class
    const char *name
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create an int32-mode handle via pubsub_create_memfd() and
     * wrap it with MAKE_OBJ.
     *   name      - passed through to pubsub_create_memfd()
     *   capacity  - forwarded as a 32-bit value
     * Croaks with the message left in errbuf on failure.
     *
     * capacity is range-checked before narrowing so that a UV above
     * UINT32_MAX is rejected instead of silently wrapping. */
    if (capacity > (UV)UINT32_MAX)
        croak("Data::PubSub::Shared::Int32->new_memfd: capacity %" UVuf " is out of range", capacity);
    /* Defensive: never print indeterminate bytes if the failure path leaves
     * errbuf untouched. */
    errbuf[0] = '\0';
    PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
    if (!h)
        croak("Data::PubSub::Shared::Int32->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
new_from_fd(class, fd)
EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
CODE:
RETVAL = h->notify_fd;
OUTPUT:
RETVAL
SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
  CODE:
    /* Call pubsub_eventfd_consume() on the handle and return its result to
     * Perl: the value itself when it is non-negative, undef when it is
     * negative. */
    const int64_t drained = pubsub_eventfd_consume(h);
    if (drained < 0)
        RETVAL = &PL_sv_undef;
    else
        RETVAL = newSViv((IV)drained);
  OUTPUT:
    RETVAL
void
notify(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
CODE:
/* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE provides
 * for self.  Nothing is returned to Perl. */
pubsub_notify(h);
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32::Sub
void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor for a subscriber object.  The PubSubSub pointer is stored as
     * an IV in the scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`, so the XSUB
     * always leaves the Perl stack in the state the interpreter expects. */
    if (!SvROK(self))
        XSRETURN_EMPTY;
    {
        SV *inner = SvRV(self);
        PubSubSub *sub = INT2PTR(PubSubSub *, SvIV(inner));
        if (!sub)
            XSRETURN_EMPTY;
        /* Zero the stored pointer first: a repeated DESTROY then sees NULL
         * and does nothing. */
        sv_setiv(inner, 0);
        /* userdata, when set, is an SV whose reference this object holds. */
        if (sub->userdata)
            SvREFCNT_dec((SV *)sub->userdata);
        pubsub_sub_destroy(sub);
    }
SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t item;
  CODE:
    /* Ask pubsub_int32_poll() for the next value.  Returns that value as an
     * integer when the poll result is exactly 1, undef for any other
     * result. */
    RETVAL = (pubsub_int32_poll(sub, &item) == 1)
           ? newSViv((IV)item)
           : &PL_sv_undef;
  OUTPUT:
    RETVAL
void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t item;
  PPCODE:
    /* Return a list of at most `count` values.  Polling stops at the first
     * pubsub_int32_poll() call whose result is not 1; no poll is attempted
     * once `count` values have been pushed. */
    UV remaining = count;
    while (remaining > 0 && pubsub_int32_poll(sub, &item) == 1) {
        mXPUSHi((IV)item);
        remaining--;
    }
SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t item;
  CODE:
    /* Like poll(), but goes through pubsub_int32_poll_wait() with a timeout.
     * The optional second argument is the timeout (numeric); when omitted,
     * -1 is passed.  Returns the value on a result of 1, undef otherwise. */
    const double timeout = (items > 1) ? SvNV(ST(1)) : -1;
    if (pubsub_int32_poll_wait(sub, &item, timeout) == 1)
        RETVAL = newSViv((IV)item);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return a list of polled values.  The optional second argument caps the
     * number of values; without it the cap is UINT32_MAX.
     *
     * The cap is clamped instead of cast: a plain (uint32_t) cast of a
     * 64-bit UV wraps, so e.g. 2**32 would silently become 0.
     *
     * A value is pushed only when pubsub_int32_poll() returns exactly 1,
     * matching poll() and poll_multi(); any other result ends the drain so
     * `value` is never pushed without having been filled in. */
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_int32_poll(sub, &value) == 1)
        mXPUSHi((IV)value);
void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    double timeout = -1;
    int32_t value;
  PPCODE:
    /* Wait (via pubsub_int32_poll_wait) for a first value, then collect up
     * to count-1 more with plain polls.  Returns an empty list when count is
     * 0 or the wait does not produce a value.  The optional third argument
     * is the timeout; -1 is passed when it is omitted.
     *
     * Success is tested as `== 1`, the same test poll() and poll_wait() use,
     * so any other non-zero result is not mistaken for a delivered value. */
    if (count == 0)
        XSRETURN(0);
    if (items > 2)
        timeout = SvNV(ST(2));
    if (pubsub_int32_poll_wait(sub, &value, timeout) != 1)
        XSRETURN(0);
    mXPUSHi((IV)value);
    for (UV i = 1; i < count; i++) {
        if (pubsub_int32_poll(sub, &value) != 1)
            break;
        mXPUSHi((IV)value);
    }
UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Report whatever pubsub_lag() computes for this subscriber, converted
     * to an unsigned Perl integer. */
    const UV behind = (UV)pubsub_lag(sub);
    RETVAL = behind;
  OUTPUT:
    RETVAL
UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's overflow_count field. */
    const UV n_overflow = (UV)sub->overflow_count;
    RETVAL = n_overflow;
  OUTPUT:
    RETVAL
UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Snapshot of hdr->write_pos, read with a relaxed atomic load. */
    const UV head = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = head;
  OUTPUT:
    RETVAL
bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* True when the cursor is behind the current write position by more
     * than `capacity` entries; false otherwise (including when the cursor is
     * at or ahead of the write position). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = 0;
    if (sub->cursor < head)
        RETVAL = (head - sub->cursor > sub->capacity);
  OUTPUT:
    RETVAL
UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Combined getter/setter.  With a second argument the cursor is set to
     * that unsigned value first; the (possibly just updated) cursor is
     * always returned. */
    if (items >= 2) {
        SV *requested = ST(1);
        sub->cursor = (uint64_t)SvUV(requested);
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL
void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Move the cursor to the current write position (acquire load of
     * hdr->write_pos). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    sub->cursor = head;
void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Move the cursor `capacity` entries behind the current write position,
     * or to 0 when the write position has not yet exceeded `capacity`. */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    if (head > sub->capacity)
        sub->cursor = head - sub->capacity;
    else
        sub->cursor = 0;
UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t value;
  CODE:
    /* Poll repeatedly and invoke `cb` once per value, passing the value as
     * the only argument and discarding whatever the callback returns.
     * Returns the number of callback invocations.
     *
     * A value is dispatched only when pubsub_int32_poll() returns exactly 1
     * (the test used by poll()/poll_multi()); any other result ends the
     * loop, so the callback never sees a `value` that was not filled in. */
    RETVAL = 0;
    while (pubsub_int32_poll(sub, &value) == 1) {
        /* Fresh stack pointer per iteration: the callback may have caused
         * the Perl stack to be reallocated. */
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        mXPUSHi((IV)value);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS;
        LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL
void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() on the subscriber, then return a
     * list of polled values exactly as drain() does.  The optional second
     * argument caps the number of values (default UINT32_MAX).
     *
     * The cap is clamped rather than cast so that a UV above UINT32_MAX does
     * not wrap to a small number, and a value is pushed only when
     * pubsub_int32_poll() returns exactly 1, matching poll()/poll_multi(). */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_int32_poll(sub, &value) == 1)
        mXPUSHi((IV)value);
void
eventfd_set(self, fd)
SV *self
int fd
PREINIT:
EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
CODE:
/* Store `fd` in the subscriber's notify_fd field.  The previous value is
 * simply overwritten here; nothing is closed or validated in this XSUB. */
sub->notify_fd = fd;
IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's notify_fd field. */
    RETVAL = (IV)sub->notify_fd;
  OUTPUT:
    RETVAL
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16
SV *
new(class, path, capacity)
    const char *class
    SV *path
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create an int16-mode handle via pubsub_create() and wrap
     * it with MAKE_OBJ.
     *   path      - backing path; an undefined value is passed on as NULL
     *   capacity  - forwarded as a 32-bit value
     * Croaks with the message pubsub_create() left in errbuf on failure.
     *
     * capacity is range-checked before narrowing: a plain (uint32_t) cast of
     * a 64-bit UV wraps silently, which would create a handle with a
     * different size than the caller asked for. */
    if (capacity > (UV)UINT32_MAX)
        croak("Data::PubSub::Shared::Int16->new: capacity %" UVuf " is out of range", capacity);
    /* Defensive: never print indeterminate bytes if the failure path leaves
     * errbuf untouched. */
    errbuf[0] = '\0';
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
    if (!h)
        croak("Data::PubSub::Shared::Int16->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
new_memfd(class, name, capacity)
    const char *class
    const char *name
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create an int16-mode handle via pubsub_create_memfd() and
     * wrap it with MAKE_OBJ.
     *   name      - passed through to pubsub_create_memfd()
     *   capacity  - forwarded as a 32-bit value
     * Croaks with the message left in errbuf on failure.
     *
     * capacity is range-checked before narrowing so that a UV above
     * UINT32_MAX is rejected instead of silently wrapping. */
    if (capacity > (UV)UINT32_MAX)
        croak("Data::PubSub::Shared::Int16->new_memfd: capacity %" UVuf " is out of range", capacity);
    /* Defensive: never print indeterminate bytes if the failure path leaves
     * errbuf untouched. */
    errbuf[0] = '\0';
    PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
    if (!h)
        croak("Data::PubSub::Shared::Int16->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL
SV *
new_from_fd(class, fd)
const char *class
int fd
PREINIT:
EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
CODE:
RETVAL = h->notify_fd;
OUTPUT:
RETVAL
SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
  CODE:
    /* Call pubsub_eventfd_consume() on the handle and return its result to
     * Perl: the value itself when it is non-negative, undef when it is
     * negative. */
    const int64_t drained = pubsub_eventfd_consume(h);
    if (drained < 0)
        RETVAL = &PL_sv_undef;
    else
        RETVAL = newSViv((IV)drained);
  OUTPUT:
    RETVAL
void
notify(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
CODE:
/* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE provides
 * for self.  Nothing is returned to Perl. */
pubsub_notify(h);
MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16::Sub
void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor for a subscriber object.  The PubSubSub pointer is stored as
     * an IV in the scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`, so the XSUB
     * always leaves the Perl stack in the state the interpreter expects. */
    if (!SvROK(self))
        XSRETURN_EMPTY;
    {
        SV *inner = SvRV(self);
        PubSubSub *sub = INT2PTR(PubSubSub *, SvIV(inner));
        if (!sub)
            XSRETURN_EMPTY;
        /* Zero the stored pointer first: a repeated DESTROY then sees NULL
         * and does nothing. */
        sv_setiv(inner, 0);
        /* userdata, when set, is an SV whose reference this object holds. */
        if (sub->userdata)
            SvREFCNT_dec((SV *)sub->userdata);
        pubsub_sub_destroy(sub);
    }
SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t item;
  CODE:
    /* Ask pubsub_int16_poll() for the next value.  Returns that value as an
     * integer when the poll result is exactly 1, undef for any other
     * result. */
    RETVAL = (pubsub_int16_poll(sub, &item) == 1)
           ? newSViv((IV)item)
           : &PL_sv_undef;
  OUTPUT:
    RETVAL
void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t item;
  PPCODE:
    /* Return a list of at most `count` values.  Polling stops at the first
     * pubsub_int16_poll() call whose result is not 1; no poll is attempted
     * once `count` values have been pushed. */
    UV remaining = count;
    while (remaining > 0 && pubsub_int16_poll(sub, &item) == 1) {
        mXPUSHi((IV)item);
        remaining--;
    }
SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t item;
  CODE:
    /* Like poll(), but goes through pubsub_int16_poll_wait() with a timeout.
     * The optional second argument is the timeout (numeric); when omitted,
     * -1 is passed.  Returns the value on a result of 1, undef otherwise. */
    const double timeout = (items > 1) ? SvNV(ST(1)) : -1;
    if (pubsub_int16_poll_wait(sub, &item, timeout) == 1)
        RETVAL = newSViv((IV)item);
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL
void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return a list of polled values.  The optional second argument caps the
     * number of values; without it the cap is UINT32_MAX.
     *
     * The cap is clamped instead of cast: a plain (uint32_t) cast of a
     * 64-bit UV wraps, so e.g. 2**32 would silently become 0.
     *
     * A value is pushed only when pubsub_int16_poll() returns exactly 1,
     * matching poll() and poll_multi(); any other result ends the drain so
     * `value` is never pushed without having been filled in. */
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_int16_poll(sub, &value) == 1)
        mXPUSHi((IV)value);
void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    double timeout = -1;
    int16_t value;
  PPCODE:
    /* Wait (via pubsub_int16_poll_wait) for a first value, then collect up
     * to count-1 more with plain polls.  Returns an empty list when count is
     * 0 or the wait does not produce a value.  The optional third argument
     * is the timeout; -1 is passed when it is omitted.
     *
     * Success is tested as `== 1`, the same test poll() and poll_wait() use,
     * so any other non-zero result is not mistaken for a delivered value. */
    if (count == 0)
        XSRETURN(0);
    if (items > 2)
        timeout = SvNV(ST(2));
    if (pubsub_int16_poll_wait(sub, &value, timeout) != 1)
        XSRETURN(0);
    mXPUSHi((IV)value);
    for (UV i = 1; i < count; i++) {
        if (pubsub_int16_poll(sub, &value) != 1)
            break;
        mXPUSHi((IV)value);
    }
UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Report whatever pubsub_lag() computes for this subscriber, converted
     * to an unsigned Perl integer. */
    const UV behind = (UV)pubsub_lag(sub);
    RETVAL = behind;
  OUTPUT:
    RETVAL
UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's overflow_count field. */
    const UV n_overflow = (UV)sub->overflow_count;
    RETVAL = n_overflow;
  OUTPUT:
    RETVAL
UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Snapshot of hdr->write_pos, read with a relaxed atomic load. */
    const UV head = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = head;
  OUTPUT:
    RETVAL
bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* True when the cursor is behind the current write position by more
     * than `capacity` entries; false otherwise (including when the cursor is
     * at or ahead of the write position). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
    RETVAL = 0;
    if (sub->cursor < head)
        RETVAL = (head - sub->cursor > sub->capacity);
  OUTPUT:
    RETVAL
UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Combined getter/setter.  With a second argument the cursor is set to
     * that unsigned value first; the (possibly just updated) cursor is
     * always returned. */
    if (items >= 2) {
        SV *requested = ST(1);
        sub->cursor = (uint64_t)SvUV(requested);
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL
void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Move the cursor to the current write position (acquire load of
     * hdr->write_pos). */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    sub->cursor = head;
void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Move the cursor `capacity` entries behind the current write position,
     * or to 0 when the write position has not yet exceeded `capacity`. */
    const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
    if (head > sub->capacity)
        sub->cursor = head - sub->capacity;
    else
        sub->cursor = 0;
UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t value;
  CODE:
    /* Poll repeatedly and invoke `cb` once per value, passing the value as
     * the only argument and discarding whatever the callback returns.
     * Returns the number of callback invocations.
     *
     * A value is dispatched only when pubsub_int16_poll() returns exactly 1
     * (the test used by poll()/poll_multi()); any other result ends the
     * loop, so the callback never sees a `value` that was not filled in. */
    RETVAL = 0;
    while (pubsub_int16_poll(sub, &value) == 1) {
        /* Fresh stack pointer per iteration: the callback may have caused
         * the Perl stack to be reallocated. */
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        mXPUSHi((IV)value);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS;
        LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL
void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() on the subscriber, then return a
     * list of polled values exactly as drain() does.  The optional second
     * argument caps the number of values (default UINT32_MAX).
     *
     * The cap is clamped rather than cast so that a UV above UINT32_MAX does
     * not wrap to a small number, and a value is pushed only when
     * pubsub_int16_poll() returns exactly 1, matching poll()/poll_multi(). */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        const UV requested = SvUV(ST(1));
        max_count = (requested > (UV)UINT32_MAX) ? UINT32_MAX
                                                 : (uint32_t)requested;
    }
    while (max_count-- > 0 && pubsub_int16_poll(sub, &value) == 1)
        mXPUSHi((IV)value);
void
eventfd_set(self, fd)
SV *self
int fd
PREINIT:
EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
CODE:
/* Store `fd` in the subscriber's notify_fd field.  The previous value is
 * simply overwritten here; nothing is closed or validated in this XSUB. */
sub->notify_fd = fd;
IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Read-only accessor for the subscriber's notify_fd field. */
    RETVAL = (IV)sub->notify_fd;
  OUTPUT:
    RETVAL
( run in 1.104 second using v1.01-cache-2.11-cpan-71847e10f99 )