EV-Etcd
view release on metacpan or search on metacpan
wc->params.range_end_len = range_end_len;
}
/* prefix - convenience option to watch all keys with given prefix */
if ((svp = hv_fetchs(hv, "prefix", 0)) && SvTRUE(*svp)) {
/* Don't override if range_end was explicitly provided */
if (!range_end_copy && key_len > 0) {
size_t range_len;
range_end_copy = compute_prefix_range_end(key_str, key_len, &range_len);
if (range_end_copy) {
create_req.range_end.data = (uint8_t *)range_end_copy;
create_req.range_end.len = range_len;
/* Store for recovery */
Newx(wc->params.range_end, range_len + 1, char);
Copy(range_end_copy, wc->params.range_end, range_len, char);
wc->params.range_end[range_len] = '\0';
wc->params.range_end_len = range_len;
}
}
}
/* start_revision - watch from specific revision */
if ((svp = hv_fetchs(hv, "start_revision", 0)) && SvOK(*svp)) {
create_req.start_revision = SvI64(*svp);
wc->params.start_revision = create_req.start_revision;
}
/* progress_notify - receive periodic progress notifications */
if ((svp = hv_fetchs(hv, "progress_notify", 0)) && SvTRUE(*svp)) {
create_req.progress_notify = 1;
wc->params.progress_notify = 1;
}
/* prev_kv - include previous key-value in events */
if ((svp = hv_fetchs(hv, "prev_kv", 0)) && SvTRUE(*svp)) {
create_req.prev_kv = 1;
wc->params.prev_kv = 1;
}
/* watch_id - optional explicit watch ID */
if ((svp = hv_fetchs(hv, "watch_id", 0)) && SvOK(*svp)) {
create_req.watch_id = SvI64(*svp);
wc->params.watch_id = create_req.watch_id;
wc->params.has_watch_id = 1;
}
}
Etcdserverpb__WatchRequest req = ETCDSERVERPB__WATCH_REQUEST__INIT;
req.request_union_case = ETCDSERVERPB__WATCH_REQUEST__REQUEST_UNION_CREATE_REQUEST;
req.create_request = &create_req;
/* Serialize request */
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__watch_request__get_packed_size,
etcdserverpb__watch_request__pack, &req);
if (range_end_copy) Safefree(range_end_copy);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); /* No timeout for watch */
wc->call = grpc_channel_create_call(
client->channel,
NULL, /* parent call */
GRPC_PROPAGATE_DEFAULTS,
client->cq,
METHOD_WATCH,
NULL, /* host */
deadline,
NULL /* reserved */
);
if (!wc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&wc->initial_metadata);
grpc_metadata_array_destroy(&wc->trailing_metadata);
grpc_slice_unref(wc->status_details);
SvREFCNT_dec(wc->callback);
if (wc->params.key) Safefree(wc->params.key);
if (wc->params.range_end) Safefree(wc->params.range_end);
Safefree(wc);
croak("Failed to create gRPC call for watch");
}
/* Start the call with initial operations */
grpc_op ops[4] = {0};
grpc_metadata auth_md;
/* Send initial metadata (with auth token if available) */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
/* Receive initial metadata */
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata = &wc->initial_metadata;
/* Send the watch create request */
ops[2].op = GRPC_OP_SEND_MESSAGE;
ops[2].data.send_message.send_message = send_buffer;
/* Receive the first response (WatchResponse with created=true) */
ops[3].op = GRPC_OP_RECV_MESSAGE;
ops[3].data.recv_message.recv_message = &wc->recv_buffer;
/* Start batch - status receive is handled separately when stream ends */
grpc_call_error err = grpc_call_start_batch(wc->call, ops, 4, &wc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
grpc_metadata_array_destroy(&wc->initial_metadata);
grpc_metadata_array_destroy(&wc->trailing_metadata);
grpc_slice_unref(wc->status_details);
grpc_call_unref(wc->call);
SvREFCNT_dec(wc->callback);
/* Free watch params allocated for recovery */
if (wc->params.key) {
Safefree(wc->params.key);
client->pending_calls = pc;
}
EV::Etcd::Keepalive
ev_etcd_lease_keepalive(client, lease_id, ...)
EV::Etcd client
int64_t lease_id
CODE:
{
/* Parse arguments: lease_keepalive(lease_id, [opts,] callback) */
SV *opts = NULL;
SV *callback;
if (items == 3) {
callback = ST(2);
} else if (items == 4) {
opts = ST(2);
callback = ST(3);
} else {
croak("Usage: $client->lease_keepalive($lease_id, [\\%%opts,] $callback)");
}
VALIDATE_CALLBACK(callback);
/* Create keepalive structure (Newxz zeroes everything; only set non-zero fields) */
keepalive_call_t *kc;
Newxz(kc, 1, keepalive_call_t);
init_call_base(&kc->base, CALL_TYPE_LEASE_KEEPALIVE);
kc->callback = newSVsv(callback);
kc->client = client;
kc->active = 1;
kc->auto_reconnect = 1; /* Enable by default */
kc->lease_id = lease_id;
kc->client_owns = 1;
kc->perl_owns = 1;
if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
HV *hv = (HV *)SvRV(opts);
SV **svp;
if ((svp = hv_fetchs(hv, "auto_reconnect", 0)) && !SvTRUE(*svp)) {
kc->auto_reconnect = 0;
}
}
grpc_metadata_array_init(&kc->initial_metadata);
grpc_metadata_array_init(&kc->trailing_metadata);
kc->status_details = grpc_empty_slice();
/* Build LeaseKeepAliveRequest */
Etcdserverpb__LeaseKeepAliveRequest req = ETCDSERVERPB__LEASE_KEEP_ALIVE_REQUEST__INIT;
req.id = lease_id;
/* Serialize request */
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
etcdserverpb__lease_keep_alive_request__get_packed_size,
etcdserverpb__lease_keep_alive_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Create streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
kc->call = grpc_channel_create_call(
client->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
client->cq,
METHOD_LEASE_KEEPALIVE,
NULL,
deadline,
NULL
);
if (!kc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
grpc_slice_unref(kc->status_details);
SvREFCNT_dec(kc->callback);
Safefree(kc);
croak("Failed to create gRPC call for lease_keepalive");
}
/* Start the call with initial operations */
grpc_op ops[4] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata = &kc->initial_metadata;
ops[2].op = GRPC_OP_SEND_MESSAGE;
ops[2].data.send_message.send_message = send_buffer;
/* Receive the first response */
ops[3].op = GRPC_OP_RECV_MESSAGE;
ops[3].data.recv_message.recv_message = &kc->recv_buffer;
grpc_call_error err = grpc_call_start_batch(kc->call, ops, 4, &kc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
grpc_metadata_array_destroy(&kc->initial_metadata);
grpc_metadata_array_destroy(&kc->trailing_metadata);
grpc_slice_unref(kc->status_details);
grpc_call_unref(kc->call);
SvREFCNT_dec(kc->callback);
Safefree(kc);
croak("Failed to start gRPC call: %d", err);
}
kc->next = client->keepalives;
client->keepalives = kc;
RETVAL = kc;
}
{
/* Parse arguments: election_observe(name, [opts,] callback) */
SV *opts = NULL;
SV *callback;
if (items == 3) {
callback = ST(2);
} else if (items == 4) {
opts = ST(2);
callback = ST(3);
} else {
croak("Usage: $client->election_observe($name, [\\%%opts,] $callback)");
}
VALIDATE_CALLBACK(callback);
STRLEN name_len;
const char *name_str = SvPV(name, name_len);
VALIDATE_KEY_SIZE(name_len);
int auto_reconnect = 1;
if (opts && SvROK(opts) && SvTYPE(SvRV(opts)) == SVt_PVHV) {
HV *hv = (HV *)SvRV(opts);
SV **sv_ar = hv_fetch(hv, "auto_reconnect", 14, 0);
if (sv_ar && *sv_ar) {
auto_reconnect = SvTRUE(*sv_ar);
}
}
/* Newxz zeroes everything; only set non-zero fields */
observe_call_t *oc;
Newxz(oc, 1, observe_call_t);
init_call_base(&oc->base, CALL_TYPE_ELECTION_OBSERVE);
oc->callback = newSVsv(callback);
oc->client = client;
oc->active = 1;
oc->auto_reconnect = auto_reconnect;
oc->client_owns = 1;
oc->perl_owns = 1;
grpc_metadata_array_init(&oc->initial_metadata);
grpc_metadata_array_init(&oc->trailing_metadata);
oc->status_details = grpc_empty_slice();
/* Save params for reconnection */
Newx(oc->params.name, name_len + 1, char);
Copy(name_str, oc->params.name, name_len, char);
oc->params.name[name_len] = '\0';
oc->params.name_len = name_len;
V3electionpb__LeaderRequest req = V3ELECTIONPB__LEADER_REQUEST__INIT;
req.name.data = (uint8_t *)name_str;
req.name.len = name_len;
grpc_slice req_slice;
SERIALIZE_PROTOBUF_TO_SLICE(req_slice,
v3electionpb__leader_request__get_packed_size,
v3electionpb__leader_request__pack, &req);
grpc_byte_buffer *send_buffer = grpc_raw_byte_buffer_create(&req_slice, 1);
grpc_slice_unref(req_slice);
/* Use infinite deadline for streaming call */
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
oc->call = grpc_channel_create_call(
client->channel, NULL, GRPC_PROPAGATE_DEFAULTS,
client->cq, METHOD_ELECTION_OBSERVE, NULL, deadline, NULL
);
if (!oc->call) {
grpc_byte_buffer_destroy(send_buffer);
grpc_metadata_array_destroy(&oc->initial_metadata);
grpc_metadata_array_destroy(&oc->trailing_metadata);
grpc_slice_unref(oc->status_details);
SvREFCNT_dec(oc->callback);
if (oc->params.name) Safefree(oc->params.name);
Safefree(oc);
croak("Failed to create gRPC call for election_observe");
}
grpc_op ops[4] = {0};
grpc_metadata auth_md;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
setup_auth_metadata(client, &ops[0], &auth_md);
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata.recv_initial_metadata = &oc->initial_metadata;
ops[2].op = GRPC_OP_SEND_MESSAGE;
ops[2].data.send_message.send_message = send_buffer;
ops[3].op = GRPC_OP_RECV_MESSAGE;
ops[3].data.recv_message.recv_message = &oc->recv_buffer;
grpc_call_error err = grpc_call_start_batch(oc->call, ops, 4, &oc->base, NULL);
cleanup_auth_metadata(client, &auth_md);
grpc_byte_buffer_destroy(send_buffer);
if (err != GRPC_CALL_OK) {
grpc_metadata_array_destroy(&oc->initial_metadata);
grpc_metadata_array_destroy(&oc->trailing_metadata);
grpc_slice_unref(oc->status_details);
grpc_call_unref(oc->call);
SvREFCNT_dec(oc->callback);
if (oc->params.name) Safefree(oc->params.name);
Safefree(oc);
croak("Failed to start gRPC call: %d", err);
}
/* Add to observes list */
oc->next = client->observes;
client->observes = oc;
RETVAL = oc;
}
OUTPUT:
RETVAL
void
ev_etcd_member_list(client, ...)
EV::Etcd client
pc = next;
}
watch_call_t *wc = client->watches;
while (wc) {
watch_call_t *next = wc->next;
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
SvREFCNT_dec(wc->callback);
wc->callback = NULL;
wc->client_owns = 0;
if (!wc->perl_owns) {
if (wc->params.key) Safefree(wc->params.key);
if (wc->params.range_end) Safefree(wc->params.range_end);
Safefree(wc);
}
wc = next;
}
keepalive_call_t *kc = client->keepalives;
while (kc) {
keepalive_call_t *next = kc->next;
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
SvREFCNT_dec(kc->callback);
kc->callback = NULL;
kc->client_owns = 0;
if (!kc->perl_owns) Safefree(kc);
kc = next;
}
observe_call_t *oc = client->observes;
while (oc) {
observe_call_t *next = oc->next;
if (ev_is_active(&oc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
SvREFCNT_dec(oc->callback);
oc->callback = NULL;
oc->client_owns = 0;
if (!oc->perl_owns) {
if (oc->params.name) Safefree(oc->params.name);
Safefree(oc);
}
oc = next;
}
if (ev_is_active(&client->health_timer))
ev_timer_stop(EV_DEFAULT, &client->health_timer);
if (ev_is_active(&client->cq_async))
ev_async_stop(EV_DEFAULT, &client->cq_async);
goto free_perl_resources;
}
/* Mark client as inactive first to prevent callbacks from accessing freed memory */
client->active = 0;
/* Stop ev_async watcher */
if (ev_is_active(&client->cq_async)) {
ev_async_stop(EV_DEFAULT, &client->cq_async);
}
/* Signal the gRPC thread to stop and wait for it */
client->thread_running = 0;
/* Mark every streaming call inactive and cancel its gRPC call so pending
* batches complete promptly with success=0. */
watch_call_t *wc = client->watches;
while (wc) {
wc->active = 0;
if (ev_is_active(&wc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &wc->reconnect_timer);
if (wc->call) {
grpc_call_cancel(wc->call, NULL);
}
wc = wc->next;
}
keepalive_call_t *kc = client->keepalives;
while (kc) {
kc->active = 0;
if (ev_is_active(&kc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &kc->reconnect_timer);
if (kc->call) {
grpc_call_cancel(kc->call, NULL);
}
kc = kc->next;
}
observe_call_t *oc = client->observes;
while (oc) {
oc->active = 0;
if (ev_is_active(&oc->reconnect_timer))
ev_timer_stop(EV_DEFAULT, &oc->reconnect_timer);
if (oc->call) {
grpc_call_cancel(oc->call, NULL);
}
oc = oc->next;
}
/* Cancel pending unary calls */
pending_call_t *pc = client->pending_calls;
while (pc) {
if (pc->call) {
grpc_call_cancel(pc->call, NULL);
}
pc = pc->next;
}
/* Shutdown the completion queue - this will cause the thread to exit */
if (client->cq) {
grpc_completion_queue_shutdown(client->cq);
}
/* Wait for the gRPC thread to finish */
pthread_join(client->cq_thread, NULL);
/* Clean up the event queue (any remaining queued events) */
pthread_mutex_lock(&client->queue_mutex);
queued_event_t *qe = client->event_queue;
while (qe) {
queued_event_t *next = qe->next;
free(qe);
qe = next;
}
client->event_queue = NULL;
( run in 0.850 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )