EV-Etcd
view release on metacpan or search on metacpan
etcd_lease.c view on Meta::CPAN
/* Process LeaseKeepAliveResponse */
void process_keepalive_response(pTHX_ keepalive_call_t *kc) {
if (!kc->recv_buffer) {
kc->active = 0;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "No keepalive response received", "keepalive");
return;
}
grpc_byte_buffer_reader reader;
if (!grpc_byte_buffer_reader_init(&reader, kc->recv_buffer)) {
kc->active = 0;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to read keepalive response buffer", "keepalive");
return;
}
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
grpc_byte_buffer_reader_destroy(&reader);
Etcdserverpb__LeaseKeepAliveResponse *resp = etcdserverpb__lease_keep_alive_response__unpack(
NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
grpc_slice_unref(slice);
if (!resp) {
kc->active = 0;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Failed to parse keepalive response", "keepalive");
return;
}
kc->reconnect_attempt = 0;
if (resp->ttl == 0) {
kc->active = 0;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_NOT_FOUND, "Lease expired", "keepalive");
etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);
return;
}
HV *result = newHV();
add_header_to_hv(aTHX_ result, resp->header);
hv_store(result, "id", 2, newSVi64(resp->id), 0);
hv_store(result, "ttl", 3, newSVi64(resp->ttl), 0);
etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);
CALL_SUCCESS_CALLBACK(kc->callback, result);
}
/* Perform keepalive reconnection (called from timer callback) */
static void keepalive_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
dTHX;
(void)loop;
(void)revents;
keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
ev_etcd_t *client = kc->client;
if (!client->active) {
cleanup_keepalive(aTHX_ kc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(kc);
STREAMING_CALL_REINIT(kc);
/* Build keepalive request */
Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
keep_req.id = kc->lease_id;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcdserverpb__lease_keep_alive_request__pack, &keep_req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create call and setup ops */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
kc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_LEASE_KEEPALIVE, NULL, deadline, NULL);
if (!kc->call) {
grpc_byte_buffer_destroy(send_buffer);
kc->active = 0;
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect failed", "keepalive");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_keepalive(aTHX_ kc);
return;
}
grpc_op ops[4] = {0};
grpc_metadata auth_md;
STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, kc);
init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
STREAMING_CALL_BATCH_ERROR(kc);
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive reconnect batch failed", "keepalive");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_keepalive(aTHX_ kc);
}
}
int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc) {
ev_etcd_t *client = kc->client;
if (!kc->auto_reconnect || !client->active || kc->lease_id <= 0) {
( run in 0.487 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )