Net-Async-Kubernetes

 view release on metacpan or  search on metacpan

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

    my $path = $rest->build_path($class, name => $name, namespace => $namespace);
    my $req = $rest->prepare_request('DELETE', $path);

    return $self->_do_request($req)->then(sub {
        my ($response) = @_;
        $rest->check_response($response, "delete $class");
        return Future->done(1);
    });
}


sub log {
    my ($self, $short_class, @rest_args) = @_;

    my $rest = $self->_rest;
    my %args;

    # Support: log('Pod', 'name', ...) and log('Pod', name => 'name', ...)
    if (@rest_args >= 1
        && !ref($rest_args[0])
        && $rest_args[0] !~ /^(name|namespace|container|follow|tailLines|sinceSeconds|sinceTime|timestamps|previous|limitBytes|on_line)$/
    ) {
        $args{name} = shift @rest_args;
        return Future->fail("Invalid arguments to log()") if @rest_args % 2;
        %args = (%args, @rest_args);
    } elsif (@rest_args % 2 == 0) {
        %args = @rest_args;
    } else {
        return Future->fail("Invalid arguments to log()");
    }

    return Future->fail("name required for log") unless $args{name};

    my $on_line       = delete $args{on_line};
    my $container     = delete $args{container};
    my $follow        = delete $args{follow};
    my $tail_lines    = delete $args{tailLines};
    my $since_seconds = delete $args{sinceSeconds};
    my $since_time    = delete $args{sinceTime};
    my $timestamps    = delete $args{timestamps};
    my $previous      = delete $args{previous};
    my $limit_bytes   = delete $args{limitBytes};

    my $class = $rest->expand_class($short_class);
    my $path = $rest->build_path($class, %args) . '/log';

    my %params;
    $params{container}    = $container     if defined $container;
    $params{follow}       = 'true'         if $follow;
    $params{tailLines}    = $tail_lines    if defined $tail_lines;
    $params{sinceSeconds} = $since_seconds if defined $since_seconds;
    $params{sinceTime}    = $since_time    if defined $since_time;
    $params{timestamps}   = 'true'         if $timestamps;
    $params{previous}     = 'true'         if $previous;
    $params{limitBytes}   = $limit_bytes   if defined $limit_bytes;

    if ($on_line) {
        my $req = $rest->prepare_request('GET', $path, parameters => \%params);
        my $buffer = '';

        return $self->_do_streaming_request($req, sub {
            my ($chunk) = @_;
            for my $event ($rest->process_log_chunk(\$buffer, $chunk)) {
                $on_line->($event);
            }
        })->then(sub {
            my ($response) = @_;
            $rest->check_response($response, "log $short_class");
            if (length $buffer) {
                $on_line->(Kubernetes::REST::LogEvent->new(line => $buffer));
            }
            return Future->done(undef);
        });
    }

    my $req = $rest->prepare_request('GET', $path,
        %params ? (parameters => \%params) : (),
    );
    return $self->_do_request($req)->then(sub {
        my ($response) = @_;
        $rest->check_response($response, "log $short_class");
        return Future->done($response->content);
    });
}


sub port_forward {
    my ($self, $short_class, @rest_args) = @_;

    my $rest = $self->_rest;
    my %args;

    # Support: port_forward('Pod', 'name', ...) and port_forward('Pod', name => 'name', ...)
    if (@rest_args >= 1
        && !ref($rest_args[0])
        && $rest_args[0] !~ /^(name|namespace|ports|subprotocol|on_open|on_frame|on_close|on_error)$/
    ) {
        $args{name} = shift @rest_args;
        return Future->fail("Invalid arguments to port_forward()") if @rest_args % 2;
        %args = (%args, @rest_args);
    } elsif (@rest_args % 2 == 0) {
        %args = @rest_args;
    } else {
        return Future->fail("Invalid arguments to port_forward()");
    }

    return Future->fail("name required for port_forward") unless $args{name};

    my $ports = delete $args{ports};
    return Future->fail("ports required for port_forward") unless defined $ports;
    $ports = [$ports] unless ref($ports) eq 'ARRAY';
    return Future->fail("ports required for port_forward") unless @$ports;
    for my $p (@$ports) {
        return Future->fail("invalid port '$p' for port_forward")
            unless defined($p) && $p =~ /^\d+$/ && $p > 0 && $p <= 65535;
    }

    my $subprotocol = delete $args{subprotocol} // 'v4.channel.k8s.io';
    my $on_open  = delete $args{on_open};
    my $on_frame = delete $args{on_frame};
    my $on_close = delete $args{on_close};

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

    my ($self, $resource, %args) = @_;

    require Net::Async::Kubernetes::Watcher;

    my $watcher = Net::Async::Kubernetes::Watcher->new(
        kube     => $self,
        resource => $resource,
        %args,
    );

    $self->add_child($watcher);
    return $watcher;
}

sub controller {
    my ($self, %args) = @_;

    require Net::Async::Kubernetes::Controller;

    my $controller = Net::Async::Kubernetes::Controller->new(
        kube => $self,
        %args,
    );

    $self->add_child($controller);
    return $controller;
}



# ============================================================================
# HTTP TRANSPORT
# ============================================================================

sub _do_request {
    my ($self, $req) = @_;

    my $uri = URI->new($req->url);

    my @content_args;
    if (defined $req->content) {
        my $ct = $req->headers->{'Content-Type'} // 'application/json';
        @content_args = (content => $req->content, content_type => $ct);
    }

    return $self->_http->do_request(
        method  => $req->method,
        uri     => $uri,
        headers => $req->headers,
        @content_args,
        $self->_ssl_options,
    )->then(sub {
        my ($response) = @_;
        return Future->done(Kubernetes::REST::HTTPResponse->new(
            status  => $response->code,
            content => $response->decoded_content // $response->content // '',
        ));
    });
}

sub _do_streaming_request {
    my ($self, $req, $on_chunk) = @_;

    my $uri = URI->new($req->url);

    return $self->_http->do_request(
        method  => $req->method,
        uri     => $uri,
        headers => $req->headers,
        on_header => sub {
            my ($response) = @_;
            return sub {
                my ($chunk) = @_;
                if (defined $chunk) {
                    $on_chunk->($chunk);
                }
            };
        },
        $self->_ssl_options,
    )->then(sub {
        my ($response) = @_;
        return Future->done(Kubernetes::REST::HTTPResponse->new(
            status  => $response->code,
            content => '',
        ));
    });
}

sub _do_duplex_request {
    my ($self, $req, %callbacks) = @_;
    my $loop = eval { $self->loop };
    return Future->fail("port_forward requires Net::Async::Kubernetes to be added to an IO::Async::Loop")
        unless $loop;

    my $on_open  = $callbacks{on_open};
    my $on_frame = $callbacks{on_frame};
    my $on_close = $callbacks{on_close};
    my $on_error = $callbacks{on_error};

    my $ws_url = $self->_build_websocket_url($req->url);
    my $ws_req = $self->_build_websocket_request($req);

    my $client;
    my $session;
    my $close_notified = 0;

    my $detach_client = sub {
        return unless $client;
        return unless $client->can('parent');
        return unless $client->parent && $client->parent == $self;
        $self->remove_child($client);
    };

    my $notify_error = sub {
        my ($err) = @_;
        return unless ref($on_error) eq 'CODE';
        my $ok = eval { $on_error->($err); 1 };
        return if $ok;
        warn $@;
    };

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

package Net::Async::Kubernetes;

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Net::Async::Kubernetes - Async Kubernetes client for IO::Async

=head1 VERSION

version 0.007

=head1 SYNOPSIS

    use IO::Async::Loop;
    use Net::Async::Kubernetes;

    my $loop = IO::Async::Loop->new;

    # From kubeconfig (easiest)
    my $kube = Net::Async::Kubernetes->new(
        kubeconfig => "$ENV{HOME}/.kube/config",
    );
    $loop->add($kube);

    # In-cluster: auto-detects service account token (no config needed)
    my $kube = Net::Async::Kubernetes->new;
    $loop->add($kube);

    # Or with explicit server/credentials
    my $kube = Net::Async::Kubernetes->new(
        server      => { endpoint => 'https://kubernetes.local:6443' },
        credentials => { token => $token },
    );
    $loop->add($kube);

    # Future-based CRUD
    my $pods = $kube->list('Pod', namespace => 'default')->get;

    my $pod = $kube->get('Pod', 'nginx', namespace => 'default')->get;

    my $patched = $kube->patch('Pod', 'nginx',
        namespace => 'default',
        patch     => { metadata => { labels => { env => 'staging' } } },
    )->get;

    $kube->delete('Pod', 'nginx', namespace => 'default')->get;

    # Pod logs (one-shot)
    my $text = $kube->log('Pod', 'nginx',
        namespace => 'default',
        tailLines => 100,
    )->get;

    # Pod logs (streaming)
    $kube->log('Pod', 'nginx',
        namespace => 'default',
        follow    => 1,
        on_line   => sub { my ($event) = @_; say $event->line },
    )->get;

    # Port-forward (built-in websocket duplex support)
    my $pf = $kube->port_forward('Pod', 'nginx',
        namespace => 'default',
        ports     => [8080],
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    )->get;

    $pf->write_channel(0, "GET / HTTP/1.1\r\n\r\n");
    $pf->close(code => 1000);

    # Pod exec (websocket duplex)
    my $exec = $kube->exec('Pod', 'nginx',
        namespace => 'default',
        command   => ['sh', '-c', 'id'],
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    )->get;
    $exec->write_stdin("id\n");
    $exec->resize(width => 120, height => 40);

    # Pod attach (websocket duplex)
    my $attach = $kube->attach('Pod', 'nginx',
        namespace => 'default',
        container => 'app',
        stdin     => 1,
        stdout    => 1,
        stderr    => 1,
        tty       => 0,
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    )->get;
    $attach->write_stdin("help\n");

    # Copy local file to pod and back (built on exec)
    $kube->cp_to_pod('Pod', 'nginx',
        namespace => 'default',
        local     => '/tmp/local.txt',
        remote    => '/tmp/remote.txt',
    )->get;
    $kube->cp_from_pod('Pod', 'nginx',
        namespace => 'default',
        remote    => '/tmp/remote.txt',
        local     => '/tmp/downloaded.txt',
    )->get;

    # Watcher with auto-reconnect
    my $watcher = $kube->watcher('Pod',
        namespace   => 'default',
        on_added    => sub { my ($pod) = @_; say "Added: " . $pod->metadata->name },
        on_modified => sub { my ($pod) = @_; say "Modified: " . $pod->metadata->name },
        on_deleted  => sub { my ($pod) = @_; say "Deleted: " . $pod->metadata->name },
    );

    $loop->run;

=head1 DESCRIPTION

C<Net::Async::Kubernetes> is an async Kubernetes client built on L<IO::Async>.
It extends L<IO::Async::Notifier> and uses L<Net::Async::HTTP> for
non-blocking HTTP communication, plus L<Net::Async::WebSocket::Client> for
duplex subresources like pod port-forward.

All CRUD, log, port-forward, exec, attach, and cp helper methods return L<Future> objects. The
L<Net::Async::Kubernetes::Watcher>
provides auto-reconnecting event streaming with separate callbacks per
event type.

Request preparation and response processing are delegated to
L<Kubernetes::REST>, so the same IO::K8s object inflation, short class
names, and CRD support are available.

Authentication is automatically resolved in the following order:

=over 4

=item 1. Explicit C<server> and C<credentials> parameters

=item 2. C<kubeconfig> file (via L<Kubernetes::REST::Kubeconfig>)

=item 3. In-cluster service account token at
C</var/run/secrets/kubernetes.io/serviceaccount/token> (automatic when
running inside a Kubernetes pod)

=back

=head2 configure

Internal L<IO::Async::Notifier> configuration method. Handles initialization
of C<kubeconfig>, C<context>, C<server>, C<credentials>, C<resource_map>,
and C<resource_map_from_cluster> parameters.

If C<kubeconfig> is provided without explicit C<server> or C<credentials>,
they are loaded automatically via L<Kubernetes::REST::Kubeconfig>.

When running inside a Kubernetes pod (no C<kubeconfig> or C<server> set),
the service account token at
C</var/run/secrets/kubernetes.io/serviceaccount/token> is used
automatically for in-cluster authentication.

=head2 kubeconfig

Path to kubeconfig file. If provided, C<server> and C<credentials> are
extracted automatically (via L<Kubernetes::REST::Kubeconfig>).

=head2 context

Kubernetes context to use from the kubeconfig. Defaults to current-context.

=head2 resource_map

Optional. Custom resource map for short class names.

=head2 resource_map_from_cluster

Optional boolean. Load resource map from cluster OpenAPI spec.
Defaults to false.

=head2 server

Returns the L<Kubernetes::REST::Server> instance. Croaks if neither C<server>
nor C<kubeconfig> was provided during initialization.

=head2 credentials

Returns the credentials object (typically L<Kubernetes::REST::AuthToken>).

lib/Net/Async/Kubernetes.pm  view on Meta::CPAN

=over 4

=item C<$class_or_object> - Resource class name or IO::K8s object

=item C<name> - Resource name (required unless passing object)

=item C<namespace> - Namespace (if namespaced)

=item C<patch> - HashRef of changes to apply (required)

=item C<type> - Patch type: C<'strategic'> (default), C<'merge'>, or C<'json'>

=back

=head2 delete

    # By class and name
    my $future = $kube->delete('Pod', 'nginx', namespace => 'default');
    $future->get;

    # Or by object
    my $future = $kube->delete($pod_object);
    $future->get;

Delete a resource. Returns a L<Future> that resolves to C<1> on success.

Arguments:

=over 4

=item C<$class_or_object> - Resource class name or IO::K8s object

=item C<$name> - Resource name (required unless passing object)

=item C<%args> - Optional parameters (C<namespace>, etc.)

=back

=head2 log

    # One-shot mode (Future resolves to full text)
    my $text = $kube->log('Pod', 'my-pod',
        namespace => 'default',
        tailLines => 100,
    )->get;

    # Streaming mode (Future resolves when stream ends)
    $kube->log('Pod', 'my-pod',
        namespace => 'default',
        follow    => 1,
        on_line   => sub {
            my ($event) = @_;  # Kubernetes::REST::LogEvent
            say $event->line;
        },
    )->get;

Retrieve logs from a pod.

Without C<on_line>, returns a L<Future> that resolves to the full log text.

With C<on_line>, opens a streaming request and invokes the callback once per
line with L<Kubernetes::REST::LogEvent> objects. The returned L<Future>
resolves when the stream ends.

=head2 port_forward

    my $f = $kube->port_forward('Pod', 'my-pod',
        namespace => 'default',
        ports     => [8080, 8443],
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    );
    my $session = $f->get;

Create an async pod port-forward session request.

Returns a L<Future> that resolves to the duplex session object returned by the
transport backend. The default transport returns a
L<Net::Async::Kubernetes::PortForwardSession> object.

The session helper supports C<write_channel>, C<write_stdin>, C<resize>, and
C<close>.

C<on_open> receives the created session object.

C<on_frame> receives C<($channel, $payload)> where the first byte of each
binary websocket frame is decoded as Kubernetes channel id.

=head2 exec

    my $f = $kube->exec('Pod', 'my-pod',
        namespace => 'default',
        command   => ['sh', '-c', 'id'],
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    );
    my $session = $f->get;

Create an async pod exec session request.

Returns a L<Future> that resolves to the duplex session object returned by the
transport backend. The default transport returns a
L<Net::Async::Kubernetes::PortForwardSession> object.

The session helper supports C<write_channel>, C<write_stdin>, C<resize>, and
C<close>.

C<on_open> receives the created session object.

C<on_frame> receives C<($channel, $payload)> where the first byte of each
binary websocket frame is decoded as Kubernetes channel id.

=head2 attach

    my $f = $kube->attach('Pod', 'my-pod',
        namespace => 'default',
        container => 'app',
        stdin     => 1,
        stdout    => 1,
        stderr    => 1,
        tty       => 0,
        on_frame  => sub { my ($channel, $payload) = @_; ... },
    );
    my $session = $f->get;

Create an async pod attach session request.

Returns a L<Future> that resolves to the duplex session object returned by the
transport backend. The default transport returns a
L<Net::Async::Kubernetes::PortForwardSession> object.

The session helper supports C<write_channel>, C<write_stdin>, C<resize>, and
C<close>.

C<on_open> receives the created session object.

C<on_frame> receives C<($channel, $payload)> where the first byte of each
binary websocket frame is decoded as Kubernetes channel id.

=head2 cp_to_pod

    my $f = $kube->cp_to_pod('Pod', 'my-pod',
        namespace => 'default',
        container => 'app',
        local     => '/tmp/local.txt',
        remote    => '/tmp/remote.txt',
    );
    my $result = $f->get;

Copy a single local file into a pod using C<exec()> and stdin streaming.

Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.

=head2 cp_from_pod

    my $f = $kube->cp_from_pod('Pod', 'my-pod',
        namespace => 'default',
        container => 'app',
        remote    => '/tmp/remote.txt',
        local     => '/tmp/local.txt',
    );
    my $result = $f->get;

Copy a single file from a pod using C<exec()> and stdout streaming.

Returns a L<Future> resolving to a hashref containing C<local>, C<remote>,
C<bytes>, C<stderr>, and C<status>.

=head2 watcher

    my $watcher = $kube->watcher('Pod',
        namespace      => 'default',
        label_selector => 'app=web',
        on_added       => sub { my ($pod) = @_; ... },
        on_modified    => sub { my ($pod) = @_; ... },
        on_deleted     => sub { my ($pod) = @_; ... },
    );

Create and register a L<Net::Async::Kubernetes::Watcher> for the specified
resource type. The watcher is added as a child notifier and will start
automatically when the parent is added to a loop.

Returns the watcher object.

Arguments:

=over 4

=item C<$resource> - Resource type to watch (e.g., C<'Pod'>, C<'Deployment'>)

=item C<%args> - Watcher parameters (C<namespace>, C<label_selector>, callbacks, etc.)

=back

See L<Net::Async::Kubernetes::Watcher> for all available parameters.

=head2 controller

    my $controller = $kube->controller(
        on_reconcile => sub {
            my ($ctx) = @_;
            ...
        },
    );

Create and register a L<Net::Async::Kubernetes::Controller> runtime bound to
this client. The controller is added as a child notifier and can register
resource watches, queue reconcile work, and patch object status.

Returns the controller object.

=head1 NAME

Net::Async::Kubernetes - Async Kubernetes client for IO::Async

=head1 SEE ALSO

L<Net::Async::Kubernetes::Watcher>, L<Kubernetes::REST>, L<IO::Async>,
L<IO::K8s>, L<Net::Async::WebSocket::Client>

=head1 SUPPORT

=head2 Issues



( run in 1.659 second using v1.01-cache-2.11-cpan-140bd7fdf52 )