EV-Etcd

 view release on metacpan or  search on metacpan

eg/sync_and_watch_example.pl  view on Meta::CPAN

#!/usr/bin/env perl
#
# Example: Load branch data then watch for changes
#
# Common pattern for distributed configuration:
#   1. Load all existing keys from a prefix (branch)
#   2. Start watching from that revision for future changes
#   3. Receive incremental updates as they happen
#
# This ensures no updates are missed between load and watch.
#
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;

my $prefix = "/myapp/config/";

# === Setup: Create some initial data ===
print "=== Setup: Creating initial data ===\n";

my $setup_client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $setup_done = 0;

# Use transaction to create initial config atomically
$setup_client->txn(
    compare => [],
    success => [
        { put => { key => "${prefix}db/host",     value => "localhost" } },
        { put => { key => "${prefix}db/port",     value => "5432" } },
        { put => { key => "${prefix}db/name",     value => "myapp" } },
        { put => { key => "${prefix}cache/host",  value => "localhost" } },
        { put => { key => "${prefix}cache/port",  value => "6379" } },
    ],
    failure => [],
    sub {
        my ($resp, $err) = @_;
        die "Setup failed: $err->{message}" if $err;
        print "Initial data created at revision $resp->{header}{revision}\n\n";
        $setup_done = 1;
        EV::break;
    }
);
my $t_setup = EV::timer(5, 0, sub { die "Setup timeout" });
EV::run;

# === Client 1: Subscriber that loads then watches ===
print "=== Client 1: Loading existing data and watching ===\n";

my $subscriber = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my %local_cache;       # Local copy of the config branch
my $watch_revision;    # Revision to start watching from
my $watch_handle;
my $load_done = 0;

# Step 1: Load all existing data from the branch
$subscriber->get($prefix, { prefix => 1 }, sub {
    my ($resp, $err) = @_;
    die "Load failed: $err->{message}" if $err;

    print "Loaded " . scalar(@{$resp->{kvs} || []}) . " keys:\n";

    for my $kv (@{$resp->{kvs} || []}) {
        my $key = $kv->{key};
        my $value = $kv->{value};
        $local_cache{$key} = $value;
        print "  $key = $value\n";
    }

    # IMPORTANT: Start watching from the revision AFTER our get
    # This ensures we don't miss any updates that happened during/after our get
    $watch_revision = $resp->{header}{revision} + 1;
    print "\nWill watch from revision: $watch_revision\n";

    # Step 2: Start watching for changes from that revision
    $watch_handle = $subscriber->watch($prefix, {
        prefix        => 1,
        start_revision => $watch_revision,
    }, sub {
        my ($resp, $err) = @_;
        if ($err) {
            print "Watch error: $err->{message}\n";
            return;
        }

        my $events = $resp->{events} || [];
        return unless @$events;  # Skip empty responses

        print "\n--- Received " . scalar(@$events) . " update(s) ---\n";

        for my $event (@$events) {
            my $type = $event->{type} // 'PUT';
            my $key = $event->{kv}{key};
            my $value = $event->{kv}{value} // '';
            my $mod_rev = $event->{kv}{mod_revision};

            if ($type eq 'DELETE') {
                delete $local_cache{$key};
                print "  DELETE: $key (rev $mod_rev)\n";
            } else {
                $local_cache{$key} = $value;
                print "  PUT: $key = $value (rev $mod_rev)\n";
            }
        }

        print "Local cache now has " . scalar(keys %local_cache) . " keys\n";
    });

    $load_done = 1;
    print "\nNow watching for changes...\n";
    EV::break;
});

my $t_load = EV::timer(5, 0, sub { die "Load timeout" });
EV::run;

# === Client 2: Publisher that makes changes ===
print "\n=== Client 2: Making changes ===\n";

my $publisher = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $changes_done = 0;

# Schedule a series of changes
my @changes = (
    sub {
        print "\nPublisher: Updating db/host...\n";
        $publisher->put("${prefix}db/host", "db.production.local", sub {
            my ($resp, $err) = @_;
            print "  Updated (rev $resp->{header}{revision})\n" unless $err;
            EV::break;
        });
    },
    sub {
        print "\nPublisher: Adding new key logging/level...\n";
        $publisher->put("${prefix}logging/level", "info", sub {
            my ($resp, $err) = @_;
            print "  Added (rev $resp->{header}{revision})\n" unless $err;
            EV::break;
        });
    },
    sub {
        print "\nPublisher: Batch update via transaction...\n";
        $publisher->txn(
            compare => [],
            success => [
                { put => { key => "${prefix}db/pool_size",    value => "10" } },
                { put => { key => "${prefix}db/timeout",      value => "30" } },
                { put => { key => "${prefix}cache/ttl",       value => "3600" } },
            ],
            failure => [],
            sub {
                my ($resp, $err) = @_;
                print "  Batch updated (rev $resp->{header}{revision})\n" unless $err;
                EV::break;
            }
        );
    },
    sub {
        print "\nPublisher: Deleting cache/port...\n";
        $publisher->delete("${prefix}cache/port", sub {
            my ($resp, $err) = @_;
            print "  Deleted (rev $resp->{header}{revision})\n" unless $err;
            $changes_done = 1;
            EV::break;
        });
    },
);

# Execute changes sequentially with delays
for my $i (0..$#changes) {
    $changes[$i]->();
    my $t = EV::timer(5, 0, sub { EV::break });
    EV::run;

    # Small delay between changes for watch to process
    my $delay = EV::timer(0.3, 0, sub { EV::break });
    EV::run;
}

# === Final state ===



( run in 1.155 second using v1.01-cache-2.11-cpan-524268b4103 )