EV-Etcd

 view release on metacpan or  search on metacpan

eg/resumable_watch.pl  view on Meta::CPAN

#!/usr/bin/env perl
#
# resumable_watch.pl - A watcher that survives process restarts without
# missing events. Persists the last seen revision to a file; on startup,
# reads it back, lists any keys changed during downtime via a one-shot
# get(prefix=>1, revision=>) and then resumes streaming from the next
# revision.
#
# Handles the compaction edge case: if the server has compacted past our
# saved revision, the watch arrives in the *error* callback (per the POD
# documentation of watch's error semantics). We then re-list at HEAD and
# accept the gap.
#
# Try it:
#   $ perl eg/resumable_watch.pl /myapp/config/
#   $ etcdctl put /myapp/config/foo bar
#   $ ^C
#   $ etcdctl put /myapp/config/baz qux   # while we're down
#   $ perl eg/resumable_watch.pl /myapp/config/   # picks up baz
#
use v5.10;
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;

my $prefix    = $ARGV[0] // '/myapp/config/';
my $state_dir = $ENV{RESUMABLE_WATCH_STATE_DIR} || '/tmp';
my $state_path = "$state_dir/resumable_watch_" . _safe_name($prefix) . ".rev";

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my $last_rev = read_last_rev();

if ($last_rev) {
    say "[resume] last seen revision: $last_rev — fetching missed changes";
    fetch_gap($last_rev + 1, sub { start_watch($last_rev + 1) });
} else {
    say "[resume] no saved revision — starting from current HEAD";
    $client->get('/', { prefix => 1, count_only => 1 }, sub {
        my ($r, $err) = @_;
        die "head probe: $err->{message}\n" if $err;
        my $head = $r->{header}{revision};
        say "[resume] HEAD revision = $head";
        $last_rev = $head;
        save_last_rev($last_rev);
        start_watch($head + 1);
    });
}

# Persist progress periodically and on shutdown
my $persist_timer = EV::timer(5, 5, \&save_last_rev);
my $shutdown = sub {
    save_last_rev();
    say "[resume] saved revision $last_rev to $state_path on shutdown";
    EV::break;
    exit 0;
};
my $sigint  = EV::signal('INT',  $shutdown);
my $sigterm = EV::signal('TERM', $shutdown);

EV::run;

# --------------------------------------------------------------------------

sub fetch_gap {



( run in 2.228 seconds using v1.01-cache-2.11-cpan-140bd7fdf52 )