EV-Etcd

 view release on metacpan or  search on metacpan

etcd_lease.c  view on Meta::CPAN


    CALL_SUCCESS_CALLBACK(pc->callback, result);
}

/* Process LeaseTimeToLiveResponse */
void process_lease_time_to_live_response(pTHX_ pending_call_t *pc) {
    BEGIN_RESPONSE_HANDLER(pc, "lease_ttl");

    Etcdserverpb__LeaseTimeToLiveResponse *resp;
    UNPACK_RESPONSE(pc, resp, etcdserverpb__lease_time_to_live_response__unpack);

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);
    hv_store(result, "id", 2, newSVi64(resp->id), 0);
    hv_store(result, "ttl", 3, newSVi64(resp->ttl), 0);
    hv_store(result, "granted_ttl", 11, newSVi64(resp->grantedttl), 0);

    if (resp->n_keys > 0) {
        AV *keys_av = newAV();
        av_extend(keys_av, resp->n_keys - 1);
        for (size_t i = 0; i < resp->n_keys; i++) {
            /* Handle NULL data pointer for empty bytes field */
            av_push(keys_av, resp->keys[i].data
                ? newSVpvn((char *)resp->keys[i].data, resp->keys[i].len)
                : newSVpvn("", 0));
        }
        hv_store(result, "keys", 4, newRV_noinc((SV *)keys_av), 0);
    }

    etcdserverpb__lease_time_to_live_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, result);
}

/* Process LeaseLeasesResponse */
void process_lease_leases_response(pTHX_ pending_call_t *pc) {
    BEGIN_RESPONSE_HANDLER(pc, "lease_leases");

    Etcdserverpb__LeaseLeasesResponse *resp;
    UNPACK_RESPONSE(pc, resp, etcdserverpb__lease_leases_response__unpack);

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);

    AV *leases_av = newAV();
    if (resp->n_leases > 0) {
        av_extend(leases_av, resp->n_leases - 1);
    }
    for (size_t i = 0; i < resp->n_leases; i++) {
        HV *lease_hv = newHV();
        hv_store(lease_hv, "id", 2, newSVi64(resp->leases[i]->id), 0);
        av_push(leases_av, newRV_noinc((SV *)lease_hv));
    }
    hv_store(result, "leases", 6, newRV_noinc((SV *)leases_av), 0);

    etcdserverpb__lease_leases_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, result);
}

/* Re-arm keepalive to receive next message */
void keepalive_rearm_recv(pTHX_ keepalive_call_t *kc) {
    if (!kc->active) return;

    if (kc->recv_buffer) {
        grpc_byte_buffer_destroy(kc->recv_buffer);
        kc->recv_buffer = NULL;
    }

    kc->base.type = CALL_TYPE_LEASE_KEEPALIVE_RECV;

    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &kc->recv_buffer;

    grpc_call_error err = grpc_call_start_batch(kc->call, &op, 1, &kc->base, NULL);
    if (err != GRPC_CALL_OK) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive rearm failed", "keepalive");
        cleanup_keepalive(aTHX_ kc);
    }
}

static void keepalive_call_free(pTHX_ keepalive_call_t *kc) {
    Safefree(kc);
}

/* See cleanup_watch — same dual-ownership pattern */
void cleanup_keepalive(pTHX_ keepalive_call_t *kc) {
    if (!kc->client_owns) return;

    ev_etcd_t *client = kc->client;
    keepalive_call_t **kp = &client->keepalives;
    while (*kp) {
        if (*kp == kc) { *kp = kc->next; break; }
        kp = &(*kp)->next;
    }

    if (ev_is_active(&kc->reconnect_timer))
        ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
    grpc_metadata_array_destroy(&kc->initial_metadata);
    grpc_metadata_array_destroy(&kc->trailing_metadata);
    if (kc->recv_buffer) {
        grpc_byte_buffer_destroy(kc->recv_buffer);
        kc->recv_buffer = NULL;
    }
    grpc_slice_unref(kc->status_details);
    if (kc->call) {
        grpc_call_unref(kc->call);
        kc->call = NULL;
    }
    SvREFCNT_dec(kc->callback);
    kc->callback = NULL;
    kc->active = 0;
    kc->client_owns = 0;

    if (!kc->perl_owns) keepalive_call_free(aTHX_ kc);
}

void keepalive_call_perl_release(pTHX_ keepalive_call_t *kc) {
    kc->perl_owns = 0;
    if (!kc->client_owns) keepalive_call_free(aTHX_ kc);
}

/* Process LeaseKeepAliveResponse */
void process_keepalive_response(pTHX_ keepalive_call_t *kc) {
    if (!kc->recv_buffer) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "No keepalive response received", "keepalive");
        return;
    }

    grpc_byte_buffer_reader reader;
    if (!grpc_byte_buffer_reader_init(&reader, kc->recv_buffer)) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to read keepalive response buffer", "keepalive");
        return;
    }

    grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
    grpc_byte_buffer_reader_destroy(&reader);

    Etcdserverpb__LeaseKeepAliveResponse *resp = etcdserverpb__lease_keep_alive_response__unpack(
        NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
    grpc_slice_unref(slice);

    if (!resp) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to parse keepalive response", "keepalive");
        return;
    }

    kc->reconnect_attempt = 0;

    if (resp->ttl == 0) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_NOT_FOUND, "Lease expired", "keepalive");
        etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);
        return;
    }

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);
    hv_store(result, "id", 2, newSVi64(resp->id), 0);
    hv_store(result, "ttl", 3, newSVi64(resp->ttl), 0);
    etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(kc->callback, result);
}

/* Perform keepalive reconnection (called from timer callback) */
static void keepalive_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
    ev_etcd_t *client = kc->client;

    if (!client->active) {
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(kc);
    STREAMING_CALL_REINIT(kc);

    /* Build keepalive request */
    Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    keep_req.id = kc->lease_id;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &keep_req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create call and setup ops */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    kc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_LEASE_KEEPALIVE, NULL, deadline, NULL);

    if (!kc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        kc->active = 0;
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect failed", "keepalive");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    grpc_op ops[4] = {0};
    grpc_metadata auth_md;
    STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, kc);

    init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
    grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        STREAMING_CALL_BATCH_ERROR(kc);
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect batch failed", "keepalive");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_keepalive(aTHX_ kc);
    }
}

int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc) {
    ev_etcd_t *client = kc->client;

    if (!kc->auto_reconnect || !client->active || kc->lease_id <= 0) {
        return 0;
    }

    if (kc->reconnect_attempt >= client->max_retries) {
        return 0;
    }

    kc->reconnect_attempt++;

    ev_tstamp delay = RECONNECT_BACKOFF_SECONDS(kc->reconnect_attempt);
    ev_timer_init(&kc->reconnect_timer, keepalive_reconnect_cb, delay, 0.0);
    ev_timer_start(EV_DEFAULT, &kc->reconnect_timer);

    return 1;
}



( run in 1.541 second using v1.01-cache-2.11-cpan-56fb94df46f )