Data-PubSub-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

  CODE:
    RETVAL = h->backing_fd;
  OUTPUT:
    RETVAL

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor: releases the PubSubHandle whose address is stored as an
     * IV in the referenced scalar.
     *
     * Early exits use XSRETURN_EMPTY rather than a bare `return`: a bare
     * return skips the XSUB stack fix-up, so an explicit call (e.g. on a
     * non-reference, or a second DESTROY) would appear to return its own
     * arguments. */
    if (!SvROK(self)) {
        XSRETURN_EMPTY;
    }
    PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self)));
    if (!h) {
        XSRETURN_EMPTY;
    }
    /* Clear the stored pointer before tearing down, so a repeated DESTROY
     * on the same object reads 0 above and returns without touching h. */
    sv_setiv(SvRV(self), 0);
    pubsub_destroy(h);

SV *
publish(self, value)
    SV *self
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Publish one string message through pubsub_str_publish().
     *
     * value: scalar to stringify; its bytes and UTF-8 flag are passed on.
     * Returns a true value on success.
     * Croaks when the message is too long (publisher returns -1, or the
     * length does not fit the 32-bit length argument). */
    STRLEN len;
    /* Stringify FIRST, then read the UTF-8 flag: for tied or overloaded
     * values the flag is only reliable after SvPV has run get-magic /
     * string overloading. Sampling it beforehand can pair UTF-8 encoded
     * bytes with a stale "not UTF-8" flag. For plain scalars this yields
     * the same bytes and flag as before. */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* The publisher takes a uint32_t length; refuse anything that would be
     * silently truncated by the cast below. */
    if ((STRLEN)(uint32_t)len != len)
        croak("publish: message too long (%" UVuf " > %u)", (UV)len, h->msg_size);
    int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
    if (r == -1) croak("publish: message too long (%u > %u)", (unsigned)len, h->msg_size);
    RETVAL = &PL_sv_yes;
  OUTPUT:
    RETVAL

UV
publish_multi(self, ...)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Publish every argument after self as a separate string message while
     * holding the handle's mutex once for the whole batch.
     *
     * Returns the number of messages published.
     * Croaks when a message is too long; messages published earlier in the
     * same call remain published. */
    RETVAL = 0;
    uint32_t count = items - 1;
    if (count > 0) {
        /* Extract SV data BEFORE locking: SvPV can run magic
         * (tied/overloaded stringification) that longjmps; doing it under
         * the process-shared mutex would abandon the lock and deadlock peers.
         * Newx+SAVEFREEPV so a die during extraction cannot leak args. */
        struct psm_arg { const char *str; STRLEN len; bool utf8; };
        struct psm_arg *args;
        Newx(args, count, struct psm_arg);
        SAVEFREEPV(args);
        for (uint32_t i = 0; i < count; i++) {
            SV *val = ST(i + 1);
            /* Stringify first, then read the UTF-8 flag: for tied or
             * overloaded values the flag is only reliable after SvPV. */
            args[i].str = SvPV(val, args[i].len);
            args[i].utf8 = SvUTF8(val) ? true : false;
            /* The publisher takes a uint32_t length; reject values the
             * cast would truncate while we are still outside the lock. */
            if ((STRLEN)(uint32_t)args[i].len != args[i].len)
                croak("publish_multi: message too long (%" UVuf " > %u)", (UV)args[i].len, h->msg_size);
        }
        pubsub_mutex_lock(h->hdr);
        for (uint32_t i = 0; i < count; i++) {
            int r = pubsub_str_publish_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
            if (r == -1) {
                pubsub_mutex_unlock(h->hdr);
                /* Earlier messages of this batch were already published;
                 * wake subscribers for them just as the success path does.
                 * NOTE(review): assumes pubsub_str_publish_locked does not
                 * wake subscribers itself - confirm. */
                if (RETVAL > 0)
                    pubsub_wake_subscribers(h->hdr);
                croak("publish_multi: message too long (%u > %u)", (unsigned)args[i].len, h->msg_size);
            }
            RETVAL++;
        }
        pubsub_mutex_unlock(h->hdr);
        pubsub_wake_subscribers(h->hdr);
    }
  OUTPUT:
    RETVAL

void
publish_notify(self, value)
    SV *self
    SV *value
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Publish one string message through pubsub_str_publish() and then
     * call pubsub_notify() on the handle.
     *
     * value: scalar to stringify; its bytes and UTF-8 flag are passed on.
     * Croaks (without notifying) when the message is too long. */
    STRLEN len;
    /* Stringify FIRST, then read the UTF-8 flag: for tied or overloaded
     * values the flag is only reliable after SvPV has run get-magic /
     * string overloading. For plain scalars this yields the same bytes
     * and flag as before. */
    const char *str = SvPV(value, len);
    bool utf8 = SvUTF8(value) ? true : false;
    /* The publisher takes a uint32_t length; refuse anything that would be
     * silently truncated by the cast below. */
    if ((STRLEN)(uint32_t)len != len)
        croak("publish_notify: message too long (%" UVuf " > %u)", (UV)len, h->msg_size);
    int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
    if (r == -1) croak("publish_notify: message too long (%u > %u)", (unsigned)len, h->msg_size);
    pubsub_notify(h);

SV *
subscribe(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Create a subscription with pubsub_subscribe(h, 0) and return it
     * wrapped as a Data::PubSub::Shared::Str::Sub object.
     * Croaks if pubsub_subscribe() returns NULL. */
    PubSubSub *sub = pubsub_subscribe(h, 0);
    if (sub == NULL) {
        croak("subscribe: out of memory");
    }
    /* The subscription carries its own copy of the handle reference in
     * userdata. */
    SV *handle_ref = newSVsv(self);
    sub->userdata = (void *)handle_ref;
    MAKE_OBJ("Data::PubSub::Shared::Str::Sub", sub);
  OUTPUT:
    RETVAL

SV *
subscribe_all(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
  CODE:
    /* Create a subscription with pubsub_subscribe(h, 1) and return it
     * wrapped as a Data::PubSub::Shared::Str::Sub object.
     * Croaks if pubsub_subscribe() returns NULL. */
    PubSubSub *sub = pubsub_subscribe(h, 1);
    if (sub == NULL) {
        croak("subscribe_all: out of memory");
    }
    /* The subscription carries its own copy of the handle reference in
     * userdata. */
    SV *handle_ref = newSVsv(self);
    sub->userdata = (void *)handle_ref;
    MAKE_OBJ("Data::PubSub::Shared::Str::Sub", sub);
  OUTPUT:
    RETVAL

UV
capacity(self)
    SV *self
  PREINIT:



( run in 0.598 second using v1.01-cache-2.11-cpan-e1769b4cff6 )