Data-PubSub-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
  CODE:
    RETVAL = h->notify_fd;
  OUTPUT:
    RETVAL

SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
  CODE:
    /* Hand back the result of pubsub_eventfd_consume() on the handle:
     * a non-negative result becomes an integer, a negative one undef. */
    {
        const int64_t result = pubsub_eventfd_consume(h);
        if (result < 0)
            RETVAL = &PL_sv_undef;
        else
            RETVAL = newSViv((IV)result);
    }
  OUTPUT:
    RETVAL

void
notify(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
  CODE:
    /* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE
     * resolves from self; nothing is returned to Perl. */
    pubsub_notify(h);

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Int::Sub

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: release the PubSubSub whose address is stored in the
     * scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`: a plain
     * `return` skips the XSUB's stack reset and would leave `self` on the
     * stack as a return value. */
    if (!SvROK(self)) XSRETURN_EMPTY;
    PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
    /* Null pointer: nothing to free (this is what the slot holds after
     * the sv_setiv below has run once). */
    if (!sub) XSRETURN_EMPTY;
    /* Zero the slot before freeing so a repeated DESTROY is a no-op. */
    sv_setiv(SvRV(self), 0);
    if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
    pubsub_sub_destroy(sub);

SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t item;
  CODE:
    /* One pubsub_int_poll() attempt: the value is returned when the poll
     * reports 1, undef for any other status. */
    RETVAL = &PL_sv_undef;
    if (pubsub_int_poll(sub, &item) == 1)
        RETVAL = newSViv((IV)item);
  OUTPUT:
    RETVAL

void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t item;
  PPCODE:
    /* Return up to `count` values as a list, stopping at the first
     * pubsub_int_poll() that does not report 1. */
    {
        UV fetched = 0;
        while (fetched < count && pubsub_int_poll(sub, &item) == 1) {
            mXPUSHi((IV)item);
            fetched++;
        }
    }

SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    double wait_for = -1;
    int64_t item;
  CODE:
    /* Like poll(), but goes through pubsub_int_poll_wait() with an
     * optional timeout argument; -1 is passed when none is given. */
    if (items > 1)
        wait_for = SvNV(ST(1));
    RETVAL = &PL_sv_undef;
    if (pubsub_int_poll_wait(sub, &item, wait_for) == 1)
        RETVAL = newSViv((IV)item);
  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return polled values as a list until the poll stops reporting 1 or
     * the optional limit (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count.
     * Success is tested as `== 1`, matching poll() and poll_multi(). */
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_int_poll(sub, &value) == 1) {
        mXPUSHi((IV)value);
        max_count--;
    }

void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    double timeout = -1;
    int64_t value;
  PPCODE:
    /* Wait (optional timeout in the third argument, -1 when absent) for a
     * first value, then collect up to `count` values in total without
     * waiting again. Returns an empty list when count is 0 or the wait
     * does not produce a value.
     *
     * Success is tested as `== 1`, matching poll() and poll_multi(), so
     * any other status ends collection instead of pushing an unset value. */
    if (count == 0) XSRETURN(0);
    if (items > 2) timeout = SvNV(ST(2));
    if (pubsub_int_poll_wait(sub, &value, timeout) != 1) XSRETURN(0);
    mXPUSHi((IV)value);
    for (UV i = 1; i < count; i++) {
        if (pubsub_int_poll(sub, &value) != 1) break;
        mXPUSHi((IV)value);
    }

UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Report pubsub_lag(sub) as an unsigned integer.
     * NOTE(review): presumably how far this subscriber trails the
     * writer -- confirm against pubsub_lag(). */
    RETVAL = (UV)pubsub_lag(sub);
  OUTPUT:
    RETVAL

UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Expose the subscriber's overflow_count field as an unsigned integer.
     * NOTE(review): the field is only read here; confirm where it is
     * incremented and what exactly it counts. */
    RETVAL = (UV)sub->overflow_count;
  OUTPUT:
    RETVAL

UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Relaxed atomic read of the header's write_pos, returned as a UV. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = (UV)head;
    }
  OUTPUT:
    RETVAL

bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* True when write_pos is ahead of the cursor by more than capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = 0;
        if (head > sub->cursor)
            RETVAL = (head - sub->cursor) > sub->capacity;
    }
  OUTPUT:
    RETVAL

UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Getter/setter: an optional argument overwrites the cursor first
     * (no range validation is done here); the resulting cursor is
     * always returned. */
    if (items > 1) {
        const UV target = SvUV(ST(1));
        sub->cursor = (uint64_t)target;
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL

void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Move the cursor to the current write_pos (acquire load). */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        sub->cursor = head;
    }

void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Move the cursor `capacity` positions behind write_pos, or to 0
     * while write_pos has not yet exceeded capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        uint64_t oldest = 0;
        if (head > sub->capacity)
            oldest = head - sub->capacity;
        sub->cursor = oldest;
    }

UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t value;
  CODE:
    /* Call `cb` once per value obtained from pubsub_int_poll(), passing
     * the value as the only argument; returns how many calls were made.
     *
     * The loop continues only while the poll reports exactly 1, the same
     * success test poll() and poll_multi() use, so any other status ends
     * the loop instead of dispatching an unset `value`. */
    RETVAL = 0;
    while (pubsub_int_poll(sub, &value) == 1) {
        dSP;
        /* Fresh scope per call so the mortal argument is freed each round. */
        ENTER; SAVETMPS;
        PUSHMARK(SP);
        mXPUSHi((IV)value);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS; LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL

void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    int64_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() first, then return polled values
     * as a list until the poll stops reporting 1 or the optional limit
     * (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count.
     * Success is tested as `== 1`, matching poll() and poll_multi(). */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_int_poll(sub, &value) == 1) {
        mXPUSHi((IV)value);
        max_count--;
    }

void
eventfd_set(self, fd)
    SV *self
    int fd
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Store `fd` as this subscriber's notify_fd. The previous value is
     * simply overwritten; this XSUB neither closes nor validates it. */
    sub->notify_fd = fd;

IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
  CODE:
    /* Return the notify_fd currently stored on the subscriber. */
    RETVAL = sub->notify_fd;
  OUTPUT:
    RETVAL

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Str

SV *
new(class, path, capacity, ...)
    const char *class
    SV *path
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
    uint32_t msg_size = 0;
  CODE:
    /* Constructor: create a PUBSUB_MODE_STR handle via pubsub_create()
     * and wrap it with MAKE_OBJ. An undefined `path` is passed as NULL;
     * the optional fourth argument is the message size (0 when absent).
     * Croaks with the text from errbuf when creation fails.
     *
     * pubsub_create() receives 32-bit sizes, so larger values are
     * rejected here instead of being silently truncated by the cast. */
    if (capacity > UINT32_MAX)
        croak("Data::PubSub::Shared::Str->new: capacity out of range");
    if (items > 3) {
        UV requested = SvUV(ST(3));
        if (requested > UINT32_MAX)
            croak("Data::PubSub::Shared::Str->new: msg_size out of range");
        msg_size = (uint32_t)requested;
    }
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
    if (!h) croak("Data::PubSub::Shared::Str->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_memfd(class, name, capacity, ...)
    const char *class
    const char *name
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
    uint32_t msg_size = 0;
  CODE:
    /* Constructor: create a PUBSUB_MODE_STR handle via
     * pubsub_create_memfd() and wrap it with MAKE_OBJ. The optional
     * fourth argument is the message size (0 when absent). Croaks with
     * the text from errbuf when creation fails.
     *
     * pubsub_create_memfd() receives 32-bit sizes, so larger values are
     * rejected here instead of being silently truncated by the cast. */
    if (capacity > UINT32_MAX)
        croak("Data::PubSub::Shared::Str->new_memfd: capacity out of range");
    if (items > 3) {
        UV requested = SvUV(ST(3));
        if (requested > UINT32_MAX)
            croak("Data::PubSub::Shared::Str->new_memfd: msg_size out of range");
        msg_size = (uint32_t)requested;
    }
    PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
    if (!h) croak("Data::PubSub::Shared::Str->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *

Shared.xs  view on Meta::CPAN

SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Hand back the result of pubsub_eventfd_consume() on the handle:
     * a non-negative result becomes an integer, a negative one undef. */
    {
        const int64_t result = pubsub_eventfd_consume(h);
        if (result < 0)
            RETVAL = &PL_sv_undef;
        else
            RETVAL = newSViv((IV)result);
    }
  OUTPUT:
    RETVAL

void
notify(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE
     * resolves from self; nothing is returned to Perl. */
    pubsub_notify(h);

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Str::Sub

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: release the PubSubSub whose address is stored in the
     * scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`: a plain
     * `return` skips the XSUB's stack reset and would leave `self` on the
     * stack as a return value. */
    if (!SvROK(self)) XSRETURN_EMPTY;
    PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
    /* Null pointer: nothing to free (this is what the slot holds after
     * the sv_setiv below has run once). */
    if (!sub) XSRETURN_EMPTY;
    /* Zero the slot before freeing so a repeated DESTROY is a no-op. */
    sv_setiv(SvRV(self), 0);
    if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
    pubsub_sub_destroy(sub);

SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *data;
    uint32_t nbytes;
    bool is_utf8;
  CODE:
    /* One pubsub_str_poll() attempt: when it reports 1 the bytes are
     * copied into a new scalar (flagged UTF-8 if the poll says so);
     * any other status yields undef. */
    RETVAL = &PL_sv_undef;
    if (pubsub_str_poll(sub, &data, &nbytes, &is_utf8) == 1) {
        RETVAL = newSVpvn(data, nbytes);
        if (is_utf8)
            SvUTF8_on(RETVAL);
    }
  OUTPUT:
    RETVAL

void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *data;
    uint32_t nbytes;
    bool is_utf8;
  PPCODE:
    /* Return up to `count` messages as a list, stopping at the first
     * pubsub_str_poll() that does not report 1. */
    {
        UV fetched = 0;
        while (fetched < count
               && pubsub_str_poll(sub, &data, &nbytes, &is_utf8) == 1) {
            SV *msg = newSVpvn(data, nbytes);
            if (is_utf8)
                SvUTF8_on(msg);
            mXPUSHs(msg);
            fetched++;
        }
    }

SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    double wait_for = -1;
    const char *data;
    uint32_t nbytes;
    bool is_utf8;
  CODE:
    /* Like poll(), but goes through pubsub_str_poll_wait() with an
     * optional timeout argument; -1 is passed when none is given. */
    if (items > 1)
        wait_for = SvNV(ST(1));
    RETVAL = &PL_sv_undef;
    if (pubsub_str_poll_wait(sub, &data, &nbytes, &is_utf8, wait_for) == 1) {
        RETVAL = newSVpvn(data, nbytes);
        if (is_utf8)
            SvUTF8_on(RETVAL);
    }
  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return polled messages as a list until the poll stops reporting 1
     * or the optional limit (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count. */
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        max_count--;
    }

void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    double wait_for = -1;
    const char *data;
    uint32_t nbytes;
    bool is_utf8;
  PPCODE:
    /* Wait (optional timeout in the third argument, -1 when absent) for a
     * first message, then collect up to `count` messages in total without
     * waiting again. Returns an empty list when count is 0 or the wait
     * does not produce a message. */
    if (count == 0)
        XSRETURN(0);
    if (items > 2)
        wait_for = SvNV(ST(2));
    if (pubsub_str_poll_wait(sub, &data, &nbytes, &is_utf8, wait_for) != 1)
        XSRETURN(0);
    /* Each pass pushes the message already fetched, then tries for the
     * next one only if the quota is not yet filled. */
    for (UV taken = 1; ; taken++) {
        SV *msg = newSVpvn(data, nbytes);
        if (is_utf8)
            SvUTF8_on(msg);
        mXPUSHs(msg);
        if (taken >= count)
            break;
        if (pubsub_str_poll(sub, &data, &nbytes, &is_utf8) != 1)
            break;
    }

UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Report pubsub_lag(sub) as an unsigned integer.
     * NOTE(review): presumably how far this subscriber trails the
     * writer -- confirm against pubsub_lag(). */
    RETVAL = (UV)pubsub_lag(sub);
  OUTPUT:
    RETVAL

UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Expose the subscriber's overflow_count field as an unsigned integer.
     * NOTE(review): the field is only read here; confirm where it is
     * incremented and what exactly it counts. */
    RETVAL = (UV)sub->overflow_count;
  OUTPUT:
    RETVAL

UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Relaxed atomic read of the header's write_pos, returned as a UV. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = (UV)head;
    }
  OUTPUT:
    RETVAL

bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* True when write_pos is ahead of the cursor by more than capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = 0;
        if (head > sub->cursor)
            RETVAL = (head - sub->cursor) > sub->capacity;
    }
  OUTPUT:
    RETVAL

UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Getter/setter: an optional argument overwrites the cursor first
     * (no range validation is done here); the resulting cursor is
     * always returned. */
    if (items > 1) {
        const UV target = SvUV(ST(1));
        sub->cursor = (uint64_t)target;
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL

void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Move the cursor to the current write_pos (acquire load). */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        sub->cursor = head;
    }

void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Move the cursor `capacity` positions behind write_pos, or to 0
     * while write_pos has not yet exceeded capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        uint64_t oldest = 0;
        if (head > sub->capacity)
            oldest = head - sub->capacity;
        sub->cursor = oldest;
    }

UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *data;
    uint32_t nbytes;
    bool is_utf8;
  CODE:
    /* Call `cb` once per message obtained from pubsub_str_poll(), passing
     * a fresh copy of the message as the only argument; returns how many
     * calls were made. */
    for (RETVAL = 0;
         pubsub_str_poll(sub, &data, &nbytes, &is_utf8) == 1;
         RETVAL++) {
        dSP;
        SV *msg;
        /* Fresh scope per call so the mortal argument is freed each round. */
        ENTER;
        SAVETMPS;
        msg = newSVpvn(data, nbytes);
        if (is_utf8)
            SvUTF8_on(msg);
        PUSHMARK(SP);
        mXPUSHs(msg);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS;
        LEAVE;
    }
  OUTPUT:
    RETVAL

void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    const char *str;
    uint32_t len;
    bool utf8;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() first, then return polled
     * messages as a list until the poll stops reporting 1 or the optional
     * limit (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count. */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
        SV *sv = newSVpvn(str, len);
        if (utf8) SvUTF8_on(sv);
        mXPUSHs(sv);
        max_count--;
    }

void
eventfd_set(self, fd)
    SV *self
    int fd
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Store `fd` as this subscriber's notify_fd. The previous value is
     * simply overwritten; this XSUB neither closes nor validates it. */
    sub->notify_fd = fd;

IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
  CODE:
    /* Return the notify_fd currently stored on the subscriber. */
    RETVAL = sub->notify_fd;
  OUTPUT:
    RETVAL

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Int32

SV *
new(class, path, capacity)
    const char *class
    SV *path
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create a PUBSUB_MODE_INT32 handle via pubsub_create()
     * and wrap it with MAKE_OBJ. An undefined `path` is passed as NULL.
     * Croaks with the text from errbuf when creation fails.
     *
     * pubsub_create() receives a 32-bit capacity, so larger values are
     * rejected here instead of being silently truncated by the cast. */
    if (capacity > UINT32_MAX)
        croak("Data::PubSub::Shared::Int32->new: capacity out of range");
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
    if (!h) croak("Data::PubSub::Shared::Int32->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_memfd(class, name, capacity)
    const char *class
    const char *name
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create a PUBSUB_MODE_INT32 handle via
     * pubsub_create_memfd() and wrap it with MAKE_OBJ. Croaks with the
     * text from errbuf when creation fails.
     *
     * pubsub_create_memfd() receives a 32-bit capacity, so larger values
     * are rejected here instead of being silently truncated by the cast. */
    if (capacity > UINT32_MAX)
        croak("Data::PubSub::Shared::Int32->new_memfd: capacity out of range");
    PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
    if (!h) croak("Data::PubSub::Shared::Int32->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_from_fd(class, fd)

Shared.xs  view on Meta::CPAN

    EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
  CODE:
    RETVAL = h->notify_fd;
  OUTPUT:
    RETVAL

SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
  CODE:
    /* Hand back the result of pubsub_eventfd_consume() on the handle:
     * a non-negative result becomes an integer, a negative one undef. */
    {
        const int64_t result = pubsub_eventfd_consume(h);
        if (result < 0)
            RETVAL = &PL_sv_undef;
        else
            RETVAL = newSViv((IV)result);
    }
  OUTPUT:
    RETVAL

void
notify(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
  CODE:
    /* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE
     * resolves from self; nothing is returned to Perl. */
    pubsub_notify(h);

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Int32::Sub

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: release the PubSubSub whose address is stored in the
     * scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`: a plain
     * `return` skips the XSUB's stack reset and would leave `self` on the
     * stack as a return value. */
    if (!SvROK(self)) XSRETURN_EMPTY;
    PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
    /* Null pointer: nothing to free (this is what the slot holds after
     * the sv_setiv below has run once). */
    if (!sub) XSRETURN_EMPTY;
    /* Zero the slot before freeing so a repeated DESTROY is a no-op. */
    sv_setiv(SvRV(self), 0);
    if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
    pubsub_sub_destroy(sub);

SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t item;
  CODE:
    /* One pubsub_int32_poll() attempt: the value is returned when the
     * poll reports 1, undef for any other status. */
    RETVAL = &PL_sv_undef;
    if (pubsub_int32_poll(sub, &item) == 1)
        RETVAL = newSViv((IV)item);
  OUTPUT:
    RETVAL

void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t item;
  PPCODE:
    /* Return up to `count` values as a list, stopping at the first
     * pubsub_int32_poll() that does not report 1. */
    {
        UV fetched = 0;
        while (fetched < count && pubsub_int32_poll(sub, &item) == 1) {
            mXPUSHi((IV)item);
            fetched++;
        }
    }

SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    double wait_for = -1;
    int32_t item;
  CODE:
    /* Like poll(), but goes through pubsub_int32_poll_wait() with an
     * optional timeout argument; -1 is passed when none is given. */
    if (items > 1)
        wait_for = SvNV(ST(1));
    RETVAL = &PL_sv_undef;
    if (pubsub_int32_poll_wait(sub, &item, wait_for) == 1)
        RETVAL = newSViv((IV)item);
  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return polled values as a list until the poll stops reporting 1 or
     * the optional limit (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count.
     * Success is tested as `== 1`, matching poll() and poll_multi(). */
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_int32_poll(sub, &value) == 1) {
        mXPUSHi((IV)value);
        max_count--;
    }

void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    double timeout = -1;
    int32_t value;
  PPCODE:
    /* Wait (optional timeout in the third argument, -1 when absent) for a
     * first value, then collect up to `count` values in total without
     * waiting again. Returns an empty list when count is 0 or the wait
     * does not produce a value.
     *
     * Success is tested as `== 1`, matching poll() and poll_multi(), so
     * any other status ends collection instead of pushing an unset value. */
    if (count == 0) XSRETURN(0);
    if (items > 2) timeout = SvNV(ST(2));
    if (pubsub_int32_poll_wait(sub, &value, timeout) != 1) XSRETURN(0);
    mXPUSHi((IV)value);
    for (UV i = 1; i < count; i++) {
        if (pubsub_int32_poll(sub, &value) != 1) break;
        mXPUSHi((IV)value);
    }

UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Report pubsub_lag(sub) as an unsigned integer.
     * NOTE(review): presumably how far this subscriber trails the
     * writer -- confirm against pubsub_lag(). */
    RETVAL = (UV)pubsub_lag(sub);
  OUTPUT:
    RETVAL

UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Expose the subscriber's overflow_count field as an unsigned integer.
     * NOTE(review): the field is only read here; confirm where it is
     * incremented and what exactly it counts. */
    RETVAL = (UV)sub->overflow_count;
  OUTPUT:
    RETVAL

UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Relaxed atomic read of the header's write_pos, returned as a UV. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = (UV)head;
    }
  OUTPUT:
    RETVAL

bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* True when write_pos is ahead of the cursor by more than capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = 0;
        if (head > sub->cursor)
            RETVAL = (head - sub->cursor) > sub->capacity;
    }
  OUTPUT:
    RETVAL

UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Getter/setter: an optional argument overwrites the cursor first
     * (no range validation is done here); the resulting cursor is
     * always returned. */
    if (items > 1) {
        const UV target = SvUV(ST(1));
        sub->cursor = (uint64_t)target;
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL

void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Move the cursor to the current write_pos (acquire load). */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        sub->cursor = head;
    }

void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Move the cursor `capacity` positions behind write_pos, or to 0
     * while write_pos has not yet exceeded capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        uint64_t oldest = 0;
        if (head > sub->capacity)
            oldest = head - sub->capacity;
        sub->cursor = oldest;
    }

UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t value;
  CODE:
    /* Call `cb` once per value obtained from pubsub_int32_poll(), passing
     * the value as the only argument; returns how many calls were made.
     *
     * The loop continues only while the poll reports exactly 1, the same
     * success test poll() and poll_multi() use, so any other status ends
     * the loop instead of dispatching an unset `value`. */
    RETVAL = 0;
    while (pubsub_int32_poll(sub, &value) == 1) {
        dSP;
        /* Fresh scope per call so the mortal argument is freed each round. */
        ENTER; SAVETMPS;
        PUSHMARK(SP);
        mXPUSHi((IV)value);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS; LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL

void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    int32_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() first, then return polled values
     * as a list until the poll stops reporting 1 or the optional limit
     * (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count.
     * Success is tested as `== 1`, matching poll() and poll_multi(). */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_int32_poll(sub, &value) == 1) {
        mXPUSHi((IV)value);
        max_count--;
    }

void
eventfd_set(self, fd)
    SV *self
    int fd
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Store `fd` as this subscriber's notify_fd. The previous value is
     * simply overwritten; this XSUB neither closes nor validates it. */
    sub->notify_fd = fd;

IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
  CODE:
    /* Return the notify_fd currently stored on the subscriber. */
    RETVAL = sub->notify_fd;
  OUTPUT:
    RETVAL

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Int16

SV *
new(class, path, capacity)
    const char *class
    SV *path
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create a PUBSUB_MODE_INT16 handle via pubsub_create()
     * and wrap it with MAKE_OBJ. An undefined `path` is passed as NULL.
     * Croaks with the text from errbuf when creation fails.
     *
     * pubsub_create() receives a 32-bit capacity, so larger values are
     * rejected here instead of being silently truncated by the cast. */
    if (capacity > UINT32_MAX)
        croak("Data::PubSub::Shared::Int16->new: capacity out of range");
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
    if (!h) croak("Data::PubSub::Shared::Int16->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_memfd(class, name, capacity)
    const char *class
    const char *name
    UV capacity
  PREINIT:
    char errbuf[PUBSUB_ERR_BUFLEN];
  CODE:
    /* Constructor: create a PUBSUB_MODE_INT16 handle via
     * pubsub_create_memfd() and wrap it with MAKE_OBJ. Croaks with the
     * text from errbuf when creation fails.
     *
     * pubsub_create_memfd() receives a 32-bit capacity, so larger values
     * are rejected here instead of being silently truncated by the cast. */
    if (capacity > UINT32_MAX)
        croak("Data::PubSub::Shared::Int16->new_memfd: capacity out of range");
    PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
    if (!h) croak("Data::PubSub::Shared::Int16->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:

Shared.xs  view on Meta::CPAN

    EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
  CODE:
    RETVAL = h->notify_fd;
  OUTPUT:
    RETVAL

SV *
eventfd_consume(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
  CODE:
    /* Hand back the result of pubsub_eventfd_consume() on the handle:
     * a non-negative result becomes an integer, a negative one undef. */
    {
        const int64_t result = pubsub_eventfd_consume(h);
        if (result < 0)
            RETVAL = &PL_sv_undef;
        else
            RETVAL = newSViv((IV)result);
    }
  OUTPUT:
    RETVAL

void
notify(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
  CODE:
    /* Forward to pubsub_notify() on the handle `h` that EXTRACT_HANDLE
     * resolves from self; nothing is returned to Perl. */
    pubsub_notify(h);

MODULE = Data::PubSub::Shared  PACKAGE = Data::PubSub::Shared::Int16::Sub

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: release the PubSubSub whose address is stored in the
     * scalar that self references.
     *
     * Early exits use XSRETURN_EMPTY instead of a bare `return`: a plain
     * `return` skips the XSUB's stack reset and would leave `self` on the
     * stack as a return value. */
    if (!SvROK(self)) XSRETURN_EMPTY;
    PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
    /* Null pointer: nothing to free (this is what the slot holds after
     * the sv_setiv below has run once). */
    if (!sub) XSRETURN_EMPTY;
    /* Zero the slot before freeing so a repeated DESTROY is a no-op. */
    sv_setiv(SvRV(self), 0);
    if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
    pubsub_sub_destroy(sub);

SV *
poll(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t item;
  CODE:
    /* One pubsub_int16_poll() attempt: the value is returned when the
     * poll reports 1, undef for any other status. */
    RETVAL = &PL_sv_undef;
    if (pubsub_int16_poll(sub, &item) == 1)
        RETVAL = newSViv((IV)item);
  OUTPUT:
    RETVAL

void
poll_multi(self, count)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t item;
  PPCODE:
    /* Return up to `count` values as a list, stopping at the first
     * pubsub_int16_poll() that does not report 1. */
    {
        UV fetched = 0;
        while (fetched < count && pubsub_int16_poll(sub, &item) == 1) {
            mXPUSHi((IV)item);
            fetched++;
        }
    }

SV *
poll_wait(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    double wait_for = -1;
    int16_t item;
  CODE:
    /* Like poll(), but goes through pubsub_int16_poll_wait() with an
     * optional timeout argument; -1 is passed when none is given. */
    if (items > 1)
        wait_for = SvNV(ST(1));
    RETVAL = &PL_sv_undef;
    if (pubsub_int16_poll_wait(sub, &item, wait_for) == 1)
        RETVAL = newSViv((IV)item);
  OUTPUT:
    RETVAL

void
drain(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Return polled values as a list until the poll stops reporting 1 or
     * the optional limit (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count.
     * Success is tested as `== 1`, matching poll() and poll_multi(). */
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_int16_poll(sub, &value) == 1) {
        mXPUSHi((IV)value);
        max_count--;
    }

void
poll_wait_multi(self, count, ...)
    SV *self
    UV count
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    double timeout = -1;
    int16_t value;
  PPCODE:
    /* Wait (optional timeout in the third argument, -1 when absent) for a
     * first value, then collect up to `count` values in total without
     * waiting again. Returns an empty list when count is 0 or the wait
     * does not produce a value.
     *
     * Success is tested as `== 1`, matching poll() and poll_multi(), so
     * any other status ends collection instead of pushing an unset value. */
    if (count == 0) XSRETURN(0);
    if (items > 2) timeout = SvNV(ST(2));
    if (pubsub_int16_poll_wait(sub, &value, timeout) != 1) XSRETURN(0);
    mXPUSHi((IV)value);
    for (UV i = 1; i < count; i++) {
        if (pubsub_int16_poll(sub, &value) != 1) break;
        mXPUSHi((IV)value);
    }

UV
lag(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Report pubsub_lag(sub) as an unsigned integer.
     * NOTE(review): presumably how far this subscriber trails the
     * writer -- confirm against pubsub_lag(). */
    RETVAL = (UV)pubsub_lag(sub);
  OUTPUT:
    RETVAL

UV
overflow_count(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Expose the subscriber's overflow_count field as an unsigned integer.
     * NOTE(review): the field is only read here; confirm where it is
     * incremented and what exactly it counts. */
    RETVAL = (UV)sub->overflow_count;
  OUTPUT:
    RETVAL

UV
write_pos(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Relaxed atomic read of the header's write_pos, returned as a UV. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = (UV)head;
    }
  OUTPUT:
    RETVAL

bool
has_overflow(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* True when write_pos is ahead of the cursor by more than capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
        RETVAL = 0;
        if (head > sub->cursor)
            RETVAL = (head - sub->cursor) > sub->capacity;
    }
  OUTPUT:
    RETVAL

UV
cursor(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Getter/setter: an optional argument overwrites the cursor first
     * (no range validation is done here); the resulting cursor is
     * always returned. */
    if (items > 1) {
        const UV target = SvUV(ST(1));
        sub->cursor = (uint64_t)target;
    }
    RETVAL = (UV)sub->cursor;
  OUTPUT:
    RETVAL

void
reset(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Move the cursor to the current write_pos (acquire load). */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        sub->cursor = head;
    }

void
reset_oldest(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Move the cursor `capacity` positions behind write_pos, or to 0
     * while write_pos has not yet exceeded capacity. */
    {
        const uint64_t head = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
        uint64_t oldest = 0;
        if (head > sub->capacity)
            oldest = head - sub->capacity;
        sub->cursor = oldest;
    }

UV
poll_cb(self, cb)
    SV *self
    SV *cb
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t value;
  CODE:
    /* Call `cb` once per value obtained from pubsub_int16_poll(), passing
     * the value as the only argument; returns how many calls were made.
     *
     * The loop continues only while the poll reports exactly 1, the same
     * success test poll() and poll_multi() use, so any other status ends
     * the loop instead of dispatching an unset `value`. */
    RETVAL = 0;
    while (pubsub_int16_poll(sub, &value) == 1) {
        dSP;
        /* Fresh scope per call so the mortal argument is freed each round. */
        ENTER; SAVETMPS;
        PUSHMARK(SP);
        mXPUSHi((IV)value);
        PUTBACK;
        call_sv(cb, G_DISCARD);
        FREETMPS; LEAVE;
        RETVAL++;
    }
  OUTPUT:
    RETVAL

void
drain_notify(self, ...)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    int16_t value;
    uint32_t max_count = UINT32_MAX;
  PPCODE:
    /* Call pubsub_sub_eventfd_consume() first, then return polled values
     * as a list until the poll stops reporting 1 or the optional limit
     * (second argument) is reached.
     *
     * The limit is clamped to UINT32_MAX rather than truncated, so a
     * request of 2**32 or more no longer wraps to a small (or zero) count.
     * Success is tested as `== 1`, matching poll() and poll_multi(). */
    pubsub_sub_eventfd_consume(sub);
    if (items > 1) {
        UV limit = SvUV(ST(1));
        if (limit < UINT32_MAX) max_count = (uint32_t)limit;
    }
    while (max_count > 0 && pubsub_int16_poll(sub, &value) == 1) {
        mXPUSHi((IV)value);
        max_count--;
    }

void
eventfd_set(self, fd)
    SV *self
    int fd
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Store `fd` as this subscriber's notify_fd. The previous value is
     * simply overwritten; this XSUB neither closes nor validates it. */
    sub->notify_fd = fd;

IV
fileno(self)
    SV *self
  PREINIT:
    EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
  CODE:
    /* Return the notify_fd currently stored on the subscriber. */
    RETVAL = sub->notify_fd;
  OUTPUT:
    RETVAL



( run in 1.104 second using v1.01-cache-2.11-cpan-71847e10f99 )