EV-Etcd
view release on metacpan or search on metacpan
grpc_event event = grpc_completion_queue_next(client->cq, deadline, NULL);
if (event.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
if (event.type != GRPC_OP_COMPLETE) {
continue;
}
/* Queue the event for the main thread */
queued_event_t *qe = (queued_event_t *)malloc(sizeof(queued_event_t));
if (!qe) {
/* OOM is extremely rare but would cause callbacks to never fire.
* Log to stderr since we can't call Perl from this thread. */
fprintf(stderr, "EV::Etcd: CRITICAL - malloc failed in gRPC thread, event dropped\n");
continue;
}
qe->tag = event.tag;
qe->success = event.success;
qe->next = NULL;
pthread_mutex_lock(&client->queue_mutex);
if (client->event_queue_tail) {
client->event_queue_tail->next = qe;
} else {
client->event_queue = qe;
}
client->event_queue_tail = qe;
pthread_mutex_unlock(&client->queue_mutex);
/* Signal main thread */
ev_async_send(EV_DEFAULT, &client->cq_async);
}
return NULL;
}
/*
* Finish deferred client destruction.
* Called after in_callback is cleared and client->active is false,
* meaning DESTROY ran during a callback but deferred the final cleanup.
*/
void finish_client_destroy(pTHX_ ev_etcd_t *client) {
/* Free any remaining call structures */
while (client->pending_calls) {
pending_call_t *pc = client->pending_calls;
client->pending_calls = pc->next;
grpc_metadata_array_destroy(&pc->initial_metadata);
grpc_metadata_array_destroy(&pc->trailing_metadata);
if (pc->recv_buffer) grpc_byte_buffer_destroy(pc->recv_buffer);
grpc_slice_unref(pc->status_details);
if (pc->call) grpc_call_unref(pc->call);
SvREFCNT_dec(pc->callback);
Safefree(pc);
}
while (client->watches) {
cleanup_watch(aTHX_ client->watches);
}
while (client->keepalives) {
cleanup_keepalive(aTHX_ client->keepalives);
}
while (client->observes) {
cleanup_observe(aTHX_ client->observes);
}
/* Free client-level resources (mirrors free_perl_resources in DESTROY) */
if (client->health_callback)
SvREFCNT_dec(client->health_callback);
if (client->auth_token) {
memset(client->auth_token, 0, client->auth_token_len);
Safefree(client->auth_token);
}
if (client->endpoints) {
for (int i = 0; i < client->endpoint_count; i++)
if (client->endpoints[i]) Safefree(client->endpoints[i]);
Safefree(client->endpoints);
}
Safefree(client);
}
/*
* ev_async callback - runs in main thread when signaled by gRPC thread.
* Drains the event queue and processes each event.
*/
static void cq_async_callback(struct ev_loop *loop, ev_async *w, int revents) {
dTHX;
(void)loop;
(void)revents;
ev_etcd_t *client = (ev_etcd_t *)((char *)w - offsetof(ev_etcd_t, cq_async));
/* Don't process if client is being destroyed */
if (!client->active) {
return;
}
/* Drain the queue under lock, then process without lock */
pthread_mutex_lock(&client->queue_mutex);
queued_event_t *queue = client->event_queue;
client->event_queue = NULL;
client->event_queue_tail = NULL;
pthread_mutex_unlock(&client->queue_mutex);
/* Guard against client being freed during event processing */
client->in_callback = 1;
/* Process all queued events */
while (queue) {
queued_event_t *qe = queue;
queue = qe->next;
/* Defensive NULL guard â every code path uses a real tag (see cancel_sentinel) */
if (qe->tag) {
process_grpc_event(aTHX_ client, qe->tag, qe->success);
}
free(qe);
/* Check if client was destroyed during callback processing */
*/
static void process_grpc_event(pTHX_ ev_etcd_t *client, void *tag, int success) {
call_base_t *base = (call_base_t *)tag;
/* Fire-and-forget cancel batch â nothing to do */
if (base->type == CALL_TYPE_NONE) return;
if (base->type == CALL_TYPE_WATCH_RECV) {
/* Watch receive completion */
watch_call_t *wc = (watch_call_t *)base;
if (success && wc->active) {
process_watch_response(aTHX_ wc);
if (!client->active) return; /* DESTROY called in callback */
/* Re-arm receive if still active */
if (wc->active) {
watch_rearm_recv(aTHX_ wc);
} else {
/* Response handler set active=0 (e.g., server cancel) */
cleanup_watch(aTHX_ wc);
}
} else if (!success && wc->active) {
/* Stream ended or error - try to reconnect */
wc->active = 0;
/* Try automatic reconnection */
if (try_reconnect_watch(aTHX_ wc)) {
/* Reconnection initiated, don't notify callback yet */
} else {
/* Reconnection failed or disabled, notify callback and cleanup */
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_UNAVAILABLE, "Watch stream ended", "watch");
if (!client->active) return; /* DESTROY called in callback */
cleanup_watch(aTHX_ wc);
}
} else {
/* RECV completed but watch already inactive (user cancel) */
cleanup_watch(aTHX_ wc);
}
} else if (base->type == CALL_TYPE_WATCH) {
/* Initial watch setup complete - process first message if any */
watch_call_t *wc = (watch_call_t *)base;
if (success) {
/* Process the first message that was received in the initial batch */
if (wc->recv_buffer && wc->active) {
process_watch_response(aTHX_ wc);
if (!client->active) return;
}
/* Re-arm to receive more messages */
if (wc->active) {
watch_rearm_recv(aTHX_ wc);
} else {
cleanup_watch(aTHX_ wc);
}
} else {
if (wc->active) {
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch setup failed", "watch");
if (!client->active) return;
}
cleanup_watch(aTHX_ wc);
}
} else if (base->type == CALL_TYPE_LEASE_KEEPALIVE_RECV) {
/* Keepalive receive completion */
keepalive_call_t *kc = (keepalive_call_t *)base;
if (success && kc->active) {
process_keepalive_response(aTHX_ kc);
if (!client->active) return;
/* Re-arm receive if still active */
if (kc->active) {
keepalive_rearm_recv(aTHX_ kc);
} else {
/* Response handler set active=0 (e.g., lease expired) */
cleanup_keepalive(aTHX_ kc);
}
} else if (!success && kc->active) {
/* Stream ended or error - try to reconnect */
kc->active = 0;
/* Try automatic reconnection */
if (try_reconnect_keepalive(aTHX_ kc)) {
/* Reconnection initiated, don't notify callback yet */
} else {
/* Reconnection failed or disabled, notify callback and cleanup */
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_UNAVAILABLE, "Keepalive stream ended", "keepalive");
if (!client->active) return;
cleanup_keepalive(aTHX_ kc);
}
} else {
/* RECV completed but keepalive already inactive */
cleanup_keepalive(aTHX_ kc);
}
} else if (base->type == CALL_TYPE_LEASE_KEEPALIVE) {
/* Initial keepalive setup complete - process first message if any */
keepalive_call_t *kc = (keepalive_call_t *)base;
if (success) {
/* Process the first message that was received in the initial batch */
if (kc->recv_buffer && kc->active) {
process_keepalive_response(aTHX_ kc);
if (!client->active) return;
}
/* Re-arm to receive more messages */
if (kc->active) {
keepalive_rearm_recv(aTHX_ kc);
} else {
/* First response set active=0 (e.g., lease already expired) */
cleanup_keepalive(aTHX_ kc);
}
} else {
if (kc->active) {
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive setup failed", "keepalive");
if (!client->active) return;
}
cleanup_keepalive(aTHX_ kc);
}
} else if (base->type == CALL_TYPE_ELECTION_OBSERVE_RECV) {
/* Election observe receive completion */
observe_call_t *oc = (observe_call_t *)base;
if (success && oc->active) {
process_observe_response(aTHX_ oc);
if (!client->active) return;
/* Re-arm receive if still active */
if (oc->active) {
observe_rearm_recv(aTHX_ oc);
} else {
/* Response handler set active=0 */
cleanup_observe(aTHX_ oc);
}
} else if (!success && oc->active) {
/* Stream ended or error - try to reconnect */
oc->active = 0;
/* Try automatic reconnection */
if (try_reconnect_observe(aTHX_ oc)) {
/* Reconnection initiated, don't notify callback yet */
} else {
/* Reconnection failed or disabled, notify callback and cleanup */
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_UNAVAILABLE, "Observe stream ended", "observe");
if (!client->active) return;
cleanup_observe(aTHX_ oc);
}
} else {
/* RECV completed but observe already inactive */
cleanup_observe(aTHX_ oc);
}
} else if (base->type == CALL_TYPE_ELECTION_OBSERVE) {
/* Initial observe setup complete - process first message if any */
observe_call_t *oc = (observe_call_t *)base;
if (success) {
/* Process the first message that was received in the initial batch */
if (oc->recv_buffer && oc->active) {
process_observe_response(aTHX_ oc);
if (!client->active) return;
}
/* Re-arm to receive more messages */
if (oc->active) {
observe_rearm_recv(aTHX_ oc);
} else {
/* First response set active=0 */
cleanup_observe(aTHX_ oc);
}
} else {
if (oc->active) {
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Observe setup failed", "observe");
if (!client->active) return;
}
cleanup_observe(aTHX_ oc);
}
} else {
/* Unary RPC completion */
pending_call_t *pc = (pending_call_t *)base;
/* Remove from pending list BEFORE calling handler to prevent
* use-after-free if the callback triggers DESTROY */
/* Create call */
gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(client->timeout_seconds, GPR_TIMESPAN)
);
pc->call = grpc_channel_create_call(
client->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
client->cq,
METHOD_MAINTENANCE_STATUS,
NULL,
deadline,
NULL
);
if (!pc->call) {
grpc_byte_buffer_destroy(send_buffer);
CLEANUP_PENDING_CALL_ON_ERROR(pc);
croak("Failed to create gRPC call for status");
}
/* Set up operations */
grpc_op ops[6] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_SEND_MESSAGE;
ops[1].data.send_message.send_message = send_buffer;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata.recv_initial_metadata = &pc->initial_metadata;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = &pc->recv_buffer;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.trailing_metadata = &pc->trailing_metadata;
ops[5].data.recv_status_on_client.status = &pc->status;
ops[5].data.recv_status_on_client.status_details = &pc->status_details;
grpc_call_error err = grpc_call_start_batch(pc->call, ops, 6, &pc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
CLEANUP_PENDING_CALL_ON_ERROR(pc);
croak("Failed to start gRPC call: %d", err);
}
pc->next = client->pending_calls;
client->pending_calls = pc;
}
EV::Etcd::Keepalive
ev_etcd_lease_keepalive(client, lease_id, ...)
EV::Etcd client
int64_t lease_id
CODE:
{
/* Parse arguments: lease_keepalive(lease_id, [opts,] callback) */
SV *opts = NULL;
SV *callback;
if (items == 3) {
callback = ST(2);
} else if (items == 4) {
opts = ST(2);
callback = ST(3);
} else {
croak("Usage: $client->lease_keepalive($lease_id, [\\%%opts,] $callback)");
}
VALIDATE_CALLBACK(callback);
/* Create keepalive structure (Newxz zeroes everything; only set non-zero fields) */
keepalive_call_t *kc;
Newxz(kc, 1, keepalive_call_t);
init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
kc->callback = newSVsv(callback);
kc->client = client;
kc->active = 1;
kc->auto_reconnect = 1; /* Enable by default */
kc->lease_id = lease_id;
kc->client_owns = 1;
kc->perl_owns = 1;
if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
HV *hv = (HV *)SvRV(opts);
SV **svp;
if ((svp = hv_fetchs(hv, "auto_reconnect", 0)) && !SvTRUE(*svp)) {
kc->auto_reconnect = 0;
}
}
grpc_metadata_array_init(&kc->initial_metadata);
grpc_metadata_array_init(&kc->trailing_metadata);
kc->status_details = grpc_empty_slice();
/* Build LeaseKeepAliveRequest */
Etcdserverpb__LeaseKeepAliveRequest req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
req.id = lease_id;
/* Serialize request */
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcdserverpb__lease_keep_alive_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
kc->call = grpc_channel_create_call(
client->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
client->cq,
METHOD_LEASE_KEEPALIVE,
NULL,
deadline,
NULL
);
if (!kc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
grpc_slice_unref(kc->status_details);
SvREFCNT_dec(kc->callback);
Safefree(kc);
croak("Failed to create gRPC call for lease_keepalive");
}
/* Start the call with initial operations */
grpc_op ops[4] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata = &kc->initial_metadata;
ops[2].op = GRPC_OP_SEND_MESSAGE;
ops[2].data.send_message.send_message = send_buffer;
/* Receive the first response */
ops[3].op = GRPC_OP_RECV_MESSAGE;
ops[3].data.recv_message.recv_message = &kc->recv_buffer;
grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
grpc_slice_unref(kc->status_details);
grpc_call_unref(kc->call);
SvREFCNT_dec(kc->callback);
Safefree(kc);
croak("Failed to start gRPC call: %d", err);
}
kc->next = client->keepalives;
client->keepalives = kc;
RETVAL = kc;
}
OUTPUT:
RETVAL
void
ev_etcd_txn(client, compare_av, success_av, failure_av, callback)
EV::Etcd client
SV *compare_av
SV *success_av
SV *failure_av
SV *callback
CODE:
{
VALIDATE_CALLBACK(callback);
/* Pre-validate compare key/value sizes before allocating pending call */
if (SvROK(compare_av) && SvTYPE(SvRV(compare_av)) == SVt_PVAV) {
AV *av = (AV *)SvRV(compare_av);
size_t n = av_len(av) + 1;
for (size_t i = 0; i < n; i++) {
SV **elem = av_fetch(av, i, 0);
if (elem && SvROK(*elem) && SvTYPE(SvRV(*elem)) == SVt_PVHV) {
HV *hv = (HV *)SvRV(*elem);
STRLEN _l;
SV **key_sv = hv_fetch(hv, "key", 3, 0);
if (key_sv && SvOK(*key_sv)) {
(void)SvPV(*key_sv, _l);
VALIDATE_KEY_SIZE(_l);
}
SV **value_sv = hv_fetch(hv, "value", 5, 0);
if (value_sv && SvOK(*value_sv)) {
(void)SvPV(*value_sv, _l);
VALIDATE_VALUE_SIZE(_l);
}
}
}
}
/* Pre-validate success/failure op sizes too, before any allocation, so a
* croak from an oversized key/value can't leak the compare array below. */
validate_request_ops(aTHX_ success_av);
validate_request_ops(aTHX_ failure_av);
/* Create pending call structure */
pending_call_t *pc;
INIT_PENDING_CALL(pc, CALL_TYPE_TXN, callback, client);
/* Build TxnRequest */
Etcdserverpb__TxnRequest req = ETCDSERVERPB__TXN_REQUEST__INIT;
/* Parse compare array */
size_t n_compare = 0;
Etcdserverpb__Compare **compares = NULL;
if (SvROK(compare_av) && SvTYPE(SvRV(compare_av)) == SVt_PVAV) {
AV *av = (AV *)SvRV(compare_av);
n_compare = av_len(av) + 1;
if (n_compare > 0) {
oc->client = client;
oc->active = 1;
oc->auto_reconnect = auto_reconnect;
oc->client_owns = 1;
oc->perl_owns = 1;
grpc_metadata_array_init(&oc->initial_metadata);
grpc_metadata_array_init(&oc->trailing_metadata);
oc->status_details = grpc_empty_slice();
/* Save params for reconnection */
Newx(oc->params.name, name_len + 1, char);
Copy(name_str, oc->params.name, name_len, char);
oc->params.name[name_len] = '\0';
oc->params.name_len = name_len;
V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
req.name.data = (uint8_t *)name_str;
req.name.len = name_len;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
v3electionpb__leader_request__get_packed_size,
v3electionpb__leader_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Use infinite deadline for streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
oc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL
);
if (!oc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&oc->initial_metadata);
grpc_metadata_array_destroy(&oc->trailing_metadata);
grpc_slice_unref(oc->status_details);
SvREFCNT_dec(oc->callback);
if (oc->params.name) Safefree(oc->params.name);
Safefree(oc);
croak("Failed to create gRPC call for election_observe");
}
grpc_op ops[5] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata = &oc->initial_metadata;
ops[2].op = GRPC_OP_SEND_MESSAGE;
ops[2].data.send_message.send_message = send_buffer;
/* Observe is server-streaming: the client sends one LeaderRequest and the
* server streams LeaderResponses. Half-close the client side so etcd's
* handler receives the completed request and begins streaming; without it
* the stream is established but never delivers an event. (Watch/keepalive
* are bidi and deliberately stay open, so they must NOT half-close.) */
ops[3].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = &oc->recv_buffer;
grpc_call_error err = grpc_call_start_batch(oc->call, ops, 5, &oc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
grpc_metadata_array_destroy(&oc->initial_metadata);
grpc_metadata_array_destroy(&oc->trailing_metadata);
grpc_slice_unref(oc->status_details);
grpc_call_unref(oc->call);
SvREFCNT_dec(oc->callback);
if (oc->params.name) Safefree(oc->params.name);
Safefree(oc);
croak("Failed to start gRPC call: %d", err);
}
/* Add to observes list */
oc->next = client->observes;
client->observes = oc;
RETVAL = oc;
}
OUTPUT:
RETVAL
void
ev_etcd_member_list(client, ...)
EV::Etcd client
CODE:
{
SV *opts = NULL;
SV *callback;
if (items == 2) {
callback = ST(1);
} else if (items == 3) {
opts = ST(1);
callback = ST(2);
} else {
croak("Usage: $client->member_list([\\%%opts,] $callback)");
}
VALIDATE_CALLBACK(callback);
pending_call_t *pc;
INIT_PENDING_CALL(pc, CALL_TYPE_MEMBER_LIST, callback, client);
Etcdserverpb__MemberListRequest req = ETCDSERVERPB__MEMBER_LIST_REQUEST__INIT;
if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
HV *hv = (HV *)SvRV(opts);
SV **svp;
if ((svp = hv_fetchs(hv, "linearizable", 0)) && SvTRUE(*svp)) {
req.linearizable = 1;
}
NULL,
deadline,
NULL
);
if (!pc->call) {
grpc_byte_buffer_destroy(send_buffer);
CLEANUP_PENDING_CALL_ON_ERROR(pc);
croak("Failed to create gRPC call for auth_status");
}
/* Set up operations */
grpc_op ops[6] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_SEND_MESSAGE;
ops[1].data.send_message.send_message = send_buffer;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata.recv_initial_metadata = &pc->initial_metadata;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message.recv_message = &pc->recv_buffer;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.trailing_metadata = &pc->trailing_metadata;
ops[5].data.recv_status_on_client.status = &pc->status;
ops[5].data.recv_status_on_client.status_details = &pc->status_details;
grpc_call_error err = grpc_call_start_batch(pc->call, ops, 6, &pc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
CLEANUP_PENDING_CALL_ON_ERROR(pc);
croak("Failed to start gRPC call: %d", err);
}
pc->next = client->pending_calls;
client->pending_calls = pc;
}
void
ev_etcd_DESTROY(client)
EV::Etcd client
CODE:
{
/* Fork safety: in a child process, the gRPC thread and completion queue
* are in undefined state. Skip gRPC cleanup, just free Perl-side resources. */
if (client->owner_pid != getpid()) {
warn("EV::Etcd: client destroyed in forked child (pid %d, created in %d)"
" -- skipping gRPC cleanup", (int)getpid(), (int)client->owner_pid);
/* Free SV callbacks and Safefree'd params only; don't touch gRPC objects.
* Honor dual-ownership: if Perl handle is still alive we leave the struct
* for *_DESTROY to free. */
pending_call_t *pc = client->pending_calls;
while (pc) {
pending_call_t *next = pc->next;
SvREFCNT_dec(pc->callback);
Safefree(pc);
pc = next;
}
watch_call_t *wc = client->watches;
while (wc) {
watch_call_t *next = wc->next;
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
SvREFCNT_dec(wc->callback);
wc->callback = NULL;
wc->client_owns = 0;
if (!wc->perl_owns) {
if (wc->params.key) Safefree(wc->params.key);
if (wc->params.range_end) Safefree(wc->params.range_end);
Safefree(wc);
}
wc = next;
}
keepalive_call_t *kc = client->keepalives;
while (kc) {
keepalive_call_t *next = kc->next;
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
SvREFCNT_dec(kc->callback);
kc->callback = NULL;
kc->client_owns = 0;
if (!kc->perl_owns) Safefree(kc);
kc = next;
}
observe_call_t *oc = client->observes;
while (oc) {
observe_call_t *next = oc->next;
if (ev_is_active(&oc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
SvREFCNT_dec(oc->callback);
oc->callback = NULL;
oc->client_owns = 0;
if (!oc->perl_owns) {
if (oc->params.name) Safefree(oc->params.name);
Safefree(oc);
}
oc = next;
}
if (ev_is_active(&client->health_timer))
ev_timer_stop(EV_DEFAULT, &client->health_timer);
if (ev_is_active(&client->cq_async))
ev_async_stop(EV_DEFAULT, &client->cq_async);
goto free_perl_resources;
}
/* Mark client as inactive first to prevent callbacks from accessing freed memory */
client->active = 0;
/* Stop ev_async watcher */
if (ev_is_active(&client->cq_async)) {
ev_async_stop(EV_DEFAULT, &client->cq_async);
}
/* Signal the gRPC thread to stop and wait for it */
client->thread_running = 0;
/* Mark every streaming call inactive and cancel its gRPC call so pending
* batches complete promptly with success=0. */
watch_call_t *wc = client->watches;
while (wc) {
wc->active = 0;
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
if (wc->call) {
grpc_call_cancel(wc->call, NULL);
}
wc = wc->next;
}
keepalive_call_t *kc = client->keepalives;
while (kc) {
kc->active = 0;
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
if (kc->call) {
grpc_call_cancel(kc->call, NULL);
}
kc = kc->next;
}
observe_call_t *oc = client->observes;
while (oc) {
oc->active = 0;
if (ev_is_active(&oc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
if (oc->call) {
grpc_call_cancel(oc->call, NULL);
}
oc = oc->next;
}
/* Cancel pending unary calls */
pending_call_t *pc = client->pending_calls;
while (pc) {
if (pc->call) {
grpc_call_cancel(pc->call, NULL);
}
pc = pc->next;
}
/* Shutdown the completion queue - this will cause the thread to exit */
if (client->cq) {
grpc_completion_queue_shutdown(client->cq);
}
/* Wait for the gRPC thread to finish */
pthread_join(client->cq_thread, NULL);
/* Clean up the event queue (any remaining queued events) */
pthread_mutex_lock(&client->queue_mutex);
queued_event_t *qe = client->event_queue;
while (qe) {
queued_event_t *next = qe->next;
free(qe);
qe = next;
}
client->event_queue = NULL;
client->event_queue_tail = NULL;
pthread_mutex_unlock(&client->queue_mutex);
/* Destroy the mutex */
pthread_mutex_destroy(&client->queue_mutex);
/* Destroy the completion queue */
if (client->cq) {
grpc_completion_queue_destroy(client->cq);
}
/* Cleanup call structures - skip if called during event processing
* (the currently-processing call struct would be a use-after-free).
* Deferred to cq_async_callback when in_callback is set. */
if (!client->in_callback) {
pc = client->pending_calls;
while (pc) {
pending_call_t *next = pc->next;
grpc_metadata_array_destroy(&pc->initial_metadata);
grpc_metadata_array_destroy(&pc->trailing_metadata);
if (pc->recv_buffer) {
grpc_byte_buffer_destroy(pc->recv_buffer);
}
grpc_slice_unref(pc->status_details);
if (pc->call) {
grpc_call_unref(pc->call);
}
SvREFCNT_dec(pc->callback);
Safefree(pc);
pc = next;
}
/* Streaming-call cleanup honors dual-ownership: cleanup_* unlinks and
* frees gRPC state, then frees the struct only if the Perl handle has
* already been released. Otherwise the struct lives until *_DESTROY. */
while (client->watches) cleanup_watch(aTHX_ client->watches);
while (client->keepalives) cleanup_keepalive(aTHX_ client->keepalives);
while (client->observes) cleanup_observe(aTHX_ client->observes);
}
/* Stop health timer */
ev_timer_stop(EV_DEFAULT, &client->health_timer);
if (client->channel) {
grpc_channel_destroy(client->channel);
}
free_perl_resources:
/* Free health callback */
if (client->health_callback) {
SvREFCNT_dec(client->health_callback);
client->health_callback = NULL;
}
/* Free auth token - securely zero before freeing */
if (client->auth_token) {
memset(client->auth_token, 0, client->auth_token_len);
Safefree(client->auth_token);
client->auth_token = NULL;
}
/* Free endpoints */
if (client->endpoints) {
int i;
for (i = 0; i < client->endpoint_count; i++) {
if (client->endpoints[i]) {
Safefree(client->endpoints[i]);
}
}
Safefree(client->endpoints);
client->endpoints = NULL;
}
/* If called during event processing, defer struct free to cq_async_callback */
if (!client->in_callback) {
Safefree(client);
}
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Watch PREFIX = ev_etcd_watch_
void
ev_etcd_watch_cancel(watch, callback)
EV::Etcd::Watch watch
SV *callback
CODE:
{
VALIDATE_CALLBACK(callback);
watch_call_t *wc = watch;
if (!wc->client_owns) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
if (!wc->active) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
wc->active = 0;
/* If we have a watch_id, send cancel request */
if (wc->watch_id >= 0) {
Etcdserverpb__WatchCancelRequest cancel_req = ETCDSERVERPB__WATCH_CANCEL_REQUEST__INIT;
cancel_req.watch_id = wc->watch_id;
Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CANCEL_REQUEST;
req.cancel_request = &cancel_req;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__watch_request__get_packed_size,
etcdserverpb__watch_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_buffer;
(void)grpc_call_start_batch(wc->call, &op, 1, &cancel_sentinel, NULL);
grpc_byte_buffer_destroy(send_buffer);
}
if (wc->call)
grpc_call_cancel(wc->call, NULL);
CALL_SUCCESS_CALLBACK(callback, newHV());
}
void
ev_etcd_watch_DESTROY(watch)
EV::Etcd::Watch watch
CODE:
{
watch_call_perl_release(aTHX_ watch);
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Keepalive PREFIX = ev_etcd_keepalive_
void
ev_etcd_keepalive_cancel(keepalive, callback)
EV::Etcd::Keepalive keepalive
SV *callback
CODE:
{
VALIDATE_CALLBACK(callback);
keepalive_call_t *kc = keepalive;
if (!kc->client_owns) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
if (!kc->active) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
kc->active = 0;
if (kc->call)
grpc_call_cancel(kc->call, NULL);
CALL_SUCCESS_CALLBACK(callback, newHV());
}
void
ev_etcd_keepalive_DESTROY(keepalive)
EV::Etcd::Keepalive keepalive
CODE:
{
keepalive_call_perl_release(aTHX_ keepalive);
}
MODULE = EV::Etcd PACKAGE = EV::Etcd::Observe PREFIX = ev_etcd_observe_
void
ev_etcd_observe_cancel(observe, callback)
EV::Etcd::Observe observe
SV *callback
CODE:
{
VALIDATE_CALLBACK(callback);
observe_call_t *oc = observe;
if (!oc->client_owns) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
if (ev_is_active(&oc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
if (!oc->active) {
CALL_SUCCESS_CALLBACK(callback, newHV());
return;
}
oc->active = 0;
if (oc->call)
grpc_call_cancel(oc->call, NULL);
CALL_SUCCESS_CALLBACK(callback, newHV());
}
void
ev_etcd_observe_DESTROY(observe)
EV::Etcd::Observe observe
CODE:
{
observe_call_perl_release(aTHX_ observe);
}
MODULE = EV::Etcd PACKAGE = EV::Etcd PREFIX = ev_etcd_
void
END()
CODE:
grpc_shutdown();
( run in 1.546 second using v1.01-cache-2.11-cpan-13bb782fe5a )