AnyEvent-Consul-Exec

 view release on metacpan or  search on metacpan

lib/AnyEvent/Consul/Exec.pm  view on Meta::CPAN

        if ($act eq 'exit') {
          $self->{_nexit}++;
          $self->{on_exit}->($node, $kv->value);

          # XXX super naive. there might be some that haven't acked yet
          #     should schedule done for a lil bit in the future
          if (   $self->{_nack} == $self->{_nexit}
              && $self->{_nexit} >= $self->{min_node_count})
          {
            $self->{_done} = 1;
            $self->_cleanup(sub { $self->{on_done}->() });
          }
          next;
        }

        warn "W: $node: unknown action: $act\n";
      }

      $self->_wait_responses($meta->index) unless $self->{_done};
    },
  );
}

sub _fire_event {
  my ($self) = @_;
  my $payload = {
    Prefix  => "_rexec",
    Session => $self->{_sid},
  };
  $self->{_c}->event->fire(
    "_rexec",
    payload => encode_json($payload),
    $self->{dc_args}->@*,
    $self->{node}    ? (node    => $self->{node})    : (),
    $self->{service} ? (service => $self->{service}) : (),
    $self->{tag}     ? (tag     => $self->{tag})     : (),
    cb => sub { $self->_wait_responses(0) },
  );
}

sub _setup_job {
  my ($self) = @_;
  my $job = {
    Command => $self->{command},
    Wait    => $self->{wait} * 1_000_000_000, # nanoseconds
  };
  $self->{_c}->kv->put(
    "_rexec/$self->{_sid}/job",
    encode_json($job),
    acquire => $self->{_sid},
    $self->{dc_args}->@*,
    cb => sub { $self->_fire_event },
  );
}

sub _start_session {
  my ($self) = @_;

  my $session_started_cb = sub {
    $self->{_sid} = shift;
    $self->{_refresh_guard} = AnyEvent->timer(after => "5s", interval => "5s", cb => sub {
      $self->{_c}->session->renew(
        $self->{_sid},
        $self->{dc_args}->@*,
      );
    });
    $self->_setup_job;
  };

  if ($self->{dc}) {
    $self->{_c}->health->service(
      "consul",
      $self->{dc_args}->@*,
      cb => sub {
        my ($services) = @_;
        my $service = shift $services->@*;
        unless ($service) {
          # XXX no consuls at remote DC
          ...
        }
        my $node = $service->node->name;
        $self->{_c}->session->create(
          Consul::Session->new(
            name     => 'Remote exec via ...', # XXX local node name
            behavior => 'delete',
            ttl      => "15s",
            node     => $node,
          ),
          $self->{dc_args}->@*,
          cb => $session_started_cb,
        );
      },
      error_cb => sub {
        my ($err) = @_;
        $self->_cleanup(sub { $self->{on_error}->($err) });
      },
    );
  }

  else {
    $self->{_c}->session->create(
      Consul::Session->new(
        name     => 'Remote exec',
        behavior => 'delete',
        ttl      => "15s",
      ),
      cb => $session_started_cb,
    );
  }
}

sub _cleanup {
  my ($self, $cb) = @_;
  delete $self->{_refresh_guard};
  if ($self->{_sid}) {
    $self->{_c}->session->destroy(
      $self->{_sid},
      $self->{dc_args}->@*,
      cb => sub {
      $self->{_c}->kv->delete(
        "_rexec/$self->{_sid}",



( run in 0.641 second using v1.01-cache-2.11-cpan-5b529ec07f3 )