EV-Etcd

 view release on metacpan or  search on metacpan

.github/workflows/ci.yml  view on Meta::CPAN

        run: |
          ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
          curl -fsSL "https://github.com/etcd-io/etcd/releases/download/v3.5.26/etcd-v3.5.26-linux-${ARCH}.tar.gz" | tar xz
          sudo mv "etcd-v3.5.26-linux-${ARCH}/etcd" /usr/local/bin/
          sudo mv "etcd-v3.5.26-linux-${ARCH}/etcdctl" /usr/local/bin/

      - name: Start etcd
        run: |
          etcd --data-dir /tmp/etcd-data &
          for i in $(seq 1 30); do
            etcdctl endpoint health 2>/dev/null && break
            sleep 1
          done
          etcdctl endpoint health

      - name: Test
        run: make test

  test-debian:
    runs-on: ubuntu-latest
    container: debian:bookworm
    name: Debian bookworm
    # etcd 3.5 in Debian-bookworm-in-container has issues with member
    # mutation operations (cluster.t fails) — known platform quirk, the

.github/workflows/ci.yml  view on Meta::CPAN

        run: |
          ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
          curl -fsSL "https://github.com/etcd-io/etcd/releases/download/v3.5.26/etcd-v3.5.26-linux-${ARCH}.tar.gz" | tar xz
          mv "etcd-v3.5.26-linux-${ARCH}/etcd" /usr/local/bin/
          mv "etcd-v3.5.26-linux-${ARCH}/etcdctl" /usr/local/bin/

      - name: Start etcd
        run: |
          etcd --data-dir /tmp/etcd-data &
          for i in $(seq 1 30); do
            etcdctl endpoint health 2>/dev/null && break
            sleep 1
          done
          etcdctl endpoint health

      - name: Test
        run: make test

  test-macos:
    runs-on: macos-latest
    strategy:
      fail-fast: false
      matrix:
        perl: ['5.38', '5.40']

.github/workflows/ci.yml  view on Meta::CPAN

          ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/;s/arm64/arm64/')
          curl -fsSL "https://github.com/etcd-io/etcd/releases/download/v3.5.26/etcd-v3.5.26-darwin-${ARCH}.zip" -o etcd.zip
          unzip -q etcd.zip
          sudo mv "etcd-v3.5.26-darwin-${ARCH}/etcd" /usr/local/bin/
          sudo mv "etcd-v3.5.26-darwin-${ARCH}/etcdctl" /usr/local/bin/

      - name: Start etcd
        run: |
          etcd --data-dir /tmp/etcd-data &
          for i in $(seq 1 30); do
            etcdctl endpoint health 2>/dev/null && break
            sleep 1
          done
          etcdctl endpoint health

      - name: Test
        run: make test

  test-freebsd:
    runs-on: ubuntu-latest
    name: FreeBSD 14.2
    steps:
      - uses: actions/checkout@v5
      - uses: vmactions/freebsd-vm@v1

.github/workflows/ci.yml  view on Meta::CPAN

          usesh: true
          prepare: |
            pkg install -y grpc protobuf-c pkgconf perl5 p5-EV gmake
            pkg install -y etcd3 || pkg install -y etcd || true
          run: |
            perl Makefile.PL
            gmake
            if command -v etcd >/dev/null 2>&1; then
              etcd --data-dir /tmp/etcd-data &
              sleep 5
              etcdctl endpoint health 2>/dev/null || true
            fi
            gmake test

  test-openbsd:
    runs-on: ubuntu-latest
    name: OpenBSD
    continue-on-error: true
    steps:
      - uses: actions/checkout@v5
      - uses: vmactions/openbsd-vm@v1

Changes  view on Meta::CPAN

    - KV operations: get, put, delete, range, txn (compare-and-swap)
    - Watch with bidirectional streaming and auto-reconnect
    - Lease: grant, revoke, keepalive, time-to-live, leases
    - Lock and unlock tied to leases
    - Election: campaign, proclaim, leader, resign, observe
    - Cluster: member list/add/remove/update/promote
    - Maintenance: status, compact, defragment, alarm, hash_kv, move_leader
    - Auth: user/role management, authenticate, enable/disable
    - Health monitoring with configurable interval and callback
    - Automatic retries for transient gRPC failures
    - Multiple endpoint support with failover
    - Structured error callbacks ({code, message, source})

Etcd.xs  view on Meta::CPAN

static void process_role_list_response(pTHX_ pending_call_t *pc);
static void process_role_grant_permission_response(pTHX_ pending_call_t *pc);
static void process_role_revoke_permission_response(pTHX_ pending_call_t *pc);
static void process_user_grant_role_response(pTHX_ pending_call_t *pc);
static void process_user_revoke_role_response(pTHX_ pending_call_t *pc);
static void process_user_get_response(pTHX_ pending_call_t *pc);
static void process_user_list_response(pTHX_ pending_call_t *pc);
static SV* response_op_to_hashref(pTHX_ Etcdserverpb__ResponseOp *op);
static void parse_request_ops(pTHX_ SV *src_av, Etcdserverpb__RequestOp ***dst_ops, size_t *dst_n);

/*
 * Re-establish the client's gRPC channel, rotating to the next endpoint.
 *
 * With more than one configured endpoint, current_endpoint advances
 * round-robin; with a single endpoint the same address is reused.
 *
 * The replacement channel is created BEFORE the old one is destroyed. If
 * creation fails (NULL -- the same failure ev_etcd_new() checks for), the
 * existing channel and endpoint index are left untouched, so a failed
 * reconnect never replaces a working client->channel with NULL. Callers
 * such as the health check hand client->channel directly to
 * grpc_channel_check_connectivity_state().
 */
static void reconnect_channel(ev_etcd_t *client) {
    int previous_endpoint = client->current_endpoint;
    grpc_channel *replacement;

    if (client->endpoint_count > 1)
        client->current_endpoint = (client->current_endpoint + 1) % client->endpoint_count;

    replacement = etcd_create_insecure_channel(
        client->endpoints[client->current_endpoint], NULL);
    if (!replacement) {
        /* Keep the old channel (and the index that matches it) rather than
         * leaving the client without any channel at all. */
        client->current_endpoint = previous_endpoint;
        return;
    }

    /* Swap only once the new channel is known to be usable. */
    if (client->channel)
        grpc_channel_destroy(client->channel);
    client->channel = replacement;
}

/*
 * Compute range_end for prefix queries.
 * For a prefix, range_end is the key with the last byte incremented.
 * Handles trailing 0xFF bytes by truncating and incrementing.
 * Returns allocated buffer (caller must Safefree) and sets *out_len.
 * Returns NULL if key_len is 0.
 */
static char* compute_prefix_range_end(const char *key, size_t key_len, size_t *out_len) {

Etcd.xs  view on Meta::CPAN


    /* Check channel connectivity state */
    grpc_connectivity_state state = grpc_channel_check_connectivity_state(client->channel, 0);

    int was_healthy = client->is_healthy;
    int is_healthy = (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE);

    if (was_healthy != is_healthy) {
        client->is_healthy = is_healthy;

        /* If unhealthy, try reconnecting to next endpoint */
        if (!is_healthy && client->endpoint_count > 1) {
            reconnect_channel(client);
        }

        /* Call health callback if provided */
        if (client->health_callback) {
            dSP;
            ENTER;
            SAVETMPS;
            PUSHMARK(SP);
            EXTEND(SP, 2);
            PUSHs(sv_2mortal(newSViv(is_healthy)));
            PUSHs(sv_2mortal(newSVpv(client->endpoints[client->current_endpoint], 0)));
            PUTBACK;
            client->in_callback = 1;
            CALL_SV_SAFE(client->health_callback, G_DISCARD);
            FREETMPS;
            LEAVE;
            client->in_callback = 0;
            if (!client->active) {
                finish_client_destroy(aTHX_ client);
                return;
            }

Etcd.xs  view on Meta::CPAN

        cleanup_observe(aTHX_ client->observes);
    }

    /* Free client-level resources (mirrors free_perl_resources in DESTROY) */
    if (client->health_callback)
        SvREFCNT_dec(client->health_callback);
    if (client->auth_token) {
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
    }
    if (client->endpoints) {
        for (int i = 0; i < client->endpoint_count; i++)
            if (client->endpoints[i]) Safefree(client->endpoints[i]);
        Safefree(client->endpoints);
    }

    Safefree(client);
}

/*
 * ev_async callback - runs in main thread when signaled by gRPC thread.
 * Drains the event queue and processes each event.
 */
static void cq_async_callback(struct ev_loop *loop, ev_async *w, int revents) {

Etcd.xs  view on Meta::CPAN

    I_EV_API("EV::Etcd");
    grpc_init();
    init_method_slices();

EV::Etcd
ev_etcd_new(class, ...)
    char *class
CODE:
{
    ev_etcd_t *client;
    AV *endpoints_av = NULL;
    int timeout_seconds = 30;  /* Default timeout */
    int max_retries = 3;       /* Default max retries */
    int health_interval = 0;   /* Default: disabled */
    SV *health_callback = NULL;
    char *init_auth_token = NULL;
    STRLEN init_auth_token_len = 0;
    int i;

    /* Parse options */
    for (i = 1; i < items; i += 2) {
        if (i + 1 < items) {
            const char *key = SvPV_nolen(ST(i));
            if (strEQ(key, "endpoints")) {
                if (SvROK(ST(i + 1)) && SvTYPE(SvRV(ST(i + 1))) == SVt_PVAV) {
                    endpoints_av = (AV *)SvRV(ST(i + 1));
                }
            } else if (strEQ(key, "timeout")) {
                timeout_seconds = SvIV(ST(i + 1));
                if (timeout_seconds < 1) {
                    timeout_seconds = 1;  /* Minimum 1 second */
                }
            } else if (strEQ(key, "max_retries")) {
                max_retries = SvIV(ST(i + 1));
                if (max_retries < 0) {
                    max_retries = 0;

Etcd.xs  view on Meta::CPAN

                    health_callback = ST(i + 1);
                }
            } else if (strEQ(key, "auth_token")) {
                if (SvPOK(ST(i + 1))) {
                    init_auth_token = SvPV(ST(i + 1), init_auth_token_len);
                }
            }
        }
    }

    /* Pre-validate endpoint URL sizes before allocating */
    if (endpoints_av && av_len(endpoints_av) >= 0) {
        int count = av_len(endpoints_av) + 1;
        for (i = 0; i < count; i++) {
            SV **ep = av_fetch(endpoints_av, i, 0);
            if (ep && SvPOK(*ep)) {
                VALIDATE_URL_SIZE(SvCUR(*ep));
            }
        }
    }

    Newxz(client, 1, ev_etcd_t);

    /* Store endpoints */
    if (endpoints_av && av_len(endpoints_av) >= 0) {
        int count = av_len(endpoints_av) + 1;
        Newx(client->endpoints, count, char *);
        client->endpoint_count = count;
        for (i = 0; i < count; i++) {
            SV **ep = av_fetch(endpoints_av, i, 0);
            if (ep && SvPOK(*ep)) {
                STRLEN len;
                const char *str = SvPV(*ep, len);
                Newx(client->endpoints[i], len + 1, char);
                Copy(str, client->endpoints[i], len + 1, char);
            } else {
                /* Default endpoint for invalid entries */
                client->endpoints[i] = savepv("127.0.0.1:2379");
            }
        }
    } else {
        /* Default single endpoint */
        Newx(client->endpoints, 1, char *);
        client->endpoints[0] = savepv("127.0.0.1:2379");
        client->endpoint_count = 1;
    }
    client->current_endpoint = 0;

    /* Create gRPC channel to first endpoint */
    client->channel = etcd_create_insecure_channel(client->endpoints[0], NULL);

    if (!client->channel) {
        for (int j = 0; j < client->endpoint_count; j++) {
            Safefree(client->endpoints[j]);
        }
        Safefree(client->endpoints);
        Safefree(client);
        croak("Failed to create gRPC channel");
    }

    /* Create completion queue for polling */
    client->cq = grpc_completion_queue_create_for_next(NULL);

    /* Initialize threading for hybrid gRPC/EV approach */
    pthread_mutex_init(&client->queue_mutex, NULL);
    client->thread_running = 1;

Etcd.xs  view on Meta::CPAN

    /* Start gRPC completion queue thread */
    if (pthread_create(&client->cq_thread, NULL, cq_thread_func, client) != 0) {
        ev_async_stop(EV_DEFAULT, &client->cq_async);
        pthread_mutex_destroy(&client->queue_mutex);
        grpc_completion_queue_shutdown(client->cq);
        while (grpc_completion_queue_next(client->cq,
               gpr_inf_past(GPR_CLOCK_REALTIME), NULL).type != GRPC_QUEUE_SHUTDOWN)
            ;
        grpc_completion_queue_destroy(client->cq);
        grpc_channel_destroy(client->channel);
        /* Free endpoints */
        for (int j = 0; j < client->endpoint_count; j++) {
            Safefree(client->endpoints[j]);
        }
        Safefree(client->endpoints);
        Safefree(client);
        croak("Failed to create gRPC completion queue thread");
    }

    if (init_auth_token && init_auth_token_len > 0) {
        Newx(client->auth_token, init_auth_token_len + 1, char);
        Copy(init_auth_token, client->auth_token, init_auth_token_len, char);
        client->auth_token[init_auth_token_len] = '\0';
        client->auth_token_len = init_auth_token_len;
    }

Etcd.xs  view on Meta::CPAN

        client->health_callback = NULL;
    }

    /* Free auth token - securely zero before freeing */
    if (client->auth_token) {
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
        client->auth_token = NULL;
    }

    /* Free endpoints */
    if (client->endpoints) {
        int i;
        for (i = 0; i < client->endpoint_count; i++) {
            if (client->endpoints[i]) {
                Safefree(client->endpoints[i]);
            }
        }
        Safefree(client->endpoints);
        client->endpoints = NULL;
    }

    /* If called during event processing, defer struct free to cq_async_callback */
    if (!client->in_callback) {
        Safefree(client);
    }
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Watch  PREFIX = ev_etcd_watch_

README.md  view on Meta::CPAN


Async etcd v3 client for Perl using the native gRPC Core C API and EV/libev.

## Synopsis

```perl
use EV;
use EV::Etcd;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

# Put
$client->put('/my/key', 'value', sub {
    my ($resp, $err) = @_;
    die $err->{message} if $err;
    say "Revision: $resp->{header}{revision}";
});

# Get

bench.pl  view on Meta::CPAN

use Time::HiRes qw(time);
use lib 'blib/lib', 'blib/arch';
$| = 1;  # Autoflush

use EV;
use EV::Etcd;

# Check if etcd is running
my $etcd_running = 0;
eval {
    my $result = `etcdctl endpoint health 2>&1`;
    $etcd_running = 1 if $result =~ /is healthy/;
};
die "etcd not running\n" unless $etcd_running;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

my $prefix = "/bench_$$";
my $iterations = $ENV{BENCH_ITER} || 1000;

print "EV::Etcd Benchmark\n";
print "==================\n\n";

# Benchmark 1: Sequential puts
{

eg/auth_test.pl  view on Meta::CPAN

# 2. Or create a non-root user:
#    etcdctl user add testuser:testpass
#    etcdctl role add testrole
#    etcdctl role grant-permission testrole readwrite /authtest --prefix
#    etcdctl user grant-role testuser testrole

my $username = $ENV{ETCD_USER} // 'root';
my $password = $ENV{ETCD_PASSWORD} // 'rootpassword';

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "=== Authentication Test ===\n\n";

# Step 1: Authenticate
print "1. Authenticating as '$username'...\n";
$client->authenticate($username, $password, sub {
    my ($resp, $err) = @_;

    if ($err) {

eg/batch_watch_example.pl  view on Meta::CPAN


my $prefix = "/config/app1/";

# Track state
my $watcher1_batches = 0;
my $watcher2_batches = 0;
my $writes_done = 0;
my $cleaning_up = 0;

# === Client 1: First watcher ===
my $watcher_client1 = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

my $watch1 = $watcher_client1->watch($prefix, { prefix => 1 }, sub {
    my ($resp, $err) = @_;
    if ($err) {
        print "Watcher 1 error: $err->{message}\n";
        return;
    }

    $watcher1_batches++;
    my $events = $resp->{events} || [];

eg/batch_watch_example.pl  view on Meta::CPAN

        my $type = $event->{type} // 'PUT';
        my $key = $event->{kv}{key} // '';
        my $value = $event->{kv}{value} // '';
        print "  - $type: $key = $value\n";
    }

    check_done();
});

# === Client 2: Second watcher ===
my $watcher_client2 = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

my $watch2 = $watcher_client2->watch($prefix, { prefix => 1 }, sub {
    my ($resp, $err) = @_;
    if ($err) {
        print "Watcher 2 error: $err->{message}\n";
        return;
    }

    $watcher2_batches++;
    my $events = $resp->{events} || [];

eg/batch_watch_example.pl  view on Meta::CPAN

        my $type = $event->{type} // 'PUT';
        my $key = $event->{kv}{key} // '';
        my $value = $event->{kv}{value} // '';
        print "  - $type: $key = $value\n";
    }

    check_done();
});

# === Client 3: Writer (performs transactional batch write) ===
my $writer_client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

# Wait a moment for watches to be established, then write
my $write_timer = EV::timer(0.5, 0, sub {
    print "Writer: Performing transactional batch write to $prefix\n";

    # Transaction that writes 5 keys atomically
    # All watchers will receive these as ONE batch
    $writer_client->txn(
        # No compare conditions - always succeeds
        compare => [],

eg/crud_test.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "=== CRUD Test ===\n\n";

# Step 1: Put
print "1. PUT /test = 'hello world'\n";
$client->put('/test', 'hello world', sub {
    my ($resp, $err) = @_;
    die "Put error: $err->{message}" if $err;
    print "   Put OK, revision: $resp->{header}{revision}\n\n";

eg/distributed_mutex.pl  view on Meta::CPAN

use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;

my $name      = $ARGV[0] // '/jobs/example';
my $work_msg  = $ARGV[1] // 'critical section';
my $work_time = $ARGV[2] // 5;     # seconds the lock is held
my $lock_ttl  = 10;                # seconds; must exceed work_time + slack

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my ($lease_id, $lock_key, $keepalive, $work_timer, $lease_died);

# 1. Lease for the lock — if we crash, the lock auto-releases after lock_ttl
$client->lease_grant($lock_ttl, sub {
    my ($r, $err) = @_;
    die "lease_grant: $err->{message}\n" if $err;
    $lease_id = $r->{id};
    say "[$$] lease=$lease_id ttl=${lock_ttl}s";

    # 2. Refresh the lease while we work

eg/leader_cron.pl  view on Meta::CPAN

use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;

my $election    = "/cron/nightly-aggregator";
my $lease_ttl   = 10;        # seconds; shorter = faster failover, more chatter
my $tick_period = 2;         # how often the leader runs the job
my $value       = "$0/$$";   # what other observers see for this leader

my $client = EV::Etcd->new(
    endpoints   => ['127.0.0.1:2379'],
    max_retries => 5,
);

my ($lease_id, $leader, $keepalive, $tick_timer);

sub fail { my $m = shift; warn "[$$] $m\n"; cleanup_and_exit(1) }

# 1. Grant an election lease
$client->lease_grant($lease_ttl, sub {
    my ($r, $err) = @_;

eg/lease_test.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "=== Lease Test ===\n\n";

my $lease_id;
my $keepalive_count = 0;

# Step 1: Grant a lease with 10 second TTL
print "1. Granting lease with TTL=10s...\n";
$client->lease_grant(10, sub {

eg/put_get_test.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "Created client\n";

# First, put a value
print "Putting key '/test' with value 'hello from perl'...\n";

$client->put('/test', 'hello from perl', sub {
    my ($resp, $err) = @_;

eg/resumable_watch.pl  view on Meta::CPAN

use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;

my $prefix    = $ARGV[0] // '/myapp/config/';
my $state_dir = $ENV{RESUMABLE_WATCH_STATE_DIR} || '/tmp';
my $state_path = "$state_dir/resumable_watch_" . _safe_name($prefix) . ".rev";

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my $last_rev = read_last_rev();

if ($last_rev) {
    say "[resume] last seen revision: $last_rev — fetching missed changes";
    fetch_gap($last_rev + 1, sub { start_watch($last_rev + 1) });
} else {
    say "[resume] no saved revision — starting from current HEAD";
    $client->get('/', { prefix => 1, count_only => 1 }, sub {
        my ($r, $err) = @_;
        die "head probe: $err->{message}\n" if $err;

eg/service_registry.pl  view on Meta::CPAN

use EV;
use EV::Etcd;

my $service_type = $ARGV[0] // 'web';
my $self_id      = sprintf "%s-%d", ($ENV{HOSTNAME} || 'localhost'), $$;
my $self_value   = "host=$self_id pid=$$ started=" . time();
my $prefix       = "/services/$service_type/";
my $self_key     = "$prefix$self_id";
my $lease_ttl    = 15;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my ($lease_id, $keepalive, $watch);

# 1. Lease — keys self-evict if we crash
$client->lease_grant($lease_ttl, sub {
    my ($r, $err) = @_;
    die "lease_grant: $err->{message}\n" if $err;
    $lease_id = $r->{id};
    say "[$self_id] lease=$lease_id ttl=${lease_ttl}s";

    # 2. Heartbeat

eg/sync_and_watch_example.pl  view on Meta::CPAN

use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;

my $prefix = "/myapp/config/";

# === Setup: Create some initial data ===
print "=== Setup: Creating initial data ===\n";

my $setup_client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $setup_done = 0;

# Use transaction to create initial config atomically
$setup_client->txn(
    compare => [],
    success => [
        { put => { key => "${prefix}db/host",     value => "localhost" } },
        { put => { key => "${prefix}db/port",     value => "5432" } },
        { put => { key => "${prefix}db/name",     value => "myapp" } },
        { put => { key => "${prefix}cache/host",  value => "localhost" } },

eg/sync_and_watch_example.pl  view on Meta::CPAN

        $setup_done = 1;
        EV::break;
    }
);
my $t_setup = EV::timer(5, 0, sub { die "Setup timeout" });
EV::run;

# === Client 1: Subscriber that loads then watches ===
print "=== Client 1: Loading existing data and watching ===\n";

my $subscriber = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my %local_cache;       # Local copy of the config branch
my $watch_revision;    # Revision to start watching from
my $watch_handle;
my $load_done = 0;

# Step 1: Load all existing data from the branch
$subscriber->get($prefix, { prefix => 1 }, sub {
    my ($resp, $err) = @_;
    die "Load failed: $err->{message}" if $err;

eg/sync_and_watch_example.pl  view on Meta::CPAN

    print "\nNow watching for changes...\n";
    EV::break;
});

my $t_load = EV::timer(5, 0, sub { die "Load timeout" });
EV::run;

# === Client 2: Publisher that makes changes ===
print "\n=== Client 2: Making changes ===\n";

my $publisher = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $changes_done = 0;

# Schedule a series of changes
my @changes = (
    sub {
        print "\nPublisher: Updating db/host...\n";
        $publisher->put("${prefix}db/host", "db.production.local", sub {
            my ($resp, $err) = @_;
            print "  Updated (rev $resp->{header}{revision})\n" unless $err;
            EV::break;

eg/txn_test.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "=== Transaction Test ===\n\n";

# Test 1: Compare-and-swap - create key if it doesn't exist (version == 0)
print "1. Compare-and-swap: Create '/txntest' only if it doesn't exist...\n";

$client->txn(
    # Compare: version == 0 means key doesn't exist
    [

eg/watch_config_tree.pl  view on Meta::CPAN

use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Path::XS qw(path_get path_set path_delete);
use Data::Dumper;

my $prefix = "/myapp/";
my %config;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

# Seed some data
print "=== Seeding config ===\n";
$client->txn(
    compare => [],
    success => [
        { put => { key => "${prefix}db/host",    value => "localhost" } },
        { put => { key => "${prefix}db/port",    value => "5432" } },
        { put => { key => "${prefix}cache/host", value => "redis.local" } },
        { put => { key => "${prefix}cache/ttl",  value => "3600" } },

eg/watch_extract_fields.pl  view on Meta::CPAN

use Data::Path::XS qw(pathc_get path_compile);

# Pre-compile paths for fields we extract from every event
my $p_type  = path_compile('/type');
my $p_key   = path_compile('/kv/key');
my $p_value = path_compile('/kv/value');
my $p_rev   = path_compile('/kv/mod_revision');
my $p_lease = path_compile('/kv/lease');
my $p_prev  = path_compile('/prev_kv/value');

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
my $prefix = "/events-demo/";

# Seed data
$client->put("${prefix}sensor/temp", "22.5", sub { EV::break });
my $t0 = EV::timer(5, 0, sub { die "timeout" });
EV::run;

print "=== Watching $prefix with prev_kv ===\n";
print "(Compiled paths extract fields from each event)\n\n";

eg/watch_test.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
use Data::Dumper;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "=== Watch Test ===\n\n";
print "Watching key '/watchtest'...\n";
print "(In another terminal, run: etcdctl put /watchtest value1)\n\n";

my $event_count = 0;

$client->watch('/watchtest', sub {
    my ($resp, $err) = @_;

etcd_common.h  view on Meta::CPAN

    pending_call_t *pending_calls;
    watch_call_t *watches;
    keepalive_call_t *keepalives;
    observe_call_t *observes;
    int active;
    int in_callback;  /* Guard against freeing client during event processing */
    char *auth_token;
    size_t auth_token_len;
    int timeout_seconds;

    /* Multiple endpoints for failover */
    char **endpoints;
    int endpoint_count;
    int current_endpoint;

    /* Retry configuration */
    int max_retries;

    /* Health monitoring */
    ev_timer health_timer;
    int is_healthy;
    SV *health_callback;
    pid_t owner_pid;  /* PID of process that created this client (fork safety) */
} ev_etcd_t;

lib/EV/Etcd.pm  view on Meta::CPAN


EV::Etcd - Async etcd v3 client using native gRPC and EV/libev

=head1 SYNOPSIS

    use v5.10;
    use EV;
    use EV::Etcd;

    my $client = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
    );

    # Async put
    $client->put('/my/key', 'value', sub {
        my ($resp, $err) = @_;
        die $err->{message} if $err;
        say "Put succeeded, revision: $resp->{header}{revision}";
    });

    # Async get

lib/EV/Etcd.pm  view on Meta::CPAN

=head1 METHODS

=head2 new

    my $client = EV::Etcd->new(%options);

Options:

=over 4

=item endpoints

ArrayRef of etcd endpoints (host:port). Optional; defaults to
C<['127.0.0.1:2379']>. When more than one is provided, the client uses the
first endpoint and rotates to subsequent endpoints on connection failure.

=item timeout

RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.

=item max_retries

Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.

lib/EV/Etcd.pm  view on Meta::CPAN

=item health_interval

Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.

=item on_health_change

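Callback invoked when the connection health status changes. Receives two
arguments: a boolean indicating health status (1=healthy, 0=unhealthy) and
the current endpoint string. When several endpoints are configured, the
client rotates to the next endpoint before invoking the callback for an
unhealthy transition, so the endpoint argument is then the one being tried
next rather than the one that just failed.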

    my $client = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        health_interval => 5,
        on_health_change => sub {
            my ($is_healthy, $endpoint) = @_;
            warn $is_healthy ? "Connected to $endpoint" : "Disconnected from $endpoint";
        },
    );

=item auth_token

Pre-set authentication token. Use this to create an authenticated client
without calling authenticate() first. Useful when you already have a valid
token from a previous session.

    my $client = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        auth_token => $saved_token,
    );

=back

=head1 ENCODING

Keys and values are stored by etcd as raw bytes; this module does not perform
any character encoding. If you pass a Perl string with the UTF-8 flag set
(e.g. a literal containing non-ASCII characters under C<use utf8>), the UTF-8

lib/EV/Etcd.pm  view on Meta::CPAN


Called with C<($response, $error)> when complete.

=back

=head2 Complete Authentication Example

    use EV;
    use EV::Etcd;

    my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

    # Setup authentication (run once, as root)
    sub setup_auth {
        # Create a role with permissions
        $client->role_add('app-role', sub {
            my ($resp, $err) = @_;

            # Grant read/write on /app/ prefix
            $client->role_grant_permission('app-role', 'READWRITE', '/app/', '/app0', sub {
                my ($resp, $err) = @_;

scripts/local_cluster.sh  view on Meta::CPAN


    start_node "$NODE1_NAME" "$NODE1_CLIENT" "$NODE1_PEER"
    start_node "$NODE2_NAME" "$NODE2_CLIENT" "$NODE2_PEER"
    start_node "$NODE3_NAME" "$NODE3_CLIENT" "$NODE3_PEER"

    echo ""
    echo "Waiting for cluster to form..."
    sleep 3

    # Check health
    if etcdctl --endpoints="http://${NODE1_CLIENT}" endpoint health 2>/dev/null; then
        echo ""
        echo "Cluster is healthy!"
        echo ""
        echo "Endpoints:"
        echo "  Node 1: http://${NODE1_CLIENT}"
        echo "  Node 2: http://${NODE2_CLIENT}"
        echo "  Node 3: http://${NODE3_CLIENT}"
        echo ""
        echo "To use with EV::Etcd:"
        echo "  my \$client = EV::Etcd->new(endpoints => ['${NODE1_CLIENT}', '${NODE2_CLIENT}', '${NODE3_CLIENT}']);"
    else
        echo "Warning: Cluster may not be fully healthy yet"
    fi
}

stop_cluster() {
    echo "Stopping etcd cluster..."

    for name in "$NODE1_NAME" "$NODE2_NAME" "$NODE3_NAME"; do
        local pidfile="${CLUSTER_DIR}/${name}/etcd.pid"

scripts/local_cluster.sh  view on Meta::CPAN

            fi
        else
            echo "  $name: NOT STARTED"
        fi
    done

    echo ""

    if [ $running -eq 3 ]; then
        echo "Endpoint health:"
        etcdctl --endpoints="http://${NODE1_CLIENT},http://${NODE2_CLIENT},http://${NODE3_CLIENT}" endpoint health 2>/dev/null || echo "  (health check failed)"
        echo ""
        echo "Endpoint status:"
        etcdctl --endpoints="http://${NODE1_CLIENT},http://${NODE2_CLIENT},http://${NODE3_CLIENT}" endpoint status --write-out=table 2>/dev/null || echo "  (status check failed)"
    fi
}

case "${1:-}" in
    start)
        stop_cluster 2>/dev/null || true
        start_cluster
        ;;
    stop)
        stop_cluster

t/00-load.t  view on Meta::CPAN

    if ($@) {
        plan skip_all => 'EV module not available';
        exit;
    }
}

plan tests => 2;

use_ok('EV::Etcd');

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
isa_ok($client, 'EV::Etcd');

t/auth.t  view on Meta::CPAN

    plan skip_all => 'EV required' if $@;
}

use EV;
use EV::Etcd;

# Check if etcd is available
my $etcd_available = 0;
eval {
    my $client = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        timeout => 2,
    );
    $client->status(sub {
        my ($resp, $err) = @_;
        $etcd_available = 1 if !$err;
        EV::break;
    });
    my $t = EV::timer(3, 0, sub { EV::break });
    EV::run;
};

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

# Note: These tests are designed to work whether auth is enabled or not.
# We test user/role management which works regardless of auth state.
# We intentionally avoid enabling/disabling auth to not disrupt the cluster.

plan tests => 30;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

my $test_user = "test-user-$$-" . time();
my $test_role = "test-role-$$-" . time();

# Test 1-2: auth_status
$client->auth_status(sub {
    my ($resp, $err) = @_;
    ok(!$err, 'auth_status succeeded');
    ok(defined $resp->{enabled}, 'auth_status has enabled field');

t/auth_enable_disable.t  view on Meta::CPAN

    plan skip_all => 'EV required' if $@;
}

use EV;
use EV::Etcd;

# Check if etcd is available: one status() round-trip, bounded by a
# 3-second guard timer. Errors thrown inside the eval leave the flag at 0.
my $etcd_available = 0;
eval {
    my $probe = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], timeout => 2);
    $probe->status(sub { $etcd_available = 1 if !$_[1]; EV::break });
    my $guard = EV::timer(3, 0, sub { EV::break });
    EV::run;
};

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

# Check if auth is already enabled. The enable/disable tests are skipped
# when the server already reports auth as on.
my $auth_enabled = 0;
{
    my $probe = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
    $probe->auth_status(sub {
        my ($status, $failure) = @_;
        if (!$failure && $status) {
            $auth_enabled = $status->{enabled};
        }
        EV::break;
    });
    # Give up waiting after 3 seconds; the flag then keeps its initial 0.
    my $guard = EV::timer(3, 0, sub { EV::break });
    EV::run;
}

plan skip_all => 'Auth already enabled - cannot run enable/disable tests'
    if $auth_enabled;

plan tests => 10;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# Root password embeds the PID and epoch seconds.
my $root_password = "root-test-pwd-$$-" . time();
# Starts undefined; the END block only attempts cleanup once this is true.
my $auth_token;

# Cleanup function to attempt disabling auth if test fails.
# Runs at interpreter exit and only acts when both $auth_token and the
# ETCD_TEST_AUTH_ENABLE_DISABLE environment variable are true.
END {
    if ($auth_token && $ENV{ETCD_TEST_AUTH_ENABLE_DISABLE}) {
        # Try to disable auth on exit
        my $cleanup_client = EV::Etcd->new(
            endpoints => ['127.0.0.1:2379'],
            auth_token => $auth_token,
        );
        # The outcome of auth_disable is not inspected; the callback only
        # ends the event loop.
        $cleanup_client->auth_disable(sub { EV::break; });
        # Stop waiting after 5 seconds if no reply arrives.
        my $t = EV::timer(5, 0, sub { EV::break });
        EV::run;
    }
}

# Test 1: Create root user
$client->user_add('root', $root_password, sub {

t/auth_enable_disable.t  view on Meta::CPAN

    $auth_token = $resp->{token} if $resp;
    ok($auth_token, 'authenticated and got token');
    diag("Error: " . ($err->{message} // $err)) if $err;
    EV::break;
});
# Guard timer: if the preceding request has not answered within 5 seconds,
# record a failure and leave the event loop.
my $t4 = EV::timer(5, 0, sub { fail('timeout'); EV::break });
EV::run;
# Destroy the guard once the loop has returned. Left armed at file scope it
# could still fire inside a later EV::run and register a spurious 'timeout'
# failure (and break that loop early).
undef $t4;

# Create authenticated client
# (same endpoint, constructed with the value currently held in $auth_token)
my $auth_client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
    auth_token => $auth_token,
);

# Test 5: Verify auth is enabled
$auth_client->auth_status(sub {
    my ($resp, $err) = @_;
    # Passes only when no error was reported and the response's enabled
    # field is true.
    ok(!$err && $resp->{enabled}, 'auth_status confirms enabled');
    # Extra diagnostics are emitted only when a response was received.
    diag("Auth status: enabled=" . ($resp->{enabled} ? "yes" : "no")) if $resp;
    EV::break;
});

t/auth_errors.t  view on Meta::CPAN

use warnings;
use lib 'blib/lib', 'blib/arch';
use Test::More;

BEGIN { eval { require EV }; plan skip_all => 'EV required' if $@ }
use EV;
use EV::Etcd;

# Probe etcd once; skip the whole file when status() reports an error,
# throws, or has not answered when the 3-second guard timer fires.
my $available = 0;
eval {
    my $probe = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        timeout   => 2,
    );
    $probe->status(sub {
        my (undef, $failure) = @_;
        $available = 1 unless $failure;
        EV::break;
    });
    my $guard = EV::timer(3, 0, sub { EV::break });
    EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $available;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);
# User name is suffixed with the PID.
my $user = "test_autherr_$$";

# Setup: create a user we can test against
my $err;
# Set by the callback so that a timeout can be told apart from a successful
# reply (both leave $err undefined).
my $setup_replied = 0;
$client->user_add($user, "rightpw", sub { $err = $_[1]; $setup_replied = 1; EV::break });
my $t1 = EV::timer(3, 0, sub { EV::break });
EV::run;
# Disarm the guard; left armed it could break a later EV::run prematurely.
undef $t1;
plan skip_all => 'user_add timed out' unless $setup_replied;
# An ALREADY_EXISTS error is acceptable (the user is there); anything else,
# including an error without a status, aborts the file.
plan skip_all => "user_add failed: $err->{message}"
    if $err && (!defined $err->{status} || $err->{status} ne 'ALREADY_EXISTS');

# 1. user_add of an existing user -> ALREADY_EXISTS / FAILED_PRECONDITION

t/auto_reconnect.t  view on Meta::CPAN

    eval { require EV };
    plan skip_all => 'EV required' if $@;
}

use EV;
use EV::Etcd;

# Check if etcd is available by issuing a single status() request. The flag
# stays 0 when the callback receives an error, when anything inside the
# eval dies, or when the 3-second guard timer ends the loop first.
my $etcd_available = 0;
eval {
    my $probe = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        timeout   => 2,
    );
    $probe->status(sub {
        my (undef, $failure) = @_;
        $etcd_available = 1 unless $failure;
        EV::break;
    });
    my $guard = EV::timer(3, 0, sub { EV::break });
    EV::run;
};

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

plan tests => 4;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);

ok($client, 'client created');

# Key is suffixed with the PID. The remaining variables are file-scoped
# state, all starting at zero/undefined.
# NOTE(review): their use lies outside this excerpt.
my $test_key = '/test_auto_reconnect_' . $$;
my $watch;
my ($events_received, $test_done, $watch_created) = (0, 0, 0);

t/binary_data.t  view on Meta::CPAN

    plan skip_all => 'EV required' if $@;
}

use EV;
use EV::Etcd;

# Check if etcd is available: a status() call whose callback sets the flag
# only on an error-free reply, with a 3-second timer as the fallback exit
# from the event loop.
my $etcd_available = 0;
eval {
    my $probe = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], timeout => 2);
    $probe->status(sub { $etcd_available = 1 if !$_[1]; EV::break });
    my $guard = EV::timer(3, 0, sub { EV::break });
    EV::run;
};

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

plan tests => 18;

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

# Key prefix carrying the PID and epoch seconds.
my $prefix = join '-', '/test-binary', $$, time();

# Test 1-3: UTF-8 key and value
{
    my $utf8_key = "$prefix/utf8-\x{4e2d}\x{6587}";  # Chinese characters
    my $utf8_value = "value-\x{65e5}\x{672c}\x{8a9e}";  # Japanese characters

    my $put_ok = 0;

t/callback_validation.t  view on Meta::CPAN

        plan skip_all => 'EV module not available';
        exit;
    }
}

plan tests => 12;

use_ok('EV::Etcd');

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

ok($client, 'client created');

# Test that invalid callbacks are rejected with proper error message

# Test get with invalid callback
# A plain string is passed in the callback position; the call is expected to
# die with a message matching the pattern checked below.
eval { $client->get('/test/key', 'not_a_callback'); };
like($@, qr/callback must be a code reference/, 'get rejects non-coderef callback');

t/cancel_handle_lifetime.t  view on Meta::CPAN

# Skip the whole file at compile time when the EV module cannot be loaded.
BEGIN {
    eval { require EV; };
    if ($@) {
        plan skip_all => 'EV required';
    }
}

use EV;
use EV::Etcd;

# Probe etcd with one status() call; the file is skipped unless the callback
# ran without an error before the 3-second guard timer ended the loop.
my $etcd_available = 0;
eval {
    my $probe = EV::Etcd->new(
        endpoints => ['127.0.0.1:2379'],
        timeout   => 2,
    );
    $probe->status(sub {
        my (undef, $failure) = @_;
        $etcd_available = 1 unless $failure;
        EV::break;
    });
    my $guard = EV::timer(3, 0, sub { EV::break });
    EV::run;
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

# Verifies the dual-ownership lifetime: a Perl handle held past client-side
# cleanup (cancellation -> RECV completion -> cleanup_watch) must remain safe
# to call methods on. Before the fix this would be a use-after-free (UAF).

my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379']);

# --- Watch ---
my $key = "/test_cancel_lifetime_$$";
my $watch = $client->watch($key, sub { });
ok($watch, 'watch handle created');

# Cancel the watch, then run the loop until the cancel callback fires or the
# 2-second guard timer gives up.
my $cancel_done = 0;
$watch->cancel(sub { $cancel_done = 1; EV::break });
my $t1 = EV::timer(2, 0, sub { EV::break });
EV::run;
# Disarm the guard now that the loop has returned; left armed at file scope
# it could fire during a later EV::run and end that loop prematurely.
undef $t1;



( run in 3.471 seconds using v1.01-cache-2.11-cpan-524268b4103 )