Net-Async-Kubernetes
view release on metacpan or search on metacpan
lib/Net/Async/Kubernetes/Watcher.pm view on Meta::CPAN
$self->stop;
}
# Begin watching. Does nothing when a watch is already marked as running
# (the _watching flag); otherwise clears the _stopped flag and hands over to
# _start_watch, whose result is passed back to the caller.
sub start {
    my ($self) = @_;

    unless ($self->{_watching}) {
        $self->{_stopped} = 0;
        return $self->_start_watch;
    }

    return;
}
# Stop watching: mark the watcher as stopped / not watching and cancel the
# in-flight watch future, if there is one that has not completed yet.
sub stop {
    my ($self) = @_;

    @{$self}{qw(_stopped _watching)} = (1, 0);

    # Nothing in flight, or it already finished: no cancellation needed.
    my $pending = delete $self->{_watch_future} or return;
    return if $pending->is_ready;

    # Without a loop there is nothing to defer onto; cancel right away.
    my $loop = $self->loop;
    return $pending->cancel unless $loop;

    # Cancelling is postponed to the next loop iteration so the HTTP
    # connection is not closed from inside its own on_read handler, which
    # would trigger Net::Async::HTTP's
    # "Spurious on_read of connection while idle".
    return $loop->later(sub {
        $pending->cancel unless $pending->is_ready;
    });
}
# Open a streaming watch request and keep the watch alive.
#
# Does nothing when the watcher has been stopped or has no kube client.
# Otherwise it:
#   * marks the watcher as watching and resets the chunk buffer;
#   * builds a GET request for the watched resource (optionally namespaced)
#     with watch=true, timeoutSeconds, and - when set - resourceVersion,
#     labelSelector and fieldSelector;
#   * feeds every received chunk through $rest->process_watch_chunk and
#     dispatches the resulting events via _dispatch_event;
#   * re-opens the watch immediately when the request completes, or one
#     second after it fails.
#
# Each result from process_watch_chunk is used as a hash reference that may
# carry 'resourceVersion', 'error_code' and 'event' entries.
#
# Returns the future of the streaming request (empty return when the watch
# was not started).
sub _start_watch {
    my ($self) = @_;
    return if $self->{_stopped};
    return unless $self->{kube};

    $self->{_watching} = 1;
    $self->{_buffer}   = '';

    # Every attempt gets its own generation number. Callbacks belonging to a
    # superseded attempt (an old stream awaiting its deferred cancel, or a
    # retry timer armed before a stop/start cycle) compare against it and
    # bow out instead of opening a second, parallel watch.
    my $generation = ++$self->{_watch_generation};

    my $rest  = $self->kube->_rest;
    my $class = $rest->expand_class($self->resource);
    my $path  = $rest->build_path($class,
        ($self->namespace ? (namespace => $self->namespace) : ()),
    );

    my %params = (
        watch          => 'true',
        timeoutSeconds => $self->timeout,
    );
    $params{resourceVersion} = $self->{_resource_version}
        if defined $self->{_resource_version};
    $params{labelSelector} = $self->label_selector
        if defined $self->label_selector;
    $params{fieldSelector} = $self->field_selector
        if defined $self->field_selector;

    my $req = $rest->prepare_request('GET', $path, parameters => \%params);

    # The callbacks must not keep the watcher alive, so they close over a
    # weak reference and take a strong one only while they are running.
    weaken(my $weak_self = $self);

    my $f = $self->kube->_do_streaming_request($req, sub {
        my ($chunk) = @_;
        my $watcher = $weak_self or return;

        # Chunks from a superseded stream would corrupt the buffer of the
        # current one.
        return if $watcher->{_watch_generation} != $generation;

        my $buffer  = $watcher->{_buffer};
        my @results = $rest->process_watch_chunk($class, \$buffer, $chunk);
        $watcher->{_buffer} = $buffer;

        for my $result (@results) {
            if ($result->{resourceVersion}) {
                $watcher->{_resource_version} = $result->{resourceVersion};
            }

            # error_code is absent on ordinary results; test definedness
            # first so the numeric comparison never sees undef.
            my $error_code = $result->{error_code};
            if (defined $error_code && $error_code == 410) {
                # 410 Gone: forget the remembered resourceVersion so the
                # next watch request is sent without one, and drop the rest
                # of this chunk.
                $watcher->{_resource_version} = undef;
                return;
            }

            $watcher->_dispatch_event($result->{event});
        }

        return;
    });

    # Store the future BEFORE attaching callbacks: if it is already complete,
    # on_done runs synchronously and re-enters this method, and the future
    # recorded by that inner call must not be overwritten afterwards.
    $self->{_watch_future} = $f;

    $f->on_done(sub {
        my $watcher = $weak_self or return;
        return if $watcher->{_stopped};
        return if $watcher->{_watch_generation} != $generation;

        $watcher->{_watching} = 0;
        $watcher->_start_watch;
    });

    $f->on_fail(sub {
        my $watcher = $weak_self or return;
        return if $watcher->{_stopped};
        return if $watcher->{_watch_generation} != $generation;

        $watcher->{_watching} = 0;

        # Without a loop no retry can be scheduled; leave the watcher idle
        # (start can be called again) rather than dying inside the callback.
        my $loop = $watcher->loop or return;

        $loop->watch_time(
            after => 1,
            code  => sub {
                my $retrying = $weak_self or return;
                return if $retrying->{_stopped};
                # A newer attempt was started during the backoff.
                return if $retrying->{_watch_generation} != $generation;
                $retrying->_start_watch;
            },
        );
    });

    return $f;
}
sub _dispatch_event {
my ($self, $event) = @_;
my $type = $event->type;
# Client-side event type filter
# Explicit event_types wins; otherwise auto-derive from callbacks
# (on_event is catch-all, so if set, all types pass)
if (my $types = $self->event_types) {
my %allowed = map { uc($_) => 1 } @$types;
return unless $allowed{$type};
} elsif (!$self->on_event) {
my %has;
$has{ADDED} = 1 if $self->on_added;
$has{MODIFIED} = 1 if $self->on_modified;
$has{DELETED} = 1 if $self->on_deleted;
$has{ERROR} = 1 if $self->on_error;
( run in 1.172 second using v1.01-cache-2.11-cpan-140bd7fdf52 )