EV-Etcd
view release on metacpan or search on metacpan
etcd_election.c view on Meta::CPAN
if (!oc->client_owns) observe_call_free(aTHX_ oc);
}
/* Process LeaderResponse for observe stream */
void process_observe_response(pTHX_ observe_call_t *oc) {
if (!oc->recv_buffer) {
oc->active = 0;
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "No observe response received", "observe");
return;
}
grpc_byte_buffer_reader reader;
if (!grpc_byte_buffer_reader_init(&reader, oc->recv_buffer)) {
oc->active = 0;
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Failed to read observe response buffer", "observe");
return;
}
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
grpc_byte_buffer_reader_destroy(&reader);
V3electionpb__LeaderResponse *resp = v3electionpb__leader_response__unpack(
NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
grpc_slice_unref(slice);
if (!resp) {
oc->active = 0;
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Failed to parse observe response", "observe");
return;
}
/* Reset reconnect attempt on successful response */
oc->reconnect_attempt = 0;
HV *result = newHV();
add_header_to_hv(aTHX_ result, resp->header);
if (resp->kv) {
hv_store(result, "kv", 2, kv_to_hashref(aTHX_ resp->kv), 0);
}
v3electionpb__leader_response__free_unpacked(resp, NULL);
CALL_SUCCESS_CALLBACK(oc->callback, result);
}
/* Perform observe reconnection (called from timer callback) */
static void observe_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
dTHX;
(void)loop;
(void)revents;
observe_call_t *oc = (observe_call_t *)((char *)w - offsetof(observe_call_t, reconnect_timer));
ev_etcd_t *client = oc->client;
if (!client->active) {
cleanup_observe(aTHX_ oc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(oc);
STREAMING_CALL_REINIT(oc);
/* Create LeaderRequest for observe */
V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
req.name.data = (uint8_t *)oc->params.name;
req.name.len = oc->params.name_len;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
v3electionpb__leader_request__get_packed_size,
v3electionpb__leader_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create call and setup ops */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
oc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL);
if (!oc->call) {
grpc_byte_buffer_destroy(send_buffer);
oc->active = 0;
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Observe reconnect failed", "observe");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_observe(aTHX_ oc);
return;
}
grpc_op ops[4] = {0};
grpc_metadata auth_md;
STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, oc);
init_call_base(&oc->base, CALL_TYPE_ELECTION_OBSERVE);
grpc_call_error err = grpc_call_start_batch(oc->call, ops, 4, &oc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
STREAMING_CALL_BATCH_ERROR(oc);
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(oc->callback, GRPC_STATUS_INTERNAL, "Observe reconnect batch failed", "observe");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_observe(aTHX_ oc);
}
}
int try_reconnect_observe(pTHX_ observe_call_t *oc) {
ev_etcd_t *client = oc->client;
( run in 1.178 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )