EV-Etcd

 view release on metacpan or  search on metacpan

Etcd.xs  view on Meta::CPAN

            wc->params.range_end_len = range_end_len;
        }

        /* prefix - convenience option to watch all keys with given prefix */
        if ((svp = hv_fetchs(hv, "prefix", 0)) && SvTRUE(*svp)) {
            /* Don't override if range_end was explicitly provided */
            if (!range_end_copy && key_len > 0) {
                size_t range_len;
                range_end_copy = compute_prefix_range_end(key_str, key_len, &range_len);
                if (range_end_copy) {
                    create_req.range_end.data = (uint8_t *)range_end_copy;
                    create_req.range_end.len = range_len;
                    /* Store for recovery */
                    Newx(wc->params.range_end, range_len + 1, char);
                    Copy(range_end_copy, wc->params.range_end, range_len, char);
                    wc->params.range_end[range_len] = '\0';
                    wc->params.range_end_len = range_len;
                }
            }
        }

        /* start_revision - watch from specific revision */
        if ((svp = hv_fetchs(hv, "start_revision", 0)) && SvOK(*svp)) {
            create_req.start_revision = SvI64(*svp);
            wc->params.start_revision = create_req.start_revision;
        }

        /* progress_notify - receive periodic progress notifications */
        if ((svp = hv_fetchs(hv, "progress_notify", 0)) && SvTRUE(*svp)) {
            create_req.progress_notify = 1;
            wc->params.progress_notify = 1;
        }

        /* prev_kv - include previous key-value in events */
        if ((svp = hv_fetchs(hv, "prev_kv", 0)) && SvTRUE(*svp)) {
            create_req.prev_kv = 1;
            wc->params.prev_kv = 1;
        }

        /* watch_id - optional explicit watch ID */
        if ((svp = hv_fetchs(hv, "watch_id", 0)) && SvOK(*svp)) {
            create_req.watch_id = SvI64(*svp);
            wc->params.watch_id = create_req.watch_id;
            wc->params.has_watch_id = 1;
        }
    }

    Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
    req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CREATE_REQUEST;
    req.create_request = &create_req;

    /* Serialize request */
    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__watch_request__get_packed_size,
        etcdserverpb__watch_request__pack, &req);
    if (range_end_copy) Safefree(range_end_copy);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);  /* No timeout for watch */

    wc->call = grpc_channel_create_call(
        client->channel,
        NULL,  /* parent call */
        GRPC_PROPAGATE_DEFAULTS,
        client->cq,
        METHOD_WATCH,
        NULL,  /* host */
        deadline,
        NULL   /* reserved */
    );

    if (!wc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&wc->initial_metadata);
        grpc_metadata_array_destroy(&wc->trailing_metadata);
        grpc_slice_unref(wc->status_details);
        SvREFCNT_dec(wc->callback);
        if (wc->params.key) Safefree(wc->params.key);
        if (wc->params.range_end) Safefree(wc->params.range_end);
        Safefree(wc);
        croak("Failed to create gRPC call for watch");
    }

    /* Start the call with initial operations */
    grpc_op ops[4] = {0};
    grpc_metadata auth_md;

    /* Send initial metadata (with auth token if available) */
    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    /* Receive initial metadata */
    ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[1].data.recv_initial_metadata.recv_initial_metadata = &wc->initial_metadata;

    /* Send the watch create request */
    ops[2].op = GRPC_OP_SEND_MESSAGE;
    ops[2].data.send_message.send_message = send_buffer;

    /* Receive the first response (WatchResponse with created=true) */
    ops[3].op = GRPC_OP_RECV_MESSAGE;
    ops[3].data.recv_message.recv_message = &wc->recv_buffer;

    /* Start batch - status receive is handled separately when stream ends */
    grpc_call_error err = grpc_call_start_batch(wc->call, ops, 4, &wc->base, NULL);

    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        grpc_metadata_array_destroy(&wc->initial_metadata);
        grpc_metadata_array_destroy(&wc->trailing_metadata);
        grpc_slice_unref(wc->status_details);
        grpc_call_unref(wc->call);
        SvREFCNT_dec(wc->callback);
        /* Free watch params allocated for recovery */
        if (wc->params.key) {
            Safefree(wc->params.key);

Etcd.xs  view on Meta::CPAN

    client->pending_calls = pc;
}

EV::Etcd::Keepalive
ev_etcd_lease_keepalive(client, lease_id, ...)
    EV::Etcd client
    int64_t lease_id
CODE:
{
    /* Parse arguments: lease_keepalive(lease_id, [opts,] callback) */
    SV *opts = NULL;
    SV *callback;

    if (items == 3) {
        callback = ST(2);
    } else if (items == 4) {
        opts = ST(2);
        callback = ST(3);
    } else {
        croak("Usage: $client->lease_keepalive($lease_id, [\\%%opts,] $callback)");
    }

    VALIDATE_CALLBACK(callback);

    /* Create keepalive structure (Newxz zeroes everything; only set non-zero fields) */
    keepalive_call_t *kc;
    Newxz(kc, 1, keepalive_call_t);
    init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
    kc->callback = newSVsv(callback);
    kc->client = client;
    kc->active = 1;
    kc->auto_reconnect = 1;  /* Enable by default */
    kc->lease_id = lease_id;
    kc->client_owns = 1;
    kc->perl_owns = 1;

    if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
        HV *hv = (HV *)SvRV(opts);
        SV **svp;

        if ((svp = hv_fetchs(hv, "auto_reconnect", 0)) && !SvTRUE(*svp)) {
            kc->auto_reconnect = 0;
        }
    }
    grpc_metadata_array_init(&kc->initial_metadata);
    grpc_metadata_array_init(&kc->trailing_metadata);
    kc->status_details = grpc_empty_slice();

    /* Build LeaseKeepAliveRequest */
    Etcdserverpb__LeaseKeepAliveRequest req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    req.id = lease_id;

    /* Serialize request */
    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    kc->call = grpc_channel_create_call(
        client->channel,
        NULL,
        GRPC_PROPAGATE_DEFAULTS,
        client->cq,
        METHOD_LEASE_KEEPALIVE,
        NULL,
        deadline,
        NULL
    );

    if (!kc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&kc->initial_metadata);
        grpc_metadata_array_destroy(&kc->trailing_metadata);
        grpc_slice_unref(kc->status_details);
        SvREFCNT_dec(kc->callback);
        Safefree(kc);
        croak("Failed to create gRPC call for lease_keepalive");
    }

    /* Start the call with initial operations */
    grpc_op ops[4] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[1].data.recv_initial_metadata.recv_initial_metadata = &kc->initial_metadata;

    ops[2].op = GRPC_OP_SEND_MESSAGE;
    ops[2].data.send_message.send_message = send_buffer;

    /* Receive the first response */
    ops[3].op = GRPC_OP_RECV_MESSAGE;
    ops[3].data.recv_message.recv_message = &kc->recv_buffer;

    grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);

    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        grpc_metadata_array_destroy(&kc->initial_metadata);
        grpc_metadata_array_destroy(&kc->trailing_metadata);
        grpc_slice_unref(kc->status_details);
        grpc_call_unref(kc->call);
        SvREFCNT_dec(kc->callback);
        Safefree(kc);
        croak("Failed to start gRPC call: %d", err);
    }

    kc->next = client->keepalives;
    client->keepalives = kc;

    RETVAL = kc;
}

Etcd.xs  view on Meta::CPAN

{
    /* Parse arguments: election_observe(name, [opts,] callback) */
    SV *opts = NULL;
    SV *callback;

    if (items == 3) {
        callback = ST(2);
    } else if (items == 4) {
        opts = ST(2);
        callback = ST(3);
    } else {
        croak("Usage: $client->election_observe($name, [\\%%opts,] $callback)");
    }

    VALIDATE_CALLBACK(callback);

    STRLEN name_len;
    const char *name_str = SvPV(name, name_len);
    VALIDATE_KEY_SIZE(name_len);

    int auto_reconnect = 1;
    if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
        HV *hv = (HV *)SvRV(opts);
        SV **sv_ar = hv_fetch(hv, "auto_reconnect", 14, 0);
        if (sv_ar && *sv_ar) {
            auto_reconnect = SvTRUE(*sv_ar);
        }
    }

    /* Newxz zeroes everything; only set non-zero fields */
    observe_call_t *oc;
    Newxz(oc, 1, observe_call_t);
    init_call_base(&oc->base, CALL_TYPE_ELECTION_OBSERVE);
    oc->callback = newSVsv(callback);
    oc->client = client;
    oc->active = 1;
    oc->auto_reconnect = auto_reconnect;
    oc->client_owns = 1;
    oc->perl_owns = 1;
    grpc_metadata_array_init(&oc->initial_metadata);
    grpc_metadata_array_init(&oc->trailing_metadata);
    oc->status_details = grpc_empty_slice();

    /* Save params for reconnection */
    Newx(oc->params.name, name_len + 1, char);
    Copy(name_str, oc->params.name, name_len, char);
    oc->params.name[name_len] = '\0';
    oc->params.name_len = name_len;

    V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
    req.name.data = (uint8_t *)name_str;
    req.name.len = name_len;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        v3electionpb__leader_request__get_packed_size,
        v3electionpb__leader_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Use infinite deadline for streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    oc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL
    );

    if (!oc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&oc->initial_metadata);
        grpc_metadata_array_destroy(&oc->trailing_metadata);
        grpc_slice_unref(oc->status_details);
        SvREFCNT_dec(oc->callback);
        if (oc->params.name) Safefree(oc->params.name);
        Safefree(oc);
        croak("Failed to create gRPC call for election_observe");
    }

    grpc_op ops[4] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
    ops[1].data.recv_initial_metadata.recv_initial_metadata = &oc->initial_metadata;

    ops[2].op = GRPC_OP_SEND_MESSAGE;
    ops[2].data.send_message.send_message = send_buffer;

    ops[3].op = GRPC_OP_RECV_MESSAGE;
    ops[3].data.recv_message.recv_message = &oc->recv_buffer;

    grpc_call_error err = grpc_call_start_batch(oc->call, ops, 4, &oc->base, NULL);
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        grpc_metadata_array_destroy(&oc->initial_metadata);
        grpc_metadata_array_destroy(&oc->trailing_metadata);
        grpc_slice_unref(oc->status_details);
        grpc_call_unref(oc->call);
        SvREFCNT_dec(oc->callback);
        if (oc->params.name) Safefree(oc->params.name);
        Safefree(oc);
        croak("Failed to start gRPC call: %d", err);
    }

    /* Add to observes list */
    oc->next = client->observes;
    client->observes = oc;

    RETVAL = oc;
}
OUTPUT:
    RETVAL

void
ev_etcd_member_list(client, ...)
    EV::Etcd client

Etcd.xs  view on Meta::CPAN

            pc = next;
        }
        watch_call_t *wc = client->watches;
        while (wc) {
            watch_call_t *next = wc->next;
            if (ev_is_active(&wc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
            SvREFCNT_dec(wc->callback);
            wc->callback = NULL;
            wc->client_owns = 0;
            if (!wc->perl_owns) {
                if (wc->params.key) Safefree(wc->params.key);
                if (wc->params.range_end) Safefree(wc->params.range_end);
                Safefree(wc);
            }
            wc = next;
        }
        keepalive_call_t *kc = client->keepalives;
        while (kc) {
            keepalive_call_t *next = kc->next;
            if (ev_is_active(&kc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
            SvREFCNT_dec(kc->callback);
            kc->callback = NULL;
            kc->client_owns = 0;
            if (!kc->perl_owns) Safefree(kc);
            kc = next;
        }
        observe_call_t *oc = client->observes;
        while (oc) {
            observe_call_t *next = oc->next;
            if (ev_is_active(&oc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
            SvREFCNT_dec(oc->callback);
            oc->callback = NULL;
            oc->client_owns = 0;
            if (!oc->perl_owns) {
                if (oc->params.name) Safefree(oc->params.name);
                Safefree(oc);
            }
            oc = next;
        }
        if (ev_is_active(&client->health_timer))
            ev_timer_stop(EV_DEFAULT, &client->health_timer);
        if (ev_is_active(&client->cq_async))
            ev_async_stop(EV_DEFAULT, &client->cq_async);
        goto free_perl_resources;
    }

    /* Mark client as inactive first to prevent callbacks from accessing freed memory */
    client->active = 0;

    /* Stop ev_async watcher */
    if (ev_is_active(&client->cq_async)) {
        ev_async_stop(EV_DEFAULT, &client->cq_async);
    }

    /* Signal the gRPC thread to stop and wait for it */
    client->thread_running = 0;

    /* Mark every streaming call inactive and cancel its gRPC call so pending
     * batches complete promptly with success=0. */
    watch_call_t *wc = client->watches;
    while (wc) {
        wc->active = 0;
        if (ev_is_active(&wc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
        if (wc->call) {
            grpc_call_cancel(wc->call, NULL);
        }
        wc = wc->next;
    }

    keepalive_call_t *kc = client->keepalives;
    while (kc) {
        kc->active = 0;
        if (ev_is_active(&kc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
        if (kc->call) {
            grpc_call_cancel(kc->call, NULL);
        }
        kc = kc->next;
    }

    observe_call_t *oc = client->observes;
    while (oc) {
        oc->active = 0;
        if (ev_is_active(&oc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
        if (oc->call) {
            grpc_call_cancel(oc->call, NULL);
        }
        oc = oc->next;
    }

    /* Cancel pending unary calls */
    pending_call_t *pc = client->pending_calls;
    while (pc) {
        if (pc->call) {
            grpc_call_cancel(pc->call, NULL);
        }
        pc = pc->next;
    }

    /* Shutdown the completion queue - this will cause the thread to exit */
    if (client->cq) {
        grpc_completion_queue_shutdown(client->cq);
    }

    /* Wait for the gRPC thread to finish */
    pthread_join(client->cq_thread, NULL);

    /* Clean up the event queue (any remaining queued events) */
    pthread_mutex_lock(&client->queue_mutex);
    queued_event_t *qe = client->event_queue;
    while (qe) {
        queued_event_t *next = qe->next;
        free(qe);
        qe = next;
    }
    client->event_queue = NULL;



( run in 0.850 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )