EV-ClickHouse
view release on metacpan or search on metacpan
ClickHouse.xs view on Meta::CPAN
int native_state; /* NATIVE_IDLE, NATIVE_WAIT_HELLO, NATIVE_WAIT_RESULT, ... */
AV *native_rows; /* accumulate rows across Data blocks */
char *insert_data; /* pending TabSeparated data for two-phase INSERT */
size_t insert_data_len;
SV *insert_av; /* pending AV* of AV*s for arrayref INSERT */
char *insert_err; /* deferred error from unsupported INSERT encoding */
/* queues */
ngx_queue_t cb_queue;
ngx_queue_t send_queue;
int pending_count;
int send_count;
/* options */
char *session_id;
int compress;
double connect_timeout;
HV *default_settings; /* connection-level ClickHouse settings */
SV *on_connect;
SV *on_error;
SV *on_progress;
SV *on_disconnect;
int tls_skip_verify;
double query_timeout;
int auto_reconnect;
uint32_t decode_flags;
AV *native_col_names; /* column names from last native result */
AV *native_col_types; /* column type strings from last native result */
SV *on_drain; /* callback fired when pending_count drops to 0 */
char *last_query_id; /* query_id of the last dispatched query */
SV *on_trace; /* debug trace callback */
ev_timer ka_timer; /* keepalive timer */
double keepalive; /* keepalive interval (0 = disabled) */
int ka_timing;
int callback_depth;
/* error info from last SERVER_EXCEPTION or HTTP error */
int32_t last_error_code;
/* profile info from last SERVER_PROFILE_INFO */
uint64_t profile_rows;
uint64_t profile_bytes;
uint64_t profile_rows_before_limit;
/* totals / extremes from last native query */
AV *native_totals;
AV *native_extremes;
/* reconnect backoff */
double reconnect_delay;
double reconnect_max_delay;
int reconnect_attempts;
ev_timer reconnect_timer;
int reconnect_timing;
/* LowCardinality cross-block dictionary state */
SV ***lc_dicts; /* array of dictionaries, one per column */
uint64_t *lc_dict_sizes; /* size of each dictionary */
int lc_num_cols; /* number of columns with LC state */
};
/*
 * Per-query callback record. alloc_cbt()/release_cbt() hand out and recycle
 * ev_ch_cb_t objects for cb_queue; ev_ch_cb_t is presumably the typedef of
 * this struct (the typedef is not visible here -- confirm).
 */
struct ev_ch_cb_s {
/* Must stay the first member: release_cbt() writes the freelist link over the
 * start of the entry, so while an entry is recycled this slot holds a freelist
 * pointer rather than an SV. alloc_cbt() does not reset it. */
SV *cb;
int raw; /* return raw response body instead of parsed rows */
SV *on_data; /* per-query streaming callback (fires per block) */
double query_timeout; /* per-query timeout (0=use default) */
ngx_queue_t queue; /* intrusive queue link; presumably chains the entry into cb_queue -- confirm */
};
/*
 * Record describing one outgoing request. Entries are recycled through
 * send_freelist by alloc_send(); ev_ch_send_t is presumably the typedef of
 * this struct (the typedef is not visible here -- confirm).
 *
 * The cb / raw / on_data / query_timeout fields mirror those of
 * struct ev_ch_cb_s.
 */
struct ev_ch_send_s {
/* Must stay the first member: alloc_send() reads the freelist link from the
 * start of the entry, so this slot is a freelist pointer while recycled. */
char *data; /* full HTTP request or native packet */
size_t data_len; /* presumably the byte length of data -- confirm */
SV *cb;
char *insert_data; /* deferred TSV data for native INSERT */
size_t insert_data_len; /* presumably the byte length of insert_data -- confirm */
SV *insert_av; /* deferred AV* data for native INSERT */
int raw; /* return raw response body */
SV *on_data; /* per-query streaming callback */
double query_timeout; /* per-query timeout */
char *query_id; /* query_id for tracking */
ngx_queue_t queue; /* intrusive queue link; presumably chains the entry into send_queue -- confirm */
};
/*
 * Forward declarations.
 *
 * Prototypes for file-local (static) helpers so they can be referenced before
 * their definitions appear. io_cb, timer_cb and ka_timer_cb use the libev
 * watcher-callback signature (EV_P_, watcher pointer, revents); the remaining
 * helpers take the connection object (ev_clickhouse_t *self).
 */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
static void start_keepalive(ev_clickhouse_t *self);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
static void cleanup_connection(ev_clickhouse_t *self);
static int cancel_pending(ev_clickhouse_t *self, const char *errmsg);
static int check_destroyed(ev_clickhouse_t *self);
static void on_connect_done(ev_clickhouse_t *self);
static void process_http_response(ev_clickhouse_t *self);
static int try_write(ev_clickhouse_t *self);
static int pipeline_advance(ev_clickhouse_t *self);
static void on_readable(ev_clickhouse_t *self);
/* --- freelist for cb_queue entries --- */

/*
 * Head of a singly linked stack of recycled ev_ch_cb_t entries. The link to
 * the next free entry is stored in the first pointer-sized bytes of each
 * recycled entry, so no separate node structure is needed.
 */
static ev_ch_cb_t *cbt_freelist = NULL;

/*
 * Hand out a cb_queue entry, reusing a recycled one when available and
 * allocating a fresh one with Newx() otherwise.
 *
 * Only raw, on_data and query_timeout are reset here. The cb and queue
 * members are left as-is (uninitialised memory, or the stale freelist link
 * in the case of cb) and must be set by the caller before use.
 */
static ev_ch_cb_t* alloc_cbt(void) {
    ev_ch_cb_t *entry = cbt_freelist;

    if (entry != NULL) {
        /* Pop the head: the next-free link lives at the start of the entry. */
        cbt_freelist = *(ev_ch_cb_t **)entry;
    } else {
        Newx(entry, 1, ev_ch_cb_t);
    }

    entry->query_timeout = 0;
    entry->on_data = NULL;
    entry->raw = 0;

    return entry;
}
/*
 * Push a cb_queue entry back onto the freelist for later reuse by
 * alloc_cbt(). The memory is not freed, and the start of the entry is
 * overwritten with the link to the previous freelist head.
 *
 * This function does not touch any SV held in the entry; releasing those
 * references beforehand is the caller's job.
 */
static void release_cbt(ev_ch_cb_t *cbt) {
    ev_ch_cb_t **link_slot = (ev_ch_cb_t **)cbt;

    *link_slot = cbt_freelist;
    cbt_freelist = cbt;
}
/* --- freelist for send_queue entries --- */
static ev_ch_send_t *send_freelist = NULL;
static ev_ch_send_t* alloc_send(void) {
ev_ch_send_t *s;
if (send_freelist) {
s = send_freelist;
send_freelist = *(ev_ch_send_t **)s;
ClickHouse.xs view on Meta::CPAN
goto data_error;
}
dpos++;
/* Allocate LC dict state on first column of first block */
if (c == 0 && !self->lc_dicts && num_cols > 0) {
Newxz(self->lc_dicts, num_cols, SV**);
Newxz(self->lc_dict_sizes, num_cols, uint64_t);
self->lc_num_cols = (int)num_cols;
}
{
int col_err = 0;
columns[c] = decode_column_ex(dbuf, dlen, &dpos, num_rows, col_types[c], &col_err, self->decode_flags, self, (int)c);
if (!columns[c]) {
if (col_err || decompressed) {
*errmsg = safe_strdup("decode_column failed");
goto data_error;
}
goto data_need_more;
}
}
}
/* Convert column-oriented to row-oriented */
{
AV **target;
if (ptype == SERVER_TOTALS) {
if (!self->native_totals) self->native_totals = newAV();
target = &self->native_totals;
} else if (ptype == SERVER_EXTREMES) {
if (!self->native_extremes) self->native_extremes = newAV();
target = &self->native_extremes;
} else {
if (!self->native_rows) self->native_rows = newAV();
target = &self->native_rows;
}
if (named) {
for (r = 0; r < num_rows; r++) {
HV *hv = newHV();
for (c = 0; c < num_cols; c++) {
if (!hv_store(hv, cnames[c], cname_lens[c], columns[c][r], 0))
SvREFCNT_dec(columns[c][r]);
}
av_push(*target, newRV_noinc((SV*)hv));
}
} else {
for (r = 0; r < num_rows; r++) {
AV *row = newAV();
if (num_cols > 0)
av_extend(row, num_cols - 1);
for (c = 0; c < num_cols; c++) {
av_push(row, columns[c][r]);
}
av_push(*target, newRV_noinc((SV*)row));
}
}
}
/* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */
{
SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL;
if (on_data && self->native_rows) {
self->callback_depth++;
{
dSP;
ENTER; SAVETMPS;
PUSHMARK(SP);
PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows)));
PUTBACK;
call_sv(on_data, G_DISCARD | G_EVAL);
if (SvTRUE(ERRSV))
warn("EV::ClickHouse: exception in on_data handler: %s",
SvPV_nolen(ERRSV));
FREETMPS; LEAVE;
}
self->callback_depth--;
/* Clear accumulated rows for next block */
SvREFCNT_dec((SV*)self->native_rows);
self->native_rows = NULL;
if (check_destroyed(self)) {
if (cnames) Safefree(cnames);
if (cname_lens) Safefree(cname_lens);
for (c = 0; c < num_cols; c++) {
Safefree(columns[c]);
free_col_type(col_types[c]);
}
Safefree(columns); Safefree(col_types);
if (decompressed) Safefree(decompressed);
return -2;
}
}
}
/* Cleanup column arrays (SVs moved to rows, don't dec refcnt) */
for (c = 0; c < num_cols; c++) {
Safefree(columns[c]);
free_col_type(col_types[c]);
}
Safefree(columns);
Safefree(col_types);
if (cnames) Safefree(cnames);
if (cname_lens) Safefree(cname_lens);
if (decompressed) Safefree(decompressed);
else pos = dpos; /* uncompressed: advance pos to match dpos */
/* Consume from recv_buf */
if (pos < self->recv_len) {
memmove(self->recv_buf, self->recv_buf + pos,
self->recv_len - pos);
}
self->recv_len -= pos;
return 1;
data_error:
data_need_more:
/* Cleanup partial decode */
for (c = 0; c < num_cols; c++) {
if (columns[c]) {
uint64_t j;
( run in 0.904 second using v1.01-cache-2.11-cpan-39bf76dae61 )