Net-Async-Kubernetes
view release on metacpan or search on metacpan
t/lib/MockTransport.pm view on Meta::CPAN
package MockTransport;
# Mock HTTP transport for Net::Async::Kubernetes tests.
# Overrides _do_request and _do_streaming_request to return
# pre-configured responses without needing a real cluster.
use strict;
use warnings;
use Future;
use JSON::MaybeXS;
use Kubernetes::REST::HTTPResponse;
my $json = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1);
my %responses;
my @request_log;
my %watch_events;
my %watch_opts;
my %stream_chunks;
my %stream_opts;
my $duplex_session;
sub reset {
%responses = ();
@request_log = ();
%watch_events = ();
%watch_opts = ();
%stream_chunks = ();
%stream_opts = ();
$duplex_session = undef;
}
sub request_log { @request_log }
sub last_request { $request_log[-1] }
# Register a mock response for a method+path combo
# mock_response('GET', '/api/v1/namespaces', { kind => 'NamespaceList', ... });
# mock_response('GET', '/api/v1/namespaces', { ... }, 404);
sub mock_response {
my ($method, $path, $data, $status) = @_;
$status //= 200;
my $key = uc($method) . ' ' . $path;
$responses{$key} = {
status => $status,
content => ref($data) ? $json->encode($data) : ($data // ''),
};
}
# Register mock watch events for a path
# mock_watch_events('/api/v1/pods', [ { type => 'ADDED', object => {...} }, ... ]);
# mock_watch_events('/api/v1/pods', [...], { complete => 1 }); # resolve after events
# mock_watch_events('/api/v1/pods', [...], { fail => 'some error' }); # fail after events
sub mock_watch_events {
my ($path, $events, $opts) = @_;
$watch_events{$path} = $events;
$watch_opts{$path} = $opts // {};
}
# Register mock streaming chunks for a path (e.g. Pod logs)
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [ "line1\n", "line2\n" ]);
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [...], { complete => 1 });
# mock_stream_chunks('/api/v1/namespaces/default/pods/x/log', [...], { fail => 'some error' });
sub mock_stream_chunks {
my ($path, $chunks, $opts) = @_;
$stream_chunks{$path} = $chunks;
$stream_opts{$path} = $opts // {};
}
sub mock_duplex_session {
my ($session) = @_;
$duplex_session = $session;
}
# Install the mock transport on a Net::Async::Kubernetes instance.
# Replaces _do_request and _do_streaming_request with mock versions.
sub install {
my ($kube) = @_;
no warnings 'redefine';
my $class = ref($kube) || $kube;
# Override _do_request
no strict 'refs';
*{"${class}::_do_request"} = sub {
my ($self, $req) = @_;
my $method = $req->method;
my $url = $req->url;
# Strip the server prefix to get the path
my $path = $url;
$path =~ s{^https?://[^/]+}{};
push @request_log, {
method => $method,
url => $url,
path => $path,
headers => $req->headers,
content => $req->content,
};
my $key = uc($method) . ' ' . $path;
if (my $resp = $responses{$key}) {
return Future->done(Kubernetes::REST::HTTPResponse->new(
status => $resp->{status},
content => $resp->{content},
));
}
# Not found
return Future->done(Kubernetes::REST::HTTPResponse->new(
status => 404,
content => $json->encode({
kind => 'Status', status => 'Failure',
message => "Mock: no response for $key",
code => 404,
}),
));
};
# Override _do_streaming_request
# Events are delivered asynchronously via the event loop so that
# $watcher has been assigned in the test closure before callbacks fire.
# Returns a pending Future that stays open (like a real watch connection);
# the test calls $watcher->stop to cancel it.
*{"${class}::_do_streaming_request"} = sub {
my ($self, $req, $on_chunk) = @_;
my $url = $req->url;
my $path = $url;
$path =~ s{^https?://[^/]+}{};
$path =~ s{\?.*}{}; # Strip query params
push @request_log, {
method => $req->method,
url => $url,
path => $path,
streaming => 1,
};
if (my $events = $watch_events{$path}) {
my $f = $self->loop->new_future;
my $opts = $watch_opts{$path} || {};
my $status = $opts->{status} // 200;
if (@$events || $opts->{complete} || $opts->{fail}) {
# Deliver all events in one tick (like a real chunked response).
# Don't check cancellation between events - the watcher's chunk
# callback can handle events even after stop() is called.
$self->loop->later(sub {
for my $event (@$events) {
my $line = $json->encode($event) . "\n";
$on_chunk->($line);
}
# Optionally complete or fail after delivering events
if ($opts->{fail}) {
$f->fail($opts->{fail}) unless $f->is_cancelled;
} elsif ($opts->{complete}) {
$f->done(Kubernetes::REST::HTTPResponse->new(
status => $status,
content => '',
)) unless $f->is_cancelled;
}
});
}
# Return pending future - will be cancelled when stop() is called
return $f;
}
if (my $chunks = $stream_chunks{$path}) {
my $f = $self->loop->new_future;
my $opts = $stream_opts{$path} || {};
my $status = $opts->{status} // 200;
$self->loop->later(sub {
for my $chunk (@$chunks) {
$on_chunk->($chunk);
}
if ($opts->{fail}) {
$f->fail($opts->{fail}) unless $f->is_cancelled;
} else {
$f->done(Kubernetes::REST::HTTPResponse->new(
status => $status,
content => '',
)) unless $f->is_cancelled;
}
});
return $f;
}
return Future->fail("Mock: no watch events for $path");
};
# Override _do_duplex_request
*{"${class}::_do_duplex_request"} = sub {
my ($self, $req, %callbacks) = @_;
( run in 0.489 second using v1.01-cache-2.11-cpan-140bd7fdf52 )