view release on metacpan or search on metacpan
0.03 2026-05-21
- Multi-host failover and connection Pool (circuit breaker, hedged
queries, fan-out, sessions); async DNS; TLS mutual auth.
- retry, wait_mutation, native external tables, idempotent and
async/aggregated insert, Streamer backpressure.
- More introspection helpers, the EV::ClickHouse::Error class, and
new types (BFloat16, Decimal256, Interval, Geo, JSON).
- Reliability and memory-safety fixes; docs and tests.
0.02 2026-04-30
- Bug fixes: keepalive PING/PONG routing; last_error_code reports
top-level code; IPv6 URI literals; skip_pending teardown.
- HTTP arrayref insert() croaks on nested refs.
- POD rewrite; expanded t/, xt/, eg/.
0.01 2026-03-15
- Initial release.
- HTTP (8123) + native (9000, rev 54459) protocols.
- TLS via OpenSSL; LZ4 native compression; gzip HTTP compression.
- URI parsing; per-query + connection settings; parameterized
queries; auto-reconnect; drain / cancel / skip_pending.
ClickHouse.xs view on Meta::CPAN
size_t max_recv_buffer; /* 0 = unlimited; defensive recv ceiling */
int http_basic_auth; /* 0=X-ClickHouse-{User,Key} (default);
* 1=Authorization: Basic ... (for proxies) */
int auto_reconnect;
uint32_t decode_flags;
AV *native_col_names; /* column names from last native result */
AV *native_col_types; /* column type strings from last native result */
SV *on_drain; /* callback fired when pending_count drops to 0 */
char *last_query_id; /* query_id of the last dispatched query */
SV *on_trace; /* debug trace callback */
ev_timer ka_timer; /* keepalive timer */
double keepalive; /* keepalive interval (0 = disabled) */
int ka_timing;
unsigned int ka_in_flight; /* keepalive pings sent but not yet ack'd */
int callback_depth;
/* error info from last SERVER_EXCEPTION or HTTP error */
int32_t last_error_code;
/* profile info from last SERVER_PROFILE_INFO */
uint64_t profile_rows;
uint64_t profile_bytes;
uint64_t profile_rows_before_limit;
/* totals / extremes from last native query */
AV *native_totals;
AV *native_extremes;
ClickHouse.xs view on Meta::CPAN
SV *on_complete; /* per-query on_query_complete override */
double query_timeout; /* per-query timeout */
char *query_id; /* query_id for tracking */
ngx_queue_t queue;
};
/* Forward declarations for helpers that are defined further down (or in
 * xs/io.c) but are called from earlier code in this file, or from the
 * xs/ C fragments that are #included before their definition site.
 * NOTE(review): emit_trace takes a (fmt, ...) pair; if its definition
 * forwards to a printf-family function, tagging this prototype with
 * __attribute__((format(printf, 2, 3))) would let -Wformat check callers. */
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
/* Returns 1 if on_disconnect freed self (caller must not touch self). */
static int cleanup_connection(ev_clickhouse_t *self);
/* Non-zero return means self was already freed by a callback it fired
 * (DESTROY relies on this to bail out early). */
static int cancel_pending(ev_clickhouse_t *self, const char *errmsg);
ClickHouse.xs view on Meta::CPAN
/* Complete a deferred free of the connection struct.
 *
 * If self has already been marked freed (magic == EV_CH_FREED) and no
 * callback frame is still running on it (callback_depth == 0), the memory
 * is released here.
 *
 * Returns 1 when self was freed (the caller must not touch it again), or
 * 0 when self is still valid. */
static int check_destroyed(ev_clickhouse_t *self) {
    const int still_in_use = self->magic != EV_CH_FREED
                          || self->callback_depth != 0;

    if (still_in_use)
        return 0;

    Safefree(self);
    return 1;
}
/* Free just the per-connection failover host-list arrays (allocated by
 * the setter). Called from _set_failover before re-populating and from
 * failover_free below; leaves on_failover alive. */
static void failover_free_hosts(ev_clickhouse_t *self) {
if (self->failover_hosts) {
for (int i = 0; i < self->failover_n; i++)
if (self->failover_hosts[i]) Safefree(self->failover_hosts[i]);
Safefree(self->failover_hosts);
self->failover_hosts = NULL;
}
if (self->failover_ports) {
Safefree(self->failover_ports);
self->failover_ports = NULL;
ClickHouse.xs view on Meta::CPAN
FREETMPS; LEAVE;
}
self->callback_depth--;
/* Reset so a subsequent fire for a never-dispatched cancelled
* query (e.g. from cancel_pending draining send_queue) sees
* query_start_time == 0 and reports dur = 0.0, not a stale
* duration carried over from the previous in-flight query. */
self->query_start_time = 0;
}
/* IS_KEEPALIVE_CB is defined in xs/macros.h; keepalive_noop_cb, assigned
 * in the BOOT section, is the sentinel that backs it. */
static int deliver_error(ev_clickhouse_t *self, const char *errmsg) {
SV *oqc = NULL;
SV *cb = pop_cb_ex(self, &oqc);
if (cb == NULL) {
fire_on_query_complete_ex(self, errmsg, oqc);
if (oqc) SvREFCNT_dec(oqc);
return check_destroyed(self);
}
ClickHouse.xs view on Meta::CPAN
}
/* Tears down the socket + per-connection state, then fires on_disconnect.
* Returns 1 if on_disconnect freed self (caller must not touch self), else 0. */
static int cleanup_connection(ev_clickhouse_t *self) {
int was_connected = self->connected;
if (was_connected) emit_trace(self, "disconnect");
stop_reading(self);
stop_writing(self);
stop_keepalive(self);
stop_timing(self);
#ifdef HAVE_OPENSSL
if (self->ssl) {
SSL_shutdown(self->ssl);
SSL_free(self->ssl);
self->ssl = NULL;
}
if (self->ssl_ctx) {
SSL_CTX_free(self->ssl_ctx);
ClickHouse.xs view on Meta::CPAN
* don't touch a freed self. */
if (was_connected && self->on_disconnect)
return fire_zero_arg_cb(self, self->on_disconnect, "disconnect");
return 0;
}
/* Error-path completion for one query.
 *
 * First fires the on_query_complete hook: the per-query override in oqc,
 * or else the connection-level hook, since fire_on_query_complete_ex falls
 * back. Then fires the user's error callback. This keeps the documented
 * "fires after every query (success or error)" contract for cancelled
 * queries too.
 *
 * Ordering: the global hook runs BEFORE the user callback, matching
 * deliver_error / deliver_rows on the normal path, because
 * instrumentation observers expect the hook to run first.
 *
 * Internal HTTP keepalive PINGs (IS_KEEPALIVE_CB) skip the hook, mirroring
 * the success path. Observers therefore never see spurious zero-row
 * completions for pings the user did not initiate.
 *
 * Ownership: oqc may be NULL. When it is not, one reference to it is
 * consumed (refcount-dec'd) here.
 *
 * The user error callback may drop the last reference to $ch. DESTROY is
 * deferred while callback_depth > 0, so invoking it is safe either way.
 * The caller's outer magic check guards any further use of self. */
static void fire_err_and_complete(ev_clickhouse_t *self, SV *cb,
                                  const char *errmsg, SV *oqc) {
    const int is_keepalive_ping = (IS_KEEPALIVE_CB(cb)) != 0;

    if (!is_keepalive_ping) {
        fire_on_query_complete_ex(self, errmsg, oqc);
    }
    if (oqc != NULL) {
        SvREFCNT_dec(oqc);
    }
    invoke_err_cb(cb, errmsg);
}
ClickHouse.xs view on Meta::CPAN
#include "xs/io.c"
/* --- XS interface --- */
MODULE = EV::ClickHouse PACKAGE = EV::ClickHouse
BOOT:
{
    /* Module load-time setup (XS BOOT section). */
    /* Bind to the EV module's exported C API so the ev_* calls resolve. */
    I_EV_API("EV::ClickHouse");
    /* Presumably OpenSSL library initialisation; helper defined elsewhere
     * in this module. */
    ch_openssl_init();
    /* Permanent no-op CV used for internal callbacks (HTTP keepalive ping).
     * GV_ADD declares the sub if it does not exist yet. The RV taken with
     * newRV_inc is never released; it is the sentinel behind
     * IS_KEEPALIVE_CB. */
    keepalive_noop_cb = newRV_inc((SV*)get_cv("EV::ClickHouse::__keepalive_noop", GV_ADD));
    /* Per-process rand() seed so reconnect_jitter desynchronises forks
     * (otherwise every worker generates the same sequence and the
     * jitter is uniform across the herd, defeating its purpose).
     * Note: this reseeds the process-wide C rand() state. */
    srand((unsigned)time(NULL) ^ (unsigned)getpid());
}
EV::ClickHouse
_new(char *class, EV::Loop loop)
CODE:
{
ClickHouse.xs view on Meta::CPAN
void
DESTROY(EV::ClickHouse self)
CODE:
{
if (self->magic != EV_CH_MAGIC) return;
stop_reading(self);
stop_writing(self);
stop_timing(self);
stop_keepalive(self);
if (self->reconnect_timing) {
ev_timer_stop(self->loop, &self->reconnect_timer);
self->reconnect_timing = 0;
}
if (PL_dirty) {
self->magic = EV_CH_FREED;
while (!ngx_queue_empty(&self->send_queue)) {
ngx_queue_t *q = ngx_queue_head(&self->send_queue);
ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
ClickHouse.xs view on Meta::CPAN
if (cancel_pending(self, "object destroyed"))
return; /* inner DESTROY already freed self */
/* A user callback fired from cancel_pending may have called
* $ch->reset() which re-arms watchers / opens a fresh fd.
* Stop everything again before tearing the struct down so the
* EV loop can't dispatch into freed memory. */
stop_reading(self);
stop_writing(self);
stop_timing(self);
stop_keepalive(self);
if (self->reconnect_timing) {
ev_timer_stop(self->loop, &self->reconnect_timer);
self->reconnect_timing = 0;
}
#ifdef HAVE_OPENSSL
if (self->ssl) {
SSL_shutdown(self->ssl);
SSL_free(self->ssl);
self->ssl = NULL;
ClickHouse.xs view on Meta::CPAN
SV*
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Get/set accessor for the debug trace callback, delegating to
     * handler_accessor. handler defaults to NULL when omitted. items > 1
     * tells the helper whether the caller passed an argument at all, so
     * an explicitly passed undef is distinguishable from a plain read. */
    const int handler_given = (items > 1);
    RETVAL = handler_accessor(&self->on_trace, handler, handler_given);
}
OUTPUT:
    RETVAL
void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
{
    /* Idle keepalive interval in seconds; 0 disables keepalive.
     * Reject negative values, which have no documented meaning, and NaN
     * (val != val). The interval presumably ends up arming ka_timer, and a
     * NaN inside a libev timer breaks the timer heap's ordering. */
    if (val < 0 || val != val)
        croak("keepalive must be a non-negative number of seconds");
    self->keepalive = val;
}
void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
{
    /* Initial auto_reconnect backoff delay in seconds (0 = retry
     * immediately). Each failed attempt doubles it, so a negative or NaN
     * seed would never yield a sane delay. NaN in a libev timer also
     * breaks the timer heap's ordering. Reject both up front. */
    if (val < 0 || val != val)
        croak("reconnect_delay must be a non-negative number of seconds");
    self->reconnect_delay = val;
}
void
eg/graceful_shutdown.pl
eg/health_dashboard.pl
eg/health_probe.pl
eg/hedged_pool.pl
eg/idempotent_insert.pl
eg/insert.pl
eg/insert_streaming.pl
eg/ipv6.pl
eg/iterate.pl
eg/json.pl
eg/keepalive.pl
eg/log_tail.pl
eg/migration_runner.pl
eg/named_rows.pl
eg/native.pl
eg/native_compress.pl
eg/on_progress.pl
eg/params.pl
eg/ping.pl
eg/pool.pl
eg/query.pl
- gzip compression (HTTP) and LZ4 compression with CityHash
checksums (native)
- TLS/SSL via OpenSSL, with optional `tls_skip_verify` for
self-signed certs and `tls_ca_file` for additional roots
- Connection URIs (`clickhouse[+native]://user:pass@host:port/db`),
including bracketed IPv6 literals
- Per-query and connection-level ClickHouse settings; parameterized
queries via `params`; external tables (native) via `external`
- Auto-reconnect with exponential backoff; queued (unsent) queries
are preserved across reconnects
- Keepalive pings for idle native and HTTP connections; graceful drain;
query cancellation and skip\_pending
- Streaming results via `on_data` per-block callback (native);
on\_progress for native progress packets
- Raw HTTP response mode for CSV / JSONEachRow / Parquet / etc.
- 35+ ClickHouse types including Int/UInt 8..256, Float32/64,
BFloat16, Decimal32/64/128/256, UUID, IPv4/IPv6, Nullable, Array,
Tuple, Map, LowCardinality (with cross-block dictionaries),
SimpleAggregateFunction, Nested, Geo (Point/Ring/LineString/Polygon
and the Multi variants), and JSON / Object('json') with auto-flattened
hashref leaves (Int64/Float64/Bool/String + Array variants).
misconfigured peer or client-side bug that retry would only loop on.
Combine with `reconnect_max_attempts` for an explicit ceiling.
- settings => \\%hash
ClickHouse settings applied to every query and insert. Per-call settings
(see ["query"](#query), ["insert"](#insert)) override these.
settings => { async_insert => 1, max_threads => 4 }
- keepalive => $seconds
Send a keepalive request every N seconds while the connection is idle:
a native CLIENT\_PING on the native protocol or a `GET /ping` on HTTP
(some load balancers / NATs drop idle HTTP connections after a few
seconds; TCP-level keepalive is too coarse). Default: `0` (disabled).
- reconnect\_delay => $seconds
Initial delay for the `auto_reconnect` exponential backoff. Each failed
attempt doubles the delay, capped at `reconnect_max_delay`. Default:
`0` (immediate retry, no backoff).
- reconnect\_max\_delay => $seconds
Backoff ceiling. Default: `0`, meaning no explicit cap; the implementation
- `async_insert => 1`
Enables ClickHouse server-side insert batching by setting
`async_insert=1, wait_for_async_insert=0`. Both sub-settings can be
overridden by passing them explicitly.
## ping
$ch->ping(sub { my ($result, $err) = @_ });
Send a no-op round trip to verify the connection is alive. On success
`$result` is true, `$err` is `undef`. On error: `(undef, $error)`.
## is\_healthy
$ch->is_healthy(sub { my ($ok, $err) = @_ });
$ch->is_healthy(sub { ... }, $timeout_seconds);
Bounded health probe: wraps ["ping"](#ping) with a deadline (default 5s). The
callback receives `(1, undef)` on a successful round trip, or
`(0, $msg)` on ping error or timeout. Failure does **not** tear down the
$ch->on_query_start(sub {
my ($query_id) = @_;
log_metric_start($query_id);
});
Optional connection-level hook that fires the moment a query is
dispatched to the wire (after the query\_id has been resolved, before
the first send byte). Symmetric with ["on\_query\_complete"](#on_query_complete); useful for
deriving accurate "query in flight" durations without depending on
the per-query callback closure. Keepalive PINGs are suppressed, the
same as for `on_query_complete`. Also accepted as a constructor
argument.
## on\_query\_complete
$ch->on_query_complete(sub {
my ($query_id, $rows, $bytes, $error_code, $duration_s, $err) = @_;
log_metric(...);
});
aggregated - the callback always fires with a complete list. Pass
`settings => \%h` for per-query options.
**Circuit breaker:** pass `circuit_threshold => N` at construction
to enable per-member fail-fast. After N consecutive query/insert/ping
errors on a given member, that member is excluded from `_pick` for
`circuit_cooldown` seconds (default 30). A successful callback resets
the per-member fail counter. If every member is dead at pick time the
breaker is bypassed so the next attempt still has a chance to recover.
Inspect with `$pool->circuit_state` which returns one
`{ fails => N, dead_until => $epoch, alive => 0|1 }`
hashref per member.
**Graceful shutdown:** `$pool->shutdown($grace_seconds, $cb)`
drains every member, then calls `finish` on each. If `$grace_seconds`
elapses before every member drains, members still in flight are
force-finished and `$cb` receives the string
`"Pool::shutdown timed out after Ns"`. On a clean shutdown `$cb`
receives undef. `$grace_seconds` may be 0 (or undef) to wait
indefinitely. The callback fires exactly once.
Enables LZ4 (native) or gzip (HTTP). LZ4 cost is small and saves ~50-70%
on text-heavy columns. Gzip is heavier; turn on only if you're bandwidth-bound.
- `insert_streamer` batch\_size
Default 10\_000 is a good baseline. Smaller (1k-2k) reduces memory pressure
on the producer; larger (50k-100k) reduces server-side merge cost on
MergeTree. Match to your row width: ~1 MB per batch is a sweet spot.
- `keepalive`
Enable on long-lived idle connections (HTTP behind a load balancer or
NAT, or a native connection that may sit minutes between queries). 15-30s
is typical.
- `reconnect_max_attempts`
Always set in production. Default is unlimited; a permanent failure
(wrong host, wrong port, dead server) will spin `on_error` forever
otherwise.
eg/cancel_streaming.pl view on Meta::CPAN
#!/usr/bin/env perl
# Cancel a streaming select mid-flight from inside the on_data callback,
# once a condition is met. The connection stays alive for follow-up queries.
use strict;
use warnings;
use EV;
use EV::ClickHouse;
my $ch;
my $blocks_seen = 0;
$ch = EV::ClickHouse->new(
host => $ENV{CLICKHOUSE_HOST} // '127.0.0.1',
port => $ENV{CLICKHOUSE_NATIVE_PORT} // 9000,
eg/cancel_streaming.pl view on Meta::CPAN
$blocks_seen++;
printf "block %d: %d rows\n", $blocks_seen, scalar @$rows;
# Stop after we've seen a couple of blocks
$ch->cancel if $blocks_seen >= 2;
} },
sub {
# Native cancel doesn't raise - the callback simply fires
# once the server acks with EndOfStream.
print "Cancelled after $blocks_seen blocks\n";
# The connection is still alive. Run a normal query.
$ch->query("select 1 + 1", sub {
my ($r) = @_;
print "Follow-up query: ", $r->[0][0], "\n";
EV::break;
});
},
);
},
on_error => sub { die "Error: $_[0]\n" },
);
eg/circuit_breaker.pl view on Meta::CPAN
#!/usr/bin/env perl
# Pool circuit breaker: after `circuit_threshold` consecutive query
# errors on a member, the Pool marks it dead for `circuit_cooldown`
# seconds. Subsequent _pick()s skip dead members; if all are dead,
# the breaker is bypassed so recovery attempts still go through.
#
# Inspect state via $pool->circuit_state - returns a list of
# { fails => N, dead_until => $epoch, alive => 0|1 } per member.
#
# This demo connects to a real server (queries must actually be
# dispatched to reach the breaker observer) and repeatedly runs a
# query that fails server-side, so every member trips after
# `circuit_threshold` errors and the all-dead bypass described above
# takes over.
use strict;
use warnings;
use EV;
use EV::ClickHouse;
eg/circuit_breaker.pl view on Meta::CPAN
my $issued = 0;
my $w = EV::timer(0, 0.5, sub {
# Fire a server-side error - reaches the breaker via the async path.
for (1 .. 3) {
$pool->query("select * from no_such_db_$$.no_such_table_$$",
sub { }); # errors ignored - we just want the breaker hits
}
$issued += 3;
my @s = $pool->circuit_state;
for my $i (0 .. $#s) {
printf STDERR "[breaker] member %d fails=%d alive=%s\n",
$i, $s[$i]{fails}, ($s[$i]{alive} ? 'yes' : 'no');
}
EV::break if $issued >= 30;
});
EV::run;
$pool->finish;
eg/health_dashboard.pl view on Meta::CPAN
my $probe = EV::timer(0, 5, sub {
for my $i (0 .. $#conns) {
$conns[$i]->ping_round_trip(sub {
my ($s, $err) = @_;
$rtt[$i] = $err ? undef : $s;
});
}
});
# Tiny HTTP server. Single-shot, no keep-alive, no streaming - just
# enough to demonstrate the JSON shape.
my $listener = IO::Socket::INET->new(
Listen => 16, LocalAddr => '0.0.0.0', LocalPort => $dash_port,
ReuseAddr => 1, Blocking => 0,
) or die "listen $dash_port: $!";
my $accept_io = EV::io($listener->fileno, EV::READ, sub {
while (my $cli = $listener->accept) {
$cli->blocking(0);
my $buf = '';
eg/hedged_pool.pl view on Meta::CPAN
$latency * 1000, $winner, $rows->[0][1];
},
);
});
# Report breaker + win distribution every 5s.
my $report = EV::timer(5, 5, sub {
print "--- circuit state + wins ---\n";
my @st = $pool->circuit_state;
for my $i (0 .. $#st) {
printf " member %d: fails=%d alive=%d wins=%d\n",
$i, $st[$i]{fails}, $st[$i]{alive}, $wins[$i];
}
});
# Graceful drain on Ctrl-C.
my $stop = EV::signal('INT', sub {
undef $issue; undef $report;
print "draining\n";
$pool->shutdown(5, sub { print "done\n"; EV::break });
});
eg/keepalive.pl view on Meta::CPAN
#!/usr/bin/env perl
# Keepalive - periodic pings keep idle connections alive
use strict;
use warnings;
use EV;
use EV::ClickHouse;
my $ch;
$ch = EV::ClickHouse->new(
host => $ENV{CLICKHOUSE_HOST} // $ENV{TEST_CLICKHOUSE_HOST} // '127.0.0.1',
port => $ENV{CLICKHOUSE_NATIVE_PORT} // $ENV{TEST_CLICKHOUSE_NATIVE_PORT} // 9000,
protocol => 'native',
keepalive => 30, # ping every 30 seconds when idle
on_connect => sub {
printf "Connected with keepalive=30s: %s\n", $ch->server_info;
# Simulate idle time, then query
my $t; $t = EV::timer(2, 0, sub {
undef $t;
printf "Still connected after 2s idle: %s\n",
$ch->is_connected ? "yes" : "no";
$ch->query("select 1", sub {
my ($rows, $err) = @_;
printf "Query after idle: %s\n", $err // "ok";
$ch->finish;
lib/EV/ClickHouse.pm view on Meta::CPAN
$port //= ($protocol eq 'native') ? 9000 : 8123;
$self->_set_protocol($protocol eq 'native' ? 1 : 0);
# Pass-through setters. Skip only when the key was absent - explicit
# 0/'' must reach the setter so e.g. `compress => 0` is honored, not
# ignored. (Use `exists` rather than `defined` so a deliberate undef
# is also passed through and rejected by the setter if invalid.)
for my $opt (qw(compress tls tls_skip_verify auto_reconnect
keepalive reconnect_delay reconnect_max_delay
reconnect_jitter reconnect_max_attempts
progress_period http_basic_auth
connect_timeout query_timeout
max_query_size max_recv_buffer)) {
next unless exists $args{$opt};
my $val = delete $args{$opt};
my $setter = "_set_$opt";
$self->$setter($val);
}
for my $opt (qw(session_id tls_ca_file tls_cert_file tls_key_file)) {
lib/EV/ClickHouse.pm view on Meta::CPAN
&& eval { require EV::cares; 1 }) {
# Stash the resolver in %_failover, keyed by refaddr of the
# resolver itself (NOT of $self). Two reasons:
# - never delete the resolver from inside its own callback:
# ares_destroy from a c-ares cb corrupts the channel heap.
# We defer the delete via EV::timer(0,...) so it runs from
# a clean stack frame.
# - keying by refaddr($self) was racy: A's deferred-delete
# could fire after A's struct was freed and B got the same
# refaddr, dropping B's resolver. refaddr($r) is unique
# while $r is alive in %_failover.
my $r = EV::cares->new;
my $key = refaddr($r);
$_failover{$key} = $r;
my $weak2 = $self; weaken $weak2;
$self->_set_dns_pending(1);
$r->resolve($host, sub {
my ($status, @addrs) = @_;
my $w; $w = EV::timer(0, 0, sub { undef $w; delete $_failover{$key} });
# Skip if the connection has been DESTROYed or the user
# finished it while DNS was in flight (cleanup_connection
lib/EV/ClickHouse.pm view on Meta::CPAN
if (defined $dur && $dur >= 0) {
if (@{ $ring->{buf} } < $ring->{size}) {
push @{ $ring->{buf} }, $dur;
} else {
$ring->{buf}[ $ring->{pos} ] = $dur;
$ring->{pos} = ($ring->{pos} + 1) % $ring->{size};
}
}
}
$prev->(@_) if $prev;
$guard; # keep the guard alive for the closure's lifetime
});
} else {
# Resize: linearize chronological order (oldest at $st->{pos}
# once the ring is full), keep the newest min(N, $size) samples.
# Plain shift would drop by physical index instead of by age.
my $buf = $st->{buf};
if (@$buf >= $st->{size}) {
# Ring full: chronological is buf[pos..end] then buf[0..pos-1].
my $pos = $st->{pos} % $st->{size};
@$buf = (@{$buf}[$pos .. $#$buf], @{$buf}[0 .. $pos - 1]);
lib/EV/ClickHouse.pm view on Meta::CPAN
die "hedged_query: unknown options: " . join(', ', sort keys %opts)
if %opts;
my @c = @{ $self->{conns} };
die "hedged_query: no members" unless @c;
# Filter out circuit-broken members. If the breaker tripped everywhere
# fall back to the full set so the caller still hears something - but
# in that fallback skip _cb_observer too, otherwise every failed hedge
# extends each member's dead_until (resetting cooldown indefinitely
# under load).
my $now = EV::time();
my @alive = grep { $self->{cb_state}[$_]{dead_until} <= $now } 0 .. $#c;
my $all_dead = !@alive;
@alive = (0 .. $#c) if $all_dead;
$hedge_n = @alive if $hedge_n > @alive;
$hedge_n = 1 if $hedge_n < 1;
# Reservoir-style shuffle for distinct random picks.
my @pool = @alive;
my @idx;
while (@idx < $hedge_n) { push @idx, splice(@pool, int(rand(scalar @pool)), 1) }
my $fired = 0;
my $pending = scalar @idx;
my $first_err;
for my $i (@idx) {
my $ch = $c[$i];
my $inner = sub {
my ($rows, $err) = @_;
$pending--;
lib/EV/ClickHouse.pm view on Meta::CPAN
# Pass $i (not $ch) so _cb_observer can index cb_state directly
# instead of walking conns via _slot_for.
my $obs = $self->_cb_observer($i, $inner, !$all_dead);
if ($settings) { $ch->query($sql, $settings, $obs) }
else { $ch->query($sql, $obs) }
}
return;
}
# Circuit breaker introspection: per-member state for monitoring.
# Returns one hashref per member, in member order:
#   { fails => N, dead_until => $epoch_seconds, alive => 0|1 }
# Each hashref is a shallow copy of the member's breaker state, so editing
# it does not affect the breaker. `alive` is normalised to 1/0; a bare
# numeric comparison would yield '' (not the documented 0) when false.
# In scalar context the member count is returned.
sub circuit_state {
    my $self = shift;
    my $now = EV::time();
    return map {
        +{ %$_, alive => ($_->{dead_until} <= $now ? 1 : 0) }
    } @{ $self->{cb_state} };
}
# Aggregate stats across pool members:
#   size          - number of member connections
#   pending_count - sum of pending_count over all members
#   conns         - the member connections (list) / their count (scalar)
sub size {
    my ($self) = @_;
    return scalar @{ $self->{conns} };
}
sub pending_count {
    my ($self) = @_;
    my $total = 0;
    foreach my $conn (@{ $self->{conns} }) {
        $total += $conn->pending_count;
    }
    return $total;
}
sub conns {
    my ($self) = @_;
    return @{ $self->{conns} };
}
# Apply a code ref to every pool member. The callback receives
# ($conn, $idx) per call. Useful for warm-up (preload dictionaries,
lib/EV/ClickHouse.pm view on Meta::CPAN
=item * Connection URIs (C<clickhouse[+native]://user:pass@host:port/db>),
including bracketed IPv6 literals
=item * Per-query and connection-level ClickHouse settings; parameterized
queries via C<params>; external tables (native) via C<external>
=item * Auto-reconnect with exponential backoff; queued (unsent) queries
are preserved across reconnects
=item * Keepalive pings for idle native and HTTP connections; graceful drain;
query cancellation and skip_pending
=item * Streaming results via C<on_data> per-block callback (native);
on_progress for native progress packets
=item * Raw HTTP response mode for CSV / JSONEachRow / Parquet / etc.
=item * 35+ ClickHouse types including Int/UInt 8..256, Float32/64,
BFloat16, Decimal32/64/128/256, UUID, IPv4/IPv6, Nullable, Array,
Tuple, Map, LowCardinality (with cross-block dictionaries),
lib/EV/ClickHouse.pm view on Meta::CPAN
misconfigured peer or client-side bug that retry would only loop on.
Combine with C<reconnect_max_attempts> for an explicit ceiling.
=item settings => \%hash
ClickHouse settings applied to every query and insert. Per-call settings
(see L</query>, L</insert>) override these.
settings => { async_insert => 1, max_threads => 4 }
=item keepalive => $seconds
Send a keepalive request every N seconds while the connection is idle:
a native CLIENT_PING on the native protocol or a C<GET /ping> on HTTP
(some load balancers / NATs drop idle HTTP connections after a few
seconds; TCP-level keepalive is too coarse). Default: C<0> (disabled).
=item reconnect_delay => $seconds
Initial delay for the C<auto_reconnect> exponential backoff. Each failed
attempt doubles the delay, capped at C<reconnect_max_delay>. Default:
C<0> (immediate retry, no backoff).
=item reconnect_max_delay => $seconds
Backoff ceiling. Default: C<0>, meaning no explicit cap; the implementation
lib/EV/ClickHouse.pm view on Meta::CPAN
Enables ClickHouse server-side insert batching by setting
C<async_insert=1, wait_for_async_insert=0>. Both sub-settings can be
overridden by passing them explicitly.
=back
=head2 ping
$ch->ping(sub { my ($result, $err) = @_ });
Send a no-op round trip to verify the connection is alive. On success
C<$result> is true, C<$err> is C<undef>. On error: C<(undef, $error)>.
=head2 is_healthy
$ch->is_healthy(sub { my ($ok, $err) = @_ });
$ch->is_healthy(sub { ... }, $timeout_seconds);
Bounded health probe: wraps L</ping> with a deadline (default 5s). The
callback receives C<(1, undef)> on a successful round trip, or
C<(0, $msg)> on ping error or timeout. Failure does B<not> tear down the
lib/EV/ClickHouse.pm view on Meta::CPAN
$ch->on_query_start(sub {
my ($query_id) = @_;
log_metric_start($query_id);
});
Optional connection-level hook that fires the moment a query is
dispatched to the wire (after the query_id has been resolved, before
the first send byte). Symmetric with L</on_query_complete>; useful for
deriving accurate "query in flight" durations without depending on
the per-query callback closure. Keepalive PINGs are suppressed, the
same as for C<on_query_complete>. Also accepted as a constructor
argument.
=head2 on_query_complete
$ch->on_query_complete(sub {
my ($query_id, $rows, $bytes, $error_code, $duration_s, $err) = @_;
log_metric(...);
});
lib/EV/ClickHouse.pm view on Meta::CPAN
aggregated - the callback always fires with a complete list. Pass
C<settings =E<gt> \%h> for per-query options.
B<Circuit breaker:> pass C<circuit_threshold =E<gt> N> at construction
to enable per-member fail-fast. After N consecutive query/insert/ping
errors on a given member, that member is excluded from C<_pick> for
C<circuit_cooldown> seconds (default 30). A successful callback resets
the per-member fail counter. If every member is dead at pick time the
breaker is bypassed so the next attempt still has a chance to recover.
Inspect with C<$pool-E<gt>circuit_state> which returns one
C<{ fails =E<gt> N, dead_until =E<gt> $epoch, alive =E<gt> 0|1 }>
hashref per member.
B<Graceful shutdown:> C<<< $pool->shutdown($grace_seconds, $cb) >>>
drains every member, then calls C<finish> on each. If C<$grace_seconds>
elapses before every member drains, members still in flight are
force-finished and C<$cb> receives the string
C<"Pool::shutdown timed out after Ns">. On a clean shutdown C<$cb>
receives undef. C<$grace_seconds> may be 0 (or undef) to wait
indefinitely. The callback fires exactly once.
lib/EV/ClickHouse.pm view on Meta::CPAN
Enables LZ4 (native) or gzip (HTTP). LZ4 cost is small and saves ~50-70%
on text-heavy columns. Gzip is heavier; turn on only if you're bandwidth-bound.
=item C<insert_streamer> batch_size
Default 10_000 is a good baseline. Smaller (1k-2k) reduces memory pressure
on the producer; larger (50k-100k) reduces server-side merge cost on
MergeTree. Match to your row width: ~1 MB per batch is a sweet spot.
=item C<keepalive>
Enable on long-lived idle connections (HTTP behind a load balancer or
NAT, or a native connection that may sit minutes between queries). 15-30s
is typical.
=item C<reconnect_max_attempts>
Always set in production. Default is unlimited; a permanent failure
(wrong host, wrong port, dead server) will spin C<on_error> forever
otherwise.
t/13_params_uri.t view on Meta::CPAN
});
# Also test UInt256
$ch->query("select toUInt256('99999999999999999999') as big", sub {
my ($rows, $err) = @_;
is($rows->[0][0], '99999999999999999999', 'UInt256: correct value');
});
$ch->drain(sub { EV::break });
},
);
# Test 20-22: Keepalive (just verify it doesn't crash and connection stays alive)
SKIP: {
skip "Native port not reachable", 3 unless $nat_ok;
my ($ka_ch, $ka_wait);
$ka_ch = EV::ClickHouse->new(
host => $host,
port => $nat_port,
protocol => 'native',
keepalive => 1,
on_connect => sub {
ok(1, 'keepalive: connected');
# Wait a bit, then query
$ka_wait = EV::timer(0.5, 0, sub {
ok($ka_ch->is_connected, 'keepalive: still connected');
$ka_ch->query("select 1", sub {
my ($rows, $err) = @_;
ok(!$err, 'keepalive: query after wait ok');
EV::break;
});
});
},
on_error => sub { diag("KA error: $_[0]"); EV::break },
);
my $timeout = EV::timer(10, 0, sub { EV::break });
EV::run;
$ka_ch->finish if $ka_ch && $ka_ch->is_connected;
}
t/20_reconnect.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use EV;
use EV::ClickHouse;
# auto_reconnect coverage:
# - basic plumbing (flag set, simple query)
# - pre-connect queue drain
# - real mid-session reconnect: HTTP keep_alive_timeout closes the server
# side after a few idle seconds; the client must recover and dispatch the
# next query on a fresh socket.
#
# ClickHouse native protocol has no SQL-level disconnect primitive in 26.x
# (no SYSTEM drop CONNECTION; KILL QUERY only kills queries), so a true
# native disconnect-recovery test would need a TCP proxy harness or
# tcp_close_connection_after_queries_seconds in server config - left as
# integration testing.
my $host = $ENV{TEST_CLICKHOUSE_HOST} || '127.0.0.1';
t/20_reconnect.t view on Meta::CPAN
is(scalar @results, 3, "queued-pre-connect: all 3 callbacks fired");
is_deeply([sort @results], [1, 2, 3],
"queued-pre-connect: results match queries");
$ch->finish if $ch && $ch->is_connected;
}
} # end native SKIP block
# 5-7: real mid-session reconnect via HTTP keep_alive_timeout (default 3s).
# Run Q1, wait long enough for the server to drop the socket on idle, then
# run Q2 - auto_reconnect must re-establish on the fly. We arm the idle
# timer only once (on_connect fires on every reconnect, so guarding with a
# flag prevents us from looping forever on Q1).
SKIP: {
skip "HTTP port not reachable", 3 unless $http_ok;
my $ch;
my ($q1_err, $q2_err, $q2_val);
my $disconnects = 0;
t/20_reconnect.t view on Meta::CPAN
reconnect_delay => 0.1,
reconnect_max_delay => 1,
on_disconnect => sub { $disconnects++ },
on_error => sub { diag("HTTP reconnect error: $_[0]") },
on_connect => sub {
return unless $first_connect;
$first_connect = 0;
$ch->query("select 1 format TabSeparated", sub {
(undef, $q1_err) = @_;
# Idle past keep_alive_timeout so the server hangs up,
# then send Q2 - auto_reconnect must dispatch it onto a
# fresh socket.
$idle_timer = EV::timer(4, 0, sub {
$ch->query("select 2 format TabSeparated", sub {
my ($rows, $err) = @_;
$q2_err = $err;
$q2_val = ($rows && @$rows) ? $rows->[0][0] : undef;
EV::break;
});
});
t/25_features.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
use EV;
use EV::ClickHouse;
# Tests for the 0.03 feature batch:
# - max_reconnect_attempts
# - HTTP keepalive PING
# - progress_period coalescing
# - for_table schema helper
# - insert_streamer streaming insert
# - cancel during on_data
# - on_disconnect not firing on connect-phase failures
my $host = $ENV{TEST_CLICKHOUSE_HOST} || '127.0.0.1';
my $http_port = $ENV{TEST_CLICKHOUSE_PORT} || 8123;
my $nat_port = $ENV{TEST_CLICKHOUSE_NATIVE_PORT} || 9000;
t/25_features.t view on Meta::CPAN
},
);
run_with_timeout(5);
cmp_ok(scalar @errors, '>=', 3,
"max_reconnect_attempts: at least N+1 on_error fires");
ok((grep /max reconnect attempts exceeded/, @errors),
"max_reconnect_attempts: terminal error message fires");
$ch->finish if $ch->is_connected;
}
# 2-3: HTTP keepalive PING — set a tiny keepalive (0.2s), let the loop run
# for 0.6s so keepalive pings go out, then check that no on_error fired and
# that the connection still answers a real query afterwards.
SKIP: {
    skip "HTTP port not reachable", 2 unless $http_ok;
    my ($ch, $err);
    # on_error only records the message; test 2 below reports it.
    $ch = EV::ClickHouse->new(
        host => $host,
        port => $http_port,
        keepalive => 0.2,
        on_connect => sub { },
        on_error => sub { $err = $_[0] },
    );
    # Wait long enough for at least 2 keepalive pings to fire.
    my $t = EV::timer(0.6, 0, sub { EV::break });
    EV::run;
    # Test 2: nothing reported an error while only keepalive traffic flowed.
    ok(!$err, "HTTP keepalive: no errors") or diag "err=$err";
    # Test 3: a real query still succeeds on the same handle after the pings.
    $ch->query("select 1 format TabSeparated", sub {
        my ($rows) = @_;
        is($rows && @$rows ? $rows->[0][0] : undef, 1,
           "HTTP keepalive: connection still usable for queries");
        EV::break;
    });
    # Bounded wait for the query callback (helper defined elsewhere in file).
    run_with_timeout(5);
    $ch->finish if $ch->is_connected;
}
# 4: for_table returns column metadata.
SKIP: {
skip "Native port not reachable", 1 unless $nat_ok;
my ($ch, $info, $err);
t/28_pass2_coverage.t view on Meta::CPAN
#!/usr/bin/env perl
# Coverage for corner cases the earlier test files miss:
# - on_query_complete (success + error path)
# - HTTP keepalive PING does NOT fire on_query_complete (IS_KEEPALIVE_CB)
# - query_log_comment is applied to INSERT and on HTTP
# - DNS failure with pre-queued queries delivers errors
# - Pool: cancel / skip_pending / reset broadcast
# - Iterator timeout returns undef without setting error
# - Streamer high_water fires when buffered count crosses watermark
use strict;
use warnings;
use Test::More;
use EV;
use EV::ClickHouse;
t/28_pass2_coverage.t view on Meta::CPAN
},
on_query_complete => sub { $completed = [@_] },
on_error => sub { },
);
EV::run;
ok $completed && $completed->[5], 'on_query_complete fired with err msg on failure';
cmp_ok $completed->[3], '>', 0, ' error_code populated';
$ch->finish;
}
# 3. HTTP keepalive PING does not fire on_query_complete.
# No user query is issued in this block, so any on_query_complete call
# could only come from a keepalive ping that was not filtered out.
SKIP: {
    # Probe the HTTP port first. NOTE(review): IO::Socket::INET is not in
    # the visible `use` list; presumably it is loaded elsewhere — confirm.
    skip 'HTTP not reachable', 1
        unless IO::Socket::INET->new(PeerAddr => $host, PeerPort => $hport, Timeout => 2);
    my $count = 0;
    my $ch; $ch = EV::ClickHouse->new(
        host => $host, port => $hport, protocol => 'http',
        keepalive => 0.2,
        on_connect => sub { },
        on_query_complete => sub { $count++ },
        on_error => sub { },
    );
    # let two keepalive pings fly
    # NOTE(review): none of the callbacks above call EV::break, so
    # run_with_timeout (defined elsewhere) is relied on purely as a ~0.6s
    # loop runner — confirm it does not record a failure at its deadline.
    run_with_timeout(0.6);
    is $count, 0, 'HTTP keepalive PING did not fire on_query_complete';
    $ch->finish;
}
# 4. query_log_comment is applied to INSERT (and is well-formed enough that
# the server accepts it). We can't read system.query_log on every test
# setup, so we verify the insert succeeds with the comment in place.
{
my $err;
my $ch; $ch = EV::ClickHouse->new(
host => $host, port => $nport, protocol => 'native',
/* --- Async TCP connect, I/O dispatch, timers, keepalive, reconnect,
* pipeline advance, and OpenSSL one-time init. ---
*
* Forward decls for symbols defined later in this file but called from
* earlier in the same file (cross-file callers use the forwards in
* ClickHouse.xs).
*/
static void io_cb(EV_P_ ev_io *w, int revents);
static void on_connect_done(ev_clickhouse_t *self);
#ifdef HAVE_OPENSSL
int gen = self->connect_gen;
    /* Must reconnect — server may still be processing */
if (teardown_after_deliver(self, "query timeout", "query timeout")) return;
if (self->connect_gen != gen) return;
if (self->auto_reconnect && self->host)
schedule_reconnect(self);
}
}
/* --- Keepalive timer callback --- */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
offsetof(ev_clickhouse_t, ka_timer));
(void)revents;
if (self->magic != EV_CH_MAGIC) return;
if (!self->connected || self->send_count > 0) return;
/* Native: append PING bytes directly; the SERVER_PONG handler tracks
char *ping = build_native_ping(&ping_len);
ensure_send_cap(self, self->send_len + ping_len);
Copy(ping, self->send_buf + self->send_len, ping_len, char);
self->send_len += ping_len;
self->ka_in_flight++;
Safefree(ping);
start_writing(self);
} else {
/* HTTP: enqueue a real ping with a no-op callback. ClickHouse's
* HTTP server closes idle connections after a few seconds, so
* relying on TCP keepalive (kernel default ~2h) is not enough. */
size_t req_len;
char *req = build_http_ping_request(self, &req_len);
ev_ch_send_t *s = alloc_send();
s->data = req;
s->data_len = req_len;
s->cb = SvREFCNT_inc(keepalive_noop_cb);
enqueue_send(self, s);
}
}
/* Arm the repeating keepalive timer.
 * Skipped when keepalive is not a positive interval, when the timer is
 * already running (ka_timing set), or when the handle is not connected.
 * The first tick fires one interval from now and then repeats every
 * interval. */
static void start_keepalive(ev_clickhouse_t *self) {
    const double every = self->keepalive;

    /* Spelled !(every > 0) so a NaN interval is rejected too. */
    if (!(every > 0) || self->ka_timing || !self->connected)
        return;

    ev_timer_init(&self->ka_timer, ka_timer_cb, every, every);
    ev_timer_start(self->loop, &self->ka_timer);
    self->ka_timing = 1;
}
/* Disarm the keepalive timer. Safe to call unconditionally: when the
 * timer is not running (ka_timing == 0) nothing happens. */
static void stop_keepalive(ev_clickhouse_t *self) {
    if (!self->ka_timing)
        return;

    ev_timer_stop(self->loop, &self->ka_timer);
    self->ka_timing = 0;
}
/* --- Reconnect with backoff --- */
static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
}
/* Dropping the last ref to the drain CV can free a closure that
     * captured $ch — DESTROY then runs. Guard with callback_depth so
* the free is deferred, then detect it before touching self. */
self->callback_depth++;
SvREFCNT_dec(drain_cb);
self->callback_depth--;
if (check_destroyed(self)) return 1;
}
/* Restart keepalive timer when idle (start_keepalive is a no-op if already
* timing or if keepalive disabled) */
if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0)
start_keepalive(self);
/* send next request from queue */
if (!ngx_queue_empty(&self->send_queue)) {
/* Stop keepalive during active query */
stop_keepalive(self);
emit_trace(self, "dispatch query (pending=%d)", self->pending_count);
/* on_trace is user code: it may have called finish()/reset() or
* dropped the last reference (DESTROY), any of which drains or
* frees the send queue. Re-validate the connection and re-derive
* the head entry AFTER the trace so we never touch a send struct
* that cancel_pending has already released to the freelist. */
if (self->magic != EV_CH_MAGIC || !self->connected
|| ngx_queue_empty(&self->send_queue))
return check_destroyed(self);
send->on_data, send->on_complete,
send->query_timeout);
CLEAR_SV(send->on_data);
send->on_complete = NULL; /* ownership transferred to cbt */
/* Track query_id + dispatch start time (used by on_query_complete). */
CLEAR_STR(self->last_query_id);
if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }
self->query_start_time = ev_now(self->loop);
/* on_query_start: fire with the resolved query_id, just before
* the write side runs. Suppressed for keepalive PINGs to match
* on_query_complete semantics. */
if (self->on_query_start && !IS_KEEPALIVE_CB(dispatched_cb)) {
dSP;
ENTER; SAVETMPS; PUSHMARK(SP);
EXTEND(SP, 1);
PUSHs(self->last_query_id
? sv_2mortal(newSVpv(self->last_query_id, 0))
: &PL_sv_undef);
PUTBACK;
int gen_before = self->connect_gen;
xs/macros.h view on Meta::CPAN
/* Common idioms used across the XS — kept here so the main translation
 * unit and the files #include'd into it can share them without
 * duplicating definitions.
 *
 * IS_KEEPALIVE_CB refers to `keepalive_noop_cb`, which is defined later
 * in the translation unit; because it is a macro, the name is resolved
 * where the macro is expanded, not where it is defined here. */
#ifndef EV_CH_MACROS_H
#define EV_CH_MACROS_H
/* Release an owned C string with Safefree and null out the slot, so it
 * can be neither freed twice nor read after release. A slot that is
 * already NULL is left untouched. `p` must be an lvalue; it is evaluated
 * more than once. */
#define CLEAR_STR(p) do {    \
    if ((p) != NULL) {       \
        Safefree(p);         \
        (p) = NULL;          \
    }                        \
} while (0)
/* Drop a refcounted SV and reset the slot to NULL. */
xs/macros.h view on Meta::CPAN
* callback threw. Use only at sites that must continue work after the
* call (e.g. still need SPAGAIN/FREETMPS or further dispatches); helpers
* that return immediately can omit the clear. */
/* If $@ (ERRSV) holds a true value, report it via warn() tagged with
 * `label` (which must be a string literal, since it is pasted into the
 * format) and then reset $@ to undef. When $@ is false this does nothing;
 * the early `break` leaves the do/while(0) wrapper. */
#define WARN_AND_CLEAR_ERRSV(label) do {                               \
    if (!SvTRUE(ERRSV))                                                \
        break;                                                         \
    warn("EV::ClickHouse: exception in " label ": %s",                 \
         SvPV_nolen(ERRSV));                                           \
    sv_setsv(ERRSV, &PL_sv_undef);                                     \
} while (0)
/* HTTP keepalive pings are queued with the shared keepalive_noop_cb
 * sentinel as their callback. This predicate recognises that sentinel so
 * on_query_start / on_query_complete can skip requests the user never
 * issued; otherwise observers would see spurious zero-row "completions".
 * The native keepalive path writes PING bytes straight into the send
 * buffer without any callback, so only the HTTP path is affected.
 * keepalive_noop_cb is defined later in the translation unit; the name is
 * resolved where this macro is expanded. A NULL cb never matches. */
#define IS_KEEPALIVE_CB(cb) ((cb) != NULL && (cb) == keepalive_noop_cb)
#endif /* EV_CH_MACROS_H */
xs/proto_http.c view on Meta::CPAN
"Authorization: Basic %s\r\n", buf + cred_len);
Safefree(buf);
} else {
if (self->user)
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-User: %s\r\n", self->user);
if (self->password && self->password[0])
pos += snprintf(req + pos, req_cap - pos,
"X-ClickHouse-Key: %s\r\n", self->password);
}
pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
if (self->compress)
pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
if (content_encoding)
pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
pos += snprintf(req + pos, req_cap - pos,
"Content-Length: %lu\r\n\r\n", (unsigned long)body_len);
/* body */
if (body_len > 0) {
if (pos + body_len > req_cap) {
xs/proto_http.c view on Meta::CPAN
/* Build the HTTP keepalive request ("GET /ping") for this connection.
 *
 * self     connection whose host/port fill the Host header. A NULL host
 *          is emitted as an empty name rather than being handed to %s,
 *          which would be undefined behaviour.
 * req_len  out: number of request bytes written (excluding the NUL).
 *
 * Returns a Newx()-allocated, NUL-terminated buffer owned by the caller. */
static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
    const char *host = self->host ? self->host : "";
    /* An IPv6 literal must be bracketed in the Host header (RFC 3986
     * section 3.2.2, RFC 7230 section 5.4); otherwise its colons are
     * indistinguishable from the port separator. DNS names and IPv4
     * addresses never contain ':', and a host already starting with '['
     * is used as-is. */
    int bracket = host[0] != '[' && strchr(host, ':') != NULL;
    /* The fixed request text plus brackets and a %u port is under 80
     * bytes, so 128 bytes on top of the host name cannot truncate. */
    size_t req_cap = 128 + strlen(host);
    char *req;
    int n;

    Newx(req, req_cap, char);
    n = snprintf(req, req_cap,
                 "GET /ping HTTP/1.1\r\n"
                 "Host: %s%s%s:%u\r\n"
                 "Connection: keep-alive\r\n\r\n",
                 bracket ? "[" : "", host, bracket ? "]" : "",
                 (unsigned)self->port);
    if (n < 0) {
        /* Formatting error: report an empty request, never stale bytes. */
        req[0] = '\0';
        n = 0;
    } else if ((size_t)n >= req_cap) {
        /* Truncated (cannot happen with the slack above): keep what fits. */
        n = (int)(req_cap - 1);
    }
    *req_len = (size_t)n;
    return req;
}
/* --- HTTP response parsing --- */
/* Length-bounded uint parser. Stops at the first non-digit OR at `len`,
* whichever comes first. Used to parse X-ClickHouse-Summary values out
xs/proto_native_parse.c view on Meta::CPAN
self->native_rows = NULL;
if (deliver_rows(self, rows)) return;
}
/* advance pipeline â may free self via try_write error */
pipeline_advance(self);
return;
}
if (rc == 3) {
/* Pong â ack a keepalive ping, or deliver to user's ping() cb */
self->native_state = NATIVE_IDLE;
if (self->ka_in_flight > 0) {
/* Keepalive ack: not tied to send_count or any user cb */
self->ka_in_flight--;
continue;
}
stop_timing(self);
if (self->send_count > 0) self->send_count--;
AV *rows = newAV();
if (deliver_rows(self, rows)) return;
pipeline_advance(self);
return;
}
xs/queues.c view on Meta::CPAN
/* --- freelist for cb_queue entries + send_queue entries ---
*
* Both use a singly-linked-list-in-the-record-itself stash: the first
 * sizeof(void*) bytes of a released entry are repurposed as the "next
 * free" pointer, so a released entry can be handed out again without a
 * fresh heap allocation.
*
* Also lives here: the singleton sentinels (keepalive_noop_cb,
* iter_timeout_cb) and the small helpers that munge per-query
* settings into per-send fields.
*
* This file is #include'd from ClickHouse.xs as part of the single
* translation unit; symbols stay file-local-to-the-TU.
*/
static ev_ch_cb_t *cbt_freelist = NULL;
static ev_ch_cb_t* alloc_cbt(void) {
xs/queues.c view on Meta::CPAN
}
static ev_ch_send_t *send_freelist = NULL;
/* Iterator timeout watcher cb: the iterator's deadline expired, so make
 * the innermost ev_run() on this loop return and hand control back to the
 * iterator that drove it. The watcher and event mask are not needed. */
static void iter_timeout_cb(EV_P_ ev_timer *w, int revents) {
    (void)w;
    (void)revents;
    /* EV_A_ (libev's "loop argument plus comma") rather than "EV_A," so
     * this also compiles when libev is built without EV_MULTIPLICITY,
     * where EV_A expands to nothing. */
    ev_break(EV_A_ EVBREAK_ONE);
}
/* No-op CV reference used as the callback for HTTP keepalive pings;
* initialised once at BOOT and shared by all connections. */
static SV *keepalive_noop_cb = NULL;
static ev_ch_send_t* alloc_send(void) {
ev_ch_send_t *s;
if (send_freelist) {
s = send_freelist;
send_freelist = *(ev_ch_send_t **)s;
} else {
Newx(s, 1, ev_ch_send_t);
}
s->data = NULL;