ClickHouse-Encoder


lib/ClickHouse/Encoder.pm  view on Meta::CPAN

Typical use is piping into C<clickhouse-client>:

    $enc->encode_to_command(
        ['clickhouse-client', '--query', 'insert into events format native'],
        \@rows,
    );

=head2 flatten_nested

    my $cols = ClickHouse::Encoder->flatten_nested(\@cols);

Class method that expands any C<Nested(field T, ...)> entries in a
column list into the flat C<name.field Array(T)> columns ClickHouse
stores them as on the wire. Non-Nested columns pass through unchanged.

    my $cols = ClickHouse::Encoder->flatten_nested([
        ['events', 'Nested(t DateTime, kind String)'],
        ['ts',     'DateTime'],
    ]);
    # => [ ['events.t', 'Array(DateTime)'],
    #      ['events.kind', 'Array(String)'],
    #      ['ts', 'DateTime'] ]
    my $enc = ClickHouse::Encoder->new(columns => $cols);

C<for_table> already returns the flat form because C<describe table>
reports it that way, so this helper is for hand-written schemas that
mirror the user's create table more naturally.
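The expansion described above can be sketched in plain Perl. This is only an illustration of the transformation, not the module's own code: C<flatten_nested_sketch> and C<split_top_level> are hypothetical names, and the sketch assumes unquoted field names.

```perl
use strict;
use warnings;

# Hypothetical stand-in for illustration; not the module's implementation.
sub flatten_nested_sketch {
    my ($cols) = @_;
    my @out;
    for my $col (@$cols) {
        my ($name, $type) = @$col;
        if ($type =~ /\ANested\((.*)\)\z/s) {
            for my $field (split_top_level($1)) {
                # Each field is "name Type"; the type may contain spaces.
                my ($fname, $ftype) = $field =~ /\A\s*(\S+)\s+(.+?)\s*\z/s
                    or die "cannot parse Nested field '$field'";
                push @out, ["$name.$fname", "Array($ftype)"];
            }
        }
        else {
            push @out, [$name, $type];    # non-Nested columns pass through
        }
    }
    return \@out;
}

# Split on commas that are not inside parentheses, so that
# "d Decimal(10, 2), s String" yields two fields rather than three.
sub split_top_level {
    my ($s) = @_;
    my ($depth, $cur, @parts) = (0, '');
    for my $ch (split //, $s) {
        $depth++ if $ch eq '(';
        $depth-- if $ch eq ')';
        if ($ch eq ',' && $depth == 0) { push @parts, $cur; $cur = ''; next }
        $cur .= $ch;
    }
    push @parts, $cur if length $cur;
    return @parts;
}

my $flat = flatten_nested_sketch([
    ['events', 'Nested(t DateTime, price Decimal(10, 2))'],
    ['ts',     'DateTime'],
]);
print join(', ', map { "$_->[0] $_->[1]" } @$flat), "\n";
```

Tracking parenthesis depth matters because parametric types such as C<Decimal(10, 2)> carry their own commas.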

=head2 decode_block

    my $block = ClickHouse::Encoder->decode_block($bytes);

Decode the first Native block in C<$bytes>. Returns a hashref:

    {
        ncols    => $n_columns,
        nrows    => $n_rows,
        columns  => [
            { name => 'id',   type => 'UInt64', values => [...] },
            { name => 'name', type => 'String', values => [...] },
            ...
        ],
        consumed => $bytes_used,
    }

C<consumed> is the number of bytes used. To walk a stream of
concatenated blocks (a multi-block C<select format native> response),
prefer L</decode_blocks>, or pass a starting offset directly:

    my $block = ClickHouse::Encoder->decode_block($bytes, $offset);

The 3-arg form (counting the class invocant) avoids the O(N) copy
per call that C<<< substr($bytes, $offset) >>> would entail.

An optional fourth-argument hashref filters which columns to keep:

    my $block = ClickHouse::Encoder->decode_block(
        $bytes, 0, { id => 1, ts => 1 });

Columns whose name isn't in the filter still consume their wire
bytes (so the cursor stays aligned) but their C<values> array is
replaced with N C<undef>s and the column hashref carries a
C<<< skipped => 1 >>> marker. This skips the SV-allocation cost for
unwanted columns, which is useful on wide C<select *> responses.

XS implementation: walks the Native byte stream using the same type
parser the encoder uses, so symmetric round-trips are guaranteed for
every type C<encode> handles (C<BFloat16>, alphabetical Variant
remapping, C<LowCardinality> dict indirection,
C<SimpleAggregateFunction> passthrough, JSON typed paths, etc.).

C<Decimal128> values come back as C<<< [$lo_uint64, $hi_int64] >>>;
C<Decimal256> as a 4-limb arrayref. Use L</decimal128_str> /
L</decimal256_str> to convert to scaled decimal strings.
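To make the two-limb layout concrete, here is a minimal sketch of turning such a pair into a scaled decimal string with core L<Math::BigInt>. It assumes the value is C<hi * 2**64 + lo> with a signed high limb, as the limb names suggest; C<decimal128_to_string_sketch> is a hypothetical helper, not the module's L</decimal128_str>.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical helper. Assumes value = hi * 2**64 + lo, where hi is a
# signed 64-bit limb and lo an unsigned one (two's complement overall).
sub decimal128_to_string_sketch {
    my ($limbs, $scale) = @_;
    my ($lo, $hi) = @$limbs;
    my $n = Math::BigInt->new("$hi")
        ->bmul(Math::BigInt->new(2)->bpow(64))
        ->badd(Math::BigInt->new("$lo"));
    my $sign   = $n->is_neg ? '-' : '';
    my $digits = $n->copy->babs->bstr;
    return "$sign$digits" if $scale == 0;
    # Left-pad so at least one digit precedes the decimal point.
    $digits = ('0' x ($scale + 1 - length($digits))) . $digits
        if length($digits) <= $scale;
    substr($digits, -$scale, 0, '.');    # insert the point $scale digits from the end
    return "$sign$digits";
}

print decimal128_to_string_sketch([12345, 0], 2), "\n";
print decimal128_to_string_sketch(['18446744073709551615', -1], 2), "\n";
```

The second call passes the low limb as a string so the example does not depend on 64-bit integer support in the running perl.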

=head2 decode_rows

    my $r = ClickHouse::Encoder->decode_rows($bytes);
    my $r = ClickHouse::Encoder->decode_rows($bytes, $offset);

Row-oriented convenience. Returns:

    {
        ncols => $n_columns,
        nrows => $n_rows,
        names => [...],
        types => [...],
        rows  => [[...], [...], ...],
        consumed => $bytes_used,
    }

Calls an XS row-major decoder (C<decode_block_rows>) that walks each
column then immediately distributes its values into the per-row
arrayrefs and frees the column AV. Peak memory holds one column's
AV plus the row AVs (vs both column- and row-major representations
fully alive, which a Perl-side transpose would entail). Throughput
is similar to L</decode_block>; the win is the tighter peak memory
on wide blocks.
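The distribute-then-release order can be sketched in plain Perl. The sketch starts from an already-decoded block, so it does not reproduce the peak-memory benefit of the XS path; it only shows the order of operations. C<rows_from_block_sketch> is a hypothetical name, and the function modifies the block it is given.

```perl
use strict;
use warnings;

# Hypothetical Perl-side equivalent, for illustration only.
# Note: detaches each column's values from the input block.
sub rows_from_block_sketch {
    my ($block) = @_;
    my @rows = map { [] } 1 .. $block->{nrows};
    my (@names, @types);
    for my $col (@{ $block->{columns} }) {
        push @names, $col->{name};
        push @types, $col->{type};
        my $values = delete $col->{values};    # detach so the column can be freed
        push @{ $rows[$_] }, $values->[$_] for 0 .. $#$values;
        # $values goes out of scope here, releasing this column's array
    }
    return {
        ncols => scalar @names, nrows => $block->{nrows},
        names => \@names, types => \@types, rows => \@rows,
    };
}

my $r = rows_from_block_sketch({
    nrows   => 2,
    columns => [
        { name => 'id',   type => 'UInt64', values => [1, 2] },
        { name => 'name', type => 'String', values => ['a', 'b'] },
    ],
});
print join(' | ', map { join(',', @$_) } @{ $r->{rows} }), "\n";
```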

=head2 decode_block_rows

    my $r = ClickHouse::Encoder->decode_block_rows($bytes, $offset);

XS row-major decoder; same return shape as L</decode_rows>. Direct
entry point if you want to avoid the L</decode_rows> Perl-side
trampoline.

=head2 decode_blocks

    my $blocks = ClickHouse::Encoder->decode_blocks($bytes);
    ClickHouse::Encoder->decode_blocks($bytes, sub { my $b = shift; ... });

A C<select ... format native> response is a concatenated stream of
blocks (one per granule of C<max_block_size> rows). With no callback,
C<decode_blocks> walks the stream and returns an arrayref of the same
hashref shape as L</decode_block>. With a callback, each block is
passed to the callback as it's decoded and no list is accumulated -
useful for very long selects where the full block list wouldn't fit
comfortably in memory.

Uses the 3-arg form of L</decode_block> (with explicit offset) to
keep total work O(N) regardless of block count. Stops cleanly when
bytes are exhausted; partial trailing bytes croak.

The optional C<keep =E<gt> \%names> hashref forwards a column filter
to L</decode_block> for every block in the stream, matching the
same semantics: present keys are decoded, absent ones still have
their bytes consumed (to keep the cursor aligned) but their values
are not materialized and their column hash carries C<skipped =E<gt> 1>.
Useful for big-fan-out select responses where only a few columns
of a wide row matter.

    ClickHouse::Encoder->decode_blocks($bytes, $cb,
        keep => { id => 1, event => 1 });

=head2 decode_blocks_iter

    my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes);
    while (my $block = $iter->()) { ... }

    my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes,
        keep => { id => 1 });

Returns a coderef that yields one block per call (C<undef> when
exhausted). Same per-block payload as L</decode_block>; useful when
you want pull-style iteration without committing to a callback.
Accepts the same C<keep> filter as L</decode_blocks>.
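The pull-style pattern is a closure over an offset cursor. A minimal sketch, using a toy framing (4-byte little-endian length plus payload) in place of the Native format; C<decode_toy_block> and C<toy_blocks_iter> are hypothetical stand-ins, not part of this module.

```perl
use strict;
use warnings;

# Toy decoder standing in for decode_block: a "block" here is a 4-byte
# little-endian length followed by that many payload bytes.
sub decode_toy_block {
    my ($bytes, $offset) = @_;
    die "truncated header" if length($bytes) - $offset < 4;
    my $len = unpack 'V', substr($bytes, $offset, 4);
    die "truncated payload" if length($bytes) - $offset - 4 < $len;
    return { payload => substr($bytes, $offset + 4, $len), consumed => 4 + $len };
}

# Returns a coderef that yields one block per call, undef when exhausted.
sub toy_blocks_iter {
    my ($bytes) = @_;
    my $offset = 0;
    return sub {
        return if $offset >= length($bytes);
        my $block = decode_toy_block($bytes, $offset);
        $offset += $block->{consumed};    # advance the cursor, no copying
        return $block;
    };
}

my $stream = join '', map pack('V/a*', $_), qw(alpha beta gamma);
my $iter   = toy_blocks_iter($stream);
my @seen;
while (my $block = $iter->()) {
    push @seen, $block->{payload};
}
print "@seen\n";
```

Because the cursor lives in the closure, the caller can stop early or interleave two iterators without any shared state.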

=head2 decode_stream

    ClickHouse::Encoder->decode_stream($fh, sub { my $block = shift; ... },
                                       chunk_size => 65536);

    ClickHouse::Encoder->decode_stream($fh, $cb,
        keep => { id => 1, event => 1 });

Pull bytes incrementally from a filehandle (or any readable IO
handle), yielding each complete block to the callback as it
arrives. Uses a sliding buffer; on a truncated decode it reads more
bytes and retries. Memory stays bounded by C<chunk_size> + one
block, so this is the right entry point for select responses too
large to buffer in full. Croaks on partial trailing bytes.

The C<keep> filter is the same one L</decode_block> accepts:
unwanted columns still have their bytes consumed (so the cursor
stays aligned) but their values are not materialized into an SV
array, so peak memory stays bounded by the kept columns.

With C<decompress =E<gt> 1>, C<$fh> is expected to deliver a stream
of compressed-block-framed Native blocks (the format CH's HTTP
C<?compress=1> response uses, or a captured native-TCP Data stream
under compression). C<decode_stream> peels each compressed block
via L</decompress_native_block> before feeding the resulting raw
Native bytes into L</decode_block>.

C<$fh> must support Perl's C<read()> builtin (any plain filehandle
or L<IO::Handle> subclass). Raw socket descriptors that only
support C<sysread> need to be wrapped via L<IO::Socket> or read
into a buffer that is then fed to L</decode_block> directly.
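The sliding-buffer loop can be sketched as follows. This is an illustration under assumptions, not the module's code: it uses a toy framing (4-byte little-endian length plus payload) instead of Native blocks, and C<try_decode_toy> and C<toy_decode_stream> are hypothetical names.

```perl
use strict;
use warnings;

# Toy framing: 4-byte LE length + payload. Returns nothing when the
# buffer does not yet hold a complete block.
sub try_decode_toy {
    my ($buf) = @_;
    return if length($buf) < 4;
    my $len = unpack 'V', $buf;
    return if length($buf) < 4 + $len;
    return { payload => substr($buf, 4, $len), consumed => 4 + $len };
}

sub toy_decode_stream {
    my ($fh, $cb, %opt) = @_;
    my $chunk = $opt{chunk_size} || 65536;
    my $buf   = '';
    while (1) {
        # Drain every complete block currently in the buffer.
        while (my $block = try_decode_toy($buf)) {
            substr($buf, 0, $block->{consumed}, '');    # slide the window
            $cb->($block);
        }
        # Truncated (or empty) buffer: append more bytes and retry.
        my $n = read($fh, $buf, $chunk, length $buf);
        die "read failed: $!" unless defined $n;
        last if $n == 0;                                # EOF
    }
    die "partial trailing bytes" if length $buf;
    return;
}

my $data = join '', map pack('V/a*', $_), qw(ab cde f);
open my $fh, '<', \$data or die "open: $!";
my @got;
toy_decode_stream($fh, sub { push @got, $_[0]{payload} }, chunk_size => 3);
print "@got\n";
```

A deliberately small C<chunk_size> in the usage lines forces several read-and-retry rounds per block, which is the case the loop exists to handle.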

=head2 ping

    ClickHouse::Encoder->ping(host => 'db', port => 8123);
    ClickHouse::Encoder->ping(scheme => 'https', host => 'db', port => 8443);

Liveness check via CH's C</ping> endpoint. Returns C<1> on success;
croaks on connection refused, timeout, or any non-2xx HTTP status.
Accepts the same C<scheme>/C<host>/C<port>/C<timeout>/C<ssl_options>
options as the rest of the HTTP entry points.

=head2 server_version

    my $v = ClickHouse::Encoder->server_version(
        host => 'db', port => 8123);
    if ($v->{major} >= 24) { ... }

Fetches C<select version()> over HTTP. In scalar context returns
C<<< { major, minor, patch, build, raw } >>>; in list context
returns C<($major, $minor, $patch, $build, $raw)>. Useful for
capability gating in user code. Accepts the same
C<scheme>/C<host>/C<port>/C<database>/C<user>/C<password>/C<timeout>/
C<ssl_options>/C<verify_SSL>/C<settings> options as the rest of the
HTTP entry points.
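The context-sensitive return can be sketched with C<wantarray>. This assumes the usual four-part C<major.minor.patch.build> version string; C<parse_version_sketch> is a hypothetical helper and does no HTTP.

```perl
use strict;
use warnings;

# Hypothetical parser, assuming a four-part "major.minor.patch.build"
# string such as "24.8.4.13".
sub parse_version_sketch {
    my ($raw) = @_;
    my ($major, $minor, $patch, $build) = $raw =~ /\A(\d+)\.(\d+)\.(\d+)\.(\d+)/
        or die "unrecognised version string '$raw'";
    return wantarray
        ? ($major, $minor, $patch, $build, $raw)
        : { major => $major, minor => $minor, patch => $patch,
            build => $build, raw   => $raw };
}

my $v = parse_version_sketch('24.8.4.13');          # scalar context: hashref
my @parts = parse_version_sketch('24.8.4.13');      # list context: flat list
print "major=$v->{major} build=$parts[3]\n";
```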

=head2 types

    my @t = ClickHouse::Encoder->types;

Returns the list of supported ClickHouse type names (parametric
types as their syntactic prefix, e.g. C<Decimal>, C<Array>). Intended
for runtime feature detection and for tooling that wants to
introspect supported types without parsing POD.

=head2 schema_diff

    my $d = ClickHouse::Encoder->schema_diff(\@cols_a, \@cols_b);
    # $d = {
    #     added   => [[name, type], ...],   # in $b but not $a
    #     removed => [[name, type], ...],   # in $a but not $b
    #     changed => [[name, type_a, type_b], ...],
    # }

Compare two column lists (each an arrayref of C<[$name, $type]>
pairs, the shape L</new> takes). Useful for migration scripts and
detecting schema drift between source and destination in CH-to-CH


formatted values. Raw integer epochs are still cheaper to compare,
filter, or pass back into a re-encode.

=head2 parse_wkt

    my $point = ClickHouse::Encoder->parse_wkt('POINT(1.5 2.5)');
    my $poly  = ClickHouse::Encoder->parse_wkt(
        'POLYGON((0 0, 4 0, 4 4, 0 4, 0 0))');
    # round-trip into a CH Geo column:
    my $enc = ClickHouse::Encoder->new(columns => [
        ['p', 'Point'], ['poly', 'Polygon']]);
    $enc->encode([[ $point, $poly ]]);

Parse a Well-Known-Text geometry string into the nested-arrayref
shape that the Geo column encoders accept. Supports C<POINT>,
C<LINESTRING>, C<MULTILINESTRING>, C<POLYGON>, and C<MULTIPOLYGON>.
The CH C<Ring> type has no WKT name; feed a C<LINESTRING> result
into a Ring column directly. Geometry names are case-insensitive
and surrounding whitespace is tolerated. Malformed input croaks
with a message identifying the offending geometry.
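A minimal sketch of the parsing idea, covering only C<POINT>, C<LINESTRING>, and C<POLYGON>. The output shapes are assumptions: a point as C<[$x, $y]>, a linestring as an arrayref of points, a polygon as an arrayref of rings. C<parse_wkt_sketch> and C<parse_points_sketch> are hypothetical names, not the module's parser.

```perl
use strict;
use warnings;

# "0 0, 4 0" -> [[0, 0], [4, 0]]
sub parse_points_sketch {
    my ($s) = @_;
    my @points;
    for my $pair (split /,/, $s) {
        my @xy = split ' ', $pair;
        die "bad coordinate pair '$pair'" unless @xy == 2;
        push @points, [ $xy[0] + 0, $xy[1] + 0 ];
    }
    return \@points;
}

# Assumed shapes: POINT => [x, y]; LINESTRING => [[x, y], ...];
# POLYGON => [ring, ...] where each ring is a list of points.
sub parse_wkt_sketch {
    my ($wkt) = @_;
    my ($kind, $body) = $wkt =~ /\A\s*([A-Za-z]+)\s*\((.*)\)\s*\z/s
        or die "malformed WKT: '$wkt'";
    $kind = uc $kind;    # geometry names are case-insensitive
    if ($kind eq 'POINT') {
        my $pts = parse_points_sketch($body);
        die "malformed POINT: '$wkt'" unless @$pts == 1;
        return $pts->[0];
    }
    return parse_points_sketch($body) if $kind eq 'LINESTRING';
    if ($kind eq 'POLYGON') {
        my @rings = $body =~ /\(([^()]*)\)/g;
        die "malformed POLYGON: '$wkt'" unless @rings;
        return [ map parse_points_sketch($_), @rings ];
    }
    die "unsupported geometry '$kind' in this sketch";
}

my $pt   = parse_wkt_sketch('POINT(1.5 2.5)');
my $area = parse_wkt_sketch(' polygon((0 0, 4 0, 4 4, 0 4, 0 0)) ');
print "point: @$pt; rings: ", scalar(@$area), "\n";
```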

=head2 estimate_size

    my $bytes = $enc->estimate_size(\@rows);
    my $bytes = $enc->estimate_size($n_rows,
                                    avg_string_size => 64);

Coarse byte-size estimate for an encoded block, parameterized on row
count (an integer or arrayref-of-rows; only the count is used).
Returns an order-of-magnitude figure for batch-split decisions
("should I split this into two POSTs?") without paying the encode
cost. Fixed-width types are byte-exact; variable-width types
(C<String>, C<Array>, etc.) use an average-size heuristic, 16 bytes
by default and configurable via C<avg_string_size>. For byte-exact
size, call C<length($enc-E<gt>encode(...))>.
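The heuristic amounts to a per-column width multiplied by the row count. A rough sketch under assumptions: the width table below is illustrative and partial, block and column headers are ignored, and C<estimate_size_sketch> is a hypothetical function rather than the method's actual logic.

```perl
use strict;
use warnings;

# Illustrative widths only; a hypothetical, partial table.
my %FIXED_WIDTH = (
    Int8  => 1, UInt8  => 1, Int16 => 2, UInt16 => 2,
    Int32 => 4, UInt32 => 4, Int64 => 8, UInt64 => 8,
    Float32 => 4, Float64 => 8, Date => 2, DateTime => 4, UUID => 16,
);

sub estimate_size_sketch {
    my ($cols, $rows, %opt) = @_;
    my $n   = ref $rows ? scalar @$rows : $rows;    # only the count is used
    my $avg = $opt{avg_string_size} // 16;
    my $total = 0;
    for my $col (@$cols) {
        my ($name, $type) = @$col;
        # Exact width when known, otherwise the average-size guess.
        $total += $n * ($FIXED_WIDTH{$type} // $avg);
    }
    return $total;
}

my @schema = (['id', 'UInt64'], ['name', 'String']);
print estimate_size_sketch(\@schema, 1000), "\n";
print estimate_size_sketch(\@schema, 1000, avg_string_size => 64), "\n";
```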

=head2 select_blocks

    ClickHouse::Encoder->select_blocks(
        'select id, event, ts from events where date = today()',
        host     => 'db.example', port => 8123,
        database => 'default',    user => 'default',
        on_block => sub { my $block = shift; ... },
        keep     => { id => 1, event => 1 },  # optional projection
    );

Streaming counterpart to L</insert_http>: POSTs C<$sql> to the
ClickHouse HTTP endpoint with C<default_format=Native>, feeds the
response chunks into a sliding buffer, and invokes C<on_block> for
every complete L</decode_block>-shaped block as it arrives. Memory
stays bounded by one HTTP::Tiny chunk plus one block; this is the
right entry point for selects that return more than fits in
process memory.

C<$sql> must NOT end with a C<format ...> clause - C<select_blocks>
appends C<format Native> at the URL level and croaks when the SQL
trails with a different format pin. A C<FORMAT> token inside the
query body (for example a column literal) is fine.

The optional C<keep =E<gt> \%names> hashref forwards a column
filter to L</decode_block>: skipped columns still have their bytes
consumed (so the cursor stays aligned) but their values are not
materialized into SVs. Useful when you only need a few of many
select-list columns.

With C<decompress =E<gt> 1> the URL is augmented with
C<?compress=1> so ClickHouse wraps each response Native block in
its compressed-block framing (16-byte CityHash128 + 9-byte header
+ LZ4 payload). The HTTP body is then a stream of compressed blocks
which C<select_blocks> peels and decompresses block-by-block via
L</decompress_native_block> before feeding the result to
L</decode_block>. Memory stays bounded by one HTTP chunk plus one
compressed block plus one decompressed block.

Recognised options (besides C<on_block> / C<keep> /
C<decompress>): C<scheme>, C<host>, C<port>, C<database>, C<user>,
C<password>, C<timeout>, C<ssl_options>, C<verify_SSL>, and
C<settings> (per-query CH settings hashref, useful for
C<max_execution_time> and similar). C<dedup_token> is meaningful
only on insert and is ignored if passed here.

=head2 bulk_inserter

    my $bi = ClickHouse::Encoder->bulk_inserter(
        host => 'db.example', port => 8123, table => 'events',
        columns => \@cols, batch_size => 5000, compress => 'zstd',
        retries => 3);
    $bi->push([$_]) for @rows;
    $bi->finish;

Holds an L<HTTP::Tiny> instance with keep-alive across batches,
accumulates rows, auto-flushes at C<batch_size>, and retries transient
HTTP failures (5xx and 599 network errors) with exponential backoff
and jitter. 4xx errors die immediately. Options:

=over 4

=item C<host>, C<port>, C<database>, C<user>, C<password>, C<scheme>, C<timeout>

Same as L</for_table>; passed to L<HTTP::Tiny>.

=item C<table>

Required; same identifier rule as L</for_table>.

=item C<encoder> or C<columns>

Pass either an existing encoder or a column list (used to build one).

=item C<batch_size>

Auto-flush threshold (default 10_000).

=item C<retries>

Max retries on transient failure (default 3). Set to 0 to disable.

=item C<retry_wait>

Base backoff in seconds (default 0.5). Waits grow exponentially -
the window for attempt I<n> is C<retry_wait * 2 ** n> - with equal
jitter (a random point in the upper half of the window), so



    my $framed = ClickHouse::Encoder->compress_native_block(
        $native_bytes,
        mode   => 'lz4',  # 'lz4' | 'zstd' | 'auto' | 'none'
        # hasher => \&my_cityhash128,   # optional; default = bundled
    );

Wraps an encoded Native block in ClickHouse's CompressedReadBuffer
framing: a 16-byte checksum, then a 9-byte header (1-byte method
tag + LE UInt32 compressed_size + LE UInt32 uncompressed_size),
then the LZ4 (tag C<0x82>), ZSTD (tag C<0x90>), or uncompressed
(tag C<0x02>) payload. This is the framing used by the native TCP
protocol when compression is negotiated and by Native-over-HTTP
with C<&compress=1> / C<&decompress=1>.
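The byte layout can be illustrated with C<pack> and C<unpack>. This sketch makes two assumptions worth flagging: the checksum is a zeroed placeholder (a real frame carries CityHash128 there), and C<compressed_size> is taken to count the 9-byte header, which is my understanding of ClickHouse's convention. C<frame_none_sketch> and C<parse_frame_header_sketch> are hypothetical names.

```perl
use strict;
use warnings;

# Build a method-0x02 (uncompressed) frame. The 16 checksum bytes are a
# zeroed placeholder, NOT a valid CityHash128.
sub frame_none_sketch {
    my ($payload) = @_;
    # Assumption: compressed_size includes the 9 header bytes.
    my $header = pack 'C V V', 0x02, 9 + length($payload), length($payload);
    return ("\0" x 16) . $header . $payload;
}

# Read the 9-byte header that follows the 16-byte checksum.
sub parse_frame_header_sketch {
    my ($framed, $offset) = @_;
    $offset //= 0;
    die "truncated frame header" if length($framed) - $offset < 25;
    my ($method, $csize, $usize) =
        unpack 'C V V', substr($framed, $offset + 16, 9);
    return {
        method            => $method,
        compressed_size   => $csize,
        uncompressed_size => $usize,
        consumed          => 16 + $csize,    # checksum + header + payload
    };
}

my $frame = frame_none_sketch('hello');
my $h     = parse_frame_header_sketch($frame);
printf "method=0x%02x consumed=%d\n", $h->{method}, $h->{consumed};
```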

Modes:

=over 4

=item C<'lz4'>

LZ4-compressed via L<Compress::LZ4>'s raw form (no length prefix).

=item C<'zstd'>

ZSTD-compressed via L<Compress::Zstd>.

=item C<'auto'>

Try LZ4 first; if the compressed result is no smaller than the input,
fall back to C<'none'>. Mirrors CH's own C<CompressedWriteBuffer>
behavior for incompressible payloads.

=item C<'none'>

No compression but still wrapped in the framing (method tag C<0x02>).
Useful when the wire context requires compressed-block framing but
the payload doesn't benefit from compression.

=back

The checksum is CityHash128 in the "cityhash102" variant
(ClickHouse's namespace fork of Google CityHash v1.0.2). This
module bundles a port of that algorithm in F<cityhash.c>, exposed
as the XSUB C<_cityhash128>; both C<compress_native_block> and
L</decompress_native_block> default to it. Pass an explicit
C<hasher =E<gt> $coderef> only if you want to plug in a different
implementation.

C<Compress::LZ4> is required for C<'lz4'> / C<'auto'> mode;
C<Compress::Zstd> for C<'zstd'>. Both are listed as runtime
C<recommends>.

=head2 decompress_native_block

    my ($plain, $consumed) = ClickHouse::Encoder->decompress_native_block(
        $framed);                          # default hasher = bundled
    my $plain = ClickHouse::Encoder->decompress_native_block(
        $framed, hasher => undef);         # skip checksum verification
    my ($plain, $n) = ClickHouse::Encoder->decompress_native_block(
        $stream, offset => $cursor);       # walk a multi-block stream

Inverse of L</compress_native_block>: verifies the checksum (unless
C<hasher =E<gt> undef>), unpacks the payload by method tag, and
returns the raw Native bytes. In list context also returns the
number of bytes consumed from the input (16 + 9 + payload length),
so the caller can advance an offset cursor through a stream of
back-to-back compressed blocks.
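The cursor walk looks roughly like this. To stay self-contained the sketch uses a stand-in that understands only method C<0x02> frames and skips checksum verification; C<frame_none> and C<decompress_none_sketch> are hypothetical, and C<compressed_size> is assumed to count the 9-byte header.

```perl
use strict;
use warnings;

# Uncompressed frame with a zeroed placeholder checksum (not a real hash).
sub frame_none {
    my ($payload) = @_;
    return ("\0" x 16)
         . pack('C V V', 0x02, 9 + length($payload), length($payload))
         . $payload;
}

# Stand-in for decompress_native_block: method 0x02 only, no checksum check.
# List context returns ($plain, $consumed); scalar context returns $plain.
sub decompress_none_sketch {
    my ($stream, %opt) = @_;
    my $off = $opt{offset} // 0;
    die "truncated frame" if length($stream) - $off < 25;
    my ($method, $csize, $usize) = unpack 'C V V', substr($stream, $off + 16, 9);
    die sprintf("unsupported method 0x%02x in this sketch", $method)
        unless $method == 0x02;
    my $plain = substr($stream, $off + 25, $csize - 9);
    die "size mismatch" unless length($plain) == $usize;
    return wantarray ? ($plain, 16 + $csize) : $plain;
}

my $frames = join '', map frame_none($_), qw(one two three);
my ($cursor, @plain) = (0);
while ($cursor < length($frames)) {
    my ($bytes, $n) = decompress_none_sketch($frames, offset => $cursor);
    push @plain, $bytes;
    $cursor += $n;    # advance past checksum + header + payload
}
print "@plain\n";
```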

=head1 TYPES

=head2 Supported

=over 4

=item *

Integers: C<Int8>, C<Int16>, C<Int32>, C<Int64>, C<UInt8>, C<UInt16>,
C<UInt32>, C<UInt64>.

=item *

Floats: C<Float32>, C<Float64>, C<BFloat16> (CH 24.x; 2-byte truncated
Float32). C<Inf>, C<-Inf>, and C<NaN> are preserved.

=item *

Strings: C<String> (length-prefixed bytes), C<FixedString(N)> (N bytes,
null-padded). Both pass the SV's bytes through unchanged: a UTF-8 string
encodes its UTF-8 bytes, a binary blob encodes its bytes, and truncation
is by byte not codepoint.

=item *

Dates: C<Date>, C<Date32>, C<DateTime>, C<DateTime('tz')> (timezone is part
of the schema, not the value), C<DateTime64(P)> with C<P> in 0..9.

=item *

Decimals: C<Decimal32(S)> (S in 0..9), C<Decimal64(S)> (0..18),
C<Decimal128(S)> (0..38), C<Decimal256(S)> (0..76), and
C<Decimal(P, S)> with C<P> in 1..38 (auto-routed to 32/64/128).

=item *

Enums: C<Enum8('a' = 1, ...)>, C<Enum16(...)>.

=item *

C<Bool> / C<Boolean> (1 byte; truthy/falsy in Perl sense).

=item *

C<UUID> (16 bytes; accept either the standard
C<xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx> string or 16 raw bytes).

=item *

C<IPv4> (UInt32 LE; accept dotted-quad string or integer),
C<IPv6> (16 bytes network-order; accept colon-hex string or 16 raw bytes).

=item *

C<Map(K, V)> (wire-equivalent to C<Array(Tuple(K, V))>; accept either a
hashref or an arrayref of pairs).

=item *


