EV-Etcd

 view release on metacpan or  search on metacpan

etcd_common.h  view on Meta::CPAN

    CALL_TYPE_MEMBER_ADD,
    CALL_TYPE_MEMBER_REMOVE,
    CALL_TYPE_MEMBER_UPDATE,
    CALL_TYPE_MEMBER_LIST,
    CALL_TYPE_MEMBER_PROMOTE,
    CALL_TYPE_ALARM,
    CALL_TYPE_DEFRAGMENT,
    CALL_TYPE_HASH_KV,
    CALL_TYPE_MOVE_LEADER,
    CALL_TYPE_AUTH_STATUS
} call_type_t;

/* Forward declaration: the call structs below store a pointer back to the
 * client, whose full definition appears later in this header. */
struct ev_etcd_struct;

/*
 * Queued event structure for passing gRPC completions from
 * the gRPC thread to the main EV thread.
 *
 * Events are chained into a singly linked list through `next`.
 */
typedef struct queued_event {
    void *tag;              /* The tag from grpc_event */
    int success;            /* The success flag from grpc_event */
    struct queued_event *next;  /* Following event; presumably NULL at the tail -- confirm */
} queued_event_t;

/*
 * Base structure for all call types - must be first in each call struct.
 * Used as the tag for gRPC operations.
 *
 * Because it is the first member, a pointer to any call struct is also a
 * valid pointer to its call_base_t.
 */
typedef struct call_base {
    call_type_t type;  /* Presumably tells the completion handler which concrete struct the tag is -- confirm */
} call_base_t;

/*
 * Pending call structure (for unary RPCs).
 *
 * INIT_PENDING_CALL allocates and fills one of these;
 * CLEANUP_PENDING_CALL_ON_ERROR releases everything it holds.
 */
typedef struct pending_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;  /* gRPC call handle; unref'd during cleanup when non-NULL */
    SV *callback;  /* Copy of the caller's callback SV (made with newSVsv) */
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;  /* Starts NULL; presumably filled by a receive-message op -- confirm */
    grpc_status_code status;
    grpc_slice status_details;  /* Starts as grpc_empty_slice() */
    struct ev_etcd_struct *client;  /* Client handed to INIT_PENDING_CALL */
    struct pending_call *next;  /* Presumably the link in the client's pending_calls list -- confirm */
} pending_call_t;

/*
 * Watch recovery parameters.
 *
 * NOTE(review): the field names mirror etcd's WatchCreateRequest (key,
 * range_end, start_revision, prev_kv, progress_notify, watch_id) -- confirm
 * against the code that builds the request.
 */
typedef struct watch_params {
    char *key;
    size_t key_len;        /* Length of `key`, stored explicitly */
    char *range_end;
    size_t range_end_len;  /* Length of `range_end`, stored explicitly */
    int64_t start_revision;
    int prev_kv;
    int progress_notify;
    int64_t watch_id;
    int has_watch_id;      /* Presumably nonzero when `watch_id` was supplied -- confirm */
} watch_params_t;

/*
 * Watch structure (for streaming watch).
 *
 * Shares the call/metadata/recv_buffer/status_details/active field names with
 * keepalive_call_t and observe_call_t so the STREAMING_CALL_* macros can
 * operate on any of the three.
 */
typedef struct watch_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;  /* Current gRPC call; NULL after STREAMING_CALL_CLEANUP */
    SV *callback;
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;
    grpc_slice status_details;
    int64_t watch_id;
    int active;  /* Set to 1 by STREAMING_CALL_REINIT, 0 by STREAMING_CALL_BATCH_ERROR */
    struct ev_etcd_struct *client;
    struct watch_call *next;  /* Presumably the link in the client's `watches` list -- confirm */
    int auto_reconnect;
    int64_t last_revision;  /* Presumably the last revision seen, used when resuming -- confirm */
    watch_params_t params;  /* Watch recovery parameters */
    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
} watch_call_t;

/*
 * Keepalive structure (for streaming lease keepalive).
 *
 * Shares the call/metadata/recv_buffer/status_details/active field names with
 * watch_call_t and observe_call_t so the STREAMING_CALL_* macros can operate
 * on any of the three.
 */
typedef struct keepalive_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;  /* Current gRPC call; NULL after STREAMING_CALL_CLEANUP */
    SV *callback;
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;
    grpc_slice status_details;
    int64_t lease_id;
    int active;  /* Set to 1 by STREAMING_CALL_REINIT, 0 by STREAMING_CALL_BATCH_ERROR */
    struct ev_etcd_struct *client;
    struct keepalive_call *next;  /* Presumably the link in the client's `keepalives` list -- confirm */
    int auto_reconnect;
    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
} keepalive_call_t;

/* Election observe parameters for reconnection */
typedef struct observe_params {
    char *name;       /* Presumably the election name to observe -- confirm */
    size_t name_len;  /* Length of `name`, stored explicitly */
} observe_params_t;

/*
 * Election observe structure (for streaming election observe).
 *
 * Shares the call/metadata/recv_buffer/status_details/active field names with
 * watch_call_t and keepalive_call_t so the STREAMING_CALL_* macros can
 * operate on any of the three.
 */
typedef struct observe_call {
    call_base_t base;  /* Must be first */
    grpc_call *call;  /* Current gRPC call; NULL after STREAMING_CALL_CLEANUP */
    SV *callback;
    grpc_metadata_array initial_metadata;
    grpc_metadata_array trailing_metadata;
    grpc_byte_buffer *recv_buffer;
    grpc_slice status_details;
    int active;  /* Set to 1 by STREAMING_CALL_REINIT, 0 by STREAMING_CALL_BATCH_ERROR */
    struct ev_etcd_struct *client;
    struct observe_call *next;  /* Presumably the link in the client's `observes` list -- confirm */
    int auto_reconnect;
    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
    observe_params_t params;  /* Election observe parameters for reconnection */
} observe_call_t;

/*
 * Client structure.
 *
 * Holds the gRPC channel and completion queue, the cross-thread event queue,
 * the heads of the per-kind call lists, and connection/health settings.
 */
typedef struct ev_etcd_struct {
    grpc_channel *channel;
    grpc_completion_queue *cq;

    /* Hybrid threading: gRPC thread + ev_async for main thread notification */
    pthread_t cq_thread;        /* Thread running gRPC CQ loop */
    pthread_mutex_t queue_mutex; /* Protects event_queue */
    ev_async cq_async;          /* Async watcher to wake main thread */
    queued_event_t *event_queue; /* Queue of completed events */
    queued_event_t *event_queue_tail; /* Tail for O(1) append */
    /* NOTE(review): `volatile` does not synchronize between threads in C; if
     * this flag is read and written from different threads without holding
     * queue_mutex, a C11 atomic would be the safer type -- confirm usage. */
    volatile int thread_running; /* Flag to signal thread shutdown */

    /* Heads of the lists of outstanding calls, one list per call kind. */
    pending_call_t *pending_calls;
    watch_call_t *watches;
    keepalive_call_t *keepalives;
    observe_call_t *observes;
    int active;
    int in_callback;  /* Guard against freeing client during event processing */
    char *auth_token;
    size_t auth_token_len;  /* Length of auth_token, stored explicitly */
    int timeout_seconds;

    /* Multiple endpoints for failover */
    char **endpoints;
    int endpoint_count;
    int current_endpoint;  /* Presumably an index into `endpoints` -- confirm */

    /* Retry configuration */
    int max_retries;

    /* Health monitoring */
    ev_timer health_timer;
    int is_healthy;
    SV *health_callback;
    pid_t owner_pid;  /* PID of process that created this client (fork safety) */
} ev_etcd_t;

/*
 * Pointer typedefs for the client and the three streaming call structs.
 * NOTE(review): the double-underscore names presumably follow the XS typemap
 * convention for Perl packages (EV::Etcd, EV::Etcd::Watch, ...) -- confirm
 * against the typemap.
 */
typedef ev_etcd_t *EV__Etcd;
typedef watch_call_t *EV__Etcd__Watch;
typedef keepalive_call_t *EV__Etcd__Keepalive;
typedef observe_call_t *EV__Etcd__Observe;

etcd_common.h  view on Meta::CPAN

 * Helper macro for success callback with result hashref.
 *
 * Usage:
 *   CALL_SUCCESS_CALLBACK(callback, result_hv);
 */
/*
 * Invokes `callback` with two arguments: a reference to result_hv, then
 * undef. (The undef is presumably the "no error" slot of the callback
 * signature -- confirm against the error-path helper.)
 *
 * Ownership: newRV_noinc() does not increment result_hv's refcount and the
 * new reference is mortalized, so the caller's reference to result_hv is
 * consumed here. Do not SvREFCNT_dec result_hv after using this macro.
 */
#define CALL_SUCCESS_CALLBACK(callback, result_hv) \
    do { \
        dSP; \
        ENTER; SAVETMPS; PUSHMARK(SP); EXTEND(SP, 2); \
        /* Argument is parenthesized so the cast covers a whole expression. */ \
        PUSHs(sv_2mortal(newRV_noinc((SV *)(result_hv)))); \
        PUSHs(&PL_sv_undef); \
        PUTBACK; CALL_SV_SAFE(callback, G_DISCARD); FREETMPS; LEAVE; \
    } while (0)

/*
 * Helper macros for unary RPC pending call initialization and cleanup.
 * Reduces boilerplate across all unary RPC implementations.
 */

/*
 * Allocate and set up a pending_call_t for a unary RPC.
 *
 * The struct comes back zero-filled from Newxz, so any field not assigned
 * below (for example `call`, `status` and `next`) starts as zero/NULL.
 * The callback is copied with newSVsv(), so the pending call holds its own
 * SV rather than the caller's.
 *
 * Example:
 *   pending_call_t *pc;
 *   INIT_PENDING_CALL(pc, CALL_TYPE_RANGE, callback, client);
 */
#define INIT_PENDING_CALL(pc, call_type, callback_sv, client_ref) \
    do { \
        /* Zeroed allocation, then tag the base with the call type. */ \
        Newxz((pc), 1, pending_call_t); \
        init_call_base(&(pc)->base, (call_type)); \
        \
        /* Caller-supplied state. */ \
        (pc)->callback = newSVsv((callback_sv)); \
        (pc)->client = (client_ref); \
        (pc)->recv_buffer = NULL; \
        \
        /* gRPC-owned state starts out empty. */ \
        grpc_metadata_array_init(&(pc)->initial_metadata); \
        grpc_metadata_array_init(&(pc)->trailing_metadata); \
        (pc)->status_details = grpc_empty_slice(); \
    } while (0)

/*
 * Tear down a pending_call_t that never made it onto the pending list,
 * e.g. because grpc_call_start_batch failed.
 *
 * Releases, in order: both metadata arrays, the receive buffer (if any),
 * the status-details slice, the gRPC call (if any), the callback SV, and
 * finally the struct itself. `pc` is dangling afterwards.
 *
 * Example:
 *   if (err != GRPC_CALL_OK) {
 *       CLEANUP_PENDING_CALL_ON_ERROR(pc);
 *       croak("Failed to start gRPC call: %d", err);
 *   }
 */
#define CLEANUP_PENDING_CALL_ON_ERROR(pc) \
    do { \
        grpc_metadata_array_destroy(&(pc)->initial_metadata); \
        grpc_metadata_array_destroy(&(pc)->trailing_metadata); \
        if ((pc)->recv_buffer != NULL) { \
            grpc_byte_buffer_destroy((pc)->recv_buffer); \
        } \
        grpc_slice_unref((pc)->status_details); \
        if ((pc)->call != NULL) { \
            grpc_call_unref((pc)->call); \
        } \
        SvREFCNT_dec((pc)->callback); \
        Safefree((pc)); \
    } while (0)

/*
 * Helper macros for streaming call reconnection to reduce code triplication
 * across watch, keepalive, and observe reconnect functions.
 */

/*
 * Release the gRPC state of a streaming call before reconnection.
 * Works with any streaming call struct that has the fields call,
 * initial_metadata, trailing_metadata, recv_buffer and status_details.
 *
 * Every released field is reset to its empty state (NULL pointer,
 * freshly initialized metadata array, empty slice), so running the macro a
 * second time without an intervening STREAMING_CALL_REINIT releases nothing
 * twice. The `active` flag is not touched.
 *
 * Usage:
 *   STREAMING_CALL_CLEANUP(wc);  // For watch_call_t
 *   STREAMING_CALL_CLEANUP(kc);  // For keepalive_call_t
 *   STREAMING_CALL_CLEANUP(oc);  // For observe_call_t
 */
#define STREAMING_CALL_CLEANUP(call_ptr) \
    do { \
        if ((call_ptr)->call) { \
            grpc_call_unref((call_ptr)->call); \
            (call_ptr)->call = NULL; \
        } \
        /* Re-init after destroy so the arrays never point at freed memory. */ \
        grpc_metadata_array_destroy(&(call_ptr)->initial_metadata); \
        grpc_metadata_array_init(&(call_ptr)->initial_metadata); \
        grpc_metadata_array_destroy(&(call_ptr)->trailing_metadata); \
        grpc_metadata_array_init(&(call_ptr)->trailing_metadata); \
        if ((call_ptr)->recv_buffer) { \
            grpc_byte_buffer_destroy((call_ptr)->recv_buffer); \
            (call_ptr)->recv_buffer = NULL; \
        } \
        /* Swap in the empty slice so a repeated unref is harmless. */ \
        grpc_slice_unref((call_ptr)->status_details); \
        (call_ptr)->status_details = grpc_empty_slice(); \
    } while (0)

/*
 * Put a streaming call struct back into its "fresh" state for reconnection:
 * both metadata arrays initialized, status_details set to the empty slice,
 * and the `active` flag raised.
 *
 * Example:
 *   STREAMING_CALL_REINIT(wc);
 */
#define STREAMING_CALL_REINIT(call_ptr) \
    do { \
        /* Empty gRPC receive state. */ \
        grpc_metadata_array_init(&(call_ptr)->initial_metadata); \
        grpc_metadata_array_init(&(call_ptr)->trailing_metadata); \
        (call_ptr)->status_details = grpc_empty_slice(); \
        \
        /* Mark the stream as live again. */ \
        (call_ptr)->active = 1; \
    } while (0)

/*
 * Fill the standard 4-op batch used when (re)starting a streaming call:
 *   ops[0] send initial metadata (payload filled by setup_auth_metadata)
 *   ops[1] receive initial metadata into call_ptr->initial_metadata
 *   ops[2] send send_buf as the message
 *   ops[3] receive a message into call_ptr->recv_buffer
 *
 * Requires: ops[4], auth_md, send_buffer, call_ptr all in scope.
 * This macro itself assigns only the `op` and payload members shown below.
 * NOTE(review): callers presumably zero `ops` beforehand so the remaining
 * grpc_op members are well defined -- confirm at the call sites.
 *
 * Example:
 *   STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buffer, wc);
 */
#define STREAMING_CALL_SETUP_OPS(client, ops, auth_md, send_buf, call_ptr) \
    do { \
        /* [0] outgoing initial metadata */ \
        (ops)[0].op = GRPC_OP_SEND_INITIAL_METADATA; \
        setup_auth_metadata(client, &(ops)[0], &(auth_md)); \
        \
        /* [1] incoming initial metadata */ \
        (ops)[1].op = GRPC_OP_RECV_INITIAL_METADATA; \
        (ops)[1].data.recv_initial_metadata.recv_initial_metadata = \
            &(call_ptr)->initial_metadata; \
        \
        /* [2] outgoing request message */ \
        (ops)[2].op = GRPC_OP_SEND_MESSAGE; \
        (ops)[2].data.send_message.send_message = (send_buf); \
        \
        /* [3] incoming response message */ \
        (ops)[3].op = GRPC_OP_RECV_MESSAGE; \
        (ops)[3].data.recv_message.recv_message = &(call_ptr)->recv_buffer; \
    } while (0)

/*
 * Roll back a streaming call after its batch failed to start during
 * reconnect: lowers the `active` flag and drops the gRPC call, if any,
 * leaving `call` as NULL. Other fields are left as they were.
 *
 * Example:
 *   STREAMING_CALL_BATCH_ERROR(wc);
 */
#define STREAMING_CALL_BATCH_ERROR(call_ptr) \
    do { \
        (call_ptr)->active = 0; \
        if ((call_ptr)->call != NULL) { \
            grpc_call_unref((call_ptr)->call); \
            (call_ptr)->call = NULL; \
        } \
    } while (0)

/* Finish deferred client destruction after in_callback guard.
 * Called from cq_async_callback and timer callbacks when DESTROY was
 * invoked during a Perl callback (in_callback=1 prevented immediate free).
 *
 * pTHX_ supplies the Perl interpreter context argument on threaded perls.
 * NOTE(review): `client` is presumably no longer valid once this returns --
 * confirm against the definition. */
void finish_client_destroy(pTHX_ ev_etcd_t *client);

#endif /* ETCD_COMMON_H */



( run in 1.621 second using v1.01-cache-2.11-cpan-39bf76dae61 )