EV-Etcd
view release on metacpan or search on metacpan
etcd_watch.c view on Meta::CPAN
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Failed to parse watch response", "watch");
return;
}
if (resp->created) {
wc->watch_id = resp->watch_id;
wc->reconnect_attempt = 0;
}
if (resp->header && resp->header->revision > wc->last_revision) {
wc->last_revision = resp->header->revision;
wc->reconnect_attempt = 0;
}
if (resp->canceled) {
wc->active = 0;
const char *reason = (resp->cancel_reason && strlen(resp->cancel_reason) > 0)
? resp->cancel_reason : "Watch cancelled";
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_CANCELLED, reason, "watch");
etcdserverpb__watch_response__free_unpacked(resp, NULL);
return;
}
HV *result = newHV();
add_header_to_hv(aTHX_ result, resp->header);
hv_store(result, "watch_id", 8, newSVi64(resp->watch_id), 0);
hv_store(result, "created", 7, newSViv(resp->created ? 1 : 0), 0);
/* canceled / compact_revision are server-cancellation signals -- handled
* via the early-return error path above (resp->canceled) and never
* appear in the success hash, so we don't store them here. */
AV *events = newAV();
if (resp->n_events > 0) {
av_extend(events, resp->n_events - 1);
}
for (size_t i = 0; i < resp->n_events; i++) {
av_push(events, event_to_hashref(aTHX_ resp->events[i]));
}
hv_store(result, "events", 6, newRV_noinc((SV *)events), 0);
etcdserverpb__watch_response__free_unpacked(resp, NULL);
CALL_SUCCESS_CALLBACK(wc->callback, result);
}
/* Perform the actual watch reconnection (called from timer callback) */
static void watch_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
dTHX;
(void)loop;
(void)revents;
watch_call_t *wc = (watch_call_t *)((char *)w - offsetof(watch_call_t, reconnect_timer));
ev_etcd_t *client = wc->client;
if (!client->active) {
cleanup_watch(aTHX_ wc);
return;
}
/* Cleanup and reinitialize streaming state */
STREAMING_CALL_CLEANUP(wc);
STREAMING_CALL_REINIT(wc);
/* Build watch create request */
Etcdserverpb__WatchCreateRequest create_req = ETCDSERVERPB__WATCH_CREATE_REQUEST__INIT;
create_req.key.data = (uint8_t *)wc->params.key;
create_req.key.len = wc->params.key_len;
if (wc->params.range_end && wc->params.range_end_len > 0) {
create_req.range_end.data = (uint8_t *)wc->params.range_end;
create_req.range_end.len = wc->params.range_end_len;
}
if (wc->last_revision > 0) {
create_req.start_revision = wc->last_revision + 1;
} else if (wc->params.start_revision > 0) {
create_req.start_revision = wc->params.start_revision;
}
create_req.prev_kv = wc->params.prev_kv;
create_req.progress_notify = wc->params.progress_notify;
if (wc->params.has_watch_id)
create_req.watch_id = wc->params.watch_id;
Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CREATE_REQUEST;
req.create_request = &create_req;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__watch_request__get_packed_size,
etcdserverpb__watch_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create call and setup ops */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
wc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_WATCH, NULL, deadline, NULL);
if (!wc->call) {
grpc_byte_buffer_destroy(send_buffer);
wc->active = 0;
client->in_callback = 1;
CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch reconnect failed", "watch");
client->in_callback = 0;
if (!client->active) {
finish_client_destroy(aTHX_ client);
return;
}
cleanup_watch(aTHX_ wc);
return;
}
grpc_op ops[4] = {0};
grpc_metadata auth_md;
STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, wc);
init_call_base(&wc->base, CALL_TYPE_WATCH);
( run in 2.536 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )