EV-Etcd
view release on metacpan or search on metacpan
#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
/* Use Perl EV's API for proper integration */
#include <EV/EVAPI.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
/* Include modular components */
#include "etcd_common.h"
#include "etcd_kv.h"
#include "etcd_watch.h"
#include "etcd_lease.h"
#include "etcd_maint.h"
#include "etcd_lock.h"
#include "etcd_election.h"
#include "etcd_cluster.h"
#include "etcd_txn.h" /* For FREE_REQUEST_OPS macro */
/* Types and common functions defined in etcd_common.h */
/* KV handlers in etcd_kv.h, watch in etcd_watch.h, etc. */
/* Forward declarations for functions still in this file */
/* Completion-queue plumbing: the poller thread entry point, the ev_async
 * handler that runs on the main thread, and the per-event dispatcher the
 * handler calls for every queued completion. */
static void *cq_thread_func(void *arg);
static void cq_async_callback(EV_P_ ev_async *w, int revents);
static void process_grpc_event(pTHX_ ev_etcd_t *client, void *tag, int success);
/* Sentinel tag for fire-and-forget batches (e.g. watch cancel SEND_MESSAGE).
 * gRPC's GRPC_CQ_NEXT contract requires a non-NULL tag; we use this dummy
 * call_base_t so the completion can be identified and skipped. */
static call_base_t cancel_sentinel = { CALL_TYPE_NONE };
/* Response handlers for the transaction and auth/user/role RPCs.
 * Each one receives the pending_call_t whose response is being processed. */
static void process_txn_response(pTHX_ pending_call_t *pc);
static void process_auth_response(pTHX_ pending_call_t *pc);
static void process_user_add_response(pTHX_ pending_call_t *pc);
static void process_user_delete_response(pTHX_ pending_call_t *pc);
static void process_user_change_password_response(pTHX_ pending_call_t *pc);
static void process_auth_enable_response(pTHX_ pending_call_t *pc);
static void process_auth_disable_response(pTHX_ pending_call_t *pc);
static void process_role_add_response(pTHX_ pending_call_t *pc);
static void process_role_delete_response(pTHX_ pending_call_t *pc);
static void process_role_get_response(pTHX_ pending_call_t *pc);
static void process_role_list_response(pTHX_ pending_call_t *pc);
static void process_role_grant_permission_response(pTHX_ pending_call_t *pc);
static void process_role_revoke_permission_response(pTHX_ pending_call_t *pc);
static void process_user_grant_role_response(pTHX_ pending_call_t *pc);
static void process_user_revoke_role_response(pTHX_ pending_call_t *pc);
static void process_user_get_response(pTHX_ pending_call_t *pc);
static void process_user_list_response(pTHX_ pending_call_t *pc);
/* Transaction helpers. Judging by the signatures: one converts a protobuf
 * ResponseOp into a Perl SV, the other fills *dst_ops / *dst_n from a Perl
 * value (NOTE(review): confirm against the definitions). */
static SV* response_op_to_hashref(pTHX_ Etcdserverpb__ResponseOp *op);
static void parse_request_ops(pTHX_ SV *src_av, Etcdserverpb__RequestOp ***dst_ops, size_t *dst_n);
/*
 * Reconnect to the next endpoint (or the same one if only one is configured).
 *
 * The replacement channel is created BEFORE the current one is destroyed.
 * If creation fails, the existing channel and current_endpoint are left
 * untouched, so client->channel is never left NULL for later users (for
 * example the health timer, which passes it straight to
 * grpc_channel_check_connectivity_state()).
 *
 * On success the old channel (if any) is destroyed and current_endpoint is
 * advanced to the endpoint the new channel points at.
 */
static void reconnect_channel(ev_etcd_t *client) {
    int next_endpoint = client->current_endpoint;
    grpc_channel *fresh;

    /* Round-robin over the configured endpoints. */
    if (client->endpoint_count > 1)
        next_endpoint = (client->current_endpoint + 1) % client->endpoint_count;

    fresh = etcd_create_insecure_channel(client->endpoints[next_endpoint], NULL);
    if (!fresh) {
        /* Keep whatever channel we already have rather than dropping to NULL. */
        return;
    }

    if (client->channel) {
        grpc_channel_destroy(client->channel);
    }
    client->channel = fresh;
    client->current_endpoint = next_endpoint;
}
/*
 * Build the exclusive upper bound ("range_end") for a prefix scan.
 *
 * Trailing 0xFF bytes cannot be incremented, so they are dropped and the
 * last remaining byte is bumped by one. A key made only of 0xFF bytes has
 * no such bound; in that case the single byte "\x00" is returned (all keys
 * >= key).
 *
 * key      - prefix bytes (may contain NULs; not required to be terminated)
 * key_len  - number of bytes in key
 * out_len  - receives the length of the returned buffer (0 when NULL)
 *
 * Returns a buffer allocated with Newx (caller must Safefree), or NULL when
 * key_len is 0.
 */
static char* compute_prefix_range_end(const char *key, size_t key_len, size_t *out_len) {
    const unsigned char *bytes = (const unsigned char *)key;
    size_t keep = key_len;
    char *end_key;

    if (key_len == 0) {
        *out_len = 0;
        return NULL;
    }

    /* Strip the run of 0xFF bytes at the tail, if any. */
    for (; keep > 0; keep--) {
        if (bytes[keep - 1] != 0xFF) {
            break;
        }
    }

    if (keep > 0) {
        /* Copy the surviving prefix and increment its final byte. */
        Newx(end_key, keep, char);
        memcpy(end_key, key, keep);
        ((unsigned char *)end_key)[keep - 1] += 1;
        *out_len = keep;
        return end_key;
    }

    /* Nothing but 0xFF: fall back to the one-byte "\x00" bound. */
    Newx(end_key, 1, char);
    end_key[0] = '\0';
    *out_len = 1;
    return end_key;
}
/*
 * Health timer callback - performs periodic health checks.
 *
 * Reads the channel's connectivity state (without forcing a connect) and
 * treats READY and IDLE as healthy. Nothing happens unless the health flag
 * flips. On a flip the new value is stored, an unhealthy client with more
 * than one endpoint rotates to the next endpoint, and the optional Perl
 * health callback is invoked with (is_healthy, current endpoint string).
 *
 * in_callback is raised around the Perl call; if the client was marked
 * inactive while the callback ran, the deferred teardown is completed here.
 */
static void health_timer_callback(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    /* The watcher is embedded in ev_etcd_t; step back to the owning struct. */
    ev_etcd_t *client = (ev_etcd_t *)((char *)w - offsetof(ev_etcd_t, health_timer));
    grpc_connectivity_state conn_state;
    int now_healthy;

    (void)loop;
    (void)revents;

    if (!client->active)
        return;

    /* Check channel connectivity state */
    conn_state = grpc_channel_check_connectivity_state(client->channel, 0);
    now_healthy = (conn_state == GRPC_CHANNEL_READY || conn_state == GRPC_CHANNEL_IDLE);

    /* Only state transitions are interesting. */
    if (client->is_healthy == now_healthy)
        return;
    client->is_healthy = now_healthy;

    /* If unhealthy, try reconnecting to next endpoint */
    if (!now_healthy && client->endpoint_count > 1)
        reconnect_channel(client);

    if (!client->health_callback)
        return;

    /* Call the user's health callback */
    {
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        EXTEND(SP, 2);
        PUSHs(sv_2mortal(newSViv(now_healthy)));
        PUSHs(sv_2mortal(newSVpv(client->endpoints[client->current_endpoint], 0)));
        PUTBACK;
        client->in_callback = 1;
        CALL_SV_SAFE(client->health_callback, G_DISCARD);
        FREETMPS;
        LEAVE;
        client->in_callback = 0;
    }

    /* The client was torn down from inside the callback: finish the job. */
    if (!client->active)
        finish_client_destroy(aTHX_ client);
}
/*
 * gRPC completion queue thread function.
 *
 * Runs in its own thread. Polls client->cq in 100 ms slices so that
 * thread_running is re-checked regularly, appends every GRPC_OP_COMPLETE
 * event to the client's mutex-protected event queue, and wakes the main
 * thread through the cq_async watcher. The loop ends when thread_running is
 * cleared or the queue reports GRPC_QUEUE_SHUTDOWN.
 *
 * arg - the ev_etcd_t client this thread serves.
 * Always returns NULL.
 */
static void *cq_thread_func(void *arg) {
    ev_etcd_t *client = (ev_etcd_t *)arg;

    for (;;) {
        gpr_timespec poll_until;
        grpc_event ev;
        queued_event_t *node;

        if (!client->thread_running)
            break;

        /* Poll with 100ms timeout to allow checking thread_running periodically */
        poll_until = gpr_time_add(
            gpr_now(GPR_CLOCK_REALTIME),
            gpr_time_from_millis(100, GPR_TIMESPAN));
        ev = grpc_completion_queue_next(client->cq, poll_until, NULL);

        if (ev.type == GRPC_QUEUE_SHUTDOWN)
            break;
        if (ev.type != GRPC_OP_COMPLETE)
            continue;   /* timeout or anything else: poll again */

        /* Package the completion for the main thread. */
        node = (queued_event_t *)malloc(sizeof *node);
        if (node == NULL) {
            /* OOM is extremely rare but would cause callbacks to never fire.
             * Log to stderr since we can't call Perl from this thread. */
            fprintf(stderr, "EV::Etcd: CRITICAL - malloc failed in gRPC thread, event dropped\n");
            continue;
        }
        node->tag = ev.tag;
        node->success = ev.success;
        node->next = NULL;

        /* Append at the tail so events are handled in arrival order. */
        pthread_mutex_lock(&client->queue_mutex);
        if (client->event_queue_tail == NULL) {
            client->event_queue = node;
        } else {
            client->event_queue_tail->next = node;
        }
        client->event_queue_tail = node;
        pthread_mutex_unlock(&client->queue_mutex);

        /* Signal main thread */
        ev_async_send(EV_DEFAULT, &client->cq_async);
    }

    return NULL;
}
/*
 * Finish deferred client destruction.
 *
 * Called once in_callback has been cleared and client->active is false,
 * i.e. the destructor ran while a callback was executing and postponed the
 * final cleanup. Releases every remaining pending call, watch, keepalive
 * and observe, then the client-level resources, and finally the client
 * struct itself. The client pointer is invalid after this returns.
 */
void finish_client_destroy(pTHX_ ev_etcd_t *client) {
    pending_call_t *pc;
    int idx;

    /* Pop and release pending calls one at a time. */
    while ((pc = client->pending_calls) != NULL) {
        client->pending_calls = pc->next;

        grpc_metadata_array_destroy(&pc->initial_metadata);
        grpc_metadata_array_destroy(&pc->trailing_metadata);
        if (pc->recv_buffer) {
            grpc_byte_buffer_destroy(pc->recv_buffer);
        }
        grpc_slice_unref(pc->status_details);
        if (pc->call) {
            grpc_call_unref(pc->call);
        }
        SvREFCNT_dec(pc->callback);
        Safefree(pc);
    }

    /* The loops re-read the list head on every pass and stop when it is NULL. */
    while (client->watches) {
        cleanup_watch(aTHX_ client->watches);
    }
    while (client->keepalives) {
        cleanup_keepalive(aTHX_ client->keepalives);
    }
    while (client->observes) {
        cleanup_observe(aTHX_ client->observes);
    }

    /* Free client-level resources (mirrors free_perl_resources in DESTROY) */
    if (client->health_callback) {
        SvREFCNT_dec(client->health_callback);
    }

    if (client->auth_token) {
        /* Wipe the token bytes before handing the memory back. */
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
    }

    if (client->endpoints) {
        for (idx = 0; idx < client->endpoint_count; idx++) {
            if (client->endpoints[idx]) {
                Safefree(client->endpoints[idx]);
            }
        }
        Safefree(client->endpoints);
    }

    Safefree(client);
}
/*
* ev_async callback - runs in main thread when signaled by gRPC thread.
* Drains the event queue and processes each event.
*/
static void cq_async_callback(struct ev_loop *loop, ev_async *w, int revents) {
dTHX;
(void)loop;
(void)revents;
ev_etcd_t *client = (ev_etcd_t *)((char *)w - offsetof(ev_etcd_t, cq_async));
/* Don't process if client is being destroyed */
if (!client->active) {
return;
}
/* Drain the queue under lock, then process without lock */
pthread_mutex_lock(&client->queue_mutex);
queued_event_t *queue = client->event_queue;
client->event_queue = NULL;
client->event_queue_tail = NULL;
pthread_mutex_unlock(&client->queue_mutex);
/* Guard against client being freed during event processing */
client->in_callback = 1;
/* Process all queued events */
while (queue) {
queued_event_t *qe = queue;
queue = qe->next;
/* Defensive NULL guard - every code path uses a real tag (see cancel_sentinel) */
if (qe->tag) {
process_grpc_event(aTHX_ client, qe->tag, qe->success);
}
free(qe);
/* Check if client was destroyed during callback processing */
if (!client->active) {
/* Free remaining queued events */
while (queue) {
qe = queue;
queue = qe->next;
free(qe);
}
break;
}
}
client->in_callback = 0;
/* If DESTROY was called during event processing, finish the deferred cleanup */
if (!client->active) {
finish_client_destroy(aTHX_ client);
CALL_SUCCESS_CALLBACK(pc->callback, result);
}
/*
 * Process user_get response.
 *
 * Unpacks the AuthUserGetResponse carried by the pending call and hands the
 * Perl callback a hash holding the response header fields plus
 *   roles => [ role names assigned to the user ]
 * A NULL role entry is reported as an empty string.
 *
 * BEGIN_RESPONSE_HANDLER and UNPACK_RESPONSE are project macros defined
 * elsewhere (presumably they deal with failed calls / undecodable payloads).
 */
static void process_user_get_response(pTHX_ pending_call_t *pc) {
    BEGIN_RESPONSE_HANDLER(pc, "user_get");

    Etcdserverpb__AuthUserGetResponse *resp;
    UNPACK_RESPONSE(pc, resp, etcdserverpb__auth_user_get_response__unpack);

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);

    /* Copy every role name into a fresh Perl array. */
    AV *role_names = newAV();
    size_t idx = 0;
    while (idx < resp->n_roles) {
        const char *role = resp->roles[idx];
        SV *entry;
        if (role) {
            entry = newSVpv(role, 0);
        } else {
            entry = newSVpvn("", 0);
        }
        av_push(role_names, entry);
        idx++;
    }
    hv_store(result, "roles", 5, newRV_noinc((SV *)role_names), 0);

    /* All strings have been copied; the protobuf message can go. */
    etcdserverpb__auth_user_get_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, result);
}
/*
 * Process user_list response.
 *
 * Unpacks the AuthUserListResponse carried by the pending call and hands the
 * Perl callback a hash holding the response header fields plus
 *   users => [ user names ]
 * A NULL user entry is reported as an empty string.
 *
 * BEGIN_RESPONSE_HANDLER and UNPACK_RESPONSE are project macros defined
 * elsewhere (presumably they deal with failed calls / undecodable payloads).
 */
static void process_user_list_response(pTHX_ pending_call_t *pc) {
    BEGIN_RESPONSE_HANDLER(pc, "user_list");

    Etcdserverpb__AuthUserListResponse *resp;
    UNPACK_RESPONSE(pc, resp, etcdserverpb__auth_user_list_response__unpack);

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);

    /* Copy every user name into a fresh Perl array. */
    AV *user_names = newAV();
    size_t idx = 0;
    while (idx < resp->n_users) {
        const char *user = resp->users[idx];
        SV *entry;
        if (user) {
            entry = newSVpv(user, 0);
        } else {
            entry = newSVpvn("", 0);
        }
        av_push(user_names, entry);
        idx++;
    }
    hv_store(result, "users", 5, newRV_noinc((SV *)user_names), 0);

    /* All strings have been copied; the protobuf message can go. */
    etcdserverpb__auth_user_list_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, result);
}
MODULE = EV::Etcd PACKAGE = EV::Etcd PREFIX = ev_etcd_
PROTOTYPES: DISABLE
BOOT:
/* One-time module initialization, run when the XS module is loaded. */
/* Import the EV C API (see EV/EVAPI.h) so the ev_* calls below resolve. */
I_EV_API("EV::Etcd");
/* Initialize the gRPC core library before any channel or queue is created. */
grpc_init();
/* Project helper defined elsewhere; presumably pre-builds the gRPC method
 * name slices used by the RPC calls - TODO confirm. */
init_method_slices();
EV::Etcd
ev_etcd_new(class, ...)
char *class
CODE:
{
/*
 * Constructor. Parses key => value options from the argument list, builds
 * the client struct, creates the gRPC channel and completion queue, starts
 * the completion-queue polling thread and, when health_interval > 0, the
 * periodic health timer.
 *
 * Recognized options: endpoints (array ref), timeout (seconds, minimum 1),
 * max_retries (minimum 0), health_interval (seconds, 0 = disabled),
 * on_health_change (code ref), auth_token (string).
 *
 * Croaks if the channel or the polling thread cannot be created; everything
 * allocated up to that point is released first.
 */
ev_etcd_t *client;
AV *endpoints_av = NULL;
int timeout_seconds = 30; /* Default timeout */
int max_retries = 3; /* Default max retries */
int health_interval = 0; /* Default: disabled */
SV *health_callback = NULL;
char *init_auth_token = NULL;
STRLEN init_auth_token_len = 0;
int i;
/* Parse options */
/* Arguments after the class name are consumed in pairs. A trailing key
 * without a value, an unknown key, or a value of the wrong type is
 * silently ignored and the default stays in effect. */
for (i = 1; i < items; i += 2) {
if (i + 1 < items) {
const char *key = SvPV_nolen(ST(i));
if (strEQ(key, "endpoints")) {
if (SvROK(ST(i + 1)) && SvTYPE(SvRV(ST(i + 1))) == SVt_PVAV) {
endpoints_av = (AV *)SvRV(ST(i + 1));
}
} else if (strEQ(key, "timeout")) {
timeout_seconds = SvIV(ST(i + 1));
if (timeout_seconds < 1) {
timeout_seconds = 1; /* Minimum 1 second */
}
} else if (strEQ(key, "max_retries")) {
max_retries = SvIV(ST(i + 1));
if (max_retries < 0) {
max_retries = 0;
}
} else if (strEQ(key, "health_interval")) {
health_interval = SvIV(ST(i + 1));
if (health_interval < 0) {
health_interval = 0;
}
} else if (strEQ(key, "on_health_change")) {
if (SvROK(ST(i + 1)) && SvTYPE(SvRV(ST(i + 1))) == SVt_PVCV) {
health_callback = ST(i + 1);
}
} else if (strEQ(key, "auth_token")) {
if (SvPOK(ST(i + 1))) {
/* Points into the caller's SV for now; copied into client-owned
 * memory further down. */
init_auth_token = SvPV(ST(i + 1), init_auth_token_len);
}
}
}
}
/* Pre-validate endpoint URL sizes before allocating */
/* VALIDATE_URL_SIZE is defined elsewhere; presumably it croaks on an
 * oversized URL, which is why it runs before anything is allocated. */
if (endpoints_av && av_len(endpoints_av) >= 0) {
int count = av_len(endpoints_av) + 1;
for (i = 0; i < count; i++) {
SV **ep = av_fetch(endpoints_av, i, 0);
if (ep && SvPOK(*ep)) {
VALIDATE_URL_SIZE(SvCUR(*ep));
}
}
}
/* Zero-filled allocation: every field not set below starts as 0/NULL. */
Newxz(client, 1, ev_etcd_t);
/* Store endpoints */
if (endpoints_av && av_len(endpoints_av) >= 0) {
int count = av_len(endpoints_av) + 1;
Newx(client->endpoints, count, char *);
client->endpoint_count = count;
for (i = 0; i < count; i++) {
SV **ep = av_fetch(endpoints_av, i, 0);
if (ep && SvPOK(*ep)) {
STRLEN len;
const char *str = SvPV(*ep, len);
Newx(client->endpoints[i], len + 1, char);
/* len + 1 also copies the byte after the string data; this relies on
 * the SV buffer being NUL-terminated. */
Copy(str, client->endpoints[i], len + 1, char);
} else {
/* Default endpoint for invalid entries */
client->endpoints[i] = savepv("127.0.0.1:2379");
}
}
} else {
/* Default single endpoint */
Newx(client->endpoints, 1, char *);
client->endpoints[0] = savepv("127.0.0.1:2379");
client->endpoint_count = 1;
}
client->current_endpoint = 0;
/* Create gRPC channel to first endpoint */
client->channel = etcd_create_insecure_channel(client->endpoints[0], NULL);
if (!client->channel) {
/* Nothing else exists yet: release the endpoint copies and the struct. */
for (int j = 0; j < client->endpoint_count; j++) {
Safefree(client->endpoints[j]);
}
Safefree(client->endpoints);
Safefree(client);
croak("Failed to create gRPC channel");
}
/* Create completion queue for polling */
client->cq = grpc_completion_queue_create_for_next(NULL);
/* Initialize threading for hybrid gRPC/EV approach */
pthread_mutex_init(&client->queue_mutex, NULL);
client->thread_running = 1;
/* Initialize ev_async watcher for main thread notification */
ev_async_init(&client->cq_async, cq_async_callback);
ev_async_start(EV_DEFAULT, &client->cq_async);
/* Start gRPC completion queue thread */
if (pthread_create(&client->cq_thread, NULL, cq_thread_func, client) != 0) {
/* Unwind in reverse order of creation: async watcher, mutex, then the
 * completion queue (shut down and drained until it reports
 * GRPC_QUEUE_SHUTDOWN before being destroyed), channel, endpoints. */
ev_async_stop(EV_DEFAULT, &client->cq_async);
pthread_mutex_destroy(&client->queue_mutex);
grpc_completion_queue_shutdown(client->cq);
while (grpc_completion_queue_next(client->cq,
gpr_inf_past(GPR_CLOCK_REALTIME), NULL).type != GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(client->cq);
grpc_channel_destroy(client->channel);
/* Free endpoints */
for (int j = 0; j < client->endpoint_count; j++) {
Safefree(client->endpoints[j]);
}
Safefree(client->endpoints);
Safefree(client);
croak("Failed to create gRPC completion queue thread");
}
/* Take a private, explicitly NUL-terminated copy of the auth token. */
if (init_auth_token && init_auth_token_len > 0) {
Newx(client->auth_token, init_auth_token_len + 1, char);
Copy(init_auth_token, client->auth_token, init_auth_token_len, char);
client->auth_token[init_auth_token_len] = '\0';
client->auth_token_len = init_auth_token_len;
}
client->timeout_seconds = timeout_seconds;
client->active = 1;
/* Records the creating process; how owner_pid is used is not visible here
 * (presumably fork detection - TODO confirm). */
client->owner_pid = getpid();
client->max_retries = max_retries;
client->is_healthy = 1; /* Assume healthy initially */
/* Holds a reference on the caller's SV itself (no copy is made). */
if (health_callback)
client->health_callback = SvREFCNT_inc(health_callback);
/* Initialize health timer (stopped initially) */
ev_timer_init(&client->health_timer, health_timer_callback, 0.0, 0.0);
/* Start health monitoring if interval > 0 */
if (health_interval > 0) {
/* First check fires after one interval, then repeats at that interval. */
ev_timer_set(&client->health_timer, (double)health_interval, (double)health_interval);
ev_timer_start(EV_DEFAULT, &client->health_timer);
}
RETVAL = client;
}
OUTPUT:
RETVAL
void
ev_etcd_get(client, key, ...)
EV::Etcd client
SV *key
CODE:
{
/* Parse arguments: get(key, [opts,] callback) */
SV *opts = NULL;
SV *callback;
if (items == 3) {
/* get(key, callback) */
callback = ST(2);
} else if (items == 4) {
/* get(key, opts, callback) */
opts = ST(2);
callback = ST(3);
} else {
croak("Usage: $client->get($key, [\\%%opts,] $callback)");
}
VALIDATE_CALLBACK(callback);
STRLEN key_len;
const char *key_str = SvPV(key, key_len);
VALIDATE_KEY_SIZE(key_len);
/* Pre-validate option sizes before allocating pending call */
pthread_mutex_unlock(&client->queue_mutex);
/* Destroy the mutex */
pthread_mutex_destroy(&client->queue_mutex);
/* Destroy the completion queue */
if (client->cq) {
grpc_completion_queue_destroy(client->cq);
}
/* Cleanup call structures - skip if called during event processing
* (the currently-processing call struct would be a use-after-free).
* Deferred to cq_async_callback when in_callback is set. */
if (!client->in_callback) {
pc = client->pending_calls;
while (pc) {
pending_call_t *next = pc->next;
grpc_metadata_array_destroy(&pc->initial_metadata);
grpc_metadata_array_destroy(&pc->trailing_metadata);
if (pc->recv_buffer) {
grpc_byte_buffer_destroy(pc->recv_buffer);
}
grpc_slice_unref(pc->status_details);
if (pc->call) {
grpc_call_unref(pc->call);
}
SvREFCNT_dec(pc->callback);
Safefree(pc);
pc = next;
}
/* Streaming-call cleanup honors dual-ownership: cleanup_* unlinks and
* frees gRPC state, then frees the struct only if the Perl handle has
* already been released. Otherwise the struct lives until *_DESTROY. */
while (client->watches) cleanup_watch(aTHX_ client->watches);
while (client->keepalives) cleanup_keepalive(aTHX_ client->keepalives);
while (client->observes) cleanup_observe(aTHX_ client->observes);
}
/* Stop health timer */
ev_timer_stop(EV_DEFAULT, &client->health_timer);
if (client->channel) {
grpc_channel_destroy(client->channel);
}
free_perl_resources:
/* Free health callback */
if (client->health_callback) {
SvREFCNT_dec(client->health_callback);
client->health_callback = NULL;
}
/* Free auth token - securely zero before freeing */
if (client->auth_token) {
memset(client->auth_token, 0, client->auth_token_len);
Safefree(client->auth_token);
client->auth_token = NULL;
}
/* Free endpoints */
if (client->endpoints) {
int i;
for (i = 0; i < client->endpoint_count; i++) {
if (client->endpoints[i]) {
Safefree(client->endpoints[i]);
}
}
Safefree(client->endpoints);
client->endpoints = NULL;
}
/* If called during event processing, defer struct free to cq_async_callback */
if (!client->in_callback) {
Safefree(client);
}
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Watch PREFIX = ev_etcd_watch_
void
ev_etcd_watch_cancel(watch, callback)
EV::Etcd::Watch watch
SV *callback
CODE:
{
VALIDATE_CALLBACK(callback);
watch_call_t *wc = watch;
if (!wc->client_owns) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
if (!wc->active) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
wc->active = 0;
/* If we have a watch_id, send cancel request */
if (wc->watch_id >= 0) {
Etcdserverpb__WatchCancelRequest cancel_req = ETCDSERVERPB__WATCH_CANCEL_REQUEST__INIT;
cancel_req.watch_id = wc->watch_id;
Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CANCEL_REQUEST;
req.cancel_request = &cancel_req;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__watch_request__get_packed_size,
etcdserverpb__watch_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_buffer;
(void)grpc_call_start_batch(wc->call, &op, 1, &cancel_sentinel, NULL);
grpc_byte_buffer_destroy(send_buffer);
}
( run in 0.845 second using v1.01-cache-2.11-cpan-524268b4103 )