EV-Nats
NAME
EV::Nats - High-performance asynchronous NATS client using EV
SYNOPSIS
    use EV;
    use EV::Nats;

    my $nats = EV::Nats->new(
        host       => '127.0.0.1',
        port       => 4222,
        reconnect  => 1,
        on_error   => sub { warn "nats: $_[0]\n" },
        on_connect => sub { warn "connected\n" },
    );

    # Subscribe (plain or queue group)
    my $sid = $nats->subscribe('foo.>', sub {
        my ($subject, $payload, $reply, $headers) = @_;
        print "[$subject] $payload\n";
    });
    $nats->subscribe('work.>', sub { ... }, 'workers');

    # Publish (fire-and-forget) and headered publish
    $nats->publish('foo.bar', 'hello world');
    $nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');

    # Request / reply
    $nats->request('service.echo', 'ping', sub {
        my ($response, $err) = @_;
        die $err if $err;
        print "reply: $response\n";
    }, 5000); # 5s timeout

    $nats->unsubscribe($sid);

    EV::run;
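The header block passed to "hpublish" in the synopsis is a raw string: a "NATS/1.0" version line, one "Name: value" line per header, and a terminating blank line, each ending in CRLF. A minimal sketch of a helper that assembles such a block from name/value pairs could look like this; "build_nats_headers" is a hypothetical name and is not part of EV::Nats.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of EV::Nats): build a raw header block in
# the "NATS/1.0\r\nName: value\r\n\r\n" form used in the synopsis.
sub build_nats_headers {
    my (@pairs) = @_;                        # ordered list: name => value, ...
    die "odd number of arguments\n" if @pairs % 2;
    my $block = "NATS/1.0\r\n";
    while (my ($name, $value) = splice(@pairs, 0, 2)) {
        # Reject empty names, colons in names, and embedded line breaks.
        die "invalid header '$name'\n"
            if $name eq '' || $name =~ /:/ || "$name$value" =~ /[\r\n]/;
        $block .= "$name: $value\r\n";
    }
    return $block . "\r\n";                  # blank line terminates the block
}

my $headers = build_nats_headers('X-Trace' => 42);
# $nats->hpublish('foo.bar', $headers, 'body');
```

Passing the pairs as an ordered list rather than a hash keeps header order stable and allows repeated header names.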
DESCRIPTION
EV::Nats is an async NATS client that implements the protocol directly
in XS on top of EV. There is no external C library dependency.
Protocol
Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
including headered publish/receive, wildcard subjects ("*", ">"), queue
groups, and request/reply with an automatic shared inbox subscription.
Connectivity
TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
reconnect with exponential backoff and jitter; subscription and
auto-unsub state restored on reconnect; cluster failover from INFO
"connect_urls"; lame-duck-mode (leaf node graceful shutdown) callback;
graceful "drain".
Auth
Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).
TLS
Optional, auto-detected at build time. STARTTLS-style upgrade after
INFO; full hostname verification (DNS or IP literal) by default; opt-out
"tls_skip_verify"; custom CA via "tls_ca_file".
Performance
Write coalescing via "ev_prepare" (one write() per loop iteration); O(1)
subscription lookup; per-publish allocation-free fast path; explicit
"batch" mode for tight loops; per-connection stats counters.
Higher-level APIs
EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore.
Note: DNS resolution via "getaddrinfo" is blocking. Use numeric IP
addresses for latency-sensitive applications.
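To make the wildcard rules concrete: subjects are dot-separated tokens, "*" matches exactly one token, and ">" matches one or more trailing tokens and is only valid as the last token. The following is an illustrative pure-Perl matcher for those rules; matching in practice is done by the server and by the module's XS code, and "subject_matches" is a hypothetical name.

```perl
use strict;
use warnings;

# Illustrative only: demonstrates NATS wildcard semantics, not the
# matching code used by EV::Nats or nats-server.
sub subject_matches {
    my ($pattern, $subject) = @_;
    my @pat = split /\./, $pattern, -1;
    my @sub = split /\./, $subject, -1;
    for my $i (0 .. $#pat) {
        if ($pat[$i] eq '>') {
            return 0 unless $i == $#pat;     # ">" is only valid as the last token
            return @sub > $i ? 1 : 0;        # needs at least one remaining token
        }
        return 0 if $i > $#sub;              # subject ran out of tokens
        next if $pat[$i] eq '*';             # "*" matches exactly one token
        return 0 if $pat[$i] ne $sub[$i];    # literal tokens must be equal
    }
    return @pat == @sub ? 1 : 0;             # no trailing unmatched tokens
}

for my $case (['foo.>', 'foo.bar.baz'], ['foo.*', 'foo.bar.baz'], ['foo.>', 'foo']) {
    my ($pattern, $subject) = @$case;
    printf "%-6s vs %-12s => %s\n", $pattern, $subject,
        subject_matches($pattern, $subject) ? 'match' : 'no match';
}
```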
METHODS
new(%options)
Create an EV::Nats instance. If "host" or "path" is supplied, connection
is initiated immediately and the "on_connect" callback fires once the
CONNECT/PONG handshake completes.
    my $nats = EV::Nats->new(
        host       => '127.0.0.1',
        port       => 4222,
        reconnect  => 1,
        on_error   => sub { warn "nats: $_[0]\n" },
        on_connect => sub { warn "ready\n" },
    );
Connection options
host => Str
Server hostname (numeric IP recommended; see "CAVEATS"). When set,
connection starts immediately.
port => Int (default 4222)
Server port.
path => Str
Unix-domain socket path. Mutually exclusive with "host".
connect_timeout => Int (ms; 0 = none)
How long to wait for the TCP/TLS handshake before giving up.
keepalive => Int (seconds)
If set, enables "SO_KEEPALIVE" with this idle interval.
priority => Int (-2 .. +2)
EV watcher priority for the I/O watchers on this connection.
loop => EV::Loop (default "EV::default_loop")
The EV loop to attach watchers to.
name => Str
Client name advertised in CONNECT.
Auth options
user => Str / pass => Str
Username/password authentication. JSON-escaped in CONNECT.
token => Str
Token authentication.
nkey_seed => Str
NATS NKey seed (the "SU..." form). Requires the build to have
OpenSSL ("EV::Nats::HAS_NKEY").
jwt => Str
User JWT, paired with "nkey_seed" for decentralized auth. See also
"creds_file".
tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool
See "tls" for details.
Protocol options
verbose => Bool (default 0)
Request "+OK" acknowledgments after each command.
pedantic => Bool (default 0)
Server-side strict subject checking.
echo => Bool (default 1)
Receive messages this client itself publishes.
no_responders => Bool (default 0)
Ask the server to send a 503 status reply when a request has no
responders, surfaced as the "no responders" error in "request".
ping_interval => Int (ms, default 120000; 0 = disabled)
Client-initiated PING interval for keep-alive.
max_pings_outstanding => Int (default 2)
Maximum unacked PINGs before the connection is declared stale.
Reconnect options
reconnect => Bool (default 0)
Enable automatic reconnection.
reconnect_delay => Int (ms, default 2000)
Initial delay between reconnect attempts; subsequent attempts use
exponential backoff with jitter, capped by "max_reconnect_delay".
max_reconnect_delay => Int (ms, default 30000)
Upper bound on the backoff delay.
max_reconnect_attempts => Int (default 60; 0 = unlimited)
Give up after this many consecutive failures.
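The reconnect options above describe exponential backoff with jitter but do not spell out the formula. As a rough sketch of how such a schedule behaves, the function below doubles the delay on each consecutive failure, caps it, and then subtracts up to 25% of random jitter. The doubling factor and the jitter range are assumptions made for illustration; they are not taken from the EV::Nats source.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Assumed policy (NOT taken from EV::Nats): double per attempt, cap at
# $max_ms, then subtract up to 25% random jitter so the cap is never exceeded.
sub backoff_delay_ms {
    my ($attempt, $base_ms, $max_ms, $rand) = @_;    # $attempt starts at 0
    $rand = rand() unless defined $rand;             # uniform in [0, 1)
    my $delay = min($max_ms, $base_ms * 2 ** min($attempt, 30));
    return int($delay * (1 - 0.25 * $rand));
}

# Documented defaults: reconnect_delay 2000 ms, max_reconnect_delay 30000 ms.
printf "attempt %d: %d ms\n", $_, backoff_delay_ms($_, 2000, 30000) for 0 .. 5;
```

The optional fourth argument exists only so the jitter can be pinned in tests; normal callers omit it.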
Callback options
All callbacks fire on the EV loop, never inline.
on_connect => sub { }
Called after the CONNECT/PONG handshake completes.
on_disconnect => sub { }
Called when the connection drops, before any auto-reconnect attempt.
on_error => sub { my ($err) = @_ }
Receives the error message as a string. If unset, errors are raised
with "croak".
on_lame_duck => sub { }
Called once when the server signals lame-duck-mode shutdown via INFO
"ldm:true".
on_slow_consumer => sub { my ($pending_bytes) = @_ }
See "slow_consumer".
connect($host, [$port])
Initiate a TCP connection. Port defaults to 4222. Croaks if already
connected or in the middle of connecting; otherwise returns immediately
and signals completion via "on_connect".
connect_unix($path)
Initiate a Unix-domain-socket connection. Same async semantics as
"connect".
disconnect
Cancel any pending reconnect, drop queued writes, close the socket, and
fire "on_disconnect". "intentional_disconnect" is set so no
auto-reconnect is scheduled. For a clean shutdown that flushes pending
writes first, see "drain".
is_connected
True if the CONNECT/PONG handshake has completed and no disconnect or
reconnect is in progress.
publish($subject, [$payload], [$reply_to])
Publish a message. Alias: "pub".
$nats->publish('foo', 'hello');
Send PING as a write fence; the subsequent PONG guarantees all prior
messages were processed by the server. If $cb is given, it is invoked
when the PONG arrives. The callback receives a single argument: "undef"
on success, or an error string (e.g. "disconnected") if the connection
dropped before the PONG arrived.
creds_file($path)
Read a NATS ".creds" file and apply the embedded JWT and NKey seed via
"jwt" and "nkey_seed". Apply this BEFORE "connect" so the credentials
are available during the CONNECT handshake. Dies if the file is
unreadable or missing either the "USER JWT" or "USER NKEY SEED" block.
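For readers who want to inspect a ".creds" file themselves, here is a minimal sketch of extracting the two blocks from its text. It assumes the usual layout in which each value sits on its own line between "-----BEGIN ...-----" and "------END ...------" markers; "parse_creds" is a hypothetical helper, the sample values are fake, and the module's own parser may behave differently.

```perl
use strict;
use warnings;

# Illustrative parser for the text of a NATS ".creds" file.
# Hypothetical helper; EV::Nats's creds_file does its own parsing.
sub parse_creds {
    my ($text) = @_;
    my %out;
    for my $block (['jwt', 'USER JWT'], ['nkey_seed', 'USER NKEY SEED']) {
        my ($key, $label) = @$block;
        # The JWT marker is usually written "NATS USER JWT"; accept both.
        $text =~ /-+BEGIN (?:NATS )?\Q$label\E-+\s*(\S+)\s*-+END (?:NATS )?\Q$label\E-+/
            or die "creds: missing $label block\n";
        $out{$key} = $1;
    }
    return \%out;
}

# Fake sample data, for illustration only.
my $sample = join "\n",
    '-----BEGIN NATS USER JWT-----',
    'eyJhbGciOiJlZDI1NTE5In0.e30.c2ln',
    '------END NATS USER JWT------',
    '',
    '-----BEGIN USER NKEY SEED-----',
    'SUAEXAMPLESEEDNOTREAL',
    '------END USER NKEY SEED------',
    '';

my $creds = parse_creds($sample);
print "seed starts with: ", substr($creds->{nkey_seed}, 0, 2), "\n";
```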
new_inbox
Returns a fresh subject suitable for use as a private reply target
("_INBOX.<rand>.<n>"). Each call burns a slot from the same counter that
"request" uses, so manual subscribers must treat the returned subject as
opaque.
subscription_count
Returns the number of currently-registered subscriptions, including the
implicit "_INBOX.>" subscription used by "request".
server_info
Returns the raw JSON string of the most recent INFO frame received from
the server (or "undef" before the first INFO). Useful for inspecting
"server_id", "version", "cluster", "connect_urls", etc.
max_payload([$limit])
Server-advertised maximum payload size in bytes. Returns the current
value; with an argument, overrides it (publishes above this croak
locally before reaching the wire).
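Both "server_info" and "max_payload" expose data from the server's INFO frame. Since "server_info" returns raw JSON, it has to be decoded before individual fields can be read. A small sketch using the core JSON::PP module follows; the sample payload is made up, and "summarize_info" is a hypothetical helper.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Made-up INFO payload; a real one would come from $nats->server_info.
my $info_json = '{"server_id":"NEXAMPLE","version":"2.12.0","max_payload":1048576,'
              . '"connect_urls":["10.0.0.2:4222","10.0.0.3:4222"]}';

# Hypothetical helper: pick out a few commonly inspected fields.
sub summarize_info {
    my ($json) = @_;
    return unless defined $json;                 # no INFO received yet
    my $info = decode_json($json);
    return {
        server_id    => $info->{server_id},
        version      => $info->{version},
        max_payload  => $info->{max_payload},
        connect_urls => $info->{connect_urls} || [],
    };
}

my $summary = summarize_info($info_json);
printf "server %s v%s, max payload %d bytes, %d peer URL(s)\n",
    $summary->{server_id}, $summary->{version}, $summary->{max_payload},
    scalar @{ $summary->{connect_urls} };
```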
waiting_count
Number of writes queued locally during connect or reconnect (i.e.
"publish"/"request" calls made while the connection is not yet ready).
They flush when the handshake completes.
skip_waiting
Drop all queued writes without sending them. Useful before "disconnect"
if reconnect is enabled and you don't want stale publishes replayed.
reconnect($enable, [$delay_ms], [$max_attempts])
Configure reconnection. $delay_ms and $max_attempts are only written
when supplied; omitted args leave the existing value unchanged.
reconnect_enabled
Returns true if reconnect is enabled.
connect_timeout([$ms])
Get/set connect timeout.
ping_interval([$ms])
Get/set PING interval.
max_pings_outstanding([$num])
Get/set max outstanding PINGs.
priority([$num])
Get/set EV watcher priority.
keepalive([$seconds])
Get/set TCP keepalive.
batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef
returns.
    $nats->batch(sub {
        $nats->publish("foo.$_", "msg-$_") for 1..1000;
    });
slow_consumer($bytes_threshold, [$cb])
Enable slow consumer detection. When the write buffer exceeds
$bytes_threshold bytes, $cb is called with the current buffer size.
    $nats->slow_consumer(1024*1024, sub {
        my ($pending_bytes) = @_;
        warn "slow consumer: ${pending_bytes}B pending\n";
    });
on_lame_duck([$cb])
Get/set the lame-duck callback. Fires once when the server signals
shutdown (leaf node, rolling restart) via INFO "ldm:true". Use this to
migrate work to another server before the grace period elapses.
nkey_seed($seed)
Set the NKey seed (the "SU..." base32-encoded form) for Ed25519
authentication. Requires the build to have OpenSSL (see "HAS_NKEY" in
EV::Nats). The server nonce from INFO is automatically signed during
CONNECT. May also be passed to "new" as "nkey_seed => ...".
jwt($token)
Set the user JWT. Combine with "nkey_seed" for NATS decentralized auth.
May also be passed to "new". See "creds_file" for the common case of
loading both from a ".creds" file.
EV::Nats->nkey_generate_user_seed
Class method. Returns a fresh, valid NATS User NKey seed (the "SU..."
form). Useful for tests and provisioning scripts that don't have the
"nk" CLI available. Requires "HAS_NKEY"; croaks otherwise.
EV::Nats->nkey_public_from_seed($seed)
Class method. Derives the matching public key (the "U..." form) from a
User NKey seed. Croaks on an invalid seed. Pair with
"nkey_generate_user_seed" to provision the server with the public key
while the client keeps the seed.
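The two class methods can be combined into a small provisioning step, sketched below. The sketch assumes that "EV::Nats::HAS_NKEY" can be called as a function; if the module is not installed or was built without NKey support, the helper simply returns an empty list. "provision_user_nkey" is a hypothetical name.

```perl
use strict;
use warnings;

# Returns (seed, public_key), or an empty list when EV::Nats is missing
# or lacks NKey support. Assumes HAS_NKEY is callable as a function.
sub provision_user_nkey {
    my $has_nkey = eval { require EV::Nats; EV::Nats::HAS_NKEY() };
    return unless $has_nkey;
    my $seed   = EV::Nats->nkey_generate_user_seed;        # "SU..." form, keep secret
    my $public = EV::Nats->nkey_public_from_seed($seed);   # "U..." form, for the server
    return ($seed, $public);
}

if (my ($seed, $public) = provision_user_nkey()) {
    print "configure the server with user key: $public\n";
    # my $nats = EV::Nats->new(host => '127.0.0.1', nkey_seed => $seed);
}
else {
    print "NKey support not available\n";
}
```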
tls($enable, [$ca_file], [$skip_verify])
Configure TLS. Requires OpenSSL at build time (see "HAS_TLS" in
EV::Nats).
    $nats->tls(1);                     # system CA
    $nats->tls(1, '/path/to/ca.pem');  # custom CA
    $nats->tls(1, undef, 1);           # skip verification
When verification is enabled (the default), the server certificate's SAN
must match either the resolved IP literal or the DNS hostname passed to
"connect". May also be passed to "new" as "tls => 1, tls_ca_file =>
$path".
stats
Zero all counters returned by "stats".
on_error([$cb])
on_connect([$cb])
on_disconnect([$cb])
Get/set the corresponding callback at runtime. With no argument, returns
the current value (or "undef"). With an argument, replaces it; pass
"undef" to clear.
BUILD-TIME FEATURES
EV::Nats::HAS_TLS
True if compiled with OpenSSL (TLS supported).
EV::Nats::HAS_NKEY
True if NKey/JWT signing is available (also requires OpenSSL).
BENCHMARKS
Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12,
100-byte payloads ("bench/benchmark.pl"):
                               100K msgs   200K msgs
    PUB fire-and-forget             4.7M        5.0M   msgs/sec
    PUB + SUB (loopback)            1.8M        1.6M   msgs/sec
    PUB + SUB (8B payload)          2.2M        1.9M   msgs/sec
    REQ/REP (pipelined, 64)         334K               msgs/sec
Connected-path publish appends directly to the write buffer with no
per-message allocation. Write coalescing via "ev_prepare" batches all
publishes per event-loop iteration into a single write() syscall.
Run "perl bench/benchmark.pl" for full results. Set "BENCH_MESSAGES",
"BENCH_PAYLOAD", "BENCH_HOST", "BENCH_PORT" to customize.
NATS PROTOCOL
This module implements the NATS client protocol directly in XS. The
protocol is text-based with CRLF-delimited control lines and binary
payloads.
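As an illustration of that framing, the sketch below builds PUB and HPUB frames in pure Perl, following the layout in the NATS protocol reference: a space-separated control line ending in CRLF, then the payload, then another CRLF. For HPUB the control line carries the header length and the total length. This is not the module's XS code, and "frame_pub" and "frame_hpub" are hypothetical names.

```perl
use strict;
use warnings;

# Illustrative framing only; EV::Nats does this in XS.
# All lengths are byte counts, so arguments must be byte strings.
sub frame_pub {
    my ($subject, $payload, $reply_to) = @_;
    $payload = '' unless defined $payload;
    my @ctl = ('PUB', $subject);
    push @ctl, $reply_to if defined $reply_to;
    push @ctl, length($payload);
    return join(' ', @ctl) . "\r\n$payload\r\n";
}

sub frame_hpub {
    my ($subject, $headers, $payload, $reply_to) = @_;
    $payload = '' unless defined $payload;
    my $hlen = length($headers);             # includes the terminating blank line
    my @ctl = ('HPUB', $subject);
    push @ctl, $reply_to if defined $reply_to;
    push @ctl, $hlen, $hlen + length($payload);
    return join(' ', @ctl) . "\r\n$headers$payload\r\n";
}

my $frame = frame_pub('foo.bar', 'hello world');
(my $visible = $frame) =~ s/\r\n/\\r\\n/g;   # make the CRLFs visible when printing
print "$visible\n";
```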
Connection flow: server sends INFO, client sends CONNECT + PING, server
responds with PONG to confirm. All subscriptions (including queue groups
and auto-unsub state) are automatically restored on reconnect.
Request/reply uses a single wildcard inbox subscription
("_INBOX.<random>.*") for all requests, with unique suffixes per
request.
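The idea of multiplexing every request over one wildcard inbox can be sketched in a few lines of Perl: keep a table of pending callbacks keyed by the unique suffix, hand out "prefix.suffix" as the reply subject, and route each incoming reply by its last token. All names below are hypothetical, and this is a conceptual sketch rather than the module's actual bookkeeping.

```perl
use strict;
use warnings;

# Conceptual sketch only; not the EV::Nats implementation.
my $prefix  = sprintf '_INBOX.%08x', int rand 0x7FFFFFFF;  # one random prefix per connection
my $next_id = 0;
my %pending;                                               # suffix => reply callback

# Register a callback and return the reply subject to send with the request.
sub register_request {
    my ($cb) = @_;
    my $suffix = ++$next_id;
    $pending{$suffix} = $cb;
    return "$prefix.$suffix";
}

# Called for every message arriving on the "$prefix.*" subscription.
sub dispatch_reply {
    my ($subject, $payload) = @_;
    my ($suffix) = $subject =~ /^\Q$prefix\E\.([^.]+)\z/ or return 0;
    my $cb = delete $pending{$suffix} or return 0;         # unknown or already answered
    $cb->($payload);
    return 1;
}
```

Deleting the entry on dispatch means a late duplicate reply is ignored rather than delivered twice.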
CAVEATS
* DNS resolution via "getaddrinfo" is blocking. Use numeric IP
addresses for latency-sensitive applications.
* TLS requires OpenSSL headers at build time (auto-detected).
* NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
* The module handles all data as bytes. Encode UTF-8 strings before
passing them.
* Do not let the "EV::Nats" instance go out of scope (or be explicitly
"undef"-ed) from inside a callback while that callback is still
executing. The callback closure normally references $nats (via
"$nats->publish(...)" etc.), which keeps it alive; if you write a
callback that does not capture $nats and you "undef" the last outer
reference inside that callback, Perl will run "DESTROY" mid-callback
and free the underlying state. Any subsequent operation on $nats in
that callback is undefined behavior.
* Cluster URL discovery (the "connect_urls" field of INFO) is trusted
by default. On failover the client connects to whatever hostnames
the previous server advertised, and TLS hostname verification is
performed against those names. Use a private CA ("tls_ca_file") to
restrict which certificates are acceptable, or do not enable "tls"
on public-CA topologies where any holder of a valid cert could
redirect clients.
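The byte-handling caveat above is the one most likely to surprise callers, so here is a short sketch of encoding a character string before publishing and decoding it again on receipt, using the core Encode module. The publish call is shown only as a comment; the rest runs standalone.

```perl
use strict;
use warnings;
use Encode qw(encode_utf8 decode_utf8);

my $text  = "h\x{e9}llo w\x{f6}rld";       # character string with two non-ASCII characters
my $bytes = encode_utf8($text);            # byte string, safe to hand to publish

# $nats->publish('foo.bar', $bytes);       # publish bytes, not characters

# In a subscriber callback, decode the payload back into characters.
my $round_trip = decode_utf8($bytes);

printf "%d characters, %d bytes\n", length($text), length($bytes);
```

The character count and the byte count differ whenever the text contains non-ASCII characters, which is why payload sizes should always be measured on the encoded bytes.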
ENVIRONMENT
TEST_NATS_HOST, TEST_NATS_PORT
Set these to run the test suite against a NATS server (default:
127.0.0.1:4222).
SEE ALSO
EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore, EV, NATS
protocol
<https://docs.nats.io/reference/reference-protocols/nats-protocol>,
nats-server <https://github.com/nats-io/nats-server>.
AUTHOR
vividsnow
LICENSE
This library is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.