EV-Nats
view release on metacpan or search on metacpan
src/EV__Nats.xs view on Meta::CPAN
ngx_queue_t wait_queue;
int waiting_count;
/* Write coalescing */
ev_prepare prepare_watcher;
int prepare_active;
int wbuf_dirty;
char *host;
int port;
char *path; /* Unix socket path */
int reconnect_enabled;
int reconnect_delay_ms;
int max_reconnect_delay_ms;
int max_reconnect_attempts;
int reconnect_attempts;
ev_timer reconnect_timer;
int reconnect_timer_active;
int intentional_disconnect;
int connect_timeout_ms;
ev_timer connect_timer;
int connect_timer_active;
int ping_interval_ms;
ev_timer ping_timer;
int ping_timer_active;
int pings_outstanding;
int max_pings_outstanding;
int max_payload;
SV *server_info_json;
char *user;
char *pass;
char *token;
char *name;
int verbose;
int pedantic;
int echo;
int no_responders;
char inbox_prefix[32];
uint64_t next_req_id;
ngx_queue_t req_queue;
uint64_t inbox_sub_sid;
int parse_state;
int msg_type;
/* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
size_t msg_subject_off;
size_t msg_subject_len;
size_t msg_reply_off; /* msg_reply_len == 0 means no reply */
size_t msg_reply_len;
uint64_t msg_sid;
size_t msg_hdr_len;
size_t msg_total_len;
int priority;
int keepalive;
/* Stats */
UV msgs_in;
UV msgs_out;
UV bytes_in;
UV bytes_out;
/* Server pool for cluster failover */
ngx_queue_t server_pool;
int server_pool_count;
/* Drain state */
int draining;
SV *drain_cb;
/* Slow consumer detection */
size_t slow_consumer_bytes; /* wbuf threshold, 0 = disabled */
SV *on_slow_consumer;
/* Batch mode */
int batch_mode;
/* PONG callback queue (for flush) */
ngx_queue_t pong_cbs;
/* NKey auth */
char *nkey_seed; /* Ed25519 seed (base32-encoded) */
char *jwt;
char *server_nonce; /* nonce from INFO for signing */
/* Lame duck mode (leaf node graceful shutdown) */
int ldm;
SV *on_ldm;
/* TLS */
#ifdef HAVE_OPENSSL
SSL_CTX *ssl_ctx;
SSL *ssl;
int tls;
int tls_skip_verify;
char *tls_ca_file;
int ssl_handshaking;
#endif
};
typedef struct nats_req_s {
uint64_t req_id;
SV *cb;
ev_timer timer;
int timer_active;
ngx_queue_t queue;
} nats_req_t;
/* ================================================================
* Forward declarations
* ================================================================ */
static void nats_connect_tcp(nats_t *self);
static void nats_connect_unix(nats_t *self);
static void nats_do_connect(nats_t *self);
src/EV__Nats.xs view on Meta::CPAN
int hdr_idx, len_idx;
if (ntokens == 2) {
self->msg_reply_off = 0;
self->msg_reply_len = 0;
hdr_idx = 0; len_idx = 1;
} else if (ntokens == 3) {
self->msg_reply_off = tokens[0] - self->rbuf;
self->msg_reply_len = token_lens[0];
hdr_idx = 1; len_idx = 2;
} else {
return -1;
}
if (nats_parse_decimal(tokens[hdr_idx], token_lens[hdr_idx], &self->msg_hdr_len) != 0
|| nats_parse_decimal(tokens[len_idx], token_lens[len_idx], &self->msg_total_len) != 0) {
return -1;
}
self->msg_type = MSG_TYPE_HMSG;
return 0;
}
static void nats_process_msg(nats_t *self, char *payload, size_t len)
{
nats_sub_t *sub = nats_find_sub(self, self->msg_sid);
if (!sub) return;
self->msgs_in++;
self->bytes_in += len;
sub->received++;
int max_msgs = sub->max_msgs;
int received = sub->received;
uint64_t sid = sub->sid;
if (sub->cb) {
dSP;
ENTER; SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, 4);
PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_subject_off, self->msg_subject_len)));
if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
PUSHs(sv_2mortal(newSVpvn(payload + self->msg_hdr_len, len - self->msg_hdr_len)));
} else {
PUSHs(sv_2mortal(newSVpvn(payload, len)));
}
if (self->msg_reply_len > 0) {
PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_reply_off, self->msg_reply_len)));
} else {
PUSHs(&PL_sv_undef);
}
if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
PUSHs(sv_2mortal(newSVpvn(payload, self->msg_hdr_len)));
}
PUTBACK;
/* pin sub->cb: a callback that unsubscribes its own sid clears
sub->cb (and frees the sub) mid-call; the pin keeps the CV alive */
PINNED_CALL_SV(sub->cb, G_DISCARD);
FREETMPS; LEAVE;
}
if (max_msgs > 0 && received >= max_msgs) {
sub = nats_find_sub(self, sid);
if (sub)
nats_remove_sub(self, sub);
}
}
static void nats_check_inbox_response(nats_t *self, const char *subject, size_t subject_len,
const char *payload, size_t payload_len,
const char *headers, size_t headers_len)
{
size_t pfx_len = strlen(self->inbox_prefix);
if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
return;
const char *id_str = subject + pfx_len;
size_t id_len = subject_len - pfx_len;
uint64_t req_id = 0;
size_t i;
for (i = 0; i < id_len; i++) {
if (id_str[i] < '0' || id_str[i] > '9') return;
req_id = req_id * 10 + (id_str[i] - '0');
}
ngx_queue_t *q;
ngx_queue_foreach(q, &self->req_queue) {
nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
if (req->req_id == req_id) {
ngx_queue_remove(q);
if (req->timer_active) {
ev_timer_stop(self->loop, &req->timer);
req->timer_active = 0;
}
if (req->cb) {
dSP;
int is_no_responders = (headers && headers_len >= 12 &&
memcmp(headers, "NATS/1.0 503", 12) == 0);
ENTER; SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, 3);
if (is_no_responders) {
PUSHs(&PL_sv_undef);
PUSHs(sv_2mortal(newSVpvn("no responders", 13)));
} else {
PUSHs(sv_2mortal(newSVpvn(payload, payload_len)));
PUSHs(&PL_sv_undef);
if (headers && headers_len > 0)
PUSHs(sv_2mortal(newSVpvn(headers, headers_len)));
}
PUTBACK;
call_sv(req->cb, G_DISCARD);
FREETMPS; LEAVE;
SvREFCNT_dec(req->cb);
}
Safefree(req);
return;
src/EV__Nats.xs view on Meta::CPAN
/* Shared post-connect(2) socket setup for the TCP and Unix connect paths.
* rv is the connect() return: nonzero means the connection is still in
* progress (EINPROGRESS) and we must also wait for EV_WRITE. */
static void nats_after_connect(nats_t *self, int fd, int rv)
{
self->fd = fd;
self->connecting = 1;
self->connected = 0;
ev_io_init(&self->rio, nats_on_read, fd, EV_READ);
ev_io_init(&self->wio, nats_on_write, fd, EV_WRITE);
if (self->priority) {
ev_set_priority(&self->rio, self->priority);
ev_set_priority(&self->wio, self->priority);
}
ev_io_start(self->loop, &self->rio);
self->reading = 1;
if (rv != 0) {
ev_io_start(self->loop, &self->wio);
self->writing = 1;
}
if (self->connect_timeout_ms > 0) {
ev_timer_set(&self->connect_timer, self->connect_timeout_ms / 1000.0, 0.0);
ev_timer_start(self->loop, &self->connect_timer);
self->connect_timer_active = 1;
}
}
static void nats_connect_tcp(nats_t *self)
{
struct addrinfo hints, *res, *rp;
char port_str[8];
int fd = -1;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
snprintf(port_str, sizeof(port_str), "%d", self->port);
int rv = getaddrinfo(self->host, port_str, &hints, &res);
if (rv != 0) {
nats_emit_error(self, gai_strerror(rv));
nats_schedule_reconnect(self);
return;
}
for (rp = res; rp != NULL; rp = rp->ai_next) {
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd < 0) continue;
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
int flag = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
if (self->keepalive > 0) {
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
#ifdef TCP_KEEPIDLE
setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
#endif
}
rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
if (rv == 0 || errno == EINPROGRESS) break;
close(fd);
fd = -1;
}
freeaddrinfo(res);
if (fd < 0) {
nats_emit_error(self, "connection failed");
nats_schedule_reconnect(self);
return;
}
nats_after_connect(self, fd, rv);
}
static void nats_connect_unix(nats_t *self)
{
struct sockaddr_un addr;
int fd;
if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
nats_emit_error(self, "invalid unix socket path");
nats_schedule_reconnect(self);
return;
}
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd < 0) {
nats_emit_error(self, strerror(errno));
nats_schedule_reconnect(self);
return;
}
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, self->path, sizeof(addr.sun_path) - 1);
int rv = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
if (rv != 0 && errno != EINPROGRESS) {
close(fd);
nats_emit_error(self, strerror(errno));
nats_schedule_reconnect(self);
return;
}
nats_after_connect(self, fd, rv);
}
/* Helper: connect via path or host */
static void nats_do_connect(nats_t *self)
{
if (self->path)
src/EV__Nats.xs view on Meta::CPAN
ngx_queue_init(&self->subs);
ngx_queue_init(&self->wait_queue);
ngx_queue_init(&self->req_queue);
self->next_sid = 1;
ev_timer_init(&self->connect_timer, nats_on_connect_timeout, 0., 0.);
ev_timer_init(&self->reconnect_timer, nats_on_reconnect_timer, 0., 0.);
ev_timer_init(&self->ping_timer, nats_on_ping_timer, 0., 0.);
ev_prepare_init(&self->prepare_watcher, nats_on_prepare);
ngx_queue_init(&self->server_pool);
ngx_queue_init(&self->pong_cbs);
self->sub_map = newHV();
nats_setup_inbox(self);
if (items > 1 && (items - 1) % 2 == 0) {
for (i = 1; i < items; i += 2) {
const char *key = SvPV_nolen(ST(i));
SV *val = ST(i + 1);
if (strcmp(key, "host") == 0) nats_set_str_sv(&self->host, val);
else if (strcmp(key, "port") == 0) self->port = SvIV(val);
else if (strcmp(key, "path") == 0) nats_set_str_sv(&self->path, val);
else if (strcmp(key, "user") == 0) nats_set_str_sv(&self->user, val);
else if (strcmp(key, "pass") == 0) nats_set_str_sv(&self->pass, val);
else if (strcmp(key, "token") == 0) nats_set_str_sv(&self->token, val);
else if (strcmp(key, "name") == 0) nats_set_str_sv(&self->name, val);
else if (strcmp(key, "on_error") == 0)
self->on_error = newSVsv(val);
else if (strcmp(key, "on_connect") == 0)
self->on_connect = newSVsv(val);
else if (strcmp(key, "on_disconnect") == 0)
self->on_disconnect = newSVsv(val);
else if (strcmp(key, "verbose") == 0)
self->verbose = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "pedantic") == 0)
self->pedantic = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "echo") == 0)
self->echo = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "no_responders") == 0)
self->no_responders = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "reconnect") == 0)
self->reconnect_enabled = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "reconnect_delay") == 0)
self->reconnect_delay_ms = SvIV(val);
else if (strcmp(key, "max_reconnect_attempts") == 0)
self->max_reconnect_attempts = SvIV(val);
else if (strcmp(key, "max_reconnect_delay") == 0)
self->max_reconnect_delay_ms = SvIV(val);
else if (strcmp(key, "connect_timeout") == 0)
self->connect_timeout_ms = SvIV(val);
else if (strcmp(key, "ping_interval") == 0)
self->ping_interval_ms = SvIV(val);
else if (strcmp(key, "max_pings_outstanding") == 0)
self->max_pings_outstanding = SvIV(val);
else if (strcmp(key, "priority") == 0)
self->priority = SvIV(val);
else if (strcmp(key, "keepalive") == 0)
self->keepalive = SvIV(val);
#ifdef HAVE_OPENSSL
else if (strcmp(key, "tls") == 0)
self->tls = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "tls_ca_file") == 0)
nats_set_str_sv(&self->tls_ca_file, val);
else if (strcmp(key, "tls_skip_verify") == 0)
self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "nkey_seed") == 0)
nats_set_str_sv(&self->nkey_seed, val);
#endif
else if (strcmp(key, "jwt") == 0)
nats_set_str_sv(&self->jwt, val);
else if (strcmp(key, "slow_consumer_bytes") == 0)
self->slow_consumer_bytes = (size_t)SvUV(val);
else if (strcmp(key, "on_slow_consumer") == 0)
self->on_slow_consumer = newSVsv(val);
else if (strcmp(key, "on_lame_duck") == 0)
self->on_ldm = newSVsv(val);
else if (strcmp(key, "loop") == 0)
self->loop = (struct ev_loop *)SvIVx(SvRV(val));
else
warn("EV::Nats::new: unknown option '%s'", key);
}
}
if (self->host || self->path)
nats_do_connect(self);
RETVAL = self;
OUTPUT:
RETVAL
void
connect(self, host, port = 4222)
EV::Nats self
char *host
int port
CODE:
if (self->connected || self->connecting)
croak("already connected");
if (self->reconnect_timer_active) {
ev_timer_stop(self->loop, &self->reconnect_timer);
self->reconnect_timer_active = 0;
}
nats_set_str(&self->path, NULL);
nats_set_str(&self->host, host);
self->port = port;
self->intentional_disconnect = 0;
nats_connect_tcp(self);
void
connect_unix(self, path)
EV::Nats self
char *path
CODE:
if (self->connected || self->connecting)
croak("already connected");
src/EV__Nats.xs view on Meta::CPAN
nats_skip_waiting(self);
void
reconnect(self, enable, ...)
EV::Nats self
int enable
CODE:
self->reconnect_enabled = enable;
if (items > 2) self->reconnect_delay_ms = SvIV(ST(2));
if (items > 3) self->max_reconnect_attempts = SvIV(ST(3));
int
reconnect_enabled(self)
EV::Nats self
CODE:
RETVAL = self->reconnect_enabled;
OUTPUT:
RETVAL
int
connect_timeout(self, ...)
EV::Nats self
CODE:
if (items > 1)
self->connect_timeout_ms = SvIV(ST(1));
RETVAL = self->connect_timeout_ms;
OUTPUT:
RETVAL
int
ping_interval(self, ...)
EV::Nats self
CODE:
if (items > 1)
self->ping_interval_ms = SvIV(ST(1));
RETVAL = self->ping_interval_ms;
OUTPUT:
RETVAL
int
max_pings_outstanding(self, ...)
EV::Nats self
CODE:
if (items > 1)
self->max_pings_outstanding = SvIV(ST(1));
RETVAL = self->max_pings_outstanding;
OUTPUT:
RETVAL
int
priority(self, ...)
EV::Nats self
CODE:
if (items > 1)
self->priority = SvIV(ST(1));
RETVAL = self->priority;
OUTPUT:
RETVAL
int
keepalive(self, ...)
EV::Nats self
CODE:
if (items > 1)
self->keepalive = SvIV(ST(1));
RETVAL = self->keepalive;
OUTPUT:
RETVAL
void
on_error(self, ...)
EV::Nats self
PPCODE:
if (items > 1) {
CLEAR_HANDLER(self->on_error);
if (SvOK(ST(1)))
self->on_error = newSVsv(ST(1));
}
if (GIMME_V != G_VOID && self->on_error)
PUSHs(sv_2mortal(newSVsv(self->on_error)));
void
on_connect(self, ...)
EV::Nats self
PPCODE:
if (items > 1) {
CLEAR_HANDLER(self->on_connect);
if (SvOK(ST(1)))
self->on_connect = newSVsv(ST(1));
}
if (GIMME_V != G_VOID && self->on_connect)
PUSHs(sv_2mortal(newSVsv(self->on_connect)));
void
on_disconnect(self, ...)
EV::Nats self
PPCODE:
if (items > 1) {
CLEAR_HANDLER(self->on_disconnect);
if (SvOK(ST(1)))
self->on_disconnect = newSVsv(ST(1));
}
if (GIMME_V != G_VOID && self->on_disconnect)
PUSHs(sv_2mortal(newSVsv(self->on_disconnect)));
#ifdef HAVE_OPENSSL
void
tls(self, enable, ca_file = NULL, skip_verify = 0)
EV::Nats self
int enable
const char *ca_file
int skip_verify
CODE:
self->tls = enable;
self->tls_skip_verify = skip_verify;
nats_set_str(&self->tls_ca_file, (ca_file && *ca_file) ? ca_file : NULL);
#endif
void
stats(self)
EV::Nats self
PPCODE:
EXTEND(SP, 8);
PUSHs(sv_2mortal(newSVpvs("msgs_in")));
( run in 2.358 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )