EV-ClickHouse

 view release on metacpan or  search on metacpan

ClickHouse.xs  view on Meta::CPAN

    int native_state;           /* NATIVE_IDLE, NATIVE_WAIT_HELLO, NATIVE_WAIT_RESULT, ... */
    AV *native_rows;            /* accumulate rows across Data blocks */
    char *insert_data;          /* pending TabSeparated data for two-phase INSERT */
    size_t insert_data_len;
    SV *insert_av;              /* pending AV* of AV*s for arrayref INSERT */
    char *insert_err;           /* deferred error from unsupported INSERT encoding */

    /* queues */
    ngx_queue_t cb_queue;
    ngx_queue_t send_queue;
    int pending_count;
    int send_count;

    /* options */
    char *session_id;
    int compress;
    double connect_timeout;
    HV *default_settings;           /* connection-level ClickHouse settings */

    SV *on_connect;
    SV *on_error;
    SV *on_progress;
    SV *on_disconnect;
    int tls_skip_verify;
    double query_timeout;
    int auto_reconnect;
    uint32_t decode_flags;
    AV *native_col_names;   /* column names from last native result */
    AV *native_col_types;   /* column type strings from last native result */
    SV *on_drain;           /* callback fired when pending_count drops to 0 */
    char *last_query_id;    /* query_id of the last dispatched query */
    SV *on_trace;           /* debug trace callback */
    ev_timer ka_timer;      /* keepalive timer */
    double keepalive;       /* keepalive interval (0 = disabled) */
    int ka_timing;
    int callback_depth;
    /* error info from last SERVER_EXCEPTION or HTTP error */
    int32_t last_error_code;
    /* profile info from last SERVER_PROFILE_INFO */
    uint64_t profile_rows;
    uint64_t profile_bytes;
    uint64_t profile_rows_before_limit;
    /* totals / extremes from last native query */
    AV *native_totals;
    AV *native_extremes;
    /* reconnect backoff */
    double reconnect_delay;
    double reconnect_max_delay;
    int reconnect_attempts;
    ev_timer reconnect_timer;
    int reconnect_timing;
    /* LowCardinality cross-block dictionary state */
    SV ***lc_dicts;           /* array of dictionaries, one per column */
    uint64_t *lc_dict_sizes;  /* size of each dictionary */
    int lc_num_cols;          /* number of columns with LC state */
};

struct ev_ch_cb_s {
    SV          *cb;
    int          raw;        /* return raw response body instead of parsed rows */
    SV          *on_data;    /* per-query streaming callback (fires per block) */
    double       query_timeout;  /* per-query timeout (0=use default) */
    ngx_queue_t  queue;
};

struct ev_ch_send_s {
    char        *data;      /* full HTTP request or native packet */
    size_t       data_len;
    SV          *cb;
    char        *insert_data;     /* deferred TSV data for native INSERT */
    size_t       insert_data_len;
    SV          *insert_av;      /* deferred AV* data for native INSERT */
    int          raw;            /* return raw response body */
    SV          *on_data;        /* per-query streaming callback */
    double       query_timeout;  /* per-query timeout */
    char        *query_id;       /* query_id for tracking */
    ngx_queue_t  queue;
};

/* forward declarations */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
static void start_keepalive(ev_clickhouse_t *self);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
static void cleanup_connection(ev_clickhouse_t *self);
static int  cancel_pending(ev_clickhouse_t *self, const char *errmsg);
static int  check_destroyed(ev_clickhouse_t *self);
static void on_connect_done(ev_clickhouse_t *self);
static void process_http_response(ev_clickhouse_t *self);
static int try_write(ev_clickhouse_t *self);
static int pipeline_advance(ev_clickhouse_t *self);
static void on_readable(ev_clickhouse_t *self);

/* --- freelist for cb_queue entries --- */

static ev_ch_cb_t *cbt_freelist = NULL;

static ev_ch_cb_t* alloc_cbt(void) {
    ev_ch_cb_t *cbt;
    if (cbt_freelist) {
        cbt = cbt_freelist;
        cbt_freelist = *(ev_ch_cb_t **)cbt;
    } else {
        Newx(cbt, 1, ev_ch_cb_t);
    }
    cbt->raw = 0;
    cbt->on_data = NULL;
    cbt->query_timeout = 0;
    return cbt;
}

static void release_cbt(ev_ch_cb_t *cbt) {
    *(ev_ch_cb_t **)cbt = cbt_freelist;
    cbt_freelist = cbt;
}

/* --- freelist for send_queue entries --- */

static ev_ch_send_t *send_freelist = NULL;

static ev_ch_send_t* alloc_send(void) {
    ev_ch_send_t *s;
    if (send_freelist) {
        s = send_freelist;
        send_freelist = *(ev_ch_send_t **)s;

ClickHouse.xs  view on Meta::CPAN

                    goto data_error;
                }
                dpos++;

                /* Allocate LC dict state on first column of first block */
                if (c == 0 && !self->lc_dicts && num_cols > 0) {
                    Newxz(self->lc_dicts, num_cols, SV**);
                    Newxz(self->lc_dict_sizes, num_cols, uint64_t);
                    self->lc_num_cols = (int)num_cols;
                }

                {
                    int col_err = 0;
                    columns[c] = decode_column_ex(dbuf, dlen, &dpos, num_rows, col_types[c], &col_err, self->decode_flags, self, (int)c);
                    if (!columns[c]) {
                        if (col_err || decompressed) {
                            *errmsg = safe_strdup("decode_column failed");
                            goto data_error;
                        }
                        goto data_need_more;
                    }
                }
            }

            /* Convert column-oriented to row-oriented */
            {
            AV **target;
            if (ptype == SERVER_TOTALS) {
                if (!self->native_totals) self->native_totals = newAV();
                target = &self->native_totals;
            } else if (ptype == SERVER_EXTREMES) {
                if (!self->native_extremes) self->native_extremes = newAV();
                target = &self->native_extremes;
            } else {
                if (!self->native_rows) self->native_rows = newAV();
                target = &self->native_rows;
            }

            if (named) {
                for (r = 0; r < num_rows; r++) {
                    HV *hv = newHV();
                    for (c = 0; c < num_cols; c++) {
                        if (!hv_store(hv, cnames[c], cname_lens[c], columns[c][r], 0))
                            SvREFCNT_dec(columns[c][r]);
                    }
                    av_push(*target, newRV_noinc((SV*)hv));
                }
            } else {
                for (r = 0; r < num_rows; r++) {
                    AV *row = newAV();
                    if (num_cols > 0)
                        av_extend(row, num_cols - 1);
                    for (c = 0; c < num_cols; c++) {
                        av_push(row, columns[c][r]);
                    }
                    av_push(*target, newRV_noinc((SV*)row));
                }
            }
            }

            /* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */
            {
                SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL;
                if (on_data && self->native_rows) {
                    self->callback_depth++;
                    {
                        dSP;
                        ENTER; SAVETMPS;
                        PUSHMARK(SP);
                        PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows)));
                        PUTBACK;
                        call_sv(on_data, G_DISCARD | G_EVAL);
                        if (SvTRUE(ERRSV))
                            warn("EV::ClickHouse: exception in on_data handler: %s",
                                 SvPV_nolen(ERRSV));
                        FREETMPS; LEAVE;
                    }
                    self->callback_depth--;
                    /* Clear accumulated rows for next block */
                    SvREFCNT_dec((SV*)self->native_rows);
                    self->native_rows = NULL;
                    if (check_destroyed(self)) {
                        if (cnames) Safefree(cnames);
                        if (cname_lens) Safefree(cname_lens);
                        for (c = 0; c < num_cols; c++) {
                            Safefree(columns[c]);
                            free_col_type(col_types[c]);
                        }
                        Safefree(columns); Safefree(col_types);
                        if (decompressed) Safefree(decompressed);
                        return -2;
                    }
                }
            }

            /* Cleanup column arrays (SVs moved to rows, don't dec refcnt) */
            for (c = 0; c < num_cols; c++) {
                Safefree(columns[c]);
                free_col_type(col_types[c]);
            }
            Safefree(columns);
            Safefree(col_types);
            if (cnames) Safefree(cnames);
            if (cname_lens) Safefree(cname_lens);
            if (decompressed) Safefree(decompressed);
            else pos = dpos;  /* uncompressed: advance pos to match dpos */

            /* Consume from recv_buf */
            if (pos < self->recv_len) {
                memmove(self->recv_buf, self->recv_buf + pos,
                        self->recv_len - pos);
            }
            self->recv_len -= pos;
            return 1;

        data_error:
        data_need_more:
            /* Cleanup partial decode */
            for (c = 0; c < num_cols; c++) {
                if (columns[c]) {
                    uint64_t j;



( run in 0.904 second using v1.01-cache-2.11-cpan-39bf76dae61 )