EV-Nats
view release on metacpan or search on metacpan
lib/EV/Nats.pm view on Meta::CPAN
? $self->subscribe($subject, $cb, $queue_group)
: $self->subscribe($subject, $cb);
$self->unsubscribe($sid, $max_msgs) if $max_msgs && $max_msgs > 0;
$sid;
}
1;
=head1 NAME
EV::Nats - High-performance asynchronous NATS client using EV
=head1 SYNOPSIS
use EV;
use EV::Nats;
my $nats = EV::Nats->new(
host => '127.0.0.1',
port => 4222,
reconnect => 1,
on_error => sub { warn "nats: $_[0]\n" },
on_connect => sub { warn "connected\n" },
);
# Subscribe (plain or queue group)
my $sid = $nats->subscribe('foo.>', sub {
my ($subject, $payload, $reply, $headers) = @_;
print "[$subject] $payload\n";
});
$nats->subscribe('work.>', sub { ... }, 'workers');
# Publish (fire-and-forget) and headered publish
$nats->publish('foo.bar', 'hello world');
$nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');
# Request / reply
$nats->request('service.echo', 'ping', sub {
my ($response, $err) = @_;
die $err if $err;
print "reply: $response\n";
}, 5000); # 5s timeout
$nats->unsubscribe($sid);
EV::run;
=head1 DESCRIPTION
EV::Nats is an async NATS client that implements the protocol directly
in XS on top of L<EV>. There is no external C library dependency.
=head2 Protocol
Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
including headered publish/receive, wildcard subjects (C<*>, C<E<gt>>),
queue groups, and request/reply with an automatic shared inbox
subscription.
=head2 Connectivity
TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
reconnect with exponential backoff and jitter; subscription and
auto-unsub state restored on reconnect; cluster failover from INFO
C<connect_urls>; lame-duck-mode (leaf node graceful shutdown) callback;
graceful C<drain>.
=head2 Auth
Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).
=head2 TLS
Optional, auto-detected at build time. STARTTLS-style upgrade after
INFO; full hostname verification (DNS or IP literal) by default;
opt-out C<tls_skip_verify>; custom CA via C<tls_ca_file>.
=head2 Performance
Write coalescing via C<ev_prepare> (one C<write()> per loop
iteration); O(1) subscription lookup; per-publish allocation-free
fast path; explicit C<batch> mode for tight loops; per-connection
stats counters.
=head2 Higher-level APIs
L<EV::Nats::JetStream>, L<EV::Nats::KV>, L<EV::Nats::ObjectStore>.
B<Note:> DNS resolution via C<getaddrinfo> is blocking. Use numeric IP
addresses for latency-sensitive applications.
=head1 METHODS
=head2 new(%options)
Create an EV::Nats instance. If C<host> or C<path> is supplied,
connection is initiated immediately and the C<on_connect> callback
fires once the CONNECT/PONG handshake completes.
my $nats = EV::Nats->new(
host => '127.0.0.1',
port => 4222,
reconnect => 1,
on_error => sub { warn "nats: $_[0]\n" },
on_connect => sub { warn "ready\n" },
);
=head3 Connection options
=over
=item host => Str
Server hostname (numeric IP recommended; see L</CAVEATS>). When set,
connection starts immediately.
=item port => Int (default 4222)
Server port.
=item path => Str
Unix-domain socket path. Mutually exclusive with C<host>.
=item connect_timeout => Int (ms; 0 = none)
How long to wait for the TCP/TLS handshake before giving up.
=item keepalive => Int (seconds)
If set, enables C<SO_KEEPALIVE> with this idle interval.
=item priority => Int (-2 .. +2)
L<EV> watcher priority for the I/O watchers on this connection.
=item loop => EV::Loop (default C<EV::default_loop>)
The L<EV> loop to attach watchers to.
=item name => Str
Client name advertised in CONNECT.
=back
=head3 Auth options
=over
=item user => Str / pass => Str
Username/password authentication. JSON-escaped in CONNECT.
=item token => Str
Token authentication.
=item nkey_seed => Str
NATS NKey seed (the C<SU...> form). Requires the build to have
OpenSSL (C<EV::Nats::HAS_NKEY>).
=item jwt => Str
User JWT, paired with C<nkey_seed> for decentralized auth. See also
L</creds_file>.
=item tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool
See L</tls> for details.
=back
=head3 Protocol options
=over
=item verbose => Bool (default 0)
Request C<+OK> acknowledgments after each command.
=item pedantic => Bool (default 0)
Server-side strict subject checking.
=item echo => Bool (default 1)
Receive messages this client itself publishes.
=item no_responders => Bool (default 0)
Ask the server to send a 503 status reply when a request has no
responders, surfaced as the C<"no responders"> error in C<request>.
=item ping_interval => Int (ms, default 120000; 0 = disabled)
Client-initiated PING interval for keep-alive.
=item max_pings_outstanding => Int (default 2)
Maximum unacked PINGs before the connection is declared stale.
=back
=head3 Reconnect options
=over
=item reconnect => Bool (default 0)
Enable automatic reconnection.
=item reconnect_delay => Int (ms, default 2000)
Initial delay between reconnect attempts; subsequent attempts use
exponential backoff with jitter, capped by C<max_reconnect_delay>.
=item max_reconnect_delay => Int (ms, default 30000)
Upper bound on the backoff delay.
=item max_reconnect_attempts => Int (default 60; 0 = unlimited)
Give up after this many consecutive failures.
=back
=head3 Callback options
All callbacks fire on the L<EV> loop, never inline.
=over
=item on_connect => sub { }
Called after the CONNECT/PONG handshake completes.
=item on_disconnect => sub { }
Called when the connection drops, before any auto-reconnect attempt.
=item on_error => sub { my ($err) = @_ }
Receives a string. If unset, errors C<croak>.
=item on_lame_duck => sub { }
Called once when the server signals lame-duck-mode shutdown via
INFO C<ldm:true>.
=item on_slow_consumer => sub { my ($pending_bytes) = @_ }
See L</slow_consumer>.
=back
=head2 connect($host, [$port])
lib/EV/Nats.pm view on Meta::CPAN
Returns a fresh subject suitable for use as a private reply target
(C<_INBOX.E<lt>randE<gt>.E<lt>nE<gt>>). Each call burns a slot from
the same counter that L</request> uses, so manual subscribers must
treat the returned subject as opaque.
=head2 subscription_count
Returns the number of currently-registered subscriptions, including
the implicit C<_INBOX.E<gt>> subscription used by L</request>.
=head2 server_info
Returns the raw JSON string of the most recent INFO frame received
from the server (or C<undef> before the first INFO). Useful for
inspecting C<server_id>, C<version>, C<cluster>, C<connect_urls>,
etc.
=head2 max_payload([$limit])
Server-advertised maximum payload size in bytes. Returns the current
value; with an argument, overrides it (publishes above this croak
locally before reaching the wire).
=head2 waiting_count
Number of writes queued locally during connect or reconnect (i.e.
C<publish>/C<request> calls made while the connection is not yet
ready). They flush when the handshake completes.
=head2 skip_waiting
Drop all queued writes without sending them. Useful before
C<disconnect> if reconnect is enabled and you don't want stale
publishes replayed.
=head2 reconnect($enable, [$delay_ms], [$max_attempts])
Configure reconnection. C<$delay_ms> and C<$max_attempts> are only
written when supplied; omitted args leave the existing value unchanged.
=head2 reconnect_enabled
Returns true if reconnect is enabled.
=head2 connect_timeout([$ms])
Get/set connect timeout.
=head2 ping_interval([$ms])
Get/set PING interval.
=head2 max_pings_outstanding([$num])
Get/set max outstanding PINGs.
=head2 priority([$num])
Get/set EV watcher priority.
=head2 keepalive([$seconds])
Get/set TCP keepalive.
=head2 batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef returns.
$nats->batch(sub {
$nats->publish("foo.$_", "msg-$_") for 1..1000;
});
=head2 slow_consumer($bytes_threshold, [$cb])
Enable slow consumer detection. When the write buffer exceeds
C<$bytes_threshold> bytes, C<$cb> is called with the current buffer size.
$nats->slow_consumer(1024*1024, sub {
my ($pending_bytes) = @_;
warn "slow consumer: ${pending_bytes}B pending\n";
});
=head2 on_lame_duck([$cb])
Get/set the lame-duck callback. Fires once when the server signals
shutdown (leaf node, rolling restart) via INFO C<ldm:true>. Use this
to migrate work to another server before the grace period elapses.
=head2 nkey_seed($seed)
Set the NKey seed (the C<SU...> base32-encoded form) for Ed25519
authentication. Requires the build to have OpenSSL (see
L<EV::Nats/HAS_NKEY>). The server nonce from INFO is automatically
signed during CONNECT. May also be passed to L</new> as
C<nkey_seed =E<gt> ...>.
=head2 jwt($token)
Set the user JWT. Combine with L</nkey_seed> for NATS decentralized
auth. May also be passed to L</new>. See L</creds_file> for the
common case of loading both from a C<.creds> file.
=head2 EV::Nats->nkey_generate_user_seed
Class method. Returns a fresh, valid NATS User NKey seed (the
C<SU...> form). Useful for tests and provisioning scripts that
don't have the C<nk> CLI available. Requires C<HAS_NKEY>; croaks
otherwise.
=head2 EV::Nats->nkey_public_from_seed($seed)
Class method. Derives the matching public key (the C<U...> form)
from a User NKey seed. Croaks on an invalid seed. Pair with
L</nkey_generate_user_seed> to provision the server with the public
key while the client keeps the seed.
=head2 tls($enable, [$ca_file], [$skip_verify])
Configure TLS. Requires OpenSSL at build time (see
L<EV::Nats/HAS_TLS>).
$nats->tls(1); # system CA
lib/EV/Nats.pm view on Meta::CPAN
True if compiled with OpenSSL (TLS supported).
=item EV::Nats::HAS_NKEY
True if NKey/JWT signing is available (also requires OpenSSL).
=back
=head1 BENCHMARKS
Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12,
100-byte payloads (C<bench/benchmark.pl>):
100K msgs 200K msgs
PUB fire-and-forget 4.7M 5.0M msgs/sec
PUB + SUB (loopback) 1.8M 1.6M msgs/sec
PUB + SUB (8B payload) 2.2M 1.9M msgs/sec
REQ/REP (pipelined, 64) 334K msgs/sec
Connected-path publish appends directly to the write buffer with no
per-message allocation. Write coalescing via C<ev_prepare> batches
all publishes per event-loop iteration into a single C<write()> syscall.
Run C<perl bench/benchmark.pl> for full results. Set C<BENCH_MESSAGES>,
C<BENCH_PAYLOAD>, C<BENCH_HOST>, C<BENCH_PORT> to customize.
=head1 NATS PROTOCOL
This module implements the NATS client protocol directly in XS.
The protocol is text-based with CRLF-delimited control lines and
binary payloads.
Connection flow: server sends INFO, client sends CONNECT + PING,
server responds with PONG to confirm. All subscriptions (including
queue groups and auto-unsub state) are automatically restored on
reconnect.
Request/reply uses a single wildcard inbox subscription
(C<_INBOX.E<lt>randomE<gt>.*>) for all requests, with unique
suffixes per request.
=head1 CAVEATS
=over
=item * DNS resolution via C<getaddrinfo> is blocking. Use numeric IP
addresses for latency-sensitive applications.
=item * TLS requires OpenSSL headers at build time (auto-detected).
=item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
=item * The module handles all data as bytes. Encode UTF-8 strings before
passing them.
=item * Do not let the C<EV::Nats> instance go out of scope (or be
explicitly C<undef>-ed) from inside a callback while that callback is
still executing. The callback closure normally references C<$nats>
(via C<< $nats->publish(...) >> etc.), which keeps it alive; if you
write a callback that does not capture C<$nats> and you C<undef> the
last outer reference inside that callback, Perl will run C<DESTROY>
mid-callback and free the underlying state. Any subsequent operation
on C<$nats> in that callback is undefined behavior.
=item * Cluster URL discovery (the C<connect_urls> field of INFO) is
trusted by default. On failover the client connects to whatever
hostnames the previous server advertised, and TLS hostname verification
is performed against those names. Use a private CA (C<tls_ca_file>) to
restrict which certificates are acceptable, or do not enable C<tls> on
public-CA topologies where any holder of a valid cert could redirect
clients.
=back
=head1 ENVIRONMENT
=over
=item TEST_NATS_HOST, TEST_NATS_PORT
Set these to run the test suite against a NATS server
(default: 127.0.0.1:4222).
=back
=head1 SEE ALSO
L<EV::Nats::JetStream>, L<EV::Nats::KV>, L<EV::Nats::ObjectStore>,
L<EV>,
L<NATS protocol|https://docs.nats.io/reference/reference-protocols/nats-protocol>,
L<nats-server|https://github.com/nats-io/nats-server>.
=head1 AUTHOR
vividsnow
=head1 LICENSE
This library is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
=cut
( run in 0.889 second using v1.01-cache-2.11-cpan-5a3173703d6 )