EV-ClickHouse

 view release on metacpan or  search on metacpan

xs/io.c  view on Meta::CPAN

/* --- Async TCP connect, I/O dispatch, timers, keepalive, reconnect,
 *     pipeline advance, and OpenSSL one-time init. ---
 *
 * Forward decls for symbols defined later in this file but called from
 * earlier in the same file (cross-file callers use the forwards in
 * ClickHouse.xs).
 */
static void io_cb(EV_P_ ev_io *w, int revents);
static void on_connect_done(ev_clickhouse_t *self);

#ifdef HAVE_OPENSSL
/* Empty OpenSSL's error queue and remember only the newest entry, rendered
 * as text, in self->last_tls_error. When the queue holds nothing the
 * previously stored message is kept as-is. */
static void capture_tls_error(ev_clickhouse_t *self) {
    char msg[256];
    unsigned long code;
    unsigned long newest = 0;

    /* ERR_get_error() pops one entry per call and yields 0 once the
     * queue is exhausted; the last non-zero code seen is the one we keep. */
    for (code = ERR_get_error(); code != 0; code = ERR_get_error())
        newest = code;

    if (newest == 0)
        return;

    ERR_error_string_n(newest, msg, sizeof(msg));
    CLEAR_STR(self->last_tls_error);
    self->last_tls_error = savepv(msg);
}
#endif

static void start_connect(ev_clickhouse_t *self) {
    struct addrinfo hints, *res = NULL;
    int fd, ret;
    char port_str[16];

    self->connect_gen++;

    emit_trace(self, "connect %s:%u (%s)",
               self->host, self->port,
               self->protocol == PROTO_NATIVE ? "native" : "http");
    snprintf(port_str, sizeof(port_str), "%u", self->port);

    Zero(&hints, 1, struct addrinfo);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;

    ret = getaddrinfo(self->host, port_str, &hints, &res);
    if (ret != 0) {
        char errbuf[256];
        snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
        fail_connection(self, errbuf);
        return;
    }

    fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (fd < 0) {
        freeaddrinfo(res);
        fail_connection(self, "socket() failed");
        return;
    }

    /* non-blocking */
    {
        int fl = fcntl(fd, F_GETFL);
        if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
            freeaddrinfo(res);

xs/io.c  view on Meta::CPAN

            } else {
                capture_tls_error(self);
                fail_connection(self, "SSL handshake failed");
            }
            return;
        }
    }
#endif

    if (revents & EV_WRITE) {
        if (try_write(self)) return;
        if (self->fd < 0) return;
        if (self->pending_addendum_finish && self->send_pos >= self->send_len) {
            self->pending_addendum_finish = 0;
            self->native_state = NATIVE_IDLE;
            if (finish_connect(self)) return;
        }
    }

    if (revents & EV_READ) {
        on_readable(self);
    }
}

/* Timeout watcher callback. The owning connection is taken from w->data.
 * While the connection is not yet marked connected the expiry is reported
 * as a connect timeout; afterwards it is handled as a query timeout. */
static void timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)w->data;
    int gen_at_timeout;

    (void)loop;
    (void)revents;

    if (!self || self->magic != EV_CH_MAGIC)
        return;

    self->timing = 0;

    /* Anything that expires before `connected` is set counts as a connect
     * timeout: TCP connect, TLS handshake and the native ServerHello
     * exchange all land here. */
    if (!self->connected) {
        stop_writing(self);
        fail_connection(self, "connect timeout");
        return;
    }

    /* Query timeout: discard the partially accumulated result and any
     * in-progress insert state, then return the protocol state to idle. */
    CLEAR_SV(self->native_rows);
    CLEAR_SV(self->native_col_names);
    CLEAR_SV(self->native_col_types);
    CLEAR_SV(self->native_totals);
    CLEAR_SV(self->native_extremes);
    lc_free_dicts(self);
    CLEAR_INSERT(self);
    CLEAR_STR(self->insert_err);
    self->native_state = NATIVE_IDLE;
    if (self->send_count > 0)
        self->send_count--;

    /* The server may still be working on the query, so the connection has
     * to be torn down and re-established. Snapshot connect_gen first so we
     * can tell whether a new connect was already started during teardown. */
    gen_at_timeout = self->connect_gen;
    if (teardown_after_deliver(self, "query timeout", "query timeout"))
        return;
    if (self->connect_gen != gen_at_timeout)
        return;
    if (self->auto_reconnect && self->host)
        schedule_reconnect(self);
}

/* --- Keepalive timer callback --- */

/* Periodic keepalive tick. The connection struct is recovered from the
 * embedded ka_timer member. Nothing is sent unless the connection is up
 * and no request is currently outstanding (send_count == 0). */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, ka_timer));
    (void)revents;

    if (self->magic != EV_CH_MAGIC)
        return;
    if (!self->connected || self->send_count > 0)
        return;

    if (self->protocol != PROTO_NATIVE) {
        /* HTTP: queue a genuine ping request whose callback does nothing.
         * ClickHouse's HTTP server drops idle connections after a few
         * seconds, so kernel-level TCP keepalive (default ~2h) would be
         * far too slow to keep the connection open. */
        size_t http_len;
        char *http_req = build_http_ping_request(self, &http_len);
        ev_ch_send_t *entry = alloc_send();

        entry->data = http_req;
        entry->data_len = http_len;
        entry->cb = SvREFCNT_inc(keepalive_noop_cb);
        enqueue_send(self, entry);
        return;
    }

    /* Native: write the PING packet straight onto the tail of the send
     * buffer. The SERVER_PONG handler accounts for it through
     * ka_in_flight, so no callback queue entry is created. */
    {
        size_t pkt_len;
        char *pkt = build_native_ping(&pkt_len);

        ensure_send_cap(self, self->send_len + pkt_len);
        Copy(pkt, self->send_buf + self->send_len, pkt_len, char);
        self->send_len += pkt_len;
        self->ka_in_flight++;
        Safefree(pkt);
        start_writing(self);
    }
}

/* Arm the repeating keepalive timer with period self->keepalive.
 * Does nothing when keepalive is disabled (not > 0), the timer is already
 * running, or the connection is not established. */
static void start_keepalive(ev_clickhouse_t *self) {
    if (!(self->keepalive > 0) || self->ka_timing || !self->connected)
        return;

    ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
    ev_timer_start(self->loop, &self->ka_timer);
    self->ka_timing = 1;
}

/* Disarm the keepalive timer if it is currently running; otherwise a no-op. */
static void stop_keepalive(ev_clickhouse_t *self) {
    if (!self->ka_timing)
        return;

    ev_timer_stop(self->loop, &self->ka_timer);
    self->ka_timing = 0;
}

/* --- Reconnect with backoff --- */

/* Reconnect timer expiry: kick off a fresh connection attempt.
 *
 * The connection struct is recovered from the embedded reconnect_timer
 * member. The struct is validated via its magic value BEFORE any field is
 * written, matching timer_cb and ka_timer_cb; a callback that fires for an
 * object that is no longer valid must not modify it. Once validated, the
 * reconnect_timing flag is cleared (schedule_reconnect arms this timer with
 * a repeat of 0, so it is no longer pending after it fires), and a connect
 * is started unless one is already established or in progress. */
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, reconnect_timer));
    (void)revents; (void)loop;

    /* Validate first: never write into a struct that fails the magic check. */
    if (self->magic != EV_CH_MAGIC) return;

    self->reconnect_timing = 0;

    /* Someone else already (re)connected or is connecting — nothing to do. */
    if (self->connected || self->connecting) return;

    start_connect(self);
}

/* Arm the one-shot reconnect timer using exponential backoff.
 *
 * No-op unless auto_reconnect is enabled, a host is configured and the
 * struct passes its magic check. When reconnect_max_attempts is set and
 * has been reached, an error is emitted and pending work is cancelled
 * instead of scheduling another attempt. */
static void schedule_reconnect(ev_clickhouse_t *self) {
    double backoff;
    int step;

    if (!self->auto_reconnect) return;
    if (!self->host) return;
    if (self->magic != EV_CH_MAGIC) return;

    if (self->reconnect_max_attempts > 0
        && self->reconnect_attempts >= self->reconnect_max_attempts) {
        /* Stop retrying so a permanent failure (bad host, wrong creds)
         * cannot loop forever. Queries still waiting in send_queue are
         * cancelled so their callbacks observe the failure rather than
         * being left orphaned. */
        emit_error(self, "max reconnect attempts exceeded");
        if (!check_destroyed(self))
            (void)cancel_pending(self, "max reconnect attempts exceeded");
        return;
    }

    /* The attempt is always routed through ev_timer, never made inline:
     * a start_connect that fails synchronously (e.g. getaddrinfo error)
     * would otherwise recurse fail_connection -> schedule_reconnect ->
     * start_connect without bound on the C stack. A zero delay fires on
     * the next loop iteration. */
    backoff = 0.0;
    if (self->reconnect_delay > 0)
        backoff = self->reconnect_delay;

    /* Double once per previous attempt, at most 20 times. */
    step = 0;
    while (step < self->reconnect_attempts && step < 20) {
        backoff *= 2;
        step++;
    }

    if (self->reconnect_max_delay > 0 && backoff > self->reconnect_max_delay)
        backoff = self->reconnect_max_delay;

    /* Jitter is added after capping, then the cap is applied once more so
     * the configured ceiling is never exceeded. rand() / RAND_MAX lies in
     * [0, 1]. */
    if (self->reconnect_jitter > 0 && backoff > 0) {
        double frac = ((double)rand() / (double)RAND_MAX) * self->reconnect_jitter;
        backoff += backoff * frac;
        if (self->reconnect_max_delay > 0 && backoff > self->reconnect_max_delay)
            backoff = self->reconnect_max_delay;
    }

    self->reconnect_attempts++;

    /* Replace any reconnect timer that is still pending. */
    if (self->reconnect_timing) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timing = 0;
    }
    ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, backoff, 0);
    ev_timer_start(self->loop, &self->reconnect_timer);
    self->reconnect_timing = 1;
}

/* Release the LowCardinality dictionary state kept across blocks: drop the
 * reference on every stored SV, free each per-column array, then free the
 * top-level arrays and reset the bookkeeping fields. Does nothing when
 * self->lc_dicts is NULL. */
static void lc_free_dicts(ev_clickhouse_t *self) {
    int col;

    if (!self->lc_dicts)
        return;

    for (col = 0; col < self->lc_num_cols; col++) {
        uint64_t idx;

        /* Columns without a dictionary have a NULL slot. */
        if (!self->lc_dicts[col])
            continue;

        for (idx = 0; idx < self->lc_dict_sizes[col]; idx++)
            SvREFCNT_dec(self->lc_dicts[col][idx]);
        Safefree(self->lc_dicts[col]);
    }

    Safefree(self->lc_dicts);
    Safefree(self->lc_dict_sizes);
    self->lc_dicts = NULL;
    self->lc_dict_sizes = NULL;
    self->lc_num_cols = 0;
}

/* --- Pipeline orchestrator --- */

/* Send one request at a time, wait for response, then send the next.
 * Returns 1 if self was freed (caller must not access self). */
static int pipeline_advance(ev_clickhouse_t *self) {
    if (!self->connected) return 0;

    if (self->send_count > 0) {
        start_reading(self);
        return 0;
    }

    /* Check drain callback when all pending work is done */
    if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
        && self->on_drain) {
        SV *drain_cb = self->on_drain;
        self->on_drain = NULL;
        if (fire_zero_arg_cb(self, drain_cb, "drain")) {
            SvREFCNT_dec(drain_cb);
            return 1;
        }
        /* Dropping the last ref to the drain CV can free a closure that
         * captured $ch — DESTROY then runs. Guard with callback_depth so
         * the free is deferred, then detect it before touching self. */
        self->callback_depth++;
        SvREFCNT_dec(drain_cb);
        self->callback_depth--;
        if (check_destroyed(self)) return 1;
    }

    /* Restart keepalive timer when idle (start_keepalive is a no-op if already
     * timing or if keepalive disabled) */
    if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0)
        start_keepalive(self);

    /* send next request from queue */
    if (!ngx_queue_empty(&self->send_queue)) {
        /* Stop keepalive during active query */
        stop_keepalive(self);
        emit_trace(self, "dispatch query (pending=%d)", self->pending_count);

        /* on_trace is user code: it may have called finish()/reset() or
         * dropped the last reference (DESTROY), any of which drains or
         * frees the send queue. Re-validate the connection and re-derive
         * the head entry AFTER the trace so we never touch a send struct
         * that cancel_pending has already released to the freelist. */
        if (self->magic != EV_CH_MAGIC || !self->connected
            || ngx_queue_empty(&self->send_queue))
            return check_destroyed(self);

        ngx_queue_t *q = ngx_queue_head(&self->send_queue);
        ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);

        /* set up send buffer */
        ensure_send_cap(self, send->data_len);
        Copy(send->data, self->send_buf, send->data_len, char);
        self->send_len = send->data_len;
        self->send_pos = 0;

        /* move cb to recv queue */
        SV *dispatched_cb = send->cb;
        ngx_queue_remove(q);
        push_cb_owned_ex(self, send->cb, send->raw,
                          send->on_data, send->on_complete,
                          send->query_timeout);
        CLEAR_SV(send->on_data);
        send->on_complete = NULL;       /* ownership transferred to cbt */
        /* Track query_id + dispatch start time (used by on_query_complete). */
        CLEAR_STR(self->last_query_id);
        if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }
        self->query_start_time = ev_now(self->loop);

        /* on_query_start: fire with the resolved query_id, just before
         * the write side runs. Suppressed for keepalive PINGs to match
         * on_query_complete semantics. */
        if (self->on_query_start && !IS_KEEPALIVE_CB(dispatched_cb)) {
            dSP;
            ENTER; SAVETMPS; PUSHMARK(SP);
            EXTEND(SP, 1);
            PUSHs(self->last_query_id
                  ? sv_2mortal(newSVpv(self->last_query_id, 0))
                  : &PL_sv_undef);
            PUTBACK;
            int gen_before = self->connect_gen;
            self->callback_depth++;
            PINNED_CALL_SV(self->on_query_start, G_EVAL | G_VOID | G_DISCARD);
            self->callback_depth--;
            WARN_AND_CLEAR_ERRSV("on_query_start");
            FREETMPS; LEAVE;
            /* The handler may have torn the connection down: DESTROY
             * (dropped the last ref), reset() (rotated it — connect_gen
             * bumped), or finish() (closed it — connected cleared). In
             * every case the cb was already delivered by cancel_pending;
             * drop the local send entry (already dequeued, so cancel_pending
             * never saw it) without dispatching it. Falling through on a
             * !connected struct would re-arm a stray query-timeout timer
             * and leave send_count stuck at 1. The `||` short-circuits
             * before reading self->connect_gen when self has been freed. */
            int destroyed = check_destroyed(self);
            if (destroyed || self->connect_gen != gen_before
                || !self->connected) {
                Safefree(send->data);
                CLEAR_INSERT(send);
                release_send(send);
                return destroyed ? 1 : 0;
            }
        }

        /* Clear per-query accumulated state so accessors don't return
         * the previous query's data. native_rows is already NULL at
         * EndOfStream; col_names/types are also cleared here so DDL
         * (or any query that emits no DATA block) does not leave the
         * previous SELECT's schema visible. */
        CLEAR_SV(self->native_col_names);
        CLEAR_SV(self->native_col_types);
        CLEAR_SV(self->native_totals);
        CLEAR_SV(self->native_extremes);
        self->last_error_code = 0;
        self->profile_rows = 0;
        self->profile_bytes = 0;
        self->profile_rows_before_limit = 0;
        if (self->progress_period > 0) {
            memset(self->progress_acc, 0, sizeof(self->progress_acc));
            self->progress_last = 0.0;
        }

        /* transfer deferred insert data from send entry to self */
        if (send->insert_data) {
            self->insert_data = send->insert_data;
            self->insert_data_len = send->insert_data_len;
            send->insert_data = NULL;
        }
        if (send->insert_av) {
            self->insert_av = send->insert_av;



( run in 2.208 seconds using v1.01-cache-2.11-cpan-524268b4103 )