EV-Etcd
view release on metacpan or search on metacpan
etcd_watch.c view on Meta::CPAN
/*
* etcd_watch.c - Watch operation handlers for EV::Etcd
*/
#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include "etcd_common.h"
#include "etcd_watch.h"
/*
 * Re-arm the watch so that the next message on the stream is received.
 *
 * Does nothing when the watch is not active. Any receive buffer still held
 * from a previous message is destroyed first, then a single
 * GRPC_OP_RECV_MESSAGE batch is started on the watch call, tagged with
 * &wc->base. If the batch cannot be started, the watch is marked inactive,
 * the callback is invoked with GRPC_STATUS_INTERNAL, and cleanup_watch()
 * is run on the watch.
 */
void watch_rearm_recv(pTHX_ watch_call_t *wc) {
    grpc_op recv_op;
    grpc_call_error rc;

    if (!wc->active) {
        return;
    }

    /* Drop the previous message, if any, before asking for a new one. */
    if (wc->recv_buffer != NULL) {
        grpc_byte_buffer_destroy(wc->recv_buffer);
        wc->recv_buffer = NULL;
    }

    /* Tag type is set before the batch is started. */
    wc->base.type = CALL_TYPE_WATCH_RECV;

    memset(&recv_op, 0, sizeof(recv_op));
    recv_op.op = GRPC_OP_RECV_MESSAGE;
    recv_op.data.recv_message.recv_message = &wc->recv_buffer;

    rc = grpc_call_start_batch(wc->call, &recv_op, 1, &wc->base, NULL);
    if (rc == GRPC_CALL_OK) {
        return;
    }

    /* Could not queue the receive: report the failure and tear down. */
    wc->active = 0;
    CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch rearm failed", "watch");
    cleanup_watch(aTHX_ wc);
}
/*
 * Release the key and range_end buffers (when set) and then the watch struct
 * itself. This is the final step of teardown: the callers in this file only
 * reach it once neither the client side nor the Perl side owns the watch.
 */
static void watch_call_free(pTHX_ watch_call_t *wc) {
    if (wc->params.key != NULL) {
        Safefree(wc->params.key);
    }
    if (wc->params.range_end != NULL) {
        Safefree(wc->params.range_end);
    }
    Safefree(wc);
}
/*
 * Client-side cleanup: free gRPC state, unlink from list, drop client ownership.
 *
 * If the Perl side has already released the watch, the struct is freed here.
 * Otherwise it is left alive and inert for the Perl handle's DESTROY to free
 * later - this prevents a use-after-free when the user holds the handle past
 * cancellation.
 *
 * A repeated call on a watch that is still allocated returns immediately,
 * because client_owns is cleared at the end of the first call.
 */
void cleanup_watch(pTHX_ watch_call_t *wc) {
/* Client ownership already dropped: nothing left to clean up here. */
if (!wc->client_owns) return;
ev_etcd_t *client = wc->client;
/* Unlink wc from the client's singly linked list of watches, if present. */
watch_call_t **wp = &client->watches;
while (*wp) {
if (*wp == wc) { *wp = wc->next; break; }
wp = &(*wp)->next;
}
/* Stop the reconnect timer (on the default loop) if it is still active. */
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
/* Release the gRPC metadata arrays, any pending receive buffer, and the
 * status-details slice. */
grpc_metadata_array_destroy(&wc->initial_metadata);
grpc_metadata_array_destroy(&wc->trailing_metadata);
if (wc->recv_buffer) {
grpc_byte_buffer_destroy(wc->recv_buffer);
wc->recv_buffer = NULL;
}
grpc_slice_unref(wc->status_details);
/* Drop the reference on the gRPC call, if one exists. */
if (wc->call) {
grpc_call_unref(wc->call);
wc->call = NULL;
}
/* Release the reference held on the Perl callback.
 * NOTE(review): dropping the last reference may run Perl-level destructors.
 * client_owns is still set at this point, so a watch_call_perl_release()
 * triggered from such a destructor would not free wc underneath this
 * function - confirm this ordering is intentional before moving the lines
 * below. */
SvREFCNT_dec(wc->callback);
wc->callback = NULL;
/* Mark the watch inert and give up client ownership; free the struct only
 * if the Perl side no longer owns it. */
wc->active = 0;
wc->client_owns = 0;
if (!wc->perl_owns) watch_call_free(aTHX_ wc);
}
/*
 * Perl-side cleanup: give up Perl ownership of the watch. The struct is
 * freed here only if the client side has already dropped its ownership;
 * otherwise it stays allocated.
 */
void watch_call_perl_release(pTHX_ watch_call_t *wc) {
    wc->perl_owns = 0;

    if (wc->client_owns) {
        /* Client side still owns the watch; leave the struct alone. */
        return;
    }

    watch_call_free(aTHX_ wc);
}
/* Process WatchResponse and call Perl callback */
void process_watch_response(pTHX_ watch_call_t *wc) {
if (!wc->recv_buffer) {
wc->active = 0;
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "No watch response received", "watch");
return;
}
grpc_byte_buffer_reader reader;
if (!grpc_byte_buffer_reader_init(&reader, wc->recv_buffer)) {
wc->active = 0;
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Failed to read watch response buffer", "watch");
return;
}
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
grpc_byte_buffer_reader_destroy(&reader);
Etcdserverpb__WatchResponse *resp = etcdserverpb__watch_response__unpack(
NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
grpc_slice_unref(slice);
( run in 0.988 second using v1.01-cache-2.11-cpan-ceb78f64989 )