EV-Etcd

 view release on metacpan or  search on metacpan

etcd_lease.c  view on Meta::CPAN

/*
 * Process a LeaseKeepAliveResponse received on a keepalive stream.
 *
 * Reads all of kc->recv_buffer, unpacks it as an
 * Etcdserverpb__LeaseKeepAliveResponse and reports the outcome through
 * kc->callback:
 *   - buffer missing, unreadable or unparseable -> GRPC_STATUS_INTERNAL error
 *   - TTL <= 0                                  -> GRPC_STATUS_NOT_FOUND
 *                                                  ("Lease expired")
 *   - otherwise                                 -> success, with a hash that
 *                                                  holds what add_header_to_hv()
 *                                                  stores for the header plus
 *                                                  the "id" and "ttl" keys
 *
 * Every error path clears kc->active before the callback is invoked.
 * kc->recv_buffer itself is not destroyed by this function.
 *
 * The unpacked message is always released before the callback runs, so no
 * parsed response is held while user code executes.
 */
void process_keepalive_response(pTHX_ keepalive_call_t *kc) {
    if (!kc->recv_buffer) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "No keepalive response received", "keepalive");
        return;
    }

    grpc_byte_buffer_reader reader;
    if (!grpc_byte_buffer_reader_init(&reader, kc->recv_buffer)) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to read keepalive response buffer", "keepalive");
        return;
    }

    /* Flatten the (possibly multi-slice) buffer into one contiguous slice. */
    grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
    grpc_byte_buffer_reader_destroy(&reader);

    Etcdserverpb__LeaseKeepAliveResponse *resp = etcdserverpb__lease_keep_alive_response__unpack(
        NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
    /* The slice is not referenced again once unpacking has finished. */
    grpc_slice_unref(slice);

    if (!resp) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to parse keepalive response", "keepalive");
        return;
    }

    /* A well-formed response arrived: restart the reconnect attempt count. */
    kc->reconnect_attempt = 0;

    /*
     * A non-positive TTL means the lease no longer exists. Negative values
     * are treated the same as zero so that they are never reported to the
     * caller as a successful keepalive.
     */
    if (resp->ttl <= 0) {
        /* Release the message first, matching the success path below. */
        etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_NOT_FOUND, "Lease expired", "keepalive");
        return;
    }

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);
    hv_store(result, "id", 2, newSVi64(resp->id), 0);
    hv_store(result, "ttl", 3, newSVi64(resp->ttl), 0);
    etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(kc->callback, result);
}

/*
 * Perform keepalive reconnection (called from timer callback).
 *
 * ev_timer callback for the `reconnect_timer` member embedded in a
 * keepalive_call_t. It cleans up and reinitializes the call's streaming
 * state, creates a fresh METHOD_LEASE_KEEPALIVE call on the client's channel
 * and starts a 4-op batch set up by STREAMING_CALL_SETUP_OPS from a
 * serialized LeaseKeepAliveRequest for kc->lease_id.
 *
 * If the client is not active on entry, kc is handed to cleanup_keepalive()
 * and no reconnect is attempted. If creating the call or starting the batch
 * fails, kc->callback receives a GRPC_STATUS_INTERNAL error; afterwards
 * finish_client_destroy() is called when client->active is clear, otherwise
 * cleanup_keepalive(kc).
 *
 * Parameters:
 *   loop    - unused
 *   w       - must point at the reconnect_timer member of a keepalive_call_t
 *   revents - unused
 */
static void keepalive_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    /* Recover the enclosing keepalive_call_t from its embedded timer member. */
    keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
    ev_etcd_t *client = kc->client;

    /* Client is not active: do not reconnect, just clean up this keepalive. */
    if (!client->active) {
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(kc);
    STREAMING_CALL_REINIT(kc);

    /* Build keepalive request */
    Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    keep_req.id = kc->lease_id;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &keep_req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    /* Drop the local slice reference now that send_buffer was built from it. */
    grpc_slice_unref(req_slice);

    /* Create call and setup ops */
    /* The call is created with an infinite-future deadline. */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    kc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_LEASE_KEEPALIVE, NULL, deadline, NULL);

    if (!kc->call) {
        /* No call was created, so the request buffer will never be sent. */
        grpc_byte_buffer_destroy(send_buffer);
        kc->active = 0;
        /*
         * in_callback is raised for the duration of the error callback.
         * NOTE(review): presumably this lets a client destroy requested from
         * inside the callback be deferred until the check below - confirm.
         */
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect failed", "keepalive");
        client->in_callback = 0;
        /*
         * client->active was set on entry; if it is clear now, finish the
         * client teardown instead of cleaning up only this keepalive.
         */
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    /*
     * ops starts zeroed and is populated by STREAMING_CALL_SETUP_OPS, which
     * also receives auth_md (presumably filling it in - it is released via
     * cleanup_auth_metadata() below).
     */
    grpc_op ops[4] = {0};
    grpc_metadata auth_md;
    STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, kc);

    /* &kc->base is passed as the batch tag. */
    init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
    grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
    /* Both are released here whether or not the batch started. */
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        /*
         * NOTE(review): unlike the create-call failure above, kc->active is
         * not cleared by visible code on this path - confirm that
         * STREAMING_CALL_BATCH_ERROR takes care of it.
         */
        STREAMING_CALL_BATCH_ERROR(kc);
        /* Same callback / teardown sequence as the create-call failure. */
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect batch failed", "keepalive");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_keepalive(aTHX_ kc);
    }
}

int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc) {
    ev_etcd_t *client = kc->client;

    if (!kc->auto_reconnect || !client->active || kc->lease_id <= 0) {



( run in 0.487 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )