EV-Nats

lib/EV/Nats.pm  view on Meta::CPAN

        ? $self->subscribe($subject, $cb, $queue_group)
        : $self->subscribe($subject, $cb);
    $self->unsubscribe($sid, $max_msgs) if $max_msgs && $max_msgs > 0;
    $sid;
}

1;

=head1 NAME

EV::Nats - High-performance asynchronous NATS client using EV

=head1 SYNOPSIS

    use EV;
    use EV::Nats;

    my $nats = EV::Nats->new(
        host       => '127.0.0.1',
        port       => 4222,
        reconnect  => 1,
        on_error   => sub { warn "nats: $_[0]\n" },
        on_connect => sub { warn "connected\n" },
    );

    # Subscribe (plain or queue group)
    my $sid = $nats->subscribe('foo.>', sub {
        my ($subject, $payload, $reply, $headers) = @_;
        print "[$subject] $payload\n";
    });
    $nats->subscribe('work.>', sub { ... }, 'workers');

    # Publish (fire-and-forget) and headered publish
    $nats->publish('foo.bar', 'hello world');
    $nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');

    # Request / reply
    $nats->request('service.echo', 'ping', sub {
        my ($response, $err) = @_;
        die $err if $err;
        print "reply: $response\n";
    }, 5000);                       # 5s timeout

    $nats->unsubscribe($sid);
    EV::run;

=head1 DESCRIPTION

EV::Nats is an async NATS client that implements the protocol directly
in XS on top of L<EV>. There is no external C library dependency.

=head2 Protocol

Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
including headered publish/receive, wildcard subjects (C<*>, C<E<gt>>),
queue groups, and request/reply with an automatic shared inbox
subscription.
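
The wildcard rules can be illustrated in pure Perl. This is a minimal
sketch of the matching semantics only, assuming a well-formed pattern
(C<E<gt>> appears only as the last token); the helper name
C<subject_matches> is hypothetical and is not part of this module, which
leaves subject matching to the server.

```perl
use strict;
use warnings;

# Return true if $subject matches $pattern under NATS wildcard rules:
#   '*' matches exactly one token, '>' matches one or more trailing tokens.
# Hypothetical helper for illustration; not an EV::Nats function.
sub subject_matches {
    my ($pattern, $subject) = @_;
    my @p = split /\./, $pattern, -1;
    my @s = split /\./, $subject, -1;
    for my $i (0 .. $#p) {
        return 0 if $i > $#s;            # subject ran out of tokens
        return 1 if $p[$i] eq '>';       # '>' swallows the rest (at least one token)
        next     if $p[$i] eq '*';       # '*' accepts any single token
        return 0 if $p[$i] ne $s[$i];    # literal tokens must match exactly
    }
    return @p == @s ? 1 : 0;             # no leftover subject tokens allowed
}

print subject_matches('foo.>', 'foo.bar.baz') ? "match\n" : "no match\n";  # match
print subject_matches('foo.*', 'foo.bar.baz') ? "match\n" : "no match\n";  # no match
```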

=head2 Connectivity

TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
reconnect with exponential backoff and jitter; subscription and
auto-unsub state restored on reconnect; cluster failover from INFO
C<connect_urls>; lame-duck-mode (leaf node graceful shutdown) callback;
graceful C<drain>.

=head2 Auth

Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).

=head2 TLS

Optional, auto-detected at build time. STARTTLS-style upgrade after
INFO; full hostname verification (DNS or IP literal) by default;
opt-out C<tls_skip_verify>; custom CA via C<tls_ca_file>.

=head2 Performance

Write coalescing via C<ev_prepare> (one C<write()> per loop
iteration); O(1) subscription lookup; per-publish allocation-free
fast path; explicit C<batch> mode for tight loops; per-connection
stats counters.
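
To see why subscription lookup can be constant-time, note that every
incoming C<MSG> control line carries the subscription id, so routing
reduces to a single hash access. The sketch below is pure Perl and
purely illustrative; the XS internals are not shown in this document,
and C<add_sub> and C<deliver> are hypothetical names.

```perl
use strict;
use warnings;

# Subscriptions keyed by sid, so routing an incoming message is one hash lookup.
my %subs;
my $next_sid = 0;

sub add_sub {
    my ($cb) = @_;
    $subs{ ++$next_sid } = $cb;
    return $next_sid;
}

# Parse a "MSG <subject> <sid> [reply-to] <#bytes>" control line and route it.
sub deliver {
    my ($control_line, $payload) = @_;
    my ($op, $subject, $sid, @rest) = split ' ', $control_line;
    return 0 unless defined $op && uc($op) eq 'MSG' && @rest;
    my $reply = @rest == 2 ? $rest[0] : undef;   # reply-to is optional
    my $cb = $subs{$sid} or return 0;            # unknown sid: drop the message
    $cb->($subject, $payload, $reply);
    return 1;
}

my $sid = add_sub(sub {
    my ($subject, $payload, $reply) = @_;
    print "[$subject] $payload\n";
});
deliver("MSG foo.bar $sid 5", 'hello');          # prints "[foo.bar] hello"
```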

=head2 Higher-level APIs

L<EV::Nats::JetStream>, L<EV::Nats::KV>, L<EV::Nats::ObjectStore>.

B<Note:> DNS resolution via C<getaddrinfo> is blocking. Use numeric IP
addresses for latency-sensitive applications.

=head1 METHODS

=head2 new(%options)

Create an EV::Nats instance. If C<host> or C<path> is supplied, the
connection is initiated immediately and the C<on_connect> callback
fires once the CONNECT/PONG handshake completes.

    my $nats = EV::Nats->new(
        host       => '127.0.0.1',
        port       => 4222,
        reconnect  => 1,
        on_error   => sub { warn "nats: $_[0]\n" },
        on_connect => sub { warn "ready\n" },
    );

=head3 Connection options

=over

=item host => Str

Server hostname (numeric IP recommended; see L</CAVEATS>). When set,
connection starts immediately.

=item port => Int (default 4222)

Server port.

=item path => Str

Unix-domain socket path. Mutually exclusive with C<host>.

=item connect_timeout => Int (ms; 0 = none)

How long to wait for the TCP/TLS handshake before giving up.

=item keepalive => Int (seconds)

If set, enables C<SO_KEEPALIVE> with this idle interval.

=item priority => Int (-2 .. +2)

L<EV> watcher priority for the I/O watchers on this connection.

=item loop => EV::Loop (default C<EV::default_loop>)

The L<EV> loop to attach watchers to.

=item name => Str

Client name advertised in CONNECT.

=back

=head3 Auth options

=over

=item user => Str / pass => Str

Username/password authentication. JSON-escaped in CONNECT.

=item token => Str

Token authentication.

=item nkey_seed => Str

NATS NKey seed (the C<SU...> form). Requires the build to have
OpenSSL (C<EV::Nats::HAS_NKEY>).

=item jwt => Str

User JWT, paired with C<nkey_seed> for decentralized auth. See also
L</creds_file>.

=item tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool

See L</tls> for details.

=back
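
As an illustration of why credentials need JSON escaping, the sketch
below builds a CONNECT control line with the core L<JSON::PP> module.
The field names (C<user>, C<pass>, C<auth_token>, C<name>, C<verbose>,
C<pedantic>) come from the public NATS client protocol; the helper
C<connect_line> and the sample credentials are hypothetical, and the
module's own XS encoder may differ in detail.

```perl
use strict;
use warnings;
use JSON::PP ();

# Build a CONNECT control line. The credentials and client name used in the
# demo call below are made-up sample values.
sub connect_line {
    my (%opt) = @_;
    my %fields = (
        verbose  => $opt{verbose}  ? JSON::PP::true() : JSON::PP::false(),
        pedantic => $opt{pedantic} ? JSON::PP::true() : JSON::PP::false(),
    );
    # Copy string options only when the caller supplied them.
    for my $key (qw(user pass name)) {
        $fields{$key} = "$opt{$key}" if defined $opt{$key};
    }
    # The "token" option travels as "auth_token" on the wire.
    $fields{auth_token} = "$opt{token}" if defined $opt{token};
    # canonical() sorts keys so the output is stable; utf8() yields bytes.
    my $json = JSON::PP->new->utf8->canonical->encode(\%fields);
    return "CONNECT $json\r\n";
}

# The quote and backslash in the password are escaped by the JSON encoder.
print connect_line(user => 'alice', pass => 'p"ss\\word', name => 'demo');
```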

=head3 Protocol options

=over

=item verbose => Bool (default 0)

Request C<+OK> acknowledgments after each command.

=item pedantic => Bool (default 0)

Server-side strict subject checking.

=item echo => Bool (default 1)

Receive messages this client itself publishes.

=item no_responders => Bool (default 0)

Ask the server to send a 503 status reply when a request has no
responders, surfaced as the C<"no responders"> error in C<request>.

=item ping_interval => Int (ms, default 120000; 0 = disabled)

Client-initiated PING interval for keep-alive.

=item max_pings_outstanding => Int (default 2)

Maximum unacked PINGs before the connection is declared stale.

=back

=head3 Reconnect options

=over

=item reconnect => Bool (default 0)

Enable automatic reconnection.

=item reconnect_delay => Int (ms, default 2000)

Initial delay between reconnect attempts; subsequent attempts use
exponential backoff with jitter, capped by C<max_reconnect_delay>.

=item max_reconnect_delay => Int (ms, default 30000)

Upper bound on the backoff delay.

=item max_reconnect_attempts => Int (default 60; 0 = unlimited)

Give up after this many consecutive failures.

=back
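
The exact backoff formula is not spelled out here. As a rough mental
model, the sketch below doubles the delay on each attempt, caps it at
the maximum, and subtracts up to 25% random jitter. Both the doubling
factor and the 25% figure are assumptions, and C<backoff_delay> is a
hypothetical helper rather than part of this module.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Delay (ms) before reconnect attempt $attempt (0-based). The base delay
# doubles per attempt, is capped at $max_ms, then loses up to 25% to jitter.
# Doubling and the 25% jitter range are illustrative assumptions.
sub backoff_delay {
    my ($attempt, $base_ms, $max_ms) = @_;
    my $delay  = min($max_ms, $base_ms * 2 ** $attempt);
    my $jitter = rand($delay * 0.25);
    return int($delay - $jitter);
}

# With the documented defaults (2000 ms initial, 30000 ms cap):
printf "attempt %d: %d ms\n", $_, backoff_delay($_, 2_000, 30_000) for 0 .. 5;
```

Jitter spreads out reconnect attempts so that many clients dropped by the
same server restart do not all reconnect at the same instant.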

=head3 Callback options

All callbacks fire on the L<EV> loop, never inline.

=over

=item on_connect => sub { }

Called after the CONNECT/PONG handshake completes.

=item on_disconnect => sub { }

Called when the connection drops, before any auto-reconnect attempt.

=item on_error => sub { my ($err) = @_ }

Receives the error message as a string. If unset, errors are raised
with C<croak>.

=item on_lame_duck => sub { }

Called once when the server signals lame-duck-mode shutdown via
INFO C<ldm:true>.

=item on_slow_consumer => sub { my ($pending_bytes) = @_ }

See L</slow_consumer>.

=back

=head2 connect($host, [$port])

lib/EV/Nats.pm  view on Meta::CPAN

Returns a fresh subject suitable for use as a private reply target
(C<_INBOX.E<lt>randE<gt>.E<lt>nE<gt>>). Each call burns a slot from
the same counter that L</request> uses, so manual subscribers must
treat the returned subject as opaque.

=head2 subscription_count

Returns the number of currently-registered subscriptions, including
the implicit C<_INBOX.E<gt>> subscription used by L</request>.

=head2 server_info

Returns the raw JSON string of the most recent INFO frame received
from the server (or C<undef> before the first INFO). Useful for
inspecting C<server_id>, C<version>, C<cluster>, C<connect_urls>,
etc.
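
Because the value is raw JSON, it has to be decoded before use. A
minimal sketch with the core L<JSON::PP> module follows; the sample
INFO string stands in for what C<server_info> would return, every value
in it is invented, and C<summarize_info> is a hypothetical helper.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Stand-in for the string $nats->server_info would return; every value
# here is invented for illustration.
my $raw = '{"server_id":"NDEMO","version":"2.12.0","max_payload":1048576,'
        . '"connect_urls":["10.0.0.2:4222","10.0.0.3:4222"]}';

sub summarize_info {
    my ($json) = @_;
    return unless defined $json;                 # no INFO received yet
    my $info = decode_json($json);
    return {
        id      => $info->{server_id},
        version => $info->{version},
        peers   => $info->{connect_urls} // [],  # may be absent from INFO
    };
}

my $s = summarize_info($raw);
printf "%s v%s, %d peer(s)\n", $s->{id}, $s->{version}, scalar @{ $s->{peers} };
# prints "NDEMO v2.12.0, 2 peer(s)"
```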

=head2 max_payload([$limit])

Server-advertised maximum payload size in bytes. Returns the current
value; with an argument, overrides it. Publishes larger than this limit
croak locally before anything is written to the wire.

=head2 waiting_count

Number of writes queued locally during connect or reconnect (i.e.
C<publish>/C<request> calls made while the connection is not yet
ready). They flush when the handshake completes.

=head2 skip_waiting

Drop all queued writes without sending them. Useful before
C<disconnect> if reconnect is enabled and you don't want stale
publishes replayed.

=head2 reconnect($enable, [$delay_ms], [$max_attempts])

Configure reconnection. C<$delay_ms> and C<$max_attempts> take effect
only when supplied; omitted arguments leave the existing values unchanged.

=head2 reconnect_enabled

Returns true if reconnect is enabled.

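=head2 connect_timeout([$ms])

Get or set the connect timeout in milliseconds (0 disables it).

=head2 ping_interval([$ms])

Get or set the client PING interval in milliseconds (0 disables it).

=head2 max_pings_outstanding([$num])

Get or set the number of unanswered PINGs tolerated before the
connection is declared stale.

=head2 priority([$num])

Get or set the L<EV> watcher priority (-2 .. +2).

=head2 keepalive([$seconds])

Get or set the TCP keepalive idle interval in seconds.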

=head2 batch($coderef)

Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef returns.

    $nats->batch(sub {
        $nats->publish("foo.$_", "msg-$_") for 1..1000;
    });

=head2 slow_consumer($bytes_threshold, [$cb])

Enable slow consumer detection. When the write buffer exceeds
C<$bytes_threshold> bytes, C<$cb> is called with the current buffer size.

    $nats->slow_consumer(1024*1024, sub {
        my ($pending_bytes) = @_;
        warn "slow consumer: ${pending_bytes}B pending\n";
    });

=head2 on_lame_duck([$cb])

Get/set the lame-duck callback. Fires once when the server signals
shutdown (leaf node, rolling restart) via INFO C<ldm:true>. Use this
to migrate work to another server before the grace period elapses.

=head2 nkey_seed($seed)

Set the NKey seed (the C<SU...> base32-encoded form) for Ed25519
authentication. Requires the build to have OpenSSL (see
L<EV::Nats/HAS_NKEY>). The server nonce from INFO is automatically
signed during CONNECT. May also be passed to L</new> as
C<nkey_seed =E<gt> ...>.

=head2 jwt($token)

Set the user JWT. Combine with L</nkey_seed> for NATS decentralized
auth. May also be passed to L</new>. See L</creds_file> for the
common case of loading both from a C<.creds> file.

=head2 EV::Nats->nkey_generate_user_seed

Class method. Returns a fresh, valid NATS User NKey seed (the
C<SU...> form). Useful for tests and provisioning scripts that
don't have the C<nk> CLI available. Requires C<HAS_NKEY>; croaks
otherwise.

=head2 EV::Nats->nkey_public_from_seed($seed)

Class method. Derives the matching public key (the C<U...> form)
from a User NKey seed. Croaks on an invalid seed. Pair with
L</nkey_generate_user_seed> to provision the server with the public
key while the client keeps the seed.

=head2 tls($enable, [$ca_file], [$skip_verify])

Configure TLS. Requires OpenSSL at build time (see
L<EV::Nats/HAS_TLS>).

    $nats->tls(1);                           # system CA

lib/EV/Nats.pm  view on Meta::CPAN


True if compiled with OpenSSL (TLS supported).

=item EV::Nats::HAS_NKEY

True if NKey/JWT signing is available (also requires OpenSSL).

=back

=head1 BENCHMARKS

Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12,
100-byte payloads (C<bench/benchmark.pl>):

                                100K msgs    200K msgs
    PUB fire-and-forget         4.7M         5.0M msgs/sec
    PUB + SUB (loopback)        1.8M         1.6M msgs/sec
    PUB + SUB (8B payload)      2.2M         1.9M msgs/sec
    REQ/REP (pipelined, 64)     334K               msgs/sec

Connected-path publish appends directly to the write buffer with no
per-message allocation. Write coalescing via C<ev_prepare> batches
all publishes per event-loop iteration into a single C<write()> syscall.

Run C<perl bench/benchmark.pl> for full results. Set C<BENCH_MESSAGES>,
C<BENCH_PAYLOAD>, C<BENCH_HOST>, C<BENCH_PORT> to customize.

=head1 NATS PROTOCOL

This module implements the NATS client protocol directly in XS.
The protocol is text-based with CRLF-delimited control lines and
binary payloads.
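
To make the framing concrete, here is a pure-Perl sketch of how C<PUB>
and C<HPUB> commands look on the wire according to the public NATS
protocol documentation. The helpers C<frame_pub> and C<frame_hpub> are
hypothetical and only illustrate the byte layout; the module performs
this encoding in XS.

```perl
use strict;
use warnings;

# Frame a PUB command: control line, CRLF, raw payload bytes, CRLF.
# $payload must already be a byte string, so length() counts bytes.
sub frame_pub {
    my ($subject, $payload, $reply) = @_;
    my $head = join ' ', 'PUB', $subject,
                         (defined $reply ? $reply : ()), length($payload);
    return "$head\r\n$payload\r\n";
}

# Frame an HPUB command. $headers must already end with the blank line
# ("\r\n\r\n") that terminates the header block. The control line carries
# the header size and the combined header + payload size.
sub frame_hpub {
    my ($subject, $headers, $payload, $reply) = @_;
    my $hlen = length($headers);
    my $head = join ' ', 'HPUB', $subject,
                         (defined $reply ? $reply : ()),
                         $hlen, $hlen + length($payload);
    return "$head\r\n$headers$payload\r\n";
}

print frame_pub('foo.bar', 'hello world');
print frame_hpub('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');
```

Since the payload length is declared up front, the payload itself may
contain any bytes, including CRLF sequences.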

Connection flow: server sends INFO, client sends CONNECT + PING,
server responds with PONG to confirm. All subscriptions (including
queue groups and auto-unsub state) are automatically restored on
reconnect.

Request/reply uses a single wildcard inbox subscription
(C<_INBOX.E<lt>randomE<gt>.*>) for all requests, with unique
suffixes per request.
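
The shared-inbox idea can be modelled as a dispatch table keyed by
reply subject. The toy class below is a sketch under assumptions: the
package name C<InboxMux>, the sample prefix, and the simple counter
suffix are all invented for illustration and do not describe the XS
implementation.

```perl
use strict;
use warnings;

# Toy request multiplexer: one shared inbox prefix, one callback per suffix.
# The prefix format and counter scheme are illustrative assumptions.
package InboxMux;

sub new {
    my ($class, $prefix) = @_;
    return bless { prefix => $prefix, next => 0, pending => {} }, $class;
}

# Reserve a unique reply subject and remember who is waiting on it.
sub register {
    my ($self, $cb) = @_;
    my $subject = $self->{prefix} . '.' . ++$self->{next};
    $self->{pending}{$subject} = $cb;
    return $subject;
}

# Route a reply arriving on the shared subscription; each request is one-shot.
sub dispatch {
    my ($self, $subject, $payload) = @_;
    my $cb = delete $self->{pending}{$subject} or return 0;
    $cb->($payload);
    return 1;
}

package main;

my $mux      = InboxMux->new('_INBOX.abc123');
my $reply_to = $mux->register(sub { print "reply: $_[0]\n" });
$mux->dispatch($reply_to, 'pong');      # prints "reply: pong"
$mux->dispatch($reply_to, 'late');      # ignored: request already completed
```

One wildcard subscription serves every request, so issuing a request
costs a hash insert rather than a SUB/UNSUB round trip.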

=head1 CAVEATS

=over

=item * DNS resolution via C<getaddrinfo> is blocking. Use numeric IP
addresses for latency-sensitive applications.


=item * TLS requires OpenSSL headers at build time (auto-detected).

=item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+).

=item * The module handles all data as bytes. Encode UTF-8 strings before
passing them.

=item * Do not let the C<EV::Nats> instance go out of scope (or be
explicitly C<undef>-ed) from inside a callback while that callback is
still executing. The callback closure normally references C<$nats>
(via C<< $nats->publish(...) >> etc.), which keeps it alive; if you
write a callback that does not capture C<$nats> and you C<undef> the
last outer reference inside that callback, Perl will run C<DESTROY>
mid-callback and free the underlying state. Any subsequent operation
on C<$nats> in that callback is undefined behavior.

=item * Cluster URL discovery (the C<connect_urls> field of INFO) is
trusted by default. On failover the client connects to whatever
hostnames the previous server advertised, and TLS hostname verification
is performed against those names. Use a private CA (C<tls_ca_file>) to
restrict which certificates are acceptable, or do not enable C<tls> on
public-CA topologies where any holder of a valid cert could redirect
clients.

=back
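
The bytes-only caveat above can be handled with the core L<Encode>
module. The following minimal sketch shows the encode and decode steps
on their own, without a live connection; the sample string is
arbitrary.

```perl
use strict;
use warnings;
use Encode qw(encode decode);

my $text  = "h\x{e9}llo";               # 5 characters, one of them non-ASCII
my $bytes = encode('UTF-8', $text);     # 6 bytes; this is what should be published

# A subscriber receives bytes and decodes them back into characters.
my $round_trip = decode('UTF-8', $bytes);

printf "%d chars, %d bytes\n", length($text), length($bytes);
# prints "5 chars, 6 bytes"
```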

=head1 ENVIRONMENT

=over

=item TEST_NATS_HOST, TEST_NATS_PORT

Set these to run the test suite against a NATS server
(default: 127.0.0.1:4222).

=back

=head1 SEE ALSO

L<EV::Nats::JetStream>, L<EV::Nats::KV>, L<EV::Nats::ObjectStore>,
L<EV>,
L<NATS protocol|https://docs.nats.io/reference/reference-protocols/nats-protocol>,
L<nats-server|https://github.com/nats-io/nats-server>.

=head1 AUTHOR

vividsnow

=head1 LICENSE

This library is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut


