EV-ClickHouse
view release on metacpan or search on metacpan
ClickHouse.xs view on Meta::CPAN
ev_timer timer;
int reading, writing, timing;
int connected, connecting;
int protocol; /* PROTO_HTTP or PROTO_NATIVE */
#ifdef HAVE_OPENSSL
SSL_CTX *ssl_ctx;
SSL *ssl;
#endif
int tls_enabled;
char *tls_ca_file;
/* connection params */
char *host, *user, *password, *database;
unsigned int port;
/* send/recv buffers */
char *send_buf;
size_t send_len, send_pos, send_cap;
char *recv_buf;
size_t recv_len, recv_cap;
/* native protocol state */
char *server_name;
char *server_display_name;
char *server_timezone;
unsigned int server_version_major, server_version_minor, server_revision;
unsigned int server_version_patch;
int native_state; /* NATIVE_IDLE, NATIVE_WAIT_HELLO, NATIVE_WAIT_RESULT, ... */
AV *native_rows; /* accumulate rows across Data blocks */
char *insert_data; /* pending TabSeparated data for two-phase INSERT */
size_t insert_data_len;
SV *insert_av; /* pending AV* of AV*s for arrayref INSERT */
char *insert_err; /* deferred error from unsupported INSERT encoding */
/* queues */
ngx_queue_t cb_queue;
ngx_queue_t send_queue;
int pending_count;
int send_count;
/* options */
char *session_id;
int compress;
double connect_timeout;
HV *default_settings; /* connection-level ClickHouse settings */
SV *on_connect;
SV *on_error;
SV *on_progress;
SV *on_disconnect;
int tls_skip_verify;
double query_timeout;
int auto_reconnect;
uint32_t decode_flags;
AV *native_col_names; /* column names from last native result */
AV *native_col_types; /* column type strings from last native result */
SV *on_drain; /* callback fired when pending_count drops to 0 */
char *last_query_id; /* query_id of the last dispatched query */
SV *on_trace; /* debug trace callback */
ev_timer ka_timer; /* keepalive timer */
double keepalive; /* keepalive interval (0 = disabled) */
int ka_timing;
int callback_depth;
/* error info from last SERVER_EXCEPTION or HTTP error */
int32_t last_error_code;
/* profile info from last SERVER_PROFILE_INFO */
uint64_t profile_rows;
uint64_t profile_bytes;
uint64_t profile_rows_before_limit;
/* totals / extremes from last native query */
AV *native_totals;
AV *native_extremes;
/* reconnect backoff */
double reconnect_delay;
double reconnect_max_delay;
int reconnect_attempts;
ev_timer reconnect_timer;
int reconnect_timing;
/* LowCardinality cross-block dictionary state */
SV ***lc_dicts; /* array of dictionaries, one per column */
uint64_t *lc_dict_sizes; /* size of each dictionary */
int lc_num_cols; /* number of columns with LC state */
};
/*
 * One entry of a connection's cb_queue.
 * push_cb_owned_ex() fills an entry in and appends it to the tail of
 * self->cb_queue.  Entries are recycled through cbt_freelist; while an entry
 * sits on that freelist, the first pointer-sized bytes of the struct (the cb
 * slot) are reused to hold the link to the next free entry.
 */
struct ev_ch_cb_s {
SV *cb; /* callback SV; push_cb_owned_ex() stores it as passed, without taking a new reference */
int raw; /* return raw response body instead of parsed rows */
SV *on_data; /* per-query streaming callback (fires per block); push_cb_owned_ex() takes its own reference; NULL if unset */
double query_timeout; /* per-query timeout (0=use default) */
ngx_queue_t queue; /* link used to chain this entry into cb_queue */
};
/*
 * One queued, not-yet-dispatched request in a connection's send_queue.
 * pipeline_advance() takes the head entry, copies `data` into the
 * connection's send buffer, hands cb/raw/on_data/query_timeout over to a
 * cb_queue entry, and moves the insert_* and query_id pointers onto the
 * connection before the entry is released.
 */
struct ev_ch_send_s {
char *data; /* full HTTP request or native packet; released with Safefree() once copied to the send buffer */
size_t data_len; /* number of bytes in data */
SV *cb; /* completion callback, handed to push_cb_owned_ex() on dispatch */
char *insert_data; /* deferred TSV data for native INSERT */
size_t insert_data_len; /* number of bytes in insert_data */
SV *insert_av; /* deferred AV* data for native INSERT */
int raw; /* return raw response body */
SV *on_data; /* per-query streaming callback; this entry's reference is dropped on dispatch */
double query_timeout; /* per-query timeout */
char *query_id; /* query_id for tracking; on dispatch the pointer becomes self->last_query_id */
ngx_queue_t queue; /* link used to chain this entry into send_queue */
};
/*
 * Forward declarations for static helpers that are called before their
 * definitions appear in this file.
 */
/* libev watcher callbacks */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
/* keepalive / reconnect timers and LowCardinality dictionary cleanup */
static void start_keepalive(ev_clickhouse_t *self);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
/* read/write watcher control */
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
/* error reporting, tracing and teardown */
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
static void cleanup_connection(ev_clickhouse_t *self);
/* NOTE(review): callers return immediately when these two yield non-zero,
 * which suggests "self is gone" - confirm against their definitions. */
static int cancel_pending(ev_clickhouse_t *self, const char *errmsg);
static int check_destroyed(ev_clickhouse_t *self);
/* connection progress and request pipeline */
static void on_connect_done(ev_clickhouse_t *self);
static void process_http_response(ev_clickhouse_t *self);
static int try_write(ev_clickhouse_t *self);
static int pipeline_advance(ev_clickhouse_t *self); /* returns 1 if self was freed */
static void on_readable(ev_clickhouse_t *self);
/* --- freelist for cb_queue entries --- */
static ev_ch_cb_t *cbt_freelist = NULL;
static ev_ch_cb_t* alloc_cbt(void) {
ev_ch_cb_t *cbt;
if (cbt_freelist) {
cbt = cbt_freelist;
cbt_freelist = *(ev_ch_cb_t **)cbt;
} else {
Newx(cbt, 1, ev_ch_cb_t);
}
cbt->raw = 0;
cbt->on_data = NULL;
cbt->query_timeout = 0;
return cbt;
}
/*
 * Return a cb_queue entry to cbt_freelist for later reuse.
 * The entry's first pointer-sized bytes are overwritten with the freelist
 * link, so its contents must no longer be needed.
 */
static void release_cbt(ev_ch_cb_t *cbt) {
    ev_ch_cb_t **next_slot = (ev_ch_cb_t **)cbt;

    *next_slot = cbt_freelist;   /* chain the previous head behind this entry */
    cbt_freelist = cbt;          /* this entry becomes the new head */
}
/* --- freelist for send_queue entries --- */
static ev_ch_send_t *send_freelist = NULL;
static ev_ch_send_t* alloc_send(void) {
ev_ch_send_t *s;
if (send_freelist) {
s = send_freelist;
send_freelist = *(ev_ch_send_t **)s;
} else {
Newx(s, 1, ev_ch_send_t);
}
s->insert_data = NULL;
s->insert_data_len = 0;
s->insert_av = NULL;
s->raw = 0;
s->on_data = NULL;
s->query_timeout = 0;
s->query_id = NULL;
return s;
ClickHouse.xs view on Meta::CPAN
}
self->callback_depth--;
return check_destroyed(self);
}
/*
 * Append a callback entry to the tail of self->cb_queue.
 *
 * cb            stored as passed; no additional reference is taken here
 * raw           copied into the entry
 * on_data       optional streaming callback; the entry takes its own
 *               reference when it is non-NULL
 * query_timeout copied into the entry
 *
 * pending_count is not touched: it has already been counted by the caller.
 */
static void push_cb_owned_ex(ev_clickhouse_t *self, SV *cb, int raw,
                             SV *on_data, double query_timeout) {
    ev_ch_cb_t *entry = alloc_cbt();
    SV *stream_cb = NULL;

    if (on_data)
        stream_cb = SvREFCNT_inc(on_data);

    entry->cb = cb;
    entry->raw = raw;
    entry->on_data = stream_cb;
    entry->query_timeout = query_timeout;

    ngx_queue_insert_tail(&self->cb_queue, &entry->queue);
}
/*
 * Combined getter/setter for a handler slot.
 *
 * When has_arg is true the slot is first cleared (dropping its reference),
 * then set to `handler` if that is a defined reference to a CODE value; any
 * other value leaves the slot empty.
 *
 * Returns a new reference to the handler now in the slot, or &PL_sv_undef
 * when the slot is empty.
 */
static SV* handler_accessor(SV **slot, SV *handler, int has_arg) {
    if (has_arg) {
        SV *previous = *slot;

        if (previous != NULL) {
            SvREFCNT_dec(previous);
            *slot = NULL;
        }

        if (handler != NULL && SvOK(handler) && SvROK(handler)) {
            if (SvTYPE(SvRV(handler)) == SVt_PVCV)
                *slot = SvREFCNT_inc(handler);
        }
    }

    if (*slot == NULL)
        return &PL_sv_undef;
    return SvREFCNT_inc(*slot);
}
/*
 * Duplicate a NUL-terminated string into memory obtained with Newx().
 * Returns NULL when s is NULL; otherwise the caller owns the copy.
 */
static char* safe_strdup(const char *s) {
    char *copy = NULL;

    if (s != NULL) {
        size_t nbytes = strlen(s) + 1;   /* include the terminating NUL */

        Newx(copy, nbytes, char);
        Copy(s, copy, nbytes, char);
    }
    return copy;
}
/*
 * Report whether a string contains a carriage return or line feed.
 * Returns 1 if s contains '\r' or '\n', 0 otherwise (including s == NULL).
 */
static int has_http_unsafe_chars(const char *s) {
    const char *p = s;

    if (p == NULL)
        return 0;

    while (*p != '\0') {
        switch (*p) {
        case '\r':
        case '\n':
            return 1;
        default:
            p++;
            break;
        }
    }
    return 0;
}
/*
 * Report whether s is a textual IPv4 or IPv6 address, as accepted by
 * inet_pton().
 * Returns 1 for an address literal, 0 for anything else.  A NULL pointer is
 * treated as "not an address" instead of being handed to inet_pton().
 */
static int is_ip_literal(const char *s) {
    struct in_addr a4;
    struct in6_addr a6;

    if (s == NULL)
        return 0;

    return (inet_pton(AF_INET, s, &a4) == 1 ||
            inet_pton(AF_INET6, s, &a6) == 1);
}
static void cleanup_connection(ev_clickhouse_t *self) {
int was_connected = self->connected;
if (was_connected) emit_trace(self, "disconnect");
stop_reading(self);
stop_writing(self);
stop_keepalive(self);
if (self->timing) {
ev_timer_stop(self->loop, &self->timer);
self->timing = 0;
}
#ifdef HAVE_OPENSSL
if (self->ssl) {
SSL_shutdown(self->ssl);
SSL_free(self->ssl);
self->ssl = NULL;
}
if (self->ssl_ctx) {
SSL_CTX_free(self->ssl_ctx);
self->ssl_ctx = NULL;
}
#endif
if (self->fd >= 0) {
close(self->fd);
self->fd = -1;
}
self->connected = 0;
self->connecting = 0;
/* fire on_disconnect if we were connected */
if (was_connected && NULL != self->on_disconnect) {
self->callback_depth++;
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
PUTBACK;
call_sv(self->on_disconnect, G_DISCARD | G_EVAL);
if (SvTRUE(ERRSV))
warn("EV::ClickHouse: exception in disconnect handler: %s",
SvPV_nolen(ERRSV));
FREETMPS;
LEAVE;
}
self->callback_depth--;
}
self->send_len = 0;
self->send_pos = 0;
self->recv_len = 0;
self->send_count = 0;
self->native_state = NATIVE_IDLE;
if (self->native_rows) {
SvREFCNT_dec((SV*)self->native_rows);
self->native_rows = NULL;
}
if (self->native_col_names) {
SvREFCNT_dec((SV*)self->native_col_names);
self->native_col_names = NULL;
}
if (self->native_col_types) {
SvREFCNT_dec((SV*)self->native_col_types);
self->native_col_types = NULL;
}
ClickHouse.xs view on Meta::CPAN
+ settings_url_params_size(defaults, overrides);
char *params;
size_t plen = 0;
Newx(params, params_cap, char);
if (self->database) {
size_t db_len = strlen(self->database);
char *enc_db;
Newx(enc_db, db_len * 3 + 1, char);
size_t enc_len = url_encode(self->database, db_len, enc_db);
plen = (size_t)snprintf(params, params_cap, "?database=%.*s&wait_end_of_query=1",
(int)enc_len, enc_db);
Safefree(enc_db);
} else {
plen = (size_t)snprintf(params, params_cap, "?wait_end_of_query=1");
}
if (self->session_id) {
size_t sid_len = strlen(self->session_id);
char *enc_sid;
Newx(enc_sid, sid_len * 3 + 1, char);
size_t enc_len = url_encode(self->session_id, sid_len, enc_sid);
plen += (size_t)snprintf(params + plen, params_cap - plen,
"&session_id=%.*s", (int)enc_len, enc_sid);
Safefree(enc_sid);
}
plen = append_settings_url_params(params, plen,
defaults, overrides,
&query_id, &query_id_len);
if (query_id) {
size_t need = plen + 10 + query_id_len * 3 + 1;
if (need > params_cap) {
params_cap = need;
Renew(params, params_cap, char);
}
plen += (size_t)snprintf(params + plen, params_cap - plen, "&query_id=");
plen += url_encode(query_id, query_id_len, params + plen);
}
params[plen] = '\0';
req_cap = 512 + body_len + plen
+ (self->host ? strlen(self->host) : 0)
+ (self->user ? strlen(self->user) : 0)
+ (self->password ? strlen(self->password) : 0);
Newx(req, req_cap, char);
/* request line */
pos += snprintf(req + pos, req_cap - pos,
"POST /%s HTTP/1.1\r\n", params);
Safefree(params);
/* headers */
pos += snprintf(req + pos, req_cap - pos,
"Host: %s:%u\r\n", self->host, self->port);
if (self->user) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-User: %s\r\n", self->user);
}
if (self->password && self->password[0]) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-Key: %s\r\n", self->password);
}
pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
if (content_encoding)
pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
if (self->compress)
pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
pos += snprintf(req + pos, req_cap - pos,
"Content-Length: %lu\r\n\r\n", (unsigned long)body_len);
/* body */
if (body_len > 0) {
if (pos + body_len > req_cap) {
req_cap = pos + body_len + 1;
Renew(req, req_cap, char);
}
Copy(body ? body : sql, req + pos, body_len, char);
pos += body_len;
}
if (body) Safefree(body);
*req_len = pos;
return req;
}
/*
* Build HTTP POST request for INSERT with data.
* Query goes in URL param, data in body.
*/
static char* build_http_insert_request(ev_clickhouse_t *self, const char *table,
size_t table_len, const char *data,
size_t data_len, int do_compress,
HV *defaults, HV *overrides,
size_t *req_len) {
char *req;
size_t req_cap;
size_t pos = 0;
char *body = NULL;
size_t body_len = data_len;
const char *content_encoding = NULL;
if (do_compress && data_len > 0) {
size_t gz_len;
body = gzip_compress(data, data_len, &gz_len);
if (body) {
body_len = gz_len;
content_encoding = "Content-Encoding: gzip\r\n";
}
}
/* build query string: INSERT INTO <table> FORMAT TabSeparated */
size_t isql_cap = table_len + 64;
char *insert_sql;
Newx(insert_sql, isql_cap, char);
int isql_len = snprintf(insert_sql, isql_cap,
"INSERT INTO %.*s FORMAT TabSeparated",
(int)table_len, table);
const char *query_id = NULL;
ClickHouse.xs view on Meta::CPAN
char *enc_db;
Newx(enc_db, db_len * 3 + 1, char);
size_t enc_len = url_encode(self->database, db_len, enc_db);
plen = (size_t)snprintf(params, params_cap, "?database=%.*s&wait_end_of_query=1",
(int)enc_len, enc_db);
Safefree(enc_db);
} else {
plen = (size_t)snprintf(params, params_cap, "?wait_end_of_query=1");
}
if (self->session_id) {
size_t sid_len = strlen(self->session_id);
char *enc_sid;
Newx(enc_sid, sid_len * 3 + 1, char);
size_t enc_len = url_encode(self->session_id, sid_len, enc_sid);
plen += (size_t)snprintf(params + plen, params_cap - plen,
"&session_id=%.*s", (int)enc_len, enc_sid);
Safefree(enc_sid);
}
{
char *enc_q;
Newx(enc_q, isql_len * 3 + 1, char);
size_t enc_len = url_encode(insert_sql, isql_len, enc_q);
Safefree(insert_sql);
plen += (size_t)snprintf(params + plen, params_cap - plen,
"&query=%.*s", (int)enc_len, enc_q);
Safefree(enc_q);
}
plen = append_settings_url_params(params, plen,
defaults, overrides,
&query_id, &query_id_len);
if (query_id) {
size_t need = plen + 10 + query_id_len * 3 + 1;
if (need > params_cap) {
params_cap = need;
Renew(params, params_cap, char);
}
plen += (size_t)snprintf(params + plen, params_cap - plen, "&query_id=");
plen += url_encode(query_id, query_id_len, params + plen);
}
params[plen] = '\0';
req_cap = 512 + body_len + plen
+ (self->host ? strlen(self->host) : 0)
+ (self->user ? strlen(self->user) : 0)
+ (self->password ? strlen(self->password) : 0);
Newx(req, req_cap, char);
pos += snprintf(req + pos, req_cap - pos,
"POST /%s HTTP/1.1\r\n", params);
Safefree(params);
pos += snprintf(req + pos, req_cap - pos,
"Host: %s:%u\r\n", self->host, self->port);
if (self->user) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-User: %s\r\n", self->user);
}
if (self->password && self->password[0]) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-Key: %s\r\n", self->password);
}
pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
if (do_compress)
pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
if (content_encoding)
pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
pos += snprintf(req + pos, req_cap - pos,
"Content-Length: %lu\r\n\r\n", (unsigned long)body_len);
if (body_len > 0) {
if (pos + body_len > req_cap) {
req_cap = pos + body_len + 1;
Renew(req, req_cap, char);
}
Copy(body ? body : data, req + pos, body_len, char);
pos += body_len;
}
if (body) Safefree(body);
*req_len = pos;
return req;
}
/*
 * Build HTTP GET /ping request.
 *
 * self     connection whose host and port go into the Host header
 * req_len  receives the number of request bytes written
 *
 * Returns a buffer allocated with Newx(); the caller owns it.
 * A NULL host is formatted as an empty string rather than being passed to
 * "%s", and a failing snprintf() yields an empty request instead of a
 * bogus length.
 */
static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
    const char *host = self->host ? self->host : "";
    size_t req_cap = 128 + strlen(host);
    char *req;
    int written;

    Newx(req, req_cap, char);
    written = snprintf(req, req_cap,
                       "GET /ping HTTP/1.1\r\n"
                       "Host: %s:%u\r\n"
                       "Connection: keep-alive\r\n\r\n",
                       host, self->port);

    if (written < 0) {
        /* encoding error: leave a well-defined, empty buffer */
        req[0] = '\0';
        *req_len = 0;
    } else if ((size_t)written >= req_cap) {
        /* output was truncated; only req_cap - 1 bytes are present */
        *req_len = req_cap - 1;
    } else {
        *req_len = (size_t)written;
    }
    return req;
}
/* --- HTTP response parsing --- */
/*
 * Locate the first "\r\n\r\n" sequence within the first len bytes of buf.
 * Returns the offset of the byte just past that sequence, or 0 if the
 * sequence does not occur (or len is below 4).
 */
static size_t find_header_end(const char *buf, size_t len) {
    const char *p;
    const char *last;

    if (len < 4)
        return 0;

    last = buf + (len - 4);   /* last position where a 4-byte match can start */
    for (p = buf; p <= last; p++) {
        if (p[0] != '\r')
            continue;
        if (p[1] == '\n' && p[2] == '\r' && p[3] == '\n')
            return (size_t)(p - buf) + 4;
    }
    return 0;
}
/*
 * Extract ClickHouse error code from HTTP error body ("Code: NNN. ...").
 *
 * body  response body; it need not be NUL-terminated
 * len   number of valid bytes in body
 *
 * Returns the decimal number that follows "Code: ", or 0 when the body does
 * not start with that prefix or no digits follow it.  Parsing never reads
 * beyond body[len - 1], and values too large for int32_t are clamped to
 * 2147483647.
 */
static int32_t parse_ch_error_code(const char *body, size_t len) {
    static const char prefix[] = "Code: ";
    const size_t prefix_len = sizeof prefix - 1;
    size_t i = prefix_len;
    int negative = 0;
    long value = 0;

    if (len <= prefix_len || memcmp(body, prefix, prefix_len) != 0)
        return 0;

    /* tolerate blanks and a sign before the digits */
    while (i < len && (body[i] == ' ' || body[i] == '\t'))
        i++;
    if (i < len && (body[i] == '-' || body[i] == '+')) {
        negative = (body[i] == '-');
        i++;
    }

    for (; i < len && body[i] >= '0' && body[i] <= '9'; i++) {
        int digit = body[i] - '0';

        /* check before multiplying so the accumulator cannot overflow */
        if (value > (2147483647L - digit) / 10) {
            value = 2147483647L;
            break;
        }
        value = value * 10 + digit;
    }

    return (int32_t)(negative ? -value : value);
}
/*
 * Parse HTTP status line, extract status code.
 *
 * buf  start of the response, e.g. "HTTP/1.1 200 OK\r\n"; it need not be
 *      NUL-terminated
 * len  number of valid bytes in buf
 *
 * Returns 0 when no space separator is found within len bytes, the status
 * code when it lies in 100..599, and 500 for anything else (malformed
 * status is treated as a server error).  Parsing never reads beyond
 * buf[len - 1].
 */
static int parse_http_status(const char *buf, size_t len) {
    const char *p = buf;
    const char *end = buf + len;
    int status = 0;

    /* skip "HTTP/1.x" up to the separating space */
    while (p < end && *p != ' ')
        p++;
    if (p >= end)
        return 0;

    /* skip the separator itself (and any extra blanks) */
    while (p < end && (*p == ' ' || *p == '\t'))
        p++;

    for (; p < end && *p >= '0' && *p <= '9'; p++) {
        status = status * 10 + (*p - '0');
        if (status > 599)
            break;   /* already out of range; stop before it can overflow */
    }

    if (status < 100 || status > 599)
        return 500; /* treat malformed as server error */
    return status;
}
/* Find header value (case-insensitive). Returns pointer into buf or NULL. */
static const char* find_header(const char *headers, size_t headers_len,
const char *name, size_t *value_len) {
size_t name_len = strlen(name);
const char *p = headers;
const char *end = headers + headers_len;
while (p < end) {
const char *line_end = p;
while (line_end < end && *line_end != '\r') line_end++;
if ((size_t)(line_end - p) > name_len + 1 && p[name_len] == ':') {
int match = 1;
size_t i;
for (i = 0; i < name_len; i++) {
ClickHouse.xs view on Meta::CPAN
}
/*
 * libev callback for self->timer.
 * Handles two cases, chosen by self->connecting: a connect timeout, or a
 * query timeout.  In both cases pending work is cancelled and the connection
 * is torn down; after a query timeout a reconnect is scheduled when
 * auto_reconnect is enabled.
 */
static void timer_cb(EV_P_ ev_timer *w, int revents) {
ev_clickhouse_t *self = (ev_clickhouse_t *)w->data;
(void)loop;
(void)revents;
/* ignore a watcher without an owner or whose owner fails the magic check */
if (self == NULL || self->magic != EV_CH_MAGIC) return;
/* the timer has fired: clear the "timer armed" flag */
self->timing = 0;
if (self->connecting) {
/* connect timeout */
stop_writing(self);
/* NOTE(review): callback_depth is raised around emit_error, presumably so
 * that a destroy requested from inside the handler is deferred until
 * check_destroyed() - confirm against check_destroyed(). */
self->callback_depth++;
emit_error(self, "connect timeout");
self->callback_depth--;
if (check_destroyed(self)) return;
if (cancel_pending(self, "connect timeout")) return;
cleanup_connection(self);
} else {
/* query timeout */
/* drop partially accumulated native result state */
if (self->native_rows) {
SvREFCNT_dec((SV*)self->native_rows);
self->native_rows = NULL;
}
if (self->native_col_names) {
SvREFCNT_dec((SV*)self->native_col_names);
self->native_col_names = NULL;
}
if (self->native_col_types) {
SvREFCNT_dec((SV*)self->native_col_types);
self->native_col_types = NULL;
}
lc_free_dicts(self);
/* drop any INSERT payload or deferred INSERT error still attached to the connection */
if (self->insert_data) {
Safefree(self->insert_data);
self->insert_data = NULL;
self->insert_data_len = 0;
}
if (self->insert_av) {
SvREFCNT_dec(self->insert_av);
self->insert_av = NULL;
}
if (self->insert_err) {
Safefree(self->insert_err);
self->insert_err = NULL;
}
self->native_state = NATIVE_IDLE;
/* the timed-out query no longer counts as in flight */
if (self->send_count > 0) self->send_count--;
/* NOTE(review): a non-zero result presumably means self was destroyed
 * inside the callback - confirm against deliver_error(). */
if (deliver_error(self, "query timeout")) return;
/* Must reconnect - server may still be processing */
if (cancel_pending(self, "query timeout")) return;
cleanup_connection(self);
if (self->auto_reconnect && self->host)
schedule_reconnect(self);
}
}
/* --- Keepalive timer callback --- */
/*
 * libev callback for self->ka_timer.
 * On a connected connection with no request in flight, the native protocol
 * gets a CLIENT_PING packet appended to the send buffer and the write
 * watcher is started if it is not already running.  For HTTP nothing is
 * sent.
 */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    /* the watcher is embedded in the connection struct: step back to its start */
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, ka_timer));
    native_buf_t ping;

    (void)revents;

    if (self->magic != EV_CH_MAGIC)
        return;
    if (self->send_count > 0 || !self->connected)
        return;

    /* HTTP: no-op ping - just rely on TCP keepalive or let the
     * connection drop and auto-reconnect handles it. */
    if (self->protocol != PROTO_NATIVE)
        return;

    /* Send a ping to keep the connection alive */
    nbuf_init(&ping);
    nbuf_varuint(&ping, CLIENT_PING);
    ensure_send_cap(self, self->send_len + ping.len);
    Copy(ping.data, self->send_buf + self->send_len, ping.len, char);
    self->send_len += ping.len;
    Safefree(ping.data);

    if (!self->writing)
        start_writing(self);
}
/*
 * Arm the repeating keepalive timer.
 * Does nothing unless a positive keepalive interval is configured, the
 * timer is not already running, and the connection is established.
 */
static void start_keepalive(ev_clickhouse_t *self) {
    if (!(self->keepalive > 0))
        return;
    if (self->ka_timing || !self->connected)
        return;

    /* first expiry and repeat interval are both the keepalive interval */
    ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
    ev_timer_start(self->loop, &self->ka_timer);
    self->ka_timing = 1;
}
/*
 * Disarm the keepalive timer if it is currently running; otherwise a no-op.
 */
static void stop_keepalive(ev_clickhouse_t *self) {
    if (!self->ka_timing)
        return;

    ev_timer_stop(self->loop, &self->ka_timer);
    self->ka_timing = 0;
}
/* --- Reconnect with backoff --- */
/*
 * libev callback for self->reconnect_timer.
 * Clears the "reconnect timer armed" flag and starts a new connection
 * attempt, unless the object fails the magic check or a connection is
 * already established or in progress.
 */
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
    /* the watcher is embedded in the connection struct: step back to its start */
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, reconnect_timer));

    (void)revents;
    (void)loop;

    self->reconnect_timing = 0;

    if (self->magic == EV_CH_MAGIC && !self->connected && !self->connecting)
        start_connect(self);
}
/*
 * Start a reconnect, immediately or after a backoff delay.
 *
 * Does nothing unless auto_reconnect is enabled, a host is set and the
 * object passes the magic check.  With a non-positive reconnect_delay the
 * attempt counter is reset and the connection is started at once.
 * Otherwise the delay is reconnect_delay doubled once per previous attempt
 * (at most 20 doublings), capped at reconnect_max_delay when that is
 * positive, and a one-shot timer is armed for it.
 */
static void schedule_reconnect(ev_clickhouse_t *self) {
    double wait;
    int doublings;

    if (!self->auto_reconnect || !self->host || self->magic != EV_CH_MAGIC)
        return;

    if (self->reconnect_delay <= 0) {
        self->reconnect_attempts = 0;
        start_connect(self);
        return;
    }

    /* exponential backoff: one doubling per earlier attempt, at most 20 */
    wait = self->reconnect_delay;
    doublings = self->reconnect_attempts < 20 ? self->reconnect_attempts : 20;
    while (doublings-- > 0)
        wait *= 2;

    if (self->reconnect_max_delay > 0 && wait > self->reconnect_max_delay)
        wait = self->reconnect_max_delay;

    self->reconnect_attempts++;

    /* restart from scratch if a reconnect timer is already pending */
    if (self->reconnect_timing) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timing = 0;
    }

    ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, wait, 0);
    ev_timer_start(self->loop, &self->reconnect_timer);
    self->reconnect_timing = 1;
}
/*
 * Free LowCardinality cross-block dictionary state.
 * Drops one reference on every SV of every per-column dictionary, frees the
 * dictionaries and both bookkeeping arrays, and resets the three lc_* fields.
 * A no-op when no dictionary state is present.
 */
static void lc_free_dicts(ev_clickhouse_t *self) {
    int col;

    if (!self->lc_dicts)
        return;

    for (col = 0; col < self->lc_num_cols; col++) {
        SV **dict = self->lc_dicts[col];
        uint64_t k;

        if (!dict)
            continue;   /* this column has no dictionary */

        for (k = 0; k < self->lc_dict_sizes[col]; k++)
            SvREFCNT_dec(dict[k]);
        Safefree(dict);
    }

    Safefree(self->lc_dicts);
    Safefree(self->lc_dict_sizes);
    self->lc_dicts = NULL;
    self->lc_dict_sizes = NULL;
    self->lc_num_cols = 0;
}
/* --- Pipeline orchestrator --- */
/*
 * ClickHouse HTTP does not support true HTTP pipelining.
 * We send one request at a time, wait for the response, then send the next.
 */
/*
 * Advance the request pipeline by at most one step:
 *   - not connected: nothing to do;
 *   - a request is still in flight: make sure the read watcher is running;
 *   - nothing queued and nothing pending: fire the one-shot drain callback
 *     and (re)start the keepalive timer;
 *   - otherwise: dispatch the head of send_queue and start writing it.
 * Returns 1 if self was freed (caller must not access self).
 */
static int pipeline_advance(ev_clickhouse_t *self) {
if (!self->connected) return 0;
/* if we're still waiting for a response, just ensure reading */
if (self->send_count > 0) {
start_reading(self);
return 0;
}
/* Check drain callback when all pending work is done */
if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
&& self->on_drain) {
/* detach the callback before invoking it: it fires once, and the slot is
 * already empty while the callback runs */
SV *drain_cb = self->on_drain;
self->on_drain = NULL;
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
PUTBACK;
self->callback_depth++;
call_sv(drain_cb, G_DISCARD | G_EVAL);
self->callback_depth--;
/* G_EVAL traps a die in the callback; report it as a warning */
if (SvTRUE(ERRSV))
warn("EV::ClickHouse: drain callback died: %s",
SvPV_nolen(ERRSV));
FREETMPS;
LEAVE;
}
/* release the reference that was held in self->on_drain */
SvREFCNT_dec(drain_cb);
if (check_destroyed(self)) return 1;
}
/* Restart keepalive timer when idle */
if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
&& self->keepalive > 0 && !self->ka_timing) {
start_keepalive(self);
}
/* send next request from queue */
if (!ngx_queue_empty(&self->send_queue)) {
ngx_queue_t *q = ngx_queue_head(&self->send_queue);
ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
/* Stop keepalive during active query */
stop_keepalive(self);
emit_trace(self, "dispatch query (pending=%d)", self->pending_count);
/* set up send buffer */
ensure_send_cap(self, send->data_len);
Copy(send->data, self->send_buf, send->data_len, char);
self->send_len = send->data_len;
self->send_pos = 0;
/* move cb to recv queue */
ngx_queue_remove(q);
push_cb_owned_ex(self, send->cb, send->raw,
send->on_data, send->query_timeout);
/* push_cb_owned_ex took its own reference on on_data; drop the send entry's */
if (send->on_data) { SvREFCNT_dec(send->on_data); send->on_data = NULL; }
/* Track query_id */
if (self->last_query_id) { Safefree(self->last_query_id); self->last_query_id = NULL; }
/* the pointer changes owner: cleared on the send entry, kept on self */
if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }
/* transfer deferred insert data from send entry to self */
if (send->insert_data) {
self->insert_data = send->insert_data;
self->insert_data_len = send->insert_data_len;
send->insert_data = NULL;
}
if (send->insert_av) {
self->insert_av = send->insert_av;
send->insert_av = NULL;
}
/* the request bytes now live in self->send_buf */
Safefree(send->data);
{
/* read the timeout before the entry is released */
double qt = send->query_timeout;
release_send(send);
self->send_count++;
/* Start query timeout timer */
{
/* a per-query timeout, when positive, takes precedence over the connection default */
double timeout = qt > 0 ? qt : self->query_timeout;
/* the timer is only armed when it is not already running */
if (timeout > 0 && !self->timing) {
ev_timer_set(&self->timer, (ev_tstamp)timeout, 0.0);
ev_timer_start(self->loop, &self->timer);
self->timing = 1;
}
}
}
if (self->protocol == PROTO_NATIVE) {
/* an INSERT with deferred data waits in NATIVE_WAIT_INSERT_META;
 * everything else waits for the result */
if (self->insert_data || self->insert_av)
self->native_state = NATIVE_WAIT_INSERT_META;
else
self->native_state = NATIVE_WAIT_RESULT;
}
return try_write(self);
}
return 0;
}
/* --- OpenSSL init (must be in plain C, not inside XS BOOT) --- */
static void ch_openssl_init(void) {
ClickHouse.xs view on Meta::CPAN
last_totals(EV::ClickHouse self)
CODE:
{
if (self->native_totals)
RETVAL = newRV_inc((SV*)self->native_totals);
else
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
SV *
last_extremes(EV::ClickHouse self)
CODE:
{
/* Return a new reference to the stored native_extremes array, or undef
 * when no such array is stored on the connection. */
if (self->native_extremes)
RETVAL = newRV_inc((SV*)self->native_extremes);
else
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
SV *
profile_rows_before_limit(EV::ClickHouse self)
CODE:
{
/* Return the connection's profile_rows_before_limit counter as a new
 * unsigned-integer SV. */
RETVAL = newSVuv(self->profile_rows_before_limit);
}
OUTPUT:
RETVAL
SV *
profile_rows(EV::ClickHouse self)
CODE:
{
/* Return the connection's profile_rows counter as a new unsigned-integer SV. */
RETVAL = newSVuv(self->profile_rows);
}
OUTPUT:
RETVAL
SV *
profile_bytes(EV::ClickHouse self)
CODE:
{
/* Return the connection's profile_bytes counter as a new unsigned-integer SV. */
RETVAL = newSVuv(self->profile_bytes);
}
OUTPUT:
RETVAL
SV*
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
/* Getter/setter for the debug trace callback.  When a handler argument was
 * supplied (items > 1) handler_accessor() replaces the stored callback; a
 * value that is not a CODE reference clears it.  The result is the callback
 * now stored, or undef when there is none. */
RETVAL = handler_accessor(&self->on_trace, handler, items > 1);
}
OUTPUT:
RETVAL
void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
{
/* Store the keepalive interval (0 = disabled).  Only the field is written:
 * a keepalive timer that is already running is neither stopped nor
 * re-armed here. */
self->keepalive = val;
}
void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
{
/* Store the base reconnect delay.  schedule_reconnect() reconnects
 * immediately when this is <= 0 and otherwise uses it as the starting point
 * of its backoff. */
self->reconnect_delay = val;
}
void
_set_reconnect_max_delay(EV::ClickHouse self, double val)
CODE:
{
/* Store the upper bound for the reconnect backoff delay.
 * schedule_reconnect() applies the cap only when this value is positive. */
self->reconnect_max_delay = val;
}
void
drain(EV::ClickHouse self, SV *cb)
CODE:
{
/* Register a one-shot drain callback.  It must be a CODE reference; any
 * previously registered drain callback is released.  If nothing is pending
 * and nothing is queued the callback is invoked right away, otherwise it is
 * stored in self->on_drain. */
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
croak("drain callback must be a CODE reference");
if (self->on_drain) SvREFCNT_dec(self->on_drain);
if (self->pending_count == 0 && ngx_queue_empty(&self->send_queue)) {
/* Nothing pending - fire immediately */
self->on_drain = NULL;
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
PUTBACK;
self->callback_depth++;
call_sv(cb, G_DISCARD | G_EVAL);
self->callback_depth--;
/* G_EVAL traps a die in the callback; report it as a warning */
if (SvTRUE(ERRSV))
warn("EV::ClickHouse: drain callback died: %s",
SvPV_nolen(ERRSV));
FREETMPS;
LEAVE;
}
/* last use of self in this XSUB, so the result is not needed */
check_destroyed(self);
} else {
/* keep our own reference until the callback is fired or replaced */
self->on_drain = SvREFCNT_inc(cb);
}
}
void
cancel(EV::ClickHouse self)
CODE:
{
if (self->protocol == PROTO_NATIVE && self->send_count > 0) {
/* Send CLIENT_CANCEL packet */
native_buf_t pkt;
nbuf_init(&pkt);
nbuf_varuint(&pkt, CLIENT_CANCEL);
ensure_send_cap(self, self->send_len + pkt.len);
Copy(pkt.data, self->send_buf + self->send_len, pkt.len, char);
self->send_len += pkt.len;
Safefree(pkt.data);
( run in 1.547 second using v1.01-cache-2.11-cpan-39bf76dae61 )