view release on metacpan or search on metacpan
.github/workflows/ci.yml view on Meta::CPAN
run: |
ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
curl -fsSL "https://github.com/etcd-io/etcd/releases/download/v3.5.26/etcd-v3.5.26-linux-${ARCH}.tar.gz" | tar xz
sudo mv "etcd-v3.5.26-linux-${ARCH}/etcd" /usr/local/bin/
sudo mv "etcd-v3.5.26-linux-${ARCH}/etcdctl" /usr/local/bin/
- name: Start etcd
run: |
etcd --data-dir /tmp/etcd-data &
for i in $(seq 1 30); do
etcdctl endpoint health 2>/dev/null && break
sleep 1
done
etcdctl endpoint health
- name: Test
run: make test
test-debian:
runs-on: ubuntu-latest
container: debian:bookworm
name: Debian bookworm
# etcd 3.5 in Debian-bookworm-in-container has issues with member
# mutation operations (cluster.t fails) — known platform quirk, the
.github/workflows/ci.yml view on Meta::CPAN
run: |
ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
curl -fsSL "https://github.com/etcd-io/etcd/releases/download/v3.5.26/etcd-v3.5.26-linux-${ARCH}.tar.gz" | tar xz
mv "etcd-v3.5.26-linux-${ARCH}/etcd" /usr/local/bin/
mv "etcd-v3.5.26-linux-${ARCH}/etcdctl" /usr/local/bin/
- name: Start etcd
run: |
etcd --data-dir /tmp/etcd-data &
for i in $(seq 1 30); do
etcdctl endpoint health 2>/dev/null && break
sleep 1
done
etcdctl endpoint health
- name: Test
run: make test
test-macos:
runs-on: macos-latest
strategy:
fail-fast: false
matrix:
perl: ['5.38', '5.40']
.github/workflows/ci.yml view on Meta::CPAN
# Map `uname -m` output onto the architecture names used in etcd release
# assets. An arm64 machine already reports the right name, so only the
# x86_64 and aarch64 spellings need rewriting.
ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
curl -fsSL "https://github.com/etcd-io/etcd/releases/download/v3.5.26/etcd-v3.5.26-darwin-${ARCH}.zip" -o etcd.zip
unzip -q etcd.zip
sudo mv "etcd-v3.5.26-darwin-${ARCH}/etcd" /usr/local/bin/
sudo mv "etcd-v3.5.26-darwin-${ARCH}/etcdctl" /usr/local/bin/
- name: Start etcd
run: |
etcd --data-dir /tmp/etcd-data &
for i in $(seq 1 30); do
etcdctl endpoint health 2>/dev/null && break
sleep 1
done
etcdctl endpoint health
- name: Test
run: make test
test-freebsd:
runs-on: ubuntu-latest
name: FreeBSD 14.2
steps:
- uses: actions/checkout@v5
- uses: vmactions/freebsd-vm@v1
.github/workflows/ci.yml view on Meta::CPAN
usesh: true
prepare: |
pkg install -y grpc protobuf-c pkgconf perl5 p5-EV gmake
pkg install -y etcd3 || pkg install -y etcd || true
run: |
perl Makefile.PL
gmake
if command -v etcd >/dev/null 2>&1; then
etcd --data-dir /tmp/etcd-data &
sleep 5
etcdctl endpoint health 2>/dev/null || true
fi
gmake test
test-openbsd:
runs-on: ubuntu-latest
name: OpenBSD
continue-on-error: true
steps:
- uses: actions/checkout@v5
- uses: vmactions/openbsd-vm@v1
- KV operations: get, put, delete, range, txn (compare-and-swap)
- Watch with bidirectional streaming and auto-reconnect
- Lease: grant, revoke, keepalive, time-to-live, leases
- Lock and unlock tied to leases
- Election: campaign, proclaim, leader, resign, observe
- Cluster: member list/add/remove/update/promote
- Maintenance: status, compact, defragment, alarm, hash_kv, move_leader
- Auth: user/role management, authenticate, enable/disable
- Health monitoring with configurable interval and callback
- Automatic retries for transient gRPC failures
- Multiple endpoint support with failover
- Structured error callbacks ({code, message, source})
static void process_role_list_response(pTHX_ pending_call_t *pc);
static void process_role_grant_permission_response(pTHX_ pending_call_t *pc);
static void process_role_revoke_permission_response(pTHX_ pending_call_t *pc);
static void process_user_grant_role_response(pTHX_ pending_call_t *pc);
static void process_user_revoke_role_response(pTHX_ pending_call_t *pc);
static void process_user_get_response(pTHX_ pending_call_t *pc);
static void process_user_list_response(pTHX_ pending_call_t *pc);
static SV* response_op_to_hashref(pTHX_ Etcdserverpb__ResponseOp *op);
static void parse_request_ops(pTHX_ SV *src_av, Etcdserverpb__RequestOp ***dst_ops, size_t *dst_n);
/*
 * Replace the client's gRPC channel with a freshly created insecure one.
 *
 * When more than one endpoint is configured, the client advances to the
 * next endpoint (wrapping around at the end of the list) before
 * reconnecting; with a single endpoint it reconnects to the same address.
 * The value returned by channel creation is stored as-is and is not
 * checked here.
 */
static void reconnect_channel(ev_etcd_t *client) {
    /* Release the existing channel, if there is one, before replacing it. */
    if (client->channel != NULL) {
        grpc_channel_destroy(client->channel);
        client->channel = NULL;
    }

    /* Only rotate when there is another endpoint to move to. */
    if (client->endpoint_count > 1) {
        int next_index = client->current_endpoint + 1;
        client->current_endpoint = next_index % client->endpoint_count;
    }

    client->channel = etcd_create_insecure_channel(
        client->endpoints[client->current_endpoint],
        NULL);
}
/*
* Compute range_end for prefix queries.
* For a prefix, range_end is the key with the last byte incremented.
* Handles trailing 0xFF bytes by truncating and incrementing.
* Returns allocated buffer (caller must Safefree) and sets *out_len.
* Returns NULL if key_len is 0.
*/
static char* compute_prefix_range_end(const char *key, size_t key_len, size_t *out_len) {
/* Check channel connectivity state */
grpc_connectivity_state state = grpc_channel_check_connectivity_state(client->channel, 0);
int was_healthy = client->is_healthy;
int is_healthy = (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE);
if (was_healthy != is_healthy) {
client->is_healthy = is_healthy;
/* If unhealthy, try reconnecting to next endpoint */
if (!is_healthy && client->endpoint_count > 1) {
reconnect_channel(client);
}
/* Call health callback if provided */
if (client->health_callback) {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, 2);
PUSHs(sv_2mortal(newSViv(is_healthy)));
PUSHs(sv_2mortal(newSVpv(client->endpoints[client->current_endpoint], 0)));
PUTBACK;
client->in_callback = 1;
CALL_SV_SAFE(client->health_callback, G_DISCARD);
FREETMPS;
LEAVE;
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_observe(aTHX_ client->observes);
}
/* Free client-level resources (mirrors free_perl_resources in DESTROY) */
if (client->health_callback)
SvREFCNT_dec(client->health_callback);
if (client->auth_token) {
memset(client->auth_token, 0, client->auth_token_len);
Safefree(client->auth_token);
}
if (client->endpoints) {
for (int i = 0; i < client->endpoint_count; i++)
if (client->endpoints[i]) Safefree(client->endpoints[i]);
Safefree(client->endpoints);
}
Safefree(client);
}
/*
* ev_async callback - runs in main thread when signaled by gRPC thread.
* Drains the event queue and processes each event.
*/
static void cq_async_callback(struct ev_loop *loop, ev_async *w, int revents) {
I_EV_API("EV::Etcd");
grpc_init();
init_method_slices();
EV::Etcd
ev_etcd_new(class, ...)
char *class
CODE:
{
ev_etcd_t *client;
AV *endpoints_av = NULL;
int timeout_seconds = 30; /* Default timeout */
int max_retries = 3; /* Default max retries */
int health_interval = 0; /* Default: disabled */
SV *health_callback = NULL;
char *init_auth_token = NULL;
STRLEN init_auth_token_len = 0;
int i;
/* Parse options */
for (i = 1; i < items; i += 2) {
if (i + 1 < items) {
const char *key = SvPV_nolen(ST(i));
if (strEQ(key, "endpoints")) {
if (SvROK(ST(i + 1)) && SvTYPE(SvRV(ST(i + 1))) == SVt_PVAV) {
endpoints_av = (AV *)SvRV(ST(i + 1));
}
} else if (strEQ(key, "timeout")) {
timeout_seconds = SvIV(ST(i + 1));
if (timeout_seconds < 1) {
timeout_seconds = 1; /* Minimum 1 second */
}
} else if (strEQ(key, "max_retries")) {
max_retries = SvIV(ST(i + 1));
if (max_retries < 0) {
max_retries = 0;
health_callback = ST(i + 1);
}
} else if (strEQ(key, "auth_token")) {
if (SvPOK(ST(i + 1))) {
init_auth_token = SvPV(ST(i + 1), init_auth_token_len);
}
}
}
}
/* Pre-validate endpoint URL sizes before allocating */
if (endpoints_av && av_len(endpoints_av) >= 0) {
int count = av_len(endpoints_av) + 1;
for (i = 0; i < count; i++) {
SV **ep = av_fetch(endpoints_av, i, 0);
if (ep && SvPOK(*ep)) {
VALIDATE_URL_SIZE(SvCUR(*ep));
}
}
}
Newxz(client, 1, ev_etcd_t);
/* Store endpoints */
if (endpoints_av && av_len(endpoints_av) >= 0) {
int count = av_len(endpoints_av) + 1;
Newx(client->endpoints, count, char *);
client->endpoint_count = count;
for (i = 0; i < count; i++) {
SV **ep = av_fetch(endpoints_av, i, 0);
if (ep && SvPOK(*ep)) {
STRLEN len;
const char *str = SvPV(*ep, len);
Newx(client->endpoints[i], len + 1, char);
Copy(str, client->endpoints[i], len + 1, char);
} else {
/* Default endpoint for invalid entries */
client->endpoints[i] = savepv("127.0.0.1:2379");
}
}
} else {
/* Default single endpoint */
Newx(client->endpoints, 1, char *);
client->endpoints[0] = savepv("127.0.0.1:2379");
client->endpoint_count = 1;
}
client->current_endpoint = 0;
/* Create gRPC channel to first endpoint */
client->channel = etcd_create_insecure_channel(client->endpoints[0], NULL);
if (!client->channel) {
for (int j = 0; j < client->endpoint_count; j++) {
Safefree(client->endpoints[j]);
}
Safefree(client->endpoints);
Safefree(client);
croak("Failed to create gRPC channel");
}
/* Create completion queue for polling */
client->cq = grpc_completion_queue_create_for_next(NULL);
/* Initialize threading for hybrid gRPC/EV approach */
pthread_mutex_init(&client->queue_mutex, NULL);
client->thread_running = 1;
/* Start gRPC completion queue thread */
if (pthread_create(&client->cq_thread, NULL, cq_thread_func, client) != 0) {
ev_async_stop(EV_DEFAULT, &client->cq_async);
pthread_mutex_destroy(&client->queue_mutex);
grpc_completion_queue_shutdown(client->cq);
while (grpc_completion_queue_next(client->cq,
gpr_inf_past(GPR_CLOCK_REALTIME), NULL).type != GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(client->cq);
grpc_channel_destroy(client->channel);
/* Free endpoints */
for (int j = 0; j < client->endpoint_count; j++) {
Safefree(client->endpoints[j]);
}
Safefree(client->endpoints);
Safefree(client);
croak("Failed to create gRPC completion queue thread");
}
if (init_auth_token && init_auth_token_len > 0) {
Newx(client->auth_token, init_auth_token_len + 1, char);
Copy(init_auth_token, client->auth_token, init_auth_token_len, char);
client->auth_token[init_auth_token_len] = '\0';
client->auth_token_len = init_auth_token_len;
}
client->health_callback = NULL;
}
/* Free auth token - securely zero before freeing */
if (client->auth_token) {
memset(client->auth_token, 0, client->auth_token_len);
Safefree(client->auth_token);
client->auth_token = NULL;
}
/* Free endpoints */
if (client->endpoints) {
int i;
for (i = 0; i < client->endpoint_count; i++) {
if (client->endpoints[i]) {
Safefree(client->endpoints[i]);
}
}
Safefree(client->endpoints);
client->endpoints = NULL;
}
/* If called during event processing, defer struct free to cq_async_callback */
if (!client->in_callback) {
Safefree(client);
}
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Watch PREFIX = ev_etcd_watch_
Async etcd v3 client for Perl using the native gRPC Core C API and EV/libev.
## Synopsis
```perl
use EV;
use EV::Etcd;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
# Put
$client->put('/my/key', 'value', sub {
my ($resp, $err) = @_;
die $err->{message} if $err;
say "Revision: $resp->{header}{revision}";
});
# Get
use Time::HiRes qw(time);
use lib 'blib/lib', 'blib/arch';
$| = 1; # Autoflush
use EV;
use EV::Etcd;
# Check if etcd is running
my $etcd_running = 0;
eval {
my $result = `etcdctl endpoint health 2>&1`;
$etcd_running = 1 if $result =~ /is healthy/;
};
die "etcd not running\n" unless $etcd_running;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
my $prefix = "/bench_$$";
my $iterations = $ENV{BENCH_ITER} || 1000;
print "EV::Etcd Benchmark\n";
print "==================\n\n";
# Benchmark 1: Sequential puts
{
eg/auth_test.pl view on Meta::CPAN
# 2. Or create a non-root user:
# etcdctl user add testuser:testpass
# etcdctl role add testrole
# etcdctl role grant-permission testrole readwrite /authtest --prefix
# etcdctl user grant-role testuser testrole
my $username = $ENV{ETCD_USER} // 'root';
my $password = $ENV{ETCD_PASSWORD} // 'rootpassword';
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "=== Authentication Test ===\n\n";
# Step 1: Authenticate
print "1. Authenticating as '$username'...\n";
$client->authenticate($username, $password, sub {
my ($resp, $err) = @_;
if ($err) {
eg/batch_watch_example.pl view on Meta::CPAN
my $prefix = "/config/app1/";
# Track state
my $watcher1_batches = 0;
my $watcher2_batches = 0;
my $writes_done = 0;
my $cleaning_up = 0;
# === Client 1: First watcher ===
my $watcher_client1 = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $watch1 = $watcher_client1->watch($prefix, { prefix => 1 }, sub {
my ($resp, $err) = @_;
if ($err) {
print "Watcher 1 error: $err->{message}\n";
return;
}
$watcher1_batches++;
my $events = $resp->{events} || [];
eg/batch_watch_example.pl view on Meta::CPAN
my $type = $event->{type} // 'PUT';
my $key = $event->{kv}{key} // '';
my $value = $event->{kv}{value} // '';
print " - $type: $key = $value\n";
}
check_done();
});
# === Client 2: Second watcher ===
my $watcher_client2 = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $watch2 = $watcher_client2->watch($prefix, { prefix => 1 }, sub {
my ($resp, $err) = @_;
if ($err) {
print "Watcher 2 error: $err->{message}\n";
return;
}
$watcher2_batches++;
my $events = $resp->{events} || [];
eg/batch_watch_example.pl view on Meta::CPAN
my $type = $event->{type} // 'PUT';
my $key = $event->{kv}{key} // '';
my $value = $event->{kv}{value} // '';
print " - $type: $key = $value\n";
}
check_done();
});
# === Client 3: Writer (performs transactional batch write) ===
my $writer_client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# Wait a moment for watches to be established, then write
my $write_timer = EV::timer(0.5, 0, sub {
print "Writer: Performing transactional batch write to $prefix\n";
# Transaction that writes 5 keys atomically
# All watchers will receive these as ONE batch
$writer_client->txn(
# No compare conditions - always succeeds
compare => [],
eg/crud_test.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "=== CRUD Test ===\n\n";
# Step 1: Put
print "1. PUT /test = 'hello world'\n";
$client->put('/test', 'hello world', sub {
my ($resp, $err) = @_;
die "Put error: $err->{message}" if $err;
print " Put OK, revision: $resp->{header}{revision}\n\n";
eg/distributed_mutex.pl view on Meta::CPAN
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
my $name = $ARGV[0] // '/jobs/example';
my $work_msg = $ARGV[1] // 'critical section';
my $work_time = $ARGV[2] // 5; # seconds the lock is held
my $lock_ttl = 10; # seconds; must exceed work_time + slack
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my ($lease_id, $lock_key, $keepalive, $work_timer, $lease_died);
# 1. Lease for the lock — if we crash, the lock auto-releases after lock_ttl
$client->lease_grant($lock_ttl, sub {
my ($r, $err) = @_;
die "lease_grant: $err->{message}\n" if $err;
$lease_id = $r->{id};
say "[$$] lease=$lease_id ttl=${lock_ttl}s";
# 2. Refresh the lease while we work
eg/leader_cron.pl view on Meta::CPAN
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
my $election = "/cron/nightly-aggregator";
my $lease_ttl = 10; # seconds; shorter = faster failover, more chatter
my $tick_period = 2; # how often the leader runs the job
my $value = "$0/$$"; # what other observers see for this leader
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
max_retries => 5,
);
my ($lease_id, $leader, $keepalive, $tick_timer);
sub fail { my $m = shift; warn "[$$] $m\n"; cleanup_and_exit(1) }
# 1. Grant an election lease
$client->lease_grant($lease_ttl, sub {
my ($r, $err) = @_;
eg/lease_test.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "=== Lease Test ===\n\n";
my $lease_id;
my $keepalive_count = 0;
# Step 1: Grant a lease with 10 second TTL
print "1. Granting lease with TTL=10s...\n";
$client->lease_grant(10, sub {
eg/put_get_test.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "Created client\n";
# First, put a value
print "Putting key '/test' with value 'hello from perl'...\n";
$client->put('/test', 'hello from perl', sub {
my ($resp, $err) = @_;
eg/resumable_watch.pl view on Meta::CPAN
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
my $prefix = $ARGV[0] // '/myapp/config/';
my $state_dir = $ENV{RESUMABLE_WATCH_STATE_DIR} || '/tmp';
my $state_path = "$state_dir/resumable_watch_" . _safe_name($prefix) . ".rev";
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my $last_rev = read_last_rev();
if ($last_rev) {
say "[resume] last seen revision: $last_rev â fetching missed changes";
fetch_gap($last_rev + 1, sub { start_watch($last_rev + 1) });
} else {
say "[resume] no saved revision â starting from current HEAD";
$client->get('/', { prefix => 1, count_only => 1 }, sub {
my ($r, $err) = @_;
die "head probe: $err->{message}\n" if $err;
eg/service_registry.pl view on Meta::CPAN
use EV;
use EV::Etcd;
my $service_type = $ARGV[0] // 'web';
my $self_id = sprintf "%s-%d", ($ENV{HOSTNAME} || 'localhost'), $$;
my $self_value = "host=$self_id pid=$$ started=" . time();
my $prefix = "/services/$service_type/";
my $self_key = "$prefix$self_id";
my $lease_ttl = 15;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my ($lease_id, $keepalive, $watch);
# 1. Lease — keys self-evict if we crash
$client->lease_grant($lease_ttl, sub {
my ($r, $err) = @_;
die "lease_grant: $err->{message}\n" if $err;
$lease_id = $r->{id};
say "[$self_id] lease=$lease_id ttl=${lease_ttl}s";
# 2. Heartbeat
eg/sync_and_watch_example.pl view on Meta::CPAN
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
my $prefix = "/myapp/config/";
# === Setup: Create some initial data ===
print "=== Setup: Creating initial data ===\n";
my $setup_client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $setup_done = 0;
# Use transaction to create initial config atomically
$setup_client->txn(
compare => [],
success => [
{ put => { key => "${prefix}db/host", value => "localhost" } },
{ put => { key => "${prefix}db/port", value => "5432" } },
{ put => { key => "${prefix}db/name", value => "myapp" } },
{ put => { key => "${prefix}cache/host", value => "localhost" } },
eg/sync_and_watch_example.pl view on Meta::CPAN
$setup_done = 1;
EV::break;
}
);
my $t_setup = EV::timer(5, 0, sub { die "Setup timeout" });
EV::run;
# === Client 1: Subscriber that loads then watches ===
print "=== Client 1: Loading existing data and watching ===\n";
my $subscriber = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my %local_cache; # Local copy of the config branch
my $watch_revision; # Revision to start watching from
my $watch_handle;
my $load_done = 0;
# Step 1: Load all existing data from the branch
$subscriber->get($prefix, { prefix => 1 }, sub {
my ($resp, $err) = @_;
die "Load failed: $err->{message}" if $err;
eg/sync_and_watch_example.pl view on Meta::CPAN
print "\nNow watching for changes...\n";
EV::break;
});
my $t_load = EV::timer(5, 0, sub { die "Load timeout" });
EV::run;
# === Client 2: Publisher that makes changes ===
print "\n=== Client 2: Making changes ===\n";
my $publisher = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $changes_done = 0;
# Schedule a series of changes
my @changes = (
sub {
print "\nPublisher: Updating db/host...\n";
$publisher->put("${prefix}db/host", "db.production.local", sub {
my ($resp, $err) = @_;
print " Updated (rev $resp->{header}{revision})\n" unless $err;
EV::break;
eg/txn_test.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "=== Transaction Test ===\n\n";
# Test 1: Compare-and-swap - create key if it doesn't exist (version == 0)
print "1. Compare-and-swap: Create '/txntest' only if it doesn't exist...\n";
$client->txn(
# Compare: version == 0 means key doesn't exist
[
eg/watch_config_tree.pl view on Meta::CPAN
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Path::XS qw(path_get path_set path_delete);
use Data::Dumper;
my $prefix = "/myapp/";
my %config;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# Seed some data
print "=== Seeding config ===\n";
$client->txn(
compare => [],
success => [
{ put => { key => "${prefix}db/host", value => "localhost" } },
{ put => { key => "${prefix}db/port", value => "5432" } },
{ put => { key => "${prefix}cache/host", value => "redis.local" } },
{ put => { key => "${prefix}cache/ttl", value => "3600" } },
eg/watch_extract_fields.pl view on Meta::CPAN
use Data::Path::XS qw(pathc_get path_compile);
# Pre-compile paths for fields we extract from every event
my $p_type = path_compile('/type');
my $p_key = path_compile('/kv/key');
my $p_value = path_compile('/kv/value');
my $p_rev = path_compile('/kv/mod_revision');
my $p_lease = path_compile('/kv/lease');
my $p_prev = path_compile('/prev_kv/value');
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $prefix = "/events-demo/";
# Seed data
$client->put("${prefix}sensor/temp", "22.5", sub { EV::break });
my $t0 = EV::timer(5, 0, sub { die "timeout" });
EV::run;
print "=== Watching $prefix with prev_kv ===\n";
print "(Compiled paths extract fields from each event)\n\n";
eg/watch_test.pl view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "=== Watch Test ===\n\n";
print "Watching key '/watchtest'...\n";
print "(In another terminal, run: etcdctl put /watchtest value1)\n\n";
my $event_count = 0;
$client->watch('/watchtest', sub {
my ($resp, $err) = @_;
etcd_common.h view on Meta::CPAN
pending_call_t *pending_calls;
watch_call_t *watches;
keepalive_call_t *keepalives;
observe_call_t *observes;
int active;
int in_callback; /* Guard against freeing client during event processing */
char *auth_token;
size_t auth_token_len;
int timeout_seconds;
/* Multiple endpoints for failover */
char **endpoints;
int endpoint_count;
int current_endpoint;
/* Retry configuration */
int max_retries;
/* Health monitoring */
ev_timer health_timer;
int is_healthy;
SV *health_callback;
pid_t owner_pid; /* PID of process that created this client (fork safety) */
} ev_etcd_t;
lib/EV/Etcd.pm view on Meta::CPAN
EV::Etcd - Async etcd v3 client using native gRPC and EV/libev
=head1 SYNOPSIS
use v5.10;
use EV;
use EV::Etcd;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
# Async put
$client->put('/my/key', 'value', sub {
my ($resp, $err) = @_;
die $err->{message} if $err;
say "Put succeeded, revision: $resp->{header}{revision}";
});
# Async get
lib/EV/Etcd.pm view on Meta::CPAN
=head1 METHODS
=head2 new
my $client = EV::Etcd->new(%options);
Options:
=over 4
=item endpoints
ArrayRef of etcd endpoints (host:port). Optional; defaults to
C<['127.0.0.1:2379']>. When more than one is provided, the client uses the
first endpoint and rotates to subsequent endpoints on connection failure.
=item timeout
RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.
=item max_retries
Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.
lib/EV/Etcd.pm view on Meta::CPAN
=item health_interval
Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.
=item on_health_change
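Callback invoked whenever the connection health status changes. It receives
two arguments: a boolean indicating health status (1=healthy, 0=unhealthy) and
the current endpoint string. When more than one endpoint is configured, the
client rotates to the next endpoint before invoking the callback for an
unhealthy transition, so in that case the endpoint argument is the one being
tried next rather than the one that just failed.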
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
health_interval => 5,
on_health_change => sub {
my ($is_healthy, $endpoint) = @_;
warn $is_healthy ? "Connected to $endpoint" : "Disconnected from $endpoint";
},
);
=item auth_token
Pre-set authentication token. Use this to create an authenticated client
without calling authenticate() first. Useful when you already have a valid
token from a previous session.
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
auth_token => $saved_token,
);
=back
=head1 ENCODING
Keys and values are stored by etcd as raw bytes; this module does not perform
any character encoding. If you pass a Perl string with the UTF-8 flag set
(e.g. a literal containing non-ASCII characters under C<use utf8>), the UTF-8
lib/EV/Etcd.pm view on Meta::CPAN
Called with C<($response, $error)> when complete.
=back
=head2 Complete Authentication Example
use EV;
use EV::Etcd;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# Setup authentication (run once, as root)
sub setup_auth {
# Create a role with permissions
$client->role_add('app-role', sub {
my ($resp, $err) = @_;
# Grant read/write on /app/ prefix
$client->role_grant_permission('app-role', 'READWRITE', '/app/', '/app0', sub {
my ($resp, $err) = @_;
scripts/local_cluster.sh view on Meta::CPAN
start_node "$NODE1_NAME" "$NODE1_CLIENT" "$NODE1_PEER"
start_node "$NODE2_NAME" "$NODE2_CLIENT" "$NODE2_PEER"
start_node "$NODE3_NAME" "$NODE3_CLIENT" "$NODE3_PEER"
echo ""
echo "Waiting for cluster to form..."
sleep 3
# Check health
if etcdctl --endpoints="http://${NODE1_CLIENT}" endpoint health 2>/dev/null; then
echo ""
echo "Cluster is healthy!"
echo ""
echo "Endpoints:"
echo " Node 1: http://${NODE1_CLIENT}"
echo " Node 2: http://${NODE2_CLIENT}"
echo " Node 3: http://${NODE3_CLIENT}"
echo ""
echo "To use with EV::Etcd:"
echo " my \$client = EV::Etcd->new(endpoints => ['${NODE1_CLIENT}', '${NODE2_CLIENT}', '${NODE3_CLIENT}']);"
else
echo "Warning: Cluster may not be fully healthy yet"
fi
}
stop_cluster() {
echo "Stopping etcd cluster..."
for name in "$NODE1_NAME" "$NODE2_NAME" "$NODE3_NAME"; do
local pidfile="${CLUSTER_DIR}/${name}/etcd.pid"
scripts/local_cluster.sh view on Meta::CPAN
fi
else
echo " $name: NOT STARTED"
fi
done
echo ""
if [ $running -eq 3 ]; then
echo "Endpoint health:"
etcdctl --endpoints="http://${NODE1_CLIENT},http://${NODE2_CLIENT},http://${NODE3_CLIENT}" endpoint health 2>/dev/null || echo " (health check failed)"
echo ""
echo "Endpoint status:"
etcdctl --endpoints="http://${NODE1_CLIENT},http://${NODE2_CLIENT},http://${NODE3_CLIENT}" endpoint status --write-out=table 2>/dev/null || echo " (status check failed)"
fi
}
case "${1:-}" in
start)
stop_cluster 2>/dev/null || true
start_cluster
;;
stop)
stop_cluster
t/00-load.t view on Meta::CPAN
if ($@) {
plan skip_all => 'EV module not available';
exit;
}
}
plan tests => 2;
use_ok('EV::Etcd');
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
isa_ok($client, 'EV::Etcd');
plan skip_all => 'EV required' if $@;
}
use EV;
use EV::Etcd;
# Check if etcd is available
my $etcd_available = 0;
eval {
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
timeout => 2,
);
$client->status(sub {
my ($resp, $err) = @_;
$etcd_available = 1 if !$err;
EV::break;
});
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
# Note: These tests are designed to work whether auth is enabled or not.
# We test user/role management which works regardless of auth state.
# We intentionally avoid enabling/disabling auth to not disrupt the cluster.
plan tests => 30;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
my $test_user = "test-user-$$-" . time();
my $test_role = "test-role-$$-" . time();
# Test 1-2: auth_status
$client->auth_status(sub {
my ($resp, $err) = @_;
ok(!$err, 'auth_status succeeded');
ok(defined $resp->{enabled}, 'auth_status has enabled field');
t/auth_enable_disable.t view on Meta::CPAN
plan skip_all => 'EV required' if $@;
}
use EV;
use EV::Etcd;
# Check if etcd is available
my $etcd_available = 0;
eval {
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
timeout => 2,
);
$client->status(sub {
my ($resp, $err) = @_;
$etcd_available = 1 if !$err;
EV::break;
});
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
# Check if auth is already enabled
my $auth_enabled = 0;
{
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
$client->auth_status(sub {
my ($resp, $err) = @_;
$auth_enabled = $resp->{enabled} if !$err && $resp;
EV::break;
});
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
}
if ($auth_enabled) {
plan skip_all => 'Auth already enabled - cannot run enable/disable tests';
}
plan tests => 10;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $root_password = "root-test-pwd-$$-" . time();
my $auth_token;
# Cleanup function to attempt disabling auth if test fails
END {
if ($auth_token && $ENV{ETCD_TEST_AUTH_ENABLE_DISABLE}) {
# Try to disable auth on exit
my $cleanup_client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
auth_token => $auth_token,
);
$cleanup_client->auth_disable(sub { EV::break; });
my $t = EV::timer(5, 0, sub { EV::break });
EV::run;
}
}
# Test 1: Create root user
$client->user_add('root', $root_password, sub {
t/auth_enable_disable.t view on Meta::CPAN
$auth_token = $resp->{token} if $resp;
ok($auth_token, 'authenticated and got token');
diag("Error: " . ($err->{message} // $err)) if $err;
EV::break;
});
my $t4 = EV::timer(5, 0, sub { fail('timeout'); EV::break });
EV::run;
# Create authenticated client
my $auth_client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
auth_token => $auth_token,
);
# Test 5: Verify auth is enabled
$auth_client->auth_status(sub {
my ($resp, $err) = @_;
ok(!$err && $resp->{enabled}, 'auth_status confirms enabled');
diag("Auth status: enabled=" . ($resp->{enabled} ? "yes" : "no")) if $resp;
EV::break;
});
t/auth_errors.t view on Meta::CPAN
use warnings;
use lib 'blib/lib', 'blib/arch';
use Test::More;
BEGIN { eval { require EV }; plan skip_all => 'EV required' if $@ }
use EV;
use EV::Etcd;
my $available = 0;
eval {
my $c = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], timeout => 2);
$c->status(sub { $available = 1 if !$_[1]; EV::break });
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $available;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $user = "test_autherr_$$";
# Setup: create a user we can test against
my $err;
$client->user_add($user, "rightpw", sub { $err = $_[1]; EV::break });
my $t1 = EV::timer(3, 0, sub { EV::break });
EV::run;
plan skip_all => "user_add failed: $err->{message}" if $err && $err->{status} ne 'ALREADY_EXISTS';
# 1. user_add of an existing user -> ALREADY_EXISTS / FAILED_PRECONDITION
t/auto_reconnect.t view on Meta::CPAN
eval { require EV };
plan skip_all => 'EV required' if $@;
}
use EV;
use EV::Etcd;
# Check if etcd is available
my $etcd_available = 0;
eval {
my $c = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], timeout => 2);
$c->status(sub { $etcd_available = 1 if !$_[1]; EV::break });
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
plan tests => 4;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
max_retries => 5,
);
ok($client, 'client created');
my $test_key = "/test_auto_reconnect_$$";
my $watch;
my $events_received = 0;
my $test_done = 0;
my $watch_created = 0;
t/binary_data.t view on Meta::CPAN
plan skip_all => 'EV required' if $@;
}
use EV;
use EV::Etcd;
# Check if etcd is available
my $etcd_available = 0;
eval {
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
timeout => 2,
);
$client->status(sub {
my ($resp, $err) = @_;
$etcd_available = 1 if !$err;
EV::break;
});
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
plan tests => 18;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
my $prefix = "/test-binary-$$-" . time();
# Test 1-3: UTF-8 key and value
{
my $utf8_key = "$prefix/utf8-\x{4e2d}\x{6587}"; # Chinese characters
my $utf8_value = "value-\x{65e5}\x{672c}\x{8a9e}"; # Japanese characters
my $put_ok = 0;
t/callback_validation.t view on Meta::CPAN
plan skip_all => 'EV module not available';
exit;
}
}
plan tests => 12;
use_ok('EV::Etcd');
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
ok($client, 'client created');
# Test that invalid callbacks are rejected with proper error message
# Test get with invalid callback
eval { $client->get('/test/key', 'not_a_callback'); };
like($@, qr/callback must be a code reference/, 'get rejects non-coderef callback');
t/cancel_handle_lifetime.t view on Meta::CPAN
BEGIN {
eval { require EV };
plan skip_all => 'EV required' if $@;
}
use EV;
use EV::Etcd;
my $etcd_available = 0;
eval {
my $c = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], timeout => 2);
$c->status(sub { $etcd_available = 1 if !$_[1]; EV::break });
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
# Verifies the dual-ownership lifetime: a Perl handle held past client-side
# cleanup (cancellation -> RECV completion -> cleanup_watch) must remain safe
# to call methods on. Before the fix this was a use-after-free.
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# --- Watch ---
my $key = "/test_cancel_lifetime_$$";
my $watch = $client->watch($key, sub { });
ok($watch, 'watch handle created');
my $cancel_done = 0;
$watch->cancel(sub { $cancel_done = 1; EV::break });
my $t1 = EV::timer(2, 0, sub { EV::break });
EV::run;