EV-Etcd

 view release on metacpan or  search on metacpan

Etcd.xs  view on Meta::CPAN

        grpc_event event = grpc_completion_queue_next(client->cq, deadline, NULL);

        if (event.type == GRPC_QUEUE_SHUTDOWN) {
            break;
        }

        if (event.type != GRPC_OP_COMPLETE) {
            continue;
        }

        /* Queue the event for the main thread */
        queued_event_t *qe = (queued_event_t *)malloc(sizeof(queued_event_t));
        if (!qe) {
            /* OOM is extremely rare but would cause callbacks to never fire.
             * Log to stderr since we can't call Perl from this thread. */
            fprintf(stderr, "EV::Etcd: CRITICAL - malloc failed in gRPC thread, event dropped\n");
            continue;
        }

        qe->tag = event.tag;
        qe->success = event.success;
        qe->next = NULL;

        pthread_mutex_lock(&client->queue_mutex);
        if (client->event_queue_tail) {
            client->event_queue_tail->next = qe;
        } else {
            client->event_queue = qe;
        }
        client->event_queue_tail = qe;
        pthread_mutex_unlock(&client->queue_mutex);

        /* Signal main thread */
        ev_async_send(EV_DEFAULT, &client->cq_async);
    }

    return NULL;
}

/*
 * Complete a client teardown that DESTROY had to postpone.
 *
 * Runs once in_callback has been cleared and client->active is false, i.e.
 * DESTROY was invoked from inside an event callback and left the call
 * structures and the client struct itself for later. Releases every
 * remaining call structure, then the client-level resources, then the
 * client struct.
 */
void finish_client_destroy(pTHX_ ev_etcd_t *client) {
    pending_call_t *unary;
    int idx;

    /* Unary calls: detach each entry from the head of the list, then release
     * its gRPC state, its Perl callback and finally the struct. */
    for (unary = client->pending_calls; unary != NULL; unary = client->pending_calls) {
        client->pending_calls = unary->next;

        grpc_metadata_array_destroy(&unary->initial_metadata);
        grpc_metadata_array_destroy(&unary->trailing_metadata);
        if (unary->recv_buffer != NULL) {
            grpc_byte_buffer_destroy(unary->recv_buffer);
        }
        grpc_slice_unref(unary->status_details);
        if (unary->call != NULL) {
            grpc_call_unref(unary->call);
        }
        SvREFCNT_dec(unary->callback);
        Safefree(unary);
    }

    /* Streaming calls: keep handing the list head to its cleanup routine
     * until the list is empty. NOTE(review): this relies on each cleanup_*
     * (defined elsewhere) removing the entry it is given from the list. */
    while (client->watches != NULL) {
        cleanup_watch(aTHX_ client->watches);
    }
    while (client->keepalives != NULL) {
        cleanup_keepalive(aTHX_ client->keepalives);
    }
    while (client->observes != NULL) {
        cleanup_observe(aTHX_ client->observes);
    }

    /* Client-level resources (same set as free_perl_resources in DESTROY);
     * each one is skipped when its pointer is already NULL. */
    if (client->health_callback != NULL) {
        SvREFCNT_dec(client->health_callback);
    }

    if (client->auth_token != NULL) {
        /* Wipe the token bytes before handing the buffer back */
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
    }

    if (client->endpoints != NULL) {
        idx = 0;
        while (idx < client->endpoint_count) {
            if (client->endpoints[idx] != NULL) {
                Safefree(client->endpoints[idx]);
            }
            idx++;
        }
        Safefree(client->endpoints);
    }

    /* Last step: the client struct itself */
    Safefree(client);
}

/*
 * ev_async callback - runs in main thread when signaled by gRPC thread.
 * Drains the event queue and processes each event.
 */
static void cq_async_callback(struct ev_loop *loop, ev_async *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    ev_etcd_t *client = (ev_etcd_t *)((char *)w - offsetof(ev_etcd_t, cq_async));

    /* Don't process if client is being destroyed */
    if (!client->active) {
        return;
    }

    /* Drain the queue under lock, then process without lock */
    pthread_mutex_lock(&client->queue_mutex);
    queued_event_t *queue = client->event_queue;
    client->event_queue = NULL;
    client->event_queue_tail = NULL;
    pthread_mutex_unlock(&client->queue_mutex);

    /* Guard against client being freed during event processing */
    client->in_callback = 1;

    /* Process all queued events */
    while (queue) {
        queued_event_t *qe = queue;
        queue = qe->next;

        /* Defensive NULL guard — every code path uses a real tag (see cancel_sentinel) */
        if (qe->tag) {
            process_grpc_event(aTHX_ client, qe->tag, qe->success);
        }

        free(qe);

        /* Check if client was destroyed during callback processing */

Etcd.xs  view on Meta::CPAN

 */
static void process_grpc_event(pTHX_ ev_etcd_t *client, void *tag, int success) {
    call_base_t *base = (call_base_t *)tag;

    /* Fire-and-forget cancel batch — nothing to do */
    if (base->type == CALL_TYPE_NONE) return;

    if (base->type == CALL_TYPE_WATCH_RECV) {
            /* Watch receive completion */
            watch_call_t *wc = (watch_call_t *)base;

            if (success && wc->active) {
                    process_watch_response(aTHX_ wc);
                    if (!client->active) return; /* DESTROY called in callback */
                    /* Re-arm receive if still active */
                    if (wc->active) {
                        watch_rearm_recv(aTHX_ wc);
                    } else {
                        /* Response handler set active=0 (e.g., server cancel) */
                        cleanup_watch(aTHX_ wc);
                    }
                } else if (!success && wc->active) {
                    /* Stream ended or error - try to reconnect */
                    wc->active = 0;
                    /* Try automatic reconnection */
                    if (try_reconnect_watch(aTHX_ wc)) {
                        /* Reconnection initiated, don't notify callback yet */
                    } else {
                        /* Reconnection failed or disabled, notify callback and cleanup */
                        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_UNAVAILABLE, "Watch stream ended", "watch");
                        if (!client->active) return; /* DESTROY called in callback */
                        cleanup_watch(aTHX_ wc);
                    }
                } else {
                    /* RECV completed but watch already inactive (user cancel) */
                    cleanup_watch(aTHX_ wc);
                }
            } else if (base->type == CALL_TYPE_WATCH) {
            /* Initial watch setup complete - process first message if any */
            watch_call_t *wc = (watch_call_t *)base;
            if (success) {
                    /* Process the first message that was received in the initial batch */
                    if (wc->recv_buffer && wc->active) {
                        process_watch_response(aTHX_ wc);
                        if (!client->active) return;
                    }
                    /* Re-arm to receive more messages */
                    if (wc->active) {
                        watch_rearm_recv(aTHX_ wc);
                    } else {
                        cleanup_watch(aTHX_ wc);
                    }
                } else {
                    if (wc->active) {
                        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch setup failed", "watch");
                        if (!client->active) return;
                    }
                    cleanup_watch(aTHX_ wc);
                }
            } else if (base->type == CALL_TYPE_LEASE_KEEPALIVE_RECV) {
            /* Keepalive receive completion */
            keepalive_call_t *kc = (keepalive_call_t *)base;

            if (success && kc->active) {
                    process_keepalive_response(aTHX_ kc);
                    if (!client->active) return;
                    /* Re-arm receive if still active */
                    if (kc->active) {
                        keepalive_rearm_recv(aTHX_ kc);
                    } else {
                        /* Response handler set active=0 (e.g., lease expired) */
                        cleanup_keepalive(aTHX_ kc);
                    }
                } else if (!success && kc->active) {
                    /* Stream ended or error - try to reconnect */
                    kc->active = 0;
                    /* Try automatic reconnection */
                    if (try_reconnect_keepalive(aTHX_ kc)) {
                        /* Reconnection initiated, don't notify callback yet */
                    } else {
                        /* Reconnection failed or disabled, notify callback and cleanup */
                        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_UNAVAILABLE, "Keepalive stream ended", "keepalive");
                        if (!client->active) return;
                        cleanup_keepalive(aTHX_ kc);
                    }
                } else {
                    /* RECV completed but keepalive already inactive */
                    cleanup_keepalive(aTHX_ kc);
                }
            } else if (base->type == CALL_TYPE_LEASE_KEEPALIVE) {
            /* Initial keepalive setup complete - process first message if any */
            keepalive_call_t *kc = (keepalive_call_t *)base;
            if (success) {
                    /* Process the first message that was received in the initial batch */
                    if (kc->recv_buffer && kc->active) {
                        process_keepalive_response(aTHX_ kc);
                        if (!client->active) return;
                    }
                    /* Re-arm to receive more messages */
                    if (kc->active) {
                        keepalive_rearm_recv(aTHX_ kc);
                    } else {
                        /* First response set active=0 (e.g., lease already expired) */
                        cleanup_keepalive(aTHX_ kc);
                    }
                } else {
                    if (kc->active) {
                        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive setup failed", "keepalive");
                        if (!client->active) return;
                    }
                    cleanup_keepalive(aTHX_ kc);
                }
            } else if (base->type == CALL_TYPE_ELECTION_OBSERVE_RECV) {
            /* Election observe receive completion */
            observe_call_t *oc = (observe_call_t *)base;

            if (success && oc->active) {
                    process_observe_response(aTHX_ oc);
                    if (!client->active) return;
                    /* Re-arm receive if still active */
                    if (oc->active) {
                        observe_rearm_recv(aTHX_ oc);
                    } else {
                        /* Response handler set active=0 */
                        cleanup_observe(aTHX_ oc);
                    }
                } else if (!success && oc->active) {
                    /* Stream ended or error - try to reconnect */
                    oc->active = 0;
                    /* Try automatic reconnection */
                    if (try_reconnect_observe(aTHX_ oc)) {
                        /* Reconnection initiated, don't notify callback yet */
                    } else {
                        /* Reconnection failed or disabled, notify callback and cleanup */
                        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_UNAVAILABLE, "Observe stream ended", "observe");
                        if (!client->active) return;
                        cleanup_observe(aTHX_ oc);
                    }
                } else {
                    /* RECV completed but observe already inactive */
                    cleanup_observe(aTHX_ oc);
                }
            } else if (base->type == CALL_TYPE_ELECTION_OBSERVE) {
            /* Initial observe setup complete - process first message if any */
            observe_call_t *oc = (observe_call_t *)base;
            if (success) {
                    /* Process the first message that was received in the initial batch */
                    if (oc->recv_buffer && oc->active) {
                        process_observe_response(aTHX_ oc);
                        if (!client->active) return;
                    }
                    /* Re-arm to receive more messages */
                    if (oc->active) {
                        observe_rearm_recv(aTHX_ oc);
                    } else {
                        /* First response set active=0 */
                        cleanup_observe(aTHX_ oc);
                    }
                } else {
                    if (oc->active) {
                        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Observe setup failed", "observe");
                        if (!client->active) return;
                    }
                    cleanup_observe(aTHX_ oc);
                }
            } else {
            /* Unary RPC completion */
            pending_call_t *pc = (pending_call_t *)base;

            /* Remove from pending list BEFORE calling handler to prevent
             * use-after-free if the callback triggers DESTROY */

Etcd.xs  view on Meta::CPAN

    /* Create call */
    gpr_timespec deadline = gpr_time_add(
        gpr_now(GPR_CLOCK_REALTIME),
        gpr_time_from_seconds(client->timeout_seconds, GPR_TIMESPAN)
    );

    pc->call = grpc_channel_create_call(
        client->channel,
        NULL,
        GRPC_PROPAGATE_DEFAULTS,
        client->cq,
        METHOD_MAINTENANCE_STATUS,
        NULL,
        deadline,
        NULL
    );

    if (!pc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        CLEANUP_PENDING_CALL_ON_ERROR(pc);
        croak("Failed to create gRPC call for status");
    }

    /* Set up operations */
    grpc_op ops[6] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_SEND_MESSAGE;
    ops[1].data.send_message.send_message = send_buffer;

    ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;

    ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[3].data.recv_initial_metadata.recv_initial_metadata = &pc->initial_metadata;

    ops[4].op = GRPC_OP_RECV_MESSAGE;
    ops[4].data.recv_message.recv_message = &pc->recv_buffer;

    ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
    ops[5].data.recv_status_on_client.trailing_metadata = &pc->trailing_metadata;
    ops[5].data.recv_status_on_client.status = &pc->status;
    ops[5].data.recv_status_on_client.status_details = &pc->status_details;

    grpc_call_error err = grpc_call_start_batch(pc->call, ops, 6, &pc->base, NULL);

    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        CLEANUP_PENDING_CALL_ON_ERROR(pc);
        croak("Failed to start gRPC call: %d", err);
    }

    pc->next = client->pending_calls;
    client->pending_calls = pc;
}

EV::Etcd::Keepalive
ev_etcd_lease_keepalive(client, lease_id, ...)
    EV::Etcd client
    int64_t lease_id
CODE:
{
    /*
     * $client->lease_keepalive($lease_id, [\%opts,] $callback)
     *
     * Opens a LeaseKeepAlive streaming call for lease_id, sends the first
     * keepalive request and arms a receive for the first response. The new
     * keepalive_call_t is linked into client->keepalives and returned as the
     * EV::Etcd::Keepalive handle.
     *
     * Options (hash ref; anything else is ignored):
     *   auto_reconnect - a false value disables automatic reconnection
     *                    (enabled by default).
     *
     * Croaks on a wrong argument count, when the gRPC call cannot be
     * created, or when its initial batch cannot be started. The callback is
     * checked by VALIDATE_CALLBACK (defined elsewhere) before anything is
     * allocated.
     */
    SV *opts = NULL;
    SV *callback;

    if (items == 3) {
        callback = ST(2);
    } else if (items == 4) {
        opts = ST(2);
        callback = ST(3);
    } else {
        croak("Usage: $client->lease_keepalive($lease_id, [\\%%opts,] $callback)");
    }

    VALIDATE_CALLBACK(callback);

    /* Resolve options BEFORE allocating anything: SvTRUE runs get-magic and
     * bool overloading, so a tied or overloaded option value can die here.
     * Doing it first means such a die cannot leak the keepalive struct or
     * the callback copy. */
    int auto_reconnect = 1;  /* Enable by default */
    if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
        HV *hv = (HV *)SvRV(opts);
        SV **svp;

        if ((svp = hv_fetchs(hv, "auto_reconnect", 0)) && !SvTRUE(*svp)) {
            auto_reconnect = 0;
        }
    }

    /* Create keepalive structure (Newxz zeroes everything; only set non-zero fields) */
    keepalive_call_t *kc;
    Newxz(kc, 1, keepalive_call_t);
    init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
    kc->callback = newSVsv(callback);
    kc->client = client;
    kc->active = 1;
    kc->auto_reconnect = auto_reconnect;
    kc->lease_id = lease_id;
    /* Dual ownership: both the client list and the Perl handle reference kc */
    kc->client_owns = 1;
    kc->perl_owns = 1;

    grpc_metadata_array_init(&kc->initial_metadata);
    grpc_metadata_array_init(&kc->trailing_metadata);
    kc->status_details = grpc_empty_slice();

    /* Build LeaseKeepAliveRequest */
    Etcdserverpb__LeaseKeepAliveRequest req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    req.id = lease_id;

    /* Serialize request */
    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create streaming call; a stream has no deadline */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    kc->call = grpc_channel_create_call(
        client->channel,
        NULL,
        GRPC_PROPAGATE_DEFAULTS,
        client->cq,
        METHOD_LEASE_KEEPALIVE,
        NULL,
        deadline,
        NULL
    );

    if (!kc->call) {
        /* Undo everything allocated so far, then report */
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&kc->initial_metadata);
        grpc_metadata_array_destroy(&kc->trailing_metadata);
        grpc_slice_unref(kc->status_details);
        SvREFCNT_dec(kc->callback);
        Safefree(kc);
        croak("Failed to create gRPC call for lease_keepalive");
    }

    /* Start the call with initial operations */
    grpc_op ops[4] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[1].data.recv_initial_metadata.recv_initial_metadata = &kc->initial_metadata;

    ops[2].op = GRPC_OP_SEND_MESSAGE;
    ops[2].data.send_message.send_message = send_buffer;

    /* Receive the first response */
    ops[3].op = GRPC_OP_RECV_MESSAGE;
    ops[3].data.recv_message.recv_message = &kc->recv_buffer;

    /* &kc->base is the completion tag used to identify this batch */
    grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);

    /* Batch-local resources are released whether or not the start succeeded */
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        grpc_metadata_array_destroy(&kc->initial_metadata);
        grpc_metadata_array_destroy(&kc->trailing_metadata);
        grpc_slice_unref(kc->status_details);
        grpc_call_unref(kc->call);
        SvREFCNT_dec(kc->callback);
        Safefree(kc);
        croak("Failed to start gRPC call: %d", err);
    }

    /* Link at the head of the client's keepalive list only once the batch
     * is in flight, so the error paths above never have to unlink. */
    kc->next = client->keepalives;
    client->keepalives = kc;

    RETVAL = kc;
}
OUTPUT:
    RETVAL

void
ev_etcd_txn(client, compare_av, success_av, failure_av, callback)
    EV::Etcd client
    SV *compare_av
    SV *success_av
    SV *failure_av
    SV *callback
CODE:
{
    VALIDATE_CALLBACK(callback);

    /* Pre-validate compare key/value sizes before allocating pending call */
    if (SvROK(compare_av) && SvTYPE(SvRV(compare_av)) == SVt_PVAV) {
        AV *av = (AV *)SvRV(compare_av);
        size_t n = av_len(av) + 1;
        for (size_t i = 0; i < n; i++) {
            SV **elem = av_fetch(av, i, 0);
            if (elem && SvROK(*elem) && SvTYPE(SvRV(*elem)) == SVt_PVHV) {
                HV *hv = (HV *)SvRV(*elem);
                STRLEN _l;
                SV **key_sv = hv_fetch(hv, "key", 3, 0);
                if (key_sv && SvOK(*key_sv)) {
                    (void)SvPV(*key_sv, _l);
                    VALIDATE_KEY_SIZE(_l);
                }
                SV **value_sv = hv_fetch(hv, "value", 5, 0);
                if (value_sv && SvOK(*value_sv)) {
                    (void)SvPV(*value_sv, _l);
                    VALIDATE_VALUE_SIZE(_l);
                }
            }
        }
    }

    /* Pre-validate success/failure op sizes too, before any allocation, so a
     * croak from an oversized key/value can't leak the compare array below. */
    validate_request_ops(aTHX_ success_av);
    validate_request_ops(aTHX_ failure_av);

    /* Create pending call structure */
    pending_call_t *pc;
    INIT_PENDING_CALL(pc, CALL_TYPE_TXN, callback, client);

    /* Build TxnRequest */
    Etcdserverpb__TxnRequest req = ETCDSERVERPB__TXN_REQUEST__INIT;

    /* Parse compare array */
    size_t n_compare = 0;
    Etcdserverpb__Compare **compares = NULL;

    if (SvROK(compare_av) && SvTYPE(SvRV(compare_av)) == SVt_PVAV) {
        AV *av = (AV *)SvRV(compare_av);
        n_compare = av_len(av) + 1;
        if (n_compare > 0) {

Etcd.xs  view on Meta::CPAN

    oc->client = client;
    oc->active = 1;
    oc->auto_reconnect = auto_reconnect;
    oc->client_owns = 1;
    oc->perl_owns = 1;
    grpc_metadata_array_init(&oc->initial_metadata);
    grpc_metadata_array_init(&oc->trailing_metadata);
    oc->status_details = grpc_empty_slice();

    /* Save params for reconnection */
    Newx(oc->params.name, name_len + 1, char);
    Copy(name_str, oc->params.name, name_len, char);
    oc->params.name[name_len] = '\0';
    oc->params.name_len = name_len;

    V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
    req.name.data = (uint8_t *)name_str;
    req.name.len = name_len;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        v3electionpb__leader_request__get_packed_size,
        v3electionpb__leader_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Use infinite deadline for streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    oc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL
    );

    if (!oc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&oc->initial_metadata);
        grpc_metadata_array_destroy(&oc->trailing_metadata);
        grpc_slice_unref(oc->status_details);
        SvREFCNT_dec(oc->callback);
        if (oc->params.name) Safefree(oc->params.name);
        Safefree(oc);
        croak("Failed to create gRPC call for election_observe");
    }

    grpc_op ops[5] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[1].data.recv_initial_metadata.recv_initial_metadata = &oc->initial_metadata;

    ops[2].op = GRPC_OP_SEND_MESSAGE;
    ops[2].data.send_message.send_message = send_buffer;

    /* Observe is server-streaming: the client sends one LeaderRequest and the
     * server streams LeaderResponses. Half-close the client side so etcd's
     * handler receives the completed request and begins streaming; without it
     * the stream is established but never delivers an event. (Watch/keepalive
     * are bidi and deliberately stay open, so they must NOT half-close.) */
    ops[3].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;

    ops[4].op = GRPC_OP_RECV_MESSAGE;
    ops[4].data.recv_message.recv_message = &oc->recv_buffer;

    grpc_call_error err = grpc_call_start_batch(oc->call, ops, 5, &oc->base, NULL);
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        grpc_metadata_array_destroy(&oc->initial_metadata);
        grpc_metadata_array_destroy(&oc->trailing_metadata);
        grpc_slice_unref(oc->status_details);
        grpc_call_unref(oc->call);
        SvREFCNT_dec(oc->callback);
        if (oc->params.name) Safefree(oc->params.name);
        Safefree(oc);
        croak("Failed to start gRPC call: %d", err);
    }

    /* Add to observes list */
    oc->next = client->observes;
    client->observes = oc;

    RETVAL = oc;
}
OUTPUT:
    RETVAL

void
ev_etcd_member_list(client, ...)
    EV::Etcd client
CODE:
{
    SV *opts = NULL;
    SV *callback;

    if (items == 2) {
        callback = ST(1);
    } else if (items == 3) {
        opts = ST(1);
        callback = ST(2);
    } else {
        croak("Usage: $client->member_list([\\%%opts,] $callback)");
    }

    VALIDATE_CALLBACK(callback);

    pending_call_t *pc;
    INIT_PENDING_CALL(pc, CALL_TYPE_MEMBER_LIST, callback, client);

    Etcdserverpb__MemberListRequest req = ETCDSERVERPB__MEMBER_LIST_REQUEST__INIT;

    if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
        HV *hv = (HV *)SvRV(opts);
        SV **svp;
        if ((svp = hv_fetchs(hv, "linearizable", 0)) && SvTRUE(*svp)) {
            req.linearizable = 1;
        }

Etcd.xs  view on Meta::CPAN

        NULL,
        deadline,
        NULL
    );

    if (!pc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        CLEANUP_PENDING_CALL_ON_ERROR(pc);
        croak("Failed to create gRPC call for auth_status");
    }

    /* Set up operations */
    grpc_op ops[6] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_SEND_MESSAGE;
    ops[1].data.send_message.send_message = send_buffer;

    ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;

    ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[3].data.recv_initial_metadata.recv_initial_metadata = &pc->initial_metadata;

    ops[4].op = GRPC_OP_RECV_MESSAGE;
    ops[4].data.recv_message.recv_message = &pc->recv_buffer;

    ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
    ops[5].data.recv_status_on_client.trailing_metadata = &pc->trailing_metadata;
    ops[5].data.recv_status_on_client.status = &pc->status;
    ops[5].data.recv_status_on_client.status_details = &pc->status_details;

    grpc_call_error err = grpc_call_start_batch(pc->call, ops, 6, &pc->base, NULL);

    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        CLEANUP_PENDING_CALL_ON_ERROR(pc);
        croak("Failed to start gRPC call: %d", err);
    }

    pc->next = client->pending_calls;
    client->pending_calls = pc;
}

void
ev_etcd_DESTROY(client)
    EV::Etcd client
CODE:
{
    /*
     * Destructor for an EV::Etcd client.
     *
     * Two paths:
     *   - Forked child (owner_pid differs from getpid()): only Perl-side and
     *     Safefree'd state is released; gRPC objects are left untouched.
     *   - Owning process: cancel every call, shut down the completion queue,
     *     join the gRPC thread, then release everything.
     *
     * When invoked from inside an event callback (client->in_callback set),
     * the client struct itself (and, on the owning-process path, the call
     * structures) are left for the deferred cleanup that runs after the
     * callback returns. That deferred cleanup is selected by client->active
     * being false, so BOTH paths must clear it.
     */

    /* Fork safety: in a child process, the gRPC thread and completion queue
     * are in undefined state. Skip gRPC cleanup, just free Perl-side resources. */
    if (client->owner_pid != getpid()) {
        warn("EV::Etcd: client destroyed in forked child (pid %d, created in %d)"
             " -- skipping gRPC cleanup", (int)getpid(), (int)client->owner_pid);

        /* Mark the client inactive, exactly as the owning-process path does.
         * If this DESTROY runs inside an event callback, the struct free is
         * deferred and the event-processing code tests client->active to
         * notice the destruction; without this the child would keep using
         * the entries freed below and the client struct would never be
         * released. */
        client->active = 0;

        /* Free SV callbacks and Safefree'd params only; don't touch gRPC objects.
         * Honor dual-ownership: if Perl handle is still alive we leave the struct
         * for *_DESTROY to free.
         * Each list head is detached before its entries are released so that
         * a deferred cleanup finds empty lists instead of dangling pointers. */
        pending_call_t *pc = client->pending_calls;
        client->pending_calls = NULL;
        while (pc) {
            pending_call_t *next = pc->next;
            SvREFCNT_dec(pc->callback);
            Safefree(pc);
            pc = next;
        }
        watch_call_t *wc = client->watches;
        client->watches = NULL;
        while (wc) {
            watch_call_t *next = wc->next;
            if (ev_is_active(&wc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
            SvREFCNT_dec(wc->callback);
            wc->callback = NULL;
            wc->client_owns = 0;
            if (!wc->perl_owns) {
                if (wc->params.key) Safefree(wc->params.key);
                if (wc->params.range_end) Safefree(wc->params.range_end);
                Safefree(wc);
            }
            wc = next;
        }
        keepalive_call_t *kc = client->keepalives;
        client->keepalives = NULL;
        while (kc) {
            keepalive_call_t *next = kc->next;
            if (ev_is_active(&kc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
            SvREFCNT_dec(kc->callback);
            kc->callback = NULL;
            kc->client_owns = 0;
            if (!kc->perl_owns) Safefree(kc);
            kc = next;
        }
        observe_call_t *oc = client->observes;
        client->observes = NULL;
        while (oc) {
            observe_call_t *next = oc->next;
            if (ev_is_active(&oc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
            SvREFCNT_dec(oc->callback);
            oc->callback = NULL;
            oc->client_owns = 0;
            if (!oc->perl_owns) {
                if (oc->params.name) Safefree(oc->params.name);
                Safefree(oc);
            }
            oc = next;
        }
        if (ev_is_active(&client->health_timer))
            ev_timer_stop(EV_DEFAULT, &client->health_timer);
        if (ev_is_active(&client->cq_async))
            ev_async_stop(EV_DEFAULT, &client->cq_async);
        goto free_perl_resources;
    }

    /* Mark client as inactive first to prevent callbacks from accessing freed memory */
    client->active = 0;

    /* Stop ev_async watcher */
    if (ev_is_active(&client->cq_async)) {
        ev_async_stop(EV_DEFAULT, &client->cq_async);
    }

    /* Signal the gRPC thread to stop and wait for it */
    client->thread_running = 0;

    /* Mark every streaming call inactive and cancel its gRPC call so pending
     * batches complete promptly with success=0. */
    watch_call_t *wc = client->watches;
    while (wc) {
        wc->active = 0;
        if (ev_is_active(&wc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
        if (wc->call) {
            grpc_call_cancel(wc->call, NULL);
        }
        wc = wc->next;
    }

    keepalive_call_t *kc = client->keepalives;
    while (kc) {
        kc->active = 0;
        if (ev_is_active(&kc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
        if (kc->call) {
            grpc_call_cancel(kc->call, NULL);
        }
        kc = kc->next;
    }

    observe_call_t *oc = client->observes;
    while (oc) {
        oc->active = 0;
        if (ev_is_active(&oc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
        if (oc->call) {
            grpc_call_cancel(oc->call, NULL);
        }
        oc = oc->next;
    }

    /* Cancel pending unary calls */
    pending_call_t *pc = client->pending_calls;
    while (pc) {
        if (pc->call) {
            grpc_call_cancel(pc->call, NULL);
        }
        pc = pc->next;
    }

    /* Shutdown the completion queue - this will cause the thread to exit */
    if (client->cq) {
        grpc_completion_queue_shutdown(client->cq);
    }

    /* Wait for the gRPC thread to finish */
    pthread_join(client->cq_thread, NULL);

    /* Clean up the event queue (any remaining queued events).
     * These nodes come from malloc in the gRPC thread, hence free(). */
    pthread_mutex_lock(&client->queue_mutex);
    queued_event_t *qe = client->event_queue;
    while (qe) {
        queued_event_t *next = qe->next;
        free(qe);
        qe = next;
    }
    client->event_queue = NULL;
    client->event_queue_tail = NULL;
    pthread_mutex_unlock(&client->queue_mutex);

    /* Destroy the mutex */
    pthread_mutex_destroy(&client->queue_mutex);

    /* Destroy the completion queue */
    if (client->cq) {
        grpc_completion_queue_destroy(client->cq);
    }

    /* Cleanup call structures - skip if called during event processing
     * (the currently-processing call struct would be a use-after-free).
     * Deferred to cq_async_callback when in_callback is set. */
    if (!client->in_callback) {
        pc = client->pending_calls;
        while (pc) {
            pending_call_t *next = pc->next;
            grpc_metadata_array_destroy(&pc->initial_metadata);
            grpc_metadata_array_destroy(&pc->trailing_metadata);
            if (pc->recv_buffer) {
                grpc_byte_buffer_destroy(pc->recv_buffer);
            }
            grpc_slice_unref(pc->status_details);
            if (pc->call) {
                grpc_call_unref(pc->call);
            }
            SvREFCNT_dec(pc->callback);
            Safefree(pc);
            pc = next;
        }

        /* Streaming-call cleanup honors dual-ownership: cleanup_* unlinks and
         * frees gRPC state, then frees the struct only if the Perl handle has
         * already been released. Otherwise the struct lives until *_DESTROY. */
        while (client->watches) cleanup_watch(aTHX_ client->watches);
        while (client->keepalives) cleanup_keepalive(aTHX_ client->keepalives);
        while (client->observes) cleanup_observe(aTHX_ client->observes);
    }

    /* Stop health timer */
    ev_timer_stop(EV_DEFAULT, &client->health_timer);

    if (client->channel) {
        grpc_channel_destroy(client->channel);
    }

    free_perl_resources:
    /* Shared tail for both paths. Every pointer is reset to NULL after it is
     * released so the deferred cleanup does not release it a second time. */

    /* Free health callback */
    if (client->health_callback) {
        SvREFCNT_dec(client->health_callback);
        client->health_callback = NULL;
    }

    /* Free auth token - securely zero before freeing */
    if (client->auth_token) {
        memset(client->auth_token, 0, client->auth_token_len);
        Safefree(client->auth_token);
        client->auth_token = NULL;
    }

    /* Free endpoints */
    if (client->endpoints) {
        int i;
        for (i = 0; i < client->endpoint_count; i++) {
            if (client->endpoints[i]) {
                Safefree(client->endpoints[i]);
            }
        }
        Safefree(client->endpoints);
        client->endpoints = NULL;
    }

    /* If called during event processing, defer struct free to cq_async_callback */
    if (!client->in_callback) {
        Safefree(client);
    }
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Watch  PREFIX = ev_etcd_watch_

void
ev_etcd_watch_cancel(watch, callback)
    EV::Etcd::Watch watch
    SV *callback
CODE:
{
    /*
     * Cancel a watch stream and report completion through `callback`.
     *
     * watch    - EV::Etcd::Watch handle (a watch_call_t *).
     * callback - checked by VALIDATE_CALLBACK; always invoked exactly once
     *            via CALL_SUCCESS_CALLBACK with a fresh empty hash, whether
     *            or not there was anything left to cancel.
     *
     * Teardown only happens while the client_owns flag is set; within that,
     * a pending reconnect timer is always stopped, and the gRPC side is
     * touched only if the watch is still marked active.
     *
     * The XSUB has a single exit at the bottom on purpose: a bare `return;`
     * inside a CODE: block skips the XSRETURN_EMPTY that xsubpp appends, which
     * leaves the arguments on the Perl stack as bogus return values.
     */
    VALIDATE_CALLBACK(callback);

    watch_call_t *wc = watch;

    if (wc->client_owns) {
        if (ev_is_active(&wc->reconnect_timer)) {
            ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
        }

        if (wc->active) {
            wc->active = 0;

            /* Everything below needs a live call; without one there is no
             * stream to send the cancel request on and nothing to cancel. */
            if (wc->call) {
                /* If we have a watch_id, send cancel request */
                if (wc->watch_id >= 0) {
                    Etcdserverpb__WatchCancelRequest cancel_req = ETCDSERVERPB__WATCH_CANCEL_REQUEST__INIT;
                    cancel_req.watch_id = wc->watch_id;

                    Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
                    req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CANCEL_REQUEST;
                    req.cancel_request = &cancel_req;

                    grpc_slice req_slice;
                    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
                        etcdserverpb__watch_request__get_packed_size,
                        etcdserverpb__watch_request__pack, &req);
                    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
                    grpc_slice_unref(req_slice);

                    grpc_op op;
                    memset(&op, 0, sizeof(op));
                    op.op = GRPC_OP_SEND_MESSAGE;
                    op.data.send_message.send_message = send_buffer;

                    /* Best effort: the batch result is deliberately ignored
                     * because the call is cancelled right after regardless.
                     * The completion is tagged with cancel_sentinel. */
                    (void)grpc_call_start_batch(wc->call, &op, 1, &cancel_sentinel, NULL);
                    grpc_byte_buffer_destroy(send_buffer);
                }

                grpc_call_cancel(wc->call, NULL);
            }
        }
    }

    CALL_SUCCESS_CALLBACK(callback, newHV());
}

void
ev_etcd_watch_DESTROY(watch)
    EV::Etcd::Watch watch
CODE:
{
    /* Perl destructor for EV::Etcd::Watch handles. All work is delegated to
     * watch_call_perl_release(), which is defined outside this section.
     * NOTE(review): presumably this only gives up the Perl side's share of
     * the watch_call_t and the struct is freed once the client side has let
     * go as well - confirm against watch_call_perl_release(). */
    watch_call_perl_release(aTHX_ watch);
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Keepalive  PREFIX = ev_etcd_keepalive_

void
ev_etcd_keepalive_cancel(keepalive, callback)
    EV::Etcd::Keepalive keepalive
    SV *callback
CODE:
{
    /*
     * Cancel a lease keepalive stream and report completion through
     * `callback`.
     *
     * keepalive - EV::Etcd::Keepalive handle (a keepalive_call_t *).
     * callback  - checked by VALIDATE_CALLBACK; always invoked exactly once
     *             via CALL_SUCCESS_CALLBACK with a fresh empty hash, whether
     *             or not there was anything left to cancel.
     *
     * Teardown only happens while the client_owns flag is set; within that,
     * a pending reconnect timer is always stopped, and the gRPC call is
     * cancelled only if the keepalive is still marked active.
     *
     * The XSUB has a single exit at the bottom on purpose: a bare `return;`
     * inside a CODE: block skips the XSRETURN_EMPTY that xsubpp appends, which
     * leaves the arguments on the Perl stack as bogus return values.
     */
    VALIDATE_CALLBACK(callback);

    keepalive_call_t *kc = keepalive;

    if (kc->client_owns) {
        if (ev_is_active(&kc->reconnect_timer)) {
            ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
        }

        if (kc->active) {
            kc->active = 0;

            if (kc->call) {
                grpc_call_cancel(kc->call, NULL);
            }
        }
    }

    CALL_SUCCESS_CALLBACK(callback, newHV());
}

void
ev_etcd_keepalive_DESTROY(keepalive)
    EV::Etcd::Keepalive keepalive
CODE:
{
    /* Perl destructor for EV::Etcd::Keepalive handles. All work is delegated
     * to keepalive_call_perl_release(), which is defined outside this section.
     * NOTE(review): presumably this only gives up the Perl side's share of
     * the keepalive_call_t and the struct is freed once the client side has
     * let go as well - confirm against keepalive_call_perl_release(). */
    keepalive_call_perl_release(aTHX_ keepalive);
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Observe  PREFIX = ev_etcd_observe_

void
ev_etcd_observe_cancel(observe, callback)
    EV::Etcd::Observe observe
    SV *callback
CODE:
{
    /*
     * Cancel an observe stream and report completion through `callback`.
     *
     * observe  - EV::Etcd::Observe handle (an observe_call_t *).
     * callback - checked by VALIDATE_CALLBACK; always invoked exactly once
     *            via CALL_SUCCESS_CALLBACK with a fresh empty hash, whether
     *            or not there was anything left to cancel.
     *
     * Teardown only happens while the client_owns flag is set; within that,
     * a pending reconnect timer is always stopped, and the gRPC call is
     * cancelled only if the observe is still marked active.
     *
     * The XSUB has a single exit at the bottom on purpose: a bare `return;`
     * inside a CODE: block skips the XSRETURN_EMPTY that xsubpp appends, which
     * leaves the arguments on the Perl stack as bogus return values.
     */
    VALIDATE_CALLBACK(callback);

    observe_call_t *oc = observe;

    if (oc->client_owns) {
        if (ev_is_active(&oc->reconnect_timer)) {
            ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
        }

        if (oc->active) {
            oc->active = 0;

            if (oc->call) {
                grpc_call_cancel(oc->call, NULL);
            }
        }
    }

    CALL_SUCCESS_CALLBACK(callback, newHV());
}

void
ev_etcd_observe_DESTROY(observe)
    EV::Etcd::Observe observe
CODE:
{
    /* Perl destructor for EV::Etcd::Observe handles. All work is delegated
     * to observe_call_perl_release(), which is defined outside this section.
     * NOTE(review): presumably this only gives up the Perl side's share of
     * the observe_call_t and the struct is freed once the client side has
     * let go as well - confirm against observe_call_perl_release(). */
    observe_call_perl_release(aTHX_ observe);
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd  PREFIX = ev_etcd_

void
END()
CODE:
    /* END hook of the EV::Etcd package: shuts the gRPC library down.
     * NOTE(review): presumably balances a grpc_init() performed when the
     * module is loaded (not visible in this section) - confirm. */
    grpc_shutdown();



( run in 1.546 second using v1.01-cache-2.11-cpan-13bb782fe5a )