EV-ClickHouse
view release on metacpan or search on metacpan
- Parameterized queries via params => { name => value }
- Keepalive ping timer for idle native connections
- Auto-reconnect with exponential backoff (reconnect_delay, reconnect_max_delay)
- Graceful drain callback (fires when all pending queries complete)
- Query cancellation and skip_pending
Query features:
- TabSeparated format parser for HTTP responses
- INSERT with TSV string or arrayref data (both protocols)
- Raw query mode for HTTP (return unparsed response body)
- on_data streaming callback for native protocol (per-block delivery)
- on_progress callback for native protocol progress packets
- query_id tracking via last_query_id accessor
- query_timeout per-query with automatic cancellation
Native protocol type support:
- Int8/16/32/64/128/256, UInt8/16/32/64/128/256
- Float32/Float64, Bool
- String, FixedString(N)
- Date, Date32, DateTime, DateTime64 (with timezone support)
- Decimal32/64/128 (with optional scaling via decode_decimal)
ClickHouse.xs view on Meta::CPAN
int reconnect_timing;
/* LowCardinality cross-block dictionary state */
SV ***lc_dicts; /* array of dictionaries, one per column */
uint64_t *lc_dict_sizes; /* size of each dictionary */
int lc_num_cols; /* number of columns with LC state */
};
/*
 * Callback record for one query.
 *
 * Bundles a Perl callback with the per-query options that affect how the
 * response is delivered (raw mode, streaming callback, timeout).  The
 * `queue` member is an ngx_queue_t link, so records can be chained into a
 * list.
 * NOTE(review): presumably one record exists per in-flight query and the
 * records are consumed in FIFO order -- confirm against the code that
 * enqueues and dequeues them.
 */
struct ev_ch_cb_s {
SV *cb; /* Perl callback; presumably the final per-query callback -- TODO confirm */
int raw; /* return raw response body instead of parsed rows */
SV *on_data; /* per-query streaming callback (fires per block) */
double query_timeout; /* per-query timeout (0=use default); presumably seconds -- TODO confirm */
ngx_queue_t queue; /* list link (ngx_queue_t) chaining this record to its neighbours */
};
/*
 * Outgoing request record.
 *
 * Holds the serialized request bytes plus everything needed once the
 * request is dispatched: the Perl callback, optional deferred INSERT
 * payload (either a TSV buffer or a Perl array), and the same per-query
 * options that struct ev_ch_cb_s carries (raw, on_data, query_timeout).
 * The `queue` member is an ngx_queue_t link, so records can be chained
 * into a list.
 * NOTE(review): presumably these records wait in a send queue until the
 * connection is ready to write -- confirm against the code that
 * enqueues and dequeues them.
 * NOTE(review): ownership of `data`, `insert_data` and `query_id` (who
 * allocates and who frees) is not visible here -- verify at the use sites.
 */
struct ev_ch_send_s {
char *data; /* full HTTP request or native packet */
size_t data_len; /* length of `data`; presumably in bytes -- TODO confirm */
SV *cb; /* Perl callback; presumably the final per-query callback -- TODO confirm */
char *insert_data; /* deferred TSV data for native INSERT */
size_t insert_data_len; /* length of `insert_data`; presumably in bytes -- TODO confirm */
SV *insert_av; /* deferred AV* data for native INSERT */
int raw; /* return raw response body */
SV *on_data; /* per-query streaming callback */
double query_timeout; /* per-query timeout; presumably seconds, 0 = default as in ev_ch_cb_s -- TODO confirm */
char *query_id; /* query_id for tracking */
ngx_queue_t queue; /* list link (ngx_queue_t) chaining this record to its neighbours */
};
/*
 * forward declarations
 *
 * The three *_cb functions have watcher-callback signatures: each takes
 * the event loop (via EV_P_), the watcher that fired, and the `revents`
 * mask.  Their definitions are not visible in this section.
 */
static void io_cb(EV_P_ ev_io *w, int revents); /* callback for an ev_io watcher */
static void timer_cb(EV_P_ ev_timer *w, int revents); /* callback for an ev_timer watcher */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents); /* callback for an ev_timer watcher; presumably the keepalive timer -- TODO confirm */
static void start_keepalive(ev_clickhouse_t *self); /* operates on the connection object; presumably arms the keepalive timer -- TODO confirm */
ClickHouse.xs view on Meta::CPAN
if (num_cols > 0)
av_extend(row, num_cols - 1);
for (c = 0; c < num_cols; c++) {
av_push(row, columns[c][r]);
}
av_push(*target, newRV_noinc((SV*)row));
}
}
}
/* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */
{
SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL;
if (on_data && self->native_rows) {
self->callback_depth++;
{
dSP;
ENTER; SAVETMPS;
PUSHMARK(SP);
PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows)));
PUTBACK;
eg/error_handling.pl
eg/insert.pl
eg/keepalive.pl
eg/native.pl
eg/native_compress.pl
eg/params.pl
eg/ping.pl
eg/query.pl
eg/queue.pl
eg/settings.pl
eg/streaming.pl
eg/totals.pl
eg/uri.pl
lib/EV/ClickHouse.pm
LICENSE
Makefile.PL
MANIFEST This list of files
MANIFEST.SKIP
META.json
META.yml
ngx_queue.h
lib/EV/ClickHouse.pm view on Meta::CPAN
my ($body, $err) = @_;
# $body is the raw CSV text
});
Not supported with the native protocol (croaks).
=item C<query_timeout> — per-query timeout in seconds, overriding the
connection-level C<query_timeout>.
=item C<on_data> — native protocol only. A code ref called for each data
block as it arrives. Enables streaming: rows are delivered incrementally
and not accumulated.
$ch->query("SELECT * FROM big_table",
{ on_data => sub { my ($rows) = @_; process_batch($rows) } },
sub { my (undef, $err) = @_; ... } # final callback
);
=back
B<Native protocol type notes:> With the native protocol, column values
t/11_new_features.t view on Meta::CPAN
cb => sub {
$ch->query("SELECT nonexistent_column FROM system.one", sub {
my ($rows, $err) = @_;
ok($err, 'error_code: got error');
like($err, qr/Code: \d+/, 'error_code: contains Code: N');
EV::break;
});
},
);
# Test 24-26: streaming on_data callback (native)
with_native(
tests => 3,
cb => sub {
my @blocks;
$ch->query(
"SELECT number FROM numbers(100)",
{ on_data => sub { push @blocks, $_[0] } },
sub {
my ($rows, $err) = @_;
ok(!$err, 'on_data: no error');
( run in 2.024 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )