view release on metacpan or search on metacpan
- Add missing use EV () before XSLoader::load
0.03 2026-03-02
- Fix error sources, election_observe arg order, watch early-cancel
- Clear auth token on auth_disable
- Deduplicate response handlers, remove dead code
0.02 2026-02-10
- Initial release
- KV operations: get, put, delete, range, txn (compare-and-swap)
- Watch with bidirectional streaming and auto-reconnect
- Lease: grant, revoke, keepalive, time-to-live, leases
- Lock and unlock tied to leases
- Election: campaign, proclaim, leader, resign, observe
- Cluster: member list/add/remove/update/promote
- Maintenance: status, compact, defragment, alarm, hash_kv, move_leader
- Auth: user/role management, authenticate, enable/disable
- Health monitoring with configurable interval and callback
- Automatic retries for transient gRPC failures
- Multiple endpoint support with failover
- Structured error callbacks ({code, message, source})
/* Serialize request */
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__watch_request__get_packed_size,
etcdserverpb__watch_request__pack, &req);
if (range_end_copy) Safefree(range_end_copy);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); /* No timeout for watch */
wc->call = grpc_channel_create_call(
client->channel,
NULL, /* parent call */
GRPC_PROPAGATE_DEFAULTS,
client->cq,
METHOD_WATCH,
NULL, /* host */
deadline,
req.id = lease_id;
/* Serialize request */
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcdserverpb__lease_keep_alive_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
kc->call = grpc_channel_create_call(
client->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
client->cq,
METHOD_LEASE_KEEPALIVE,
NULL,
deadline,
req.name.data = (uint8_t *)name_str;
req.name.len = name_len;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
v3electionpb__leader_request__get_packed_size,
v3electionpb__leader_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Use infinite deadline for streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
oc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL
);
if (!oc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&oc->initial_metadata);
t/election.t
t/error_structure.t
t/kv.t
t/kv_advanced.t
t/lease.t
t/lock.t
t/maintenance.t
t/move_leader.t
t/parameters.t
t/retry_config.t
t/streaming.t
t/txn.t
t/txn_range.t
t/watch_prev_kv.t
t/watch_reconnect.t
t/watch_resume.t
typemap
META.yml Module YAML meta-data (added by MakeMaker)
META.json Module JSON meta-data (added by MakeMaker)
say "$event->{type} on $event->{kv}{key}";
}
});
EV::run;
```
## Features
- **KV**: get, put, delete, range, transactions (compare-and-swap)
- **Watch**: bidirectional streaming with auto-reconnect
- **Lease**: grant, revoke, keepalive, time-to-live
- **Lock**: distributed locking tied to leases
- **Election**: leader campaign, observe, proclaim, resign
- **Cluster**: member list/add/remove/update/promote
- **Maintenance**: status, compact, defragment, alarm, hash_kv, move_leader
- **Auth**: user/role management, authenticate, enable/disable
- **Health monitoring** with configurable interval and callback
- **Automatic retries** for transient gRPC failures
## Architecture
etcd_common.h view on Meta::CPAN
size_t key_len;
char *range_end;
size_t range_end_len;
int64_t start_revision;
int prev_kv;
int progress_notify;
int64_t watch_id;
int has_watch_id;
} watch_params_t;
/*
 * Watch structure (for streaming watch).
 *
 * Holds the gRPC call handle, receive-side buffers and reconnection state
 * for one watch stream. The generic STREAMING_CALL_* macros access the
 * call, initial_metadata, trailing_metadata, recv_buffer, status_details
 * and active members by name, so those names must stay in sync with the
 * keepalive and observe structures.
 */
typedef struct watch_call {
call_base_t base; /* Must be first */
grpc_call *call; /* gRPC streaming call handle; unref'd on cleanup */
SV *callback; /* Perl-side callback SV (presumably invoked per response - confirm) */
grpc_metadata_array initial_metadata; /* Target of the RECV_INITIAL_METADATA op */
grpc_metadata_array trailing_metadata; /* Initialized/destroyed together with initial_metadata */
grpc_byte_buffer *recv_buffer; /* Target of the RECV_MESSAGE op; destroyed on cleanup */
grpc_slice status_details; /* Reset to an empty slice on reinit, unref'd on cleanup */
int64_t watch_id; /* NOTE(review): presumably the etcd watch id - confirm */
int active; /* Set to 1 by STREAMING_CALL_REINIT, 0 by STREAMING_CALL_BATCH_ERROR */
struct ev_etcd_struct *client; /* Owning client; the reconnect timer callback checks client->active */
struct watch_call *next; /* NOTE(review): presumably links watches in a list - confirm */
int auto_reconnect; /* NOTE(review): presumably nonzero enables reconnection - confirm */
int64_t last_revision; /* NOTE(review): presumably the last revision seen, used to resume - confirm */
watch_params_t params; /* Saved request parameters; reused to rebuild the create request on reconnect */
int reconnect_attempt; /* NOTE(review): presumably counts retries for backoff - confirm */
ev_timer reconnect_timer; /* Backoff timer for reconnection; its callback recovers this struct via offsetof() */
} watch_call_t;
/* Keepalive structure (for streaming lease keepalive) */
typedef struct keepalive_call {
call_base_t base; /* Must be first */
grpc_call *call;
SV *callback;
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer;
grpc_slice status_details;
int64_t lease_id;
int active;
etcd_common.h view on Meta::CPAN
int reconnect_attempt;
ev_timer reconnect_timer; /* Backoff timer for reconnection */
} keepalive_call_t;
/*
 * Election observe parameters for reconnection.
 *
 * The observe reconnect path copies these two members into the
 * LeaderRequest (req.name.data / req.name.len) when it re-creates the
 * stream, so they must remain valid for as long as reconnection is possible.
 */
typedef struct observe_params {
char *name; /* Election name bytes; length is carried separately in name_len */
size_t name_len; /* Length of name in bytes */
} observe_params_t;
/* Election observe structure (for streaming election observe) */
typedef struct observe_call {
call_base_t base; /* Must be first */
grpc_call *call;
SV *callback;
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer;
grpc_slice status_details;
int active;
struct ev_etcd_struct *client;
etcd_common.h view on Meta::CPAN
grpc_metadata_array_destroy(&(pc)->initial_metadata); \
grpc_metadata_array_destroy(&(pc)->trailing_metadata); \
if ((pc)->recv_buffer) grpc_byte_buffer_destroy((pc)->recv_buffer); \
grpc_slice_unref((pc)->status_details); \
if ((pc)->call) grpc_call_unref((pc)->call); \
SvREFCNT_dec((pc)->callback); \
Safefree((pc)); \
} while (0)
/*
* Helper macros for streaming call reconnection to reduce code triplication
* across watch, keepalive, and observe reconnect functions.
*/
/*
* Cleanup old streaming call state before reconnection.
* Works with any streaming call struct that has the call, initial_metadata,
* trailing_metadata, recv_buffer and status_details fields.
*
* Usage:
* STREAMING_CALL_CLEANUP(wc); // For watch_call_t
* STREAMING_CALL_CLEANUP(kc); // For keepalive_call_t
* STREAMING_CALL_CLEANUP(oc); // For observe_call_t
*/
#define STREAMING_CALL_CLEANUP(call_ptr) \
do { \
if ((call_ptr)->call) { \
grpc_call_unref((call_ptr)->call); \
etcd_common.h view on Meta::CPAN
grpc_metadata_array_destroy(&(call_ptr)->initial_metadata); \
grpc_metadata_array_destroy(&(call_ptr)->trailing_metadata); \
if ((call_ptr)->recv_buffer) { \
grpc_byte_buffer_destroy((call_ptr)->recv_buffer); \
(call_ptr)->recv_buffer = NULL; \
} \
grpc_slice_unref((call_ptr)->status_details); \
} while (0)
/*
 * Reset the per-stream receive state of a streaming call struct for
 * reconnection: the call is flagged active again, status_details becomes
 * an empty slice, and both metadata arrays are initialized.
 *
 * The reconnect paths invoke this right after STREAMING_CALL_CLEANUP.
 * The call and recv_buffer members are not touched here.
 *
 * Usage:
 * STREAMING_CALL_REINIT(wc);
 */
#define STREAMING_CALL_REINIT(call_ptr) \
    do { \
        (call_ptr)->active = 1; \
        (call_ptr)->status_details = grpc_empty_slice(); \
        grpc_metadata_array_init(&(call_ptr)->trailing_metadata); \
        grpc_metadata_array_init(&(call_ptr)->initial_metadata); \
    } while (0)
/*
 * Setup standard 4-op batch for streaming call reconnection.
 *
 * Fills ops[0..3] in this order:
 *   [0] GRPC_OP_SEND_INITIAL_METADATA  (then handed to setup_auth_metadata()
 *                                       together with auth_md)
 *   [1] GRPC_OP_RECV_INITIAL_METADATA  -> (call_ptr)->initial_metadata
 *   [2] GRPC_OP_SEND_MESSAGE           <- send_buf
 *   [3] GRPC_OP_RECV_MESSAGE           -> (call_ptr)->recv_buffer
 *
 * Only the .op member and the data pointers listed above are written here;
 * no other grpc_op member is set, so the caller presumably zero-initializes
 * ops beforehand (TODO confirm at the call sites). The macro only fills the
 * array - it does not start the batch, and it queues no close or
 * status-receive op.
 *
 * Requires: client, ops[4], auth_md, send_buf and call_ptr all in scope.
 *
 * Usage:
 * STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, wc);
 */
#define STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buf, call_ptr) \
do { \
(ops)[0].op = GRPC_OP_SEND_INITIAL_METADATA; \
setup_auth_metadata(client, &(ops)[0], &(auth_md)); \
(ops)[1].op = GRPC_OP_RECV_INITIAL_METADATA; \
(ops)[1].data.recv_initial_metadata.recv_initial_metadata = &(call_ptr)->initial_metadata; \
(ops)[2].op = GRPC_OP_SEND_MESSAGE; \
(ops)[2].data.send_message.send_message = (send_buf); \
(ops)[3].op = GRPC_OP_RECV_MESSAGE; \
(ops)[3].data.recv_message.recv_message = &(call_ptr)->recv_buffer; \
} while (0)
/*
* Handle error after failed batch start for streaming reconnect.
*
* Usage:
* STREAMING_CALL_BATCH_ERROR(wc);
*/
#define STREAMING_CALL_BATCH_ERROR(call_ptr) \
do { \
(call_ptr)->active = 0; \
if ((call_ptr)->call) { \
grpc_call_unref((call_ptr)->call); \
(call_ptr)->call = NULL; \
etcd_election.c view on Meta::CPAN
(void)revents;
observe_call_t *oc = (observe_call_t *)((char *)w - offsetof(observe_call_t, reconnect_timer));
ev_etcd_t *client = oc->client;
if (!client->active) {
cleanup_observe(aTHX_ oc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(oc);
STREAMING_CALL_REINIT(oc);
/* Create LeaderRequest for observe */
V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
req.name.data = (uint8_t *)oc->params.name;
req.name.len = oc->params.name_len;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcd_election.h view on Meta::CPAN
#include "etcd_common.h"
#include "election.pb-c.h"
/*
 * Election response handlers.
 * Each receives the pending_call_t of a completed request.
 * NOTE(review): presumably one handler per unary election RPC
 * (Campaign, Proclaim, Leader, Resign) - confirm against the dispatch code.
 */
void process_campaign_response(pTHX_ pending_call_t *pc);
void process_proclaim_response(pTHX_ pending_call_t *pc);
void process_leader_response(pTHX_ pending_call_t *pc);
void process_resign_response(pTHX_ pending_call_t *pc);
/*
 * Election observe (streaming) handlers.
 * These operate on the long-lived observe_call_t stream state rather than
 * on a pending_call_t.
 */
void process_observe_response(pTHX_ observe_call_t *oc);
void observe_rearm_recv(pTHX_ observe_call_t *oc); /* NOTE(review): presumably re-queues the receive op - confirm */
void cleanup_observe(pTHX_ observe_call_t *oc); /* Called by the observe reconnect timer callback when the client is no longer active */
int try_reconnect_observe(pTHX_ observe_call_t *oc); /* NOTE(review): meaning of the int result is not visible here - confirm */
/* Helper to convert LeaderKey to hash */
HV *leader_key_to_hv(pTHX_ V3electionpb__LeaderKey *lk);
#endif /* ETCD_ELECTION_H */
etcd_lease.c view on Meta::CPAN
(void)revents;
keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
ev_etcd_t *client = kc->client;
if (!client->active) {
cleanup_keepalive(aTHX_ kc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(kc);
STREAMING_CALL_REINIT(kc);
/* Build keepalive request */
Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
keep_req.id = kc->lease_id;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcd_watch.c view on Meta::CPAN
(void)revents;
watch_call_t *wc = (watch_call_t *)((char *)w - offsetof(watch_call_t, reconnect_timer));
ev_etcd_t *client = wc->client;
if (!client->active) {
cleanup_watch(aTHX_ wc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(wc);
STREAMING_CALL_REINIT(wc);
/* Build watch create request */
Etcdserverpb__WatchCreateRequest create_req = ETCDSERVERPB__WATCH_CREATE_REQUEST__INIT;
create_req.key.data = (uint8_t *)wc->params.key;
create_req.key.len = wc->params.key_len;
if (wc->params.range_end && wc->params.range_end_len > 0) {
create_req.range_end.data = (uint8_t *)wc->params.range_end;
lib/EV/Etcd.pm view on Meta::CPAN
=item endpoints
ArrayRef of etcd endpoints (host:port).
=item timeout
RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.
=item max_retries
Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.
=item health_interval
Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.
=item on_health_change
lib/EV/Etcd.pm view on Meta::CPAN
$client->lease_revoke($lease_id, $callback);
Revoke a lease. All keys attached to the lease will be deleted.
=head2 lease_keepalive
my $keepalive = $client->lease_keepalive($lease_id, $callback);
my $keepalive = $client->lease_keepalive($lease_id, \%opts, $callback);
Keep a lease alive. Creates a bidirectional streaming connection that keeps
the lease refreshed. Returns an C<EV::Etcd::Keepalive> object that can be
used to cancel the keepalive stream:
$keepalive->cancel(sub { my ($resp, $err) = @_; });
Options:
auto_reconnect => 1 # auto-reconnect on failure (default: 1)
=head2 EV::Etcd::Keepalive Methods
lib/EV/Etcd.pm view on Meta::CPAN
$client->election_resign($leader_key, sub {
my ($resp, $err) = @_;
say "Resigned from leadership" unless $err;
});
=head2 election_observe
my $observe = $client->election_observe($name, $callback);
my $observe = $client->election_observe($name, \%opts, $callback);
Observe leader changes for an election. This creates a streaming connection
that receives notifications whenever the leader changes. Returns an
C<EV::Etcd::Observe> object that can be used to cancel the observe stream:
$observe->cancel(sub { my ($resp, $err) = @_; });
Arguments:
=over 4
=item name
t/election.t view on Meta::CPAN
if ($err) {
diag("Expected error after resign: $err->{status} - $err->{message}");
}
EV::break;
});
my $t7 = EV::timer(5, 0, sub { fail('timeout'); EV::break });
EV::run;
}
}
# Test 17-22: election_observe streaming test
{
my $observe_election = "observe-test-$$-" . time();
my $observe_lease_id;
my @observed_events;
my $observe_leader_key;
# Grant a lease for this test
$client->lease_grant(30, sub {
my ($resp, $err) = @_;
$observe_lease_id = $resp->{id} if !$err;
t/streaming.t view on Meta::CPAN
};
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
plan tests => 11;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
my $test_prefix = "/test-streaming-$$-" . time();
# === lease_keepalive streaming tests ===
# Test 1-4: lease_keepalive receives response
my $lease_id;
my $keepalive_count = 0;
$client->lease_grant(10, sub {
my ($resp, $err) = @_;
ok(!$err, 'lease_grant succeeded');
$lease_id = $resp->{id};
diag("Granted lease: id=$lease_id, ttl=$resp->{ttl}");
t/streaming.t view on Meta::CPAN
ok($keepalive_count >= 1, "received at least 1 keepalive response (got $keepalive_count)");
# Cleanup lease (this will end the keepalive)
$client->lease_revoke($lease_id, sub {
my ($resp, $err) = @_;
diag($err ? "Revoke failed" : "Lease revoked");
});
pass('cleanup initiated');
}
# === Watch streaming tests ===
# Test 5-9: watch receives multiple events
my $watch_key = "$test_prefix/watch-stream-test";
my $watch_count = 0;
my $watch_target = 3;
my $watch_handle;
$watch_handle = $client->watch($watch_key, sub {
my ($resp, $err) = @_;
if ($err) {
t/streaming.t view on Meta::CPAN
});
}
my $watch_timer = EV::timer 10, 0, sub {
diag("Watch timer expired, received $watch_count events");
EV::break;
};
EV::run;
ok($watch_count >= 1, "watch received at least 1 event (got $watch_count)");
cmp_ok($watch_count, '>=', $watch_target, "watch received all $watch_target events (streaming works)");
# Test watch cancel - uses callback-based cancel API.
# Errors are delivered as structured hashes ({code, message, source}), so
# report the message field rather than stringifying the reference, which
# would only print something like "HASH(0x...)". A plain-string error is
# still printed as-is.
my $cancel_done = 0;
$watch_handle->cancel(sub {
    my ($resp, $err) = @_;
    $cancel_done = 1;
    my $outcome = 'success';
    if ($err) {
        my $detail = ref $err eq 'HASH' ? $err->{message} : $err;
        $outcome = 'error: ' . (defined $detail ? $detail : 'unknown');
    }
    diag("Watch cancel callback: $outcome");
    EV::break;
});
my $cancel_timer = EV::timer 5, 0, sub {
t/watch_reconnect.t view on Meta::CPAN
my ($resp, $err) = @_;
return if $err;
if ($resp->{events} && @{$resp->{events}}) {
push @events, @{$resp->{events}};
EV::break if @events >= 3;
}
});
ok(defined $watch, 'watch with default auto_reconnect created');
# Send multiple events to verify streaming works
for my $i (1..3) {
$client->put($key, "value-$i", sub {});
}
my $t3 = EV::timer(5, 0, sub { EV::break });
EV::run;
cmp_ok(scalar @events, '>=', 1, "default watch received events");
# Verify events have expected structure
my $ev = $events[0];