EV-Etcd

 view release on metacpan or  search on metacpan

etcd_common.h  view on Meta::CPAN

    int64_t lease_id;
    int active;
    struct ev_etcd_struct *client;
    struct keepalive_call *next;
    int auto_reconnect;
    int reconnect_attempt;
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
    int client_owns;           /* See watch_call_t — same dual-ownership */
    int perl_owns;
} keepalive_call_t;

/* Election observe parameters for reconnection.
 * Stored by value inside observe_call_t (its `params` member). */
typedef struct observe_params {
    char *name;       /* Election name bytes; presumably a private copy owned by the
                       * enclosing call — confirm at the allocation/free sites */
    size_t name_len;  /* Presumably the length of `name` in bytes; the explicit length
                       * suggests `name` need not be NUL-terminated — confirm */
} observe_params_t;

/* Election observe structure (for streaming election observe).
 *
 * Instances chain through `next`; ev_etcd_t.observes holds a pointer of this
 * type and is presumably the head of that list. */
typedef struct observe_call {
    call_base_t base;  /* Must be first: a pointer to this struct then also points at
                        * its call_base_t, so it can be handled through that common header */
    grpc_call *call;   /* gRPC call handle associated with this observe */
    SV *callback;      /* Perl value to invoke; presumably a code ref — confirm at creation site */
    grpc_metadata_array initial_metadata;   /* gRPC metadata array (name suggests initial/headers) */
    grpc_metadata_array trailing_metadata;  /* gRPC metadata array (name suggests trailers) */
    grpc_byte_buffer *recv_buffer;          /* Buffer pointer for a received message */
    grpc_slice status_details;              /* Status detail slice; presumably filled with the final call status */
    int active;        /* Flag; presumably non-zero while the observe is live — confirm */
    struct ev_etcd_struct *client;  /* Back-pointer to the client this observe belongs to */
    struct observe_call *next;      /* Next node in the singly linked chain */
    int auto_reconnect;     /* Flag; presumably enables re-issuing the observe after a failure */
    int reconnect_attempt;  /* Counter; presumably the number of reconnect tries so far */
    ev_timer reconnect_timer;  /* Backoff timer for reconnection */
    observe_params_t params;   /* Saved request parameters (kept for reconnection) */
    int client_owns;           /* See watch_call_t — same dual-ownership */
    int perl_owns;             /* Second half of the ownership pair described at watch_call_t */
} observe_call_t;

/* Client structure.
 *
 * Holds the gRPC channel and completion queue, the cross-thread event queue
 * used to hand completions to the main (libev) thread, the per-kind call
 * lists, and connection/health configuration. */
typedef struct ev_etcd_struct {
    grpc_channel *channel;       /* gRPC channel handle */
    grpc_completion_queue *cq;   /* gRPC completion queue handle */

    /* Hybrid threading: gRPC thread + ev_async for main thread notification */
    pthread_t cq_thread;        /* Thread running gRPC CQ loop */
    pthread_mutex_t queue_mutex; /* Protects event_queue */
    ev_async cq_async;          /* Async watcher to wake main thread */
    queued_event_t *event_queue; /* Queue of completed events */
    queued_event_t *event_queue_tail; /* Tail for O(1) append */
    /* NOTE(review): `volatile` alone does not order or synchronize accesses
     * between threads in C; confirm that reads/writes of this flag are
     * otherwise synchronized, or consider a C11 atomic. */
    volatile int thread_running; /* Flag to signal thread shutdown */

    /* Pointers to the call structures of each kind; presumably list heads
     * (observe_call_t, for one, carries a `next` link). */
    pending_call_t *pending_calls;
    watch_call_t *watches;
    keepalive_call_t *keepalives;
    observe_call_t *observes;
    int active;       /* Flag; presumably non-zero while the client is usable — confirm */
    int in_callback;  /* Guard against freeing client during event processing */
    char *auth_token;        /* Auth token bytes; presumably NULL when unauthenticated — confirm */
    size_t auth_token_len;   /* Presumably the length of auth_token in bytes */
    int timeout_seconds;     /* Timeout, in seconds per the name; scope of the timeout is not visible here */

    /* Multiple endpoints for failover */
    char **endpoints;        /* Array of endpoint strings */
    int endpoint_count;      /* Presumably the number of entries in `endpoints` */
    int current_endpoint;    /* Presumably the index into `endpoints` currently in use */

    /* Retry configuration */
    int max_retries;

    /* Health monitoring */
    ev_timer health_timer;   /* libev timer used for health monitoring */
    int is_healthy;          /* Flag; presumably the last observed health state */
    SV *health_callback;     /* Perl value; presumably invoked on health changes — confirm */
    pid_t owner_pid;  /* PID of process that created this client (fork safety) */
} ev_etcd_t;

/* Pointer aliases for the client and the three streaming call kinds.
 * The double underscores presumably mirror the Perl package separator
 * (EV::Etcd, EV::Etcd::Watch, ...) so the XS typemap can use these names —
 * confirm against the typemap. Note that each alias hides a pointer. */
typedef ev_etcd_t *EV__Etcd;
typedef watch_call_t *EV__Etcd__Watch;
typedef keepalive_call_t *EV__Etcd__Keepalive;
typedef observe_call_t *EV__Etcd__Observe;

/*
 * Stamp a call's common header with its call type.
 *
 *   base  header to initialize; dereferenced unconditionally, so it must
 *         not be NULL
 *   type  value stored into base->type
 *
 * Only the `type` member is written; any other members are left untouched.
 */
static inline void
init_call_base(call_base_t *base, call_type_t type)
{
    base->type = type;
}

/* Helper macro to validate callback is a code reference.
 *
 * `cb` is an SV * expression. It is evaluated exactly once and held in a
 * block-local variable, so passing an expression with side effects is safe
 * (the previous form expanded `cb` in both SvROK() and SvRV()).
 *
 * If `cb` is not a reference, or the referent is not a subroutine (SVt_PVCV),
 * croak()s with "callback must be a code reference". The do { } while (0)
 * wrapper makes the macro behave as a single statement. */
#define VALIDATE_CALLBACK(cb) \
    do { \
        SV *validate_callback_sv_ = (cb); \
        if (!SvROK(validate_callback_sv_) || \
            SvTYPE(SvRV(validate_callback_sv_)) != SVt_PVCV) { \
            croak("callback must be a code reference"); \
        } \
    } while (0)

/* Store / fetch a 64-bit integer into a Perl HV slot without truncation on
 * Perls whose IV/UV are narrower than 64 bits (IVSIZE < 8, e.g. a 32-bit
 * build without 64-bit integer support).
 *
 *   newSVi64(v) / newSVu64(v)  create a new SV from a signed / unsigned value
 *   SvI64(sv)   / SvU64(sv)    read an SV back as int64_t / uint64_t
 *
 * With IVSIZE >= 8 the value travels through IV/UV and is exact. Otherwise it
 * travels through NV (double), which preserves integers only up to 2^53; the
 * original note states that this is enough for any etcd revision/lease/member
 * id the protocol can produce.
 * NOTE(review): etcd lease and member IDs may use the full 64-bit range —
 * confirm the NV fallback cannot silently round them. */
#if IVSIZE >= 8
#  define newSVi64(v) newSViv((IV)(v))
#  define newSVu64(v) newSVuv((UV)(v))
#  define SvI64(sv)   ((int64_t)SvIV(sv))
#  define SvU64(sv)   ((uint64_t)SvUV(sv))
#else
#  define newSVi64(v) newSVnv((NV)(v))
#  define newSVu64(v) newSVnv((NV)(v))
#  define SvI64(sv)   ((int64_t)SvNV(sv))
#  define SvU64(sv)   ((uint64_t)SvNV(sv))
#endif

/* Common utility functions (declared here; defined outside this header).
 * Functions taking pTHX_ receive the Perl interpreter context as their
 * first argument on builds that pass it explicitly. */

/* Presumably maps a gRPC status code to a printable name — confirm against the definition. */
const char* grpc_status_name(grpc_status_code code);
/* Presumably returns non-zero when a call that ended with `code` may be retried — confirm. */
int is_retryable_status(grpc_status_code code);
/* Presumably builds an error value (returned as an SV) from a status code, a
 * message given as pointer + length, and a `source` label; the exact layout
 * of the result is not visible here. */
SV* create_error_hv(pTHX_ grpc_status_code code, const char *message, size_t message_len, const char *source);

/* Helper functions */
/* Presumably converts a protobuf key-value message into a Perl hash reference — confirm. */
SV* kv_to_hashref(pTHX_ Mvccpb__KeyValue *kv);
/* Presumably converts a protobuf event message into a Perl hash reference — confirm. */
SV* event_to_hashref(pTHX_ Mvccpb__Event *event);
/* Presumably stores fields of the response header into `result`; key names are not visible here. */
void add_header_to_hv(pTHX_ HV *result, Etcdserverpb__ResponseHeader *header);

/* Auth metadata helpers */
/* Both take the client plus a caller-provided grpc_metadata slot; presumably
 * setup attaches the client's auth token to `op` and cleanup releases what
 * setup acquired — confirm pairing requirements against the definitions. */
void setup_auth_metadata(ev_etcd_t *client, grpc_op *op, grpc_metadata *auth_md);
void cleanup_auth_metadata(ev_etcd_t *client, grpc_metadata *auth_md);

/*



( run in 1.251 second using v1.01-cache-2.11-cpan-524268b4103 )