ClickHouse-Encoder
lib/ClickHouse/Encoder.pm view on Meta::CPAN
        my ($name, $type) = @$c;
        if ($type =~ /\ANested\((.+)\)\z/s) {
            my @parts = _split_paren_list($1);
            @parts or die "Nested($1) for column '$name' has no elements";
            for my $part (@parts) {
                $part =~ /\A([A-Za-z_][A-Za-z0-9_]*)\s+(.+?)\s*\z/s
                    or die "Nested element '$part' is not 'name Type'";
                push @out, ["$name.$1", "Array($2)"];
            }
        } else {
            push @out, [$name, $type];
        }
    }
    return \@out;
}
# Split a comma-separated list at depth-0 commas (so Tuple(Int32, String)
# inside a Nested element stays one entry).
sub _split_paren_list {
    my $body = shift;
    my @parts;
    my ($start, $depth, $len) = (0, 0, length $body);
    for (my $i = 0; $i <= $len; $i++) {
        # A virtual trailing ',' at $i == $len flushes the last element.
        my $c = $i < $len ? substr($body, $i, 1) : ',';
        if    ($c eq '(') { $depth++ }
        elsif ($c eq ')') { $depth-- }
        elsif ($c eq ',' && $depth == 0) {
            (my $p = substr($body, $start, $i - $start)) =~ s/\A\s+|\s+\z//g;
            push @parts, $p if length $p;
            $start = $i + 1;
        }
    }
    return @parts;
}
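The two helpers above can be exercised outside the module. Here is a self-contained sketch. `split_paren_list` is a copy of `_split_paren_list`. `flatten_nested` is a hypothetical wrapper around the Nested-flattening fragment, and it assumes the input is an arrayref of `[name, type]` pairs. The enclosing sub's real name and signature are not shown in this excerpt.

```perl
use strict;
use warnings;

# Copy of the depth-aware splitter shown above.
sub split_paren_list {
    my $body = shift;
    my @parts;
    my ($start, $depth, $len) = (0, 0, length $body);
    for (my $i = 0; $i <= $len; $i++) {
        my $c = $i < $len ? substr($body, $i, 1) : ',';
        if    ($c eq '(') { $depth++ }
        elsif ($c eq ')') { $depth-- }
        elsif ($c eq ',' && $depth == 0) {
            (my $p = substr($body, $start, $i - $start)) =~ s/\A\s+|\s+\z//g;
            push @parts, $p if length $p;
            $start = $i + 1;
        }
    }
    return @parts;
}

# Hypothetical wrapper: turn each Nested(...) column into parallel
# "parent.child" Array(...) columns; other columns pass through unchanged.
sub flatten_nested {
    my ($cols) = @_;
    my @out;
    for my $c (@$cols) {
        my ($name, $type) = @$c;
        if ($type =~ /\ANested\((.+)\)\z/s) {
            my @parts = split_paren_list($1);
            @parts or die "Nested($1) for column '$name' has no elements";
            for my $part (@parts) {
                $part =~ /\A([A-Za-z_][A-Za-z0-9_]*)\s+(.+?)\s*\z/s
                    or die "Nested element '$part' is not 'name Type'";
                push @out, ["$name.$1", "Array($2)"];
            }
        } else {
            push @out, [$name, $type];
        }
    }
    return \@out;
}

my $flat = flatten_nested([
    ['id',    'UInt64'],
    ['attrs', 'Nested(k String, v Tuple(Int32, String))'],
]);
print join(' | ', map { join(' ', @$_) } @$flat), "\n";
# prints: id UInt64 | attrs.k Array(String) | attrs.v Array(Tuple(Int32, String))
```

The comma inside `Tuple(Int32, String)` sits at depth 1, so it does not split the `v` element.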
# Row-oriented decode: { ncols, nrows, names, types, rows } where
# rows is an arrayref of arrayrefs. Calls the XS decode_block_rows,
# which distributes column values into per-row arrayrefs as each
# column is decoded and frees the column AV eagerly. Peak memory
# is one column's AV plus the row AVs (vs a Perl-side transpose
# that holds ALL column AVs alongside the half-built row AVs).
# Throughput is similar to the column-major path; the win is the
# tighter memory profile on wide blocks.
sub decode_rows {
    my ($class, $bytes, $offset) = @_;
    return $class->decode_block_rows($bytes, $offset // 0);
}
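The XS routine cannot be reproduced in a few lines. The memory argument, though, is easy to see in pure Perl. In this sketch, `rows_from_columns` and the column-producing coderef are hypothetical stand-ins for the XS decoder. Each column is distributed into the row arrayrefs as soon as it is produced and is then dropped. A transpose after a full column-major decode, by contrast, keeps every column alive until the end.

```perl
use strict;
use warnings;

# Distribute values column by column into per-row arrayrefs.
# $next_column is a hypothetical coderef standing in for the XS decoder:
# it returns the next decoded column (an arrayref) or undef when done.
sub rows_from_columns {
    my ($nrows, $next_column) = @_;
    my @rows;
    $rows[$_] = [] for 0 .. $nrows - 1;
    while (defined(my $col = $next_column->())) {
        push @{ $rows[$_] }, $col->[$_] for 0 .. $nrows - 1;
        # Nothing else references $col, so each column can be freed
        # as soon as the loop moves on to the next one.
    }
    return \@rows;
}

my @pending = ([1, 2, 3], ['a', 'b', 'c']);   # pretend these decode lazily
my $rows = rows_from_columns(3, sub { shift @pending });
print join(' ', map { join(',', @$_) } @$rows), "\n";   # prints: 1,a 2,b 3,c
```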
# Decode a concatenated stream of Native blocks (the body of a
# `select ... format Native` response). Without $cb, returns an arrayref
# of block hashrefs, each shaped like decode_block's return value; with
# $cb, passes each block to it as soon as it is decoded and returns
# nothing. Stops cleanly when the bytes are exhausted; a partial trailing
# block raises an error from XS. Passes the running offset to
# decode_block instead of substr-ing off each consumed block, which
# would cost O(N^2) copying on long streams.
sub decode_blocks {
    my ($class, $bytes, $cb, %opts) = @_;
    my $keep = $opts{keep};
    my $off  = 0;
    my $len  = length $bytes;
    # Callback form: hand each block to $cb as it's decoded; never
    # accumulate. Useful for streaming selects where the full block
    # list would not fit comfortably in memory.
    if ($cb) {
        while ($off < $len) {
            my $block = $class->decode_block($bytes, $off, $keep);
            $off += $block->{consumed};
            $cb->($block);
        }
        return;
    }
    my @blocks;
    while ($off < $len) {
        my $block = $class->decode_block($bytes, $off, $keep);
        $off += $block->{consumed};
        push @blocks, $block;
    }
    return \@blocks;
}
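The offset-threading loop can be tried without ClickHouse by swapping in a toy block format. In this sketch, each "block" is a one-byte length followed by that many payload bytes. `toy_decode_block` and `toy_decode_blocks` are hypothetical. They mirror only the control flow of `decode_blocks`: the `consumed` key, the callback-versus-accumulate split, and an error on a truncated trailing block.

```perl
use strict;
use warnings;

# Toy stand-in for decode_block: read one length-prefixed block at $off.
# $bref is a reference to the whole buffer, so only the payload is copied.
sub toy_decode_block {
    my ($bref, $off) = @_;
    die "truncated block at offset $off\n" if $off >= length $$bref;
    my $n = unpack 'C', substr($$bref, $off, 1);
    die "truncated block at offset $off\n" if $off + 1 + $n > length $$bref;
    return { payload => substr($$bref, $off + 1, $n), consumed => 1 + $n };
}

# Mirrors decode_blocks: hand each block to $cb if given, else collect them.
sub toy_decode_blocks {
    my ($bytes, $cb) = @_;
    my ($off, $len) = (0, length $bytes);
    my @blocks;
    while ($off < $len) {
        my $block = toy_decode_block(\$bytes, $off);
        $off += $block->{consumed};
        if ($cb) { $cb->($block) } else { push @blocks, $block }
    }
    return $cb ? () : \@blocks;
}

my $stream = "\x03abc\x02de";
my $all = toy_decode_blocks($stream);
print join(',', map { $_->{payload} } @$all), "\n";    # prints: abc,de

my @seen;
toy_decode_blocks($stream, sub { push @seen, $_[0]{payload} });
```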
# Return a coderef that yields one block per call (undef when done).
# The closure captures its own copy of $bytes plus the running offset;
# nothing else survives between calls.
sub decode_blocks_iter {
    my ($class, $bytes, %opts) = @_;
    my $keep = $opts{keep};
    my $off  = 0;
    my $len  = length $bytes;
    return sub {
        return if $off >= $len;
        my $block = $class->decode_block($bytes, $off, $keep);
        $off += $block->{consumed};
        return $block;
    };
}
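The iterator is an ordinary Perl closure. This sketch applies the same shape to a hypothetical fixed-width chunker. It also shows a consequence of the `my (...) = @_` assignment copying the string: reassigning the caller's variable after the iterator is built does not affect it.

```perl
use strict;
use warnings;

# Hypothetical iterator factory with the same shape as decode_blocks_iter:
# each call returns the next fixed-width chunk, or undef when exhausted.
sub chunk_iter {
    my ($bytes, $width) = @_;      # $bytes is the closure's private copy
    my $off = 0;
    my $len = length $bytes;
    return sub {
        return if $off >= $len;
        my $chunk = substr($bytes, $off, $width);
        $off += length $chunk;
        return $chunk;
    };
}

my $data = 'aabbc';
my $next = chunk_iter($data, 2);
$data = 'zzzzz';                   # does not affect the iterator's copy
my @got;
while (defined(my $c = $next->())) { push @got, $c }
print "@got\n";                    # prints: aa bb c
```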
# Pull-style decoder that reads incrementally from a filehandle (or
# any IO::Handle-ish object). For each complete block, invokes $cb
# with the block hashref. Keeps a sliding buffer; on truncated decode
# it reads more bytes and retries. Useful when the response body is
# too large to fit in memory.
sub decode_stream {
    my ($class, $fh, $cb, %opts) = @_;
    my $chunk_size = $opts{chunk_size} // 64 * 1024;
    my $keep       = $opts{keep};
    my $decompress = $opts{decompress};
    my $buf   = '';    # raw bytes from the filehandle
    my $inner = '';    # decompressed Native bytes (== $buf when !decompress)
    my $done  = 0;
    until ($done) {
        # Phase 1: peel compressed-block frames out of $buf into $inner.
        if ($decompress) {
            while (length($buf) >= 25) {
                # Frame = 16-byte checksum + 9-byte header; the UInt32 at
                # offset 17 is the compressed size including that header.
                my $csize = unpack 'V', substr($buf, 17, 4);
                last if length($buf) < 16 + $csize;
                my ($plain, $consumed) = $class->decompress_native_block($buf);
                $inner .= $plain;
                substr($buf, 0, $consumed, '');
            }
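Phase 1 of `decode_stream` relies on ClickHouse's compressed-block framing, which the loop's offsets reflect:

- a 16-byte checksum;
- a 9-byte header: a method byte, then the compressed size and the uncompressed size, both little-endian UInt32;
- the compressed size counts those 9 header bytes but not the checksum.

That is why the loop waits for 25 bytes, reads the size at offset 17, and needs `16 + $csize` bytes for a whole frame.

The sketch below parses that header in pure Perl using a hypothetical `peek_frame`. It neither verifies the checksum nor decompresses anything. The test frame is built with method byte 0x02, which is assumed here to mean an uncompressed payload.

```perl
use strict;
use warnings;

use constant CHECKSUM_LEN => 16;
use constant HEADER_LEN   => 9;    # method (1) + compressed size (4) + uncompressed size (4)

# Describe the first complete frame in $buf, or return undef when more
# bytes are needed. The checksum is skipped, not verified.
sub peek_frame {
    my ($buf) = @_;
    return if length($buf) < CHECKSUM_LEN + HEADER_LEN;
    my ($method, $csize, $usize) = unpack 'C V V', substr($buf, CHECKSUM_LEN, HEADER_LEN);
    my $frame_len = CHECKSUM_LEN + $csize;    # $csize includes the 9 header bytes
    return if length($buf) < $frame_len;
    return {
        method    => $method,
        usize     => $usize,
        payload   => substr($buf, CHECKSUM_LEN + HEADER_LEN, $csize - HEADER_LEN),
        frame_len => $frame_len,
    };
}

# One uncompressed frame with a dummy all-zero checksum.
my $data  = 'native bytes';
my $frame = ("\0" x 16) . pack('C V V', 0x02, HEADER_LEN + length($data), length $data) . $data;

# Sliding buffer: one full frame followed by the first 10 bytes of another.
my $buf = $frame . substr($frame, 0, 10);
my $f = peek_frame($buf);
printf "method=0x%02x payload=%s\n", $f->{method}, $f->{payload};   # prints: method=0x02 payload=native bytes
substr($buf, 0, $f->{frame_len}, '');     # drop the consumed frame
my $next = peek_frame($buf);
print defined $next ? "another frame\n" : "need more bytes\n";     # prints: need more bytes
```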
=over 4
=item *
Numeric types go through C<SvIV> / C<SvUV> / C<SvNV>. Negative inputs to
unsigned types are bit-cast (standard Perl behaviour).
=item *
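C<Date> / C<Date32>: an integer (or integer-valued string) is interpreted as
days since the epoch; a C<YYYY-MM-DD> string is parsed. Pass DateTime /
Time::Piece / Time::Moment objects as a whole number of days, e.g.
C<< POSIX::floor($dt->epoch / 86400) >>. A bare C<< $dt->epoch / 86400 >> is
fractional except at midnight UTC, and truncating instead of flooring can
land pre-1970 C<Date32> values one day late. The encoder doesn't dispatch
through C<< ->epoch >> itself.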
=item *
C<DateTime>: an integer is Unix seconds; a C<YYYY-MM-DD HH:MM:SS> string is
parsed. ISO 8601 forms are also accepted: a C<T> separator plus an optional
trailing timezone marker (C<Z>, C<+HH:MM>, C<-HH:MM>, C<+HHMM>, C<+HH>,
C<-HH>), which is applied to convert the value to UTC. Pass date objects via
their C<< ->epoch >>.
=item *
C<DateTime64(P)>: integer is in scaled units (i.e. ticks of
C<10^-P> seconds); a float is in seconds and scaled to ticks; a
C<YYYY-MM-DD HH:MM:SS.fff> string is parsed. For sub-second-aware
objects pass C<< $dt->hires_epoch >> (or C<< ->epoch >> if the object
is integer-only).
=item *
C<Decimal*>: a number goes through C<double> (lossy past 2^53); a
B<string> matching C<[+-]?digits[.digits]?> is parsed digit-by-digit and
scaled exactly. B<If precision matters, pass strings.>
=item *
C<Enum8> / C<Enum16>: accept either the declared name or its integer
value; mixing the two within a column is fine.
=item *
C<Nullable(T)>: C<undef> writes a null-bitmap entry plus a type-shaped
placeholder (zero scalar, empty array, or a recursive zero tuple).
=back
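The conversion rules above can be sketched in pure Perl using only core modules (L<POSIX>, L<Time::Local>). The helper names are hypothetical, and this is not the module's XS code. Two behaviours are assumptions of the sketch: a string without a timezone marker is treated as UTC, and a Decimal string with more fractional digits than the scale is rejected.

```perl
use strict;
use warnings;
use POSIX       qw(floor);
use Time::Local qw(timegm);

# Date / Date32: whole days since 1970-01-01; floor() so pre-1970 instants
# land on the correct (negative) day.
sub days_from_epoch { floor($_[0] / 86400) }

# DateTime: 'YYYY-MM-DD HH:MM:SS' or ISO 8601 with 'T', an optional fraction
# and an optional Z / +HH[:MM] / -HH[:MM] suffix. Returns (UTC epoch, fraction
# digits). A missing suffix is treated as UTC (an assumption).
sub parse_datetime {
    my ($s) = @_;
    $s =~ /\A(\d{4})-(\d\d)-(\d\d)[ T](\d\d):(\d\d):(\d\d)(?:\.(\d+))?(Z|[+-]\d\d(?::?\d\d)?)?\z/
        or die "unparseable datetime: $s\n";
    my ($y, $mo, $d, $h, $mi, $se, $frac, $tz) = ($1, $2, $3, $4, $5, $6, $7 // '', $8 // 'Z');
    my $epoch = timegm($se, $mi, $h, $d, $mo - 1, $y);
    if ($tz ne 'Z') {
        my ($sign, $hh, $mm) = $tz =~ /\A([+-])(\d\d):?(\d\d)?\z/;
        my $offset = ($hh * 60 + ($mm // 0)) * 60;
        # Local time = UTC + offset, so subtract the offset to reach UTC.
        $epoch -= $sign eq '+' ? $offset : -$offset;
    }
    return ($epoch, $frac);
}

# DateTime64(P) from a string: integer ticks of 10^-P seconds.
sub dt64_ticks_from_string {
    my ($s, $p) = @_;
    my ($epoch, $frac) = parse_datetime($s);
    my $scale = 1;
    $scale *= 10 for 1 .. $p;                     # integer 10**P
    $frac = substr($frac . ('0' x $p), 0, $p);    # pad/truncate to P digits
    return $epoch * $scale + ($p ? $frac : 0);
}

# DateTime64(P) from float seconds: scale, then round to the nearest tick.
sub dt64_ticks_from_float {
    my ($secs, $p) = @_;
    return sprintf '%.0f', $secs * 10**$p;
}

# Decimal(P, S) from a string: shift the point by S digits with string
# operations only, so nothing passes through a double.
sub decimal_to_scaled {
    my ($str, $scale) = @_;
    $str =~ /\A([+-]?)(\d+)(?:\.(\d+))?\z/ or die "not a decimal: $str\n";
    my ($sign, $int, $frac) = ($1, $2, $3 // '');
    die "more than $scale fractional digits: $str\n" if length $frac > $scale;
    my $digits = $int . $frac . ('0' x ($scale - length $frac));
    $digits =~ s/\A0+(?=\d)//;    # drop leading zeros, keep at least one digit
    return ($sign eq '-' && $digits ne '0' ? '-' : '') . $digits;
}

print days_from_epoch(1704110400), ' ', days_from_epoch(-43200), "\n";   # prints: 19723 -1
my ($utc) = parse_datetime('2024-01-01T10:00:00+02:00');
print "$utc\n";                                                        # prints: 1704096000
print dt64_ticks_from_string('2024-01-01 00:00:00.123', 3), "\n";      # prints: 1704067200123
print decimal_to_scaled('-12.5', 3), "\n";                             # prints: -12500
```

Decimal results are returned as digit strings because C<Decimal128> / C<Decimal256> values can exceed Perl's 64-bit integers. Packing them into the column's integer width is left out of the sketch.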
=head1 EXAMPLES
The F<eg/> directory ships runnable scripts:
=over 4
=item F<eg/insert_http.pl>
End-to-end insert over HTTP via L<HTTP::Tiny>. The shortest path to "insert
real data into a real ClickHouse".
=item F<eg/insert_streaming.pl>
Reuse one encoder across many batches, piping each batch to
C<clickhouse-client>. Demonstrates the intended one-encoder-many-batches
pattern.
=item F<eg/for_table.pl>
Schema discovery via C<for_table>.
=item F<eg/from_csv.pl>
Read a CSV with L<Text::CSV_XS>, map columns to a ClickHouse schema, and
insert via HTTP.
=item F<eg/insert_clickhouse_local.pl>
Server-less ETL: encode rows, pipe Native bytes into C<clickhouse-local>,
have it write a Parquet (or ORC, etc.) file.
=item F<eg/etl_dbi.pl>
Read rows from a source database via L<DBI>, encode to Native, insert into
ClickHouse via HTTP. Reuses one encoder across all fetched batches.
=item F<eg/insert_compressed.pl>
Insert with on-the-wire compression (zstd via L<Compress::Zstd>, falling
back to gzip via core L<IO::Compress::Gzip>). Sets C<Content-Encoding>
so ClickHouse decompresses transparently.
=item F<eg/insert_async_ev.pl>
Non-blocking concurrent inserts using L<EV>'s event loop with raw HTTP
sockets, paired with this encoder's L</streamer>. Demonstrates the
"many in-flight inserts without blocking on each round-trip" pattern.
=item F<eg/insert_with_lowcardinality.pl>
Measures the wire-size reduction (~50% on event/log data) and encoding
throughput of C<LowCardinality(String)> versus plain C<String> for the
typical case where a column has few distinct values that repeat across
many rows.
=item F<eg/json_lines_ingest.pl>
Reads NDJSON from STDIN or a file, maps each object's fields onto a
ClickHouse table's columns (discovered via C<for_table>), and inserts
batched blocks over HTTP.
=item F<eg/streaming_aggregate.pl>
Pre-aggregates an event stream in Perl (per minute, per key) and
flushes rolled-up counters to a C<SummingMergeTree> on a wall-clock
timer. The classic pattern when the firehose is too high-cardinality
to store as raw rows.
=item F<eg/postgres_to_clickhouse.pl>
Replicates a PostgreSQL table to ClickHouse using L<DBD::Pg> on the
source side and this encoder's streamer on the destination side.
Memory is bounded by the batch size, so it scales to hundreds of
millions of rows.
=item F<eg/clickhouse_replication.pl>
Replicates one ClickHouse table to another (potentially on a different
server) by streaming Native bytes end-to-end via a temp-file spool.
=item F<eg/parallel_loader.pl>
Forks N worker processes, each ingesting one slice of the input.
Workers share nothing; each opens its own HTTP connection. Scales
network-bound ingestion linearly with worker count.
=item F<eg/redis_to_clickhouse.pl>
Drains a Redis stream (XREADGROUP) or list (BRPOP) into a ClickHouse
table, with idle-flush so the destination doesn't see arbitrarily
delayed batches when the source is quiet.
=item F<eg/syslog_ingest.pl>
Reads RFC 5424 syslog lines from STDIN, parses lossily (any
unparseable line still goes through with the raw text in C<msg>),
and inserts into a fixed schema.
=item F<eg/json_streaming.pl>
NDJSON from STDIN into a C<JSON> column via the encoder's streaming
mode; one HTTP request per batch instead of one per line.
=item F<eg/json_query.pl>
Runs C<select ... format Native> over HTTP and walks the returned blocks
via L</decode_blocks>, demonstrating the symmetric decode path for
JSON columns.
=item F<eg/json_aggregate.pl>
Sketches an aggregation pipeline that bins JSON events by path and
emits the aggregates as a second insert.
=item F<eg/migrate_table.pl>
Copies one CH table into another (possibly on a different host) by
discovering the source schema via L</for_table> and streaming Native
blocks through.
=item F<eg/native_to_jsonl.pl>
Reads a Native byte stream from STDIN and prints each row as NDJSON
on STDOUT; the dual of F<json_lines_ingest.pl>.
=item F<eg/replay.pl>
Replays a captured Native byte stream against a table, useful for
post-hoc reproduction of an ingest bug from a saved request body.
=item F<eg/select_blocks_streaming.pl>
Streaming select counterpart to F<insert_streaming.pl>: uses
L</select_blocks> to walk a select response block-by-block, with
optional column projection via C<--keep>.
=item F<eg/json_path_projection.pl>
Demo of C<keep =E<gt> {...}> projection on top of L</select_blocks>:
decodes only the requested columns and prints one row per line.
=item F<eg/csv_export.pl>
Select to CSV, the counterpart to F<from_csv.pl>. Drives a CSV writer
from a streaming select, emitting the header row from the first
block's column names.
=item F<eg/migrate_with_transform.pl>
CH-to-CH migration with a row-level transform between read and
write. Discovers source schema via L</for_table>, streams the rows
via L</select_blocks>, applies a user-supplied transform coderef,
and forwards survivors through L</bulk_inserter>.
=item F<eg/replay_pcap.pl>
Replays a captured Native byte stream (e.g. saved from
C<curl --output> of a C<select ... format Native> response) and
prints a block-by-block summary. Off-line debugging tool.
=item F<eg/tcp_compressed_pipeline.pl>
End-to-end TCP insert pipeline that negotiates compression in
C<pack_query>, then wraps every C<pack_data> / C<pack_data_end>
in CH's compressed-block framing. Showcases the matched-pair
convention against a real ClickHouse server (protocol revision
E<lt>= 54474; see L<ClickHouse::Encoder::TCP/CAVEATS>).
=item F<eg/rowbinary_insert.pl>
Insert using the C<RowBinary> format via L</encode_row_binary>,
with a local L</decode_row_binary> round-trip check. For interop
with pipelines that speak RowBinary rather than Native.
=item F<eg/async_insert.pl>
Server-side async insert: passes C<async_insert=1> (and optionally
C<wait_for_async_insert>) through the C<settings> option, so the
server buffers the batch and flushes it in the background.
=item F<eg/geo_from_wkt.pl>
Ingest geometry given as Well-Known-Text into C<Point> / C<Polygon>
columns using L</parse_wkt> to convert each WKT string.
=item F<eg/insert_with_settings.pl>
Insert with per-query CH C<settings> and an C<insert_deduplication_token>,
showing how an identical retry under the same token is deduplicated
server-side.
=item F<eg/ping_healthcheck.pl>
A wait-for-server readiness gate built on L</ping>: retries until the
C</ping> endpoint answers, then proceeds.
=item F<eg/observability.pl>
Reads server-side stats from an insert pipeline: per-batch
C<< $bi->last_response->{ch} >> detail plus the cumulative
C<< $bi->summary >> rollup of C<X-ClickHouse-Summary> counters.
=item F<eg/schema_migrate.pl>
Fetches C<show create table>, parses it with L</parse_create_table>,