EV-Etcd

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

Revision history for EV-Etcd

0.06  2026-03-17
    - Add opts to lease_keepalive (auto_reconnect) and member_list (linearizable)
    - Preserve watch_id across reconnects
    - Remove GRPC_STATUS_INTERNAL from retryable set
    - Add Data::Path::XS watch examples

0.05  2026-03-14
    - Fix edge cases

0.04  2026-03-02
    - Add missing use EV () before XSLoader::load

0.03  2026-03-02
    - Fix error sources, election_observe arg order, watch early-cancel
    - Clear auth token on auth_disable
    - Deduplicate response handlers, remove dead code

0.02  2026-02-10
    - Initial release
    - KV operations: get, put, delete, range, txn (compare-and-swap)
    - Watch with bidirectional streaming and auto-reconnect
    - Lease: grant, revoke, keepalive, time-to-live, leases
    - Lock and unlock tied to leases
    - Election: campaign, proclaim, leader, resign, observe
    - Cluster: member list/add/remove/update/promote
    - Maintenance: status, compact, defragment, alarm, hash_kv, move_leader
    - Auth: user/role management, authenticate, enable/disable
    - Health monitoring with configurable interval and callback
    - Automatic retries for transient gRPC failures
    - Multiple endpoint support with failover
    - Structured error callbacks ({code, message, source})

Etcd.xs  view on Meta::CPAN

        grpc_metadata_array_destroy(&pc->trailing_metadata);
        if (pc->recv_buffer) grpc_byte_buffer_destroy(pc->recv_buffer);
        grpc_slice_unref(pc->status_details);
        if (pc->call) grpc_call_unref(pc->call);
        SvREFCNT_dec(pc->callback);
        Safefree(pc);
    }
    while (client->watches) {
        cleanup_watch(aTHX_ client->watches);
    }
    while (client->keepalives) {
        cleanup_keepalive(aTHX_ client->keepalives);
    }
    while (client->observes) {
        cleanup_observe(aTHX_ client->observes);
    }

    /* Free client-level resources (mirrors free_perl_resources in DESTROY) */
    if (client->health_callback)
        SvREFCNT_dec(client->health_callback);
    if (client->auth_token) {
        memset(client->auth_token, 0, client->auth_token_len);

Etcd.xs  view on Meta::CPAN

                        cleanup_watch(aTHX_ wc);
                    }
                } else {
                    if (wc->active) {
                        CALL_STATUS_ERROR_CALLBACK(wc->callback, GRPC_STATUS_INTERNAL, "Watch setup failed", "watch");
                        if (!client->active) return;
                    }
                    cleanup_watch(aTHX_ wc);
                }
            } else if (base->type == CALL_TYPE_LEASE_KEEPALIVE_RECV) {
            /* Keepalive receive completion */
            keepalive_call_t *kc = (keepalive_call_t *)base;

            if (success && kc->active) {
                    process_keepalive_response(aTHX_ kc);
                    if (!client->active) return;
                    /* Re-arm receive if still active */
                    if (kc->active) {
                        keepalive_rearm_recv(aTHX_ kc);
                    } else {
                        /* Response handler set active=0 (e.g., lease expired) */
                        cleanup_keepalive(aTHX_ kc);
                    }
                } else if (!success && kc->active) {
                    /* Stream ended or error - try to reconnect */
                    kc->active = 0;
                    /* Try automatic reconnection */
                    if (try_reconnect_keepalive(aTHX_ kc)) {
                        /* Reconnection initiated, don't notify callback yet */
                    } else {
                        /* Reconnection failed or disabled, notify callback and cleanup */
                        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_UNAVAILABLE, "Keepalive stream ended", "keepalive");
                        if (!client->active) return;
                        cleanup_keepalive(aTHX_ kc);
                    }
                } else {
                    /* RECV completed but keepalive already inactive */
                    cleanup_keepalive(aTHX_ kc);
                }
            } else if (base->type == CALL_TYPE_LEASE_KEEPALIVE) {
            /* Initial keepalive setup complete - process first message if any */
            keepalive_call_t *kc = (keepalive_call_t *)base;
            if (success) {
                    /* Process the first message that was received in the initial batch */
                    if (kc->recv_buffer && kc->active) {
                        process_keepalive_response(aTHX_ kc);
                        if (!client->active) return;
                    }
                    /* Re-arm to receive more messages */
                    if (kc->active) {
                        keepalive_rearm_recv(aTHX_ kc);
                    } else {
                        /* First response set active=0 (e.g., lease already expired) */
                        cleanup_keepalive(aTHX_ kc);
                    }
                } else {
                    if (kc->active) {
                        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_INTERNAL, "Keepalive setup failed", "keepalive");
                        if (!client->active) return;
                    }
                    cleanup_keepalive(aTHX_ kc);
                }
            } else if (base->type == CALL_TYPE_ELECTION_OBSERVE_RECV) {
            /* Election observe receive completion */
            observe_call_t *oc = (observe_call_t *)base;

            if (success && oc->active) {
                    process_observe_response(aTHX_ oc);
                    if (!client->active) return;
                    /* Re-arm receive if still active */
                    if (oc->active) {

Etcd.xs  view on Meta::CPAN

        for (int j = 0; j < client->endpoint_count; j++) {
            Safefree(client->endpoints[j]);
        }
        Safefree(client->endpoints);
        Safefree(client);
        croak("Failed to create gRPC completion queue thread");
    }

    client->pending_calls = NULL;
    client->watches = NULL;
    client->keepalives = NULL;
    client->observes = NULL;
    /* Store auth token if provided */
    if (init_auth_token && init_auth_token_len > 0) {
        Newx(client->auth_token, init_auth_token_len + 1, char);
        Copy(init_auth_token, client->auth_token, init_auth_token_len, char);
        client->auth_token[init_auth_token_len] = '\0';
        client->auth_token_len = init_auth_token_len;
    } else {
        client->auth_token = NULL;
        client->auth_token_len = 0;

Etcd.xs  view on Meta::CPAN


    if (err != GRPC_CALL_OK) {
        CLEANUP_PENDING_CALL_ON_ERROR(pc);
        croak("Failed to start gRPC call: %d", err);
    }

    pc->next = client->pending_calls;
    client->pending_calls = pc;
}

EV::Etcd::Keepalive
ev_etcd_lease_keepalive(client, lease_id, ...)
    EV::Etcd client
    IV lease_id
CODE:
{
    /* Parse arguments: lease_keepalive(lease_id, [opts,] callback) */
    SV *opts = NULL;
    SV *callback;

    if (items == 3) {
        callback = ST(2);
    } else if (items == 4) {
        opts = ST(2);
        callback = ST(3);
    } else {
        croak("Usage: $client->lease_keepalive($lease_id, [\\%%opts,] $callback)");
    }

    VALIDATE_CALLBACK(callback);

    /* Create keepalive structure */
    keepalive_call_t *kc;
    Newxz(kc, 1, keepalive_call_t);
    init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
    kc->callback = newSVsv(callback);
    kc->client = client;
    kc->active = 1;
    kc->auto_reconnect = 1;  /* Enable by default */
    kc->lease_id = lease_id;

    /* Process options */
    if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
        HV *hv = (HV *)SvRV(opts);

Etcd.xs  view on Meta::CPAN

    kc->recv_buffer = NULL;
    kc->status_details = grpc_empty_slice();

    /* Build LeaseKeepAliveRequest */
    Etcdserverpb__LeaseKeepAliveRequest req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    req.id = lease_id;

    /* Serialize request */
    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create streaming call */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);

    kc->call = grpc_channel_create_call(
        client->channel,
        NULL,
        GRPC_PROPAGATE_DEFAULTS,

Etcd.xs  view on Meta::CPAN

        NULL
    );

    if (!kc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        grpc_metadata_array_destroy(&kc->initial_metadata);
        grpc_metadata_array_destroy(&kc->trailing_metadata);
        grpc_slice_unref(kc->status_details);
        SvREFCNT_dec(kc->callback);
        Safefree(kc);
        croak("Failed to create gRPC call for lease_keepalive");
    }

    /* Start the call with initial operations */
    grpc_op ops[4] = {0};
    grpc_metadata auth_md;

    ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
    setup_auth_metadata(client, &ops[0], &auth_md);

    ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;

Etcd.xs  view on Meta::CPAN

    if (err != GRPC_CALL_OK) {
        grpc_metadata_array_destroy(&kc->initial_metadata);
        grpc_metadata_array_destroy(&kc->trailing_metadata);
        grpc_slice_unref(kc->status_details);
        grpc_call_unref(kc->call);
        SvREFCNT_dec(kc->callback);
        Safefree(kc);
        croak("Failed to start gRPC call: %d", err);
    }

    kc->next = client->keepalives;
    client->keepalives = kc;

    RETVAL = kc;
}
OUTPUT:
    RETVAL

void
ev_etcd_txn(client, compare_av, success_av, failure_av, callback)
    EV::Etcd client
    SV *compare_av

Etcd.xs  view on Meta::CPAN

        while (wc) {
            watch_call_t *next = wc->next;
            if (ev_is_active(&wc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
            SvREFCNT_dec(wc->callback);
            if (wc->params.key) Safefree(wc->params.key);
            if (wc->params.range_end) Safefree(wc->params.range_end);
            Safefree(wc);
            wc = next;
        }
        keepalive_call_t *kc = client->keepalives;
        while (kc) {
            keepalive_call_t *next = kc->next;
            if (ev_is_active(&kc->reconnect_timer))
                ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
            SvREFCNT_dec(kc->callback);
            Safefree(kc);
            kc = next;
        }
        observe_call_t *oc = client->observes;
        while (oc) {
            observe_call_t *next = oc->next;
            if (ev_is_active(&oc->reconnect_timer))

Etcd.xs  view on Meta::CPAN

    client->active = 0;

    /* Stop ev_async watcher */
    if (ev_is_active(&client->cq_async)) {
        ev_async_stop(EV_DEFAULT, &client->cq_async);
    }

    /* Signal the gRPC thread to stop and wait for it */
    client->thread_running = 0;

    /* Mark all watches and keepalives as inactive and cancel their gRPC calls.
     * This will cause pending operations to complete with success=0. */
    watch_call_t *wc = client->watches;
    while (wc) {
        wc->active = 0;
        if (ev_is_active(&wc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
        if (wc->call) {
            grpc_call_cancel(wc->call, NULL);
        }
        wc = wc->next;
    }

    keepalive_call_t *kc = client->keepalives;
    while (kc) {
        kc->active = 0;
        if (ev_is_active(&kc->reconnect_timer))
            ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
        if (kc->call) {
            grpc_call_cancel(kc->call, NULL);
        }
        kc = kc->next;
    }

Etcd.xs  view on Meta::CPAN

            if (wc->params.key) {
                Safefree(wc->params.key);
            }
            if (wc->params.range_end) {
                Safefree(wc->params.range_end);
            }
            Safefree(wc);
            wc = next;
        }

        kc = client->keepalives;
        while (kc) {
            keepalive_call_t *next = kc->next;
            grpc_metadata_array_destroy(&kc->initial_metadata);
            grpc_metadata_array_destroy(&kc->trailing_metadata);
            if (kc->recv_buffer) {
                grpc_byte_buffer_destroy(kc->recv_buffer);
            }
            grpc_slice_unref(kc->status_details);
            if (kc->call) {
                grpc_call_unref(kc->call);
            }
            SvREFCNT_dec(kc->callback);

Etcd.xs  view on Meta::CPAN

{
    /* The watch_call_t is managed by the client's watches list.
     * DESTROY being called just means Perl lost its reference to the watch object,
     * but the watch is still active on the client side.
     * We intentionally do NOT deactivate the watch here.
     * If the user wants to stop the watch, they should call cancel().
     * The watch will be cleaned up when the client is destroyed. */
    (void)watch;  /* Silence unused parameter warning */
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Keepalive  PREFIX = ev_etcd_keepalive_

void
ev_etcd_keepalive_cancel(keepalive, callback)
    EV::Etcd::Keepalive keepalive
    SV *callback
CODE:
{
    VALIDATE_CALLBACK(callback);

    keepalive_call_t *kc = keepalive;

    /* Stop reconnect timer unconditionally — may be pending even when active=0 */
    if (ev_is_active(&kc->reconnect_timer))
        ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);

    if (!kc->active) {
        CALL_SUCCESS_CALLBACK(callback, newHV());
        return;
    }

    kc->active = 0;

    /* Force pending RECV to complete immediately */
    if (kc->call)
        grpc_call_cancel(kc->call, NULL);

    CALL_SUCCESS_CALLBACK(callback, newHV());
}

void
ev_etcd_keepalive_DESTROY(keepalive)
    EV::Etcd::Keepalive keepalive
CODE:
{
    (void)keepalive;
}

MODULE = EV::Etcd  PACKAGE = EV::Etcd::Observe  PREFIX = ev_etcd_observe_

void
ev_etcd_observe_cancel(observe, callback)
    EV::Etcd::Observe observe
    SV *callback
CODE:
{

README.md  view on Meta::CPAN

    }
});

EV::run;
```

## Features

- **KV**: get, put, delete, range, transactions (compare-and-swap)
- **Watch**: bidirectional streaming with auto-reconnect
- **Lease**: grant, revoke, keepalive, time-to-live
- **Lock**: distributed locking tied to leases
- **Election**: leader campaign, observe, proclaim, resign
- **Cluster**: member list/add/remove/update/promote
- **Maintenance**: status, compact, defragment, alarm, hash_kv, move_leader
- **Auth**: user/role management, authenticate, enable/disable
- **Health monitoring** with configurable interval and callback
- **Automatic retries** for transient gRPC failures

## Architecture

eg/lease_test.pl  view on Meta::CPAN

use EV::Etcd;
use Data::Dumper;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

print "=== Lease Test ===\n\n";

my $lease_id;
my $keepalive_count = 0;

# Step 1: Grant a lease with 10 second TTL
print "1. Granting lease with TTL=10s...\n";
$client->lease_grant(10, sub {
    my ($resp, $err) = @_;

    if ($err) {
        print "Lease grant error: $err->{message}\n";
        EV::break;
        return;

etcd_common.h  view on Meta::CPAN

    int active;
    struct ev_etcd_struct *client;
    struct watch_call *next;
    int auto_reconnect;
    int64_t last_revision;
    watch_params_t params;
    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
} watch_call_t;

/* Keepalive structure (for streaming lease keepalive) */
typedef struct keepalive_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;
    SV *callback;
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;
    grpc_slice status_details;
    int64_t lease_id;
    int active;
    struct ev_etcd_struct *client;
    struct keepalive_call *next;
    int auto_reconnect;
    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
} keepalive_call_t;

/* Election observe parameters for reconnection */
typedef struct observe_params {
    char *name;
    size_t name_len;
} observe_params_t;

/* Election observe structure (for streaming election observe) */
typedef struct observe_call {
    call_base_t base;  /* Must be first */

etcd_common.h  view on Meta::CPAN

    /* Hybrid threading: gRPC thread + ev_async for main thread notification */
    pthread_t cq_thread;        /* Thread running gRPC CQ loop */
    pthread_mutex_t queue_mutex; /* Protects event_queue */
    ev_async cq_async;          /* Async watcher to wake main thread */
    queued_event_t *event_queue; /* Queue of completed events */
    queued_event_t *event_queue_tail; /* Tail for O(1) append */
    volatile int thread_running; /* Flag to signal thread shutdown */

    pending_call_t *pending_calls;
    watch_call_t *watches;
    keepalive_call_t *keepalives;
    observe_call_t *observes;
    int active;
    int in_callback;  /* Guard against freeing client during event processing */
    char *auth_token;
    size_t auth_token_len;
    int timeout_seconds;

    /* Multiple endpoints for failover */
    char **endpoints;
    int endpoint_count;

etcd_common.h  view on Meta::CPAN


    /* Health monitoring */
    ev_timer health_timer;
    int is_healthy;
    SV *health_callback;
    pid_t owner_pid;  /* PID of process that created this client (fork safety) */
} ev_etcd_t;

typedef ev_etcd_t *EV__Etcd;
typedef watch_call_t *EV__Etcd__Watch;
typedef keepalive_call_t *EV__Etcd__Keepalive;
typedef observe_call_t *EV__Etcd__Observe;

/* Initialize a call's base structure */
static inline void init_call_base(call_base_t *base, call_type_t type) {
    base->type = type;
}

/* Helper macro to validate callback is a code reference */
#define VALIDATE_CALLBACK(cb) \
    do { \

etcd_common.h  view on Meta::CPAN

        grpc_metadata_array_destroy(&(pc)->trailing_metadata); \
        if ((pc)->recv_buffer) grpc_byte_buffer_destroy((pc)->recv_buffer); \
        grpc_slice_unref((pc)->status_details); \
        if ((pc)->call) grpc_call_unref((pc)->call); \
        SvREFCNT_dec((pc)->callback); \
        Safefree((pc)); \
    } while (0)

/*
 * Helper macros for streaming call reconnection to reduce code triplication
 * across watch, keepalive, and observe reconnect functions.
 */

/*
 * Cleanup old streaming call state before reconnection.
 * Works with any streaming call struct that has these fields.
 *
 * Usage:
 *   STREAMING_CALL_CLEANUP(wc);  // For watch_call_t
 *   STREAMING_CALL_CLEANUP(kc);  // For keepalive_call_t
 *   STREAMING_CALL_CLEANUP(oc);  // For observe_call_t
 */
#define STREAMING_CALL_CLEANUP(call_ptr) \
    do { \
        if ((call_ptr)->call) { \
            grpc_call_unref((call_ptr)->call); \
            (call_ptr)->call = NULL; \
        } \
        grpc_metadata_array_destroy(&(call_ptr)->initial_metadata); \
        grpc_metadata_array_destroy(&(call_ptr)->trailing_metadata); \

etcd_lease.c  view on Meta::CPAN

        hv_store(lease_hv, "id", 2, newSViv(resp->leases[i]->id), 0);
        av_push(leases_av, newRV_noinc((SV *)lease_hv));
    }
    hv_store(result, "leases", 6, newRV_noinc((SV *)leases_av), 0);

    etcdserverpb__lease_leases_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(pc->callback, result);
}

/* Re-arm keepalive to receive next message */
void keepalive_rearm_recv(pTHX_ keepalive_call_t *kc) {
    if (!kc->active) return;

    if (kc->recv_buffer) {
        grpc_byte_buffer_destroy(kc->recv_buffer);
        kc->recv_buffer = NULL;
    }

    kc->base.type = CALL_TYPE_LEASE_KEEPALIVE_RECV;

    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &kc->recv_buffer;

    grpc_call_error err = grpc_call_start_batch(kc->call, &op, 1, &kc->base, NULL);
    if (err != GRPC_CALL_OK) {
        kc->active = 0;
        CALL_SIMPLE_ERROR_CALLBACK(kc->callback, "Keepalive rearm failed");
        cleanup_keepalive(aTHX_ kc);
    }
}

/* Cleanup keepalive and remove from client list */
void cleanup_keepalive(pTHX_ keepalive_call_t *kc) {
    ev_etcd_t *client = kc->client;

    keepalive_call_t **kp = &client->keepalives;
    while (*kp) {
        if (*kp == kc) {
            *kp = kc->next;
            break;
        }
        kp = &(*kp)->next;
    }

    if (ev_is_active(&kc->reconnect_timer))
        ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);

etcd_lease.c  view on Meta::CPAN

    }
    grpc_slice_unref(kc->status_details);
    if (kc->call) {
        grpc_call_unref(kc->call);
    }
    SvREFCNT_dec(kc->callback);
    Safefree(kc);
}

/* Process LeaseKeepAliveResponse */
void process_keepalive_response(pTHX_ keepalive_call_t *kc) {
    if (!kc->recv_buffer) {
        kc->active = 0;
        CALL_SIMPLE_ERROR_CALLBACK(kc->callback, "No keepalive response received");
        return;
    }

    grpc_byte_buffer_reader reader;
    if (!grpc_byte_buffer_reader_init(&reader, kc->recv_buffer)) {
        kc->active = 0;
        CALL_SIMPLE_ERROR_CALLBACK(kc->callback, "Failed to read keepalive response buffer");
        return;
    }

    grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
    grpc_byte_buffer_reader_destroy(&reader);

    Etcdserverpb__LeaseKeepAliveResponse *resp = etcdserverpb__lease_keep_alive_response__unpack(
        NULL, GRPC_SLICE_LENGTH(slice), GRPC_SLICE_START_PTR(slice));
    grpc_slice_unref(slice);

    if (!resp) {
        kc->active = 0;
        CALL_SIMPLE_ERROR_CALLBACK(kc->callback, "Failed to parse keepalive response");
        return;
    }

    kc->reconnect_attempt = 0;

    if (resp->ttl == 0) {
        kc->active = 0;
        CALL_STATUS_ERROR_CALLBACK(kc->callback, GRPC_STATUS_NOT_FOUND, "Lease expired", "keepalive");
        etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);
        return;
    }

    HV *result = newHV();
    add_header_to_hv(aTHX_ result, resp->header);
    hv_store(result, "id", 2, newSViv(resp->id), 0);
    hv_store(result, "ttl", 3, newSViv(resp->ttl), 0);
    etcdserverpb__lease_keep_alive_response__free_unpacked(resp, NULL);

    CALL_SUCCESS_CALLBACK(kc->callback, result);
}

/* Perform keepalive reconnection (called from timer callback) */
static void keepalive_reconnect_cb(struct ev_loop *loop, ev_timer *w, int revents) {
    dTHX;
    (void)loop;
    (void)revents;

    keepalive_call_t *kc = (keepalive_call_t *)((char *)w - offsetof(keepalive_call_t, reconnect_timer));
    ev_etcd_t *client = kc->client;

    if (!client->active) {
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    /* Cleanup and reinitialize streaming state */
    STREAMING_CALL_CLEANUP(kc);
    STREAMING_CALL_REINIT(kc);

    /* Build keepalive request */
    Etcdserverpb__LeaseKeepAliveRequest keep_req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
    keep_req.id = kc->lease_id;

    grpc_slice req_slice;
    SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
        etcdserverpb__lease_keep_alive_request__get_packed_size,
        etcdserverpb__lease_keep_alive_request__pack, &keep_req);
    grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
    grpc_slice_unref(req_slice);

    /* Create call and setup ops */
    gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    kc->call = grpc_channel_create_call(
        client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
        client->cq, METHOD_LEASE_KEEPALIVE, NULL, deadline, NULL);

    if (!kc->call) {
        grpc_byte_buffer_destroy(send_buffer);
        kc->active = 0;
        client->in_callback = 1;
        CALL_SIMPLE_ERROR_CALLBACK(kc->callback, "Keepalive reconnect failed");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_keepalive(aTHX_ kc);
        return;
    }

    grpc_op ops[4] = {0};
    grpc_metadata auth_md;
    STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, kc);

    init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
    grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
    cleanup_auth_metadata(client, &auth_md);
    grpc_byte_buffer_destroy(send_buffer);

    if (err != GRPC_CALL_OK) {
        STREAMING_CALL_BATCH_ERROR(kc);
        client->in_callback = 1;
        CALL_SIMPLE_ERROR_CALLBACK(kc->callback, "Keepalive reconnect batch failed");
        client->in_callback = 0;
        if (!client->active) {
            finish_client_destroy(aTHX_ client);
            return;
        }
        cleanup_keepalive(aTHX_ kc);
    }
}

int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc) {
    ev_etcd_t *client = kc->client;

    if (!kc->auto_reconnect || !client->active || kc->lease_id <= 0) {
        return 0;
    }

    if (kc->reconnect_attempt >= client->max_retries) {
        return 0;
    }

    kc->reconnect_attempt++;

    ev_tstamp delay = RECONNECT_BACKOFF_SECONDS(kc->reconnect_attempt);
    ev_timer_init(&kc->reconnect_timer, keepalive_reconnect_cb, delay, 0.0);
    ev_timer_start(EV_DEFAULT, &kc->reconnect_timer);

    return 1;
}

etcd_lease.h  view on Meta::CPAN

#define ETCD_LEASE_H

#include "etcd_common.h"

/* Lease response handlers */
void process_lease_grant_response(pTHX_ pending_call_t *pc);
void process_lease_revoke_response(pTHX_ pending_call_t *pc);
void process_lease_time_to_live_response(pTHX_ pending_call_t *pc);
void process_lease_leases_response(pTHX_ pending_call_t *pc);

/* Keepalive handlers */
void process_keepalive_response(pTHX_ keepalive_call_t *kc);
void keepalive_rearm_recv(pTHX_ keepalive_call_t *kc);
void cleanup_keepalive(pTHX_ keepalive_call_t *kc);
int try_reconnect_keepalive(pTHX_ keepalive_call_t *kc);

#endif /* ETCD_LEASE_H */

lib/EV/Etcd.pm  view on Meta::CPAN


ArrayRef of etcd endpoints (host:port).

=item timeout

RPC timeout in seconds. Default is 30 seconds. Minimum value is 1 second.

=item max_retries

Maximum number of reconnection attempts for streaming operations (watch,
lease_keepalive, election_observe) after a connection failure. Default is 3.
Set to 0 to disable automatic reconnection.

=item health_interval

Interval in seconds for health monitoring. Default is 0 (disabled).
When enabled, the client periodically checks the gRPC channel connectivity
state and calls the on_health_change callback when the connection state changes.

=item on_health_change

lib/EV/Etcd.pm  view on Meta::CPAN

    {
        code      => 14,              # gRPC status code (integer)
        status    => "UNAVAILABLE",   # gRPC status name (string)
        message   => "Connection refused",  # Error message
        source    => "get",           # Which operation failed
        retryable => 1,               # Whether the error is retryable
    }

The C<retryable> field indicates whether the error is transient (status codes:
UNAVAILABLE, RESOURCE_EXHAUSTED, ABORTED, DEADLINE_EXCEEDED).
Streaming operations (watch, keepalive, observe) automatically reconnect
on transient failures according to the C<max_retries> configuration.
Unary RPCs (get, put, delete, etc.) do not retry automatically; use the
C<retryable> field to implement application-level retry logic.

=head2 put

    $client->put($key, $value, $callback);
    $client->put($key, $value, \%opts, $callback);

Put a key-value pair into etcd.

lib/EV/Etcd.pm  view on Meta::CPAN

The actual TTL granted by the server.

=back

=head2 lease_revoke

    $client->lease_revoke($lease_id, $callback);

Revoke a lease. All keys attached to the lease will be deleted.

=head2 lease_keepalive

    my $keepalive = $client->lease_keepalive($lease_id, $callback);
    my $keepalive = $client->lease_keepalive($lease_id, \%opts, $callback);

Keep a lease alive. Creates a bidirectional streaming connection that keeps
the lease refreshed. Returns an C<EV::Etcd::Keepalive> object that can be
used to cancel the keepalive stream:

    $keepalive->cancel(sub { my ($resp, $err) = @_; });

Options:

    auto_reconnect => 1   # auto-reconnect on failure (default: 1)

=head2 EV::Etcd::Keepalive Methods

=head3 cancel

    $keepalive->cancel($callback);

Cancel the keepalive stream. The callback receives C<($response, $error)>
when cancellation is complete. The response is an empty hash reference on
success.

=head2 lease_time_to_live

    $client->lease_time_to_live($lease_id, $callback);
    $client->lease_time_to_live($lease_id, \%opts, $callback);

Get the remaining TTL of a lease.

lib/EV/Etcd.pm  view on Meta::CPAN


The name (identifier) of the lock to acquire. This is a byte string that
identifies the resource being locked. Multiple clients attempting to lock
the same name will block until the lock is available.

=item lease_id

The ID of a lease to attach to the lock. The lock will be held for the
duration of the lease. If the lease expires or is revoked, the lock is
automatically released. You must first create a lease with C<lease_grant>
and optionally keep it alive with C<lease_keepalive>.

=item callback

Called with C<($response, $error)> when the lock is acquired (or fails).

=back

The response contains:

=over 4

lib/EV/Etcd.pm  view on Meta::CPAN

Elections use leases to ensure that leadership is automatically released
when a leader fails.

=head2 election_campaign

    $client->election_campaign($name, $lease_id, $value, $callback);

Campaign for leadership of an election.

This call blocks until the caller is elected as leader. Once elected, the
caller should periodically keep the lease alive to maintain leadership.

Arguments:

=over 4

=item name

The name of the election to campaign in.

=item lease_id

rpc.pb-c.c  view on Meta::CPAN

}
void   etcdserverpb__lease_revoke_response__free_unpacked
                     (Etcdserverpb__LeaseRevokeResponse *message,
                      ProtobufCAllocator *allocator)
{
  if(!message)
    return;
  assert(message->base.descriptor == &etcdserverpb__lease_revoke_response__descriptor);
  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void   etcdserverpb__lease_keep_alive_request__init
                     (Etcdserverpb__LeaseKeepAliveRequest         *message)
{
  static const Etcdserverpb__LeaseKeepAliveRequest init_value = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
  *message = init_value;
}
size_t etcdserverpb__lease_keep_alive_request__get_packed_size
                     (const Etcdserverpb__LeaseKeepAliveRequest *message)
{
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
  return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t etcdserverpb__lease_keep_alive_request__pack
                     (const Etcdserverpb__LeaseKeepAliveRequest *message,
                      uint8_t       *out)
{
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
  return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
size_t etcdserverpb__lease_keep_alive_request__pack_to_buffer
                     (const Etcdserverpb__LeaseKeepAliveRequest *message,
                      ProtobufCBuffer *buffer)
{
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
  return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
}
Etcdserverpb__LeaseKeepAliveRequest *
       etcdserverpb__lease_keep_alive_request__unpack
                     (ProtobufCAllocator  *allocator,
                      size_t               len,
                      const uint8_t       *data)
{
  return (Etcdserverpb__LeaseKeepAliveRequest *)
     protobuf_c_message_unpack (&etcdserverpb__lease_keep_alive_request__descriptor,
                                allocator, len, data);
}
void   etcdserverpb__lease_keep_alive_request__free_unpacked
                     (Etcdserverpb__LeaseKeepAliveRequest *message,
                      ProtobufCAllocator *allocator)
{
  if(!message)
    return;
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_request__descriptor);
  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void   etcdserverpb__lease_keep_alive_response__init
                     (Etcdserverpb__LeaseKeepAliveResponse         *message)
{
  static const Etcdserverpb__LeaseKeepAliveResponse init_value = ETCDSERVERPB__LEASE_KEEP_ALIVE_RESPONSE__INIT;
  *message = init_value;
}
size_t etcdserverpb__lease_keep_alive_response__get_packed_size
                     (const Etcdserverpb__LeaseKeepAliveResponse *message)
{
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
  return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
}
size_t etcdserverpb__lease_keep_alive_response__pack
                     (const Etcdserverpb__LeaseKeepAliveResponse *message,
                      uint8_t       *out)
{
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
  return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
}
size_t etcdserverpb__lease_keep_alive_response__pack_to_buffer
                     (const Etcdserverpb__LeaseKeepAliveResponse *message,
                      ProtobufCBuffer *buffer)
{
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
  return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
}
Etcdserverpb__LeaseKeepAliveResponse *
       etcdserverpb__lease_keep_alive_response__unpack
                     (ProtobufCAllocator  *allocator,
                      size_t               len,
                      const uint8_t       *data)
{
  return (Etcdserverpb__LeaseKeepAliveResponse *)
     protobuf_c_message_unpack (&etcdserverpb__lease_keep_alive_response__descriptor,
                                allocator, len, data);
}
void   etcdserverpb__lease_keep_alive_response__free_unpacked
                     (Etcdserverpb__LeaseKeepAliveResponse *message,
                      ProtobufCAllocator *allocator)
{
  if(!message)
    return;
  assert(message->base.descriptor == &etcdserverpb__lease_keep_alive_response__descriptor);
  protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
void   etcdserverpb__lease_time_to_live_request__init
                     (Etcdserverpb__LeaseTimeToLiveRequest         *message)
{
  static const Etcdserverpb__LeaseTimeToLiveRequest init_value = ETCDSERVERPB__LEASE_TIME_TO_LIVE_REQUEST__INIT;
  *message = init_value;
}
size_t etcdserverpb__lease_time_to_live_request__get_packed_size
                     (const Etcdserverpb__LeaseTimeToLiveRequest *message)

rpc.pb-c.c  view on Meta::CPAN

  "Etcdserverpb__LeaseRevokeResponse",
  "etcdserverpb",
  sizeof(Etcdserverpb__LeaseRevokeResponse),
  1,
  etcdserverpb__lease_revoke_response__field_descriptors,
  etcdserverpb__lease_revoke_response__field_indices_by_name,
  1,  etcdserverpb__lease_revoke_response__number_ranges,
  (ProtobufCMessageInit) etcdserverpb__lease_revoke_response__init,
  NULL,NULL,NULL    /* reserved[123] */
};
static const ProtobufCFieldDescriptor etcdserverpb__lease_keep_alive_request__field_descriptors[1] =
{
  {
    "ID",
    1,
    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_INT64,
    0,   /* quantifier_offset */
    offsetof(Etcdserverpb__LeaseKeepAliveRequest, id),
    NULL,
    NULL,
    0,             /* flags */
    0,NULL,NULL    /* reserved1,reserved2, etc */
  },
};
static const unsigned etcdserverpb__lease_keep_alive_request__field_indices_by_name[] = {
  0,   /* field[0] = ID */
};
static const ProtobufCIntRange etcdserverpb__lease_keep_alive_request__number_ranges[1 + 1] =
{
  { 1, 0 },
  { 0, 1 }
};
const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_request__descriptor =
{
  PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
  "etcdserverpb.LeaseKeepAliveRequest",
  "LeaseKeepAliveRequest",
  "Etcdserverpb__LeaseKeepAliveRequest",
  "etcdserverpb",
  sizeof(Etcdserverpb__LeaseKeepAliveRequest),
  1,
  etcdserverpb__lease_keep_alive_request__field_descriptors,
  etcdserverpb__lease_keep_alive_request__field_indices_by_name,
  1,  etcdserverpb__lease_keep_alive_request__number_ranges,
  (ProtobufCMessageInit) etcdserverpb__lease_keep_alive_request__init,
  NULL,NULL,NULL    /* reserved[123] */
};
static const ProtobufCFieldDescriptor etcdserverpb__lease_keep_alive_response__field_descriptors[3] =
{
  {
    "header",
    1,
    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_MESSAGE,
    0,   /* quantifier_offset */
    offsetof(Etcdserverpb__LeaseKeepAliveResponse, header),
    &etcdserverpb__response_header__descriptor,
    NULL,

rpc.pb-c.c  view on Meta::CPAN

    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_INT64,
    0,   /* quantifier_offset */
    offsetof(Etcdserverpb__LeaseKeepAliveResponse, ttl),
    NULL,
    NULL,
    0,             /* flags */
    0,NULL,NULL    /* reserved1,reserved2, etc */
  },
};
static const unsigned etcdserverpb__lease_keep_alive_response__field_indices_by_name[] = {
  1,   /* field[1] = ID */
  2,   /* field[2] = TTL */
  0,   /* field[0] = header */
};
static const ProtobufCIntRange etcdserverpb__lease_keep_alive_response__number_ranges[1 + 1] =
{
  { 1, 0 },
  { 0, 3 }
};
const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_response__descriptor =
{
  PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
  "etcdserverpb.LeaseKeepAliveResponse",
  "LeaseKeepAliveResponse",
  "Etcdserverpb__LeaseKeepAliveResponse",
  "etcdserverpb",
  sizeof(Etcdserverpb__LeaseKeepAliveResponse),
  3,
  etcdserverpb__lease_keep_alive_response__field_descriptors,
  etcdserverpb__lease_keep_alive_response__field_indices_by_name,
  1,  etcdserverpb__lease_keep_alive_response__number_ranges,
  (ProtobufCMessageInit) etcdserverpb__lease_keep_alive_response__init,
  NULL,NULL,NULL    /* reserved[123] */
};
static const ProtobufCFieldDescriptor etcdserverpb__lease_time_to_live_request__field_descriptors[2] =
{
  {
    "ID",
    1,
    PROTOBUF_C_LABEL_NONE,
    PROTOBUF_C_TYPE_INT64,
    0,   /* quantifier_offset */

rpc.pb-c.h  view on Meta::CPAN

 { PROTOBUF_C_MESSAGE_INIT (&etcdserverpb__lease_revoke_response__descriptor) \
    , NULL }


struct  Etcdserverpb__LeaseKeepAliveRequest
{
  ProtobufCMessage base;
  int64_t id;
};
#define ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT \
 { PROTOBUF_C_MESSAGE_INIT (&etcdserverpb__lease_keep_alive_request__descriptor) \
    , 0 }


struct  Etcdserverpb__LeaseKeepAliveResponse
{
  ProtobufCMessage base;
  Etcdserverpb__ResponseHeader *header;
  int64_t id;
  int64_t ttl;
};
#define ETCDSERVERPB__LEASE_KEEP_ALIVE_RESPONSE__INIT \
 { PROTOBUF_C_MESSAGE_INIT (&etcdserverpb__lease_keep_alive_response__descriptor) \
    , NULL, 0, 0 }


struct  Etcdserverpb__LeaseTimeToLiveRequest
{
  ProtobufCMessage base;
  int64_t id;
  protobuf_c_boolean keys;
};
#define ETCDSERVERPB__LEASE_TIME_TO_LIVE_REQUEST__INIT \

rpc.pb-c.h  view on Meta::CPAN

                      ProtobufCBuffer     *buffer);
Etcdserverpb__LeaseRevokeResponse *
       etcdserverpb__lease_revoke_response__unpack
                     (ProtobufCAllocator  *allocator,
                      size_t               len,
                      const uint8_t       *data);
void   etcdserverpb__lease_revoke_response__free_unpacked
                     (Etcdserverpb__LeaseRevokeResponse *message,
                      ProtobufCAllocator *allocator);
/* Etcdserverpb__LeaseKeepAliveRequest methods */
void   etcdserverpb__lease_keep_alive_request__init
                     (Etcdserverpb__LeaseKeepAliveRequest         *message);
size_t etcdserverpb__lease_keep_alive_request__get_packed_size
                     (const Etcdserverpb__LeaseKeepAliveRequest   *message);
size_t etcdserverpb__lease_keep_alive_request__pack
                     (const Etcdserverpb__LeaseKeepAliveRequest   *message,
                      uint8_t             *out);
size_t etcdserverpb__lease_keep_alive_request__pack_to_buffer
                     (const Etcdserverpb__LeaseKeepAliveRequest   *message,
                      ProtobufCBuffer     *buffer);
Etcdserverpb__LeaseKeepAliveRequest *
       etcdserverpb__lease_keep_alive_request__unpack
                     (ProtobufCAllocator  *allocator,
                      size_t               len,
                      const uint8_t       *data);
void   etcdserverpb__lease_keep_alive_request__free_unpacked
                     (Etcdserverpb__LeaseKeepAliveRequest *message,
                      ProtobufCAllocator *allocator);
/* Etcdserverpb__LeaseKeepAliveResponse methods */
void   etcdserverpb__lease_keep_alive_response__init
                     (Etcdserverpb__LeaseKeepAliveResponse         *message);
size_t etcdserverpb__lease_keep_alive_response__get_packed_size
                     (const Etcdserverpb__LeaseKeepAliveResponse   *message);
size_t etcdserverpb__lease_keep_alive_response__pack
                     (const Etcdserverpb__LeaseKeepAliveResponse   *message,
                      uint8_t             *out);
size_t etcdserverpb__lease_keep_alive_response__pack_to_buffer
                     (const Etcdserverpb__LeaseKeepAliveResponse   *message,
                      ProtobufCBuffer     *buffer);
Etcdserverpb__LeaseKeepAliveResponse *
       etcdserverpb__lease_keep_alive_response__unpack
                     (ProtobufCAllocator  *allocator,
                      size_t               len,
                      const uint8_t       *data);
void   etcdserverpb__lease_keep_alive_response__free_unpacked
                     (Etcdserverpb__LeaseKeepAliveResponse *message,
                      ProtobufCAllocator *allocator);
/* Etcdserverpb__LeaseTimeToLiveRequest methods */
void   etcdserverpb__lease_time_to_live_request__init
                     (Etcdserverpb__LeaseTimeToLiveRequest         *message);
size_t etcdserverpb__lease_time_to_live_request__get_packed_size
                     (const Etcdserverpb__LeaseTimeToLiveRequest   *message);
size_t etcdserverpb__lease_time_to_live_request__pack
                     (const Etcdserverpb__LeaseTimeToLiveRequest   *message,
                      uint8_t             *out);

rpc.pb-c.h  view on Meta::CPAN

extern const ProtobufCMessageDescriptor etcdserverpb__delete_range_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_create_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_cancel_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_progress_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__watch_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_grant_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_grant_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_revoke_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_revoke_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_keep_alive_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_time_to_live_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_time_to_live_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_leases_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_status__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__lease_leases_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__compaction_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__compaction_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__status_request__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__status_response__descriptor;
extern const ProtobufCMessageDescriptor etcdserverpb__alarm_member__descriptor;

t/lease.t  view on Meta::CPAN

                $found = 1;
                last;
            }
        }
        diag("Found " . scalar(@{$resp->{leases} || []}) . " lease(s), our lease " . ($found ? "found" : "not found"));
        EV::break;
    });
    my $t5 = EV::timer(5, 0, sub { fail('timeout'); EV::break });
    EV::run;

    # Test 14-15: lease_keepalive (send one keepalive)
    my $keepalive_received = 0;
    $client->lease_keepalive($lease_id, sub {
        my ($resp, $err) = @_;
        if (!$keepalive_received) {
            $keepalive_received = 1;
            ok(!$err, 'lease_keepalive succeeded');
            ok($resp->{ttl} > 0, 'keepalive response has positive ttl');
            diag("Keepalive response: id=$resp->{id}, ttl=$resp->{ttl}");
            EV::break;
        }
    });
    my $t6 = EV::timer(5, 0, sub {
        fail('keepalive timeout') unless $keepalive_received;
        EV::break;
    });
    EV::run;

    # Test 16-17: lease_revoke
    $client->lease_revoke($lease_id, sub {
        my ($resp, $err) = @_;
        ok(!$err, 'lease_revoke succeeded');
        ok($resp->{header}, 'revoke response has header');
        diag("Lease revoked");

t/streaming.t  view on Meta::CPAN

plan skip_all => 'etcd not available on 127.0.0.1:2379' unless $etcd_available;

plan tests => 11;

my $client = EV::Etcd->new(
    endpoints => ['127.0.0.1:2379'],
);

my $test_prefix = "/test-streaming-$$-" . time();

# === lease_keepalive streaming tests ===

# Test 1-4: lease_keepalive receives response
my $lease_id;
my $keepalive_count = 0;

$client->lease_grant(10, sub {
    my ($resp, $err) = @_;
    ok(!$err, 'lease_grant succeeded');
    $lease_id = $resp->{id};
    diag("Granted lease: id=$lease_id, ttl=$resp->{ttl}");
    EV::break;
});
my $t1 = EV::timer(5, 0, sub { fail('lease_grant timeout'); EV::break });
EV::run;
undef $t1;  # Cancel timer

SKIP: {
    skip "no lease id", 4 unless $lease_id;

    my $keepalive_handle = $client->lease_keepalive($lease_id, sub {
        my ($resp, $err) = @_;
        if ($err) {
            diag("Keepalive error: " . (ref($err) ? $err->{message} : $err));
            return;
        }
        $keepalive_count++;
        diag("Keepalive response #$keepalive_count: ttl=$resp->{ttl}");
        if ($keepalive_count >= 1) {
            EV::break;
        }
    });

    ok(defined $keepalive_handle, 'lease_keepalive returns handle');
    isa_ok($keepalive_handle, 'EV::Etcd::Keepalive', 'keepalive handle');

    # Wait for at least one keepalive response
    my $keepalive_timer = EV::timer 5, 0, sub {
        diag("Keepalive timer expired, received $keepalive_count responses");
        EV::break;
    };
    EV::run;

    ok($keepalive_count >= 1, "received at least 1 keepalive response (got $keepalive_count)");

    # Cleanup lease (this will end the keepalive)
    $client->lease_revoke($lease_id, sub {
        my ($resp, $err) = @_;
        diag($err ? "Revoke failed" : "Lease revoked");
    });
    pass('cleanup initiated');
}

# === Watch streaming tests ===

# Test 5-9: watch receives multiple events

typemap  view on Meta::CPAN

TYPEMAP
int64_t	T_IV
EV::Etcd	T_PTROBJ
EV::Etcd::Watch	T_PTROBJ
EV::Etcd::Keepalive	T_PTROBJ
EV::Etcd::Observe	T_PTROBJ

INPUT
T_PTROBJ
    if (SvROK($arg) && sv_derived_from($arg, \"${ntype}\")) {
        IV tmp = SvIV((SV*)SvRV($arg));
        $var = INT2PTR($type, tmp);
    } else {
        croak(\"$var is not of type ${ntype}\");
    }



( run in 1.137 second using v1.01-cache-2.11-cpan-39bf76dae61 )