Net-Async-Kubernetes


Changes  view on Meta::CPAN

 - Extend `Net::Async::Kubernetes::PortForwardSession` with `write_stdin`
   (writes to channel 0) and `resize` (writes TTY resize JSON to channel 4)
   helpers for exec/attach sessions.
 - Add async `cp_to_pod()` and `cp_from_pod()` helpers that stream tar
   archives through an exec session for file transfer.
 - Add mock test coverage for exec (`t/14-mock-exec.t`), attach
   (`t/15-mock-attach.t`), and cp (`t/16-mock-cp.t`) flows.

0.006     2026-03-09 08:37:54Z
 - Add async Pod Log API via `log()`.
   Supports one-shot mode (Future resolves to full text) and streaming mode
   (`on_line` callback with Kubernetes::REST::LogEvent objects).
   Streaming mode supports follow, tailLines, sinceSeconds, sinceTime,
   timestamps, previous, limitBytes, and container parameters.
 - Add mock log test coverage (`t/09-mock-log.t`) and extend mock transport
   with generic streaming chunk support for log streams.
 - Add active `port_forward()` API that builds Kubernetes port-forward requests
   and delegates to duplex transport (`_do_duplex_request`).
 - Implement default websocket duplex transport for `port_forward()` using
   Net::Async::WebSocket::Client, including channel frame decoding and a
   `Net::Async::Kubernetes::PortForwardSession` helper (`write_channel`,
   `close`).
 - Add duplex transport tests (`t/13-duplex-transport.t`) for websocket
   handshake wiring, frame handling, session writes/closes, and connect errors.
 - Add active `exec()` API that builds Kubernetes pod `/exec` requests with
   repeated `command=` parameters and stream toggles

Changes  view on Meta::CPAN

 - Add async exec test coverage (`t/14-mock-exec.t`) and duplex transport
   assertions for exec in `t/13-duplex-transport.t`.
 - Add active `attach()` API that builds Kubernetes pod `/attach` requests with
   stream toggles (stdin/stdout/stderr/tty).
 - Add async attach test coverage (`t/15-mock-attach.t`) and duplex transport
   assertions for attach in `t/13-duplex-transport.t`.
 - Extend `Net::Async::Kubernetes::PortForwardSession` with convenience
   helpers `write_stdin()` (channel 0) and `resize()` (channel 4 terminal
   resize payload for exec/attach).
 - Add async cp helpers `cp_to_pod()` and `cp_from_pod()` built on `exec()`.
   Supports single-file upload/download via channel streaming with Future-based
   completion and status/error propagation.
 - Add cp helper mock tests in `t/16-mock-cp.t`.
 - Add live demo script `ex/live_features.pl` to exercise log, exec, attach,
   port-forward, and cp helpers against a real cluster/kubeconfig.
 - Raise minimum versions to Kubernetes::REST 1.101 and IO::K8s 1.008.

0.005     2026-03-04 17:14:31Z
 - Use public Kubernetes::REST building-block API (build_path,
   prepare_request, check_response, inflate_object, inflate_list,
   process_watch_chunk) instead of private underscore methods.

Changes  view on Meta::CPAN


0.002     2026-02-18 22:52:19Z
 - Update minimum requirements

0.001     2026-02-13 06:03:10Z
 - Initial release
 - Async Kubernetes client built on IO::Async and Kubernetes::REST
 - Future-based CRUD: list(), get(), create(), update(), patch(), delete()
 - Three patch types: strategic-merge (default), merge, json
 - Delete by class+name or by object reference
 - Net::Async::Kubernetes::Watcher for streaming watch with auto-reconnect
 - Separate on_added, on_modified, on_deleted, on_error callbacks
 - Catch-all on_event callback for raw WatchEvent access
 - Client-side event filtering: names (regex/string/array) and event_types
 - Smart event type auto-derivation from registered callbacks
 - Resumable watches via resourceVersion tracking
 - Automatic 410 Gone recovery (clear resourceVersion and restart)
 - Auto-reconnect on stream completion (server timeout) and connection errors
 - Kubeconfig support via Kubernetes::REST::Kubeconfig
 - SSL/TLS with client certificate support
 - Custom resource_map for CRD support

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

    $params{sinceSeconds} = $since_seconds if defined $since_seconds;
    $params{sinceTime}    = $since_time    if defined $since_time;
    $params{timestamps}   = 'true'         if $timestamps;
    $params{previous}     = 'true'         if $previous;
    $params{limitBytes}   = $limit_bytes   if defined $limit_bytes;

    if ($on_line) {
        my $req = $rest->prepare_request('GET', $path, parameters => \%params);
        my $buffer = '';

        return $self->_do_streaming_request($req, sub {
            my ($chunk) = @_;
            for my $event ($rest->process_log_chunk(\$buffer, $chunk)) {
                $on_line->($event);
            }
        })->then(sub {
            my ($response) = @_;
            $rest->check_response($response, "log $short_class");
            if (length $buffer) {
                $on_line->(Kubernetes::REST::LogEvent->new(line => $buffer));
            }

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

        $self->_ssl_options,
    )->then(sub {
        my ($response) = @_;
        return Future->done(Kubernetes::REST::HTTPResponse->new(
            status  => $response->code,
            content => $response->decoded_content // $response->content // '',
        ));
    });
}

sub _do_streaming_request {
    my ($self, $req, $on_chunk) = @_;

    my $uri = URI->new($req->url);

    return $self->_http->do_request(
        method  => $req->method,
        uri     => $uri,
        headers => $req->headers,
        on_header => sub {
            my ($response) = @_;

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

    )->get;

    $kube->delete('Pod', 'nginx', namespace => 'default')->get;

    # Pod logs (one-shot)
    my $text = $kube->log('Pod', 'nginx',
        namespace => 'default',
        tailLines => 100,
    )->get;

    # Pod logs (streaming)
    $kube->log('Pod', 'nginx',
        namespace => 'default',
        follow    => 1,
        on_line   => sub { my ($event) = @_; say $event->line },
    )->get;

    # Port-forward (built-in websocket duplex support)
    my $pf = $kube->port_forward('Pod', 'nginx',
        namespace => 'default',
        ports     => [8080],

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN


=head1 DESCRIPTION

C<Net::Async::Kubernetes> is an async Kubernetes client built on L<IO::Async>.
It extends L<IO::Async::Notifier> and uses L<Net::Async::HTTP> for
non-blocking HTTP communication, plus L<Net::Async::WebSocket::Client> for
duplex subresources like pod port-forward.

All CRUD, log, port-forward, exec, attach, and cp helper methods return
L<Future> objects. L<Net::Async::Kubernetes::Watcher> provides
auto-reconnecting event streaming with separate callbacks per event type.

Request preparation and response processing are delegated to
L<Kubernetes::REST>, so the same IO::K8s object inflation, short class
names, and CRD support are available.

Authentication is automatically resolved in the following order:

=over 4

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

        on_line   => sub {
            my ($event) = @_;  # Kubernetes::REST::LogEvent
            say $event->line;
        },
    )->get;

Retrieve logs from a pod.

Without C<on_line>, returns a L<Future> that resolves to the full log text.

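With C<on_line>, opens a streaming request and invokes the callback once per
log line, passing a L<Kubernetes::REST::LogEvent> object. A trailing line
that has no final newline is delivered after the stream ends and the response
has been checked. The returned L<Future> resolves when the stream ends.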
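The line-by-line delivery described above depends on buffering, because a chunk from the server can end in the middle of a line. The following is a minimal sketch of that idea in core Perl only. C<split_log_chunk> is a hypothetical helper written for illustration; it is not the implementation of C<process_log_chunk> in L<Kubernetes::REST>, and it yields plain strings rather than LogEvent objects.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Kubernetes::REST): append $chunk to the
# buffer and return every complete line, leaving any partial tail buffered.
sub split_log_chunk {
    my ($buffer_ref, $chunk) = @_;
    $$buffer_ref .= $chunk;

    my @lines;
    # Peel complete lines off the front of the buffer, one per iteration.
    while ($$buffer_ref =~ s/\A([^\n]*)\n//) {
        push @lines, $1;
    }
    return @lines;
}

my $buffer = '';
my @lines;

# Two chunks that split "line 2" across the chunk boundary.
for my $chunk ("line 1\nline", " 2\nline 3") {
    push @lines, split_log_chunk(\$buffer, $chunk);
}

# The stream has ended: flush the trailing line that had no newline.
push @lines, $buffer if length $buffer;

print "$_\n" for @lines;
```

Keeping the buffer outside the helper and passing it by reference mirrors the shape of the streaming callback, where the same buffer has to survive across chunk deliveries.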

=head2 port_forward

    my $f = $kube->port_forward('Pod', 'my-pod',
        namespace => 'default',
        ports     => [8080, 8443],
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    );
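The C<on_frame> callback receives a channel number and a payload. As a rough sketch of what channel framing can look like, the example below assumes the Kubernetes websocket channel convention in which the first byte of each binary message is the channel number and the remaining bytes are the payload. C<encode_channel_frame> and C<decode_channel_frame> are hypothetical names for illustration, not methods of this distribution.

```perl
use strict;
use warnings;

# Hypothetical helper: prefix the payload with a one-byte channel number.
sub encode_channel_frame {
    my ($channel, $payload) = @_;
    return pack('C', $channel) . $payload;
}

# Hypothetical helper: split a frame into (channel, payload).
# Returns an empty list for an empty frame.
sub decode_channel_frame {
    my ($frame) = @_;
    return unless length $frame;

    my $channel = unpack('C', $frame);   # first byte is the channel number
    my $payload = substr($frame, 1);     # everything after it is the data
    return ($channel, $payload);
}

my $frame = encode_channel_frame(1, "hello");
my ($channel, $payload) = decode_channel_frame($frame);

printf "channel=%d payload=%s\n", $channel, $payload;
```

A frame that holds only the channel byte decodes to an empty payload, so a callback built on this sketch should treat an empty string as valid data rather than as an error.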

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

=head2 cp_to_pod

    my $f = $kube->cp_to_pod('Pod', 'my-pod',
        namespace => 'default',
        container => 'app',
        local     => '/tmp/local.txt',
        remote    => '/tmp/remote.txt',
    );
    my $result = $f->get;

Copy a single local file into a pod using C<exec()> and stdin streaming.

Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.

=head2 cp_from_pod

    my $f = $kube->cp_from_pod('Pod', 'my-pod',
        namespace => 'default',
        container => 'app',
        remote    => '/tmp/remote.txt',
        local     => '/tmp/local.txt',
    );
    my $result = $f->get;

Copy a single file from a pod using C<exec()> and stdout streaming.

Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.

=head2 watcher

    my $watcher = $kube->watcher('Pod',
        namespace      => 'default',
        label_selector => 'app=web',
        on_added       => sub { my ($pod) = @_; ... },

lib/Net/Async/Kubernetes/Watcher.pm  view on Meta::CPAN

        if defined $self->{_resource_version};
    $params{labelSelector} = $self->label_selector
        if defined $self->label_selector;
    $params{fieldSelector} = $self->field_selector
        if defined $self->field_selector;

    my $req = $rest->prepare_request('GET', $path, parameters => \%params);

    weaken(my $weak_self = $self);

    my $f = $self->kube->_do_streaming_request($req, sub {
        my ($chunk) = @_;
        return unless $weak_self;

        my $buffer = $weak_self->{_buffer};
        for my $result ($rest->process_watch_chunk($class, \$buffer, $chunk)) {
            $weak_self->{_buffer} = $buffer;

            if ($result->{resourceVersion}) {
                $weak_self->{_resource_version} = $result->{resourceVersion};
            }

t/09-mock-log.t  view on Meta::CPAN

    MockTransport::mock_response('GET', '/api/v1/namespaces/default/pods/nginx/log', "line 1\nline 2\n");

    my $text = $kube->log('Pod', 'nginx', namespace => 'default')->get;
    is($text, "line 1\nline 2\n", 'full log text returned');

    my $req = MockTransport::last_request();
    is($req->{method}, 'GET', 'used GET');
    like($req->{path}, qr{/api/v1/namespaces/default/pods/nginx/log}, 'uses pod log path');
};

subtest 'streaming log emits LogEvent lines and resolves' => sub {
    my $kube = make_kube();

    MockTransport::mock_stream_chunks('/api/v1/namespaces/default/pods/nginx/log', [
        "line 1\nline",
        " 2\nline 3",
    ]);

    my @lines;
    my @classes;

t/09-mock-log.t  view on Meta::CPAN

        follow    => 1,
        on_line   => sub {
            my ($event) = @_;
            push @classes, ref($event);
            push @lines, $event->line;
        },
    );

    $f->on_ready(sub { $loop->stop; });
    $loop->watch_time(after => 2, code => sub {
        fail('timeout waiting for streaming log');
        $loop->stop;
    });
    $loop->run;

    ok($f->is_done, 'streaming future resolved');
    is_deeply(\@lines, ['line 1', 'line 2', 'line 3'], 'received chunked + trailing partial line');
    is($classes[0], 'Kubernetes::REST::LogEvent', 'on_line receives LogEvent objects');
};

subtest 'log query parameters are sent' => sub {
    my $kube = make_kube();

    MockTransport::mock_stream_chunks('/api/v1/namespaces/default/pods/nginx/log', [
        "ok\n",
    ]);

t/09-mock-log.t  view on Meta::CPAN

        tailLines    => 10,
        sinceSeconds => 30,
        timestamps   => 1,
        previous     => 1,
        limitBytes   => 2048,
        on_line      => sub {},
    );

    $f->on_ready(sub { $loop->stop; });
    $loop->watch_time(after => 2, code => sub {
        fail('timeout waiting for streaming log with params');
        $loop->stop;
    });
    $loop->run;

    my $req = MockTransport::last_request();
    like($req->{url}, qr/container=sidecar/, 'container param');
    like($req->{url}, qr/follow=true/, 'follow param');
    like($req->{url}, qr/tailLines=10/, 'tailLines param');
    like($req->{url}, qr/sinceSeconds=30/, 'sinceSeconds param');
    like($req->{url}, qr/timestamps=true/, 'timestamps param');

t/09-mock-log.t  view on Meta::CPAN

        kind    => 'Status',
        status  => 'Failure',
        message => 'pods "missing" not found',
        code    => 404,
    }, 404);

    eval { $kube->log('Pod', 'missing', namespace => 'default')->get };
    like($@, qr/error|404|Failure/i, 'one-shot log failure propagated');
};

subtest 'streaming log propagates HTTP errors' => sub {
    my $kube = make_kube();

    MockTransport::mock_stream_chunks('/api/v1/namespaces/default/pods/missing/log', [], {
        status => 404,
    });

    my $f = $kube->log('Pod', 'missing',
        namespace => 'default',
        follow    => 1,
        on_line   => sub {},
    );

    my $failed = 0;
    $f->on_fail(sub { $failed = 1; $loop->stop; });
    $f->on_done(sub { $loop->stop; });
    $loop->watch_time(after => 2, code => sub {
        fail('timeout waiting for streaming log error');
        $loop->stop;
    });
    $loop->run;

    ok($failed, 'streaming failure propagated');
};

subtest 'log argument validation' => sub {
    my $kube = make_kube();

    my $f1 = $kube->log('Pod', namespace => 'default');
    ok($f1->is_failed, 'missing name returns failed future');
    like($f1->failure, qr/name required for log/, 'missing name message');

    my $f2 = $kube->log('Pod', 'nginx', 'orphan');

t/lib/MockTransport.pm  view on Meta::CPAN

package MockTransport;
# Mock HTTP transport for Net::Async::Kubernetes tests.
# Overrides _do_request and _do_streaming_request to return
# pre-configured responses without needing a real cluster.

use strict;
use warnings;
use Future;
use JSON::MaybeXS;
use Kubernetes::REST::HTTPResponse;

my $json = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1);

t/lib/MockTransport.pm  view on Meta::CPAN

# mock_watch_events('/api/v1/pods', [ { type => 'ADDED', object => {...} }, ... ]);
# mock_watch_events('/api/v1/pods', [...], { complete => 1 });       # resolve after events
# mock_watch_events('/api/v1/pods', [...], { fail => 'some error' }); # fail after events

sub mock_watch_events {
    my ($path, $events, $opts) = @_;
    $watch_events{$path} = $events;
    $watch_opts{$path} = $opts // {};
}

# Register mock streaming chunks for a path (e.g. Pod logs)
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [ "line1\n", "line2\n" ]);
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [...], { complete => 1 });
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [...], { fail => 'some error' });
sub mock_stream_chunks {
    my ($path, $chunks, $opts) = @_;
    $stream_chunks{$path} = $chunks;
    $stream_opts{$path} = $opts // {};
}

sub mock_duplex_session {
    my ($session) = @_;
    $duplex_session = $session;
}

# Install the mock transport on a Net::Async::Kubernetes instance.
# Replaces _do_request and _do_streaming_request with mock versions.
sub install {
    my ($kube) = @_;

    no warnings 'redefine';

    my $class = ref($kube) || $kube;

    # Override _do_request
    no strict 'refs';
    *{"${class}::_do_request"} = sub {

t/lib/MockTransport.pm  view on Meta::CPAN

        return Future->done(Kubernetes::REST::HTTPResponse->new(
            status  => 404,
            content => $json->encode({
                kind => 'Status', status => 'Failure',
                message => "Mock: no response for $key",
                code => 404,
            }),
        ));
    };

    # Override _do_streaming_request
    # Events are delivered asynchronously via the event loop so that
    # $watcher has been assigned in the test closure before callbacks fire.
    # Returns a pending Future that stays open (like a real watch connection);
    # the test calls $watcher->stop to cancel it.
    *{"${class}::_do_streaming_request"} = sub {
        my ($self, $req, $on_chunk) = @_;
        my $url = $req->url;
        my $path = $url;
        $path =~ s{^https?://[^/]+}{};
        $path =~ s{\?.*}{};  # Strip query params

        push @request_log, {
            method  => $req->method,
            url     => $url,
            path    => $path,
            streaming => 1,
        };

        if (my $events = $watch_events{$path}) {
            my $f = $self->loop->new_future;
            my $opts = $watch_opts{$path} || {};
            my $status = $opts->{status} // 200;

            if (@$events || $opts->{complete} || $opts->{fail}) {
                # Deliver all events in one tick (like a real chunked response).
                # Don't check cancellation between events - the watcher's chunk


