EV-Etcd

 view release on metacpan or  search on metacpan

etcd_election.c  view on Meta::CPAN

    if (!oc->client_owns) observe_call_free(aTHX_ oc);
}

/* Process LeaderResponse for observe stream */
void process_observe_response(pTHX_ observe_call_t *oc) {
    if (!oc->recv_buffer) {
        oc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "No observe response received", "observe");
        return;
    }

    grpc_byte_buffer_reader reader;
    if (!grpc_byte_buffer_reader_init(&reader, oc->recv_buffer)) {
        oc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Failed to read observe response buffer", "observe");
        return;
    }

    grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
    grpc_byte_buffer_reader_destroy(&reader);

    V3electionpb__LeaderResponse *resp = v3electionpb__leader_response__unpack(
        NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
    grpc_slice_unref(slice);

    if (!resp) {
        oc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Failed to parse observe response", "observe");
        return;
    }

    /* Reset reconnect attempt on successful response */
    oc->reconnect_attempt = 0;

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);

    if (resp->kv) {
        hv_store(result, "kv", 2, kv_to_hashref(aTHX_ resp->kv), 0);
    }

    v3electionpb__leader_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(oc->callback, result);
}

/* Perform observe reconnection (called from timer callback) */
static void observe_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    observe_call_t *oc = (observe_call_t *)((char *)w - offsetof(observe_call_t, reconnect_timer));
    ev_etcd_t *client = oc->client;

    if (!client->active) {
        cleanup_observe(aTHX_ oc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(oc);
    STREAMING_CALL_REINIT(oc);

    /* Create LeaderRequest for observe */
    V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
    req.name.data = (uint8_t *)oc->params.name;
    req.name.len = oc->params.name_len;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        v3electionpb__leader_request__get_packed_size,
        v3electionpb__leader_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create call and setup ops */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    oc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL);

    if (!oc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        oc->active = 0;
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Observe reconnect failed", "observe");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_observe(aTHX_ oc);
        return;
    }

    grpc_op ops[4] = {0};
    grpc_metadata auth_md;
    STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, oc);

    init_call_base(&oc->base, CALL_TYPE_ELECTION_OBSERVE);
    grpc_call_error err = grpc_call_start_batch(oc->call, ops, 4, &oc->base, NULL);
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        STREAMING_CALL_BATCH_ERROR(oc);
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Observe reconnect batch failed", "observe");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_observe(aTHX_ oc);
    }
}

int try_reconnect_observe(pTHX_ observe_call_t *oc) {
    ev_etcd_t *client = oc->client;



( run in 1.178 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )