EV-Etcd
view release on metacpan or search on metacpan
eg/resumable_watch.pl view on Meta::CPAN
#!/usr/bin/env perl
#
# resumable_watch.pl - A watcher that survives process restarts without
# missing events. Persists the last seen revision to a file; on startup,
# reads it back, lists any keys changed during downtime via a one-shot
# get(prefix=>1, revision=>) and then resumes streaming from the next
# revision.
#
# Handles the compaction edge case: if the server has compacted past our
# saved revision, the watch fails and that failure is delivered to the
# *error* callback (per the POD documentation of watch's error semantics).
# We then re-list at HEAD and accept the gap.
#
# Try it:
# $ perl eg/resumable_watch.pl /myapp/config/
# $ etcdctl put /myapp/config/foo bar
# $ ^C
# $ etcdctl put /myapp/config/baz qux # while we're down
# $ perl eg/resumable_watch.pl /myapp/config/ # picks up baz
#
use v5.10;
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
# Watch target and checkpoint location. The prefix is the first command-line
# argument; the directory holding the checkpoint file can be overridden with
# the RESUMABLE_WATCH_STATE_DIR environment variable.
my $prefix     = $ARGV[0] // '/myapp/config/';
my $state_dir  = $ENV{RESUMABLE_WATCH_STATE_DIR} || '/tmp';
my $state_path = "$state_dir/resumable_watch_" . _safe_name($prefix) . ".rev";

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);

# Revision checkpoint shared by the callbacks below. A false value is
# treated as "no saved revision".
my $last_rev = read_last_rev();

if ($last_rev) {
    say "[resume] last seen revision: $last_rev - fetching missed changes";

    # The closure reads $last_rev when it is invoked (from inside fetch_gap's
    # completion callback), not when it is created here.
    fetch_gap($last_rev + 1, sub { start_watch($last_rev + 1) });
} else {
    say "[resume] no saved revision - starting from current HEAD";

    # count_only probe: only the revision in the response header is used.
    $client->get('/', { prefix => 1, count_only => 1 }, sub {
        my ($r, $err) = @_;

        # NOTE(review): EV routes exceptions thrown inside watcher callbacks
        # through $EV::DIED; confirm this die really terminates the script
        # when raised from an EV::Etcd callback.
        die "head probe: $err->{message}\n" if $err;

        my $head = $r->{header}{revision};
        say "[resume] HEAD revision = $head";
        $last_rev = $head;
        save_last_rev($last_rev);
        start_watch($head + 1);
    });
}

# Persist progress periodically and on shutdown.
#
# EV calls watcher callbacks with the watcher (and event mask) as arguments,
# so save_last_rev is wrapped in a closure: it must never mistake the timer
# object for a revision. Nothing is written until a revision is known.
my $persist_timer = EV::timer(5, 5, sub {
    save_last_rev() if defined $last_rev;
});

my $shutdown = sub {
    # $last_rev is still undefined when the signal arrives before the first
    # probe has answered; there is nothing worth saving or reporting then.
    if (defined $last_rev) {
        save_last_rev();
        say "[resume] saved revision $last_rev to $state_path on shutdown";
    }
    EV::break;
    exit 0;
};

# The watchers are kept in lexicals so they stay alive for the whole run.
my $sigint  = EV::signal('INT',  $shutdown);
my $sigterm = EV::signal('TERM', $shutdown);

EV::run;
# --------------------------------------------------------------------------
# fetch_gap($from_rev, $cb)
#
# Lists every key under $prefix as it stood at revision $from_rev, printing
# one "[gap]" line per key, and then invokes $cb with no arguments.
#
#   $from_rev - revision to read the snapshot at
#   $cb       - code ref called once the listing finished or failed
#
# A failed request is reported with warn and $cb is still called, so the
# caller always continues.
#
# $last_rev is deliberately NOT advanced here. The snapshot reflects
# revision $from_rev only, while the revision in the response header is the
# store's current revision. Adopting the header value would make the caller
# resume its watch after HEAD and silently drop every event committed
# between $from_rev + 1 and HEAD. Leaving $last_rev untouched lets the watch
# replay from $from_rev; the worst case is that a change is shown twice.
sub fetch_gap {
    my ($from_rev, $cb) = @_;

    $client->get($prefix, { prefix => 1, revision => $from_rev }, sub {
        my ($r, $err) = @_;

        if ($err) {
            # etcd answers OUT_OF_RANGE when the requested revision has been
            # compacted, and likewise when it is still in the future (nothing
            # was written since the checkpoint). Either way there is nothing
            # to list: hand control to the caller, whose watch deals with it.
            warn "[resume] gap fetch failed: $err->{message} - skipping gap\n";
            return $cb->();
        }

        for my $kv (@{ $r->{kvs} || [] }) {
            say "[gap] $kv->{key} = $kv->{value} (mod_rev=$kv->{mod_revision})";
        }

        $cb->();
    });
}
sub start_watch {
my $start_rev = shift;
say "[watch] starting from revision $start_rev";
$client->watch($prefix, {
prefix => 1,
start_revision => $start_rev,
progress_notify => 1,
}, sub {
my ($r, $err) = @_;
( run in 0.549 second using v1.01-cache-2.11-cpan-524268b4103 )