EV-Etcd

 view release on metacpan or  search on metacpan

etcd_watch.c  view on Meta::CPAN

        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Failed to parse watch response", "watch");
        return;
    }

    if (resp->created) {
        wc->watch_id = resp->watch_id;
        wc->reconnect_attempt = 0;
    }

    if (resp->header && resp->header->revision > wc->last_revision) {
        wc->last_revision = resp->header->revision;
        wc->reconnect_attempt = 0;
    }

    if (resp->canceled) {
        wc->active = 0;
        const char *reason = (resp->cancel_reason && strlen(resp->cancel_reason) > 0)
            ? resp->cancel_reason : "Watch cancelled";
        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_CANCELLED, reason, "watch");
        etcdserverpb__watch_response__free_unpacked(resp, NULL);
        return;
    }

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);

    hv_store(result, "watch_id", 8, newSVi64(resp->watch_id), 0);
    hv_store(result, "created", 7, newSViv(resp->created ? 1 : 0), 0);
    /* canceled / compact_revision are server-cancellation signals — handled
     * via the early-return error path above (resp->canceled) and never
     * appear in the success hash, so we don't store them here. */

    AV *events = newAV();
    if (resp->n_events > 0) {
        av_extend(events, resp->n_events - 1);
    }
    for (size_t i = 0; i < resp->n_events; i++) {
        av_push(events, event_to_hashref(aTHX_ resp->events[i]));
    }
    hv_store(result, "events", 6, newRV_noinc((SV *)events), 0);

    etcdserverpb__watch_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(wc->callback, result);
}

/* Perform the actual watch reconnection (called from timer callback) */
static void watch_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    watch_call_t *wc = (watch_call_t *)((char *)w - offsetof(watch_call_t, reconnect_timer));
    ev_etcd_t *client = wc->client;

    if (!client->active) {
        cleanup_watch(aTHX_ wc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(wc);
    STREAMING_CALL_REINIT(wc);

    /* Build watch create request */
    Etcdserverpb__WatchCreateRequest create_req = ETCDSERVERPB__WATCH_CREATE_REQUEST__INIT;
    create_req.key.data = (uint8_t *)wc->params.key;
    create_req.key.len = wc->params.key_len;

    if (wc->params.range_end && wc->params.range_end_len > 0) {
        create_req.range_end.data = (uint8_t *)wc->params.range_end;
        create_req.range_end.len = wc->params.range_end_len;
    }

    if (wc->last_revision > 0) {
        create_req.start_revision = wc->last_revision + 1;
    } else if (wc->params.start_revision > 0) {
        create_req.start_revision = wc->params.start_revision;
    }

    create_req.prev_kv = wc->params.prev_kv;
    create_req.progress_notify = wc->params.progress_notify;
    if (wc->params.has_watch_id)
        create_req.watch_id = wc->params.watch_id;

    Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
    req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CREATE_REQUEST;
    req.create_request = &create_req;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__watch_request__get_packed_size,
        etcdserverpb__watch_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create call and setup ops */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    wc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_WATCH, NULL, deadline, NULL);

    if (!wc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        wc->active = 0;
        client->in_callback = 1;
        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch reconnect failed", "watch");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_watch(aTHX_ wc);
        return;
    }

    grpc_op ops[4] = {0};
    grpc_metadata auth_md;
    STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, wc);

    init_call_base(&wc->base, CALL_TYPE_WATCH);



( run in 2.536 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )