view release on metacpan or search on metacpan
Core:
- Async queries via HTTP protocol (port 8123)
- Native TCP binary protocol (port 9000, revision 54459)
- TLS/SSL support via OpenSSL (both protocols)
- LZ4 compression for native protocol (with CityHash checksums)
- Gzip compression for HTTP protocol (request and response)
- HTTP chunked transfer encoding
- Connection URI parsing (clickhouse[+native]://user:pass@host:port/db)
- Per-query and connection-level ClickHouse settings
- Parameterized queries via params => { name => value }
- Keepalive ping timer for idle native connections
- Auto-reconnect with exponential backoff (reconnect_delay, reconnect_max_delay)
- Graceful drain callback (fires when all pending queries complete)
- Query cancellation and skip_pending
Query features:
- TabSeparated format parser for HTTP responses
- INSERT with TSV string or arrayref data (both protocols)
- Raw query mode for HTTP (return unparsed response body)
- on_data streaming callback for native protocol (per-block delivery)
- on_progress callback for native protocol progress packets
ClickHouse.xs view on Meta::CPAN
SV *on_disconnect;
int tls_skip_verify;
double query_timeout;
int auto_reconnect;
uint32_t decode_flags;
AV *native_col_names; /* column names from last native result */
AV *native_col_types; /* column type strings from last native result */
SV *on_drain; /* callback fired when pending_count drops to 0 */
char *last_query_id; /* query_id of the last dispatched query */
SV *on_trace; /* debug trace callback */
ev_timer ka_timer; /* keepalive timer */
double keepalive; /* keepalive interval (0 = disabled) */
int ka_timing;
int callback_depth;
/* error info from last SERVER_EXCEPTION or HTTP error */
int32_t last_error_code;
/* profile info from last SERVER_PROFILE_INFO */
uint64_t profile_rows;
uint64_t profile_bytes;
uint64_t profile_rows_before_limit;
/* totals / extremes from last native query */
AV *native_totals;
ClickHouse.xs view on Meta::CPAN
SV *on_data; /* per-query streaming callback */
double query_timeout; /* per-query timeout */
char *query_id; /* query_id for tracking */
ngx_queue_t queue;
};
/* forward declarations */
/* libev watcher callbacks (socket I/O, one-shot timer, keepalive timer) */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
/* keepalive timer control and reconnect scheduling */
static void start_keepalive(ev_clickhouse_t *self);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
/* enable / disable the read and write sides of the I/O watcher */
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
/* user-visible reporting: error and trace (printf-style format) */
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
/* connection teardown */
static void cleanup_connection(ev_clickhouse_t *self);
/* NOTE(review): callers return immediately when this yields non-zero --
 * presumably it signals that self must no longer be touched; confirm. */
static int cancel_pending(ev_clickhouse_t *self, const char *errmsg);
ClickHouse.xs view on Meta::CPAN
return (inet_pton(AF_INET, s, &a4) == 1 ||
inet_pton(AF_INET6, s, &a6) == 1);
}
static void cleanup_connection(ev_clickhouse_t *self) {
int was_connected = self->connected;
if (was_connected) emit_trace(self, "disconnect");
stop_reading(self);
stop_writing(self);
stop_keepalive(self);
if (self->timing) {
ev_timer_stop(self->loop, &self->timer);
self->timing = 0;
}
#ifdef HAVE_OPENSSL
if (self->ssl) {
SSL_shutdown(self->ssl);
SSL_free(self->ssl);
self->ssl = NULL;
ClickHouse.xs view on Meta::CPAN
pos += snprintf(req + pos, req_cap - pos,
"Host: %s:%u\r\n", self->host, self->port);
if (self->user) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-User: %s\r\n", self->user);
}
if (self->password && self->password[0]) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-Key: %s\r\n", self->password);
}
pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
if (content_encoding)
pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
if (self->compress)
pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
pos += snprintf(req + pos, req_cap - pos,
"Content-Length: %lu\r\n\r\n", (unsigned long)body_len);
ClickHouse.xs view on Meta::CPAN
pos += snprintf(req + pos, req_cap - pos,
"Host: %s:%u\r\n", self->host, self->port);
if (self->user) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-User: %s\r\n", self->user);
}
if (self->password && self->password[0]) {
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-Key: %s\r\n", self->password);
}
pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
if (do_compress)
pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
if (content_encoding)
pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
pos += snprintf(req + pos, req_cap - pos,
"Content-Length: %lu\r\n\r\n", (unsigned long)body_len);
ClickHouse.xs view on Meta::CPAN
/*
 * Build an HTTP "GET /ping" request.
 *
 * self:    connection object; host and port are read for the Host header.
 *          A NULL host is rendered as an empty string instead of being
 *          passed to "%s" (which would be undefined behaviour).
 * req_len: out-parameter receiving the number of request bytes in the
 *          returned buffer (terminating NUL not counted).
 *
 * Returns a buffer allocated with Newx(); ownership passes to the caller
 * (NOTE(review): presumably released with Safefree -- confirm at call site).
 */
static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
    const char *host = self->host ? self->host : "";
    /* 128 bytes comfortably cover the fixed text plus the port digits */
    size_t req_cap = 128 + strlen(host);
    char *req;
    int n;

    Newx(req, req_cap, char);
    n = snprintf(req, req_cap,
                 "GET /ping HTTP/1.1\r\n"
                 "Host: %s:%u\r\n"
                 "Connection: keep-alive\r\n\r\n",
                 host, self->port);

    if (n < 0) {
        /* Formatting failed: report an empty request rather than letting a
         * negative value wrap to a huge size_t and expose uninitialised
         * buffer contents. */
        req[0] = '\0';
        *req_len = 0;
    } else if ((size_t)n >= req_cap) {
        /* Output was truncated; only req_cap - 1 bytes were stored. */
        *req_len = req_cap - 1;
    } else {
        *req_len = (size_t)n;
    }
    return req;
}
/* --- HTTP response parsing --- */
/* Find \r\n\r\n in recv_buf. Returns offset past it, or 0 if not found. */
static size_t find_header_end(const char *buf, size_t len) {
ClickHouse.xs view on Meta::CPAN
if (deliver_error(self, "query timeout")) return;
/* Must reconnect — server may still be processing */
if (cancel_pending(self, "query timeout")) return;
cleanup_connection(self);
if (self->auto_reconnect && self->host)
schedule_reconnect(self);
}
}
/* --- Keepalive timer callback --- */
/*
 * libev callback for the ka_timer embedded in ev_clickhouse_t.
 *
 * Does nothing unless the object is valid (magic matches), connected, and
 * send_count is zero.  For the native protocol it appends a CLIENT_PING
 * packet to the send buffer and makes sure the write watcher is active.
 * For HTTP nothing is sent: the connection relies on TCP keepalive, or is
 * allowed to drop and be re-established by auto-reconnect.
 */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self;
    native_buf_t ping;

    (void)revents;

    /* Recover the owning object from the address of its ka_timer member. */
    self = (ev_clickhouse_t *)((char *)w - offsetof(ev_clickhouse_t, ka_timer));

    if (self->magic != EV_CH_MAGIC)
        return;
    if (!self->connected)
        return;
    if (self->send_count > 0)
        return;

    /* Only the native protocol has a ping packet to send here. */
    if (self->protocol != PROTO_NATIVE)
        return;

    /* Encode the ping and append it after any bytes already buffered. */
    nbuf_init(&ping);
    nbuf_varuint(&ping, CLIENT_PING);
    ensure_send_cap(self, self->send_len + ping.len);
    Copy(ping.data, self->send_buf + self->send_len, ping.len, char);
    self->send_len += ping.len;
    Safefree(ping.data);

    if (!self->writing)
        start_writing(self);
}
/*
 * Arm the repeating keepalive timer.
 *
 * No-op unless a positive keepalive interval is configured, the timer is not
 * already running (ka_timing), and the connection is up.  The interval is
 * used both as the initial delay and as the repeat period.
 */
static void start_keepalive(ev_clickhouse_t *self) {
    /* Written as !(x > 0) so a NaN interval is rejected as well. */
    if (!(self->keepalive > 0))
        return;
    if (self->ka_timing)
        return;
    if (!self->connected)
        return;

    ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
    ev_timer_start(self->loop, &self->ka_timer);
    self->ka_timing = 1;
}
/*
 * Disarm the keepalive timer if it is currently running and clear the
 * ka_timing flag.  Safe to call when the timer is already stopped.
 */
static void stop_keepalive(ev_clickhouse_t *self) {
    if (!self->ka_timing)
        return;

    ev_timer_stop(self->loop, &self->ka_timer);
    self->ka_timing = 0;
}
/* --- Reconnect with backoff --- */
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
ClickHouse.xs view on Meta::CPAN
if (SvTRUE(ERRSV))
warn("EV::ClickHouse: drain callback died: %s",
SvPV_nolen(ERRSV));
FREETMPS;
LEAVE;
}
SvREFCNT_dec(drain_cb);
if (check_destroyed(self)) return 1;
}
/* Restart keepalive timer when idle */
if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
&& self->keepalive > 0 && !self->ka_timing) {
start_keepalive(self);
}
/* send next request from queue */
if (!ngx_queue_empty(&self->send_queue)) {
ngx_queue_t *q = ngx_queue_head(&self->send_queue);
ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
/* Stop keepalive during active query */
stop_keepalive(self);
emit_trace(self, "dispatch query (pending=%d)", self->pending_count);
/* set up send buffer */
ensure_send_cap(self, send->data_len);
Copy(send->data, self->send_buf, send->data_len, char);
self->send_len = send->data_len;
self->send_pos = 0;
/* move cb to recv queue */
ngx_queue_remove(q);
ClickHouse.xs view on Meta::CPAN
SV*
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
/* Accessor for the on_trace (debug trace) callback slot.  `handler` is
 * optional; `items > 1` tells handler_accessor whether the caller actually
 * supplied a second argument.  The helper's result is returned to Perl.
 * NOTE(review): presumably handler_accessor acts as getter when no argument
 * was passed and as setter otherwise -- it is defined outside this excerpt. */
RETVAL = handler_accessor(&self->on_trace, handler, items > 1);
}
OUTPUT:
RETVAL
void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
{
/* Store the keepalive interval in seconds (0 = disabled).  This only
 * records the value; it does not start or restart the keepalive timer.
 * start_keepalive() reads it and arms the timer only when it is > 0. */
self->keepalive = val;
}
void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
{
/* Store the initial reconnect backoff delay in seconds.  The value is
 * recorded as-is; no validation is performed here. */
self->reconnect_delay = val;
}
void
Changes
cityhash.h
ClickHouse.xs
cpanfile
eg/decode_options.pl
eg/drain.pl
eg/error_handling.pl
eg/insert.pl
eg/keepalive.pl
eg/native.pl
eg/native_compress.pl
eg/params.pl
eg/ping.pl
eg/query.pl
eg/queue.pl
eg/settings.pl
eg/streaming.pl
eg/totals.pl
eg/uri.pl
eg/keepalive.pl view on Meta::CPAN
#!/usr/bin/env perl
# Keepalive — periodic pings keep idle connections alive
use strict;
use warnings;
use EV;
use EV::ClickHouse;
my $ch;
$ch = EV::ClickHouse->new(
host => $ENV{CLICKHOUSE_HOST} // '127.0.0.1',
port => $ENV{CLICKHOUSE_NATIVE_PORT} // 9000,
protocol => 'native',
keepalive => 30, # ping every 30 seconds when idle
on_connect => sub {
printf "Connected with keepalive=30s: %s\n", $ch->server_info;
# Simulate idle time, then query
my $t; $t = EV::timer(2, 0, sub {
undef $t;
printf "Still connected after 2s idle: %s\n",
$ch->is_connected ? "yes" : "no";
$ch->query("SELECT 1", sub {
my ($rows, $err) = @_;
printf "Query after idle: %s\n", $err // "ok";
$ch->finish;
lib/EV/ClickHouse.pm view on Meta::CPAN
my $tls = delete $args{tls} // 0;
my $tls_ca_file = delete $args{tls_ca_file};
my $tls_skip_verify = delete $args{tls_skip_verify} // 0;
# options
my $compress = delete $args{compress} // 0;
my $session_id = delete $args{session_id};
my $connect_timeout = delete $args{connect_timeout};
my $query_timeout = delete $args{query_timeout};
my $auto_reconnect = delete $args{auto_reconnect} // 0;
my $keepalive = delete $args{keepalive} // 0;
my $reconnect_delay = delete $args{reconnect_delay} // 0;
my $reconnect_max_delay = delete $args{reconnect_max_delay} // 0;
# decode options (native protocol)
my $decode_datetime = delete $args{decode_datetime} // 0;
my $decode_decimal = delete $args{decode_decimal} // 0;
my $decode_enum = delete $args{decode_enum} // 0;
my $named_rows = delete $args{named_rows} // 0;
die "EV::ClickHouse: unknown protocol '$protocol' (expected 'http' or 'native')\n"
lib/EV/ClickHouse.pm view on Meta::CPAN
$self->_set_protocol($protocol eq 'native' ? 1 : 0);
$self->_set_compress($compress) if $compress;
$self->_set_session_id($session_id) if defined $session_id;
$self->_set_connect_timeout($connect_timeout) if $connect_timeout;
$self->_set_query_timeout($query_timeout) if $query_timeout;
$self->_set_tls($tls) if $tls;
$self->_set_tls_ca_file($tls_ca_file) if defined $tls_ca_file;
$self->_set_tls_skip_verify($tls_skip_verify) if $tls_skip_verify;
$self->_set_auto_reconnect($auto_reconnect) if $auto_reconnect;
$self->_set_keepalive($keepalive) if $keepalive;
$self->_set_reconnect_delay($reconnect_delay) if $reconnect_delay;
$self->_set_reconnect_max_delay($reconnect_max_delay) if $reconnect_max_delay;
# compute decode_flags bitmask
my $decode_flags = 0;
$decode_flags |= 1 if $decode_datetime; # DECODE_DT_STR
$decode_flags |= 2 if $decode_decimal; # DECODE_DEC_SCALE
$decode_flags |= 4 if $decode_enum; # DECODE_ENUM_STR
$decode_flags |= 8 if $named_rows; # DECODE_NAMED_ROWS
$self->_set_decode_flags($decode_flags) if $decode_flags;
lib/EV/ClickHouse.pm view on Meta::CPAN
When enabled, queued (unsent) queries are preserved across reconnects;
in-flight queries receive an error callback.
=item settings => \%hash
Connection-level ClickHouse settings applied to every query and insert.
Per-query settings (see L</query>, L</insert>) override these defaults.
settings => { async_insert => 1, max_threads => 4 }
=item keepalive => $seconds
Send periodic native protocol ping packets to keep the connection alive
during idle periods. Set to C<0> (default) to disable. Only effective
with the native protocol.
=item reconnect_delay => $seconds
Initial delay for reconnect backoff when C<auto_reconnect> is enabled.
The delay doubles after each failed attempt, up to C<reconnect_max_delay>.
Set to C<0> (default) for immediate reconnect (no backoff).
=item reconnect_max_delay => $seconds
lib/EV/ClickHouse.pm view on Meta::CPAN
[2, undef], # NULL
[3, [10, 20]], # Array column
], sub { ... });
The optional C<\%settings> hashref works the same as in L</query>.
=head2 ping
$ch->ping(sub { my ($result, $err) = @_ });
Checks if the connection is alive. On success C<$result> is a true value
and C<$err> is undef. On error: C<(undef, $error_message)>.
=head2 finish
$ch->finish;
Disconnects. Cancels pending operations.
=head2 reset
t/13_params_uri.t view on Meta::CPAN
});
# Also test UInt256
$ch->query("SELECT toUInt256('99999999999999999999') as big", sub {
my ($rows, $err) = @_;
is($rows->[0][0], '99999999999999999999', 'UInt256: correct value');
});
$ch->drain(sub { EV::break });
},
);
# Test 20-22: Keepalive (verify it doesn't crash and connection stays alive).
# The keepalive interval (0.2s) is deliberately shorter than the idle wait
# (0.5s) so the keepalive timer has a chance to fire before the follow-up
# query; with an interval longer than the wait the timer would never fire
# and the ping path would not be exercised at all.
SKIP: {
    skip "Native port not reachable", 3 unless $nat_ok;

    my ($ka_ch, $ka_wait);
    $ka_ch = EV::ClickHouse->new(
        host       => $host,
        port       => $nat_port,
        protocol   => 'native',
        keepalive  => 0.2,
        on_connect => sub {
            ok(1, 'keepalive: connected');
            # Stay idle across at least one keepalive interval, then query
            $ka_wait = EV::timer(0.5, 0, sub {
                ok($ka_ch->is_connected, 'keepalive: still connected');
                $ka_ch->query("SELECT 1", sub {
                    my ($rows, $err) = @_;
                    ok(!$err, 'keepalive: query after wait ok');
                    EV::break;
                });
            });
        },
        on_error => sub { diag("KA error: $_[0]"); EV::break },
    );

    # Safety net: never let a hung connection stall the whole test run,
    # and say so when it triggers instead of breaking out silently.
    my $timeout = EV::timer(10, 0, sub {
        diag("keepalive: timed out waiting for events");
        EV::break;
    });
    EV::run;
    $ka_ch->finish if $ka_ch && $ka_ch->is_connected;
}