ClickHouse-Encoder
view release on metacpan or search on metacpan
lib/ClickHouse/Encoder.pm view on Meta::CPAN
my $table = shift;
$table =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?\z/
or die "Invalid table name '$table': expected [db.]name with [A-Za-z0-9_]";
return;
}
# Build (url, headers) for a ClickHouse HTTP request with the given SQL
# in the `query` parameter; the parameter is left out when $sql is empty
# or undef. Scheme/host/port come from _check_endpoint(\%opts); database
# and user default to 'default' and password to '' (in which case no
# X-ClickHouse-Key header is sent). Every value is UTF-8 encoded before
# percent-escaping, so non-ASCII (caf%C3%A9, emoji) round-trips correctly;
# only RFC 3986 unreserved characters (A-Z a-z 0-9 - _ . ~) pass through.
#
# Other recognised %opts: settings (hashref of per-query CH settings,
# appended in sorted key order) and dedup_token (sent as
# insert_deduplication_token). Returns ($url, \%headers).
sub _http_url_headers {
    my ($sql, %opts) = @_;
    require Encode;
    my $esc = sub {
        my $s = Encode::encode('UTF-8', $_[0], 0);
        $s =~ s/([^A-Za-z0-9\-_.~])/sprintf('%%%02X', ord($1))/ge;
        $s;
    };
    my ($scheme, $host, $port) = _check_endpoint(\%opts);
    my $database = $opts{database} // 'default';
    my $user     = $opts{user}     // 'default';
    my $password = $opts{password} // '';
    my $url = "$scheme://$host:$port/?database=" . $esc->($database);
    $url .= "&query=" . $esc->($sql) if length $sql;
    # Per-query settings: { max_memory_usage => '...', max_execution_time => 30 }
    if (my $s = $opts{settings}) {
        for my $k (sort keys %$s) {
            $url .= "&" . $esc->($k) . "=" . $esc->($s->{$k});
        }
    }
    # Insert-side idempotency token. ClickHouse deduplicates on the token
    # itself, not on the payload: on tables with insert deduplication
    # enabled, a later insert that reuses an already-seen token is skipped
    # even if its rows differ.
    if (defined(my $tok = $opts{dedup_token})) {
        $url .= "&insert_deduplication_token=" . $esc->($tok);
    }
    my %hdr = ('X-ClickHouse-User' => $user);
    # Only send a key header when there is a password to send.
    $hdr{'X-ClickHouse-Key'} = $password if $password ne '';
    return ($url, \%hdr);
}
# Validate the host/port/scheme triple shared by every HTTP entry point.
# Takes a hashref whose scheme/host/port keys are optional (defaults:
# http, localhost, 8123) and returns ($scheme, $host, $port); dies with an
# "endpoint: ..." message otherwise.
#
# Rejects anything other than http/https and requires the port to be an
# ASCII integer in 1..65535. The host must be non-empty and free of
# whitespace and URL delimiters (':/?#&@[]'). '@' matters in particular:
# HTTP::Tiny parses "user@host" as userinfo + host, so an unchecked '@'
# would silently send the request elsewhere (with the prefix as Basic
# auth). Centralised here so insert_http, bulk_inserter, ping, and
# select_blocks share a single allow-list and identical error messages.
sub _check_endpoint {
    my ($opts) = @_;
    my $scheme = $opts->{scheme} // 'http';
    my $host   = $opts->{host}   // 'localhost';
    my $port   = $opts->{port}   // 8123;
    die "endpoint: scheme must be 'http' or 'https' (got '$scheme')\n"
        unless $scheme eq 'http' || $scheme eq 'https';
    die "endpoint: host must not contain URL-structural characters "
        . "(got '$host')\n"
        if !length $host || $host =~ m{[:/?#&\@\[\]\s]};
    # [0-9] rather than \d: \d also matches non-ASCII digits (e.g. U+0663),
    # which would pass this shape check yet numify to the wrong port.
    die "endpoint: port must be a positive integer (got '$port')\n"
        unless $port =~ /\A[1-9][0-9]{0,4}\z/ && $port < 65536;
    return ($scheme, $host, $port);
}
# Build an HTTP::Tiny instance honoring timeout (default 60s), keep_alive,
# ssl_options (passed as SSL_options) and verify_SSL. Shared by
# insert_http, bulk_inserter, server_version, ping, select_blocks; callers
# pass %opts unchanged. Loading HTTP::Tiny here keeps the require local to
# HTTP code paths.
#
# keep_alive and verify_SSL are forwarded only when the caller supplied
# them, so HTTP::Tiny's own defaults apply otherwise. keep_alive is keyed
# on definedness rather than truthiness: HTTP::Tiny enables keep-alive by
# default, so forwarding only true values made keep_alive => 0 a no-op.
sub _http_tiny {
    my (%opts) = @_;
    require HTTP::Tiny;
    my @args = (timeout => $opts{timeout} // 60);
    push @args, keep_alive => ($opts{keep_alive} ? 1 : 0)
        if defined $opts{keep_alive};
    push @args, SSL_options => $opts{ssl_options} if $opts{ssl_options};
    push @args, verify_SSL  => $opts{verify_SSL}  if exists $opts{verify_SSL};
    return HTTP::Tiny->new(@args);
}
# Parse the flat JSON objects ClickHouse puts in X-ClickHouse-Summary and
# X-ClickHouse-Progress, e.g. {"read_rows":"10","elapsed_ns":"123"},
# without pulling in JSON::PP. Only "key":"value" pairs whose key and
# value are both quoted and free of quotes/backslashes are picked up;
# values that look like (optionally negative) integers are numified.
# Returns a hashref, undef when nothing matched, or an empty return for
# undef/empty input.
sub _parse_ch_kv {
    my ($str) = @_;
    return if !defined $str || !length $str;
    my %parsed;
    while ($str =~ /"([^"\\]+)"\s*:\s*"([^"\\]*)"/g) {
        # Copy the captures right away: the integer test below is itself a
        # successful match and replaces the capture variables.
        my $key   = $1;
        my $value = $2;
        $value += 0 if $value =~ /\A-?\d+\z/;
        $parsed{$key} = $value;
    }
    return %parsed ? \%parsed : undef;
}
# Lift a few ClickHouse response headers into $resp->{ch} and return
# $resp. Keys are the header suffixes as-is: 'query-id', 'server',
# 'format' and 'exception-code', plus 'summary' (parsed
# X-ClickHouse-Summary) and 'progress' (parsed X-ClickHouse-Progress), so
# callers don't reparse the same headers. {ch} is left unset when none of
# them is present; non-hash responses and responses without headers are
# returned untouched.
#
# HTTP::Tiny collapses a repeated header into an arrayref.
# X-ClickHouse-Progress is sent repeatedly while a query runs (with
# send_progress_in_http_headers=1) and the last snapshot is the most
# complete, so every header here is reduced to its last occurrence. That
# also keeps the {ch} values plain strings or hashrefs if any other header
# ever repeats.
sub _decorate_response {
    my ($resp) = @_;
    return $resp unless ref $resp eq 'HASH';
    my $h = $resp->{headers} or return $resp;
    my $last = sub { ref $_[0] eq 'ARRAY' ? $_[0][-1] : $_[0] };
    my %ch;
    for my $k (qw(query-id server format exception-code)) {
        my $hv = $last->($h->{"x-clickhouse-$k"});
        $ch{$k} = $hv if defined $hv;
    }
    if (my $sum = _parse_ch_kv($last->($h->{'x-clickhouse-summary'}))) {
        $ch{summary} = $sum;
    }
    if (my $prog = _parse_ch_kv($last->($h->{'x-clickhouse-progress'}))) {
        $ch{progress} = $prog;
    }
    $resp->{ch} = \%ch if %ch;
    return $resp;
}
# Class method: validate $table (plain or db-qualified identifier) and
# delegate to _for_describe with a `describe table` statement for it.
# Validation runs first because the name is interpolated into SQL text.
# %opts is forwarded to _for_describe unchanged.
sub for_table {
    my ($class, $table, %opts) = @_;
    _validate_table_name($table);    # dies on anything but [db.]name
    my $describe = "describe table $table";
    return $class->_for_describe($describe, %opts);
}
sub _for_describe {
my ($class, $describe_sql, %opts) = @_;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
$url .= '&compress=1' if $decompress;
my $buf = '';
# Block walker: when decompress is set, walk through compressed-block-
# framing entries and feed the decompressed bytes into a second
# accumulator that decode_block reads. Otherwise feed buf directly.
my $inner_buf = '';
my $drain = sub {
# Phase 1: pull compressed-block frames out of $buf into $inner_buf
if ($decompress) {
while (length($buf) >= 25) { # 16 hash + 9 header minimum
my $csize = unpack 'V', substr($buf, 17, 4);
last if length($buf) < 16 + $csize;
my ($plain, $consumed) =
$class->decompress_native_block($buf);
$inner_buf .= $plain;
substr($buf, 0, $consumed, '');
}
} else {
$inner_buf = $buf;
}
# Phase 2: decode whole Native blocks out of $inner_buf
while (length($inner_buf) > 0) {
my $block = eval { $class->decode_block($inner_buf, 0, $keep) };
if ($@) {
last if $@ =~ /buffer truncated/i;
die $@;
}
$cb->($block);
substr($inner_buf, 0, $block->{consumed}, '');
}
# When not decompressing, inner_buf IS buf; carry the residual
# back so the next data_callback append sees the unconsumed tail.
if (!$decompress) {
$buf = $inner_buf;
}
};
my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 60)->post(
$url,
{ content => $sql,
headers => { %$hdr, 'Content-Type' => 'text/plain' },
data_callback => sub { $buf .= $_[0]; $drain->() },
});
die "select_blocks: HTTP $resp->{status}: $resp->{content}\n"
unless $resp->{success};
$drain->();
die "select_blocks: " . length($buf) . " trailing bytes "
. "after last complete compressed block\n"
if $decompress && length $buf;
die "select_blocks: " . length($inner_buf) . " trailing bytes "
. "after last complete block\n"
if length $inner_buf;
return;
}
# Returns a bulk-inserter object: ->push($row), ->push_many(\@rows),
# ->flush (a no-op when nothing is buffered), ->finish. Holds a single
# HTTP::Tiny instance across batches (so keep-alive applies) and
# auto-flushes once the buffered row count reaches batch_size. Transient
# HTTP failures (5xx, and HTTP::Tiny's 599 for network errors) are
# retried up to `retries` times with exponential backoff plus jitter
# (tuned by retry_wait / retry_max_wait); 4xx errors die immediately.
#
# Callable on the class or on an encoder instance; the invocant is
# forwarded as _origin, which the inserter uses to build its encoder
# (when given columns) and to apply compression.
sub bulk_inserter {
    my ($class_or_self, %args) = @_;
    return ClickHouse::Encoder::BulkInserter->new(
        %args,
        _origin => $class_or_self,
    );
}
package ClickHouse::Encoder::BulkInserter; ## no critic (ProhibitMultiplePackages)
# Construct a bulk inserter; normally reached via
# ClickHouse::Encoder->bulk_inserter, which supplies _origin.
#
# Required: table, plus either encoder (a ready encoder object) or
# columns (used to build one through $origin->new). Optional: compress
# (default 'raw'), timeout (default 60), batch_size (default 10_000,
# must be a positive integer), retries (default 3, must be a
# non-negative integer), retry_wait (0.5), retry_max_wait (30). The full
# %args is also handed to _build_insert_endpoint and _http_tiny.
# Dies on missing or invalid arguments.
sub new {
    my ($class, %args) = @_;
    my $origin_raw = delete $args{_origin};
    # The origin may be a class name or an encoder instance; fall back to
    # the base class when no _origin was given.
    my $origin = (ref $origin_raw || $origin_raw) || 'ClickHouse::Encoder';
    my $enc = $args{encoder} // do {
        my $cols = $args{columns} or die "bulk_inserter needs columns or encoder";
        $origin->new(columns => $cols);
    };
    my $table = $args{table} or die "bulk_inserter needs table";
    # A batch_size below 1 made push_many loop forever (each pass spliced
    # zero rows), and a negative retries made flush's `0 .. retries` loop
    # run zero times so no POST was attempted. Reject both up front.
    my $batch_size = $args{batch_size} // 10_000;
    die "bulk_inserter: batch_size must be a positive integer "
        . "(got '$batch_size')\n"
        unless $batch_size =~ /\A[0-9]+\z/ && $batch_size > 0;
    my $retries = $args{retries} // 3;
    die "bulk_inserter: retries must be a non-negative integer "
        . "(got '$retries')\n"
        unless $retries =~ /\A[0-9]+\z/;
    my $compress = $args{compress} // 'raw';
    my $timeout  = $args{timeout}  // 60;
    my ($url, $hdr) = ClickHouse::Encoder::_build_insert_endpoint(
        $table, $compress, %args);
    return bless {
        enc            => $enc,
        url            => $url,
        hdr            => $hdr,
        rows           => [],
        batch_size     => $batch_size,
        retries        => $retries,
        retry_wait     => $args{retry_wait}     // 0.5,
        retry_max_wait => $args{retry_max_wait} // 30,
        compress       => $compress,
        http           => ClickHouse::Encoder::_http_tiny(
            %args, timeout => $timeout, keep_alive => 1),
        origin         => $origin,
        sent_rows      => 0,
        sent_batches   => 0,
        last_response  => undef,
        summary        => {},
    }, $class;
}
# Buffer a single row and flush as soon as the buffer holds batch_size
# rows. Returns the inserter so calls can be chained.
sub push :method {    ## no critic (ProhibitBuiltinHomonyms)
    my ($self, $row) = @_;
    CORE::push @{ $self->{rows} }, $row;
    my $buffered = scalar @{ $self->{rows} };
    if ($buffered >= $self->{batch_size}) {
        $self->flush;
    }
    return $self;
}
# Buffer many rows at once (an arrayref of rows) and flush them in chunks
# of batch_size rows, so a call with N >> batch_size never POSTs one
# oversized body. Rows past the last full chunk stay buffered, except
# that a remainder of exactly batch_size is flushed too (the same
# threshold push uses). Returns the inserter.
#
# The chunk limit is clamped to at least 1. With a batch_size below 1,
# splice took zero rows per pass (or, for negative sizes, kept the tail)
# and the loop never drained, hanging the caller.
sub push_many {
    my ($self, $rows) = @_;
    CORE::push @{ $self->{rows} }, @{$rows};
    my $limit = $self->{batch_size};
    $limit = 1 if $limit < 1;
    # `local` restores $self->{rows} to the remainder arrayref even when
    # `flush` croaks mid-batch, so a caller's eval{} still sees the untried
    # rows buffered for a retry. The rows of the batch whose flush failed
    # were already spliced out and are not put back.
    while (@{ $self->{rows} } > $limit) {
        my @batch = splice @{ $self->{rows} }, 0, $limit;
        local $self->{rows} = \@batch;
        $self->flush;
    }
    $self->flush if @{ $self->{rows} } >= $limit;
    return $self;
}
sub flush {
my $self = shift;
my $rows = $self->{rows};
return $self if !@{$rows};
my $body = ClickHouse::Encoder::_apply_compression(
$self->{origin}, $self->{compress}, $self->{enc}->encode($rows));
my $resp;
my $last_err;
for my $attempt (0 .. $self->{retries}) {
$resp = $self->{http}->post($self->{url},
{ headers => $self->{hdr}, content => $body });
last if $resp->{success};
# 4xx errors are not retryable - the request is malformed.
die "bulk_inserter: HTTP $resp->{status}: $resp->{content}\n"
if $resp->{status} >= 400 && $resp->{status} < 500;
$last_err = "HTTP $resp->{status}: $resp->{content}";
# 5xx and network failures (599) are retryable. Exponential
# backoff (retry_wait * 2^attempt, capped at retry_max_wait)
# with equal jitter: sleep half the window deterministically
# then a random half, so concurrent inserters retrying the
# same failed server don't resynchronise into a thundering herd.
if ($attempt < $self->{retries}) {
require Time::HiRes;
my $window = $self->{retry_wait} * (2 ** $attempt);
$window = $self->{retry_max_wait}
if $window > $self->{retry_max_wait};
Time::HiRes::sleep($window / 2 + rand($window / 2));
}
lib/ClickHouse/Encoder.pm view on Meta::CPAN
=over 4
=item C<via>
C<'client'> (default) shells out to C<clickhouse-client>. C<'http'> uses
L<HTTP::Tiny> against C<host:port> directly with no external binary
dependency. Recommended on environments without C<clickhouse-client>.
=item C<host>, C<port>, C<database>, C<user>
Connection parameters. Defaults: host C<localhost>; port C<9000> for
C<<< via => 'client' >>> or C<8123> for C<<< via => 'http' >>>;
database and user both C<default>.
=item C<password>
For C<client> mode, passed via C<CLICKHOUSE_PASSWORD> env var. For C<http>
mode, sent as the C<X-ClickHouse-Key> header.
=item C<client>
Path to the C<clickhouse-client> executable (only for
C<<< via => 'client' >>>). Default: whatever C<exec> finds on C<$PATH>.
=item C<scheme>, C<timeout>, C<ssl_options>, C<verify_SSL>, C<settings>
For C<<< via => 'http' >>>: the URL scheme (default C<http>; pass
C<https> for TLS), request timeout in seconds (default 10),
optional SSL options / verify_SSL passthrough to L<HTTP::Tiny>,
and a hashref of per-query CH settings. Ignored under
C<<< via => 'client' >>> (the C<clickhouse-client> binary handles
its own connection).
=back
=head2 stream
$enc->stream(\&iter, \&writer, batch_size => 10_000);
Pulls rows from C<&iter> (a coderef returning the next row, or C<undef> when
done) and emits one Native block per C<batch_size> rows by calling
C<&writer> with the encoded bytes. The iterator loop runs in XS, so the
per-row Perl overhead is bounded.
=head2 streamer
my $st = $enc->streamer(\&writer, batch_size => 10_000);
my $st = $enc->streamer(\&writer, batch_size => 10_000,
compress => 'lz4'); # XS-level compression
$st->push_row($row);
$st->push_row($row);
...
$st->finish;
Returns a C<ClickHouse::Encoder::Streamer> object that buffers rows and
flushes a complete Native block to C<&writer> every C<batch_size> rows.
Call C<finish> to flush any partial last batch, or C<reset> to discard
the buffered rows without flushing (useful for error recovery after an
upstream failure). C<buffered_count> and C<is_empty> let producers
inspect the current backlog. The streamer keeps the encoder alive for
its own lifetime, so dropping the encoder reference after creating the
streamer is safe.
Options:
=over 4
=item C<batch_size =E<gt> N>
Flush every C<N> rows. Default 10_000.
=item C<compress =E<gt> 'lz4' | 'zstd' | 'auto' | 'none'>
When set (and not C<'none'>/C<'raw'>), each emitted batch is wrapped
in CH's compressed-block framing via L</compress_native_block>
before being passed to C<&writer>. Done at the XS level, so no
per-batch Perl-callback overhead beyond the one C<compress_native_block>
call. Pair with L</compressed_writer> only when you want a different
compression scheme on top (e.g. HTTP C<Content-Encoding: gzip>).
=item C<hasher =E<gt> $coderef>
Override the bundled CityHash128 v1.0.2 used by the compression
framing. Useful when integrating with a non-default checksum
implementation; usually omit.
=back
The returned C<ClickHouse::Encoder::Streamer> object exposes:
=over 4
=item C<< $st->push_row(\@row) >>
Append one row. Triggers an auto-flush (one call to C<&writer>)
once the buffer reaches C<batch_size>.
=item C<< $st->finish >>
Flush any partial last batch via C<&writer> and return. Safe to
call multiple times - subsequent calls on an empty buffer are
no-ops, and a fresh C<push_row> after finish reopens the streamer.
=item C<< $st->reset >>
Discard buffered rows without flushing. Useful when an upstream
producer hits an error mid-batch and the in-flight rows should be
dropped rather than emitted with stale data.
=item C<< $st->buffered_count >>
Return the integer count of rows currently buffered (not yet
flushed). Lets producers inspect the backlog before committing.
=item C<< $st->is_empty >>
Return a true value when C<buffered_count == 0>. Convenience for
hot-path checks.
=back
lib/ClickHouse/Encoder.pm view on Meta::CPAN
ncols => $n_columns,
nrows => $n_rows,
columns => [
{ name => 'id', type => 'UInt64', values => [...] },
{ name => 'name', type => 'String', values => [...] },
...
],
consumed => $bytes_used,
}
C<consumed> is the number of bytes used. To walk a stream of
concatenated blocks (multi-block C<select format native> response),
prefer L</decode_blocks>. Or pass a starting offset directly:
my $block = ClickHouse::Encoder->decode_block($bytes, $offset);
The 3-arg form avoids the O(N) C<substr> copy per call that
C<<< substr($bytes, $offset) >>> would entail.
An optional fourth-argument hashref filters which columns to keep:
my $block = ClickHouse::Encoder->decode_block(
$bytes, 0, { id => 1, ts => 1 });
Columns whose name isn't in the filter still consume their wire
bytes (so the cursor stays aligned) but their C<values> array is
replaced with N C<undef>s and the column hashref carries a
C<<< skipped => 1 >>> marker. Skips the SV-allocation cost for
unwanted columns; useful on wide C<select *> responses.
XS implementation: walks the Native byte stream using the same type
parser the encoder uses, so symmetric round-trips are guaranteed for
every type C<encode> handles (C<BFloat16>, alphabetical Variant
remapping, C<LowCardinality> dict indirection,
C<SimpleAggregateFunction> passthrough, JSON typed paths, etc.).
C<Decimal128> values come back as C<<< [$lo_uint64, $hi_int64] >>>;
C<Decimal256> as a 4-limb arrayref. Use L</decimal128_str> /
L</decimal256_str> to convert to scaled decimal strings.
=head2 decode_rows
my $r = ClickHouse::Encoder->decode_rows($bytes);
my $r = ClickHouse::Encoder->decode_rows($bytes, $offset);
Row-oriented convenience. Returns:
{
ncols => $n_columns,
nrows => $n_rows,
names => [...],
types => [...],
rows => [[...], [...], ...],
consumed => $bytes_used,
}
Calls an XS row-major decoder (C<decode_block_rows>) that walks each
column then immediately distributes its values into the per-row
arrayrefs and frees the column AV. Peak memory holds one column's
AV plus the row AVs (vs both column- and row-major representations
fully alive, which a Perl-side transpose would entail). Throughput
is similar to L</decode_block>; the win is the tighter peak memory
on wide blocks.
=head2 decode_block_rows
my $r = ClickHouse::Encoder->decode_block_rows($bytes, $offset);
XS row-major decoder; same return shape as L</decode_rows>. Direct
entry point if you want to avoid the L</decode_rows> Perl-side
trampoline.
=head2 decode_blocks
my $blocks = ClickHouse::Encoder->decode_blocks($bytes);
ClickHouse::Encoder->decode_blocks($bytes, sub { my $b = shift; ... });
A C<select ... format native> response is a concatenated stream of
blocks (each typically holding up to C<max_block_size> rows). With no callback,
C<decode_blocks> walks the stream and returns an arrayref of the same
hashref shape as L</decode_block>. With a callback, each block is
passed to the callback as it's decoded and no list is accumulated -
useful for very long selects where the full block list wouldn't fit
comfortably in memory.
Uses the 3-arg form of L</decode_block> (with explicit offset) to
keep total work O(N) regardless of block count. Stops cleanly when
bytes are exhausted; partial trailing bytes croak.
The optional C<keep =E<gt> \%names> hashref forwards a column filter
to L</decode_block> for every block in the stream, matching the
same semantics: present keys are decoded, absent ones still have
their bytes consumed (to keep the cursor aligned) but their values
are not materialized and their column hash carries C<skipped =E<gt> 1>.
Useful for big-fan-out select responses where only a few columns
of a wide row matter.
ClickHouse::Encoder->decode_blocks($bytes, $cb,
keep => { id => 1, event => 1 });
=head2 decode_blocks_iter
my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes);
while (my $block = $iter->()) { ... }
my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes,
keep => { id => 1 });
Returns a coderef that yields one block per call (C<undef> when
exhausted). Same per-block payload as L</decode_block>; useful when
you want pull-style iteration without committing to a callback.
Accepts the same C<keep> filter as L</decode_blocks>.
=head2 decode_stream
ClickHouse::Encoder->decode_stream($fh, sub { my $block = shift; ... },
chunk_size => 65536);
ClickHouse::Encoder->decode_stream($fh, $cb,
keep => { id => 1, event => 1 });
lib/ClickHouse/Encoder.pm view on Meta::CPAN
Returns an order-of-magnitude figure for batch-split decisions
("should I split this into two POSTs?") without paying the encode
cost. Fixed-width types are byte-exact; variable types
(C<String>, C<Array>, etc.) use a configurable 16-byte average
heuristic. For byte-exact size, call C<length($enc-E<gt>encode(...))>.
=head2 select_blocks
ClickHouse::Encoder->select_blocks(
'select id, event, ts from events where date = today()',
host => 'db.example', port => 8123,
database => 'default', user => 'default',
on_block => sub { my $block = shift; ... },
keep => { id => 1, event => 1 }, # optional projection
);
Streaming counterpart to L</insert_http>: POSTs C<$sql> to the
ClickHouse HTTP endpoint with C<default_format=Native>, feeds the
response chunks into a sliding buffer, and invokes C<on_block> for
every complete L</decode_block>-shaped block as it arrives. Memory
stays bounded by one HTTP::Tiny chunk plus one block; this is the
right entry point for selects that return more than fits in
process memory.
C<$sql> must NOT end with a C<format ...> clause - C<select_blocks>
appends C<format Native> at the URL level and croaks when the SQL
trails with a different format pin. A C<FORMAT> token inside the
query body (for example a column literal) is fine.
The optional C<keep =E<gt> \%names> hashref forwards a column
filter to L</decode_block>: skipped columns still have their bytes
consumed (so the cursor stays aligned) but their values are not
materialized into SVs. Useful when you only need a few of many
select-list columns.
With C<decompress =E<gt> 1> the C<compress=1> query parameter is
appended to the URL so ClickHouse wraps each response Native block in
its compressed-block framing (16-byte CityHash128 + 9-byte header
+ LZ4 payload). The HTTP body is then a stream of compressed blocks
which C<select_blocks> peels and decompresses block-by-block via
L</decompress_native_block> before feeding the result to
L</decode_block>. Memory stays bounded by one HTTP chunk plus one
compressed block plus one decompressed block.
Recognised options (besides C<on_block> / C<keep> /
C<decompress>): C<scheme>, C<host>, C<port>, C<database>, C<user>,
C<password>, C<timeout>, C<ssl_options>, C<verify_SSL>, and
C<settings> (per-query CH settings hashref, useful for
C<max_execution_time> and similar). C<dedup_token> is meaningful
only on insert and is ignored if passed here.
=head2 bulk_inserter
my $bi = ClickHouse::Encoder->bulk_inserter(
host => 'db.example', port => 8123, table => 'events',
columns => \@cols, batch_size => 5000, compress => 'zstd',
retries => 3);
    $bi->push($_) for @rows;    # each element of @rows is one row (an arrayref)
$bi->finish;
Holds an L<HTTP::Tiny> instance with keep-alive across batches,
accumulates rows, auto-flushes at C<batch_size>, retries transient
HTTP failures (5xx and 599 network errors) with exponential backoff
and jitter. 4xx errors die immediately. Options:
=over 4
=item C<host>, C<port>, C<database>, C<user>, C<password>, C<scheme>, C<timeout>
Same as L</for_table>. They determine the ClickHouse endpoint and
credentials; C<timeout> (default 60 seconds) is passed to L<HTTP::Tiny>.
=item C<table>
Required; same identifier rule as L</for_table>.
=item C<encoder> or C<columns>
Pass either an existing encoder or a column list (used to build one).
=item C<batch_size>
Auto-flush threshold (default 10_000).
=item C<retries>
Max retries on transient failure (default 3). Set to 0 to disable.
=item C<retry_wait>
Base backoff in seconds (default 0.5). Waits grow exponentially -
the window for attempt I<n> is C<retry_wait * 2 ** n> - with equal
jitter (a random point in the upper half of the window), so
concurrent inserters retrying the same failed server do not
resynchronise into a thundering herd.
=item C<retry_max_wait>
Upper bound in seconds on the backoff window (default 30), capping
the exponential growth from C<retry_wait>.
=item C<compress>
C<'raw'> (default), C<'zstd'>, or C<'gzip'>. Sets the
C<Content-Encoding> header accordingly.
=item C<scheme>, C<ssl_options>, C<verify_SSL>
C<scheme =E<gt> 'https'> enables TLS via L<HTTP::Tiny> (install
L<IO::Socket::SSL> and L<Net::SSLeay>). C<ssl_options> and
C<verify_SSL> pass through to L<HTTP::Tiny>.
=item C<settings>
Hashref of per-query CH settings (C<max_execution_time>,
C<max_memory_usage>, ...) appended to every flush as URL params.
=item C<dedup_token>
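Stamps every POST with C<insert_deduplication_token>. On tables with
insert deduplication enabled, ClickHouse deduplicates on the token itself
rather than on the payload. A retried POST carrying an already-seen token
is therefore not inserted a second time. The flip side is that any other
insert reusing that token is skipped too, even when its rows differ.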
( run in 1.315 second using v1.01-cache-2.11-cpan-df04353d9ac )