EV-Etcd
view release on metacpan or search on metacpan
etcd_common.h view on Meta::CPAN
CALL_TYPE_MEMBER_ADD,
CALL_TYPE_MEMBER_REMOVE,
CALL_TYPE_MEMBER_UPDATE,
CALL_TYPE_MEMBER_LIST,
CALL_TYPE_MEMBER_PROMOTE,
CALL_TYPE_ALARM,
CALL_TYPE_DEFRAGMENT,
CALL_TYPE_HASH_KV,
CALL_TYPE_MOVE_LEADER,
CALL_TYPE_AUTH_STATUS
} call_type_t;
/* Forward declaration */
struct ev_etcd_struct;
/*
* Queued event structure for passing gRPC completions from
* the gRPC thread to the main EV thread.
*/
typedef struct queued_event {
void *tag; /* The tag from grpc_event */
int success; /* The success flag from grpc_event */
struct queued_event *next;
} queued_event_t;
/*
* Base structure for all call types - must be first in each call struct.
* Used as the tag for gRPC operations.
*/
typedef struct call_base {
call_type_t type;
} call_base_t;
/* Pending call structure (for unary RPCs) */
typedef struct pending_call {
call_base_t base; /* Must be first */
grpc_call *call;
SV *callback;
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer;
grpc_status_code status;
grpc_slice status_details;
struct ev_etcd_struct *client;
struct pending_call *next;
} pending_call_t;
/* Watch recovery parameters */
typedef struct watch_params {
char *key;
size_t key_len;
char *range_end;
size_t range_end_len;
int64_t start_revision;
int prev_kv;
int progress_notify;
int64_t watch_id;
int has_watch_id;
} watch_params_t;
/* Watch structure (for streaming watch) */
typedef struct watch_call {
call_base_t base; /* Must be first */
grpc_call *call;
SV *callback;
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer;
grpc_slice status_details;
int64_t watch_id;
int active;
struct ev_etcd_struct *client;
struct watch_call *next;
int auto_reconnect;
int64_t last_revision;
watch_params_t params;
int reconnect_attempt;
ev_timer reconnect_timer; /* Backoff timer for reconnection */
} watch_call_t;
/* Keepalive structure (for streaming lease keepalive) */
typedef struct keepalive_call {
call_base_t base; /* Must be first */
grpc_call *call;
SV *callback;
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer;
grpc_slice status_details;
int64_t lease_id;
int active;
struct ev_etcd_struct *client;
struct keepalive_call *next;
int auto_reconnect;
int reconnect_attempt;
ev_timer reconnect_timer; /* Backoff timer for reconnection */
} keepalive_call_t;
/* Election observe parameters for reconnection */
typedef struct observe_params {
char *name;
size_t name_len;
} observe_params_t;
/* Election observe structure (for streaming election observe) */
typedef struct observe_call {
call_base_t base; /* Must be first */
grpc_call *call;
SV *callback;
grpc_metadata_array initial_metadata;
grpc_metadata_array trailing_metadata;
grpc_byte_buffer *recv_buffer;
grpc_slice status_details;
int active;
struct ev_etcd_struct *client;
struct observe_call *next;
int auto_reconnect;
int reconnect_attempt;
ev_timer reconnect_timer; /* Backoff timer for reconnection */
observe_params_t params;
} observe_call_t;
/* Client structure */
typedef struct ev_etcd_struct {
grpc_channel *channel;
grpc_completion_queue *cq;
/* Hybrid threading: gRPC thread + ev_async for main thread notification */
pthread_t cq_thread; /* Thread running gRPC CQ loop */
pthread_mutex_t queue_mutex; /* Protects event_queue */
ev_async cq_async; /* Async watcher to wake main thread */
queued_event_t *event_queue; /* Queue of completed events */
queued_event_t *event_queue_tail; /* Tail for O(1) append */
volatile int thread_running; /* Flag to signal thread shutdown */
pending_call_t *pending_calls;
watch_call_t *watches;
keepalive_call_t *keepalives;
observe_call_t *observes;
int active;
int in_callback; /* Guard against freeing client during event processing */
char *auth_token;
size_t auth_token_len;
int timeout_seconds;
/* Multiple endpoints for failover */
char **endpoints;
int endpoint_count;
int current_endpoint;
/* Retry configuration */
int max_retries;
/* Health monitoring */
ev_timer health_timer;
int is_healthy;
SV *health_callback;
pid_t owner_pid; /* PID of process that created this client (fork safety) */
} ev_etcd_t;
typedef ev_etcd_t *EV__Etcd;
typedef watch_call_t *EV__Etcd__Watch;
typedef keepalive_call_t *EV__Etcd__Keepalive;
typedef observe_call_t *EV__Etcd__Observe;
etcd_common.h view on Meta::CPAN
* Helper macro for success callback with result hashref.
*
* Usage:
* CALL_SUCCESS_CALLBACK(callback, result_hv);
*/
#define CALL_SUCCESS_CALLBACK(callback, result_hv) \
do { \
dSP; \
ENTER; SAVETMPS; PUSHMARK(SP); EXTEND(SP, 2); \
PUSHs(sv_2mortal(newRV_noinc((SV *)result_hv))); \
PUSHs(&PL_sv_undef); \
PUTBACK; CALL_SV_SAFE(callback, G_DISCARD); FREETMPS; LEAVE; \
} while (0)
/*
* Helper macros for unary RPC pending call initialization and cleanup.
* Reduces boilerplate across all unary RPC implementations.
*/
/*
* Initialize a pending_call_t structure for a unary RPC.
*
* Usage:
* pending_call_t *pc;
* INIT_PENDING_CALL(pc, CALL_TYPE_RANGE, callback, client);
*/
#define INIT_PENDING_CALL(pc, call_type, callback_sv, client_ref) \
do { \
Newxz((pc), 1, pending_call_t); \
init_call_base(&(pc)->base, (call_type)); \
(pc)->callback = newSVsv((callback_sv)); \
(pc)->client = (client_ref); \
grpc_metadata_array_init(&(pc)->initial_metadata); \
grpc_metadata_array_init(&(pc)->trailing_metadata); \
(pc)->recv_buffer = NULL; \
(pc)->status_details = grpc_empty_slice(); \
} while (0)
/*
* Cleanup a pending_call_t on error before it's added to the pending list.
* Use this when grpc_call_start_batch fails.
*
* Usage:
* if (err != GRPC_CALL_OK) {
* CLEANUP_PENDING_CALL_ON_ERROR(pc);
* croak("Failed to start gRPC call: %d", err);
* }
*/
#define CLEANUP_PENDING_CALL_ON_ERROR(pc) \
do { \
grpc_metadata_array_destroy(&(pc)->initial_metadata); \
grpc_metadata_array_destroy(&(pc)->trailing_metadata); \
if ((pc)->recv_buffer) grpc_byte_buffer_destroy((pc)->recv_buffer); \
grpc_slice_unref((pc)->status_details); \
if ((pc)->call) grpc_call_unref((pc)->call); \
SvREFCNT_dec((pc)->callback); \
Safefree((pc)); \
} while (0)
/*
* Helper macros for streaming call reconnection to reduce code triplication
* across watch, keepalive, and observe reconnect functions.
*/
/*
* Cleanup old streaming call state before reconnection.
* Works with any streaming call struct that has these fields.
*
* Usage:
* STREAMING_CALL_CLEANUP(wc); // For watch_call_t
* STREAMING_CALL_CLEANUP(kc); // For keepalive_call_t
* STREAMING_CALL_CLEANUP(oc); // For observe_call_t
*/
#define STREAMING_CALL_CLEANUP(call_ptr) \
do { \
if ((call_ptr)->call) { \
grpc_call_unref((call_ptr)->call); \
(call_ptr)->call = NULL; \
} \
grpc_metadata_array_destroy(&(call_ptr)->initial_metadata); \
grpc_metadata_array_destroy(&(call_ptr)->trailing_metadata); \
if ((call_ptr)->recv_buffer) { \
grpc_byte_buffer_destroy((call_ptr)->recv_buffer); \
(call_ptr)->recv_buffer = NULL; \
} \
grpc_slice_unref((call_ptr)->status_details); \
} while (0)
/*
* Reinitialize streaming call state for reconnection.
*
* Usage:
* STREAMING_CALL_REINIT(wc);
*/
#define STREAMING_CALL_REINIT(call_ptr) \
do { \
grpc_metadata_array_init(&(call_ptr)->initial_metadata); \
grpc_metadata_array_init(&(call_ptr)->trailing_metadata); \
(call_ptr)->status_details = grpc_empty_slice(); \
(call_ptr)->active = 1; \
} while (0)
/*
* Setup standard 4-op batch for streaming call reconnection.
* Requires: ops[4], auth_md, send_buffer, call_ptr all in scope.
*
* Usage:
* STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, wc);
*/
#define STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buf, call_ptr) \
do { \
(ops)[0].op = GRPC_OP_SEND_INITIAL_METADATA; \
setup_auth_metadata(client, &(ops)[0], &(auth_md)); \
(ops)[1].op = GRPC_OP_RECV_INITIAL_METADATA; \
(ops)[1].data.recv_initial_metadata.recv_initial_metadata = &(call_ptr)->initial_metadata; \
(ops)[2].op = GRPC_OP_SEND_MESSAGE; \
(ops)[2].data.send_message.send_message = (send_buf); \
(ops)[3].op = GRPC_OP_RECV_MESSAGE; \
(ops)[3].data.recv_message.recv_message = &(call_ptr)->recv_buffer; \
} while (0)
/*
* Handle error after failed batch start for streaming reconnect.
*
* Usage:
* STREAMING_CALL_BATCH_ERROR(wc);
*/
#define STREAMING_CALL_BATCH_ERROR(call_ptr) \
do { \
(call_ptr)->active = 0; \
if ((call_ptr)->call) { \
grpc_call_unref((call_ptr)->call); \
(call_ptr)->call = NULL; \
} \
} while (0)
/* Finish deferred client destruction after in_callback guard.
* Called from cq_async_callback and timer callbacks when DESTROY was
* invoked during a Perl callback (in_callback=1 prevented immediate free). */
void finish_client_destroy(pTHX_ ev_etcd_t *client);
#endif /* ETCD_COMMON_H */
( run in 1.621 second using v1.01-cache-2.11-cpan-39bf76dae61 )