Net-Async-Kubernetes

 view release on metacpan or  search on metacpan

lib/Net/Async/Kubernetes/Watcher.pm  view on Meta::CPAN

    $self->stop;
}

# start()
#
# Begins watching unless a watch is already marked as active. Clears the
# "stopped" flag and hands over to _start_watch, whose return value is
# passed straight back to the caller.
sub start {
    my $self = shift;

    # A watch is already in flight; leave it alone.
    if ($self->{_watching}) {
        return;
    }

    $self->{_stopped} = 0;
    return $self->_start_watch;
}


# stop()
#
# Marks the watcher as stopped and idle, then cancels the in-flight watch
# request (if there is one and it has not already completed).
sub stop {
    my ($self) = @_;

    @{$self}{qw(_stopped _watching)} = (1, 0);

    if (my $pending = delete $self->{_watch_future}) {
        # Nothing to cancel once the request has finished on its own.
        return if $pending->is_ready;

        my $loop = $self->loop;
        if (!$loop) {
            # No loop available to defer on, so cancel right away.
            $pending->cancel;
        } else {
            # Cancelling here could close the HTTP connection from inside
            # its own on_read handler, which makes Net::Async::HTTP complain
            # about a "Spurious on_read of connection while idle". Postpone
            # the cancel to the next loop iteration instead.
            $loop->later(sub {
                $pending->cancel unless $pending->is_ready;
            });
        }
    }
}


# _start_watch()
#
# Opens one streaming watch request and wires up its lifecycle.
#
# Does nothing when the watcher is stopped or has no "kube" client.
# Otherwise it marks the watcher as active, resets the chunk buffer and
# issues a GET with watch=true and timeoutSeconds, plus resourceVersion,
# labelSelector and fieldSelector when those are set.
#
# Per decoded result from the stream:
#   * a true resourceVersion is remembered for the next request;
#   * error code 410 clears the remembered resourceVersion and abandons the
#     rest of the chunk (Kubernetes reports 410 when the requested
#     resourceVersion is no longer available);
#   * everything else is handed to _dispatch_event.
#
# When the request completes normally a new watch is started at once; when
# it fails a retry is scheduled on the loop after one second. Neither
# happens once the watcher is stopped.
#
# Every call gets its own generation number. Callbacks left over from an
# older call (an old stream that has not been cancelled yet, or a pending
# retry timer) see a different generation and do nothing, so they can
# neither open a second concurrent watch nor write into the buffer of the
# current stream.
#
# Returns the Future of the streaming request, or nothing when no request
# was started.
sub _start_watch {
    my ($self) = @_;
    return if $self->{_stopped};
    return unless $self->{kube};

    $self->{_watching} = 1;
    $self->{_buffer} = '';

    my $generation = ++$self->{_watch_generation};

    my $rest = $self->kube->_rest;
    my $class = $rest->expand_class($self->resource);
    my $path = $rest->build_path($class,
        ($self->namespace ? (namespace => $self->namespace) : ()),
    );

    my %params = (
        watch          => 'true',
        timeoutSeconds => $self->timeout,
    );
    $params{resourceVersion} = $self->{_resource_version}
        if defined $self->{_resource_version};
    $params{labelSelector} = $self->label_selector
        if defined $self->label_selector;
    $params{fieldSelector} = $self->field_selector
        if defined $self->field_selector;

    my $req = $rest->prepare_request('GET', $path, parameters => \%params);

    weaken(my $weak_self = $self);

    # True while the watcher still exists and this call is its latest watch.
    my $is_current = sub {
        return $weak_self
            && defined $weak_self->{_watch_generation}
            && $weak_self->{_watch_generation} == $generation;
    };

    my $f = $self->kube->_do_streaming_request($req, sub {
        my ($chunk) = @_;
        return unless $is_current->();

        my $buffer = $weak_self->{_buffer};
        for my $result ($rest->process_watch_chunk($class, \$buffer, $chunk)) {
            # Persist the buffer before anything below can return early.
            $weak_self->{_buffer} = $buffer;

            if ($result->{resourceVersion}) {
                $weak_self->{_resource_version} = $result->{resourceVersion};
            }

            # Ordinary events carry no error code, so test for definedness
            # before the numeric comparison to avoid an "uninitialized"
            # warning on every event.
            my $error_code = $result->{error_code};
            if (defined $error_code && $error_code == 410) {
                $weak_self->{_resource_version} = undef;
                return;
            }

            $weak_self->_dispatch_event($result->{event});

            # The handler may have destroyed or restarted the watcher; in
            # that case the remaining results belong to an abandoned stream.
            return unless $is_current->();
        }
        $weak_self->{_buffer} = $buffer;
    });

    # Store the future before attaching callbacks: if it is already complete
    # the on_done handler restarts the watch immediately, and the newer
    # future stored by that restart must not be overwritten afterwards.
    $self->{_watch_future} = $f;

    $f->on_done(sub {
        return unless $is_current->();
        return if $weak_self->{_stopped};
        $weak_self->{_watching} = 0;
        $weak_self->_start_watch;
    });

    $f->on_fail(sub {
        return unless $is_current->();
        return if $weak_self->{_stopped};
        $weak_self->{_watching} = 0;

        # Without a loop there is nothing to schedule the retry on; leave
        # the watcher idle so that start() can be called again later.
        my $loop = $weak_self->loop or return;
        $loop->watch_time(
            after => 1,
            code  => sub {
                # Skip the retry if the watcher was stopped, destroyed or
                # restarted while the timer was pending.
                return unless $is_current->();
                return if $weak_self->{_stopped};
                $weak_self->_start_watch;
            },
        );
    });

    return $f;
}

sub _dispatch_event {
    my ($self, $event) = @_;
    my $type = $event->type;

    # Client-side event type filter
    # Explicit event_types wins; otherwise auto-derive from callbacks
    # (on_event is catch-all, so if set, all types pass)
    if (my $types = $self->event_types) {
        my %allowed = map { uc($_) => 1 } @$types;
        return unless $allowed{$type};
    } elsif (!$self->on_event) {
        my %has;
        $has{ADDED}    = 1 if $self->on_added;
        $has{MODIFIED} = 1 if $self->on_modified;
        $has{DELETED}  = 1 if $self->on_deleted;
        $has{ERROR}    = 1 if $self->on_error;



( run in 1.172 second using v1.01-cache-2.11-cpan-140bd7fdf52 )