EV-Etcd
view release on metacpan or search on metacpan
eg/batch_watch_example.pl view on Meta::CPAN
#!/usr/bin/env perl
#
# Example: Transactional batch writes with multiple watchers
#
# Demonstrates:
# 1. Two clients watching the same prefix (/config/)
# 2. A third client writing multiple keys atomically via transaction
# 3. Both watchers receive all changes in ONE batch (single callback)
#
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;
my $prefix = "/config/app1/";
# Track state
my $watcher1_batches = 0;
my $watcher2_batches = 0;
my $writes_done = 0;
my $cleaning_up = 0;
# === Client 1: First watcher ===
my $watcher_client1 = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $watch1 = $watcher_client1->watch($prefix, { prefix => 1 }, sub {
my ($resp, $err) = @_;
if ($err) {
print "Watcher 1 error: $err->{message}\n";
return;
}
$watcher1_batches++;
my $events = $resp->{events} || [];
print "\n=== Watcher 1 received batch #$watcher1_batches ===\n";
print " Events in this batch: " . scalar(@$events) . "\n";
for my $event (@$events) {
my $type = $event->{type} // 'PUT';
my $key = $event->{kv}{key} // '';
my $value = $event->{kv}{value} // '';
print " - $type: $key = $value\n";
}
check_done();
});
# === Client 2: Second watcher ===
my $watcher_client2 = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $watch2 = $watcher_client2->watch($prefix, { prefix => 1 }, sub {
my ($resp, $err) = @_;
if ($err) {
print "Watcher 2 error: $err->{message}\n";
return;
}
$watcher2_batches++;
my $events = $resp->{events} || [];
print "\n=== Watcher 2 received batch #$watcher2_batches ===\n";
print " Events in this batch: " . scalar(@$events) . "\n";
for my $event (@$events) {
my $type = $event->{type} // 'PUT';
my $key = $event->{kv}{key} // '';
my $value = $event->{kv}{value} // '';
print " - $type: $key = $value\n";
}
check_done();
});
# === Client 3: Writer (performs transactional batch write) ===
my $writer_client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# Wait a moment for watches to be established, then write
my $write_timer = EV::timer(0.5, 0, sub {
print "Writer: Performing transactional batch write to $prefix\n";
# Transaction that writes 5 keys atomically
# All watchers will receive these as ONE batch
$writer_client->txn(
# No compare conditions - always succeeds
compare => [],
# Success: write multiple keys atomically
success => [
{ put => { key => "${prefix}database/host", value => "db.example.com" } },
{ put => { key => "${prefix}database/port", value => "5432" } },
{ put => { key => "${prefix}database/name", value => "myapp_prod" } },
{ put => { key => "${prefix}cache/host", value => "redis.example.com" } },
{ put => { key => "${prefix}cache/ttl", value => "3600" } },
],
# Failure: (not used since no compare conditions)
failure => [],
sub {
my ($resp, $err) = @_;
if ($err) {
print "Writer: Transaction FAILED - $err->{message}\n";
} else {
print "Writer: Transaction succeeded (revision: $resp->{header}{revision})\n";
print "Writer: Wrote 5 keys atomically\n";
}
$writes_done = 1;
check_done();
}
);
});
# Cleanup function
sub check_done {
return if $cleaning_up;
# Wait until both watchers received the transaction batch and write is done
if ($writes_done && $watcher1_batches >= 2 && $watcher2_batches >= 2) {
$cleaning_up = 1;
print "\n=== Summary ===\n";
print "Watcher 1 received $watcher1_batches batch(es)\n";
print "Watcher 2 received $watcher2_batches batch(es)\n";
print "Both watchers received the 5 keys as a SINGLE batch!\n";
# Cleanup: delete the test keys
cleanup();
}
}
sub cleanup {
print "\nCleaning up test keys...\n";
$writer_client->delete($prefix, { prefix => 1 }, sub {
my ($resp, $err) = @_;
print "Cleanup done (deleted $resp->{deleted} keys)\n";
EV::break;
( run in 0.604 second using v1.01-cache-2.11-cpan-524268b4103 )