AnyEvent-Consul-Exec
view release on metacpan or search on metacpan
lib/AnyEvent/Consul/Exec.pm view on Meta::CPAN
if ($act eq 'exit') {
$self->{_nexit}++;
$self->{on_exit}->($node, $kv->value);
# XXX super naive. there might be some that haven't acked yet
# should schedule done for a lil bit in the future
if ( $self->{_nack} == $self->{_nexit}
&& $self->{_nexit} >= $self->{min_node_count})
{
$self->{_done} = 1;
$self->_cleanup(sub { $self->{on_done}->() });
}
next;
}
warn "W: $node: unknown action: $act\n";
}
$self->_wait_responses($meta->index) unless $self->{_done};
},
);
}
sub _fire_event {
my ($self) = @_;
my $payload = {
Prefix => "_rexec",
Session => $self->{_sid},
};
$self->{_c}->event->fire(
"_rexec",
payload => encode_json($payload),
$self->{dc_args}->@*,
$self->{node} ? (node => $self->{node}) : (),
$self->{service} ? (service => $self->{service}) : (),
$self->{tag} ? (tag => $self->{tag}) : (),
cb => sub { $self->_wait_responses(0) },
);
}
sub _setup_job {
my ($self) = @_;
my $job = {
Command => $self->{command},
Wait => $self->{wait} * 1_000_000_000, # nanoseconds
};
$self->{_c}->kv->put(
"_rexec/$self->{_sid}/job",
encode_json($job),
acquire => $self->{_sid},
$self->{dc_args}->@*,
cb => sub { $self->_fire_event },
);
}
sub _start_session {
my ($self) = @_;
my $session_started_cb = sub {
$self->{_sid} = shift;
$self->{_refresh_guard} = AnyEvent->timer(after => "5s", interval => "5s", cb => sub {
$self->{_c}->session->renew(
$self->{_sid},
$self->{dc_args}->@*,
);
});
$self->_setup_job;
};
if ($self->{dc}) {
$self->{_c}->health->service(
"consul",
$self->{dc_args}->@*,
cb => sub {
my ($services) = @_;
my $service = shift $services->@*;
unless ($service) {
# XXX no consuls at remote DC
...
}
my $node = $service->node->name;
$self->{_c}->session->create(
Consul::Session->new(
name => 'Remote exec via ...', # XXX local node name
behavior => 'delete',
ttl => "15s",
node => $node,
),
$self->{dc_args}->@*,
cb => $session_started_cb,
);
},
error_cb => sub {
my ($err) = @_;
$self->_cleanup(sub { $self->{on_error}->($err) });
},
);
}
else {
$self->{_c}->session->create(
Consul::Session->new(
name => 'Remote exec',
behavior => 'delete',
ttl => "15s",
),
cb => $session_started_cb,
);
}
}
sub _cleanup {
my ($self, $cb) = @_;
delete $self->{_refresh_guard};
if ($self->{_sid}) {
$self->{_c}->session->destroy(
$self->{_sid},
$self->{dc_args}->@*,
cb => sub {
$self->{_c}->kv->delete(
"_rexec/$self->{_sid}",
( run in 0.641 second using v1.01-cache-2.11-cpan-5b529ec07f3 )