EV-Nats


lib/EV/Nats/JetStream.pm

            my $code = $1;
            # A NATS/1.0 status frame is a control message, never data, so it
            # must not be pushed as a result. 100 = idle heartbeat / flow
            # control: ignore and keep waiting. Everything else (404 no
            # messages, 408 expires elapsed, 409 consumer issue, 503 no
            # responders, 400 bad request, ...) terminates the batch.
            return if $code == 100;
            return $finish->();
        }
        push @messages, {
            subject => $subject,
            payload => $payload,
            reply   => $reply,
            headers => $headers,
        };
        $finish->() if @messages >= $batch;
    });

    my %req = (batch => $batch, expires => $expires_ns);
    $req{no_wait} = JSON::PP::true() if $no_wait;
    my $req_body = JSON::PP::encode_json(\%req);
    my $subj = "$self->{prefix}.CONSUMER.MSG.NEXT.$stream.$consumer";
    $nats->publish($subj, $req_body, $inbox);

    $timer = EV::timer($expires_sec + 1, 0, sub {
        $finish->();
    });
    return;
}

1;

=head1 NAME

EV::Nats::JetStream - JetStream API client for L<EV::Nats>

=head1 SYNOPSIS

    use EV;
    use EV::Nats;
    use EV::Nats::JetStream;

    my $nats = EV::Nats->new(host => '127.0.0.1');
    my $js   = EV::Nats::JetStream->new(nats => $nats);

    $js->stream_create({ name => 'ORDERS', subjects => ['orders.>'] },
                       sub {
        my ($info, $err) = @_;
        die $err if $err;
        $js->js_publish('orders.new', '{"item":"widget"}', sub {
            my ($ack, $err) = @_;
            die $err if $err;
            print "stored at seq=$ack->{seq}\n";
        });
    });

    EV::run;

=head1 DESCRIPTION

Thin async wrapper over the JetStream C<$JS.API.*> request/reply
endpoints. Each method is a single request whose callback is invoked
with the decoded JSON response (or an error string). The C<$nats>
connection passed to L</new> handles all the actual I/O.

L<EV::Nats::KV> and L<EV::Nats::ObjectStore> build on top of this
module -- see those for higher-level KV / blob APIs.
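As a rough illustration of the subject layout behind those endpoints, the
sketch below joins a prefix and a list of endpoint tokens into a request
subject. C<api_subject> is a hypothetical helper, not part of this module.
The token order mirrors the C<CONSUMER.MSG.NEXT> subject built in the source
above, and the stream and consumer names (C<ORDERS>, C<worker>) are made up.

```perl
use strict;
use warnings;

# Single quotes matter here: inside a double-quoted string Perl would
# try to interpolate $JS as a variable.
my $prefix = '$JS.API';

# Hypothetical helper (not part of EV::Nats::JetStream): join the API
# prefix and the endpoint tokens with dots.
sub api_subject {
    my ($base, @tokens) = @_;
    return join '.', $base, @tokens;
}

# Illustrative stream and consumer names.
my $subject = api_subject($prefix, qw(CONSUMER MSG NEXT ORDERS worker));
print "$subject\n";
```

Passing a different C<prefix> to L</new> changes only the leading part of the
subject; the endpoint tokens stay the same.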

=head1 METHODS

All methods are async. Callbacks fire on the L<EV> loop.

=head2 new(%opts)

    my $js = EV::Nats::JetStream->new(
        nats    => $nats,
        prefix  => '$JS.API',   # default API subject prefix
        timeout => 5000,        # ms; default 5000
    );

=head2 Stream management

=head3 stream_create($config, $cb)

Create a stream. C<$config> is passed verbatim as the
C<StreamConfig> request body. Callback: C<($info, $err)>.

=head3 stream_update($config, $cb)

Update an existing stream. Same shape as C<stream_create>.

=head3 stream_delete($name, $cb)

Delete the stream by name.

=head3 stream_info($name, [\%opts], $cb)

Fetch the stream's config and state. Optional C<\%opts> may include
C<subjects_filter> (e.g. C<E<gt>>) to populate C<state.subjects>;
without it the server omits that field for performance.

=head3 stream_list($cb)

List info for all streams.

=head3 stream_purge($name, $cb)

Purge all messages from the stream.

=head3 stream_msg_get($stream, \%opts, $cb)

Fetch a single message from C<$stream>. C<\%opts> selects the message:

    { seq => $n }                                # by sequence number
    { last_by_subj => $subject }                 # latest matching subject
    { next_by_subj => $subject, seq => $start }  # next at-or-after $start

The message body and headers in the response are base64-encoded
under C<< $resp->{message}{data} >> and C<< $resp->{message}{hdrs} >>.
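A minimal sketch of decoding those two fields with the core L<MIME::Base64>
module follows. The C<$resp> hash is a hand-built stand-in for what the
callback would receive, and the C<X-Trace> header is invented for the
example. The header parsing assumes the usual NATS header block: a
C<NATS/1.0> status line followed by C<Name: value> lines separated by CRLF.

```perl
use strict;
use warnings;
use MIME::Base64 qw(encode_base64 decode_base64);

# Stand-in for a decoded stream_msg_get response. In real use this
# hash arrives as the first callback argument.
my $resp = {
    message => {
        data => encode_base64('{"item":"widget"}', ''),
        hdrs => encode_base64("NATS/1.0\r\nX-Trace: abc\r\n\r\n", ''),
    },
};

my $msg     = $resp->{message};
my $payload = decode_base64($msg->{data} // '');
my $raw_hdr = decode_base64($msg->{hdrs} // '');

# Split the header block into name => value pairs. The NATS/1.0 status
# line has no "Name:" part, so the pattern skips it.
my %headers;
for my $line (split /\r\n/, $raw_hdr) {
    next unless $line =~ /^([^:\s]+):\s*(.*)$/;
    $headers{$1} = $2;   # a repeated header keeps only its last value
}

print "payload: $payload\n";
print "X-Trace: $headers{'X-Trace'}\n";
```

A flat hash is the simplest shape for illustration; code that needs every
value of a repeated header would collect them into array references instead.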

=head2 Consumer management


