EV-ClickHouse
view release on metacpan or search on metacpan
/* --- Async TCP connect, I/O dispatch, timers, keepalive, reconnect,
* pipeline advance, and OpenSSL one-time init. ---
*
* Forward decls for symbols defined later in this file but called from
* earlier in the same file (cross-file callers use the forwards in
* ClickHouse.xs).
*/
/* libev I/O watcher callback; defined later in this file. */
static void io_cb(EV_P_ ev_io *w, int revents);
/* Defined later in this file; declared here so earlier code can call it. */
static void on_connect_done(ev_clickhouse_t *self);
#ifdef HAVE_OPENSSL
/*
 * Empty OpenSSL's per-thread error queue and remember the newest entry
 * as a human-readable string in self->last_tls_error.
 *
 * If the queue is already empty the function returns without touching
 * the previously stored message, so it is safe to call speculatively.
 */
static void capture_tls_error(ev_clickhouse_t *self) {
    unsigned long newest = 0;
    unsigned long code;
    char msg[256];

    /* Pop every queued error; the last one popped is the one we keep. */
    for (code = ERR_get_error(); code != 0; code = ERR_get_error())
        newest = code;

    if (newest == 0)
        return;

    ERR_error_string_n(newest, msg, sizeof(msg));
    CLEAR_STR(self->last_tls_error);
    self->last_tls_error = savepv(msg);
}
#endif
static void start_connect(ev_clickhouse_t *self) {
struct addrinfo hints, *res = NULL;
int fd, ret;
char port_str[16];
self->connect_gen++;
emit_trace(self, "connect %s:%u (%s)",
self->host, self->port,
self->protocol == PROTO_NATIVE ? "native" : "http");
snprintf(port_str, sizeof(port_str), "%u", self->port);
Zero(&hints, 1, struct addrinfo);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
ret = getaddrinfo(self->host, port_str, &hints, &res);
if (ret != 0) {
char errbuf[256];
snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
fail_connection(self, errbuf);
return;
}
fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (fd < 0) {
freeaddrinfo(res);
fail_connection(self, "socket() failed");
return;
}
/* non-blocking */
{
int fl = fcntl(fd, F_GETFL);
if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
freeaddrinfo(res);
} else {
capture_tls_error(self);
fail_connection(self, "SSL handshake failed");
}
return;
}
}
#endif
if (revents & EV_WRITE) {
if (try_write(self)) return;
if (self->fd < 0) return;
if (self->pending_addendum_finish && self->send_pos >= self->send_len) {
self->pending_addendum_finish = 0;
self->native_state = NATIVE_IDLE;
if (finish_connect(self)) return;
}
}
if (revents & EV_READ) {
on_readable(self);
}
}
/*
 * Timeout timer callback. The owning object is taken from w->data.
 *
 * Two situations are distinguished by self->connected:
 *   - not connected yet: reported as "connect timeout". Any timeout that
 *     fires before connected is set is treated this way -- it covers the
 *     TCP connect, TLS handshake, and native ServerHello stages.
 *   - connected: the in-flight query timed out. Per-query state is
 *     discarded, the connection is torn down with "query timeout", and a
 *     reconnect is scheduled when auto_reconnect is enabled.
 */
static void timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)w->data;
    int gen_at_timeout;

    (void)loop;
    (void)revents;

    if (self == NULL || self->magic != EV_CH_MAGIC)
        return;

    self->timing = 0;

    if (!self->connected) {
        stop_writing(self);
        fail_connection(self, "connect timeout");
        return;
    }

    /* Query timeout: drop everything accumulated for the current query. */
    CLEAR_SV(self->native_rows);
    CLEAR_SV(self->native_col_names);
    CLEAR_SV(self->native_col_types);
    CLEAR_SV(self->native_totals);
    CLEAR_SV(self->native_extremes);
    lc_free_dicts(self);
    CLEAR_INSERT(self);
    CLEAR_STR(self->insert_err);
    self->native_state = NATIVE_IDLE;
    if (self->send_count > 0)
        self->send_count--;

    /* Snapshot the generation so a reconnect/reset triggered from inside
     * the teardown can be detected afterwards. */
    gen_at_timeout = self->connect_gen;

    /* Must reconnect -- the server may still be processing the query.
     * A nonzero return ends the callback immediately without touching
     * self again (presumably self is gone -- confirm against
     * teardown_after_deliver). */
    if (teardown_after_deliver(self, "query timeout", "query timeout"))
        return;
    if (self->connect_gen != gen_at_timeout)
        return;

    if (self->auto_reconnect && self->host)
        schedule_reconnect(self);
}
/* --- Keepalive timer callback --- */
/*
 * Periodic keepalive tick.
 *
 * Does nothing unless the object is valid, connected, and has no request
 * in flight (send_count == 0). Otherwise it sends a protocol-specific
 * ping: raw PING bytes for the native protocol, a queued ping request
 * for HTTP.
 */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    /* ka_timer is embedded in ev_clickhouse_t; step back to its owner. */
    ev_clickhouse_t *self =
        (ev_clickhouse_t *)((char *)w - offsetof(ev_clickhouse_t, ka_timer));

    (void)revents;

    if (self->magic != EV_CH_MAGIC)
        return;
    if (!self->connected)
        return;
    if (self->send_count > 0)
        return;

    if (self->protocol != PROTO_NATIVE) {
        /* HTTP: enqueue a real ping with a no-op callback. ClickHouse's
         * HTTP server closes idle connections after a few seconds, so
         * relying on TCP keepalive (kernel default ~2h) is not enough. */
        size_t request_len;
        char *request = build_http_ping_request(self, &request_len);
        ev_ch_send_t *entry = alloc_send();

        entry->data = request;
        entry->data_len = request_len;
        entry->cb = SvREFCNT_inc(keepalive_noop_cb);
        enqueue_send(self, entry);
        return;
    }

    /* Native: append the PING bytes straight onto the send buffer. The
     * SERVER_PONG handler tracks ka_in_flight, so no callback queue
     * entry is needed. */
    {
        size_t ping_len;
        char *ping = build_native_ping(&ping_len);

        ensure_send_cap(self, self->send_len + ping_len);
        Copy(ping, self->send_buf + self->send_len, ping_len, char);
        self->send_len += ping_len;
        self->ka_in_flight++;
        Safefree(ping);
        start_writing(self);
    }
}
/*
 * Arm the repeating keepalive timer, using self->keepalive as both the
 * initial delay and the repeat interval.
 *
 * No-op when keepalive is not positive, when the timer is already
 * running (ka_timing set), or when the connection is not established.
 */
static void start_keepalive(ev_clickhouse_t *self) {
    if (!(self->keepalive > 0))
        return;
    if (self->ka_timing)
        return;
    if (!self->connected)
        return;

    ev_timer_init(&self->ka_timer, ka_timer_cb,
                  self->keepalive, self->keepalive);
    ev_timer_start(self->loop, &self->ka_timer);
    self->ka_timing = 1;
}
/*
 * Disarm the keepalive timer if it is currently running and clear the
 * ka_timing flag. Does nothing when the timer is not armed.
 */
static void stop_keepalive(ev_clickhouse_t *self) {
    if (!self->ka_timing)
        return;

    ev_timer_stop(self->loop, &self->ka_timer);
    self->ka_timing = 0;
}
/* --- Reconnect with backoff --- */
/*
 * One-shot reconnect timer callback: clears the reconnect_timing flag
 * and starts a new connection attempt unless the object is invalid or a
 * connection already exists / is already being established.
 */
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
    /* reconnect_timer is embedded in ev_clickhouse_t; recover the owner. */
    ev_clickhouse_t *self =
        (ev_clickhouse_t *)((char *)w
                            - offsetof(ev_clickhouse_t, reconnect_timer));

    (void)loop;
    (void)revents;

    /* NOTE(review): the flag is cleared before magic is validated, unlike
     * the other timer callbacks in this file -- confirm this is intended. */
    self->reconnect_timing = 0;

    if (self->magic != EV_CH_MAGIC)
        return;
    if (self->connected || self->connecting)
        return;

    start_connect(self);
}
/*
 * Schedule the next reconnect attempt via the one-shot reconnect timer.
 *
 * Does nothing unless auto_reconnect is on, a host is configured, and the
 * object is valid. When reconnect_max_attempts is set and has been
 * reached, an error is emitted and pending work is cancelled instead.
 *
 * The wait time is reconnect_delay doubled once per previous attempt
 * (at most 20 doublings), capped by reconnect_max_delay when that is
 * positive, then optionally stretched by random jitter and capped again.
 */
static void schedule_reconnect(ev_clickhouse_t *self) {
    double wait;
    int step;

    if (!self->auto_reconnect || !self->host || self->magic != EV_CH_MAGIC)
        return;

    if (self->reconnect_max_attempts > 0
        && self->reconnect_attempts >= self->reconnect_max_attempts) {
        /* Give up so the user isn't trapped in an infinite loop on a
         * permanent failure (bad host, wrong creds). Queries still in
         * send_queue are drained afterwards so their callbacks see the
         * failure instead of being silently orphaned. */
        emit_error(self, "max reconnect attempts exceeded");
        if (!check_destroyed(self))
            (void)cancel_pending(self, "max reconnect attempts exceeded");
        return;
    }

    /* Always go through ev_timer, even for a zero delay: a synchronous
     * start_connect failure (e.g. a getaddrinfo error) must not be able to
     * recurse fail_connection -> schedule_reconnect -> start_connect on
     * the C stack. A delay of 0 fires on the next loop iteration. */
    wait = self->reconnect_delay > 0 ? self->reconnect_delay : 0.0;

    /* Exponential backoff: one doubling per earlier attempt, max 20. */
    step = 0;
    while (step < self->reconnect_attempts && step < 20) {
        wait *= 2;
        step++;
    }

    if (self->reconnect_max_delay > 0 && wait > self->reconnect_max_delay)
        wait = self->reconnect_max_delay;

    /* Jitter is applied AFTER the cap so a configured ceiling is not
     * silently exceeded; rand() / RAND_MAX lies in [0, 1], and clamping
     * once more keeps the worst case bounded. */
    if (self->reconnect_jitter > 0 && wait > 0) {
        double fraction =
            ((double)rand() / (double)RAND_MAX) * self->reconnect_jitter;

        wait += wait * fraction;
        if (self->reconnect_max_delay > 0
            && wait > self->reconnect_max_delay)
            wait = self->reconnect_max_delay;
    }

    self->reconnect_attempts++;

    /* Replace any reconnect timer that is still pending. */
    if (self->reconnect_timing) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timing = 0;
    }

    ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, wait, 0);
    ev_timer_start(self->loop, &self->reconnect_timer);
    self->reconnect_timing = 1;
}
/*
 * Release the LowCardinality cross-block dictionary state.
 *
 * For every column that has a dictionary, drops one reference on each
 * stored entry and frees the per-column array; then frees the outer
 * arrays and resets lc_dicts, lc_dict_sizes and lc_num_cols. Does
 * nothing when no dictionary state is allocated.
 */
static void lc_free_dicts(ev_clickhouse_t *self) {
    int col;

    if (!self->lc_dicts)
        return;

    for (col = 0; col < self->lc_num_cols; col++) {
        uint64_t idx;

        /* Columns without a dictionary have a NULL slot. */
        if (!self->lc_dicts[col])
            continue;

        for (idx = 0; idx < self->lc_dict_sizes[col]; idx++)
            SvREFCNT_dec(self->lc_dicts[col][idx]);
        Safefree(self->lc_dicts[col]);
    }

    Safefree(self->lc_dicts);
    Safefree(self->lc_dict_sizes);
    self->lc_dicts = NULL;
    self->lc_dict_sizes = NULL;
    self->lc_num_cols = 0;
}
/* --- Pipeline orchestrator --- */
/* Send one request at a time, wait for response, then send the next.
* Returns 1 if self was freed (caller must not access self). */
static int pipeline_advance(ev_clickhouse_t *self) {
if (!self->connected) return 0;
if (self->send_count > 0) {
start_reading(self);
return 0;
}
/* Check drain callback when all pending work is done */
if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
&& self->on_drain) {
SV *drain_cb = self->on_drain;
self->on_drain = NULL;
if (fire_zero_arg_cb(self, drain_cb, "drain")) {
SvREFCNT_dec(drain_cb);
return 1;
}
/* Dropping the last ref to the drain CV can free a closure that
* captured $ch -- DESTROY then runs. Guard with callback_depth so
* the free is deferred, then detect it before touching self. */
self->callback_depth++;
SvREFCNT_dec(drain_cb);
self->callback_depth--;
if (check_destroyed(self)) return 1;
}
/* Restart keepalive timer when idle (start_keepalive is a no-op if already
* timing or if keepalive disabled) */
if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0)
start_keepalive(self);
/* send next request from queue */
if (!ngx_queue_empty(&self->send_queue)) {
/* Stop keepalive during active query */
stop_keepalive(self);
emit_trace(self, "dispatch query (pending=%d)", self->pending_count);
/* on_trace is user code: it may have called finish()/reset() or
* dropped the last reference (DESTROY), any of which drains or
* frees the send queue. Re-validate the connection and re-derive
* the head entry AFTER the trace so we never touch a send struct
* that cancel_pending has already released to the freelist. */
if (self->magic != EV_CH_MAGIC || !self->connected
|| ngx_queue_empty(&self->send_queue))
return check_destroyed(self);
ngx_queue_t *q = ngx_queue_head(&self->send_queue);
ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
/* set up send buffer */
ensure_send_cap(self, send->data_len);
Copy(send->data, self->send_buf, send->data_len, char);
self->send_len = send->data_len;
self->send_pos = 0;
/* move cb to recv queue */
SV *dispatched_cb = send->cb;
ngx_queue_remove(q);
push_cb_owned_ex(self, send->cb, send->raw,
send->on_data, send->on_complete,
send->query_timeout);
CLEAR_SV(send->on_data);
send->on_complete = NULL; /* ownership transferred to cbt */
/* Track query_id + dispatch start time (used by on_query_complete). */
CLEAR_STR(self->last_query_id);
if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }
self->query_start_time = ev_now(self->loop);
/* on_query_start: fire with the resolved query_id, just before
* the write side runs. Suppressed for keepalive PINGs to match
* on_query_complete semantics. */
if (self->on_query_start && !IS_KEEPALIVE_CB(dispatched_cb)) {
dSP;
ENTER; SAVETMPS; PUSHMARK(SP);
EXTEND(SP, 1);
PUSHs(self->last_query_id
? sv_2mortal(newSVpv(self->last_query_id, 0))
: &PL_sv_undef);
PUTBACK;
int gen_before = self->connect_gen;
self->callback_depth++;
PINNED_CALL_SV(self->on_query_start, G_EVAL | G_VOID | G_DISCARD);
self->callback_depth--;
WARN_AND_CLEAR_ERRSV("on_query_start");
FREETMPS; LEAVE;
/* The handler may have torn the connection down: DESTROY
* (dropped the last ref), reset() (rotated it -- connect_gen
* bumped), or finish() (closed it -- connected cleared). In
* every case the cb was already delivered by cancel_pending;
* drop the local send entry (already dequeued, so cancel_pending
* never saw it) without dispatching it. Falling through on a
* !connected struct would re-arm a stray query-timeout timer
* and leave send_count stuck at 1. The `||` short-circuits
* before reading self->connect_gen when self has been freed. */
int destroyed = check_destroyed(self);
if (destroyed || self->connect_gen != gen_before
|| !self->connected) {
Safefree(send->data);
CLEAR_INSERT(send);
release_send(send);
return destroyed ? 1 : 0;
}
}
/* Clear per-query accumulated state so accessors don't return
* the previous query's data. native_rows is already NULL at
* EndOfStream; col_names/types are also cleared here so DDL
* (or any query that emits no DATA block) does not leave the
* previous SELECT's schema visible. */
CLEAR_SV(self->native_col_names);
CLEAR_SV(self->native_col_types);
CLEAR_SV(self->native_totals);
CLEAR_SV(self->native_extremes);
self->last_error_code = 0;
self->profile_rows = 0;
self->profile_bytes = 0;
self->profile_rows_before_limit = 0;
if (self->progress_period > 0) {
memset(self->progress_acc, 0, sizeof(self->progress_acc));
self->progress_last = 0.0;
}
/* transfer deferred insert data from send entry to self */
if (send->insert_data) {
self->insert_data = send->insert_data;
self->insert_data_len = send->insert_data_len;
send->insert_data = NULL;
}
if (send->insert_av) {
self->insert_av = send->insert_av;
( run in 2.208 seconds using v1.01-cache-2.11-cpan-524268b4103 )