EV-Nats
DESCRIPTION
EV::Nats is an async NATS client that implements the protocol directly
in XS on top of EV. There is no external C library dependency.
Protocol
Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
including headered publish/receive, wildcard subjects ("*", ">"), queue
groups, and request/reply with an automatic shared inbox subscription.
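The wildcard rules mentioned above can be illustrated with a small standalone C sketch. It is not taken from EV::Nats (subject matching for delivery is done by the NATS server), and the name subject_matches is hypothetical. It assumes the usual NATS semantics: "*" matches exactly one dot-separated token, and ">" matches one or more trailing tokens and must be the last token of the pattern.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical helper, not part of EV::Nats: returns true when `subject`
 * matches `pattern` under the NATS wildcard rules described above. */
static bool subject_matches(const char *pattern, const char *subject)
{
    while (*pattern != '\0' && *subject != '\0') {
        size_t plen = strcspn(pattern, ".");  /* length of current pattern token */
        size_t slen = strcspn(subject, ".");  /* length of current subject token */

        if (plen == 0 || slen == 0)
            return false;                     /* empty tokens are invalid */

        if (plen == 1 && pattern[0] == '>')
            return pattern[1] == '\0';        /* '>' must be last; matches the rest */

        if (!(plen == 1 && pattern[0] == '*') &&
            (plen != slen || memcmp(pattern, subject, plen) != 0))
            return false;                     /* literal token mismatch */

        pattern += plen;
        subject += slen;

        if (*pattern == '.' && *subject == '.') {
            pattern++;
            subject++;
            if (*pattern == '\0' || *subject == '\0')
                return false;                 /* trailing dot leaves an empty token */
            continue;
        }
        /* otherwise both must end at the same token */
        return *pattern == '\0' && *subject == '\0';
    }
    return false;
}
```

With these rules, "foo.*" matches "foo.bar" but not "foo.bar.baz", while "foo.>" matches "foo.bar.baz" but not "foo" alone.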
Connectivity
TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
reconnect with exponential backoff and jitter; subscription and
auto-unsub state restored on reconnect; cluster failover from INFO
"connect_urls"; lame-duck-mode (graceful server shutdown) callback;
graceful "drain".
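As a rough illustration of "exponential backoff and jitter", the following C sketch computes a reconnect delay. It is an assumption-laden example, not the EV::Nats implementation: the function name backoff_delay_ms, the doubling factor, and the 25% jitter range are all hypothetical. The caller supplies the random value so the function stays deterministic.

```c
#include <stdint.h>

/* Hypothetical sketch, not the EV::Nats code.
 * base_ms:     initial delay (e.g. reconnect_delay)
 * max_ms:      upper bound for the un-jittered delay (e.g. max_reconnect_delay)
 * attempt:     0-based reconnect attempt number
 * jitter_unit: caller-supplied random value in [0, 1) */
static uint32_t backoff_delay_ms(uint32_t base_ms, uint32_t max_ms,
                                 unsigned attempt, double jitter_unit)
{
    uint64_t delay = base_ms;

    /* double the delay once per previous attempt, stopping at the cap */
    while (attempt-- > 0 && delay < max_ms)
        delay *= 2;
    if (delay > max_ms)
        delay = max_ms;

    /* assumed policy: add up to 25% random jitter on top of the delay */
    delay += (uint64_t)((double)delay * 0.25 * jitter_unit);
    return (uint32_t)delay;
}
```

Spreading the delay with jitter keeps many clients from reconnecting at the same instant after a server restart.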
Auth
Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).
TLS
Optional, auto-detected at build time. STARTTLS-style upgrade after
port => Int (default 4222)
Server port.
path => Str
Unix-domain socket path. Mutually exclusive with "host".
connect_timeout => Int (ms; 0 = none)
How long to wait for the TCP/TLS handshake before giving up.
keepalive => Int (seconds)
If greater than 0, enables "SO_KEEPALIVE" and, where "TCP_KEEPIDLE" is
available, sets the idle time to this many seconds.
priority => Int (-2 .. +2)
EV watcher priority for the I/O watchers on this connection.
loop => EV::Loop (default "EV::default_loop")
The EV loop to attach watchers to.
name => Str
Client name advertised in CONNECT.
Server-side strict subject checking.
echo => Bool (default 1)
Receive messages this client itself publishes.
no_responders => Bool (default 0)
Ask the server to send a 503 status reply when a request has no
responders, surfaced as the "no responders" error in "request".
ping_interval => Int (ms, default 120000; 0 = disabled)
Client-initiated PING interval for keep-alive.
max_pings_outstanding => Int (default 2)
Maximum number of PINGs left unanswered (no PONG received) before the
connection is declared stale.
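The interaction of "ping_interval" and "max_pings_outstanding" can be pictured with a small C sketch. The struct and function names are hypothetical and not taken from EV::Nats. The sketch assumes that the counter resets on any PONG and that the connection is considered stale once the limit is reached at the next timer tick; the real client may differ in those details.

```c
#include <stdbool.h>

/* Hypothetical state for illustration; names are not from EV::Nats. */
typedef struct {
    int max_pings_outstanding; /* e.g. 2, the documented default */
    int pings_outstanding;     /* PINGs sent since the last PONG */
} ping_state;

/* Called once per ping_interval. Returns false when the connection should
 * be declared stale, true when another PING should be sent. */
static bool ping_tick(ping_state *st)
{
    if (st->pings_outstanding >= st->max_pings_outstanding)
        return false;          /* too many unanswered PINGs */
    st->pings_outstanding++;   /* account for the PING about to be sent */
    return true;
}

/* Called when a PONG arrives (assumed to clear the whole backlog). */
static void pong_received(ping_state *st)
{
    st->pings_outstanding = 0;
}
```

With the defaults, a silent server would be detected after roughly three intervals: two PINGs go unanswered, and the third tick declares the connection stale.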
Reconnect options
reconnect => Bool (default 0)
Enable automatic reconnection.
reconnect_delay => Int (ms, default 2000)
Initial delay between reconnect attempts; subsequent attempts use
ping_interval([$ms])
Get/set PING interval.
max_pings_outstanding([$num])
Get/set max outstanding PINGs.
priority([$num])
Get/set EV watcher priority.
keepalive([$seconds])
Get/set TCP keepalive.
batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef
returns.
    $nats->batch(sub {
        $nats->publish("foo.$_", "msg-$_") for 1..1000;
    });
* TLS requires OpenSSL headers at build time (auto-detected).
* NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
* The module handles all data as bytes. Encode character strings (for
example to UTF-8) before passing them.
* Do not let the "EV::Nats" instance go out of scope (or be explicitly
"undef"-ed) from inside a callback while that callback is still
executing. The callback closure normally references $nats (via
"$nats->publish(...)" etc.), which keeps it alive; if you write a
callback that does not capture $nats and you "undef" the last outer
reference inside that callback, Perl will run "DESTROY" mid-callback
and free the underlying state. Any subsequent operation on $nats in
that callback is undefined behavior.
* Cluster URL discovery (the "connect_urls" field of INFO) is trusted
by default. On failover the client connects to whatever hostnames
the previous server advertised, and TLS hostname verification is
performed against those names. Use a private CA ("tls_ca_file") to
restrict which certificates are acceptable, or do not enable "tls"
lib/EV/Nats.pm
=head2 Protocol
Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
including headered publish/receive, wildcard subjects (C<*>, C<E<gt>>),
queue groups, and request/reply with an automatic shared inbox
subscription.
=head2 Connectivity
TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
reconnect with exponential backoff and jitter; subscription and
auto-unsub state restored on reconnect; cluster failover from INFO
C<connect_urls>; lame-duck-mode (graceful server shutdown) callback;
graceful C<drain>.
=head2 Auth
Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).
=head2 TLS
lib/EV/Nats.pm
Server port.
=item path => Str
Unix-domain socket path. Mutually exclusive with C<host>.
=item connect_timeout => Int (ms; 0 = none)
How long to wait for the TCP/TLS handshake before giving up.
=item keepalive => Int (seconds)
If greater than 0, enables C<SO_KEEPALIVE> and, where C<TCP_KEEPIDLE> is
available, sets the idle time to this many seconds.
=item priority => Int (-2 .. +2)
L<EV> watcher priority for the I/O watchers on this connection.
=item loop => EV::Loop (default C<EV::default_loop>)
The L<EV> loop to attach watchers to.
lib/EV/Nats.pm
Receive messages this client itself publishes.
=item no_responders => Bool (default 0)
Ask the server to send a 503 status reply when a request has no
responders, surfaced as the C<"no responders"> error in C<request>.
=item ping_interval => Int (ms, default 120000; 0 = disabled)
Client-initiated PING interval for keep-alive.
=item max_pings_outstanding => Int (default 2)
Maximum number of PINGs left unanswered (no PONG received) before the
connection is declared stale.
=back
=head3 Reconnect options
=over
lib/EV/Nats.pm
Get/set PING interval.
=head2 max_pings_outstanding([$num])
Get/set max outstanding PINGs.
=head2 priority([$num])
Get/set EV watcher priority.
=head2 keepalive([$seconds])
Get/set TCP keepalive.
=head2 batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef returns.
    $nats->batch(sub {
        $nats->publish("foo.$_", "msg-$_") for 1..1000;
    });
lib/EV/Nats.pm
=item * TLS requires OpenSSL headers at build time (auto-detected).
=item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
=item * The module handles all data as bytes. Encode character strings
(for example to UTF-8) before passing them.
=item * Do not let the C<EV::Nats> instance go out of scope (or be
explicitly C<undef>-ed) from inside a callback while that callback is
still executing. The callback closure normally references C<$nats>
(via C<< $nats->publish(...) >> etc.), which keeps it alive; if you
write a callback that does not capture C<$nats> and you C<undef> the
last outer reference inside that callback, Perl will run C<DESTROY>
mid-callback and free the underlying state. Any subsequent operation
on C<$nats> in that callback is undefined behavior.
=item * Cluster URL discovery (the C<connect_urls> field of INFO) is
trusted by default. On failover the client connects to whatever
hostnames the previous server advertised, and TLS hostname verification
is performed against those names. Use a private CA (C<tls_ca_file>) to
restrict which certificates are acceptable, or do not enable C<tls> on
src/EV__Nats.xs
/* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
size_t msg_subject_off;
size_t msg_subject_len;
size_t msg_reply_off; /* msg_reply_len == 0 means no reply */
size_t msg_reply_len;
uint64_t msg_sid;
size_t msg_hdr_len;
size_t msg_total_len;
int priority;
int keepalive;
/* Stats */
UV msgs_in;
UV msgs_out;
UV bytes_in;
UV bytes_out;
/* Server pool for cluster failover */
ngx_queue_t server_pool;
int server_pool_count;
src/EV__Nats.xs
        } else {
            PUSHs(&PL_sv_undef);
        }
        if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
            PUSHs(sv_2mortal(newSVpvn(payload, self->msg_hdr_len)));
        }
        PUTBACK;
        /* pin sub->cb: a callback that unsubscribes its own sid clears
           sub->cb (and frees the sub) mid-call; the pin keeps the CV alive */
        PINNED_CALL_SV(sub->cb, G_DISCARD);
        FREETMPS; LEAVE;
    }
    if (max_msgs > 0 && received >= max_msgs) {
        sub = nats_find_sub(self, sid);
        if (sub)
            nats_remove_sub(self, sub);
    }
}
src/EV__Nats.xs
for (rp = res; rp != NULL; rp = rp->ai_next) {
    fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (fd < 0) continue;
    /* non-blocking, so connect() below may return EINPROGRESS */
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
    int flag = 1;
    /* disable Nagle so small protocol lines are sent without delay */
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
    if (self->keepalive > 0) {
        setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
#ifdef TCP_KEEPIDLE
        /* idle time in seconds before the first keepalive probe */
        setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
#endif
    }
    rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
    /* connected or in progress: keep this fd; otherwise try the next address */
    if (rv == 0 || errno == EINPROGRESS) break;
    close(fd);
    fd = -1;
}
src/EV__Nats.xs
else if (strcmp(key, "max_reconnect_delay") == 0)
    self->max_reconnect_delay_ms = SvIV(val);
else if (strcmp(key, "connect_timeout") == 0)
    self->connect_timeout_ms = SvIV(val);
else if (strcmp(key, "ping_interval") == 0)
    self->ping_interval_ms = SvIV(val);
else if (strcmp(key, "max_pings_outstanding") == 0)
    self->max_pings_outstanding = SvIV(val);
else if (strcmp(key, "priority") == 0)
    self->priority = SvIV(val);
else if (strcmp(key, "keepalive") == 0)
    self->keepalive = SvIV(val);
#ifdef HAVE_OPENSSL
else if (strcmp(key, "tls") == 0)
    self->tls = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "tls_ca_file") == 0)
    nats_set_str_sv(&self->tls_ca_file, val);
else if (strcmp(key, "tls_skip_verify") == 0)
    self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "nkey_seed") == 0)
    nats_set_str_sv(&self->nkey_seed, val);
#endif
src/EV__Nats.xs
priority(self, ...)
    EV::Nats self
CODE:
    if (items > 1)
        self->priority = SvIV(ST(1));
    RETVAL = self->priority;
OUTPUT:
    RETVAL

int
keepalive(self, ...)
    EV::Nats self
CODE:
    if (items > 1)
        self->keepalive = SvIV(ST(1));
    RETVAL = self->keepalive;
OUTPUT:
    RETVAL

void
on_error(self, ...)
    EV::Nats self
PPCODE:
    if (items > 1) {
        CLEAR_HANDLER(self->on_error);
        if (SvOK(ST(1)))
xt/09_max_payload.t
        undef $t;
        eval { $nats->publish('maxpay.ok', 'x' x ($max - 1)) };
        ok !$@, 'publish at max_payload-1 succeeds';
        # Publish over max_payload should croak
        eval { $nats->publish('maxpay.over', 'x' x ($max + 1)) };
        like $@, qr/max_payload/, 'publish over max_payload croaks';
        # Verify connection still works after croak
        $nats->subscribe('maxpay.after', sub {
            pass 'connection still alive after max_payload croak';
            $nats->disconnect;
            EV::break;
        });
        my $p; $p = EV::timer 0.1, 0, sub {
            undef $p;
            $nats->publish('maxpay.after', 'alive');
        };
    };
};
EV::run;