Net-Async-Kubernetes
view release on metacpan or search on metacpan
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
my $path = $rest->build_path($class, name => $name, namespace => $namespace);
my $req = $rest->prepare_request('DELETE', $path);
return $self->_do_request($req)->then(sub {
my ($response) = @_;
$rest->check_response($response, "delete $class");
return Future->done(1);
});
}
# Fetch logs through a resource's "/log" subresource.
#
# Call styles:
#   $kube->log('Pod', 'my-pod', %opts)
#   $kube->log('Pod', name => 'my-pod', %opts)
#
# Options consumed here and turned into query parameters:
#   container, follow, tailLines, sinceSeconds, sinceTime,
#   timestamps, previous, limitBytes
# Option consumed here and used as a callback:
#   on_line - CODE ref; switches the call into streaming mode
# Everything else left in %args (name, namespace, ...) is handed to
# build_path() to locate the resource.
#
# Returns a Future:
#   - without on_line: resolves to the response content (the log text)
#   - with on_line:    resolves to undef once the stream has ended; the
#                      callback is invoked once per event produced by
#                      process_log_chunk(), plus once for any unterminated
#                      trailing data
#   - fails immediately on malformed arguments or a missing name
sub log {
    my ($self, $short_class, @rest_args) = @_;
    my $rest = $self->_rest;
    my %args;

    # Support: log('Pod', 'name', ...) and log('Pod', name => 'name', ...)
    # A leading plain scalar that is not one of the known option keys is
    # taken to be the positional resource name.
    if (@rest_args >= 1
        && !ref($rest_args[0])
        && $rest_args[0] !~ /^(name|namespace|container|follow|tailLines|sinceSeconds|sinceTime|timestamps|previous|limitBytes|on_line)$/
    ) {
        $args{name} = shift @rest_args;
        return Future->fail("Invalid arguments to log()") if @rest_args % 2;
        %args = (%args, @rest_args);
    } elsif (@rest_args % 2 == 0) {
        %args = @rest_args;
    } else {
        return Future->fail("Invalid arguments to log()");
    }

    # Test for defined/non-empty rather than plain truthiness, so that a
    # resource literally named "0" is not rejected.
    return Future->fail("name required for log")
        unless defined($args{name}) && length($args{name});

    # Pull the log-specific options out of %args so that only path-related
    # keys remain for build_path().
    my $on_line       = delete $args{on_line};
    my $container     = delete $args{container};
    my $follow        = delete $args{follow};
    my $tail_lines    = delete $args{tailLines};
    my $since_seconds = delete $args{sinceSeconds};
    my $since_time    = delete $args{sinceTime};
    my $timestamps    = delete $args{timestamps};
    my $previous      = delete $args{previous};
    my $limit_bytes   = delete $args{limitBytes};

    my $class = $rest->expand_class($short_class);
    my $path  = $rest->build_path($class, %args) . '/log';

    # Boolean options are sent as the literal string 'true' and omitted
    # entirely when false; value options are sent whenever they are defined.
    my %params;
    $params{container}    = $container     if defined $container;
    $params{follow}       = 'true'         if $follow;
    $params{tailLines}    = $tail_lines    if defined $tail_lines;
    $params{sinceSeconds} = $since_seconds if defined $since_seconds;
    $params{sinceTime}    = $since_time    if defined $since_time;
    $params{timestamps}   = 'true'         if $timestamps;
    $params{previous}     = 'true'         if $previous;
    $params{limitBytes}   = $limit_bytes   if defined $limit_bytes;

    if ($on_line) {
        # Streaming mode: feed chunks through the line splitter as they
        # arrive; $buffer carries partial-line state between chunks.
        my $req = $rest->prepare_request('GET', $path, parameters => \%params);
        my $buffer = '';
        return $self->_do_streaming_request($req, sub {
            my ($chunk) = @_;
            for my $event ($rest->process_log_chunk(\$buffer, $chunk)) {
                $on_line->($event);
            }
        })->then(sub {
            my ($response) = @_;
            $rest->check_response($response, "log $short_class");
            # Deliver whatever is still buffered when the stream ends.
            if (length $buffer) {
                $on_line->(Kubernetes::REST::LogEvent->new(line => $buffer));
            }
            return Future->done(undef);
        });
    }

    # One-shot mode: fetch the whole log and resolve to its content.
    my $req = $rest->prepare_request('GET', $path,
        %params ? (parameters => \%params) : (),
    );
    return $self->_do_request($req)->then(sub {
        my ($response) = @_;
        $rest->check_response($response, "log $short_class");
        return Future->done($response->content);
    });
}
sub port_forward {
my ($self, $short_class, @rest_args) = @_;
my $rest = $self->_rest;
my %args;
# Support: port_forward('Pod', 'name', ...) and port_forward('Pod', name => 'name', ...)
if (@rest_args >= 1
&& !ref($rest_args[0])
&& $rest_args[0] !~ /^(name|namespace|ports|subprotocol|on_open|on_frame|on_close|on_error)$/
) {
$args{name} = shift @rest_args;
return Future->fail("Invalid arguments to port_forward()") if @rest_args % 2;
%args = (%args, @rest_args);
} elsif (@rest_args % 2 == 0) {
%args = @rest_args;
} else {
return Future->fail("Invalid arguments to port_forward()");
}
return Future->fail("name required for port_forward") unless $args{name};
my $ports = delete $args{ports};
return Future->fail("ports required for port_forward") unless defined $ports;
$ports = [$ports] unless ref($ports) eq 'ARRAY';
return Future->fail("ports required for port_forward") unless @$ports;
for my $p (@$ports) {
return Future->fail("invalid port '$p' for port_forward")
unless defined($p) && $p =~ /^\d+$/ && $p > 0 && $p <= 65535;
}
my $subprotocol = delete $args{subprotocol} // 'v4.channel.k8s.io';
my $on_open = delete $args{on_open};
my $on_frame = delete $args{on_frame};
my $on_close = delete $args{on_close};
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
my ($self, $resource, %args) = @_;
require Net::Async::Kubernetes::Watcher;
my $watcher = Net::Async::Kubernetes::Watcher->new(
kube => $self,
resource => $resource,
%args,
);
$self->add_child($watcher);
return $watcher;
}
# Construct a Net::Async::Kubernetes::Controller bound to this client,
# register it as a child notifier, and return it. Any extra key/value
# arguments are forwarded to the controller constructor.
sub controller {
    my $self = shift;
    my %args = @_;

    # Loaded on demand, at call time.
    require Net::Async::Kubernetes::Controller;

    my $controller = Net::Async::Kubernetes::Controller->new(kube => $self, %args);
    $self->add_child($controller);

    return $controller;
}
# ============================================================================
# HTTP TRANSPORT
# ============================================================================
# Send a prepared request through the HTTP client and resolve to a
# Kubernetes::REST::HTTPResponse carrying the status code and body.
#
#   $req - prepared request object providing url, method, headers, content
#
# Returns a Future resolving to the wrapped response.
sub _do_request {
    my ($self, $req) = @_;

    my @http_args = (
        method  => $req->method,
        uri     => URI->new($req->url),
        headers => $req->headers,
    );

    # Only send a body when the request has one; its content type falls
    # back to JSON when the request headers do not name one.
    my $body = $req->content;
    if (defined $body) {
        push @http_args,
            content      => $body,
            content_type => $req->headers->{'Content-Type'} // 'application/json';
    }

    push @http_args, $self->_ssl_options;

    my $wrap_response = sub {
        my ($http_res) = @_;
        my $status = $http_res->code;

        # Prefer the decoded body, then the raw body, then an empty string.
        my $content = $http_res->decoded_content;
        $content = $http_res->content unless defined $content;
        $content = ''                 unless defined $content;

        return Future->done(
            Kubernetes::REST::HTTPResponse->new(
                status  => $status,
                content => $content,
            )
        );
    };

    return $self->_http->do_request(@http_args)->then($wrap_response);
}
# Send a prepared request and stream the response body to a callback
# instead of buffering it.
#
#   $req      - prepared request object providing url, method, headers
#   $on_chunk - CODE ref invoked with each body chunk as it arrives
#
# Returns a Future that resolves, once the body has ended, to a
# Kubernetes::REST::HTTPResponse holding the status code and an empty
# content string (the body has already been delivered to $on_chunk).
sub _do_streaming_request {
    my ($self, $req, $on_chunk) = @_;
    my $uri = URI->new($req->url);

    return $self->_http->do_request(
        method    => $req->method,
        uri       => $uri,
        headers   => $req->headers,
        on_header => sub {
            my ($response) = @_;

            # Net::Async::HTTP calls the returned sub once per body chunk,
            # then once more with no arguments at end of body. The value
            # returned by that final call is what the request Future
            # resolves to.
            return sub {
                my ($chunk) = @_;
                if (defined $chunk) {
                    $on_chunk->($chunk);
                    return;
                }

                # End of body: hand back the header response so that the
                # ->then block below receives an object it can call ->code
                # on, rather than the false value of the failed condition.
                return $response;
            };
        },
        $self->_ssl_options,
    )->then(sub {
        my ($response) = @_;
        return Future->done(Kubernetes::REST::HTTPResponse->new(
            status  => $response->code,
            content => '',
        ));
    });
}
sub _do_duplex_request {
my ($self, $req, %callbacks) = @_;
my $loop = eval { $self->loop };
return Future->fail("port_forward requires Net::Async::Kubernetes to be added to an IO::Async::Loop")
unless $loop;
my $on_open = $callbacks{on_open};
my $on_frame = $callbacks{on_frame};
my $on_close = $callbacks{on_close};
my $on_error = $callbacks{on_error};
my $ws_url = $self->_build_websocket_url($req->url);
my $ws_req = $self->_build_websocket_request($req);
my $client;
my $session;
my $close_notified = 0;
my $detach_client = sub {
return unless $client;
return unless $client->can('parent');
return unless $client->parent && $client->parent == $self;
$self->remove_child($client);
};
my $notify_error = sub {
my ($err) = @_;
return unless ref($on_error) eq 'CODE';
my $ok = eval { $on_error->($err); 1 };
return if $ok;
warn $@;
};
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
package Net::Async::Kubernetes;
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Net::Async::Kubernetes - Async Kubernetes client for IO::Async
=head1 VERSION
version 0.007
=head1 SYNOPSIS
use IO::Async::Loop;
use Net::Async::Kubernetes;
my $loop = IO::Async::Loop->new;
# From kubeconfig (easiest)
my $kube = Net::Async::Kubernetes->new(
kubeconfig => "$ENV{HOME}/.kube/config",
);
$loop->add($kube);
# In-cluster: auto-detects service account token (no config needed)
my $kube = Net::Async::Kubernetes->new;
$loop->add($kube);
# Or with explicit server/credentials
my $kube = Net::Async::Kubernetes->new(
server => { endpoint => 'https://kubernetes.local:6443' },
credentials => { token => $token },
);
$loop->add($kube);
# Future-based CRUD
my $pods = $kube->list('Pod', namespace => 'default')->get;
my $pod = $kube->get('Pod', 'nginx', namespace => 'default')->get;
my $patched = $kube->patch('Pod', 'nginx',
namespace => 'default',
patch => { metadata => { labels => { env => 'staging' } } },
)->get;
$kube->delete('Pod', 'nginx', namespace => 'default')->get;
# Pod logs (one-shot)
my $text = $kube->log('Pod', 'nginx',
namespace => 'default',
tailLines => 100,
)->get;
# Pod logs (streaming)
$kube->log('Pod', 'nginx',
namespace => 'default',
follow => 1,
on_line => sub { my ($event) = @_; say $event->line },
)->get;
# Port-forward (built-in websocket duplex support)
my $pf = $kube->port_forward('Pod', 'nginx',
namespace => 'default',
ports => [8080],
on_frame => sub { my ($channel, $payload) = @_; ... },
)->get;
$pf->write_channel(0, "GET / HTTP/1.1\r\n\r\n");
$pf->close(code => 1000);
# Pod exec (websocket duplex)
my $exec = $kube->exec('Pod', 'nginx',
namespace => 'default',
command => ['sh', '-c', 'id'],
on_frame => sub { my ($channel, $payload) = @_; ... },
)->get;
$exec->write_stdin("id\n");
$exec->resize(width => 120, height => 40);
# Pod attach (websocket duplex)
my $attach = $kube->attach('Pod', 'nginx',
namespace => 'default',
container => 'app',
stdin => 1,
stdout => 1,
stderr => 1,
tty => 0,
on_frame => sub { my ($channel, $payload) = @_; ... },
)->get;
$attach->write_stdin("help\n");
# Copy local file to pod and back (built on exec)
$kube->cp_to_pod('Pod', 'nginx',
namespace => 'default',
local => '/tmp/local.txt',
remote => '/tmp/remote.txt',
)->get;
$kube->cp_from_pod('Pod', 'nginx',
namespace => 'default',
remote => '/tmp/remote.txt',
local => '/tmp/downloaded.txt',
)->get;
# Watcher with auto-reconnect
my $watcher = $kube->watcher('Pod',
namespace => 'default',
on_added => sub { my ($pod) = @_; say "Added: " . $pod->metadata->name },
on_modified => sub { my ($pod) = @_; say "Modified: " . $pod->metadata->name },
on_deleted => sub { my ($pod) = @_; say "Deleted: " . $pod->metadata->name },
);
$loop->run;
=head1 DESCRIPTION
C<Net::Async::Kubernetes> is an async Kubernetes client built on L<IO::Async>.
It extends L<IO::Async::Notifier> and uses L<Net::Async::HTTP> for
non-blocking HTTP communication, plus L<Net::Async::WebSocket::Client> for
duplex subresources like pod port-forward.
All CRUD, log, port-forward, exec, attach, and cp helper methods return L<Future> objects. The
L<Net::Async::Kubernetes::Watcher>
provides auto-reconnecting event streaming with separate callbacks per
event type.
Request preparation and response processing are delegated to
L<Kubernetes::REST>, so the same IO::K8s object inflation, short class
names, and CRD support are available.
Authentication is automatically resolved in the following order:
=over 4
=item 1. Explicit C<server> and C<credentials> parameters
=item 2. C<kubeconfig> file (via L<Kubernetes::REST::Kubeconfig>)
=item 3. In-cluster service account token at
C</var/run/secrets/kubernetes.io/serviceaccount/token> (automatic when
running inside a Kubernetes pod)
=back
=head2 configure
Internal L<IO::Async::Notifier> configuration method. Handles initialization
of C<kubeconfig>, C<context>, C<server>, C<credentials>, C<resource_map>,
and C<resource_map_from_cluster> parameters.
If C<kubeconfig> is provided without explicit C<server> or C<credentials>,
they are loaded automatically via L<Kubernetes::REST::Kubeconfig>.
When running inside a Kubernetes pod (no C<kubeconfig> or C<server> set),
the service account token at
C</var/run/secrets/kubernetes.io/serviceaccount/token> is used
automatically for in-cluster authentication.
=head2 kubeconfig
Path to kubeconfig file. If provided, C<server> and C<credentials> are
extracted automatically (via L<Kubernetes::REST::Kubeconfig>).
=head2 context
Name of the kubeconfig context to use. Defaults to the kubeconfig's C<current-context>.
=head2 resource_map
Optional. Custom resource map for short class names.
=head2 resource_map_from_cluster
Optional boolean. Load resource map from cluster OpenAPI spec.
Defaults to false.
=head2 server
Returns the L<Kubernetes::REST::Server> instance. Croaks if neither C<server>
nor C<kubeconfig> was provided during initialization.
=head2 credentials
Returns the credentials object (typically L<Kubernetes::REST::AuthToken>).
lib/Net/Async/Kubernetes.pm view on Meta::CPAN
=over 4
=item C<$class_or_object> - Resource class name or IO::K8s object
=item C<name> - Resource name (required unless passing object)
=item C<namespace> - Namespace (if namespaced)
=item C<patch> - HashRef of changes to apply (required)
=item C<type> - Patch type: C<'strategic'> (default), C<'merge'>, or C<'json'>
=back
=head2 delete
# By class and name
my $future = $kube->delete('Pod', 'nginx', namespace => 'default');
$future->get;
# Or by object
my $future = $kube->delete($pod_object);
$future->get;
Delete a resource. Returns a L<Future> that resolves to C<1> on success.
Arguments:
=over 4
=item C<$class_or_object> - Resource class name or IO::K8s object
=item C<$name> - Resource name (required unless passing object)
=item C<%args> - Optional parameters (C<namespace>, etc.)
=back
=head2 log
# One-shot mode (Future resolves to full text)
my $text = $kube->log('Pod', 'my-pod',
namespace => 'default',
tailLines => 100,
)->get;
# Streaming mode (Future resolves when stream ends)
$kube->log('Pod', 'my-pod',
namespace => 'default',
follow => 1,
on_line => sub {
my ($event) = @_; # Kubernetes::REST::LogEvent
say $event->line;
},
)->get;
Retrieve logs from a pod.
Without C<on_line>, returns a L<Future> that resolves to the full log text.
With C<on_line>, opens a streaming request and invokes the callback once per
line with L<Kubernetes::REST::LogEvent> objects. The returned L<Future>
resolves when the stream ends.
=head2 port_forward
my $f = $kube->port_forward('Pod', 'my-pod',
namespace => 'default',
ports => [8080, 8443],
on_frame => sub { my ($channel, $payload) = @_; ... },
);
my $session = $f->get;
Create an async pod port-forward session request.
Returns a L<Future> that resolves to the duplex session object returned by the
transport backend. The default transport returns a
L<Net::Async::Kubernetes::PortForwardSession> object.
The session helper supports C<write_channel>, C<write_stdin>, C<resize>, and
C<close>.
C<on_open> receives the created session object.
C<on_frame> receives C<($channel, $payload)>, where the first byte of each
binary websocket frame is decoded as the Kubernetes channel id.
=head2 exec
my $f = $kube->exec('Pod', 'my-pod',
namespace => 'default',
command => ['sh', '-c', 'id'],
on_frame => sub { my ($channel, $payload) = @_; ... },
);
my $session = $f->get;
Create an async pod exec session request.
Returns a L<Future> that resolves to the duplex session object returned by the
transport backend. The default transport returns a
L<Net::Async::Kubernetes::PortForwardSession> object.
The session helper supports C<write_channel>, C<write_stdin>, C<resize>, and
C<close>.
C<on_open> receives the created session object.
C<on_frame> receives C<($channel, $payload)>, where the first byte of each
binary websocket frame is decoded as the Kubernetes channel id.
=head2 attach
my $f = $kube->attach('Pod', 'my-pod',
namespace => 'default',
container => 'app',
stdin => 1,
stdout => 1,
stderr => 1,
tty => 0,
on_frame => sub { my ($channel, $payload) = @_; ... },
);
my $session = $f->get;
Create an async pod attach session request.
Returns a L<Future> that resolves to the duplex session object returned by the
transport backend. The default transport returns a
L<Net::Async::Kubernetes::PortForwardSession> object.
The session helper supports C<write_channel>, C<write_stdin>, C<resize>, and
C<close>.
C<on_open> receives the created session object.
C<on_frame> receives C<($channel, $payload)>, where the first byte of each
binary websocket frame is decoded as the Kubernetes channel id.
=head2 cp_to_pod
my $f = $kube->cp_to_pod('Pod', 'my-pod',
namespace => 'default',
container => 'app',
local => '/tmp/local.txt',
remote => '/tmp/remote.txt',
);
my $result = $f->get;
Copy a single local file into a pod using C<exec()> and stdin streaming.
Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.
=head2 cp_from_pod
my $f = $kube->cp_from_pod('Pod', 'my-pod',
namespace => 'default',
container => 'app',
remote => '/tmp/remote.txt',
local => '/tmp/local.txt',
);
my $result = $f->get;
Copy a single file from a pod using C<exec()> and stdout streaming.
Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.
=head2 watcher
my $watcher = $kube->watcher('Pod',
namespace => 'default',
label_selector => 'app=web',
on_added => sub { my ($pod) = @_; ... },
on_modified => sub { my ($pod) = @_; ... },
on_deleted => sub { my ($pod) = @_; ... },
);
Create and register a L<Net::Async::Kubernetes::Watcher> for the specified
resource type. The watcher is added as a child notifier and will start
automatically when the parent is added to a loop.
Returns the watcher object.
Arguments:
=over 4
=item C<$resource> - Resource type to watch (e.g., C<'Pod'>, C<'Deployment'>)
=item C<%args> - Watcher parameters (C<namespace>, C<label_selector>, callbacks, etc.)
=back
See L<Net::Async::Kubernetes::Watcher> for all available parameters.
=head2 controller
my $controller = $kube->controller(
on_reconcile => sub {
my ($ctx) = @_;
...
},
);
Create and register a L<Net::Async::Kubernetes::Controller> runtime bound to
this client. The controller is added as a child notifier and can register
resource watches, queue reconcile work, and patch object status.
Returns the controller object.
=head1 NAME
Net::Async::Kubernetes - Async Kubernetes client for IO::Async
=head1 SEE ALSO
L<Net::Async::Kubernetes::Watcher>, L<Kubernetes::REST>, L<IO::Async>,
L<IO::K8s>, L<Net::Async::WebSocket::Client>
=head1 SUPPORT
=head2 Issues
( run in 1.659 second using v1.01-cache-2.11-cpan-140bd7fdf52 )