EV-Etcd

 view release on metacpan or  search on metacpan

etcd_watch.c  view on Meta::CPAN

/*
 * etcd_watch.c - Watch operation handlers for EV::Etcd
 */
#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"

#include "etcd_common.h"
#include "etcd_watch.h"

/* Re-arm watch to receive next message */
void watch_rearm_recv(pTHX_ watch_call_t *wc) {
    if (!wc->active) return;

    if (wc->recv_buffer) {
        grpc_byte_buffer_destroy(wc->recv_buffer);
        wc->recv_buffer = NULL;
    }

    wc->base.type = CALL_TYPE_WATCH_RECV;

    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &wc->recv_buffer;

    grpc_call_error err = grpc_call_start_batch(wc->call, &op, 1, &wc->base, NULL);
    if (err != GRPC_CALL_OK) {
        wc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch rearm failed", "watch");
        cleanup_watch(aTHX_ wc);
    }
}

/* Free struct memory and key buffers — final step once both owners released */
static void watch_call_free(pTHX_ watch_call_t *wc) {
    if (wc->params.key) Safefree(wc->params.key);
    if (wc->params.range_end) Safefree(wc->params.range_end);
    Safefree(wc);
}

/* Client-side cleanup: free gRPC state, unlink from list, drop client ownership.
 * If Perl side already released, free the struct. Otherwise leave it alive and
 * inert for the Perl handle's DESTROY to free later — prevents UAF when the
 * user holds the handle past cancellation. */
void cleanup_watch(pTHX_ watch_call_t *wc) {
    if (!wc->client_owns) return;

    ev_etcd_t *client = wc->client;
    watch_call_t **wp = &client->watches;
    while (*wp) {
        if (*wp == wc) { *wp = wc->next; break; }
        wp = &(*wp)->next;
    }

    if (ev_is_active(&wc->reconnect_timer))
        ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
    grpc_metadata_array_destroy(&wc->initial_metadata);
    grpc_metadata_array_destroy(&wc->trailing_metadata);
    if (wc->recv_buffer) {
        grpc_byte_buffer_destroy(wc->recv_buffer);
        wc->recv_buffer = NULL;
    }
    grpc_slice_unref(wc->status_details);
    if (wc->call) {
        grpc_call_unref(wc->call);
        wc->call = NULL;
    }
    SvREFCNT_dec(wc->callback);
    wc->callback = NULL;
    wc->active = 0;
    wc->client_owns = 0;

    if (!wc->perl_owns) watch_call_free(aTHX_ wc);
}

/* Perl-side cleanup: drop perl ownership; free struct if client side already done */
void watch_call_perl_release(pTHX_ watch_call_t *wc) {
    wc->perl_owns = 0;
    if (!wc->client_owns) watch_call_free(aTHX_ wc);
}

/* Process WatchResponse and call Perl callback */
void process_watch_response(pTHX_ watch_call_t *wc) {
    if (!wc->recv_buffer) {
        wc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "No watch response received", "watch");
        return;
    }

    grpc_byte_buffer_reader reader;
    if (!grpc_byte_buffer_reader_init(&reader, wc->recv_buffer)) {
        wc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Failed to read watch response buffer", "watch");
        return;
    }

    grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
    grpc_byte_buffer_reader_destroy(&reader);

    Etcdserverpb__WatchResponse *resp = etcdserverpb__watch_response__unpack(
        NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
    grpc_slice_unref(slice);



( run in 0.988 second using v1.01-cache-2.11-cpan-ceb78f64989 )