EV-Nats

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

    *   Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)

    *   Request/reply with automatic inbox management

    *   Queue group subscriptions for load balancing

    *   Wildcard subjects ("*" and ">")

    *   Headers support (HPUB/HMSG)

    *   Automatic PING/PONG keep-alive

    *   Automatic reconnection with subscription and queue group restore

    *   Fire-and-forget publish (no callback overhead)

    *   Token, user/pass authentication

    *   TCP keepalive and connect timeout

    *   Write coalescing via ev_prepare (batches writes per event loop
        iteration)

    *   O(1) subscription lookup via hash table

    *   Graceful drain (unsubscribe all, flush, then disconnect)

    *   Server pool with cluster URL failover from INFO connect_urls

README  view on Meta::CPAN


    ping_interval => $ms (default 120000)
        Interval for client-initiated PING. 0 = disabled.

    max_pings_outstanding => $num (default 2)
        Max unanswered PINGs before declaring stale connection.

    priority => $num (-2 to +2)
        EV watcher priority.

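    keepalive => $seconds
        TCP keepalive idle time in seconds. When greater than 0, enables
        SO_KEEPALIVE on the socket and, where the platform provides it,
        sets TCP_KEEPIDLE to this value.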

    path => 'Str'
        Unix socket path. Mutually exclusive with "host".

    loop => EV::Loop
        EV loop to use. Default: "EV::default_loop".

  connect($host, [$port])
    Connect to NATS server. Port defaults to 4222.

README  view on Meta::CPAN


  ping_interval([$ms])
    Get/set PING interval.

  max_pings_outstanding([$num])
    Get/set max outstanding PINGs.

  priority([$num])
    Get/set EV watcher priority.

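  keepalive([$seconds])
    Get/set the TCP keepalive idle time in seconds. Setting it only
    stores the value; the socket options are applied when a socket is
    created for a connection.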

  batch($coderef)
    Batch multiple publishes into a single write. Suppresses per-publish
    write scheduling; all buffered data is flushed after the coderef
    returns.

        $nats->batch(sub {
            $nats->publish("foo.$_", "msg-$_") for 1..1000;
        });

lib/EV/Nats.pm  view on Meta::CPAN

=item * Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)

=item * Request/reply with automatic inbox management

=item * Queue group subscriptions for load balancing

=item * Wildcard subjects (C<*> and C<E<gt>>)

=item * Headers support (HPUB/HMSG)

=item * Automatic PING/PONG keep-alive

=item * Automatic reconnection with subscription and queue group restore

=item * Fire-and-forget publish (no callback overhead)

=item * Token, user/pass authentication

=item * TCP keepalive and connect timeout

=item * Write coalescing via ev_prepare (batches writes per event loop iteration)

=item * O(1) subscription lookup via hash table

=item * Graceful drain (unsubscribe all, flush, then disconnect)

=item * Server pool with cluster URL failover from INFO connect_urls

=item * Optional TLS via OpenSSL (auto-detected at build time)

lib/EV/Nats.pm  view on Meta::CPAN

Interval for client-initiated PING. 0 = disabled.

=item max_pings_outstanding => $num (default 2)

Max unanswered PINGs before declaring stale connection.

=item priority => $num (-2 to +2)

EV watcher priority.

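=item keepalive => $seconds

TCP keepalive idle time in seconds. When greater than 0, enables
C<SO_KEEPALIVE> on the socket and, where the platform provides it, sets
C<TCP_KEEPIDLE> to this value.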

=item path => 'Str'

Unix socket path. Mutually exclusive with C<host>.

=item loop => EV::Loop

EV loop to use. Default: C<EV::default_loop>.

=back

lib/EV/Nats.pm  view on Meta::CPAN

Get/set PING interval.

=head2 max_pings_outstanding([$num])

Get/set max outstanding PINGs.

=head2 priority([$num])

Get/set EV watcher priority.

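=head2 keepalive([$seconds])

Get/set the TCP keepalive idle time in seconds. Setting it only stores the
value; the socket options are applied when a socket is created for a
connection.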

=head2 batch($coderef)

Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef returns.

    $nats->batch(sub {
        $nats->publish("foo.$_", "msg-$_") for 1..1000;
    });

src/EV__Nats.xs  view on Meta::CPAN

    /* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
    size_t msg_subject_off;
    size_t msg_subject_len;
    size_t msg_reply_off;       /* msg_reply_len == 0 means no reply */
    size_t msg_reply_len;
    uint64_t msg_sid;
    size_t msg_hdr_len;
    size_t msg_total_len;

    int priority;
    int keepalive;

    /* Stats */
    UV msgs_in;
    UV msgs_out;
    UV bytes_in;
    UV bytes_out;

    /* Server pool for cluster failover */
    ngx_queue_t server_pool;
    int server_pool_count;

src/EV__Nats.xs  view on Meta::CPAN


    for (rp = res; rp != NULL; rp = rp->ai_next) {
        fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (fd < 0) continue;

        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);

        int flag = 1;
        setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

        if (self->keepalive > 0) {
            setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
            #ifdef TCP_KEEPIDLE
            setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
            #endif
        }

        rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
        if (rv == 0 || errno == EINPROGRESS) break;

        close(fd);
        fd = -1;
    }

src/EV__Nats.xs  view on Meta::CPAN

            else if (strcmp(key, "max_reconnect_delay") == 0)
                self->max_reconnect_delay_ms = SvIV(val);
            else if (strcmp(key, "connect_timeout") == 0)
                self->connect_timeout_ms = SvIV(val);
            else if (strcmp(key, "ping_interval") == 0)
                self->ping_interval_ms = SvIV(val);
            else if (strcmp(key, "max_pings_outstanding") == 0)
                self->max_pings_outstanding = SvIV(val);
            else if (strcmp(key, "priority") == 0)
                self->priority = SvIV(val);
            else if (strcmp(key, "keepalive") == 0)
                self->keepalive = SvIV(val);
  #ifdef HAVE_OPENSSL
            else if (strcmp(key, "tls") == 0)
                self->tls = SvTRUE(val) ? 1 : 0;
            else if (strcmp(key, "tls_ca_file") == 0) {
                STRLEN l; const char *s = SvPV(val, l);
                Newx(self->tls_ca_file, l + 1, char);
                Copy(s, self->tls_ca_file, l + 1, char);
            }
            else if (strcmp(key, "tls_skip_verify") == 0)
                self->tls_skip_verify = SvTRUE(val) ? 1 : 0;

src/EV__Nats.xs  view on Meta::CPAN

priority(self, ...)
    EV::Nats self
  CODE:
    if (items > 1)
        self->priority = SvIV(ST(1));
    RETVAL = self->priority;
  OUTPUT:
    RETVAL

int
keepalive(self, ...)
    EV::Nats self
  CODE:
    /* Combined getter/setter for the keepalive setting stored on the
     * client struct. A second argument, when present, replaces the stored
     * value; the (possibly updated) value is always returned. Only the
     * struct field is touched here -- no socket option is changed. */
    if (items >= 2) {
        const int requested = (int)SvIV(ST(1));
        self->keepalive = requested;
    }
    RETVAL = self->keepalive;
  OUTPUT:
    RETVAL

void
on_error(self, ...)
    EV::Nats self
  PPCODE:
    if (items > 1) {
        CLEAR_HANDLER(self->on_error);
        if (SvOK(ST(1)))

xt/09_max_payload.t  view on Meta::CPAN

        undef $t;
        eval { $nats->publish('maxpay.ok', 'x' x ($max - 1)) };
        ok !$@, 'publish at max_payload-1 succeeds';

        # Publish over max_payload should croak
        eval { $nats->publish('maxpay.over', 'x' x ($max + 1)) };
        like $@, qr/max_payload/, 'publish over max_payload croaks';

        # Verify connection still works after croak
        $nats->subscribe('maxpay.after', sub {
            pass 'connection still alive after max_payload croak';
            $nats->disconnect;
            EV::break;
        });
        my $p; $p = EV::timer 0.1, 0, sub {
            undef $p;
            $nats->publish('maxpay.after', 'alive');
        };
    };
};

EV::run;



( run in 1.699 second using v1.01-cache-2.11-cpan-39bf76dae61 )