EV-ClickHouse

 view release on metacpan or  search on metacpan

xs/queues.c  view on Meta::CPAN

/* --- freelist for cb_queue entries + send_queue entries ---
 *
 * Both use a singly-linked-list-in-the-record-itself stash: the first
 * sizeof(void*) bytes of a released entry are repurposed as the "next
 * free" pointer. Reuse the same struct without rebuilding it from
 * scratch.
 *
 * Also lives here: the singleton sentinels (keepalive_noop_cb,
 * iter_timeout_cb) and the small helpers that munge per-query
 * settings into per-send fields.
 *
 * This file is #include'd from ClickHouse.xs as part of the single
 * translation unit; symbols stay file-local-to-the-TU.
 */

static ev_ch_cb_t *cbt_freelist = NULL;

static ev_ch_cb_t* alloc_cbt(void) {
    ev_ch_cb_t *cbt;
    if (cbt_freelist) {
        cbt = cbt_freelist;
        cbt_freelist = *(ev_ch_cb_t **)cbt;
    } else {
        Newx(cbt, 1, ev_ch_cb_t);
    }
    /* Reset all fields — freelist may have stale values. */
    cbt->cb = NULL;
    cbt->raw = 0;
    cbt->on_data = NULL;
    cbt->on_complete = NULL;
    cbt->query_timeout = 0;
    return cbt;
}

static void release_cbt(ev_ch_cb_t *cbt) {
    *(ev_ch_cb_t **)cbt = cbt_freelist;
    cbt_freelist = cbt;
}

static ev_ch_send_t *send_freelist = NULL;

/* Iterator timeout watcher cb: just break the loop the iterator drove. */
static void iter_timeout_cb(EV_P_ ev_timer *w, int revents) {
    (void)w; (void)revents;
    ev_break(EV_A, EVBREAK_ONE);
}

/* No-op CV reference used as the callback for HTTP keepalive pings;
 * initialised once at BOOT and shared by all connections. */
static SV *keepalive_noop_cb = NULL;

static ev_ch_send_t* alloc_send(void) {
    ev_ch_send_t *s;
    if (send_freelist) {
        s = send_freelist;
        send_freelist = *(ev_ch_send_t **)s;
    } else {
        Newx(s, 1, ev_ch_send_t);
    }
    s->data = NULL;
    s->data_len = 0;
    s->cb = NULL;
    s->insert_data = NULL;
    s->insert_data_len = 0;
    s->insert_av = NULL;
    s->raw = 0;
    s->on_data = NULL;
    s->on_complete = NULL;
    s->query_timeout = 0;
    s->query_id = NULL;
    return s;
}

static void release_send(ev_ch_send_t *s) {
    CLEAR_STR(s->query_id);
    *(ev_ch_send_t **)s = send_freelist;
    send_freelist = s;
}

/* Copy settings->{query_id} into s->query_id and apply query_timeout. */
static void send_apply_settings(ev_ch_send_t *s, HV *settings) {
    SV **svp = hv_fetch(settings, "query_id", 8, 0);
    if (svp && SvOK(*svp)) {
        STRLEN qlen;
        const char *qstr = SvPV(*svp, qlen);
        Newx(s->query_id, qlen + 1, char);
        Copy(qstr, s->query_id, qlen, char);
        s->query_id[qlen] = '\0';
    }
    svp = hv_fetch(settings, "query_timeout", 13, 0);
    if (svp && SvOK(*svp)) s->query_timeout = SvNV(*svp);
}

/* If settings has params => { x => 1 }, return a new HV* copy with the
 * param keys flattened to param_x => '1'. Caller owns the returned HV
 * (SvREFCNT_dec it). Returns NULL if no params key — caller continues
 * to use the original settings hashref. */
static HV* expand_params(pTHX_ HV *settings) {
    SV **svp = hv_fetch(settings, "params", 6, 0);
    if (!svp || !SvROK(*svp) || SvTYPE(SvRV(*svp)) != SVt_PVHV)
        return NULL;
    HV *phv = (HV *)SvRV(*svp);
    HV *copy = newHVhv(settings);
    HE *pe;
    hv_iterinit(phv);
    while ((pe = hv_iternext(phv))) {
        I32 pklen;
        char *pkey = hv_iterkey(pe, &pklen);
        SV *pval = hv_iterval(phv, pe);
        char *prefixed;



( run in 0.795 second using v1.01-cache-2.11-cpan-13bb782fe5a )