EV-Nats
view release on metacpan or search on metacpan
* Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)
* Request/reply with automatic inbox management
* Queue group subscriptions for load balancing
* Wildcard subjects ("*" and ">")
* Headers support (HPUB/HMSG)
* Automatic PING/PONG keep-alive
* Automatic reconnection with subscription and queue group restore
* Fire-and-forget publish (no callback overhead)
* Token, user/pass authentication
* TCP keepalive and connect timeout
* Write coalescing via ev_prepare (batches writes per event loop
iteration)
* O(1) subscription lookup via hash table
* Graceful drain (unsubscribe all, flush, then disconnect)
* Server pool with cluster URL failover from INFO connect_urls
ping_interval => $ms (default 120000)
Interval for client-initiated PING. 0 = disabled.
max_pings_outstanding => $num (default 2)
Max unanswered PINGs before declaring stale connection.
priority => $num (-2 to +2)
EV watcher priority.
keepalive => $seconds
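TCP keepalive idle time in seconds. A positive value enables SO_KEEPALIVE on the socket and, where the platform provides it, is used as the TCP_KEEPIDLE value. 0 or a negative value leaves keepalive off.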
path => 'Str'
Unix socket path. Mutually exclusive with "host".
loop => EV::Loop
EV loop to use. Default: "EV::default_loop".
connect($host, [$port])
Connect to NATS server. Port defaults to 4222.
ping_interval([$ms])
Get/set PING interval.
max_pings_outstanding([$num])
Get/set max outstanding PINGs.
priority([$num])
Get/set EV watcher priority.
keepalive([$seconds])
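Get/set the TCP keepalive idle time in seconds. Setting it only stores the value; it is applied to the socket when a connection is set up.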
batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef
returns.
$nats->batch(sub {
$nats->publish("foo.$_", "msg-$_") for 1..1000;
});
lib/EV/Nats.pm view on Meta::CPAN
=item * Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)
=item * Request/reply with automatic inbox management
=item * Queue group subscriptions for load balancing
=item * Wildcard subjects (C<*> and C<E<gt>>)
=item * Headers support (HPUB/HMSG)
=item * Automatic PING/PONG keep-alive
=item * Automatic reconnection with subscription and queue group restore
=item * Fire-and-forget publish (no callback overhead)
=item * Token, user/pass authentication
=item * TCP keepalive and connect timeout
=item * Write coalescing via ev_prepare (batches writes per event loop iteration)
=item * O(1) subscription lookup via hash table
=item * Graceful drain (unsubscribe all, flush, then disconnect)
=item * Server pool with cluster URL failover from INFO connect_urls
=item * Optional TLS via OpenSSL (auto-detected at build time)
lib/EV/Nats.pm view on Meta::CPAN
Interval for client-initiated PING. 0 = disabled.
=item max_pings_outstanding => $num (default 2)
Max unanswered PINGs before declaring stale connection.
=item priority => $num (-2 to +2)
EV watcher priority.
=item keepalive => $seconds
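TCP keepalive idle time in seconds. A positive value enables C<SO_KEEPALIVE> on the socket and, where the platform provides it, is used as the C<TCP_KEEPIDLE> value. 0 or a negative value leaves keepalive off.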
=item path => 'Str'
Unix socket path. Mutually exclusive with C<host>.
=item loop => EV::Loop
EV loop to use. Default: C<EV::default_loop>.
=back
lib/EV/Nats.pm view on Meta::CPAN
Get/set PING interval.
=head2 max_pings_outstanding([$num])
Get/set max outstanding PINGs.
=head2 priority([$num])
Get/set EV watcher priority.
=head2 keepalive([$seconds])
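Get/set the TCP keepalive idle time in seconds. Setting it only stores the value; it is applied to the socket when a connection is set up.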
=head2 batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish
write scheduling; all buffered data is flushed after the coderef returns.
$nats->batch(sub {
$nats->publish("foo.$_", "msg-$_") for 1..1000;
});
src/EV__Nats.xs view on Meta::CPAN
/* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
size_t msg_subject_off;
size_t msg_subject_len;
size_t msg_reply_off; /* msg_reply_len == 0 means no reply */
size_t msg_reply_len;
uint64_t msg_sid;
size_t msg_hdr_len;
size_t msg_total_len;
int priority;
int keepalive;
/* Stats */
UV msgs_in;
UV msgs_out;
UV bytes_in;
UV bytes_out;
/* Server pool for cluster failover */
ngx_queue_t server_pool;
int server_pool_count;
src/EV__Nats.xs view on Meta::CPAN
/* Walk the address list in res (presumably a getaddrinfo() result -- confirm
 * against the enclosing function) and stop at the first candidate whose
 * connect() either succeeds immediately or is still in progress. */
for (rp = res; rp != NULL; rp = rp->ai_next) {
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
/* No socket for this address family/type: move on to the next candidate. */
if (fd < 0) continue;
/* Switch to non-blocking mode so connect() below can return EINPROGRESS
 * instead of blocking. The fcntl() return values are not checked. */
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
int flag = 1;
/* Set TCP_NODELAY (disables Nagle's algorithm); failure is ignored. */
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
/* Keepalive is opt-in: only a positive self->keepalive turns it on. */
if (self->keepalive > 0) {
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
#ifdef TCP_KEEPIDLE
/* Where the platform defines TCP_KEEPIDLE, self->keepalive is passed as its
 * value; on other platforms only SO_KEEPALIVE is set. */
setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
#endif
}
rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
/* Immediate success or a pending non-blocking connect: keep this fd. */
if (rv == 0 || errno == EINPROGRESS) break;
/* Hard failure: release the socket and reset fd to -1 before trying the
 * next address. */
close(fd);
fd = -1;
}
src/EV__Nats.xs view on Meta::CPAN
else if (strcmp(key, "max_reconnect_delay") == 0)
self->max_reconnect_delay_ms = SvIV(val);
else if (strcmp(key, "connect_timeout") == 0)
self->connect_timeout_ms = SvIV(val);
else if (strcmp(key, "ping_interval") == 0)
self->ping_interval_ms = SvIV(val);
else if (strcmp(key, "max_pings_outstanding") == 0)
self->max_pings_outstanding = SvIV(val);
else if (strcmp(key, "priority") == 0)
self->priority = SvIV(val);
else if (strcmp(key, "keepalive") == 0)
self->keepalive = SvIV(val);
#ifdef HAVE_OPENSSL
else if (strcmp(key, "tls") == 0)
self->tls = SvTRUE(val) ? 1 : 0;
else if (strcmp(key, "tls_ca_file") == 0) {
STRLEN l; const char *s = SvPV(val, l);
Newx(self->tls_ca_file, l + 1, char);
Copy(s, self->tls_ca_file, l + 1, char);
}
else if (strcmp(key, "tls_skip_verify") == 0)
self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
src/EV__Nats.xs view on Meta::CPAN
priority(self, ...)
EV::Nats self
CODE:
if (items > 1)
self->priority = SvIV(ST(1));
RETVAL = self->priority;
OUTPUT:
RETVAL
int
keepalive(self, ...)
    EV::Nats self
  CODE:
    /* Combined accessor: with an argument, store it as the new TCP keepalive
     * value first; in every case return the value currently held in
     * self->keepalive. Only the stored field is touched here. */
    if (items >= 2) {
        const int seconds = SvIV(ST(1));
        self->keepalive = seconds;
    }
    RETVAL = self->keepalive;
  OUTPUT:
    RETVAL
void
on_error(self, ...)
EV::Nats self
PPCODE:
if (items > 1) {
CLEAR_HANDLER(self->on_error);
if (SvOK(ST(1)))
xt/09_max_payload.t view on Meta::CPAN
undef $t;
eval { $nats->publish('maxpay.ok', 'x' x ($max - 1)) };
ok !$@, 'publish at max_payload-1 succeeds';
# Publish over max_payload should croak
eval { $nats->publish('maxpay.over', 'x' x ($max + 1)) };
like $@, qr/max_payload/, 'publish over max_payload croaks';
# Verify connection still works after croak
$nats->subscribe('maxpay.after', sub {
pass 'connection still alive after max_payload croak';
$nats->disconnect;
EV::break;
});
my $p; $p = EV::timer 0.1, 0, sub {
undef $p;
$nats->publish('maxpay.after', 'alive');
};
};
};
EV::run;
( run in 1.699 second using v1.01-cache-2.11-cpan-39bf76dae61 )