EV-Nats

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

DESCRIPTION
    EV::Nats is an async NATS client that implements the protocol directly
    in XS on top of EV. There is no external C library dependency.

  Protocol
    Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
    including headered publish/receive, wildcard subjects ("*", ">"), queue
    groups, and request/reply with an automatic shared inbox subscription.

  Connectivity
    TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
    reconnect with exponential backoff and jitter; subscription and
    auto-unsub state restored on reconnect; cluster failover from INFO
    "connect_urls"; lame-duck-mode (leaf node graceful shutdown) callback;
    graceful "drain".

  Auth
    Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).

  TLS
    Optional, auto-detected at build time. STARTTLS-style upgrade after

README  view on Meta::CPAN


    port => Int (default 4222)
        Server port.

    path => Str
        Unix-domain socket path. Mutually exclusive with "host".

    connect_timeout => Int (ms; 0 = none)
        How long to wait for the TCP/TLS handshake before giving up.

    keepalive => Int (seconds)
        If set, enables "SO_KEEPALIVE" with this idle interval.

    priority => Int (-2 .. +2)
        EV watcher priority for the I/O watchers on this connection.

    loop => EV::Loop (default "EV::default_loop")
        The EV loop to attach watchers to.

    name => Str
        Client name advertised in CONNECT.

README  view on Meta::CPAN

        Server-side strict subject checking.

    echo => Bool (default 1)
        Receive messages this client itself publishes.

    no_responders => Bool (default 0)
        Ask the server to send a 503 status reply when a request has no
        responders, surfaced as the "no responders" error in "request".

    ping_interval => Int (ms, default 120000; 0 = disabled)
        Client-initiated PING interval for keep-alive.

    max_pings_outstanding => Int (default 2)
        Maximum unacked PINGs before the connection is declared stale.

   Reconnect options
    reconnect => Bool (default 0)
        Enable automatic reconnection.

    reconnect_delay => Int (ms, default 2000)
        Initial delay between reconnect attempts; subsequent attempts use

README  view on Meta::CPAN


  ping_interval([$ms])
    Get/set PING interval.

  max_pings_outstanding([$num])
    Get/set max outstanding PINGs.

  priority([$num])
    Get/set EV watcher priority.

  keepalive([$seconds])
    Get/set TCP keepalive.

  batch($coderef)
    Batch multiple publishes into a single write. Suppresses per-publish
    write scheduling; all buffered data is flushed after the coderef
    returns.

        $nats->batch(sub {
            $nats->publish("foo.$_", "msg-$_") for 1..1000;
        });

README  view on Meta::CPAN

    *   TLS requires OpenSSL headers at build time (auto-detected).

    *   NKey auth requires OpenSSL with Ed25519 support (1.1.1+).

    *   The module handles all data as bytes. Encode UTF-8 strings before
        passing them.

    *   Do not let the "EV::Nats" instance go out of scope (or be explicitly
        "undef"-ed) from inside a callback while that callback is still
        executing. The callback closure normally references $nats (via
        "$nats->publish(...)" etc.), which keeps it alive; if you write a
        callback that does not capture $nats and you "undef" the last outer
        reference inside that callback, Perl will run "DESTROY" mid-callback
        and free the underlying state. Any subsequent operation on $nats in
        that callback is undefined behavior.

    *   Cluster URL discovery (the "connect_urls" field of INFO) is trusted
        by default. On failover the client connects to whatever hostnames
        the previous server advertised, and TLS hostname verification is
        performed against those names. Use a private CA ("tls_ca_file") to
        restrict which certificates are acceptable, or do not enable "tls"

lib/EV/Nats.pm  view on Meta::CPAN


=head2 Protocol

Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
including headered publish/receive, wildcard subjects (C<*>, C<E<gt>>),
queue groups, and request/reply with an automatic shared inbox
subscription.

=head2 Connectivity

TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
reconnect with exponential backoff and jitter; subscription and
auto-unsub state restored on reconnect; cluster failover from INFO
C<connect_urls>; lame-duck-mode (leaf node graceful shutdown) callback;
graceful C<drain>.

=head2 Auth

Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).

=head2 TLS

lib/EV/Nats.pm  view on Meta::CPAN

Server port.

=item path => Str

Unix-domain socket path. Mutually exclusive with C<host>.

=item connect_timeout => Int (ms; 0 = none)

How long to wait for the TCP/TLS handshake before giving up.

=item keepalive => Int (seconds)

If set, enables C<SO_KEEPALIVE> with this idle interval.

=item priority => Int (-2 .. +2)

L<EV> watcher priority for the I/O watchers on this connection.

=item loop => EV::Loop (default C<EV::default_loop>)

The L<EV> loop to attach watchers to.

lib/EV/Nats.pm  view on Meta::CPAN


Receive messages this client itself publishes.

=item no_responders => Bool (default 0)

Ask the server to send a 503 status reply when a request has no
responders, surfaced as the C<"no responders"> error in C<request>.

=item ping_interval => Int (ms, default 120000; 0 = disabled)

Client-initiated PING interval for keep-alive.

=item max_pings_outstanding => Int (default 2)

Maximum unacked PINGs before the connection is declared stale.

=back

=head3 Reconnect options

=over

lib/EV/Nats.pm  view on Meta::CPAN

Get/set PING interval.

=head2 max_pings_outstanding([$num])

Get/set max outstanding PINGs.

=head2 priority([$num])

Get/set EV watcher priority.

=head2 keepalive([$seconds])

Get/set TCP keepalive.

=head2 batch($coderef)

Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef returns.

    $nats->batch(sub {
        $nats->publish("foo.$_", "msg-$_") for 1..1000;
    });

lib/EV/Nats.pm  view on Meta::CPAN

=item * TLS requires OpenSSL headers at build time (auto-detected).

=item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+).

=item * The module handles all data as bytes. Encode UTF-8 strings before
passing them.

=item * Do not let the C<EV::Nats> instance go out of scope (or be
explicitly C<undef>-ed) from inside a callback while that callback is
still executing. The callback closure normally references C<$nats>
(via C<< $nats->publish(...) >> etc.), which keeps it alive; if you
write a callback that does not capture C<$nats> and you C<undef> the
last outer reference inside that callback, Perl will run C<DESTROY>
mid-callback and free the underlying state. Any subsequent operation
on C<$nats> in that callback is undefined behavior.

=item * Cluster URL discovery (the C<connect_urls> field of INFO) is
trusted by default. On failover the client connects to whatever
hostnames the previous server advertised, and TLS hostname verification
is performed against those names. Use a private CA (C<tls_ca_file>) to
restrict which certificates are acceptable, or do not enable C<tls> on

src/EV__Nats.xs  view on Meta::CPAN

    /* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
    size_t msg_subject_off;
    size_t msg_subject_len;
    size_t msg_reply_off;       /* msg_reply_len == 0 means no reply */
    size_t msg_reply_len;
    uint64_t msg_sid;
    size_t msg_hdr_len;
    size_t msg_total_len;

    int priority;
    int keepalive;

    /* Stats */
    UV msgs_in;
    UV msgs_out;
    UV bytes_in;
    UV bytes_out;

    /* Server pool for cluster failover */
    ngx_queue_t server_pool;
    int server_pool_count;

src/EV__Nats.xs  view on Meta::CPAN

        } else {
            PUSHs(&PL_sv_undef);
        }

        if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
            PUSHs(sv_2mortal(newSVpvn(payload, self->msg_hdr_len)));
        }

        PUTBACK;
        /* pin sub->cb: a callback that unsubscribes its own sid clears
           sub->cb (and frees the sub) mid-call; the pin keeps the CV alive */
        PINNED_CALL_SV(sub->cb, G_DISCARD);
        FREETMPS; LEAVE;
    }

    if (max_msgs > 0 && received >= max_msgs) {
        sub = nats_find_sub(self, sid);
        if (sub)
            nats_remove_sub(self, sub);
    }
}

src/EV__Nats.xs  view on Meta::CPAN


    for (rp = res; rp != NULL; rp = rp->ai_next) {
        fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (fd < 0) continue;

        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);

        int flag = 1;
        setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

        if (self->keepalive > 0) {
            setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
            #ifdef TCP_KEEPIDLE
            setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
            #endif
        }

        rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
        if (rv == 0 || errno == EINPROGRESS) break;

        close(fd);
        fd = -1;
    }

src/EV__Nats.xs  view on Meta::CPAN

            else if (strcmp(key, "max_reconnect_delay") == 0)
                self->max_reconnect_delay_ms = SvIV(val);
            else if (strcmp(key, "connect_timeout") == 0)
                self->connect_timeout_ms = SvIV(val);
            else if (strcmp(key, "ping_interval") == 0)
                self->ping_interval_ms = SvIV(val);
            else if (strcmp(key, "max_pings_outstanding") == 0)
                self->max_pings_outstanding = SvIV(val);
            else if (strcmp(key, "priority") == 0)
                self->priority = SvIV(val);
            else if (strcmp(key, "keepalive") == 0)
                self->keepalive = SvIV(val);
  #ifdef HAVE_OPENSSL
            else if (strcmp(key, "tls") == 0)
                self->tls = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "tls_ca_file") == 0)
                nats_set_str_sv(&self->tls_ca_file, val);
            else if (strcmp(key, "tls_skip_verify") == 0)
                self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "nkey_seed") == 0)
                nats_set_str_sv(&self->nkey_seed, val);
  #endif

src/EV__Nats.xs  view on Meta::CPAN

priority(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->priority = SvIV(ST(1));
    RETVAL = self->priority;
  OUTPUT:
    RETVAL

int
keepalive(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->keepalive = SvIV(ST(1));
    RETVAL = self->keepalive;
  OUTPUT:
    RETVAL

void
on_error(self, ...)
    EV::Nats self
  PPCODE:
    if (items > 1) {
        CLEAR_HANDLER(self->on_error);
        if (SvOK(ST(1)))

xt/09_max_payload.t  view on Meta::CPAN

        undef $t;
        eval { $nats->publish('maxpay.ok', 'x' x ($max - 1)) };
        ok !$@, 'publish at max_payload-1 succeeds';

        # Publish over max_payload should croak
        eval { $nats->publish('maxpay.over', 'x' x ($max + 1)) };
        like $@, qr/max_payload/, 'publish over max_payload croaks';

        # Verify connection still works after croak
        $nats->subscribe('maxpay.after', sub {
            pass 'connection still alive after max_payload croak';
            $nats->disconnect;
            EV::break;
        });
        my $p; $p = EV::timer 0.1, 0, sub {
            undef $p;
            $nats->publish('maxpay.after', 'alive');
        };
    };
};

EV::run;



( run in 0.942 second using v1.01-cache-2.11-cpan-524268b4103 )