view release on metacpan or search on metacpan
0.07 2026-04-29
- Fix use-after-free on cancelled streaming handles past client cleanup
- Fix memory leak in txn op size validation
- Fix int64/uint64 truncation on 32-bit Perl
- Document response shapes; add ENCODING section; polish POD
- CI: add Linux ARM runners
- Expand t/ coverage; add xt/ author tests and four eg/ patterns
0.06 2026-03-17
- Add opts to lease_keepalive (auto_reconnect) and member_list (linearizable)
- Preserve watch_id across reconnects
- Remove GRPC_STATUS_INTERNAL from retryable set
- Add Data::Path::XS watch examples
0.05 2026-03-14
- Fix edge cases
0.04 2026-03-02
- Add missing use EV () before XSLoader::load
0.03 2026-03-02
- Fix error sources, election_observe arg order, watch early-cancel
- Clear auth token on auth_disable
- Deduplicate response handlers, remove dead code
0.02 2026-02-10
- Initial release
- KV operations: get, put, delete, range, txn (compare-and-swap)
- Watch with bidirectional streaming and auto-reconnect
- Lease: grant, revoke, keepalive, time-to-live, leases
- Lock and unlock tied to leases
- Election: campaign, proclaim, leader, resign, observe
- Cluster: member list/add/remove/update/promote
- Maintenance: status, compact, defragment, alarm, hash_kv, move_leader
- Auth: user/role management, authenticate, enable/disable
- Health monitoring with configurable interval and callback
- Automatic retries for transient gRPC failures
- Multiple endpoint support with failover
- Structured error callbacks ({code, message, source})
grpc_metadata_array_destroy(&pc->trailing_metadata);
if (pc->recv_buffer) grpc_byte_buffer_destroy(pc->recv_buffer);
grpc_slice_unref(pc->status_details);
if (pc->call) grpc_call_unref(pc->call);
SvREFCNT_dec(pc->callback);
Safefree(pc);
}
while (client->watches) {
cleanup_watch(aTHX_ client->watches);
}
while (client->keepalives) {
cleanup_keepalive(aTHX_ client->keepalives);
}
while (client->observes) {
cleanup_observe(aTHX_ client->observes);
}
/* Free client-level resources (mirrors free_perl_resources in DESTROY) */
if (client->health_callback)
SvREFCNT_dec(client->health_callback);
if (client->auth_token) {
memset(client->auth_token, 0, client->auth_token_len);
cleanup_watch(aTHX_ wc);
}
} else {
if (wc->active) {
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch setup failed", "watch");
if (!client->active) return;
}
cleanup_watch(aTHX_ wc);
}
} else if (base->type == CALL_TYPE_LEASE_KEEPALIVE_RECV) {
/* Keepalive receive completion */
keepalive_call_t *kc = (keepalive_call_t *)base;
if (success && kc->active) {
process_keepalive_response(aTHX_ kc);
if (!client->active) return;
/* Re-arm receive if still active */
if (kc->active) {
keepalive_rearm_recv(aTHX_ kc);
} else {
/* Response handler set active=0 (e.g., lease expired) */
cleanup_keepalive(aTHX_ kc);
}
} else if (!success && kc->active) {
/* Stream ended or error - try to reconnect */
kc->active = 0;
/* Try automatic reconnection */
if (try_reconnect_keepalive(aTHX_ kc)) {
/* Reconnection initiated, don't notify callback yet */
} else {
/* Reconnection failed or disabled, notify callback and cleanup */
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_UNAVAILABLE, "Keepalive stream ended", "keepalive");
if (!client->active) return;
cleanup_keepalive(aTHX_ kc);
}
} else {
/* RECV completed but keepalive already inactive */
cleanup_keepalive(aTHX_ kc);
}
} else if (base->type == CALL_TYPE_LEASE_KEEPALIVE) {
/* Initial keepalive setup complete - process first message if any */
keepalive_call_t *kc = (keepalive_call_t *)base;
if (success) {
/* Process the first message that was received in the initial batch */
if (kc->recv_buffer && kc->active) {
process_keepalive_response(aTHX_ kc);
if (!client->active) return;
}
/* Re-arm to receive more messages */
if (kc->active) {
keepalive_rearm_recv(aTHX_ kc);
} else {
/* First response set active=0 (e.g., lease already expired) */
cleanup_keepalive(aTHX_ kc);
}
} else {
if (kc->active) {
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive setup failed", "keepalive");
if (!client->active) return;
}
cleanup_keepalive(aTHX_ kc);
}
} else if (base->type == CALL_TYPE_ELECTION_OBSERVE_RECV) {
/* Election observe receive completion */
observe_call_t *oc = (observe_call_t *)base;
if (success && oc->active) {
process_observe_response(aTHX_ oc);
if (!client->active) return;
/* Re-arm receive if still active */
if (oc->active) {
if (err != GRPC_CALL_OK) {
CLEANUP_PENDING_CALL_ON_ERROR(pc);
croak("Failed to start gRPC call: %d", err);
}
pc->next = client->pending_calls;
client->pending_calls = pc;
}
EV::Etcd::Keepalive
ev_etcd_lease_keepalive(client, lease_id, ...)
EV::Etcd client
int64_t lease_id
CODE:
{
/* Parse arguments: lease_keepalive(lease_id, [opts,] callback) */
SV *opts = NULL;
SV *callback;
if (items == 3) {
callback = ST(2);
} else if (items == 4) {
opts = ST(2);
callback = ST(3);
} else {
croak("Usage: $client->lease_keepalive($lease_id, [\\%%opts,] $callback)");
}
VALIDATE_CALLBACK(callback);
/* Create keepalive structure (Newxz zeroes everything; only set non-zero fields) */
keepalive_call_t *kc;
Newxz(kc, 1, keepalive_call_t);
init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
kc->callback = newSVsv(callback);
kc->client = client;
kc->active = 1;
kc->auto_reconnect = 1; /* Enable by default */
kc->lease_id = lease_id;
kc->client_owns = 1;
kc->perl_owns = 1;
if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
grpc_metadata_array_init(&kc->trailing_metadata);
kc->status_details = grpc_empty_slice();
/* Build LeaseKeepAliveRequest */
Etcdserverpb__LeaseKeepAliveRequest req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
req.id = lease_id;
/* Serialize request */
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcdserverpb__lease_keep_alive_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
kc->call = grpc_channel_create_call(
client->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
NULL
);
if (!kc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
grpc_slice_unref(kc->status_details);
SvREFCNT_dec(kc->callback);
Safefree(kc);
croak("Failed to create gRPC call for lease_keepalive");
}
/* Start the call with initial operations */
grpc_op ops[4] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
if (err != GRPC_CALL_OK) {
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
grpc_slice_unref(kc->status_details);
grpc_call_unref(kc->call);
SvREFCNT_dec(kc->callback);
Safefree(kc);
croak("Failed to start gRPC call: %d", err);
}
kc->next = client->keepalives;
client->keepalives = kc;
RETVAL = kc;
}
OUTPUT:
RETVAL
void
ev_etcd_txn(client, compare_av, success_av, failure_av, callback)
EV::Etcd client
SV *compare_av
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata = &oc->initial_metadata;
ops[2].op = GRPC_OP_SEND_MESSAGE;
ops[2].data.send_message.send_message = send_buffer;
/* Observe is server-streaming: the client sends one LeaderRequest and the
* server streams LeaderResponses. Half-close the client side so etcd's
* handler receives the completed request and begins streaming; without it
* the stream is established but never delivers an event. (Watch/keepalive
* are bidi and deliberately stay open, so they must NOT half-close.) */
ops[3].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = &oc->recv_buffer;
grpc_call_error err = grpc_call_start_batch(oc->call, ops, 5, &oc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
EV::Etcd client
CODE:
{
/* Fork safety: in a child process, the gRPC thread and completion queue
* are in undefined state. Skip gRPC cleanup, just free Perl-side resources. */
if (client->owner_pid != getpid()) {
warn("EV::Etcd: client destroyed in forked child (pid %d, created in %d)"
" -- skipping gRPC cleanup", (int)getpid(), (int)client->owner_pid);
/* Free SV callbacks and Safefree'd params only; don't touch gRPC objects.
* Honor dual-ownership: if Perl handle is still alive we leave the struct
* for *_DESTROY to free. */
pending_call_t *pc = client->pending_calls;
while (pc) {
pending_call_t *next = pc->next;
SvREFCNT_dec(pc->callback);
Safefree(pc);
pc = next;
}
watch_call_t *wc = client->watches;
while (wc) {
SvREFCNT_dec(wc->callback);
wc->callback = NULL;
wc->client_owns = 0;
if (!wc->perl_owns) {
if (wc->params.key) Safefree(wc->params.key);
if (wc->params.range_end) Safefree(wc->params.range_end);
Safefree(wc);
}
wc = next;
}
keepalive_call_t *kc = client->keepalives;
while (kc) {
keepalive_call_t *next = kc->next;
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
SvREFCNT_dec(kc->callback);
kc->callback = NULL;
kc->client_owns = 0;
if (!kc->perl_owns) Safefree(kc);
kc = next;
}
observe_call_t *oc = client->observes;
while (oc) {
while (wc) {
wc->active = 0;
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
if (wc->call) {
grpc_call_cancel(wc->call, NULL);
}
wc = wc->next;
}
keepalive_call_t *kc = client->keepalives;
while (kc) {
kc->active = 0;
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
if (kc->call) {
grpc_call_cancel(kc->call, NULL);
}
kc = kc->next;
}
}
SvREFCNT_dec(pc->callback);
Safefree(pc);
pc = next;
}
/* Streaming-call cleanup honors dual-ownership: cleanup_* unlinks and
* frees gRPC state, then frees the struct only if the Perl handle has
* already been released. Otherwise the struct lives until *_DESTROY. */
while (client->watches) cleanup_watch(aTHX_ client->watches);
while (client->keepalives) cleanup_keepalive(aTHX_ client->keepalives);
while (client->observes) cleanup_observe(aTHX_ client->observes);
}
/* Stop health timer */
ev_timer_stop(EV_DEFAULT, &client->health_timer);
if (client->channel) {
grpc_channel_destroy(client->channel);
}
}
void
ev_etcd_watch_DESTROY(watch)
EV::Etcd::Watch watch
CODE:
{
/* Perl-side destructor for an EV::Etcd::Watch handle: hand the struct to
 * watch_call_perl_release(). NOTE(review): presumably this mirrors
 * keepalive_call_perl_release() (clear perl_owns, free the struct once the
 * client no longer owns it) -- confirm in etcd_watch.c. */
watch_call_perl_release(aTHX_ watch);
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Keepalive PREFIX = ev_etcd_keepalive_
void
ev_etcd_keepalive_cancel(keepalive, callback)
EV::Etcd::Keepalive keepalive
SV *callback
CODE:
{
VALIDATE_CALLBACK(callback);
keepalive_call_t *kc = keepalive;
if (!kc->client_owns) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
if (!kc->active) {
kc->active = 0;
if (kc->call)
grpc_call_cancel(kc->call, NULL);
CALL_SUCCESS_CALLBACK(callback, newHV());
}
void
ev_etcd_keepalive_DESTROY(keepalive)
EV::Etcd::Keepalive keepalive
CODE:
{
/* Perl-side destructor for an EV::Etcd::Keepalive handle. This only drops
 * the Perl ownership flag; the struct itself is freed here only if the
 * client side has already released it (see keepalive_call_perl_release). */
keepalive_call_perl_release(aTHX_ keepalive);
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Observe PREFIX = ev_etcd_observe_
void
ev_etcd_observe_cancel(observe, callback)
EV::Etcd::Observe observe
SV *callback
CODE:
{
}
});
EV::run;
```
## Features
- **KV**: get, put, delete, range, transactions (compare-and-swap)
- **Watch**: bidirectional streaming with auto-reconnect
- **Lease**: grant, revoke, keepalive, time-to-live
- **Lock**: distributed locking tied to leases
- **Election**: leader campaign, observe, proclaim, resign
- **Cluster**: member list/add/remove/update/promote
- **Maintenance**: status, compact, defragment, alarm, hash_kv, move_leader
- **Auth**: user/role management, authenticate, enable/disable
- **Health monitoring** with configurable interval and callback
- **Automatic retries** for transient gRPC failures
## Architecture
eg/distributed_mutex.pl view on Meta::CPAN
use lib 'blib/lib', 'blib/arch';
use EV;
use EV::Etcd;
my $name = $ARGV[0] // '/jobs/example';
my $work_msg = $ARGV[1] // 'critical section';
my $work_time = $ARGV[2] // 5; # seconds the lock is held
my $lock_ttl = 10; # seconds; must exceed work_time + slack
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my ($lease_id, $lock_key, $keepalive, $work_timer, $lease_died);
# 1. Lease for the lock — if we crash, the lock auto-releases after lock_ttl
$client->lease_grant($lock_ttl, sub {
my ($r, $err) = @_;
die "lease_grant: $err->{message}\n" if $err;
$lease_id = $r->{id};
say "[$$] lease=$lease_id ttl=${lock_ttl}s";
# 2. Refresh the lease while we work
$keepalive = $client->lease_keepalive($lease_id, sub {
my (undef, $kerr) = @_;
return unless $kerr;
# Lease died — our lock is gone. Bail out before we corrupt anything.
$lease_died = 1;
warn "[$$] keepalive lost ($kerr->{message}) â aborting work\n";
graceful_exit(2);
});
# 3. Acquire the lock. The etcd Lock RPC blocks server-side until
# granted. There's no client-side cancel for unary calls, so the
# timer below abandons the response (the late callback is a no-op
# under the $acquired guard). The lease will still expire on its
# own ttl if we exit, releasing any partially-granted lock.
my $acquired;
my $start = time;
eg/distributed_mutex.pl view on Meta::CPAN
return if $acquired++; # timer already fired
if ($lerr) {
warn "[$$] lock failed: $lerr->{message}\n";
return graceful_exit(3);
}
$lock_key = $lr->{key};
my $waited = time - $start;
say "[$$] acquired (waited ${waited}s) â running: $work_msg";
# 4. Do the work. Real code would do its critical section inside
# an ev_timer or similar so it remains async with the keepalive.
$work_timer = EV::timer($work_time, 0, sub {
return if $lease_died;
say "[$$] work complete â releasing lock";
graceful_exit(0);
});
});
# Optional caller-side timeout: cancel after N seconds if we never get the
# lock. Without this the process can wait forever behind contention.
my $acquire_timeout = 30;
eg/leader_cron.pl view on Meta::CPAN
#!/usr/bin/env perl
#
# leader_cron.pl - Run a periodic job on exactly one elected leader.
#
# Pattern: campaign for an election lease, run a tick every N seconds while
# this process is leader, resign cleanly on Ctrl-C. Touches three subsystems
# at once (election + lease + keepalive) and is the canonical "exactly one
# worker across the fleet" pattern.
#
# Run two instances of this script side-by-side: only one ticks at a time.
# Kill the leader, the other becomes leader within ~lease_ttl seconds.
#
use v5.10;
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use EV;
eg/leader_cron.pl view on Meta::CPAN
my $election = "/cron/nightly-aggregator";
my $lease_ttl = 10; # seconds; shorter = faster failover, more chatter
my $tick_period = 2; # how often the leader runs the job
my $value = "$0/$$"; # what other observers see for this leader
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
max_retries => 5,
);
my ($lease_id, $leader, $keepalive, $tick_timer);
# Report a fatal problem on STDERR (prefixed with our PID) and shut down
# through cleanup_and_exit with a non-zero status.
sub fail {
    my ($message) = @_;
    warn "[$$] $message\n";
    cleanup_and_exit(1);
}
# 1. Grant an election lease
$client->lease_grant($lease_ttl, sub {
my ($r, $err) = @_;
fail("lease_grant: $err->{message}") if $err;
$lease_id = $r->{id};
say "[$$] lease granted: $lease_id (ttl=$r->{ttl}s)";
# 2. Keep the lease alive while we run
$keepalive = $client->lease_keepalive($lease_id, sub {
my (undef, $kerr) = @_;
return unless $kerr;
# NOT_FOUND = the lease expired; we lost leadership unexpectedly
fail("keepalive lost: $kerr->{message}");
});
# 3. Campaign — this blocks until we're elected
say "[$$] campaigning for $election ...";
$client->election_campaign($election, $lease_id, $value, sub {
my ($cresp, $cerr) = @_;
fail("campaign: $cerr->{message}") if $cerr;
$leader = $cresp->{leader};
say "[$$] elected as leader (key=$leader->{key} rev=$leader->{rev})";
eg/lease_test.pl view on Meta::CPAN
use EV::Etcd;
use Data::Dumper;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
print "=== Lease Test ===\n\n";
my $lease_id;
my $keepalive_count = 0;
# Step 1: Grant a lease with 10 second TTL
print "1. Granting lease with TTL=10s...\n";
$client->lease_grant(10, sub {
my ($resp, $err) = @_;
if ($err) {
print "Lease grant error: $err->{message}\n";
EV::break;
return;
eg/service_registry.pl view on Meta::CPAN
#!/usr/bin/env perl
#
# service_registry.pl - Register self under /services/<type>/<id>, heartbeat
# via lease keepalive, watch the prefix to discover peers, deregister cleanly
# on shutdown.
#
# Run several copies in parallel — each prints joins/leaves of the others in
# real time. Kill one harshly (kill -9): the others see it leave after
# lease_ttl seconds (etcd revokes the lease, the key disappears, watchers
# get a DELETE event).
#
use v5.10;
use strict;
use warnings;
eg/service_registry.pl view on Meta::CPAN
use EV::Etcd;
my $service_type = $ARGV[0] // 'web';
my $self_id = sprintf "%s-%d", ($ENV{HOSTNAME} || 'localhost'), $$;
my $self_value = "host=$self_id pid=$$ started=" . time();
my $prefix = "/services/$service_type/";
my $self_key = "$prefix$self_id";
my $lease_ttl = 15;
my $client = EV::Etcd->new(endpoints => ['127.0.0.1:2379'], max_retries => 5);
my ($lease_id, $keepalive, $watch);
# 1. Lease — keys self-evict if we crash
$client->lease_grant($lease_ttl, sub {
my ($r, $err) = @_;
die "lease_grant: $err->{message}\n" if $err;
$lease_id = $r->{id};
say "[$self_id] lease=$lease_id ttl=${lease_ttl}s";
# 2. Heartbeat
$keepalive = $client->lease_keepalive($lease_id, sub {
my (undef, $kerr) = @_;
warn "[$self_id] keepalive: $kerr->{message}\n" if $kerr;
});
# 3. Register self
$client->put($self_key, $self_value, { lease => $lease_id }, sub {
my (undef, $perr) = @_;
die "register: $perr->{message}\n" if $perr;
say "[$self_id] registered at $self_key";
});
});
eg/service_registry.pl view on Meta::CPAN
say "[$self_id] - $name (left)";
}
}
});
});
# Clean shutdown: stop the streaming handles, revoke our lease so the
# registration key disappears right away, then exit.
my $shutdown = sub {
    say "[$self_id] shutting down";

    # Either handle may still be undef if we are interrupted during startup.
    $watch->cancel(sub { })     if $watch;
    $keepalive->cancel(sub { }) if $keepalive;

    if ($lease_id) {
        # Spin the loop until the revoke is acknowledged, but never wait
        # longer than two seconds for it.
        $client->lease_revoke($lease_id, sub { EV::break });
        my $t = EV::timer(2, 0, sub { EV::break });
        EV::run;
    }

    exit 0;
};
my $sigint = EV::signal('INT', $shutdown);
my $sigterm = EV::signal('TERM', $shutdown);
etcd_common.h view on Meta::CPAN
watch_params_t params;
int reconnect_attempt;
ev_timer reconnect_timer; /* Backoff timer for reconnection */
/* Dual ownership: gRPC state freed by client-side cleanup, struct freed by
* the last owner. Prevents use-after-free when Perl holds the handle past
* client cleanup. */
int client_owns;
int perl_owns;
} watch_call_t;
/* Keepalive structure (for streaming lease keepalive).
 *
 * One instance per lease_keepalive() stream. Instances are chained through
 * `next` on the owning client's `keepalives` list and are also referenced by
 * the EV::Etcd::Keepalive Perl handle, hence the two ownership flags at the
 * end of the struct. */
typedef struct keepalive_call {
call_base_t base; /* Must be first: &kc->base is the batch tag and is cast back to keepalive_call_t * */
grpc_call *call; /* Current gRPC stream; NULL once unref'd by cleanup */
SV *callback; /* Perl callback for responses and errors; NULL after cleanup */
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer; /* Last received message; destroyed before each re-arm */
grpc_slice status_details;
int64_t lease_id; /* Lease being refreshed; re-sent when the stream reconnects */
int active; /* 0 once cancelled, failed, or the lease expired */
struct ev_etcd_struct *client; /* Owning client */
struct keepalive_call *next; /* Next entry in client->keepalives */
int auto_reconnect; /* Non-zero: retry the stream after a failure */
int reconnect_attempt; /* Attempts so far; reset to 0 by a decoded response */
ev_timer reconnect_timer; /* Backoff timer for reconnection */
int client_owns; /* See watch_call_t — same dual-ownership */
int perl_owns; /* Set while the Perl handle is alive */
} keepalive_call_t;
/* Election observe parameters for reconnection */
typedef struct observe_params {
char *name; /* presumably the election name being observed -- TODO confirm */
size_t name_len; /* presumably the byte length of name -- TODO confirm */
} observe_params_t;
/* Election observe structure (for streaming election observe) */
typedef struct observe_call {
call_base_t base; /* Must be first */
etcd_common.h view on Meta::CPAN
/* Hybrid threading: gRPC thread + ev_async for main thread notification */
pthread_t cq_thread; /* Thread running gRPC CQ loop */
pthread_mutex_t queue_mutex; /* Protects event_queue */
ev_async cq_async; /* Async watcher to wake main thread */
queued_event_t *event_queue; /* Queue of completed events */
queued_event_t *event_queue_tail; /* Tail for O(1) append */
volatile int thread_running; /* Flag to signal thread shutdown */
pending_call_t *pending_calls;
watch_call_t *watches;
keepalive_call_t *keepalives;
observe_call_t *observes;
int active;
int in_callback; /* Guard against freeing client during event processing */
char *auth_token;
size_t auth_token_len;
int timeout_seconds;
/* Multiple endpoints for failover */
char **endpoints;
int endpoint_count;
etcd_common.h view on Meta::CPAN
/* Health monitoring */
ev_timer health_timer;
int is_healthy;
SV *health_callback;
pid_t owner_pid; /* PID of process that created this client (fork safety) */
} ev_etcd_t;
typedef ev_etcd_t *EV__Etcd;
typedef watch_call_t *EV__Etcd__Watch;
typedef keepalive_call_t *EV__Etcd__Keepalive;
typedef observe_call_t *EV__Etcd__Observe;
/* Initialize a call's base structure.
 *
 * Only the `type` discriminator is written here; the completion handler
 * dispatches on base->type to decide which call struct a tag belongs to. */
static inline void init_call_base(call_base_t *base, call_type_t type) {
base->type = type;
}
/* Helper macro to validate callback is a code reference */
#define VALIDATE_CALLBACK(cb) \
do { \
etcd_common.h view on Meta::CPAN
grpc_metadata_array_destroy(&(pc)->trailing_metadata); \
if ((pc)->recv_buffer) grpc_byte_buffer_destroy((pc)->recv_buffer); \
grpc_slice_unref((pc)->status_details); \
if ((pc)->call) grpc_call_unref((pc)->call); \
SvREFCNT_dec((pc)->callback); \
Safefree((pc)); \
} while (0)
/*
* Helper macros for streaming call reconnection to reduce code triplication
* across watch, keepalive, and observe reconnect functions.
*/
/*
* Cleanup old streaming call state before reconnection.
* Works with any streaming call struct that has these fields.
*
* Usage:
* STREAMING_CALL_CLEANUP(wc); // For watch_call_t
* STREAMING_CALL_CLEANUP(kc); // For keepalive_call_t
* STREAMING_CALL_CLEANUP(oc); // For observe_call_t
*/
#define STREAMING_CALL_CLEANUP(call_ptr) \
do { \
if ((call_ptr)->call) { \
grpc_call_unref((call_ptr)->call); \
(call_ptr)->call = NULL; \
} \
grpc_metadata_array_destroy(&(call_ptr)->initial_metadata); \
grpc_metadata_array_destroy(&(call_ptr)->trailing_metadata); \
etcd_lease.c view on Meta::CPAN
hv_store(lease_hv, "id", 2, newSVi64(resp->leases[i]->id), 0);
av_push(leases_av, newRV_noinc((SV *)lease_hv));
}
hv_store(result, "leases", 6, newRV_noinc((SV *)leases_av), 0);
etcdserverpb__lease_leases_response__free_unpacked(resp, NULL);
CALL_SUCCESS_CALLBACK(pc->callback, result);
}
/*
 * Queue the next RECV_MESSAGE batch on a keepalive stream.
 *
 * Does nothing when the keepalive is no longer active. Any previously
 * delivered message buffer is destroyed first, and the call's base type is
 * switched to CALL_TYPE_LEASE_KEEPALIVE_RECV so the completion is routed to
 * the receive branch of the dispatcher. If gRPC refuses the batch, the
 * keepalive is deactivated, the Perl callback gets an INTERNAL error and the
 * client-side state is torn down via cleanup_keepalive().
 */
void keepalive_rearm_recv(pTHX_ keepalive_call_t *kc) {
    grpc_op recv_op;
    grpc_call_error rc;

    if (!kc->active)
        return;

    /* Release the message from the previous round before reusing the slot. */
    if (kc->recv_buffer != NULL) {
        grpc_byte_buffer_destroy(kc->recv_buffer);
        kc->recv_buffer = NULL;
    }

    kc->base.type = CALL_TYPE_LEASE_KEEPALIVE_RECV;

    memset(&recv_op, 0, sizeof(recv_op));
    recv_op.op = GRPC_OP_RECV_MESSAGE;
    recv_op.data.recv_message.recv_message = &kc->recv_buffer;

    rc = grpc_call_start_batch(kc->call, &recv_op, 1, &kc->base, NULL);
    if (rc != GRPC_CALL_OK) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive rearm failed", "keepalive");
        cleanup_keepalive(aTHX_ kc);
    }
}
/* Release the keepalive struct's own memory. Called only once neither the
 * client (client_owns) nor the Perl handle (perl_owns) still owns it; the
 * gRPC state and callback are expected to have been released already by
 * cleanup_keepalive(). */
static void keepalive_call_free(pTHX_ keepalive_call_t *kc) {
Safefree(kc);
}
/* See cleanup_watch — same dual-ownership pattern */
void cleanup_keepalive(pTHX_ keepalive_call_t *kc) {
if (!kc->client_owns) return;
ev_etcd_t *client = kc->client;
keepalive_call_t **kp = &client->keepalives;
while (*kp) {
if (*kp == kc) { *kp = kc->next; break; }
kp = &(*kp)->next;
}
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
if (kc->recv_buffer) {
etcd_lease.c view on Meta::CPAN
grpc_slice_unref(kc->status_details);
if (kc->call) {
grpc_call_unref(kc->call);
kc->call = NULL;
}
SvREFCNT_dec(kc->callback);
kc->callback = NULL;
kc->active = 0;
kc->client_owns = 0;
if (!kc->perl_owns) keepalive_call_free(aTHX_ kc);
}
/*
 * Perl-side half of the dual-ownership scheme: invoked when the
 * EV::Etcd::Keepalive handle is destroyed. Clears perl_owns and, if the
 * client side has already let go (client_owns == 0), frees the struct.
 * While the client still owns it, the struct stays on the client's list
 * and is freed later by cleanup_keepalive().
 */
void keepalive_call_perl_release(pTHX_ keepalive_call_t *kc) {
    kc->perl_owns = 0;

    if (kc->client_owns)
        return;

    keepalive_call_free(aTHX_ kc);
}
/*
 * Process a LeaseKeepAliveResponse.
 *
 * Decodes the message held in kc->recv_buffer and reports it to the Perl
 * callback:
 *   - success: a hash with "id", "ttl" and the header fields added by
 *     add_header_to_hv();
 *   - failure (no buffer, unreadable buffer, undecodable message, or a
 *     response with ttl == 0, reported as NOT_FOUND "Lease expired"):
 *     kc->active is cleared and an error is delivered instead.
 *
 * This function never frees kc or kc->recv_buffer; callers inspect
 * kc->active afterwards to decide between re-arming and cleanup.
 *
 * In every path the unpacked protobuf message is released BEFORE the Perl
 * callback runs, so nothing allocated here is still outstanding if the
 * callback re-enters the client or tears it down.
 */
void process_keepalive_response(pTHX_ keepalive_call_t *kc) {
    if (!kc->recv_buffer) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "No keepalive response received", "keepalive");
        return;
    }

    grpc_byte_buffer_reader reader;
    if (!grpc_byte_buffer_reader_init(&reader, kc->recv_buffer)) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to read keepalive response buffer", "keepalive");
        return;
    }

    /* Flatten the (possibly multi-slice) buffer so protobuf-c can parse it. */
    grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
    grpc_byte_buffer_reader_destroy(&reader);

    Etcdserverpb__LeaseKeepAliveResponse *resp = etcdserverpb__lease_keep_alive_response__unpack(
        NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
    grpc_slice_unref(slice);

    if (!resp) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to parse keepalive response", "keepalive");
        return;
    }

    /* A decodable response means the stream is healthy: restart the
     * reconnect attempt counter. */
    kc->reconnect_attempt = 0;

    if (resp->ttl == 0) {
        /* ttl == 0 is reported to the caller as an expired lease. Free the
         * message first (matching the success path) so it cannot leak if
         * the callback does not return normally. */
        etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_NOT_FOUND, "Lease expired", "keepalive");
        return;
    }

    /* Copy everything we need into Perl data before releasing resp. */
    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);
    hv_store(result, "id", 2, newSVi64(resp->id), 0);
    hv_store(result, "ttl", 3, newSVi64(resp->ttl), 0);
    etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(kc->callback, result);
}
/* Perform keepalive reconnection (called from timer callback).
 *
 * Fired by the one-shot reconnect_timer armed in try_reconnect_keepalive().
 * Tears down the old stream state, creates a fresh call on the client's
 * channel and starts a new initial batch carrying a LeaseKeepAliveRequest
 * for kc->lease_id. If the call cannot be created or the batch cannot be
 * started, the Perl callback receives an INTERNAL error and the keepalive
 * is cleaned up. */
static void keepalive_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
dTHX;
(void)loop;
(void)revents;
/* The timer is embedded in keepalive_call_t; recover the enclosing struct. */
keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
ev_etcd_t *client = kc->client;
/* Client was shut down while we were waiting out the backoff: give up. */
if (!client->active) {
cleanup_keepalive(aTHX_ kc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(kc);
STREAMING_CALL_REINIT(kc);
/* Build keepalive request */
Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
keep_req.id = kc->lease_id;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcdserverpb__lease_keep_alive_request__pack, &keep_req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create call and setup ops */
/* The stream has no deadline of its own (infinite future). */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
kc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_LEASE_KEEPALIVE, NULL, deadline, NULL);
if (!kc->call) {
grpc_byte_buffer_destroy(send_buffer);
kc->active = 0;
/* in_callback guards against the client being freed while the Perl
 * callback runs; if the callback deactivated the client, finish that
 * teardown now instead of touching kc again. */
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect failed", "keepalive");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_keepalive(aTHX_ kc);
return;
}
grpc_op ops[4] = {0};
grpc_metadata auth_md;
/* NOTE(review): STREAMING_CALL_SETUP_OPS presumably fills the four ops
 * (metadata send/recv, message send/recv) like the initial
 * lease_keepalive call -- confirm against its definition. */
STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, kc);
/* Tag the batch as an initial keepalive setup so its completion is routed
 * to the CALL_TYPE_LEASE_KEEPALIVE branch of the dispatcher. */
init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
/* Auth metadata and the send buffer are released here whether or not the
 * batch was accepted. */
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
STREAMING_CALL_BATCH_ERROR(kc);
/* Same guarded-callback sequence as the create-call failure above. */
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect batch failed", "keepalive");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_keepalive(aTHX_ kc);
}
}
/*
 * Schedule a reconnect attempt for a broken keepalive stream.
 *
 * Returns 1 when a one-shot backoff timer was armed (keepalive_reconnect_cb
 * will perform the reconnect), or 0 when no retry will happen: auto
 * reconnect is disabled, the client is inactive, the lease id is not
 * positive, or client->max_retries attempts have already been made. On 0
 * the caller is responsible for notifying the callback and cleaning up.
 */
int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc) {
    ev_etcd_t *owner = kc->client;
    ev_tstamp backoff;

    if (!kc->auto_reconnect || !owner->active || kc->lease_id <= 0 ||
        kc->reconnect_attempt >= owner->max_retries) {
        return 0;
    }

    /* The delay is derived from the attempt number after incrementing it. */
    kc->reconnect_attempt++;
    backoff = RECONNECT_BACKOFF_SECONDS(kc->reconnect_attempt);

    ev_timer_init(&kc->reconnect_timer, keepalive_reconnect_cb, backoff, 0.0);
    ev_timer_start(EV_DEFAULT, &kc->reconnect_timer);
    return 1;
}
etcd_lease.h view on Meta::CPAN
#define ETCD_LEASE_H
#include "etcd_common.h"
/* Lease response handlers */
void process_lease_grant_response(pTHX_ pending_call_t *pc);
void process_lease_revoke_response(pTHX_ pending_call_t *pc);
void process_lease_time_to_live_response(pTHX_ pending_call_t *pc);
void process_lease_leases_response(pTHX_ pending_call_t *pc);
/* Keepalive handlers */
void process_keepalive_response(pTHX_ keepalive_call_t *kc);
void keepalive_rearm_recv(pTHX_ keepalive_call_t *kc);
void cleanup_keepalive(pTHX_ keepalive_call_t *kc);
void keepalive_call_perl_release(pTHX_ keepalive_call_t *kc);
int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc);
#endif /* ETCD_LEASE_H */
etcd_watch.c view on Meta::CPAN
}
/* Final teardown of a watch struct, run once both the client and the Perl
 * handle have released it: frees the saved key / range_end buffers (when
 * present) and then the struct itself. */
static void watch_call_free(pTHX_ watch_call_t *wc) {
    if (wc->params.key != NULL) {
        Safefree(wc->params.key);
    }
    if (wc->params.range_end != NULL) {
        Safefree(wc->params.range_end);
    }
    Safefree(wc);
}
/* Client-side cleanup: free gRPC state, unlink from list, drop client ownership.
* If Perl side already released, free the struct. Otherwise leave it alive and
* inert for the Perl handle's DESTROY to free later — prevents UAF when the
* user holds the handle past cancellation. */
void cleanup_watch(pTHX_ watch_call_t *wc) {
if (!wc->client_owns) return;
ev_etcd_t *client = wc->client;
watch_call_t **wp = &client->watches;
while (*wp) {
if (*wp == wc) { *wp = wc->next; break; }
wp = &(*wp)->next;
lib/EV/Etcd.pm view on Meta::CPAN
C<['127.0.0.1:2379']>. When more than one is provided, the client uses the
first endpoint and rotates to subsequent endpoints on connection failure.
=item timeout
RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.
=item max_retries
Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.
=item health_interval
Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.
=item on_health_change
lib/EV/Etcd.pm view on Meta::CPAN
{
code => 14, # gRPC status code (integer)
status => "UNAVAILABLE", # gRPC status name (string)
message => "Connection refused", # Error message
source => "get", # Which operation failed
retryable => 1, # Whether the error is retryable
}
The C<retryable> field indicates whether the error is transient (status codes:
UNAVAILABLE, RESOURCE_EXHAUSTED, ABORTED, DEADLINE_EXCEEDED).
Streaming operations (watch, keepalive, observe) automatically reconnect
on transient failures according to the C<max_retries> configuration.
Unary RPCs (get, put, delete, etc.) do not retry automatically; use the
C<retryable> field to implement application-level retry logic.
=head1 KEY-VALUE OPERATIONS
=head2 put
$client->put($key, $value, $callback);
$client->put($key, $value, \%opts, $callback);
lib/EV/Etcd.pm view on Meta::CPAN
=back
=head2 lease_revoke
$client->lease_revoke($lease_id, $callback);
Revoke a lease. All keys attached to the lease will be deleted. The response
contains C<header> only.
=head2 lease_keepalive
my $keepalive = $client->lease_keepalive($lease_id, $callback);
my $keepalive = $client->lease_keepalive($lease_id, \%opts, $callback);
Keep a lease alive. Creates a bidirectional streaming connection that keeps
the lease refreshed. Returns an C<EV::Etcd::Keepalive> object that can be
used to cancel the keepalive stream:
$keepalive->cancel(sub { my ($resp, $err) = @_; });
Options:
=over 4
=item auto_reconnect
If true, the keepalive stream will automatically reconnect after a connection
failure, with exponential backoff up to C<max_retries> (set on the client).
Default is 1 (enabled). Pass C<0> to disable.
=back
The keepalive callback receives C<($response, $error)> for each tick. On
success the response includes C<id>, C<ttl>, and C<header>. When the lease
has expired the server sends C<ttl=0>; the client maps that to an error
callback with C<< source => "keepalive" >> and status C<NOT_FOUND>.
=head2 EV::Etcd::Keepalive Methods
=head3 cancel
$keepalive->cancel($callback);
Cancel the keepalive stream. The callback receives C<($response, $error)>
when cancellation is complete. The response is an empty hash reference on
success.
Calling C<cancel> on an already-cancelled handle is safe: the callback fires
immediately with success. The handle remains valid as a Perl reference until
you drop it.
=head2 lease_time_to_live
$client->lease_time_to_live($lease_id, $callback);
lib/EV/Etcd.pm view on Meta::CPAN
The name (identifier) of the lock to acquire. This is a byte string that
identifies the resource being locked. Multiple clients attempting to lock
the same name will block until the lock is available.
=item lease_id
The ID of a lease to attach to the lock. The lock will be held for the
duration of the lease. If the lease expires or is revoked, the lock is
automatically released. You must first create a lease with C<lease_grant>
and optionally keep it alive with C<lease_keepalive>.
=item callback
Called with C<($response, $error)> when the lock is acquired (or fails).
=back
The response contains:
=over 4
lib/EV/Etcd.pm view on Meta::CPAN
Elections use leases to ensure that leadership is automatically released
when a leader fails.
=head2 election_campaign
$client->election_campaign($name, $lease_id, $value, $callback);
Campaign for leadership of an election.
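The method itself returns immediately; the campaign request stays pending on
the server and the callback is not invoked until the caller is elected leader
(or the request fails). Once elected, the caller should periodically keep the
lease alive to maintain leadership.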
Arguments:
=over 4
=item name
The name of the election to campaign in.
=item lease_id
}
/*
 * Free a LeaseRevokeResponse through protobuf_c_message_free_unpacked().
 *
 * message   - message to release; a NULL pointer is accepted and ignored.
 * allocator - forwarded unchanged to protobuf-c.
 */
void etcdserverpb__lease_revoke_response__free_unpacked
                     (Etcdserverpb__LeaseRevokeResponse *message,
                      ProtobufCAllocator *allocator)
{
  if (message) {
    ProtobufCMessage *generic = (ProtobufCMessage *) message;

    /* Guard against being handed a message of a different type. */
    assert(generic->descriptor == &etcdserverpb__lease_revoke_response__descriptor);
    protobuf_c_message_free_unpacked(generic, allocator);
  }
}
/*
 * Overwrite *message with the ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT
 * defaults. message must point to writable storage.
 */
void etcdserverpb__lease_keep_alive_request__init
                     (Etcdserverpb__LeaseKeepAliveRequest *message)
{
  /* One shared, read-only template; each call copies it by value. */
  static const Etcdserverpb__LeaseKeepAliveRequest defaults =
      ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;

  *message = defaults;
}
size_t etcdserverpb__lease_keep_alive_request__get_packed_size
(const Etcdserverpb__LeaseKeepAliveRequest *message)
{
assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t etcdserverpb__lease_keep_alive_request__pack
(const Etcdserverpb__LeaseKeepAliveRequest *message,
uint8_t *out)
{
assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
/*
 * Serialize a LeaseKeepAliveRequest into a ProtobufCBuffer via
 * protobuf_c_message_pack_to_buffer() and return that call's result.
 * Asserts that the message carries the matching descriptor.
 */
size_t etcdserverpb__lease_keep_alive_request__pack_to_buffer
                     (const Etcdserverpb__LeaseKeepAliveRequest *message,
                      ProtobufCBuffer *buffer)
{
  const ProtobufCMessage *generic = (const ProtobufCMessage *) message;

  assert(generic->descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
  return protobuf_c_message_pack_to_buffer(generic, buffer);
}
/*
 * Decode len bytes at data as a LeaseKeepAliveRequest by calling
 * protobuf_c_message_unpack() with this message's descriptor.
 *
 * allocator - forwarded unchanged to protobuf-c.
 * Returns whatever protobuf_c_message_unpack() returns, cast to the
 * concrete message type.
 */
Etcdserverpb__LeaseKeepAliveRequest *
       etcdserverpb__lease_keep_alive_request__unpack
                     (ProtobufCAllocator *allocator,
                      size_t len,
                      const uint8_t *data)
{
  ProtobufCMessage *decoded =
      protobuf_c_message_unpack(&etcdserverpb__lease_keep_alive_request__descriptor,
                                allocator, len, data);

  return (Etcdserverpb__LeaseKeepAliveRequest *) decoded;
}
/*
 * Free a LeaseKeepAliveRequest through protobuf_c_message_free_unpacked().
 *
 * message   - message to release; a NULL pointer is accepted and ignored.
 * allocator - forwarded unchanged to protobuf-c.
 */
void etcdserverpb__lease_keep_alive_request__free_unpacked
                     (Etcdserverpb__LeaseKeepAliveRequest *message,
                      ProtobufCAllocator *allocator)
{
  if (message) {
    ProtobufCMessage *generic = (ProtobufCMessage *) message;

    /* Guard against being handed a message of a different type. */
    assert(generic->descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
    protobuf_c_message_free_unpacked(generic, allocator);
  }
}
/*
 * Overwrite *message with the ETCDSERVERPB__LEASE_KEEP_ALIVE_RESPONSE__INIT
 * defaults. message must point to writable storage.
 */
void etcdserverpb__lease_keep_alive_response__init
                     (Etcdserverpb__LeaseKeepAliveResponse *message)
{
  /* One shared, read-only template; each call copies it by value. */
  static const Etcdserverpb__LeaseKeepAliveResponse defaults =
      ETCDSERVERPB__LEASE_KEEP_ALIVE_RESPONSE__INIT;

  *message = defaults;
}
size_t etcdserverpb__lease_keep_alive_response__get_packed_size
(const Etcdserverpb__LeaseKeepAliveResponse *message)
{
assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t etcdserverpb__lease_keep_alive_response__pack
(const Etcdserverpb__LeaseKeepAliveResponse *message,
uint8_t *out)
{
assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
/*
 * Serialize a LeaseKeepAliveResponse into a ProtobufCBuffer via
 * protobuf_c_message_pack_to_buffer() and return that call's result.
 * Asserts that the message carries the matching descriptor.
 */
size_t etcdserverpb__lease_keep_alive_response__pack_to_buffer
                     (const Etcdserverpb__LeaseKeepAliveResponse *message,
                      ProtobufCBuffer *buffer)
{
  const ProtobufCMessage *generic = (const ProtobufCMessage *) message;

  assert(generic->descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
  return protobuf_c_message_pack_to_buffer(generic, buffer);
}
/*
 * Decode len bytes at data as a LeaseKeepAliveResponse by calling
 * protobuf_c_message_unpack() with this message's descriptor.
 *
 * allocator - forwarded unchanged to protobuf-c.
 * Returns whatever protobuf_c_message_unpack() returns, cast to the
 * concrete message type.
 */
Etcdserverpb__LeaseKeepAliveResponse *
       etcdserverpb__lease_keep_alive_response__unpack
                     (ProtobufCAllocator *allocator,
                      size_t len,
                      const uint8_t *data)
{
  ProtobufCMessage *decoded =
      protobuf_c_message_unpack(&etcdserverpb__lease_keep_alive_response__descriptor,
                                allocator, len, data);

  return (Etcdserverpb__LeaseKeepAliveResponse *) decoded;
}
/*
 * Free a LeaseKeepAliveResponse through protobuf_c_message_free_unpacked().
 *
 * message   - message to release; a NULL pointer is accepted and ignored.
 * allocator - forwarded unchanged to protobuf-c.
 */
void etcdserverpb__lease_keep_alive_response__free_unpacked
                     (Etcdserverpb__LeaseKeepAliveResponse *message,
                      ProtobufCAllocator *allocator)
{
  if (message) {
    ProtobufCMessage *generic = (ProtobufCMessage *) message;

    /* Guard against being handed a message of a different type. */
    assert(generic->descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
    protobuf_c_message_free_unpacked(generic, allocator);
  }
}
/*
 * Overwrite *message with the ETCDSERVERPB__LEASE_TIME_TO_LIVE_REQUEST__INIT
 * defaults. message must point to writable storage.
 */
void etcdserverpb__lease_time_to_live_request__init
                     (Etcdserverpb__LeaseTimeToLiveRequest *message)
{
  /* One shared, read-only template; each call copies it by value. */
  static const Etcdserverpb__LeaseTimeToLiveRequest defaults =
      ETCDSERVERPB__LEASE_TIME_TO_LIVE_REQUEST__INIT;

  *message = defaults;
}
size_t etcdserverpb__lease_time_to_live_request__get_packed_size
(const Etcdserverpb__LeaseTimeToLiveRequest *message)
"Etcdserverpb__LeaseRevokeResponse",
"etcdserverpb",
sizeof(Etcdserverpb__LeaseRevokeResponse),
1,
etcdserverpb__lease_revoke_response__field_descriptors,
etcdserverpb__lease_revoke_response__field_indices_by_name,
1, etcdserverpb__lease_revoke_response__number_ranges,
(ProtobufCMessageInit) etcdserverpb__lease_revoke_response__init,
NULL,NULL,NULL /* reserved[123] */
};
/*
 * Field table for etcdserverpb.LeaseKeepAliveRequest: a single INT64 field
 * named "ID" with field number 1, stored in the struct member `id`.
 */
static const ProtobufCFieldDescriptor etcdserverpb__lease_keep_alive_request__field_descriptors[1] =
{
{
"ID", /* field name */
1, /* field number */
PROTOBUF_C_LABEL_NONE,
PROTOBUF_C_TYPE_INT64,
0, /* quantifier_offset */
offsetof(Etcdserverpb__LeaseKeepAliveRequest, id),
NULL, /* no sub-descriptor for a scalar field */
NULL, /* no default value */
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
};
/* Indices into the field table, ordered by field name (only "ID" here). */
static const unsigned etcdserverpb__lease_keep_alive_request__field_indices_by_name[] = {
0, /* field[0] = ID */
};
/*
 * Field-number ranges for the request: field numbers starting at 1 map to
 * field index 0. The last entry is a terminator whose second member equals
 * the field count (1).
 */
static const ProtobufCIntRange etcdserverpb__lease_keep_alive_request__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 1 }
};
/*
 * Message descriptor for etcdserverpb.LeaseKeepAliveRequest. It ties
 * together the field table, the by-name index, the field-number ranges and
 * the init function defined for this message.
 */
const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_request__descriptor =
{
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
"etcdserverpb.LeaseKeepAliveRequest", /* fully qualified name */
"LeaseKeepAliveRequest", /* short name */
"Etcdserverpb__LeaseKeepAliveRequest", /* C type name */
"etcdserverpb", /* package */
sizeof(Etcdserverpb__LeaseKeepAliveRequest),
1, /* number of fields */
etcdserverpb__lease_keep_alive_request__field_descriptors,
etcdserverpb__lease_keep_alive_request__field_indices_by_name,
1, etcdserverpb__lease_keep_alive_request__number_ranges, /* range count, then the ranges */
(ProtobufCMessageInit) etcdserverpb__lease_keep_alive_request__init,
NULL,NULL,NULL /* reserved[123] */
};
/*
 * Field table for etcdserverpb.LeaseKeepAliveResponse.
 *
 * Three entries, in field-number order:
 *   [0] "header" (1) - nested ResponseHeader message
 *   [1] "ID"     (2) - int64, stored in the struct member `id`
 *   [2] "TTL"    (3) - int64, stored in the struct member `ttl`
 *
 * The order must stay in step with the by-name index table (which refers to
 * field[1] = ID, field[2] = TTL, field[0] = header) and with the number
 * ranges, which describe one contiguous run of three field numbers
 * starting at 1.
 */
static const ProtobufCFieldDescriptor etcdserverpb__lease_keep_alive_response__field_descriptors[3] =
{
  {
    "header",
    1,
    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_MESSAGE,
    0,   /* quantifier_offset */
    offsetof(Etcdserverpb__LeaseKeepAliveResponse, header),
    &etcdserverpb__response_header__descriptor,
    NULL,
    0,             /* flags */
    0,NULL,NULL    /* reserved1,reserved2, etc */
  },
  {
    "ID",
    2,
    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_INT64,
    0,   /* quantifier_offset */
    offsetof(Etcdserverpb__LeaseKeepAliveResponse, id),
    NULL,
    NULL,
    0,             /* flags */
    0,NULL,NULL    /* reserved1,reserved2, etc */
  },
  {
    "TTL",
    3,
    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_INT64,
    0,   /* quantifier_offset */
    offsetof(Etcdserverpb__LeaseKeepAliveResponse, ttl),
    NULL,
    NULL,
    0,             /* flags */
    0,NULL,NULL    /* reserved1,reserved2, etc */
  },
};
/*
 * Indices into the response field table, ordered by field name
 * ("ID", "TTL", "header").
 */
static const unsigned etcdserverpb__lease_keep_alive_response__field_indices_by_name[] = {
1, /* field[1] = ID */
2, /* field[2] = TTL */
0, /* field[0] = header */
};
/*
 * Field-number ranges for the response: field numbers starting at 1 map to
 * field index 0. The last entry is a terminator whose second member equals
 * the field count (3).
 */
static const ProtobufCIntRange etcdserverpb__lease_keep_alive_response__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 3 }
};
/*
 * Message descriptor for etcdserverpb.LeaseKeepAliveResponse. It ties
 * together the field table, the by-name index, the field-number ranges and
 * the init function defined for this message.
 */
const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_response__descriptor =
{
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
"etcdserverpb.LeaseKeepAliveResponse", /* fully qualified name */
"LeaseKeepAliveResponse", /* short name */
"Etcdserverpb__LeaseKeepAliveResponse", /* C type name */
"etcdserverpb", /* package */
sizeof(Etcdserverpb__LeaseKeepAliveResponse),
3, /* number of fields */
etcdserverpb__lease_keep_alive_response__field_descriptors,
etcdserverpb__lease_keep_alive_response__field_indices_by_name,
1, etcdserverpb__lease_keep_alive_response__number_ranges, /* range count, then the ranges */
(ProtobufCMessageInit) etcdserverpb__lease_keep_alive_response__init,
NULL,NULL,NULL /* reserved[123] */
};
static const ProtobufCFieldDescriptor etcdserverpb__lease_time_to_live_request__field_descriptors[2] =
{
{
"ID",
1,
PROTOBUF_C_LABEL_NONE,
PROTOBUF_C_TYPE_INT64,
0, /* quantifier_offset */
{ PROTOBUF_C_MESSAGE_INIT (&etcdserverpb__lease_revoke_response__descriptor) \
, NULL }
/*
 * In-memory form of etcdserverpb.LeaseKeepAliveRequest. `base` is the first
 * member; the generated pack/unpack helpers cast between this struct and
 * ProtobufCMessage.
 */
struct Etcdserverpb__LeaseKeepAliveRequest
{
ProtobufCMessage base;
/* Wire field "ID" (field number 1). */
int64_t id;
};
/* Static initializer: base bound to the request descriptor, id = 0. */
#define ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&etcdserverpb__lease_keep_alive_request__descriptor) \
, 0 }
/*
 * In-memory form of etcdserverpb.LeaseKeepAliveResponse. `base` is the
 * first member; the generated pack/unpack helpers cast between this struct
 * and ProtobufCMessage.
 */
struct Etcdserverpb__LeaseKeepAliveResponse
{
ProtobufCMessage base;
/* Nested response header message. */
Etcdserverpb__ResponseHeader *header;
/* Wire field "ID". */
int64_t id;
/* Wire field "TTL". */
int64_t ttl;
};
/* Static initializer: base bound to the response descriptor, header = NULL, id = 0, ttl = 0. */
#define ETCDSERVERPB__LEASE_KEEP_ALIVE_RESPONSE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&etcdserverpb__lease_keep_alive_response__descriptor) \
, NULL, 0, 0 }
/*
 * In-memory form of the LeaseTimeToLive request message. `base` is the
 * first member, as in the other generated message structs.
 */
struct Etcdserverpb__LeaseTimeToLiveRequest
{
ProtobufCMessage base;
/* Wire field "ID". */
int64_t id;
/* NOTE(review): presumably asks the server to also return the keys attached to the lease; confirm against rpc.proto. */
protobuf_c_boolean keys;
};
#define ETCDSERVERPB__LEASE_TIME_TO_LIVE_REQUEST__INIT \
ProtobufCBuffer *buffer);
/* Decode `len` bytes at `data` into a LeaseRevokeResponse. */
Etcdserverpb__LeaseRevokeResponse *
etcdserverpb__lease_revoke_response__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data);
/* Free a LeaseRevokeResponse; a NULL message is ignored. */
void etcdserverpb__lease_revoke_response__free_unpacked
(Etcdserverpb__LeaseRevokeResponse *message,
ProtobufCAllocator *allocator);
/* Etcdserverpb__LeaseKeepAliveRequest methods */
/* Reset *message to the ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT defaults. */
void etcdserverpb__lease_keep_alive_request__init
(Etcdserverpb__LeaseKeepAliveRequest *message);
/* Size reported by protobuf_c_message_get_packed_size() for this message. */
size_t etcdserverpb__lease_keep_alive_request__get_packed_size
(const Etcdserverpb__LeaseKeepAliveRequest *message);
/* Serialize into `out`; returns the result of protobuf_c_message_pack(). */
size_t etcdserverpb__lease_keep_alive_request__pack
(const Etcdserverpb__LeaseKeepAliveRequest *message,
uint8_t *out);
/* Serialize into `buffer`; returns the result of protobuf_c_message_pack_to_buffer(). */
size_t etcdserverpb__lease_keep_alive_request__pack_to_buffer
(const Etcdserverpb__LeaseKeepAliveRequest *message,
ProtobufCBuffer *buffer);
/* Decode `len` bytes at `data` into a LeaseKeepAliveRequest. */
Etcdserverpb__LeaseKeepAliveRequest *
etcdserverpb__lease_keep_alive_request__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data);
/* Free a LeaseKeepAliveRequest; a NULL message is ignored. */
void etcdserverpb__lease_keep_alive_request__free_unpacked
(Etcdserverpb__LeaseKeepAliveRequest *message,
ProtobufCAllocator *allocator);
/* Etcdserverpb__LeaseKeepAliveResponse methods */
/* Reset *message to the ETCDSERVERPB__LEASE_KEEP_ALIVE_RESPONSE__INIT defaults. */
void etcdserverpb__lease_keep_alive_response__init
(Etcdserverpb__LeaseKeepAliveResponse *message);
/* Size reported by protobuf_c_message_get_packed_size() for this message. */
size_t etcdserverpb__lease_keep_alive_response__get_packed_size
(const Etcdserverpb__LeaseKeepAliveResponse *message);
/* Serialize into `out`; returns the result of protobuf_c_message_pack(). */
size_t etcdserverpb__lease_keep_alive_response__pack
(const Etcdserverpb__LeaseKeepAliveResponse *message,
uint8_t *out);
/* Serialize into `buffer`; returns the result of protobuf_c_message_pack_to_buffer(). */
size_t etcdserverpb__lease_keep_alive_response__pack_to_buffer
(const Etcdserverpb__LeaseKeepAliveResponse *message,
ProtobufCBuffer *buffer);
/* Decode `len` bytes at `data` into a LeaseKeepAliveResponse. */
Etcdserverpb__LeaseKeepAliveResponse *
etcdserverpb__lease_keep_alive_response__unpack
(ProtobufCAllocator *allocator,
size_t len,
const uint8_t *data);
/* Free a LeaseKeepAliveResponse; a NULL message is ignored. */
void etcdserverpb__lease_keep_alive_response__free_unpacked
(Etcdserverpb__LeaseKeepAliveResponse *message,
ProtobufCAllocator *allocator);
/* Etcdserverpb__LeaseTimeToLiveRequest methods */
/* Reset *message to the ETCDSERVERPB__LEASE_TIME_TO_LIVE_REQUEST__INIT defaults. */
void etcdserverpb__lease_time_to_live_request__init
(Etcdserverpb__LeaseTimeToLiveRequest *message);
/* Packed (serialized) size of the message. */
size_t etcdserverpb__lease_time_to_live_request__get_packed_size
(const Etcdserverpb__LeaseTimeToLiveRequest *message);
/* Serialize the message into `out`. */
size_t etcdserverpb__lease_time_to_live_request__pack
(const Etcdserverpb__LeaseTimeToLiveRequest *message,
uint8_t *out);
/*
 * Message descriptor objects, one per message type. The definitions live in
 * the generated .c file; the generated helpers compare a message's
 * base.descriptor against these.
 */
extern const ProtobufCMessageDescriptor etcdserverpb__delete_range_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_create_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_cancel_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_progress_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_grant_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_grant_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_revoke_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_revoke_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_time_to_live_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_time_to_live_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_leases_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_status__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_leases_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__compaction_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__compaction_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__status_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__status_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__alarm_member__descriptor;
t/cancel_handle_lifetime.t view on Meta::CPAN
# Run the loop for 0.2 s before exercising the handle again.
my $t2 = EV::timer(0.2, 0, sub { EV::break });
EV::run;
# Now call cancel again on the still-held handle — must not crash
my $second = 0;
$watch->cancel(sub { $second = 1; EV::break });
# 2 s guard: breaks the loop even if the cancel callback never fires.
my $t3 = EV::timer(2, 0, sub { EV::break });
EV::run;
ok($second, 'second cancel on already-cleaned handle is safe');
# --- Keepalive ---
# Same double-cancel scenario, this time on a keepalive handle.
my $lease_id;
$client->lease_grant(10, sub { $lease_id = $_[0]->{id}; EV::break });
my $tg = EV::timer(3, 0, sub { EV::break });
EV::run;
ok($lease_id, 'lease granted');
my $ka = $client->lease_keepalive($lease_id, sub { });
ok($ka, 'keepalive handle created');
# First cancel: the callback must fire.
my $ka_done = 0;
$ka->cancel(sub { $ka_done = 1; EV::break });
my $tk = EV::timer(2, 0, sub { EV::break });
EV::run;
ok($ka_done, 'keepalive cancel fired');
# Run the loop for 0.2 s before the second cancel.
my $ti = EV::timer(0.2, 0, sub { EV::break });
EV::run;
# Second cancel on the same, still-referenced handle.
my $ka_second = 0;
$ka->cancel(sub { $ka_second = 1; EV::break });
my $tk2 = EV::timer(2, 0, sub { EV::break });
EV::run;
ok($ka_second, 'second keepalive cancel is safe');
# --- Observe (election) ---
# The election name includes the PID ($$).
my $election_name = "test-cancel-lifetime-$$";
my $observe = $client->election_observe($election_name, sub { });
ok($observe, 'observe handle created');
my $obs_done = 0;
$observe->cancel(sub { $obs_done = 1; EV::break });
my $to1 = EV::timer(2, 0, sub { EV::break });
EV::run;
}
# Reap the child and inspect its wait status.
waitpid $pid, 0;
my $child_status = $?;
# Low 7 bits of the wait status hold the terminating signal (0 = none);
# the exit code is in the high byte.
is($child_status & 0x7f, 0, 'child did not die from a signal');
is($child_status >> 8, 0, 'child exited 0');
# Parent's client must still work
my $put_ok;
# $put_ok becomes true only when the put callback receives no error.
$client->put("/test_fork_$$", "parent-still-alive", sub { $put_ok = !$_[1]; EV::break });
my $t = EV::timer(3, 0, sub { EV::break });
EV::run;
ok($put_ok, 'parent client still functional after child exit');
# Remove the test key; the 2 s timer bounds the wait for the delete callback.
$client->delete("/test_fork_$$", sub { EV::break });
my $td = EV::timer(2, 0, sub { EV::break });
EV::run;
done_testing();
$found = 1;
last;
}
}
diag("Found " . scalar(@{$resp->{leases} || []}) . " lease(s), our lease " . ($found ? "found" : "not found"));
EV::break;
});
# 5 s guard for the request issued above: record a failure and leave the loop.
my $t5 = EV::timer(5, 0, sub { fail('timeout'); EV::break });
EV::run;
# Test 14-15: lease_keepalive (send one keepalive)
my $keepalive_received = 0;
$client->lease_keepalive($lease_id, sub {
my ($resp, $err) = @_;
# Only the first callback invocation is asserted; later ones are ignored.
if (!$keepalive_received) {
$keepalive_received = 1;
ok(!$err, 'lease_keepalive succeeded');
ok($resp->{ttl} > 0, 'keepalive response has positive ttl');
diag("Keepalive response: id=$resp->{id}, ttl=$resp->{ttl}");
EV::break;
}
});
# 5 s guard: fails the test only if no keepalive callback arrived at all.
my $t6 = EV::timer(5, 0, sub {
fail('keepalive timeout') unless $keepalive_received;
EV::break;
});
EV::run;
# Test 16-17: lease_revoke
$client->lease_revoke($lease_id, sub {
my ($resp, $err) = @_;
ok(!$err, 'lease_revoke succeeded');
ok($resp->{header}, 'revoke response has header');
diag("Lease revoked");
t/reconnect_drop.t view on Meta::CPAN
#!/usr/bin/env perl
# Drop the gRPC connection mid-watch and verify auto-reconnect resumes
# delivering events. We freeze etcd with SIGSTOP so the existing stream
# breaks (gRPC keepalive timeout) and unfreeze to let the reconnect succeed.
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use Test::More;
BEGIN { eval { require EV }; plan skip_all => 'EV required' if $@ }
use EV;
use EV::Etcd;
# Find a running etcd we can SIGSTOP. We only run when we own the process.
t/reconnect_drop.t view on Meta::CPAN
EV::run;
ok($created, 'watch registered server-side');
# Pre-drop: confirm normal delivery
my $pre_count = @events;
$client->put($key, "before", sub { EV::break });
my $t1 = EV::timer(2, 0, sub { EV::break });
EV::run;
ok(@events > $pre_count, 'event delivered before drop');
# Freeze etcd — server-side stream stalls; gRPC keepalive eventually closes it
note("SIGSTOP etcd pid=$etcd_pid");
kill 'STOP', $etcd_pid;
# Keep the event loop running for 8 s while etcd is stopped.
my $stop_timer = EV::timer(8, 0, sub { EV::break });
EV::run;
# Resume etcd.
note("SIGCONT etcd pid=$etcd_pid");
kill 'CONT', $etcd_pid;
# Give the reconnect machinery time to backoff + re-establish
my $recover_timer = EV::timer(10, 0, sub { EV::break });
t/streaming.t view on Meta::CPAN
plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;
plan tests => 11;
my $client = EV::Etcd->new(
endpoints => ['127.0.0.1:2379'],
);
# Key prefix built from the PID and the current time.
my $test_prefix = "/test-streaming-$$-" . time();
# === lease_keepalive streaming tests ===
# Test 1-4: lease_keepalive receives response
my $lease_id;
my $keepalive_count = 0;
# Grant a 10-second lease to run the keepalive stream against.
$client->lease_grant(10, sub {
my ($resp, $err) = @_;
ok(!$err, 'lease_grant succeeded');
$lease_id = $resp->{id};
diag("Granted lease: id=$lease_id, ttl=$resp->{ttl}");
EV::break;
});
# 5 s guard: records a failure if the grant callback never fires.
my $t1 = EV::timer(5, 0, sub { fail('lease_grant timeout'); EV::break });
EV::run;
undef $t1; # Cancel timer
SKIP: {
# The skip count (4) matches the four assertions inside this block.
skip "no lease id", 4 unless $lease_id;
my $keepalive_handle = $client->lease_keepalive($lease_id, sub {
my ($resp, $err) = @_;
# Errors are only reported via diag; the timer below ends the wait.
if ($err) {
diag("Keepalive error: " . (ref($err) ? $err->{message} : $err));
return;
}
$keepalive_count++;
diag("Keepalive response #$keepalive_count: ttl=$resp->{ttl}");
# Leave the loop as soon as one response has been counted.
if ($keepalive_count >= 1) {
EV::break;
}
});
ok(defined $keepalive_handle, 'lease_keepalive returns handle');
isa_ok($keepalive_handle, 'EV::Etcd::Keepalive', 'keepalive handle');
# Wait for at least one keepalive response
my $keepalive_timer = EV::timer 5, 0, sub {
diag("Keepalive timer expired, received $keepalive_count responses");
EV::break;
};
EV::run;
ok($keepalive_count >= 1, "received at least 1 keepalive response (got $keepalive_count)");
# Cleanup lease (this will end the keepalive)
# The revoke is only initiated here; no EV::run follows inside this block.
$client->lease_revoke($lease_id, sub {
my ($resp, $err) = @_;
diag($err ? "Revoke failed" : "Lease revoked");
});
pass('cleanup initiated');
}
# === Watch streaming tests ===
# Test 5-9: watch receives multiple events
TYPEMAP
# 64-bit integer types map to the T_I64 / T_U64 typemap codes.
int64_t T_I64
uint64_t T_U64
# The client and its streaming handle classes use T_PTROBJ: the INPUT rule
# below accepts a reference derived from the named class, reads the C pointer
# from the referent's IV, and croaks otherwise.
EV::Etcd T_PTROBJ
EV::Etcd::Watch T_PTROBJ
EV::Etcd::Keepalive T_PTROBJ
EV::Etcd::Observe T_PTROBJ
INPUT
T_PTROBJ
if (SvROK($arg) && sv_derived_from($arg, \"${ntype}\")) {
IV tmp = SvIV((SV*)SvRV($arg));
$var = INT2PTR($type, tmp);
} else {
croak(\"$var is not of type ${ntype}\");
}
xt/pod-coverage.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
# Skip the whole test file unless both coverage modules load at the
# required minimum versions.
eval "use Test::Pod::Coverage 1.08; 1"
or plan skip_all => 'Test::Pod::Coverage 1.08 required';
eval "use Pod::Coverage 0.18; 1"
or plan skip_all => 'Pod::Coverage 0.18 required';
# Coverage check on EV::Etcd only. The streaming-handle sub-packages
# (EV::Etcd::Watch / Keepalive / Observe) are XS-defined with no .pm of
# their own and only expose cancel + DESTROY — both documented as
# =head3 cancel under their parent service section in lib/EV/Etcd.pm.
# Pod::Coverage can't follow that cross-module reference, so we skip
# them rather than emit false negatives.
pod_coverage_ok(
'EV::Etcd',
{ trustme => [qr/^txn$/] }, # txn is wrapped in pure-Perl, doc'd via XS path
'EV::Etcd has POD coverage',
);
xt/spelling.t view on Meta::CPAN
# Project-specific terms that aren't in any dictionary
# Registered as stopwords so the POD spelling check below accepts them.
add_stopwords(qw(
Async EV NOSPACE READWRITE
async backoff defragment reconnection ttl txn
etcd gRPC libev libgrpc protobuf-c protobuf
XS pthread typemap
auth backend cancellable cluster_id codepoints compact_revision
cpantesters dbsize Defragment Defragmentation deserialize
endpoint endpoints failover
hashref hashrefs IDs ipv4 keepalive keepalives kv kvs
learner linearizable longjmp memberid mTLS mvccpb
namespace observe param params pre prev_kv prev_kvs proclaim
progress_notify protobufs raft RPC RPCs runtime serializable
serialize Sub-packages subkey subprocess SvUTF8 sync TLS TTL
UTF UV-cant userland vmactions watch_id YK
Yegor Korablev vividsnow
));
# Spell-check every POD file in the distribution.
all_pod_files_spelling_ok();