EV-Nats

 view release on metacpan or  search on metacpan

src/EV__Nats.xs  view on Meta::CPAN

    ngx_queue_t wait_queue;
    int waiting_count;

    /* Write coalescing */
    ev_prepare prepare_watcher;
    int prepare_active;
    int wbuf_dirty;

    char *host;
    int port;
    char *path; /* Unix socket path */

    int reconnect_enabled;
    int reconnect_delay_ms;
    int max_reconnect_delay_ms;
    int max_reconnect_attempts;
    int reconnect_attempts;
    ev_timer reconnect_timer;
    int reconnect_timer_active;
    int intentional_disconnect;

    int connect_timeout_ms;
    ev_timer connect_timer;
    int connect_timer_active;

    int ping_interval_ms;
    ev_timer ping_timer;
    int ping_timer_active;
    int pings_outstanding;
    int max_pings_outstanding;

    int max_payload;
    SV *server_info_json;

    char *user;
    char *pass;
    char *token;
    char *name;
    int verbose;
    int pedantic;
    int echo;
    int no_responders;

    char inbox_prefix[32];
    uint64_t next_req_id;
    ngx_queue_t req_queue;
    uint64_t inbox_sub_sid;

    int parse_state;
    int msg_type;
    /* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
    size_t msg_subject_off;
    size_t msg_subject_len;
    size_t msg_reply_off;       /* msg_reply_len == 0 means no reply */
    size_t msg_reply_len;
    uint64_t msg_sid;
    size_t msg_hdr_len;
    size_t msg_total_len;

    int priority;
    int keepalive;

    /* Stats */
    UV msgs_in;
    UV msgs_out;
    UV bytes_in;
    UV bytes_out;

    /* Server pool for cluster failover */
    ngx_queue_t server_pool;
    int server_pool_count;

    /* Drain state */
    int draining;
    SV *drain_cb;

    /* Slow consumer detection */
    size_t slow_consumer_bytes;  /* wbuf threshold, 0 = disabled */
    SV *on_slow_consumer;

    /* Batch mode */
    int batch_mode;

    /* PONG callback queue (for flush) */
    ngx_queue_t pong_cbs;

    /* NKey auth */
    char *nkey_seed;     /* Ed25519 seed (base32-encoded) */
    char *jwt;
    char *server_nonce;  /* nonce from INFO for signing */

    /* Lame duck mode (leaf node graceful shutdown) */
    int ldm;
    SV *on_ldm;

    /* TLS */
#ifdef HAVE_OPENSSL
    SSL_CTX *ssl_ctx;
    SSL     *ssl;
    int tls;
    int tls_skip_verify;
    char *tls_ca_file;
    int ssl_handshaking;
#endif
};

typedef struct nats_req_s {
    uint64_t req_id;
    SV *cb;
    ev_timer timer;
    int timer_active;
    ngx_queue_t queue;
} nats_req_t;

/* ================================================================
 * Forward declarations
 * ================================================================ */

static void nats_connect_tcp(nats_t *self);
static void nats_connect_unix(nats_t *self);
static void nats_do_connect(nats_t *self);

src/EV__Nats.xs  view on Meta::CPAN

    int hdr_idx, len_idx;
    if (ntokens == 2) {
        self->msg_reply_off = 0;
        self->msg_reply_len = 0;
        hdr_idx = 0; len_idx = 1;
    } else if (ntokens == 3) {
        self->msg_reply_off = tokens[0] - self->rbuf;
        self->msg_reply_len = token_lens[0];
        hdr_idx = 1; len_idx = 2;
    } else {
        return -1;
    }
    if (nats_parse_decimal(tokens[hdr_idx], token_lens[hdr_idx], &self->msg_hdr_len) != 0
     || nats_parse_decimal(tokens[len_idx], token_lens[len_idx], &self->msg_total_len) != 0) {
        return -1;
    }

    self->msg_type = MSG_TYPE_HMSG;
    return 0;
}

static void nats_process_msg(nats_t *self, char *payload, size_t len)
{
    nats_sub_t *sub = nats_find_sub(self, self->msg_sid);
    if (!sub) return;

    self->msgs_in++;
    self->bytes_in += len;
    sub->received++;

    int max_msgs = sub->max_msgs;
    int received = sub->received;
    uint64_t sid = sub->sid;

    if (sub->cb) {
        dSP;
        ENTER; SAVETMPS;
        PUSHMARK(SP);
        EXTEND(SP, 4);

        PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_subject_off, self->msg_subject_len)));

        if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
            PUSHs(sv_2mortal(newSVpvn(payload + self->msg_hdr_len, len - self->msg_hdr_len)));
        } else {
            PUSHs(sv_2mortal(newSVpvn(payload, len)));
        }

        if (self->msg_reply_len > 0) {
            PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_reply_off, self->msg_reply_len)));
        } else {
            PUSHs(&PL_sv_undef);
        }

        if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
            PUSHs(sv_2mortal(newSVpvn(payload, self->msg_hdr_len)));
        }

        PUTBACK;
        /* pin sub->cb: a callback that unsubscribes its own sid clears
           sub->cb (and frees the sub) mid-call; the pin keeps the CV alive */
        PINNED_CALL_SV(sub->cb, G_DISCARD);
        FREETMPS; LEAVE;
    }

    if (max_msgs > 0 && received >= max_msgs) {
        sub = nats_find_sub(self, sid);
        if (sub)
            nats_remove_sub(self, sub);
    }
}

static void nats_check_inbox_response(nats_t *self, const char *subject, size_t subject_len,
                                       const char *payload, size_t payload_len,
                                       const char *headers, size_t headers_len)
{
    size_t pfx_len = strlen(self->inbox_prefix);
    if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
        return;

    const char *id_str = subject + pfx_len;
    size_t id_len = subject_len - pfx_len;
    uint64_t req_id = 0;
    size_t i;
    for (i = 0; i < id_len; i++) {
        if (id_str[i] < '0' || id_str[i] > '9') return;
        req_id = req_id * 10 + (id_str[i] - '0');
    }

    ngx_queue_t *q;
    ngx_queue_foreach(q, &self->req_queue) {
        nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
        if (req->req_id == req_id) {
            ngx_queue_remove(q);
            if (req->timer_active) {
                ev_timer_stop(self->loop, &req->timer);
                req->timer_active = 0;
            }
            if (req->cb) {
                dSP;
                int is_no_responders = (headers && headers_len >= 12 &&
                                        memcmp(headers, "NATS/1.0 503", 12) == 0);
                ENTER; SAVETMPS;
                PUSHMARK(SP);
                EXTEND(SP, 3);
                if (is_no_responders) {
                    PUSHs(&PL_sv_undef);
                    PUSHs(sv_2mortal(newSVpvn("no responders", 13)));
                } else {
                    PUSHs(sv_2mortal(newSVpvn(payload, payload_len)));
                    PUSHs(&PL_sv_undef);
                    if (headers && headers_len > 0)
                        PUSHs(sv_2mortal(newSVpvn(headers, headers_len)));
                }
                PUTBACK;
                call_sv(req->cb, G_DISCARD);
                FREETMPS; LEAVE;
                SvREFCNT_dec(req->cb);
            }
            Safefree(req);
            return;

src/EV__Nats.xs  view on Meta::CPAN

/* Shared post-connect(2) socket setup for the TCP and Unix connect paths.
 * rv is the connect() return: nonzero means the connection is still in
 * progress (EINPROGRESS) and we must also wait for EV_WRITE. */
static void nats_after_connect(nats_t *self, int fd, int rv)
{
    self->fd = fd;
    self->connecting = 1;
    self->connected = 0;

    ev_io_init(&self->rio, nats_on_read, fd, EV_READ);
    ev_io_init(&self->wio, nats_on_write, fd, EV_WRITE);

    if (self->priority) {
        ev_set_priority(&self->rio, self->priority);
        ev_set_priority(&self->wio, self->priority);
    }

    ev_io_start(self->loop, &self->rio);
    self->reading = 1;

    if (rv != 0) {
        ev_io_start(self->loop, &self->wio);
        self->writing = 1;
    }

    if (self->connect_timeout_ms > 0) {
        ev_timer_set(&self->connect_timer, self->connect_timeout_ms / 1000.0, 0.0);
        ev_timer_start(self->loop, &self->connect_timer);
        self->connect_timer_active = 1;
    }
}

static void nats_connect_tcp(nats_t *self)
{
    struct addrinfo hints, *res, *rp;
    char port_str[8];
    int fd = -1;

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    snprintf(port_str, sizeof(port_str), "%d", self->port);

    int rv = getaddrinfo(self->host, port_str, &hints, &res);
    if (rv != 0) {
        nats_emit_error(self, gai_strerror(rv));
        nats_schedule_reconnect(self);
        return;
    }

    for (rp = res; rp != NULL; rp = rp->ai_next) {
        fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (fd < 0) continue;

        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);

        int flag = 1;
        setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

        if (self->keepalive > 0) {
            setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
            #ifdef TCP_KEEPIDLE
            setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
            #endif
        }

        rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
        if (rv == 0 || errno == EINPROGRESS) break;

        close(fd);
        fd = -1;
    }

    freeaddrinfo(res);

    if (fd < 0) {
        nats_emit_error(self, "connection failed");
        nats_schedule_reconnect(self);
        return;
    }

    nats_after_connect(self, fd, rv);
}

static void nats_connect_unix(nats_t *self)
{
    struct sockaddr_un addr;
    int fd;

    if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
        nats_emit_error(self, "invalid unix socket path");
        nats_schedule_reconnect(self);
        return;
    }

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0) {
        nats_emit_error(self, strerror(errno));
        nats_schedule_reconnect(self);
        return;
    }

    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, self->path, sizeof(addr.sun_path) - 1);

    int rv = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
    if (rv != 0 && errno != EINPROGRESS) {
        close(fd);
        nats_emit_error(self, strerror(errno));
        nats_schedule_reconnect(self);
        return;
    }

    nats_after_connect(self, fd, rv);
}

/* Helper: connect via path or host */
static void nats_do_connect(nats_t *self)
{
    if (self->path)

src/EV__Nats.xs  view on Meta::CPAN


    ngx_queue_init(&self->subs);
    ngx_queue_init(&self->wait_queue);
    ngx_queue_init(&self->req_queue);

    self->next_sid = 1;

    ev_timer_init(&self->connect_timer, nats_on_connect_timeout, 0., 0.);
    ev_timer_init(&self->reconnect_timer, nats_on_reconnect_timer, 0., 0.);
    ev_timer_init(&self->ping_timer, nats_on_ping_timer, 0., 0.);
    ev_prepare_init(&self->prepare_watcher, nats_on_prepare);

    ngx_queue_init(&self->server_pool);
    ngx_queue_init(&self->pong_cbs);
    self->sub_map = newHV();

    nats_setup_inbox(self);

    if (items > 1 && (items - 1) % 2 == 0) {
        for (i = 1; i < items; i += 2) {
            const char *key = SvPV_nolen(ST(i));
            SV *val = ST(i + 1);

            if      (strcmp(key, "host")  == 0) nats_set_str_sv(&self->host,  val);
            else if (strcmp(key, "port")  == 0) self->port = SvIV(val);
            else if (strcmp(key, "path")  == 0) nats_set_str_sv(&self->path,  val);
            else if (strcmp(key, "user")  == 0) nats_set_str_sv(&self->user,  val);
            else if (strcmp(key, "pass")  == 0) nats_set_str_sv(&self->pass,  val);
            else if (strcmp(key, "token") == 0) nats_set_str_sv(&self->token, val);
            else if (strcmp(key, "name")  == 0) nats_set_str_sv(&self->name,  val);
            else if (strcmp(key, "on_error") == 0)
                self->on_error = newSVsv(val);
            else if (strcmp(key, "on_connect") == 0)
                self->on_connect = newSVsv(val);
            else if (strcmp(key, "on_disconnect") == 0)
                self->on_disconnect = newSVsv(val);
            else if (strcmp(key, "verbose") == 0)
                self->verbose = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "pedantic") == 0)
                self->pedantic = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "echo") == 0)
                self->echo = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "no_responders") == 0)
                self->no_responders = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "reconnect") == 0)
                self->reconnect_enabled = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "reconnect_delay") == 0)
                self->reconnect_delay_ms = SvIV(val);
            else if (strcmp(key, "max_reconnect_attempts") == 0)
                self->max_reconnect_attempts = SvIV(val);
            else if (strcmp(key, "max_reconnect_delay") == 0)
                self->max_reconnect_delay_ms = SvIV(val);
            else if (strcmp(key, "connect_timeout") == 0)
                self->connect_timeout_ms = SvIV(val);
            else if (strcmp(key, "ping_interval") == 0)
                self->ping_interval_ms = SvIV(val);
            else if (strcmp(key, "max_pings_outstanding") == 0)
                self->max_pings_outstanding = SvIV(val);
            else if (strcmp(key, "priority") == 0)
                self->priority = SvIV(val);
            else if (strcmp(key, "keepalive") == 0)
                self->keepalive = SvIV(val);
  #ifdef HAVE_OPENSSL
            else if (strcmp(key, "tls") == 0)
                self->tls = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "tls_ca_file") == 0)
                nats_set_str_sv(&self->tls_ca_file, val);
            else if (strcmp(key, "tls_skip_verify") == 0)
                self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "nkey_seed") == 0)
                nats_set_str_sv(&self->nkey_seed, val);
  #endif
            else if (strcmp(key, "jwt") == 0)
                nats_set_str_sv(&self->jwt, val);
            else if (strcmp(key, "slow_consumer_bytes") == 0)
                self->slow_consumer_bytes = (size_t)SvUV(val);
            else if (strcmp(key, "on_slow_consumer") == 0)
                self->on_slow_consumer = newSVsv(val);
            else if (strcmp(key, "on_lame_duck") == 0)
                self->on_ldm = newSVsv(val);
            else if (strcmp(key, "loop") == 0)
                self->loop = (struct ev_loop *)SvIVx(SvRV(val));
            else
                warn("EV::Nats::new: unknown option '%s'", key);
        }
    }

    if (self->host || self->path)
        nats_do_connect(self);

    RETVAL = self;
  OUTPUT:
    RETVAL

void
connect(self, host, port = 4222)
    EV::Nats self
    char *host
    int port
  CODE:
    if (self->connected || self->connecting)
        croak("already connected");

    if (self->reconnect_timer_active) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timer_active = 0;
    }

    nats_set_str(&self->path, NULL);
    nats_set_str(&self->host, host);
    self->port = port;
    self->intentional_disconnect = 0;

    nats_connect_tcp(self);

void
connect_unix(self, path)
    EV::Nats self
    char *path
  CODE:
    if (self->connected || self->connecting)
        croak("already connected");

src/EV__Nats.xs  view on Meta::CPAN

    nats_skip_waiting(self);

void
reconnect(self, enable, ...)
    EV::Nats self
    int enable
  CODE:
    self->reconnect_enabled = enable;
    if (items > 2) self->reconnect_delay_ms = SvIV(ST(2));
    if (items > 3) self->max_reconnect_attempts = SvIV(ST(3));

int
reconnect_enabled(self)
    EV::Nats self
  CODE:
    RETVAL = self->reconnect_enabled;
  OUTPUT:
    RETVAL

int
connect_timeout(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->connect_timeout_ms = SvIV(ST(1));
    RETVAL = self->connect_timeout_ms;
  OUTPUT:
    RETVAL

int
ping_interval(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->ping_interval_ms = SvIV(ST(1));
    RETVAL = self->ping_interval_ms;
  OUTPUT:
    RETVAL

int
max_pings_outstanding(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->max_pings_outstanding = SvIV(ST(1));
    RETVAL = self->max_pings_outstanding;
  OUTPUT:
    RETVAL

int
priority(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->priority = SvIV(ST(1));
    RETVAL = self->priority;
  OUTPUT:
    RETVAL

int
keepalive(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->keepalive = SvIV(ST(1));
    RETVAL = self->keepalive;
  OUTPUT:
    RETVAL

void
on_error(self, ...)
    EV::Nats self
  PPCODE:
    if (items > 1) {
        CLEAR_HANDLER(self->on_error);
        if (SvOK(ST(1)))
            self->on_error = newSVsv(ST(1));
    }
    if (GIMME_V != G_VOID && self->on_error)
        PUSHs(sv_2mortal(newSVsv(self->on_error)));

void
on_connect(self, ...)
    EV::Nats self
  PPCODE:
    if (items > 1) {
        CLEAR_HANDLER(self->on_connect);
        if (SvOK(ST(1)))
            self->on_connect = newSVsv(ST(1));
    }
    if (GIMME_V != G_VOID && self->on_connect)
        PUSHs(sv_2mortal(newSVsv(self->on_connect)));

void
on_disconnect(self, ...)
    EV::Nats self
  PPCODE:
    if (items > 1) {
        CLEAR_HANDLER(self->on_disconnect);
        if (SvOK(ST(1)))
            self->on_disconnect = newSVsv(ST(1));
    }
    if (GIMME_V != G_VOID && self->on_disconnect)
        PUSHs(sv_2mortal(newSVsv(self->on_disconnect)));

#ifdef HAVE_OPENSSL

void
tls(self, enable, ca_file = NULL, skip_verify = 0)
    EV::Nats self
    int enable
    const char *ca_file
    int skip_verify
  CODE:
    self->tls = enable;
    self->tls_skip_verify = skip_verify;
    nats_set_str(&self->tls_ca_file, (ca_file && *ca_file) ? ca_file : NULL);

#endif

void
stats(self)
    EV::Nats self
  PPCODE:
    EXTEND(SP, 8);
    PUSHs(sv_2mortal(newSVpvs("msgs_in")));



( run in 2.358 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )