EV-Nats
view release on metacpan or search on metacpan
lib/EV/Nats/JetStream.pm view on Meta::CPAN
my $code = $1;
# A NATS/1.0 status frame is a control message, never data, so it
# must not be pushed as a result. 100 = idle heartbeat / flow
# control: ignore and keep waiting. Everything else (404 no
# messages, 408 expires elapsed, 409 consumer issue, 503 no
# responders, 400 bad request, ...) terminates the batch.
return if $code == 100;
return $finish->();
}
push @messages, {
subject => $subject,
payload => $payload,
reply => $reply,
headers => $headers,
};
$finish->() if @messages >= $batch;
});
my %req = (batch => $batch, expires => $expires_ns);
$req{no_wait} = JSON::PP::true() if $no_wait;
my $req_body = JSON::PP::encode_json(\%req);
my $subj = "$self->{prefix}.CONSUMER.MSG.NEXT.$stream.$consumer";
$nats->publish($subj, $req_body, $inbox);
$timer = EV::timer($expires_sec + 1, 0, sub {
$finish->();
});
return;
}
1;
=head1 NAME
EV::Nats::JetStream - JetStream API client for L<EV::Nats>
=head1 SYNOPSIS
use EV;
use EV::Nats;
use EV::Nats::JetStream;
my $nats = EV::Nats->new(host => '127.0.0.1');
my $js = EV::Nats::JetStream->new(nats => $nats);
$js->stream_create({ name => 'ORDERS', subjects => ['orders.>'] },
sub {
my ($info, $err) = @_;
die $err if $err;
$js->js_publish('orders.new', '{"item":"widget"}', sub {
my ($ack, $err) = @_;
die $err if $err;
print "stored at seq=$ack->{seq}\n";
});
});
EV::run;
=head1 DESCRIPTION
Thin async wrapper over the JetStream C<$JS.API.*> request/reply
endpoints. Each method is a single request whose callback is invoked
with the decoded JSON response (or an error string). The C<$nats>
connection passed to L</new> handles all the actual I/O.
L<EV::Nats::KV> and L<EV::Nats::ObjectStore> build on top of this
module -- see those for higher-level KV / blob APIs.
=head1 METHODS
All methods are async. Callbacks fire on the L<EV> loop.
=head2 new(%opts)
my $js = EV::Nats::JetStream->new(
nats => $nats,
prefix => '$JS.API', # default API subject prefix
timeout => 5000, # ms; default 5000
);
=head2 Stream management
=head3 stream_create($config, $cb)
Create a stream. C<$config> is passed verbatim as the
C<StreamConfig> request body. Callback: C<($info, $err)>.
=head3 stream_update($config, $cb)
Update an existing stream. Same shape as C<stream_create>.
=head3 stream_delete($name, $cb)
Delete the stream by name.
=head3 stream_info($name, [\%opts], $cb)
Fetch the stream's configuration and state. Optional C<\%opts> may include
C<subjects_filter> (e.g. C<E<gt>> to match every subject) to populate
C<state.subjects>; without it the server omits that field for performance.
=head3 stream_list($cb)
List info for every stream.
=head3 stream_purge($name, $cb)
Purge all messages from the stream.
=head3 stream_msg_get($stream, \%opts, $cb)
Fetch a single message from C<$stream>. C<\%opts> selects the message:
{ seq => $n } # by sequence number
{ last_by_subj => $subject } # latest matching subject
{ next_by_subj => $subject, seq => $start } # next at-or-after $start
The message body and headers in the response are base64-encoded
under C<< $resp->{message}{data} >> and C<< $resp->{message}{hdrs} >>.
=head2 Consumer management
( run in 0.552 second using v1.01-cache-2.11-cpan-524268b4103 )