EV-Etcd

 view release on metacpan or search on metacpan

Changes  view on Meta::CPAN

    - Add missing use EV () before XSLoader::load

0.03  2026-03-02
    - Fix error sources, election_observe arg order, watch early-cancel
    - Clear auth token on auth_disable
    - Deduplicate response handlers, remove dead code

0.02  2026-02-10
    - Initial release
    - KV operations: get, put, delete, range, txn (compare-and-swap)
    - Watch with bidirectional streaming and auto-reconnect
    - Lease: grant, revoke, keepalive, time-to-live, leases
    - Lock and unlock tied to leases
    - Election: campaign, proclaim, leader, resign, observe
    - Cluster: member list/add/remove/update/promote
    - Maintenance: status, compact, defragment, alarm, hash_kv, move_leader
    - Auth: user/role management, authenticate, enable/disable
    - Health monitoring with configurable interval and callback
    - Automatic retries for transient gRPC failures
    - Multiple endpoint support with failover
    - Structured error callbacks ({code, message, source})

Etcd.xs  view on Meta::CPAN


    /* Serialize request */
    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__watch_request__get_packed_size,
        etcdserverpb__watch_request__pack, &req);
    if (range_end_copy) Safefree(range_end_copy);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);  /* No timeout for watch */

    wc->call = grpc_channel_create_call(
        client->channel,
        NULL,  /* parent call */
        GRPC_PROPAGATE_DEFAULTS,
        client->cq,
        METHOD_WATCH,
        NULL,  /* host */
        deadline,

Etcd.xs  view on Meta::CPAN

    req.id = lease_id;

    /* Serialize request */
    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    kc->call = grpc_channel_create_call(
        client->channel,
        NULL,
        GRPC_PROPAGATE_DEFAULTS,
        client->cq,
        METHOD_LEASE_KEEPALIVE,
        NULL,
        deadline,

Etcd.xs  view on Meta::CPAN

    req.name.data = (uint8_t *)name_str;
    req.name.len = name_len;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        v3electionpb__leader_request__get_packed_size,
        v3electionpb__leader_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Use infinite deadline for streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    oc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL
    );

    if (!oc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&oc->initial_metadata);

MANIFEST  view on Meta::CPAN

t/election.t
t/error_structure.t
t/kv.t
t/kv_advanced.t
t/lease.t
t/lock.t
t/maintenance.t
t/move_leader.t
t/parameters.t
t/retry_config.t
t/streaming.t
t/txn.t
t/txn_range.t
t/watch_prev_kv.t
t/watch_reconnect.t
t/watch_resume.t
typemap
META.yml                                 Module YAML meta-data (added by MakeMaker)
META.json                                Module JSON meta-data (added by MakeMaker)

README.md  view on Meta::CPAN

        say "$event->{type} on $event->{kv}{key}";
    }
});

EV::run;
```

## Features

- **KV**: get, put, delete, range, transactions (compare-and-swap)
- **Watch**: bidirectional streaming with auto-reconnect
- **Lease**: grant, revoke, keepalive, time-to-live
- **Lock**: distributed locking tied to leases
- **Election**: leader campaign, observe, proclaim, resign
- **Cluster**: member list/add/remove/update/promote
- **Maintenance**: status, compact, defragment, alarm, hash_kv, move_leader
- **Auth**: user/role management, authenticate, enable/disable
- **Health monitoring** with configurable interval and callback
- **Automatic retries** for transient gRPC failures

## Architecture

etcd_common.h  view on Meta::CPAN

    size_t key_len;
    char *range_end;
    size_t range_end_len;
    int64_t start_revision;
    int prev_kv;
    int progress_notify;
    int64_t watch_id;
    int has_watch_id;
} watch_params_t;

/* Watch structure (for streaming watch).
 * One instance per active watch stream; holds the gRPC call state plus
 * the saved request parameters and backoff timer needed to re-establish
 * the stream after a connection failure (see STREAMING_CALL_CLEANUP /
 * STREAMING_CALL_REINIT and the watch reconnect timer callback). */
typedef struct watch_call {
    call_base_t base;  /* Must be first: allows casting to/from call_base_t */
    grpc_call *call;            /* gRPC streaming call handle */
    SV *callback;               /* Perl callback invoked for watch responses */
    grpc_metadata_array initial_metadata;   /* server initial metadata */
    grpc_metadata_array trailing_metadata;  /* server trailing metadata */
    grpc_byte_buffer *recv_buffer;          /* buffer for the in-flight RECV_MESSAGE */
    grpc_slice status_details;  /* status details from stream termination */
    int64_t watch_id;           /* watch id for this stream */
    int active;                 /* nonzero while the stream is considered live */
    struct ev_etcd_struct *client;  /* owning client */
    struct watch_call *next;    /* next watch — NOTE(review): presumably the client's
                                   intrusive list of watches; confirm in ev_etcd_struct */
    int auto_reconnect;         /* reconnect automatically on stream failure */
    int64_t last_revision;      /* last revision seen — presumably used to resume
                                   the watch on reconnect (see t/watch_resume.t) */
    watch_params_t params;      /* original request params, kept for reconnect */
    int reconnect_attempt;      /* consecutive reconnect attempts so far */
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
} watch_call_t;

/* Keepalive structure (for streaming lease keepalive) */
typedef struct keepalive_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;
    SV *callback;
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;
    grpc_slice status_details;
    int64_t lease_id;
    int active;

etcd_common.h  view on Meta::CPAN

    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
} keepalive_call_t;

/* Election observe parameters for reconnection.
 * The election name is kept so the observe stream can rebuild its
 * LeaderRequest when reconnecting (see the observe reconnect callback). */
typedef struct observe_params {
    char *name;      /* election name — NOTE(review): presumably an owned copy; confirm */
    size_t name_len; /* length of name in bytes */
} observe_params_t;

/* Election observe structure (for streaming election observe) */
typedef struct observe_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;
    SV *callback;
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;
    grpc_slice status_details;
    int active;
    struct ev_etcd_struct *client;

etcd_common.h  view on Meta::CPAN

        grpc_metadata_array_destroy(&(pc)->initial_metadata); \
        grpc_metadata_array_destroy(&(pc)->trailing_metadata); \
        if ((pc)->recv_buffer) grpc_byte_buffer_destroy((pc)->recv_buffer); \
        grpc_slice_unref((pc)->status_details); \
        if ((pc)->call) grpc_call_unref((pc)->call); \
        SvREFCNT_dec((pc)->callback); \
        Safefree((pc)); \
    } while (0)

/*
 * Helper macros for streaming call reconnection to reduce code triplication
 * across watch, keepalive, and observe reconnect functions.
 */

/*
 * Cleanup old streaming call state before reconnection.
 * Works with any streaming call struct that has these fields.
 *
 * Usage:
 *   STREAMING_CALL_CLEANUP(wc);  // For watch_call_t
 *   STREAMING_CALL_CLEANUP(kc);  // For keepalive_call_t
 *   STREAMING_CALL_CLEANUP(oc);  // For observe_call_t
 */
#define STREAMING_CALL_CLEANUP(call_ptr) \
    do { \
        if ((call_ptr)->call) { \
            grpc_call_unref((call_ptr)->call); \

etcd_common.h  view on Meta::CPAN

        grpc_metadata_array_destroy(&(call_ptr)->initial_metadata); \
        grpc_metadata_array_destroy(&(call_ptr)->trailing_metadata); \
        if ((call_ptr)->recv_buffer) { \
            grpc_byte_buffer_destroy((call_ptr)->recv_buffer); \
            (call_ptr)->recv_buffer = NULL; \
        } \
        grpc_slice_unref((call_ptr)->status_details); \
    } while (0)

/*
 * Reinitialize streaming call state for reconnection.
 * Re-inits both metadata arrays, resets status_details to an empty slice,
 * and marks the call active again. Must run after STREAMING_CALL_CLEANUP
 * has released the previous call's resources, and before a new call is
 * created for the reconnect attempt.
 *
 * Usage:
 *   STREAMING_CALL_REINIT(wc);
 */
#define STREAMING_CALL_REINIT(call_ptr) \
    do { \
        grpc_metadata_array_init(&(call_ptr)->initial_metadata); \
        grpc_metadata_array_init(&(call_ptr)->trailing_metadata); \
        (call_ptr)->status_details = grpc_empty_slice(); \
        (call_ptr)->active = 1; \
    } while (0)

/*
 * Setup standard 4-op batch for streaming call reconnection:
 *   ops[0] SEND_INITIAL_METADATA — auth metadata attached via setup_auth_metadata()
 *   ops[1] RECV_INITIAL_METADATA — into call_ptr->initial_metadata
 *   ops[2] SEND_MESSAGE          — the serialized request (send_buf)
 *   ops[3] RECV_MESSAGE          — first response into call_ptr->recv_buffer
 * Requires: ops[4], auth_md, send_buffer, call_ptr all in scope.
 * NOTE(review): only .op and the data members are assigned here — the
 * caller is assumed to have zero-initialized ops[] so that flags/reserved
 * are clear; confirm at each call site.
 *
 * Usage:
 *   STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, wc);
 */
#define STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buf, call_ptr) \
    do { \
        (ops)[0].op = GRPC_OP_SEND_INITIAL_METADATA; \
        setup_auth_metadata(client, &(ops)[0], &(auth_md)); \
        (ops)[1].op = GRPC_OP_RECV_INITIAL_METADATA; \
        (ops)[1].data.recv_initial_metadata.recv_initial_metadata = &(call_ptr)->initial_metadata; \
        (ops)[2].op = GRPC_OP_SEND_MESSAGE; \
        (ops)[2].data.send_message.send_message = (send_buf); \
        (ops)[3].op = GRPC_OP_RECV_MESSAGE; \
        (ops)[3].data.recv_message.recv_message = &(call_ptr)->recv_buffer; \
    } while (0)

/*
 * Handle error after failed batch start for streaming reconnect.
 *
 * Usage:
 *   STREAMING_CALL_BATCH_ERROR(wc);
 */
#define STREAMING_CALL_BATCH_ERROR(call_ptr) \
    do { \
        (call_ptr)->active = 0; \
        if ((call_ptr)->call) { \
            grpc_call_unref((call_ptr)->call); \
            (call_ptr)->call = NULL; \

etcd_election.c  view on Meta::CPAN

    (void)revents;

    observe_call_t *oc = (observe_call_t *)((char *)w - offsetof(observe_call_t, reconnect_timer));
    ev_etcd_t *client = oc->client;

    if (!client->active) {
        cleanup_observe(aTHX_ oc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(oc);
    STREAMING_CALL_REINIT(oc);

    /* Create LeaderRequest for observe */
    V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
    req.name.data = (uint8_t *)oc->params.name;
    req.name.len = oc->params.name_len;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,

etcd_election.h  view on Meta::CPAN


#include "etcd_common.h"
#include "election.pb-c.h"

/* Election response handlers */
void process_campaign_response(pTHX_ pending_call_t *pc);
void process_proclaim_response(pTHX_ pending_call_t *pc);
void process_leader_response(pTHX_ pending_call_t *pc);
void process_resign_response(pTHX_ pending_call_t *pc);

/* Election observe (streaming) handlers */
void process_observe_response(pTHX_ observe_call_t *oc);
void observe_rearm_recv(pTHX_ observe_call_t *oc);
void cleanup_observe(pTHX_ observe_call_t *oc);
int try_reconnect_observe(pTHX_ observe_call_t *oc);

/* Helper to convert LeaderKey to hash */
HV *leader_key_to_hv(pTHX_ V3electionpb__LeaderKey *lk);

#endif /* ETCD_ELECTION_H */

etcd_lease.c  view on Meta::CPAN

    (void)revents;

    keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
    ev_etcd_t *client = kc->client;

    if (!client->active) {
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(kc);
    STREAMING_CALL_REINIT(kc);

    /* Build keepalive request */
    Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    keep_req.id = kc->lease_id;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,

etcd_watch.c  view on Meta::CPAN

    (void)revents;

    watch_call_t *wc = (watch_call_t *)((char *)w - offsetof(watch_call_t, reconnect_timer));
    ev_etcd_t *client = wc->client;

    if (!client->active) {
        cleanup_watch(aTHX_ wc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(wc);
    STREAMING_CALL_REINIT(wc);

    /* Build watch create request */
    Etcdserverpb__WatchCreateRequest create_req = ETCDSERVERPB__WATCH_CREATE_REQUEST__INIT;
    create_req.key.data = (uint8_t *)wc->params.key;
    create_req.key.len = wc->params.key_len;

    if (wc->params.range_end && wc->params.range_end_len > 0) {
        create_req.range_end.data = (uint8_t *)wc->params.range_end;

lib/EV/Etcd.pm  view on Meta::CPAN

=item endpoints

ArrayRef of etcd endpoints (host:port).

=item timeout

RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.

=item max_retries

Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.

=item health_interval

Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.

=item on_health_change

lib/EV/Etcd.pm  view on Meta::CPAN


    $client->lease_revoke($lease_id, $callback);

Revoke a lease. All keys attached to the lease will be deleted.

=head2 lease_keepalive

    my $keepalive = $client->lease_keepalive($lease_id, $callback);
    my $keepalive = $client->lease_keepalive($lease_id, \%opts, $callback);

Keep a lease alive. Creates a bidirectional streaming connection that keeps
the lease refreshed. Returns an C<EV::Etcd::Keepalive> object that can be
used to cancel the keepalive stream:

    $keepalive->cancel(sub { my ($resp, $err) = @_; });

Options:

    auto_reconnect => 1   # auto-reconnect on failure (default: 1)

=head2 EV::Etcd::Keepalive Methods

lib/EV/Etcd.pm  view on Meta::CPAN

    $client->election_resign($leader_key, sub {
        my ($resp, $err) = @_;
        say "Resigned from leadership" unless $err;
    });

=head2 election_observe

    my $observe = $client->election_observe($name, $callback);
    my $observe = $client->election_observe($name, \%opts, $callback);

Observe leader changes for an election. This creates a streaming connection
that receives notifications whenever the leader changes. Returns an
C<EV::Etcd::Observe> object that can be used to cancel the observe stream:

    $observe->cancel(sub { my ($resp, $err) = @_; });

Arguments:

=over 4

=item name

t/election.t  view on Meta::CPAN

            if ($err) {
                diag("Expected error after resign: $err->{status} - $err->{message}");
            }
            EV::break;
        });
        my $t7 = EV::timer(5, 0, sub { fail('timeout'); EV::break });
        EV::run;
    }
}

# Test 17-22: election_observe streaming test
{
    my $observe_election = "observe-test-$$-" . time();
    my $observe_lease_id;
    my @observed_events;
    my $observe_leader_key;

    # Grant a lease for this test
    $client->lease_grant(30, sub {
        my ($resp, $err) = @_;
        $observe_lease_id = $resp->{id} if !$err;

t/streaming.t  view on Meta::CPAN

};

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

plan tests => 11;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

my $test_prefix = "/test-streaming-$$-" . time();

# === lease_keepalive streaming tests ===

# Test 1-4: lease_keepalive receives response
my $lease_id;
my $keepalive_count = 0;

$client->lease_grant(10, sub {
    my ($resp, $err) = @_;
    ok(!$err, 'lease_grant succeeded');
    $lease_id = $resp->{id};
    diag("Granted lease: id=$lease_id, ttl=$resp->{ttl}");

t/streaming.t  view on Meta::CPAN

    ok($keepalive_count >= 1, "received at least 1 keepalive response (got $keepalive_count)");

    # Cleanup lease (this will end the keepalive)
    $client->lease_revoke($lease_id, sub {
        my ($resp, $err) = @_;
        diag($err ? "Revoke failed" : "Lease revoked");
    });
    pass('cleanup initiated');
}

# === Watch streaming tests ===

# Test 5-9: watch receives multiple events
my $watch_key = "$test_prefix/watch-stream-test";
my $watch_count = 0;
my $watch_target = 3;

my $watch_handle;
$watch_handle = $client->watch($watch_key, sub {
    my ($resp, $err) = @_;
    if ($err) {

t/streaming.t  view on Meta::CPAN

    });
}

my $watch_timer = EV::timer 10, 0, sub {
    diag("Watch timer expired, received $watch_count events");
    EV::break;
};
EV::run;

ok($watch_count >= 1, "watch received at least 1 event (got $watch_count)");
cmp_ok($watch_count, '>=', $watch_target, "watch received all $watch_target events (streaming works)");

# Test watch cancel - uses callback-based cancel API
my $cancel_done = 0;
$watch_handle->cancel(sub {
    my ($resp, $err) = @_;
    $cancel_done = 1;
    diag("Watch cancel callback: " . ($err ? "error: $err" : "success"));
    EV::break;
});
my $cancel_timer = EV::timer 5, 0, sub {

t/watch_reconnect.t  view on Meta::CPAN

        my ($resp, $err) = @_;
        return if $err;
        if ($resp->{events} && @{$resp->{events}}) {
            push @events, @{$resp->{events}};
            EV::break if @events >= 3;
        }
    });

    ok(defined $watch, 'watch with default auto_reconnect created');

    # Send multiple events to verify streaming works
    for my $i (1..3) {
        $client->put($key, "value-$i", sub {});
    }
    my $t3 = EV::timer(5, 0, sub { EV::break });
    EV::run;

    cmp_ok(scalar @events, '>=', 1, "default watch received events");

    # Verify events have expected structure
    my $ev = $events[0];



( run in 1.086 second using v1.01-cache-2.11-cpan-39bf76dae61 )