- Extend `Net::Async::Kubernetes::PortForwardSession` with `write_stdin`
(writes to channel 0) and `resize` (writes TTY resize JSON to channel 4)
helpers for exec/attach sessions.
- Add async `cp_to_pod()` and `cp_from_pod()` helpers that stream tar
archives through an exec session for file transfer.
- Add mock test coverage for exec (`t/14-mock-exec.t`), attach
(`t/15-mock-attach.t`), and cp (`t/16-mock-cp.t`) flows.
0.006 2026-03-09 08:37:54Z
- Add async Pod Log API via `log()`.
Supports one-shot mode (Future resolves to full text) and streaming mode
(`on_line` callback with Kubernetes::REST::LogEvent objects).
Streaming mode supports follow, tailLines, sinceSeconds, sinceTime,
timestamps, previous, limitBytes, and container parameters.
- Add mock log test coverage (`t/09-mock-log.t`) and extend mock transport
with generic streaming chunk support for log streams.
- Add active `port_forward()` API that builds Kubernetes port-forward requests
and delegates to duplex transport (`_do_duplex_request`).
- Implement default websocket duplex transport for `port_forward()` using
Net::Async::WebSocket::Client, including channel frame decoding and a
`Net::Async::Kubernetes::PortForwardSession` helper (`write_channel`,
`close`).
- Add duplex transport tests (`t/13-duplex-transport.t`) for websocket
handshake wiring, frame handling, session writes/closes, and connect errors.
- Add active `exec()` API that builds Kubernetes pod `/exec` requests with
repeated `command=` parameters and stream toggles.
- Add async exec test coverage (`t/14-mock-exec.t`) and duplex transport
assertions for exec in `t/13-duplex-transport.t`.
- Add active `attach()` API that builds Kubernetes pod `/attach` requests with
stream toggles (stdin/stdout/stderr/tty).
- Add async attach test coverage (`t/15-mock-attach.t`) and duplex transport
assertions for attach in `t/13-duplex-transport.t`.
- Extend `Net::Async::Kubernetes::PortForwardSession` with convenience
helpers `write_stdin()` (channel 0) and `resize()` (channel 4 terminal
resize payload for exec/attach).
- Add async cp helpers `cp_to_pod()` and `cp_from_pod()` built on `exec()`.
Supports single-file upload/download via channel streaming with Future-based
completion and status/error propagation.
- Add cp helper mock tests in `t/16-mock-cp.t`.
- Add live demo script `ex/live_features.pl` to exercise log, exec, attach,
port-forward, and cp helpers against a real cluster/kubeconfig.
- Raise minimum versions to Kubernetes::REST 1.101 and IO::K8s 1.008.
0.005 2026-03-04 17:14:31Z
- Use public Kubernetes::REST building-block API (build_path,
prepare_request, check_response, inflate_object, inflate_list,
process_watch_chunk) instead of private underscore methods.
0.002 2026-02-18 22:52:19Z
- Update minimum requirements.
0.001 2026-02-13 06:03:10Z
- Initial release
- Async Kubernetes client built on IO::Async and Kubernetes::REST
- Future-based CRUD: list(), get(), create(), update(), patch(), delete()
- Three patch types: strategic-merge (default), merge, json
- Delete by class+name or by object reference
- Net::Async::Kubernetes::Watcher for streaming watch with auto-reconnect
- Separate on_added, on_modified, on_deleted, on_error callbacks
- Catch-all on_event callback for raw WatchEvent access
- Client-side event filtering: names (regex/string/array) and event_types
- Smart event type auto-derivation from registered callbacks
- Resumable watches via resourceVersion tracking
- Automatic 410 Gone recovery (clear resourceVersion and restart)
- Auto-reconnect on stream completion (server timeout) and connection errors
- Kubeconfig support via Kubernetes::REST::Kubeconfig
- SSL/TLS with client certificate support
- Custom resource_map for CRD support
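The `write_stdin` (channel 0) and `resize` (channel 4) entries in the changelog above refer to the channel-multiplexed framing Kubernetes uses for exec/attach streams, where the first byte of each frame is the channel number and the rest is the payload. The following is a minimal sketch of that framing using only core Perl. The helper names `encode_frame` and `decode_frame` are hypothetical and are not part of `Net::Async::Kubernetes::PortForwardSession`. The `Width`/`Height` keys in the resize payload follow the Kubernetes terminal-size convention, but the exact payload this module sends is an assumption here.

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical helper: prefix a payload with a one-byte channel number.
sub encode_frame {
    my ($channel, $payload) = @_;
    return pack('C', $channel) . $payload;
}

# Hypothetical helper: split a frame into (channel, payload).
# Returns an empty list for an empty frame.
sub decode_frame {
    my ($frame) = @_;
    return unless length $frame;
    return unpack('C a*', $frame);
}

# stdin data goes to channel 0.
my $stdin = encode_frame(0, "ls\n");

# A TTY resize is a small JSON document on channel 4 (assumed key names).
my $json   = JSON::PP->new->canonical;
my $resize = encode_frame(4, $json->encode({ Width => 80, Height => 24 }));

my ($channel, $payload) = decode_frame($resize);
print "channel=$channel payload=$payload\n";
```

Keeping the channel byte handling in one pair of functions means a session object only needs a generic "write to channel N" primitive, with stdin and resize expressed as thin wrappers over it.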
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
$params{sinceSeconds} = $since_seconds if defined $since_seconds;
$params{sinceTime} = $since_time if defined $since_time;
$params{timestamps} = 'true' if $timestamps;
$params{previous} = 'true' if $previous;
$params{limitBytes} = $limit_bytes if defined $limit_bytes;
if ($on_line) {
my $req = $rest->prepare_request('GET', $path, parameters => \%params);
my $buffer = '';
return $self->_do_streaming_request($req, sub {
my ($chunk) = @_;
for my $event ($rest->process_log_chunk(\$buffer, $chunk)) {
$on_line->($event);
}
})->then(sub {
my ($response) = @_;
$rest->check_response($response, "log $short_class");
if (length $buffer) {
$on_line->(Kubernetes::REST::LogEvent->new(line => $buffer));
}
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
$self->_ssl_options,
)->then(sub {
my ($response) = @_;
return Future->done(Kubernetes::REST::HTTPResponse->new(
status => $response->code,
content => $response->decoded_content // $response->content // '',
));
});
}
sub _do_streaming_request {
my ($self, $req, $on_chunk) = @_;
my $uri = URI->new($req->url);
return $self->_http->do_request(
method => $req->method,
uri => $uri,
headers => $req->headers,
on_header => sub {
my ($response) = @_;
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
)->get;
$kube->delete('Pod', 'nginx', namespace => 'default')->get;
# Pod logs (one-shot)
my $text = $kube->log('Pod', 'nginx',
namespace => 'default',
tailLines => 100,
)->get;
# Pod logs (streaming)
$kube->log('Pod', 'nginx',
namespace => 'default',
follow => 1,
on_line => sub { my ($event) = @_; say $event->line },
)->get;
# Port-forward (built-in websocket duplex support)
my $pf = $kube->port_forward('Pod', 'nginx',
namespace => 'default',
ports => [8080],
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
=head1 DESCRIPTION
C<Net::Async::Kubernetes> is an async Kubernetes client built on L<IO::Async>.
It extends L<IO::Async::Notifier> and uses L<Net::Async::HTTP> for
non-blocking HTTP communication, plus L<Net::Async::WebSocket::Client> for
duplex subresources like pod port-forward.
All CRUD, log, port-forward, exec, attach, and cp helper methods return
L<Future> objects. The L<Net::Async::Kubernetes::Watcher> provides
auto-reconnecting event streaming with separate callbacks per event type.
Request preparation and response processing are delegated to
L<Kubernetes::REST>, so the same IO::K8s object inflation, short class
names, and CRD support are available.
Authentication is automatically resolved in the following order:
=over 4
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
on_line => sub {
my ($event) = @_; # Kubernetes::REST::LogEvent
say $event->line;
},
)->get;
Retrieve logs from a pod.
Without C<on_line>, returns a L<Future> that resolves to the full log text.
With C<on_line>, opens a streaming request and invokes the callback once per
line with L<Kubernetes::REST::LogEvent> objects. The returned L<Future>
resolves when the stream ends.
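Streamed log data arrives in arbitrary chunks, so a line can be split across two chunks and the final line may lack a trailing newline. The sketch below illustrates one way to turn chunks into per-line callbacks with a carry-over buffer and a final flush. It is an illustration in core Perl, not the implementation of `process_log_chunk` in L<Kubernetes::REST>, and `split_lines` is a hypothetical name.

```perl
use strict;
use warnings;

# Hypothetical helper: append a chunk to the buffer and return every
# complete line, leaving any trailing partial line in the buffer.
sub split_lines {
    my ($buffer_ref, $chunk) = @_;
    $$buffer_ref .= $chunk;
    my @lines;
    while ($$buffer_ref =~ s/\A([^\n]*)\n//) {
        push @lines, $1;
    }
    return @lines;
}

my $buffer = '';
my @seen;

# Two chunks that split "line 2" in the middle and end without a newline.
for my $chunk ("line 1\nline", " 2\nline 3") {
    push @seen, split_lines(\$buffer, $chunk);
}

# When the stream ends, flush whatever partial line is left.
push @seen, $buffer if length $buffer;

print "$_\n" for @seen;
```

Passing the buffer by reference lets the caller keep state between chunks without the helper owning it, which matches the `\$buffer` calling style used with the streaming callback.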
=head2 port_forward
my $f = $kube->port_forward('Pod', 'my-pod',
namespace => 'default',
ports => [8080, 8443],
on_frame => sub { my ($channel, $payload) = @_; ... },
);
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
=head2 cp_to_pod
my $f = $kube->cp_to_pod('Pod', 'my-pod',
namespace => 'default',
container => 'app',
local => '/tmp/local.txt',
remote => '/tmp/remote.txt',
);
my $result = $f->get;
Copy a single local file into a pod using C<exec()> and stdin streaming.
Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.
=head2 cp_from_pod
my $f = $kube->cp_from_pod('Pod', 'my-pod',
namespace => 'default',
container => 'app',
remote => '/tmp/remote.txt',
local => '/tmp/local.txt',
);
my $result = $f->get;
Copy a single file from a pod using C<exec()> and stdout streaming.
Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.
=head2 watcher
my $watcher = $kube->watcher('Pod',
namespace => 'default',
label_selector => 'app=web',
on_added => sub { my ($pod) = @_; ... },
lib/Net/Async/Kubernetes/Watcher.pm view on Meta::CPAN
if defined $self->{_resource_version};
$params{labelSelector} = $self->label_selector
if defined $self->label_selector;
$params{fieldSelector} = $self->field_selector
if defined $self->field_selector;
my $req = $rest->prepare_request('GET', $path, parameters => \%params);
weaken(my $weak_self = $self);
my $f = $self->kube->_do_streaming_request($req, sub {
my ($chunk) = @_;
return unless $weak_self;
my $buffer = $weak_self->{_buffer};
for my $result ($rest->process_watch_chunk($class, \$buffer, $chunk)) {
$weak_self->{_buffer} = $buffer;
if ($result->{resourceVersion}) {
$weak_self->{_resource_version} = $result->{resourceVersion};
}
t/09-mock-log.t view on Meta::CPAN
MockTransport::mock_response('GET', '/api/v1/namespaces/default/pods/nginx/log', "line 1\nline 2\n");
my $text = $kube->log('Pod', 'nginx', namespace => 'default')->get;
is($text, "line 1\nline 2\n", 'full log text returned');
my $req = MockTransport::last_request();
is($req->{method}, 'GET', 'used GET');
like($req->{path}, qr{/api/v1/namespaces/default/pods/nginx/log}, 'uses pod log path');
};
subtest 'streaming log emits LogEvent lines and resolves' => sub {
my $kube = make_kube();
MockTransport::mock_stream_chunks('/api/v1/namespaces/default/pods/nginx/log', [
"line 1\nline",
" 2\nline 3",
]);
my @lines;
my @classes;
t/09-mock-log.t view on Meta::CPAN
follow => 1,
on_line => sub {
my ($event) = @_;
push @classes, ref($event);
push @lines, $event->line;
},
);
$f->on_ready(sub { $loop->stop; });
$loop->watch_time(after => 2, code => sub {
fail('timeout waiting for streaming log');
$loop->stop;
});
$loop->run;
ok($f->is_done, 'streaming future resolved');
is_deeply(\@lines, ['line 1', 'line 2', 'line 3'], 'received chunked + trailing partial line');
is($classes[0], 'Kubernetes::REST::LogEvent', 'on_line receives LogEvent objects');
};
subtest 'log query parameters are sent' => sub {
my $kube = make_kube();
MockTransport::mock_stream_chunks('/api/v1/namespaces/default/pods/nginx/log', [
"ok\n",
]);
t/09-mock-log.t view on Meta::CPAN
tailLines => 10,
sinceSeconds => 30,
timestamps => 1,
previous => 1,
limitBytes => 2048,
on_line => sub {},
);
$f->on_ready(sub { $loop->stop; });
$loop->watch_time(after => 2, code => sub {
fail('timeout waiting for streaming log with params');
$loop->stop;
});
$loop->run;
my $req = MockTransport::last_request();
like($req->{url}, qr/container=sidecar/, 'container param');
like($req->{url}, qr/follow=true/, 'follow param');
like($req->{url}, qr/tailLines=10/, 'tailLines param');
like($req->{url}, qr/sinceSeconds=30/, 'sinceSeconds param');
like($req->{url}, qr/timestamps=true/, 'timestamps param');
t/09-mock-log.t view on Meta::CPAN
kind => 'Status',
status => 'Failure',
message => 'pods "missing" not found',
code => 404,
}, 404);
eval { $kube->log('Pod', 'missing', namespace => 'default')->get };
like($@, qr/error|404|Failure/i, 'one-shot log failure propagated');
};
subtest 'streaming log propagates HTTP errors' => sub {
my $kube = make_kube();
MockTransport::mock_stream_chunks('/api/v1/namespaces/default/pods/missing/log', [], {
status => 404,
});
my $f = $kube->log('Pod', 'missing',
namespace => 'default',
follow => 1,
on_line => sub {},
);
my $failed = 0;
$f->on_fail(sub { $failed = 1; $loop->stop; });
$f->on_done(sub { $loop->stop; });
$loop->watch_time(after => 2, code => sub {
fail('timeout waiting for streaming log error');
$loop->stop;
});
$loop->run;
ok($failed, 'streaming failure propagated');
};
subtest 'log argument validation' => sub {
my $kube = make_kube();
my $f1 = $kube->log('Pod', namespace => 'default');
ok($f1->is_failed, 'missing name returns failed future');
like($f1->failure, qr/name required for log/, 'missing name message');
my $f2 = $kube->log('Pod', 'nginx', 'orphan');
t/lib/MockTransport.pm view on Meta::CPAN
package MockTransport;
# Mock HTTP transport for Net::Async::Kubernetes tests.
# Overrides _do_request and _do_streaming_request to return
# pre-configured responses without needing a real cluster.
use strict;
use warnings;
use Future;
use JSON::MaybeXS;
use Kubernetes::REST::HTTPResponse;
my $json = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1);
t/lib/MockTransport.pm view on Meta::CPAN
# mock_watch_events('/api/v1/pods', [ { type => 'ADDED', object => {...} }, ... ]);
# mock_watch_events('/api/v1/pods', [...], { complete => 1 }); # resolve after events
# mock_watch_events('/api/v1/pods', [...], { fail => 'some error' }); # fail after events
sub mock_watch_events {
my ($path, $events, $opts) = @_;
$watch_events{$path} = $events;
$watch_opts{$path} = $opts // {};
}
# Register mock streaming chunks for a path (e.g. Pod logs)
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [ "line1\n", "line2\n" ]);
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [...], { complete => 1 });
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [...], { fail => 'some error' });
sub mock_stream_chunks {
my ($path, $chunks, $opts) = @_;
$stream_chunks{$path} = $chunks;
$stream_opts{$path} = $opts // {};
}
sub mock_duplex_session {
my ($session) = @_;
$duplex_session = $session;
}
# Install the mock transport on a Net::Async::Kubernetes instance.
# Replaces _do_request and _do_streaming_request with mock versions.
sub install {
my ($kube) = @_;
no warnings 'redefine';
my $class = ref($kube) || $kube;
# Override _do_request
no strict 'refs';
*{"${class}::_do_request"} = sub {
t/lib/MockTransport.pm view on Meta::CPAN
return Future->done(Kubernetes::REST::HTTPResponse->new(
status => 404,
content => $json->encode({
kind => 'Status', status => 'Failure',
message => "Mock: no response for $key",
code => 404,
}),
));
};
# Override _do_streaming_request
# Events are delivered asynchronously via the event loop so that
# $watcher has been assigned in the test closure before callbacks fire.
# Returns a pending Future that stays open (like a real watch connection);
# the test calls $watcher->stop to cancel it.
*{"${class}::_do_streaming_request"} = sub {
my ($self, $req, $on_chunk) = @_;
my $url = $req->url;
my $path = $url;
$path =~ s{^https?://[^/]+}{};
$path =~ s{\?.*}{}; # Strip query params
push @request_log, {
method => $req->method,
url => $url,
path => $path,
streaming => 1,
};
if (my $events = $watch_events{$path}) {
my $f = $self->loop->new_future;
my $opts = $watch_opts{$path} || {};
my $status = $opts->{status} // 200;
if (@$events || $opts->{complete} || $opts->{fail}) {
# Deliver all events in one tick (like a real chunked response).
# Don't check cancellation between events - the watcher's chunk