EV-Nats

 view release on metacpan or  search on metacpan

src/EV__Nats.xs  view on Meta::CPAN

reconnect_enabled(self)
    EV::Nats self
  CODE:
    RETVAL = self->reconnect_enabled;
  OUTPUT:
    RETVAL

int
connect_timeout(self, ...)
    EV::Nats self
  CODE:
    /* Combined getter/setter for self->connect_timeout_ms.  When a second
       argument is supplied it is converted with SvIV and stored; the value
       held in the field afterwards is always returned. */
    if (items >= 2) {
        SV *const new_value = ST(1);
        self->connect_timeout_ms = SvIV(new_value);
    }
    RETVAL = self->connect_timeout_ms;
  OUTPUT:
    RETVAL

int
ping_interval(self, ...)
    EV::Nats self
  CODE:
    /* Combined getter/setter for self->ping_interval_ms.  An optional second
       argument is converted with SvIV and stored first; the field's current
       value is returned either way. */
    if (items >= 2) {
        SV *const new_value = ST(1);
        self->ping_interval_ms = SvIV(new_value);
    }
    RETVAL = self->ping_interval_ms;
  OUTPUT:
    RETVAL

int
max_pings_outstanding(self, ...)
    EV::Nats self
  CODE:
    /* Combined getter/setter for self->max_pings_outstanding.  An optional
       second argument is converted with SvIV and stored first; the field's
       current value is returned either way. */
    if (items >= 2) {
        SV *const new_value = ST(1);
        self->max_pings_outstanding = SvIV(new_value);
    }
    RETVAL = self->max_pings_outstanding;
  OUTPUT:
    RETVAL

int
priority(self, ...)
    EV::Nats self
  CODE:
    /* Combined getter/setter for self->priority.  An optional second
       argument is converted with SvIV and stored first; the field's current
       value is returned either way.  No range check is applied here. */
    if (items >= 2) {
        SV *const new_value = ST(1);
        self->priority = SvIV(new_value);
    }
    RETVAL = self->priority;
  OUTPUT:
    RETVAL

int
keepalive(self, ...)
    EV::Nats self
  CODE:
    /* Combined getter/setter for self->keepalive.  An optional second
       argument is converted with SvIV and stored first; the field's current
       value is returned either way. */
    if (items >= 2) {
        SV *const new_value = ST(1);
        self->keepalive = SvIV(new_value);
    }
    RETVAL = self->keepalive;
  OUTPUT:
    RETVAL

void
on_error(self, ...)
    EV::Nats self
  PPCODE:
    /* Accessor for the on_error handler.
       Setter: when a second argument is given, the stored handler is
       released through CLEAR_HANDLER and, if the argument is defined, a
       private copy of it is stored in its place.
       Getter: outside void context, a mortal copy of the stored handler is
       pushed; nothing is pushed when no handler is set. */
    if (items >= 2) {
        SV *const handler = ST(1);
        CLEAR_HANDLER(self->on_error);
        if (SvOK(handler)) {
            self->on_error = newSVsv(handler);
        }
    }
    if (self->on_error && GIMME_V != G_VOID) {
        PUSHs(sv_2mortal(newSVsv(self->on_error)));
    }

void
on_connect(self, ...)
    EV::Nats self
  PPCODE:
    /* Accessor for the on_connect handler.
       Setter: when a second argument is given, the stored handler is
       released through CLEAR_HANDLER and, if the argument is defined, a
       private copy of it is stored in its place.
       Getter: outside void context, a mortal copy of the stored handler is
       pushed; nothing is pushed when no handler is set. */
    if (items >= 2) {
        SV *const handler = ST(1);
        CLEAR_HANDLER(self->on_connect);
        if (SvOK(handler)) {
            self->on_connect = newSVsv(handler);
        }
    }
    if (self->on_connect && GIMME_V != G_VOID) {
        PUSHs(sv_2mortal(newSVsv(self->on_connect)));
    }

void
on_disconnect(self, ...)
    EV::Nats self
  PPCODE:
    /* Accessor for the on_disconnect handler.
       Setter: when a second argument is given, the stored handler is
       released through CLEAR_HANDLER and, if the argument is defined, a
       private copy of it is stored in its place.
       Getter: outside void context, a mortal copy of the stored handler is
       pushed; nothing is pushed when no handler is set. */
    if (items >= 2) {
        SV *const handler = ST(1);
        CLEAR_HANDLER(self->on_disconnect);
        if (SvOK(handler)) {
            self->on_disconnect = newSVsv(handler);
        }
    }
    if (self->on_disconnect && GIMME_V != G_VOID) {
        PUSHs(sv_2mortal(newSVsv(self->on_disconnect)));
    }

#ifdef HAVE_OPENSSL

void
tls(self, enable, ca_file = NULL, skip_verify = 0)
    EV::Nats self
    int enable
    const char *ca_file
    int skip_verify
  CODE:
    /* Records the TLS settings on the connection object: the enable flag,
       the skip-verify flag and the CA file path. */
    {
        /* A missing or empty path is stored as NULL. */
        const char *ca_path = NULL;
        if (ca_file && ca_file[0] != '\0')
            ca_path = ca_file;
        self->tls = enable;
        self->tls_skip_verify = skip_verify;
        nats_set_str(&self->tls_ca_file, ca_path);
    }

#endif

void
stats(self)
    EV::Nats self
  PPCODE:
    /* Returns a flat name/value list with the four traffic counters:
       msgs_in, msgs_out, bytes_in, bytes_out (in that order). */
    {
        /* Names are paired index-for-index with the values in counters[]. */
        static const char *const labels[4] = {
            "msgs_in", "msgs_out", "bytes_in", "bytes_out"
        };
        UV counters[4];
        int i;
        counters[0] = self->msgs_in;
        counters[1] = self->msgs_out;
        counters[2] = self->bytes_in;
        counters[3] = self->bytes_out;
        /* Four pairs -> eight stack slots. */
        EXTEND(SP, 8);
        for (i = 0; i < 4; i++) {
            PUSHs(sv_2mortal(newSVpv(labels[i], 0)));
            PUSHs(sv_2mortal(newSVuv(counters[i])));
        }
    }

void
reset_stats(self)
    EV::Nats self
  CODE:
    /* Zeroes the four traffic counters reported by stats(). */
    self->bytes_out = 0;
    self->bytes_in = 0;
    self->msgs_out = 0;
    self->msgs_in = 0;

SV *
new_inbox(self)
    EV::Nats self
  CODE:
    /* Returns a new inbox subject: self->inbox_prefix followed by the
       current value of self->next_req_id in decimal.  The id counter is
       post-incremented, so every call yields a different suffix.

       The string is built directly into the SV with newSVpvf rather than
       via snprintf into a fixed 80-byte stack buffer.  snprintf returns the
       length the output *would* have had, so with a long prefix the old
       code handed newSVpvn a length larger than the buffer and read past
       its end. */
    RETVAL = newSVpvf("%s%" UVuf, self->inbox_prefix, (UV)self->next_req_id++);
  OUTPUT:
    RETVAL

int
subscription_count(self)
    EV::Nats self
  CODE:
    /* Reports the number of keys currently held in self->sub_map. */
    {
        RETVAL = (int)HvKEYS(self->sub_map);
    }
  OUTPUT:
    RETVAL

void
batch(self, code)
    EV::Nats self
    SV *code
  CODE:
    /* Runs `code` (called with no arguments, result discarded) while
       self->batch_mode is set, then schedules a flush of whatever is
       pending in the write buffer.

       The callback is invoked under G_EVAL so that an exception thrown by
       it cannot leave batch_mode stuck at 1: the flag is always cleared and
       pending output is still scheduled before the exception is re-thrown
       to the caller.  As a consequence of G_EVAL, $@ is reset when the
       callback succeeds. */
    {
        int callback_died;
        self->batch_mode = 1;
        {
            dSP;
            ENTER; SAVETMPS;
            PUSHMARK(SP);
            PUTBACK;
            call_sv(code, G_DISCARD | G_EVAL);
            FREETMPS; LEAVE;
        }
        self->batch_mode = 0;
        /* With G_EVAL the error (if any) is left in ERRSV. */
        callback_died = SvTRUE(ERRSV) ? 1 : 0;
        if (self->wbuf_len > self->wbuf_off) {
            /* Unsent bytes remain: mark the buffer dirty and make sure the
               prepare watcher is running. */
            self->wbuf_dirty = 1;
            if (!self->prepare_active) {
                ev_prepare_start(self->loop, &self->prepare_watcher);
                self->prepare_active = 1;
            }
            /* Check slow consumer after batch.  Skipped when the callback
               died so that the pending error in ERRSV is not clobbered by
               running more Perl code before the re-throw. */
            if (!callback_died &&
                self->slow_consumer_bytes > 0 &&
                (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes &&
                self->on_slow_consumer) {
                dSP;
                ENTER; SAVETMPS;
                PUSHMARK(SP);
                EXTEND(SP, 1);
                PUSHs(sv_2mortal(newSVuv(self->wbuf_len - self->wbuf_off)));
                PUTBACK;
                PINNED_CALL_SV(self->on_slow_consumer, G_DISCARD);
                FREETMPS; LEAVE;
            }
        }
        /* croak(NULL) re-throws the exception currently held in ERRSV. */
        if (callback_died)
            croak(NULL);
    }

void
slow_consumer(self, bytes_threshold, cb = NULL)
    EV::Nats self
    UV bytes_threshold
    SV *cb
  CODE:
    /* Stores the slow-consumer byte threshold and replaces the associated
       callback.  The previous callback is always released; a new one is
       stored (as a private copy) only when `cb` was passed and is defined. */
    self->slow_consumer_bytes = (size_t)bytes_threshold;
    CLEAR_HANDLER(self->on_slow_consumer);
    if (cb != NULL) {
        if (SvOK(cb)) {
            self->on_slow_consumer = newSVsv(cb);
        }
    }

void
on_lame_duck(self, ...)
    EV::Nats self
  PPCODE:
    /* Accessor for the handler stored in self->on_ldm.
       Setter: when a second argument is given, the stored handler is
       released through CLEAR_HANDLER and, if the argument is defined, a
       private copy of it is stored in its place.
       Getter: outside void context, a mortal copy of the stored handler is
       pushed; nothing is pushed when no handler is set. */
    if (items >= 2) {
        SV *const handler = ST(1);
        CLEAR_HANDLER(self->on_ldm);
        if (SvOK(handler)) {
            self->on_ldm = newSVsv(handler);
        }
    }
    if (self->on_ldm && GIMME_V != G_VOID) {
        PUSHs(sv_2mortal(newSVsv(self->on_ldm)));
    }

  #ifdef HAVE_OPENSSL

void
nkey_seed(self, seed)
    EV::Nats self
    const char *seed
  CODE:
    /* Hands the seed string to nats_set_str for storage in
       self->nkey_seed.  No validation of the seed is done here. */
    {
        nats_set_str(&self->nkey_seed, seed);
    }

SV *
nkey_public_from_seed(class, seed)
    SV *class
    const char *seed
  CODE:
    /* Class method: derives the public key string for `seed` via
       nats_nkey_public and returns it.  Croaks with "invalid NKey seed"
       when the helper reports a non-positive length. */
    {
        char public_key[64];
        int public_len;
        (void)class;    /* invocant is not used */
        public_len = nats_nkey_public(seed, public_key, sizeof(public_key));
        if (public_len > 0) {
            RETVAL = newSVpvn(public_key, public_len);
        }
        else {
            croak("invalid NKey seed");
        }
    }
  OUTPUT:
    RETVAL

SV *
nkey_generate_user_seed(class)
    SV *class
  CODE:
    /* Class method: builds a fresh user seed and returns it base32-encoded.
       Layout of the 36 raw bytes: 2 prefix bytes, 32 random bytes, then a
       CRC16 over the first 34 bytes stored low byte first. */
    {
        unsigned char seed_bytes[36];
        /* 36 bytes -> 58 base32 chars (288 bits, 57 full chars + 1 flush). */
        char encoded[60];
        uint16_t checksum;
        int encoded_len;
        (void)class;    /* invocant is not used */
        /* S+User seed: b1 = PrefixByteSeed | (PrefixByteUser >> 5) = 0x95;
           b2 = (PrefixByteUser & 31) << 3 = 0x00. */
        seed_bytes[0] = 0x95;
        seed_bytes[1] = 0x00;
        nats_random_bytes(seed_bytes + 2, 32);
        checksum = nats_crc16(seed_bytes, 34);
        seed_bytes[34] = checksum & 0xFF;
        seed_bytes[35] = (checksum >> 8) & 0xFF;
        encoded_len = nats_base32_encode(seed_bytes, sizeof(seed_bytes),
                                         encoded, sizeof(encoded));
        if (encoded_len < 0) {
            croak("base32 buffer overflow");
        }
        RETVAL = newSVpvn(encoded, encoded_len);
    }
  OUTPUT:
    RETVAL

  #endif

void



( run in 2.226 seconds using v1.01-cache-2.11-cpan-71847e10f99 )