EV-ClickHouse

 view release on metacpan or  search on metacpan

ClickHouse.xs  view on Meta::CPAN

    ev_timer timer;
    int reading, writing, timing;
    int connected, connecting;
    int protocol;               /* PROTO_HTTP or PROTO_NATIVE */

#ifdef HAVE_OPENSSL
    SSL_CTX *ssl_ctx;
    SSL *ssl;
#endif
    int tls_enabled;
    char *tls_ca_file;

    /* connection params */
    char *host, *user, *password, *database;
    unsigned int port;

    /* send/recv buffers */
    char *send_buf;
    size_t send_len, send_pos, send_cap;
    char *recv_buf;
    size_t recv_len, recv_cap;

    /* native protocol state */
    char *server_name;
    char *server_display_name;
    char *server_timezone;
    unsigned int server_version_major, server_version_minor, server_revision;
    unsigned int server_version_patch;
    int native_state;           /* NATIVE_IDLE, NATIVE_WAIT_HELLO, NATIVE_WAIT_RESULT, ... */
    AV *native_rows;            /* accumulate rows across Data blocks */
    char *insert_data;          /* pending TabSeparated data for two-phase INSERT */
    size_t insert_data_len;
    SV *insert_av;              /* pending AV* of AV*s for arrayref INSERT */
    char *insert_err;           /* deferred error from unsupported INSERT encoding */

    /* queues */
    ngx_queue_t cb_queue;
    ngx_queue_t send_queue;
    int pending_count;
    int send_count;

    /* options */
    char *session_id;
    int compress;
    double connect_timeout;
    HV *default_settings;           /* connection-level ClickHouse settings */

    SV *on_connect;
    SV *on_error;
    SV *on_progress;
    SV *on_disconnect;
    int tls_skip_verify;
    double query_timeout;
    int auto_reconnect;
    uint32_t decode_flags;
    AV *native_col_names;   /* column names from last native result */
    AV *native_col_types;   /* column type strings from last native result */
    SV *on_drain;           /* callback fired when pending_count drops to 0 */
    char *last_query_id;    /* query_id of the last dispatched query */
    SV *on_trace;           /* debug trace callback */
    ev_timer ka_timer;      /* keepalive timer */
    double keepalive;       /* keepalive interval (0 = disabled) */
    int ka_timing;
    int callback_depth;
    /* error info from last SERVER_EXCEPTION or HTTP error */
    int32_t last_error_code;
    /* profile info from last SERVER_PROFILE_INFO */
    uint64_t profile_rows;
    uint64_t profile_bytes;
    uint64_t profile_rows_before_limit;
    /* totals / extremes from last native query */
    AV *native_totals;
    AV *native_extremes;
    /* reconnect backoff */
    double reconnect_delay;
    double reconnect_max_delay;
    int reconnect_attempts;
    ev_timer reconnect_timer;
    int reconnect_timing;
    /* LowCardinality cross-block dictionary state */
    SV ***lc_dicts;           /* array of dictionaries, one per column */
    uint64_t *lc_dict_sizes;  /* size of each dictionary */
    int lc_num_cols;          /* number of columns with LC state */
};

struct ev_ch_cb_s {
    SV          *cb;
    int          raw;        /* return raw response body instead of parsed rows */
    SV          *on_data;    /* per-query streaming callback (fires per block) */
    double       query_timeout;  /* per-query timeout (0=use default) */
    ngx_queue_t  queue;
};

struct ev_ch_send_s {
    char        *data;      /* full HTTP request or native packet */
    size_t       data_len;
    SV          *cb;
    char        *insert_data;     /* deferred TSV data for native INSERT */
    size_t       insert_data_len;
    SV          *insert_av;      /* deferred AV* data for native INSERT */
    int          raw;            /* return raw response body */
    SV          *on_data;        /* per-query streaming callback */
    double       query_timeout;  /* per-query timeout */
    char        *query_id;       /* query_id for tracking */
    ngx_queue_t  queue;
};

/* forward declarations */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
static void start_keepalive(ev_clickhouse_t *self);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
static void cleanup_connection(ev_clickhouse_t *self);
static int  cancel_pending(ev_clickhouse_t *self, const char *errmsg);
static int  check_destroyed(ev_clickhouse_t *self);
static void on_connect_done(ev_clickhouse_t *self);
static void process_http_response(ev_clickhouse_t *self);
static int try_write(ev_clickhouse_t *self);
static int pipeline_advance(ev_clickhouse_t *self);
static void on_readable(ev_clickhouse_t *self);

/* --- freelist for cb_queue entries --- */

static ev_ch_cb_t *cbt_freelist = NULL;

static ev_ch_cb_t* alloc_cbt(void) {
    ev_ch_cb_t *cbt;
    if (cbt_freelist) {
        cbt = cbt_freelist;
        cbt_freelist = *(ev_ch_cb_t **)cbt;
    } else {
        Newx(cbt, 1, ev_ch_cb_t);
    }
    cbt->raw = 0;
    cbt->on_data = NULL;
    cbt->query_timeout = 0;
    return cbt;
}

static void release_cbt(ev_ch_cb_t *cbt) {
    *(ev_ch_cb_t **)cbt = cbt_freelist;
    cbt_freelist = cbt;
}

/* --- freelist for send_queue entries --- */

static ev_ch_send_t *send_freelist = NULL;

static ev_ch_send_t* alloc_send(void) {
    ev_ch_send_t *s;
    if (send_freelist) {
        s = send_freelist;
        send_freelist = *(ev_ch_send_t **)s;
    } else {
        Newx(s, 1, ev_ch_send_t);
    }
    s->insert_data = NULL;
    s->insert_data_len = 0;
    s->insert_av = NULL;
    s->raw = 0;
    s->on_data = NULL;
    s->query_timeout = 0;
    s->query_id = NULL;
    return s;

ClickHouse.xs  view on Meta::CPAN

    }
    self->callback_depth--;
    return check_destroyed(self);
}

static void push_cb_owned_ex(ev_clickhouse_t *self, SV *cb, int raw,
                              SV *on_data, double query_timeout) {
    ev_ch_cb_t *cbt = alloc_cbt();
    cbt->cb = cb;
    cbt->raw = raw;
    cbt->on_data = on_data ? SvREFCNT_inc(on_data) : NULL;
    cbt->query_timeout = query_timeout;
    ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
    /* pending_count already counted */
}

static SV* handler_accessor(SV **slot, SV *handler, int has_arg) {
    if (has_arg) {
        if (NULL != *slot) {
            SvREFCNT_dec(*slot);
            *slot = NULL;
        }
        if (NULL != handler && SvOK(handler) &&
            SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
            *slot = SvREFCNT_inc(handler);
        }
    }
    return (NULL != *slot) ? SvREFCNT_inc(*slot) : &PL_sv_undef;
}

static char* safe_strdup(const char *s) {
    char *d;
    size_t len;
    if (!s) return NULL;
    len = strlen(s);
    Newx(d, len + 1, char);
    Copy(s, d, len + 1, char);
    return d;
}

static int has_http_unsafe_chars(const char *s) {
    if (!s) return 0;
    for (; *s; s++)
        if (*s == '\r' || *s == '\n') return 1;
    return 0;
}

static int is_ip_literal(const char *s) {
    struct in_addr  a4;
    struct in6_addr a6;
    return (inet_pton(AF_INET, s, &a4) == 1 ||
            inet_pton(AF_INET6, s, &a6) == 1);
}

static void cleanup_connection(ev_clickhouse_t *self) {
    int was_connected = self->connected;

    if (was_connected) emit_trace(self, "disconnect");
    stop_reading(self);
    stop_writing(self);
    stop_keepalive(self);
    if (self->timing) {
        ev_timer_stop(self->loop, &self->timer);
        self->timing = 0;
    }

#ifdef HAVE_OPENSSL
    if (self->ssl) {
        SSL_shutdown(self->ssl);
        SSL_free(self->ssl);
        self->ssl = NULL;
    }
    if (self->ssl_ctx) {
        SSL_CTX_free(self->ssl_ctx);
        self->ssl_ctx = NULL;
    }
#endif

    if (self->fd >= 0) {
        close(self->fd);
        self->fd = -1;
    }

    self->connected = 0;
    self->connecting = 0;

    /* fire on_disconnect if we were connected */
    if (was_connected && NULL != self->on_disconnect) {
        self->callback_depth++;
        {
            dSP;
            ENTER;
            SAVETMPS;
            PUSHMARK(SP);
            PUTBACK;
            call_sv(self->on_disconnect, G_DISCARD | G_EVAL);
            if (SvTRUE(ERRSV))
                warn("EV::ClickHouse: exception in disconnect handler: %s",
                     SvPV_nolen(ERRSV));
            FREETMPS;
            LEAVE;
        }
        self->callback_depth--;
    }
    self->send_len = 0;
    self->send_pos = 0;
    self->recv_len = 0;
    self->send_count = 0;
    self->native_state = NATIVE_IDLE;
    if (self->native_rows) {
        SvREFCNT_dec((SV*)self->native_rows);
        self->native_rows = NULL;
    }
    if (self->native_col_names) {
        SvREFCNT_dec((SV*)self->native_col_names);
        self->native_col_names = NULL;
    }
    if (self->native_col_types) {
        SvREFCNT_dec((SV*)self->native_col_types);
        self->native_col_types = NULL;
    }

ClickHouse.xs  view on Meta::CPAN

        + settings_url_params_size(defaults, overrides);
    char *params;
    size_t plen = 0;
    Newx(params, params_cap, char);
    if (self->database) {
        size_t db_len = strlen(self->database);
        char *enc_db;
        Newx(enc_db, db_len * 3 + 1, char);
        size_t enc_len = url_encode(self->database, db_len, enc_db);
        plen = (size_t)snprintf(params, params_cap, "?database=%.*s&wait_end_of_query=1",
                        (int)enc_len, enc_db);
        Safefree(enc_db);
    } else {
        plen = (size_t)snprintf(params, params_cap, "?wait_end_of_query=1");
    }
    if (self->session_id) {
        size_t sid_len = strlen(self->session_id);
        char *enc_sid;
        Newx(enc_sid, sid_len * 3 + 1, char);
        size_t enc_len = url_encode(self->session_id, sid_len, enc_sid);
        plen += (size_t)snprintf(params + plen, params_cap - plen,
                         "&session_id=%.*s", (int)enc_len, enc_sid);
        Safefree(enc_sid);
    }
    plen = append_settings_url_params(params, plen,
                                       defaults, overrides,
                                       &query_id, &query_id_len);
    if (query_id) {
        size_t need = plen + 10 + query_id_len * 3 + 1;
        if (need > params_cap) {
            params_cap = need;
            Renew(params, params_cap, char);
        }
        plen += (size_t)snprintf(params + plen, params_cap - plen, "&query_id=");
        plen += url_encode(query_id, query_id_len, params + plen);
    }
    params[plen] = '\0';

    req_cap = 512 + body_len + plen
           + (self->host ? strlen(self->host) : 0)
           + (self->user ? strlen(self->user) : 0)
           + (self->password ? strlen(self->password) : 0);
    Newx(req, req_cap, char);

    /* request line */
    pos += snprintf(req + pos, req_cap - pos,
                    "POST /%s HTTP/1.1\r\n", params);
    Safefree(params);

    /* headers */
    pos += snprintf(req + pos, req_cap - pos,
                    "Host: %s:%u\r\n", self->host, self->port);
    if (self->user) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-User: %s\r\n", self->user);
    }
    if (self->password && self->password[0]) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-Key: %s\r\n", self->password);
    }
    pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");

    if (content_encoding)
        pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);

    if (self->compress)
        pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");

    pos += snprintf(req + pos, req_cap - pos,
                    "Content-Length: %lu\r\n\r\n", (unsigned long)body_len);

    /* body */
    if (body_len > 0) {
        if (pos + body_len > req_cap) {
            req_cap = pos + body_len + 1;
            Renew(req, req_cap, char);
        }
        Copy(body ? body : sql, req + pos, body_len, char);
        pos += body_len;
    }

    if (body) Safefree(body);

    *req_len = pos;
    return req;
}

/*
 * Build HTTP POST request for INSERT with data.
 * Query goes in URL param, data in body.
 */
static char* build_http_insert_request(ev_clickhouse_t *self, const char *table,
                                        size_t table_len, const char *data,
                                        size_t data_len, int do_compress,
                                        HV *defaults, HV *overrides,
                                        size_t *req_len) {
    char *req;
    size_t req_cap;
    size_t pos = 0;
    char *body = NULL;
    size_t body_len = data_len;
    const char *content_encoding = NULL;

    if (do_compress && data_len > 0) {
        size_t gz_len;
        body = gzip_compress(data, data_len, &gz_len);
        if (body) {
            body_len = gz_len;
            content_encoding = "Content-Encoding: gzip\r\n";
        }
    }

    /* build query string: INSERT INTO <table> FORMAT TabSeparated */
    size_t isql_cap = table_len + 64;
    char *insert_sql;
    Newx(insert_sql, isql_cap, char);
    int isql_len = snprintf(insert_sql, isql_cap,
                            "INSERT INTO %.*s FORMAT TabSeparated",
                            (int)table_len, table);

    const char *query_id = NULL;

ClickHouse.xs  view on Meta::CPAN

        char *enc_db;
        Newx(enc_db, db_len * 3 + 1, char);
        size_t enc_len = url_encode(self->database, db_len, enc_db);
        plen = (size_t)snprintf(params, params_cap, "?database=%.*s&wait_end_of_query=1",
                        (int)enc_len, enc_db);
        Safefree(enc_db);
    } else {
        plen = (size_t)snprintf(params, params_cap, "?wait_end_of_query=1");
    }
    if (self->session_id) {
        size_t sid_len = strlen(self->session_id);
        char *enc_sid;
        Newx(enc_sid, sid_len * 3 + 1, char);
        size_t enc_len = url_encode(self->session_id, sid_len, enc_sid);
        plen += (size_t)snprintf(params + plen, params_cap - plen,
                         "&session_id=%.*s", (int)enc_len, enc_sid);
        Safefree(enc_sid);
    }
    {
        char *enc_q;
        Newx(enc_q, isql_len * 3 + 1, char);
        size_t enc_len = url_encode(insert_sql, isql_len, enc_q);
        Safefree(insert_sql);
        plen += (size_t)snprintf(params + plen, params_cap - plen,
                         "&query=%.*s", (int)enc_len, enc_q);
        Safefree(enc_q);
    }
    plen = append_settings_url_params(params, plen,
                                       defaults, overrides,
                                       &query_id, &query_id_len);
    if (query_id) {
        size_t need = plen + 10 + query_id_len * 3 + 1;
        if (need > params_cap) {
            params_cap = need;
            Renew(params, params_cap, char);
        }
        plen += (size_t)snprintf(params + plen, params_cap - plen, "&query_id=");
        plen += url_encode(query_id, query_id_len, params + plen);
    }
    params[plen] = '\0';

    req_cap = 512 + body_len + plen
           + (self->host ? strlen(self->host) : 0)
           + (self->user ? strlen(self->user) : 0)
           + (self->password ? strlen(self->password) : 0);
    Newx(req, req_cap, char);

    pos += snprintf(req + pos, req_cap - pos,
                    "POST /%s HTTP/1.1\r\n", params);
    Safefree(params);
    pos += snprintf(req + pos, req_cap - pos,
                    "Host: %s:%u\r\n", self->host, self->port);
    if (self->user) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-User: %s\r\n", self->user);
    }
    if (self->password && self->password[0]) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-Key: %s\r\n", self->password);
    }
    pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");

    if (do_compress)
        pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");

    if (content_encoding)
        pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);

    pos += snprintf(req + pos, req_cap - pos,
                    "Content-Length: %lu\r\n\r\n", (unsigned long)body_len);

    if (body_len > 0) {
        if (pos + body_len > req_cap) {
            req_cap = pos + body_len + 1;
            Renew(req, req_cap, char);
        }
        Copy(body ? body : data, req + pos, body_len, char);
        pos += body_len;
    }

    if (body) Safefree(body);

    *req_len = pos;
    return req;
}

/* Build HTTP GET /ping request */
static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
    char *req;
    size_t req_cap = 128 + (self->host ? strlen(self->host) : 0);
    size_t pos = 0;

    Newx(req, req_cap, char);
    pos = snprintf(req, req_cap,
                   "GET /ping HTTP/1.1\r\n"
                   "Host: %s:%u\r\n"
                   "Connection: keep-alive\r\n\r\n",
                   self->host, self->port);
    if (pos >= req_cap) pos = req_cap - 1;
    *req_len = pos;
    return req;
}

/* --- HTTP response parsing --- */

/* Find \r\n\r\n in recv_buf. Returns offset past it, or 0 if not found. */
static size_t find_header_end(const char *buf, size_t len) {
    size_t i;
    if (len < 4) return 0;
    for (i = 0; i <= len - 4; i++) {
        if (buf[i] == '\r' && buf[i+1] == '\n' &&
            buf[i+2] == '\r' && buf[i+3] == '\n') {
            return i + 4;
        }
    }
    return 0;
}

/* Extract ClickHouse error code from HTTP error body ("Code: NNN. ...") */
static int32_t parse_ch_error_code(const char *body, size_t len) {
    if (len > 6 && memcmp(body, "Code: ", 6) == 0)
        return (int32_t)atoi(body + 6);
    return 0;
}

/* Parse HTTP status line, extract status code */
static int parse_http_status(const char *buf, size_t len) {
    /* HTTP/1.1 200 OK\r\n */
    const char *p = buf;
    const char *end = buf + len;
    int status;

    /* skip "HTTP/1.x " */
    while (p < end && *p != ' ') p++;
    if (p >= end) return 0;
    p++;

    status = atoi(p);
    if (status < 100 || status > 599) return 500; /* treat malformed as server error */
    return status;
}

/* Find header value (case-insensitive). Returns pointer into buf or NULL. */
static const char* find_header(const char *headers, size_t headers_len,
                                const char *name, size_t *value_len) {
    size_t name_len = strlen(name);
    const char *p = headers;
    const char *end = headers + headers_len;

    while (p < end) {
        const char *line_end = p;
        while (line_end < end && *line_end != '\r') line_end++;

        if ((size_t)(line_end - p) > name_len + 1 && p[name_len] == ':') {
            int match = 1;
            size_t i;
            for (i = 0; i < name_len; i++) {

ClickHouse.xs  view on Meta::CPAN

}

static void timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)w->data;
    (void)loop;
    (void)revents;

    if (self == NULL || self->magic != EV_CH_MAGIC) return;

    self->timing = 0;

    if (self->connecting) {
        stop_writing(self);
        self->callback_depth++;
        emit_error(self, "connect timeout");
        self->callback_depth--;
        if (check_destroyed(self)) return;
        if (cancel_pending(self, "connect timeout")) return;
        cleanup_connection(self);
    } else {
        /* query timeout */
        if (self->native_rows) {
            SvREFCNT_dec((SV*)self->native_rows);
            self->native_rows = NULL;
        }
        if (self->native_col_names) {
            SvREFCNT_dec((SV*)self->native_col_names);
            self->native_col_names = NULL;
        }
        if (self->native_col_types) {
            SvREFCNT_dec((SV*)self->native_col_types);
            self->native_col_types = NULL;
        }
        lc_free_dicts(self);
        if (self->insert_data) {
            Safefree(self->insert_data);
            self->insert_data = NULL;
            self->insert_data_len = 0;
        }
        if (self->insert_av) {
            SvREFCNT_dec(self->insert_av);
            self->insert_av = NULL;
        }
        if (self->insert_err) {
            Safefree(self->insert_err);
            self->insert_err = NULL;
        }
        self->native_state = NATIVE_IDLE;
        if (self->send_count > 0) self->send_count--;

        if (deliver_error(self, "query timeout")) return;

        /* Must reconnect — server may still be processing */
        if (cancel_pending(self, "query timeout")) return;
        cleanup_connection(self);
        if (self->auto_reconnect && self->host)
            schedule_reconnect(self);
    }
}

/* --- Keepalive timer callback --- */

static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, ka_timer));
    (void)revents;

    if (self->magic != EV_CH_MAGIC) return;
    if (!self->connected || self->send_count > 0) return;

    /* Send a ping to keep the connection alive */
    if (self->protocol == PROTO_NATIVE) {
        native_buf_t pkt;
        nbuf_init(&pkt);
        nbuf_varuint(&pkt, CLIENT_PING);
        ensure_send_cap(self, self->send_len + pkt.len);
        Copy(pkt.data, self->send_buf + self->send_len, pkt.len, char);
        self->send_len += pkt.len;
        Safefree(pkt.data);
        if (!self->writing) start_writing(self);
    }
    /* HTTP: no-op ping — just rely on TCP keepalive or let the
     * connection drop and auto-reconnect handles it. */
}

static void start_keepalive(ev_clickhouse_t *self) {
    if (self->keepalive > 0 && !self->ka_timing && self->connected) {
        ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
        ev_timer_start(self->loop, &self->ka_timer);
        self->ka_timing = 1;
    }
}

static void stop_keepalive(ev_clickhouse_t *self) {
    if (self->ka_timing) {
        ev_timer_stop(self->loop, &self->ka_timer);
        self->ka_timing = 0;
    }
}

/* --- Reconnect with backoff --- */

static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, reconnect_timer));
    (void)revents; (void)loop;
    self->reconnect_timing = 0;
    if (self->magic != EV_CH_MAGIC || self->connected || self->connecting) return;
    start_connect(self);
}

static void schedule_reconnect(ev_clickhouse_t *self) {
    if (!self->auto_reconnect || !self->host || self->magic != EV_CH_MAGIC) return;
    if (self->reconnect_delay <= 0) {
        self->reconnect_attempts = 0;
        start_connect(self);
        return;
    }
    double delay = self->reconnect_delay;
    int i;
    for (i = 0; i < self->reconnect_attempts && i < 20; i++)
        delay *= 2;
    if (self->reconnect_max_delay > 0 && delay > self->reconnect_max_delay)
        delay = self->reconnect_max_delay;
    self->reconnect_attempts++;
    if (self->reconnect_timing) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timing = 0;
    }
    ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0);
    ev_timer_start(self->loop, &self->reconnect_timer);
    self->reconnect_timing = 1;
}

/* Free LowCardinality cross-block dictionary state */
static void lc_free_dicts(ev_clickhouse_t *self) {
    if (self->lc_dicts) {
        int c;
        for (c = 0; c < self->lc_num_cols; c++) {
            if (self->lc_dicts[c]) {
                uint64_t j;
                for (j = 0; j < self->lc_dict_sizes[c]; j++)
                    SvREFCNT_dec(self->lc_dicts[c][j]);
                Safefree(self->lc_dicts[c]);
            }
        }
        Safefree(self->lc_dicts);
        Safefree(self->lc_dict_sizes);
        self->lc_dicts = NULL;
        self->lc_dict_sizes = NULL;
        self->lc_num_cols = 0;
    }
}

/* --- Pipeline orchestrator --- */

/*
 * ClickHouse HTTP does not support true HTTP pipelining.
 * We send one request at a time, wait for the response, then send the next.
 */
/* Returns 1 if self was freed (caller must not access self). */
static int pipeline_advance(ev_clickhouse_t *self) {
    if (!self->connected) return 0;

    /* if we're still waiting for a response, just ensure reading */
    if (self->send_count > 0) {
        start_reading(self);
        return 0;
    }

    /* Check drain callback when all pending work is done */
    if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
        && self->on_drain) {
        SV *drain_cb = self->on_drain;
        self->on_drain = NULL;
        {
            dSP;
            ENTER;
            SAVETMPS;
            PUSHMARK(SP);
            PUTBACK;
            self->callback_depth++;
            call_sv(drain_cb, G_DISCARD | G_EVAL);
            self->callback_depth--;
            if (SvTRUE(ERRSV))
                warn("EV::ClickHouse: drain callback died: %s",
                     SvPV_nolen(ERRSV));
            FREETMPS;
            LEAVE;
        }
        SvREFCNT_dec(drain_cb);
        if (check_destroyed(self)) return 1;
    }

    /* Restart keepalive timer when idle */
    if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
        && self->keepalive > 0 && !self->ka_timing) {
        start_keepalive(self);
    }

    /* send next request from queue */
    if (!ngx_queue_empty(&self->send_queue)) {
        ngx_queue_t *q = ngx_queue_head(&self->send_queue);
        ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);

        /* Stop keepalive during active query */
        stop_keepalive(self);
        emit_trace(self, "dispatch query (pending=%d)", self->pending_count);

        /* set up send buffer */
        ensure_send_cap(self, send->data_len);
        Copy(send->data, self->send_buf, send->data_len, char);
        self->send_len = send->data_len;
        self->send_pos = 0;

        /* move cb to recv queue */
        ngx_queue_remove(q);
        push_cb_owned_ex(self, send->cb, send->raw,
                          send->on_data, send->query_timeout);
        if (send->on_data) { SvREFCNT_dec(send->on_data); send->on_data = NULL; }
        /* Track query_id */
        if (self->last_query_id) { Safefree(self->last_query_id); self->last_query_id = NULL; }
        if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }

        /* transfer deferred insert data from send entry to self */
        if (send->insert_data) {
            self->insert_data = send->insert_data;
            self->insert_data_len = send->insert_data_len;
            send->insert_data = NULL;
        }
        if (send->insert_av) {
            self->insert_av = send->insert_av;
            send->insert_av = NULL;
        }

        Safefree(send->data);
        {
            double qt = send->query_timeout;
            release_send(send);
            self->send_count++;

            /* Start query timeout timer */
            {
                double timeout = qt > 0 ? qt : self->query_timeout;
                if (timeout > 0 && !self->timing) {
                    ev_timer_set(&self->timer, (ev_tstamp)timeout, 0.0);
                    ev_timer_start(self->loop, &self->timer);
                    self->timing = 1;
                }
            }
        }

        if (self->protocol == PROTO_NATIVE) {
            if (self->insert_data || self->insert_av)
                self->native_state = NATIVE_WAIT_INSERT_META;
            else
                self->native_state = NATIVE_WAIT_RESULT;
        }

        return try_write(self);
    }
    return 0;
}

/* --- OpenSSL init (must be in plain C, not inside XS BOOT) --- */

static void ch_openssl_init(void) {

ClickHouse.xs  view on Meta::CPAN

last_totals(EV::ClickHouse self)
CODE:
{
    if (self->native_totals)
        RETVAL = newRV_inc((SV*)self->native_totals);
    else
        RETVAL = &PL_sv_undef;
}
OUTPUT:
    RETVAL

SV *
last_extremes(EV::ClickHouse self)
CODE:
{
    if (self->native_extremes)
        RETVAL = newRV_inc((SV*)self->native_extremes);
    else
        RETVAL = &PL_sv_undef;
}
OUTPUT:
    RETVAL

SV *
profile_rows_before_limit(EV::ClickHouse self)
CODE:
{
    RETVAL = newSVuv(self->profile_rows_before_limit);
}
OUTPUT:
    RETVAL

SV *
profile_rows(EV::ClickHouse self)
CODE:
{
    RETVAL = newSVuv(self->profile_rows);
}
OUTPUT:
    RETVAL

SV *
profile_bytes(EV::ClickHouse self)
CODE:
{
    RETVAL = newSVuv(self->profile_bytes);
}
OUTPUT:
    RETVAL

SV*
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    RETVAL = handler_accessor(&self->on_trace, handler, items > 1);
}
OUTPUT:
    RETVAL

void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
{
    self->keepalive = val;
}

void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
{
    self->reconnect_delay = val;
}

void
_set_reconnect_max_delay(EV::ClickHouse self, double val)
CODE:
{
    self->reconnect_max_delay = val;
}

void
drain(EV::ClickHouse self, SV *cb)
CODE:
{
    if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
        croak("drain callback must be a CODE reference");
    if (self->on_drain) SvREFCNT_dec(self->on_drain);
    if (self->pending_count == 0 && ngx_queue_empty(&self->send_queue)) {
        /* Nothing pending — fire immediately */
        self->on_drain = NULL;
        {
            dSP;
            ENTER;
            SAVETMPS;
            PUSHMARK(SP);
            PUTBACK;
            self->callback_depth++;
            call_sv(cb, G_DISCARD | G_EVAL);
            self->callback_depth--;
            if (SvTRUE(ERRSV))
                warn("EV::ClickHouse: drain callback died: %s",
                     SvPV_nolen(ERRSV));
            FREETMPS;
            LEAVE;
        }
        check_destroyed(self);
    } else {
        self->on_drain = SvREFCNT_inc(cb);
    }
}

void
cancel(EV::ClickHouse self)
CODE:
{
    if (self->protocol == PROTO_NATIVE && self->send_count > 0) {
        /* Send CLIENT_CANCEL packet */
        native_buf_t pkt;
        nbuf_init(&pkt);
        nbuf_varuint(&pkt, CLIENT_CANCEL);
        ensure_send_cap(self, self->send_len + pkt.len);
        Copy(pkt.data, self->send_buf + self->send_len, pkt.len, char);
        self->send_len += pkt.len;
        Safefree(pkt.data);



( run in 1.547 second using v1.01-cache-2.11-cpan-39bf76dae61 )