EV-Etcd

 view release on metacpan or  search on metacpan

Etcd.xs  view on Meta::CPAN

#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include "ppport.h"

/* Use Perl EV's API for proper integration */
#include <EV/EVAPI.h>

#include <time.h>
#include <stdlib.h>
#include <unistd.h>

/* Include modular components */
#include "etcd_common.h"
#include "etcd_kv.h"
#include "etcd_watch.h"
#include "etcd_lease.h"
#include "etcd_maint.h"
#include "etcd_lock.h"
#include "etcd_election.h"
#include "etcd_cluster.h"
#include "etcd_txn.h"  /* For FREE_REQUEST_OPS macro */

/* Types and common functions defined in etcd_common.h */
/* KV handlers in etcd_kv.h, watch in etcd_watch.h, etc. */

/* Forward declarations for functions still in this file */
static void *cq_thread_func(void *arg);
static void cq_async_callback(EV_P_ ev_async *w, int revents);
static void process_grpc_event(pTHX_ ev_etcd_t *client, void *tag, int success);

/* Sentinel tag for fire-and-forget batches (e.g. watch cancel SEND_MESSAGE).
 * gRPC's GRPC_CQ_NEXT contract requires a non-NULL tag; we use this dummy
 * call_base_t so the completion can be identified and skipped. */
static call_base_t cancel_sentinel = { CALL_TYPE_NONE };
static void process_txn_response(pTHX_ pending_call_t *pc);
static void process_auth_response(pTHX_ pending_call_t *pc);
static void process_user_add_response(pTHX_ pending_call_t *pc);
static void process_user_delete_response(pTHX_ pending_call_t *pc);
static void process_user_change_password_response(pTHX_ pending_call_t *pc);
static void process_auth_enable_response(pTHX_ pending_call_t *pc);
static void process_auth_disable_response(pTHX_ pending_call_t *pc);
static void process_role_add_response(pTHX_ pending_call_t *pc);
static void process_role_delete_response(pTHX_ pending_call_t *pc);
static void process_role_get_response(pTHX_ pending_call_t *pc);
static void process_role_list_response(pTHX_ pending_call_t *pc);
static void process_role_grant_permission_response(pTHX_ pending_call_t *pc);
static void process_role_revoke_permission_response(pTHX_ pending_call_t *pc);
static void process_user_grant_role_response(pTHX_ pending_call_t *pc);
static void process_user_revoke_role_response(pTHX_ pending_call_t *pc);
static void process_user_get_response(pTHX_ pending_call_t *pc);
static void process_user_list_response(pTHX_ pending_call_t *pc);
static SV* response_op_to_hashref(pTHX_ Etcdserverpb__ResponseOp *op);
static void parse_request_ops(pTHX_ SV *src_av, Etcdserverpb__RequestOp ***dst_ops, size_t *dst_n);

/*
 * Reconnect to the next endpoint (or the same one if only one is configured).
 *
 * The replacement channel is created BEFORE the current one is destroyed.
 * If creation fails, the existing channel and current_endpoint are left
 * untouched, so client->channel never becomes NULL here (the health timer
 * passes it straight to grpc_channel_check_connectivity_state).
 *
 * client: the client whose channel should be rotated; its endpoints array
 *         must hold endpoint_count entries.
 */
static void reconnect_channel(ev_etcd_t *client) {
    int next_endpoint = client->current_endpoint;

    /* Round-robin only when there is somewhere else to go */
    if (client->endpoint_count > 1)
        next_endpoint = (next_endpoint + 1) % client->endpoint_count;

    grpc_channel *fresh = etcd_create_insecure_channel(
        client->endpoints[next_endpoint], NULL);
    if (!fresh) {
        /* Keep the old channel and endpoint index; a later health tick
         * may trigger another attempt. */
        return;
    }

    if (client->channel) {
        grpc_channel_destroy(client->channel);
    }
    client->channel = fresh;
    client->current_endpoint = next_endpoint;
}

/*
 * Build the exclusive upper bound ("range_end") for a prefix scan.
 *
 * The bound is the prefix with its trailing 0xFF bytes dropped and the
 * last remaining byte incremented by one. A prefix made entirely of 0xFF
 * bytes has no such successor, so the single byte "\x00" is returned
 * instead (meaning: every key >= key).
 *
 * key      - prefix bytes (not required to be NUL-terminated)
 * key_len  - number of bytes in key
 * out_len  - receives the length of the returned buffer
 *
 * Returns a buffer allocated with Newx (caller releases it with Safefree),
 * or NULL with *out_len == 0 when key_len is 0.
 */
static char* compute_prefix_range_end(const char *key, size_t key_len, size_t *out_len) {
    const unsigned char *bytes = (const unsigned char *)key;
    char *end_key = NULL;
    size_t keep;

    if (key_len == 0) {
        *out_len = 0;
        return NULL;
    }

    /* Walk backwards past the trailing run of 0xFF bytes */
    for (keep = key_len; keep > 0; keep--) {
        if (bytes[keep - 1] != 0xFF)
            break;
    }

    if (keep == 0) {
        /* Nothing but 0xFF: fall back to the "\x00" open-ended marker */
        Newx(end_key, 1, char);
        *end_key = '\0';
        *out_len = 1;
        return end_key;
    }

    /* Copy the surviving bytes and bump the final one (it is < 0xFF) */
    Newx(end_key, keep, char);
    memcpy(end_key, key, keep);
    ((unsigned char *)end_key)[keep - 1] = (unsigned char)(bytes[keep - 1] + 1);
    *out_len = keep;
    return end_key;
}

/*
 * Periodic health probe, driven by the client's health_timer watcher.
 *
 * Reads the channel's connectivity state without forcing a connect
 * attempt. READY and IDLE count as healthy. Only a change relative to
 * client->is_healthy has any effect: the flag is updated, an unhealthy
 * multi-endpoint client is rotated to its next endpoint, and the Perl
 * health callback (if set) is invoked with (is_healthy, endpoint).
 */
static void health_timer_callback(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    /* Recover the owning client from the embedded watcher */
    ev_etcd_t *client = (ev_etcd_t *)((char *)w - offsetof(ev_etcd_t, health_timer));
    grpc_connectivity_state conn_state;
    int now_healthy;

    (void)loop;
    (void)revents;

    if (!client->active)
        return;

    conn_state = grpc_channel_check_connectivity_state(client->channel, 0);

    switch (conn_state) {
    case GRPC_CHANNEL_READY:
    case GRPC_CHANNEL_IDLE:
        now_healthy = 1;
        break;
    default:
        now_healthy = 0;
        break;
    }

    /* Nothing to do unless the health status flipped */
    if (now_healthy == client->is_healthy)
        return;

    client->is_healthy = now_healthy;

    /* Going unhealthy with alternatives available: move to the next endpoint */
    if (!now_healthy && client->endpoint_count > 1)
        reconnect_channel(client);

    if (!client->health_callback)
        return;

    /* Invoke the Perl callback: $cb->($is_healthy, $endpoint) */
    dSP;
    ENTER;
    SAVETMPS;
    PUSHMARK(SP);
    EXTEND(SP, 2);
    PUSHs(sv_2mortal(newSViv(now_healthy)));
    PUSHs(sv_2mortal(newSVpv(client->endpoints[client->current_endpoint], 0)));
    PUTBACK;
    client->in_callback = 1;  /* lets DESTROY know it must defer the final free */
    CALL_SV_SAFE(client->health_callback, G_DISCARD);
    FREETMPS;
    LEAVE;
    client->in_callback = 0;

    /* The callback may have destroyed the client; finish the deferred teardown */
    if (!client->active)
        finish_client_destroy(aTHX_ client);
}

/*
 * Body of the gRPC completion-queue thread.
 *
 * Loops while client->thread_running is set, waiting up to 100ms at a time
 * on the completion queue. Every GRPC_OP_COMPLETE event is copied into a
 * malloc'ed node, appended to the client's event queue under queue_mutex,
 * and the cq_async watcher is signalled. The thread exits when the queue
 * reports GRPC_QUEUE_SHUTDOWN or thread_running is cleared.
 *
 * arg: the ev_etcd_t client. Always returns NULL.
 */
static void *cq_thread_func(void *arg) {
    ev_etcd_t *client = (ev_etcd_t *)arg;

    while (client->thread_running) {
        /* Short poll window so thread_running is re-checked regularly */
        gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
        gpr_timespec poll_window = gpr_time_from_millis(100, GPR_TIMESPAN);
        grpc_event event = grpc_completion_queue_next(
            client->cq, gpr_time_add(now, poll_window), NULL);
        queued_event_t *node;
        queued_event_t **link;

        if (event.type == GRPC_QUEUE_SHUTDOWN)
            return NULL;

        /* Timeouts carry no completion to forward */
        if (event.type != GRPC_OP_COMPLETE)
            continue;

        node = (queued_event_t *)malloc(sizeof(queued_event_t));
        if (node == NULL) {
            /* Perl cannot be called from this thread, so report on stderr;
             * the callback tied to this event will never fire. */
            fprintf(stderr, "EV::Etcd: CRITICAL - malloc failed in gRPC thread, event dropped\n");
            continue;
        }

        node->tag = event.tag;
        node->success = event.success;
        node->next = NULL;

        /* Append at the tail (or become the head of an empty queue) */
        pthread_mutex_lock(&client->queue_mutex);
        link = client->event_queue_tail
                   ? &client->event_queue_tail->next
                   : &client->event_queue;
        *link = node;
        client->event_queue_tail = node;
        pthread_mutex_unlock(&client->queue_mutex);

        /* Signal the cq_async watcher that an event is queued */
        ev_async_send(EV_DEFAULT, &client->cq_async);
    }

    return NULL;
}

/*
 * Complete a client teardown that DESTROY had to postpone.
 *
 * Callers invoke this once in_callback has been cleared and
 * client->active is false. It releases every remaining pending call,
 * watch, keepalive and observe, then the client-level resources, and
 * finally the client struct itself. The client pointer is invalid on return.
 */
void finish_client_destroy(pTHX_ ev_etcd_t *client) {
    pending_call_t *call;

    /* Detach each pending call from the list before releasing it */
    while ((call = client->pending_calls) != NULL) {
        client->pending_calls = call->next;

        grpc_metadata_array_destroy(&call->initial_metadata);
        grpc_metadata_array_destroy(&call->trailing_metadata);
        if (call->recv_buffer) {
            grpc_byte_buffer_destroy(call->recv_buffer);
        }
        grpc_slice_unref(call->status_details);
        if (call->call) {
            grpc_call_unref(call->call);
        }
        SvREFCNT_dec(call->callback);
        Safefree(call);
    }

    /* Streaming calls: each cleanup_* call is expected to unlink the
     * current list head, which is what ends these loops. */
    while (client->watches) {
        cleanup_watch(aTHX_ client->watches);
    }
    while (client->keepalives) {
        cleanup_keepalive(aTHX_ client->keepalives);
    }
    while (client->observes) {
        cleanup_observe(aTHX_ client->observes);
    }

    /* Client-level resources (mirrors free_perl_resources in DESTROY) */
    if (client->health_callback) {
        SvREFCNT_dec(client->health_callback);
    }

    if (client->auth_token) {
        /* Wipe the token bytes before handing the memory back */
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
    }

    if (client->endpoints) {
        int idx = 0;
        while (idx < client->endpoint_count) {
            if (client->endpoints[idx]) {
                Safefree(client->endpoints[idx]);
            }
            idx++;
        }
        Safefree(client->endpoints);
    }

    Safefree(client);
}

/*
 * ev_async callback - runs in main thread when signaled by gRPC thread.
 * Drains the event queue and processes each event.
 */
static void cq_async_callback(struct ev_loop *loop, ev_async *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    ev_etcd_t *client = (ev_etcd_t *)((char *)w - offsetof(ev_etcd_t, cq_async));

    /* Don't process if client is being destroyed */
    if (!client->active) {
        return;
    }

    /* Drain the queue under lock, then process without lock */
    pthread_mutex_lock(&client->queue_mutex);
    queued_event_t *queue = client->event_queue;
    client->event_queue = NULL;
    client->event_queue_tail = NULL;
    pthread_mutex_unlock(&client->queue_mutex);

    /* Guard against client being freed during event processing */
    client->in_callback = 1;

    /* Process all queued events */
    while (queue) {
        queued_event_t *qe = queue;
        queue = qe->next;

        /* Defensive NULL guard — every code path uses a real tag (see cancel_sentinel) */
        if (qe->tag) {
            process_grpc_event(aTHX_ client, qe->tag, qe->success);
        }

        free(qe);

        /* Check if client was destroyed during callback processing */
        if (!client->active) {
            /* Free remaining queued events */
            while (queue) {
                qe = queue;
                queue = qe->next;
                free(qe);
            }
            break;
        }
    }

    client->in_callback = 0;

    /* If DESTROY was called during event processing, finish the deferred cleanup */
    if (!client->active) {
        finish_client_destroy(aTHX_ client);

Etcd.xs  view on Meta::CPAN

    CALL_SUCCESS_CALLBACK(pc->callback, result);
}

/*
 * Handle a completed user_get call.
 *
 * Unpacks the AuthUserGetResponse and hands the success callback a hash
 * containing the response header fields plus "roles", an array ref with
 * one string per role (an empty string stands in for a NULL entry).
 */
static void process_user_get_response(pTHX_ pending_call_t *pc) {
    BEGIN_RESPONSE_HANDLER(pc, "user_get");

    Etcdserverpb__AuthUserGetResponse *user_resp;
    UNPACK_RESPONSE(pc, user_resp, etcdserverpb__auth_user_get_response__unpack);

    HV *reply_hv = newHV();
    add_header_to_hv(aTHX_ reply_hv, user_resp->header);

    /* Copy every role name into a fresh Perl array */
    AV *role_names = newAV();
    size_t idx = 0;
    while (idx < user_resp->n_roles) {
        const char *role = user_resp->roles[idx++];
        SV *role_sv;
        if (role) {
            role_sv = newSVpv(role, 0);
        } else {
            role_sv = newSVpvn("", 0);
        }
        av_push(role_names, role_sv);
    }
    (void)hv_stores(reply_hv, "roles", newRV_noinc((SV *)role_names));

    /* The strings were copied above, so the protobuf message can go now */
    etcdserverpb__auth_user_get_response__free_unpacked(user_resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, reply_hv);
}

/*
 * Handle a completed user_list call.
 *
 * Unpacks the AuthUserListResponse and hands the success callback a hash
 * containing the response header fields plus "users", an array ref with
 * one string per user (an empty string stands in for a NULL entry).
 */
static void process_user_list_response(pTHX_ pending_call_t *pc) {
    BEGIN_RESPONSE_HANDLER(pc, "user_list");

    Etcdserverpb__AuthUserListResponse *list_resp;
    UNPACK_RESPONSE(pc, list_resp, etcdserverpb__auth_user_list_response__unpack);

    HV *reply_hv = newHV();
    add_header_to_hv(aTHX_ reply_hv, list_resp->header);

    /* Copy every user name into a fresh Perl array */
    AV *user_names = newAV();
    size_t idx = 0;
    while (idx < list_resp->n_users) {
        const char *user = list_resp->users[idx++];
        SV *user_sv;
        if (user) {
            user_sv = newSVpv(user, 0);
        } else {
            user_sv = newSVpvn("", 0);
        }
        av_push(user_names, user_sv);
    }
    (void)hv_stores(reply_hv, "users", newRV_noinc((SV *)user_names));

    /* The strings were copied above, so the protobuf message can go now */
    etcdserverpb__auth_user_list_response__free_unpacked(list_resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, reply_hv);
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd  PREFIX = ev_etcd_

PROTOTYPES: DISABLE

BOOT:
    /* Runs once when the module is loaded. */
    /* Bind to the EV module's C API so the ev_* calls in this file go through Perl's EV. */
    I_EV_API("EV::Etcd");
    /* Bring up the gRPC core library before any channel or completion queue is created. */
    grpc_init();
    /* NOTE(review): defined outside this excerpt; presumably pre-builds the gRPC method-name slices - confirm. */
    init_method_slices();

EV::Etcd
ev_etcd_new(class, ...)
    char *class
CODE:
{
    /*
     * Constructor: EV::Etcd->new(%options)
     *
     * Recognised options (key/value pairs; unknown keys and a trailing
     * key without a value are ignored):
     *   endpoints        => array ref of "host:port" strings
     *                       (default: single endpoint "127.0.0.1:2379")
     *   timeout          => seconds, clamped to a minimum of 1 (default 30)
     *   max_retries      => non-negative integer (default 3)
     *   health_interval  => seconds between health checks, 0 disables (default 0)
     *   on_health_change => code ref invoked as ($is_healthy, $endpoint)
     *   auth_token       => string token copied into the client
     *
     * Croaks if the gRPC channel or the completion-queue thread cannot be
     * created; everything allocated up to that point is released first.
     */
    ev_etcd_t *client;
    AV *endpoints_av = NULL;
    int timeout_seconds = 30;  /* Default timeout */
    int max_retries = 3;       /* Default max retries */
    int health_interval = 0;   /* Default: disabled */
    SV *health_callback = NULL;
    char *init_auth_token = NULL;
    STRLEN init_auth_token_len = 0;
    int i;

    /* Parse options */
    for (i = 1; i < items; i += 2) {
        if (i + 1 < items) {
            const char *key = SvPV_nolen(ST(i));
            if (strEQ(key, "endpoints")) {
                if (SvROK(ST(i + 1)) && SvTYPE(SvRV(ST(i + 1))) == SVt_PVAV) {
                    endpoints_av = (AV *)SvRV(ST(i + 1));
                }
            } else if (strEQ(key, "timeout")) {
                timeout_seconds = SvIV(ST(i + 1));
                if (timeout_seconds < 1) {
                    timeout_seconds = 1;  /* Minimum 1 second */
                }
            } else if (strEQ(key, "max_retries")) {
                max_retries = SvIV(ST(i + 1));
                if (max_retries < 0) {
                    max_retries = 0;
                }
            } else if (strEQ(key, "health_interval")) {
                health_interval = SvIV(ST(i + 1));
                if (health_interval < 0) {
                    health_interval = 0;
                }
            } else if (strEQ(key, "on_health_change")) {
                if (SvROK(ST(i + 1)) && SvTYPE(SvRV(ST(i + 1))) == SVt_PVCV) {
                    health_callback = ST(i + 1);
                }
            } else if (strEQ(key, "auth_token")) {
                if (SvPOK(ST(i + 1))) {
                    init_auth_token = SvPV(ST(i + 1), init_auth_token_len);
                }
            }
        }
    }

    /* Pre-validate endpoint URL sizes before allocating */
    if (endpoints_av && av_len(endpoints_av) >= 0) {
        int count = av_len(endpoints_av) + 1;
        for (i = 0; i < count; i++) {
            SV **ep = av_fetch(endpoints_av, i, 0);
            if (ep && SvPOK(*ep)) {
                VALIDATE_URL_SIZE(SvCUR(*ep));
            }
        }
    }

    Newxz(client, 1, ev_etcd_t);

    /* Store endpoints (private copies; the caller's array is not retained) */
    if (endpoints_av && av_len(endpoints_av) >= 0) {
        int count = av_len(endpoints_av) + 1;
        Newx(client->endpoints, count, char *);
        client->endpoint_count = count;
        for (i = 0; i < count; i++) {
            SV **ep = av_fetch(endpoints_av, i, 0);
            if (ep && SvPOK(*ep)) {
                STRLEN len;
                const char *str = SvPV(*ep, len);
                Newx(client->endpoints[i], len + 1, char);
                Copy(str, client->endpoints[i], len + 1, char);
            } else {
                /* Default endpoint for invalid entries */
                client->endpoints[i] = savepv("127.0.0.1:2379");
            }
        }
    } else {
        /* Default single endpoint */
        Newx(client->endpoints, 1, char *);
        client->endpoints[0] = savepv("127.0.0.1:2379");
        client->endpoint_count = 1;
    }
    client->current_endpoint = 0;

    /* Create gRPC channel to first endpoint */
    client->channel = etcd_create_insecure_channel(client->endpoints[0], NULL);

    if (!client->channel) {
        for (int j = 0; j < client->endpoint_count; j++) {
            Safefree(client->endpoints[j]);
        }
        Safefree(client->endpoints);
        Safefree(client);
        croak("Failed to create gRPC channel");
    }

    /* Create completion queue for polling */
    client->cq = grpc_completion_queue_create_for_next(NULL);

    /* Initialize threading for hybrid gRPC/EV approach */
    pthread_mutex_init(&client->queue_mutex, NULL);
    client->thread_running = 1;

    /* Initialize ev_async watcher for main thread notification */
    ev_async_init(&client->cq_async, cq_async_callback);
    ev_async_start(EV_DEFAULT, &client->cq_async);

    /* Start gRPC completion queue thread */
    if (pthread_create(&client->cq_thread, NULL, cq_thread_func, client) != 0) {
        ev_async_stop(EV_DEFAULT, &client->cq_async);
        pthread_mutex_destroy(&client->queue_mutex);
        /* Shut the queue down and drain it until it reports shutdown before destroying it */
        grpc_completion_queue_shutdown(client->cq);
        while (grpc_completion_queue_next(client->cq,
               gpr_inf_past(GPR_CLOCK_REALTIME), NULL).type != GRPC_QUEUE_SHUTDOWN)
            ;
        grpc_completion_queue_destroy(client->cq);
        grpc_channel_destroy(client->channel);
        /* Free endpoints */
        for (int j = 0; j < client->endpoint_count; j++) {
            Safefree(client->endpoints[j]);
        }
        Safefree(client->endpoints);
        Safefree(client);
        croak("Failed to create gRPC completion queue thread");
    }

    if (init_auth_token && init_auth_token_len > 0) {
        Newx(client->auth_token, init_auth_token_len + 1, char);
        Copy(init_auth_token, client->auth_token, init_auth_token_len, char);
        client->auth_token[init_auth_token_len] = '\0';
        client->auth_token_len = init_auth_token_len;
    }
    client->timeout_seconds = timeout_seconds;
    client->active = 1;
    client->owner_pid = getpid();
    client->max_retries = max_retries;
    client->is_healthy = 1;  /* Assume healthy initially */
    if (health_callback) {
        /* Keep a private copy of the code ref. The stack slot aliases the
         * caller's own variable, so merely bumping its refcount would let a
         * later "$cb = undef" (or reassignment) in the caller silently change
         * or invalidate the stored callback. Released with SvREFCNT_dec. */
        client->health_callback = newSVsv(health_callback);
    }

    /* Initialize health timer (stopped initially) */
    ev_timer_init(&client->health_timer, health_timer_callback, 0.0, 0.0);

    /* Start health monitoring if interval > 0 */
    if (health_interval > 0) {
        ev_timer_set(&client->health_timer, (double)health_interval, (double)health_interval);
        ev_timer_start(EV_DEFAULT, &client->health_timer);
    }

    RETVAL = client;
}
OUTPUT:
    RETVAL

void
ev_etcd_get(client, key, ...)
    EV::Etcd client
    SV *key
CODE:
{
    /* Parse arguments: get(key, [opts,] callback) */
    SV *opts = NULL;
    SV *callback;

    if (items == 3) {
        /* get(key, callback) */
        callback = ST(2);
    } else if (items == 4) {
        /* get(key, opts, callback) */
        opts = ST(2);
        callback = ST(3);
    } else {
        croak("Usage: $client->get($key, [\\%%opts,] $callback)");
    }

    VALIDATE_CALLBACK(callback);

    STRLEN key_len;
    const char *key_str = SvPV(key, key_len);
    VALIDATE_KEY_SIZE(key_len);

    /* Pre-validate option sizes before allocating pending call */

Etcd.xs  view on Meta::CPAN

    pthread_mutex_unlock(&client->queue_mutex);

    /* Destroy the mutex */
    pthread_mutex_destroy(&client->queue_mutex);

    /* Destroy the completion queue */
    if (client->cq) {
        grpc_completion_queue_destroy(client->cq);
    }

    /* Cleanup call structures - skip if called during event processing
     * (the currently-processing call struct would be a use-after-free).
     * Deferred to cq_async_callback when in_callback is set. */
    if (!client->in_callback) {
        pc = client->pending_calls;
        while (pc) {
            pending_call_t *next = pc->next;
            grpc_metadata_array_destroy(&pc->initial_metadata);
            grpc_metadata_array_destroy(&pc->trailing_metadata);
            if (pc->recv_buffer) {
                grpc_byte_buffer_destroy(pc->recv_buffer);
            }
            grpc_slice_unref(pc->status_details);
            if (pc->call) {
                grpc_call_unref(pc->call);
            }
            SvREFCNT_dec(pc->callback);
            Safefree(pc);
            pc = next;
        }

        /* Streaming-call cleanup honors dual-ownership: cleanup_* unlinks and
         * frees gRPC state, then frees the struct only if the Perl handle has
         * already been released. Otherwise the struct lives until *_DESTROY. */
        while (client->watches) cleanup_watch(aTHX_ client->watches);
        while (client->keepalives) cleanup_keepalive(aTHX_ client->keepalives);
        while (client->observes) cleanup_observe(aTHX_ client->observes);
    }

    /* Stop health timer */
    ev_timer_stop(EV_DEFAULT, &client->health_timer);

    if (client->channel) {
        grpc_channel_destroy(client->channel);
    }

    free_perl_resources:
    /* Free health callback */
    if (client->health_callback) {
        SvREFCNT_dec(client->health_callback);
        client->health_callback = NULL;
    }

    /* Free auth token - securely zero before freeing */
    if (client->auth_token) {
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
        client->auth_token = NULL;
    }

    /* Free endpoints */
    if (client->endpoints) {
        int i;
        for (i = 0; i < client->endpoint_count; i++) {
            if (client->endpoints[i]) {
                Safefree(client->endpoints[i]);
            }
        }
        Safefree(client->endpoints);
        client->endpoints = NULL;
    }

    /* If called during event processing, defer struct free to cq_async_callback */
    if (!client->in_callback) {
        Safefree(client);
    }
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Watch  PREFIX = ev_etcd_watch_

void
ev_etcd_watch_cancel(watch, callback)
    EV::Etcd::Watch watch
    SV *callback
CODE:
{
    VALIDATE_CALLBACK(callback);

    watch_call_t *wc = watch;

    if (!wc->client_owns) {
        CALL_SUCCESS_CALLBACK(callback, newHV());
        return;
    }

    if (ev_is_active(&wc->reconnect_timer))
        ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);

    if (!wc->active) {
        CALL_SUCCESS_CALLBACK(callback, newHV());
        return;
    }

    wc->active = 0;

    /* If we have a watch_id, send cancel request */
    if (wc->watch_id >= 0) {
        Etcdserverpb__WatchCancelRequest cancel_req = ETCDSERVERPB__WATCH_CANCEL_REQUEST__INIT;
        cancel_req.watch_id = wc->watch_id;

        Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
        req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CANCEL_REQUEST;
        req.cancel_request = &cancel_req;

        grpc_slice req_slice;
        SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
            etcdserverpb__watch_request__get_packed_size,
            etcdserverpb__watch_request__pack, &req);
        grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
        grpc_slice_unref(req_slice);

        grpc_op op;
        memset(&op, 0, sizeof(op));
        op.op = GRPC_OP_SEND_MESSAGE;
        op.data.send_message.send_message = send_buffer;

        (void)grpc_call_start_batch(wc->call, &op, 1, &cancel_sentinel, NULL);
        grpc_byte_buffer_destroy(send_buffer);
    }



( run in 0.845 second using v1.01-cache-2.11-cpan-524268b4103 )