ClickHouse-Encoder

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

        my ($name, $type) = @$c;
        if ($type =~ /\ANested\((.+)\)\z/s) {
            my @parts = _split_paren_list($1);
            @parts or die "Nested($1) for column '$name' has no elements";
            for my $part (@parts) {
                $part =~ /\A([A-Za-z_][A-Za-z0-9_]*)\s+(.+?)\s*\z/s
                    or die "Nested element '$part' is not 'name Type'";
                push @out, ["$name.$1", "Array($2)"];
            }
        } else {
            push @out, [$name, $type];
        }
    }
    return \@out;
}

# Split a comma-separated list at depth-0 commas (so Tuple(Int32, String)
# inside a Nested element stays one entry).
sub _split_paren_list {
    my $body = shift;
    my @parts;
    my ($start, $depth, $len) = (0, 0, length $body);
    for (my $i = 0; $i <= $len; $i++) {
        my $c = $i < $len ? substr($body, $i, 1) : ',';
        if    ($c eq '(') { $depth++ }
        elsif ($c eq ')') { $depth-- }
        elsif ($c eq ',' && $depth == 0) {
            (my $p = substr($body, $start, $i - $start)) =~ s/\A\s+|\s+\z//g;
            push @parts, $p if length $p;
            $start = $i + 1;
        }
    }
    return @parts;
}
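The two helpers above turn one `Nested(...)` column into flattened `name.elem Array(T)` columns. The standalone sketch below reproduces that logic so it can be run outside the module. `split_depth0` and `flatten_nested` are hypothetical names chosen for illustration, not part of ClickHouse::Encoder's API.

```perl
use strict;
use warnings;

# Split a type list at commas that sit at parenthesis depth 0, so
# "a Int32, b Tuple(String, Int32)" yields two parts, not three.
sub split_depth0 {
    my ($body) = @_;
    my @parts;
    my ($start, $depth, $len) = (0, 0, length $body);
    for my $i (0 .. $len) {
        # Past the end, pretend there is one more comma to flush the last part.
        my $c = $i < $len ? substr($body, $i, 1) : ',';
        if    ($c eq '(') { $depth++ }
        elsif ($c eq ')') { $depth-- }
        elsif ($c eq ',' && $depth == 0) {
            (my $p = substr($body, $start, $i - $start)) =~ s/\A\s+|\s+\z//g;
            push @parts, $p if length $p;
            $start = $i + 1;
        }
    }
    return @parts;
}

# Hypothetical: expand a Nested(...) column into parallel
# "name.elem Array(T)" columns; any other type passes through unchanged.
sub flatten_nested {
    my ($name, $type) = @_;
    return ([$name, $type]) unless $type =~ /\ANested\((.+)\)\z/s;
    my @out;
    for my $part (split_depth0($1)) {
        $part =~ /\A([A-Za-z_][A-Za-z0-9_]*)\s+(.+?)\s*\z/s
            or die "Nested element '$part' is not 'name Type'\n";
        push @out, ["$name.$1", "Array($2)"];
    }
    return @out;
}

for my $col (flatten_nested('n', 'Nested(id UInt64, pair Tuple(String, Int32))')) {
    print "$col->[0] $col->[1]\n";
}
```

Run as-is, it prints `n.id Array(UInt64)` and `n.pair Array(Tuple(String, Int32))`: the comma inside `Tuple(...)` is at depth 1, so it does not split the element.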

# Row-oriented decode: { ncols, nrows, names, types, rows } where
# rows is an arrayref of arrayrefs. Calls the XS decode_block_rows,
# which distributes column values into per-row arrayrefs as each
# column is decoded and frees the column AV eagerly. Peak memory
# is one column's AV plus the row AVs (vs a Perl-side transpose
# that holds ALL column AVs alongside the half-built row AVs).
# Throughput is similar to the column-major path; the win is the
# tighter memory profile on wide blocks.
sub decode_rows {
    my ($class, $bytes, $offset) = @_;
    return $class->decode_block_rows($bytes, $offset // 0);
}
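The memory argument in the comment above can be illustrated at the Perl level. This is a minimal sketch, not the XS `decode_block_rows`: the hypothetical `columns_to_rows` helper distributes one column into the row arrayrefs and then drops it before moving on, so only one column is live next to the rows at any time. It assumes the caller hands over ownership of the column list, which is consumed with `shift`.

```perl
use strict;
use warnings;

# Hypothetical: turn column-major data into row arrayrefs, releasing
# each column as soon as its values have been copied into the rows.
sub columns_to_rows {
    my ($columns, $nrows) = @_;     # $columns: arrayref of column arrayrefs (consumed)
    my @rows = map { [] } 1 .. $nrows;
    while (@$columns) {
        my $col = shift @$columns;  # detach the next column from the list
        push @{ $rows[$_] }, $col->[$_] for 0 .. $nrows - 1;
        undef $col;                 # drop our reference so the column can be freed now
    }
    return \@rows;
}

my $rows = columns_to_rows([[1, 2], ['x', 'y']], 2);
print join(' | ', map { join(',', @$_) } @$rows), "\n";
```

It prints `1,x | 2,y`.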

# Decode a concatenated stream of Native blocks (the body of a
# `select ... format native` response). Without a callback, returns an
# arrayref of hashrefs shaped like decode_block's result; with a
# callback, hands each block to $cb and returns nothing. Stops cleanly
# when the bytes are exhausted; a truncated trailing block raises an
# error from XS. Passes an offset to decode_block (its 3-arg form)
# rather than re-slicing $bytes with substr, which would copy the
# unread tail on every block and go O(N^2) on long streams.
sub decode_blocks {
    my ($class, $bytes, $cb, %opts) = @_;
    my $keep = $opts{keep};
    my $off = 0;
    my $len = length $bytes;
    # Callback form: hand each block to $cb as it's decoded; never
    # accumulate. Useful for streaming selects where the full block
    # list would not fit comfortably in memory.
    if ($cb) {
        while ($off < $len) {
            my $block = $class->decode_block($bytes, $off, $keep);
            $off += $block->{consumed};
            $cb->($block);
        }
        return;
    }
    my @blocks;
    while ($off < $len) {
        my $block = $class->decode_block($bytes, $off, $keep);
        $off += $block->{consumed};
        push @blocks, $block;
    }
    return \@blocks;
}
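The offset-walking loop in `decode_blocks` is easiest to see with a toy block format. The sketch below assumes a hypothetical frame (a 4-byte little-endian length followed by the payload) in place of a real Native block; `decode_one` and `walk_frames` are illustrative names. It keeps the same two shapes as `decode_blocks`: accumulate into an arrayref, or stream each frame to a callback.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a Native block: a 4-byte little-endian
# length followed by that many payload bytes.
sub decode_one {
    my $off = $_[1];          # read the buffer through the alias $_[0],
    my $len = length $_[0];   # so it is never copied per call
    die "truncated header at offset $off\n"  if $len - $off < 4;
    my $n = unpack 'V', substr($_[0], $off, 4);
    die "truncated payload at offset $off\n" if $len - $off - 4 < $n;
    return { payload => substr($_[0], $off + 4, $n), consumed => 4 + $n };
}

# Advance an offset through one shared buffer. Re-slicing with
# substr($bytes, $off) on every iteration would copy the unread tail
# each time, which is quadratic in the stream length.
sub walk_frames {
    my ($bytes, $cb) = @_;
    my ($off, $len) = (0, length $bytes);
    my @frames;
    while ($off < $len) {
        my $frame = decode_one($bytes, $off);
        $off += $frame->{consumed};
        if ($cb) { $cb->($frame) } else { push @frames, $frame }
    }
    return if $cb;            # streaming form: nothing accumulated
    return \@frames;
}

my $stream = join '', map { pack('V', length $_) . $_ } qw(alpha be gamma);
print join(',', map { $_->{payload} } @{ walk_frames($stream) }), "\n";
```

It prints `alpha,be,gamma`. Reading the buffer through `$_[0]` in `decode_one` matters in pure Perl: `my ($bytes) = @_` there would copy the whole buffer on every call and reintroduce the quadratic cost.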

# Return a coderef that yields one block per call (undef when done).
# The closure captures its own copy of $bytes plus the running offset;
# that is the only state that survives between calls.
sub decode_blocks_iter {
    my ($class, $bytes, %opts) = @_;
    my $keep = $opts{keep};
    my $off = 0;
    my $len = length $bytes;
    return sub {
        return if $off >= $len;
        my $block = $class->decode_block($bytes, $off, $keep);
        $off += $block->{consumed};
        return $block;
    };
}
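The same toy framing shows the closure-iterator shape of `decode_blocks_iter`. `frame_iter` is a hypothetical helper: the returned coderef keeps the buffer and offset private and yields one payload per call, then `undef`. Testing with `defined` rather than truthiness keeps an empty payload (or a payload of `"0"`) from ending the loop early.

```perl
use strict;
use warnings;

# Returns a coderef over a buffer of length-prefixed frames (4-byte
# little-endian length + payload). Each call yields the next payload;
# once the buffer is exhausted it returns undef (in scalar context).
sub frame_iter {
    my ($bytes) = @_;                   # private copy held by the closure
    my ($off, $len) = (0, length $bytes);
    return sub {
        return if $off >= $len;
        die "truncated header at offset $off\n" if $len - $off < 4;
        my $n = unpack 'V', substr($bytes, $off, 4);
        die "truncated payload at offset $off\n" if $len - $off - 4 < $n;
        my $payload = substr($bytes, $off + 4, $n);
        $off += 4 + $n;
        return $payload;
    };
}

my $it = frame_iter(join '', map { pack('V', length $_) . $_ } qw(one two three));
while (defined(my $p = $it->())) {
    print "$p\n";
}
```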

# Incremental decoder: pulls bytes from a filehandle (or any
# IO::Handle-like object) in chunks and passes each complete block
# hashref to $cb. Keeps a sliding buffer; when a decode fails because
# the buffer ends mid-block, it reads more bytes and retries. Useful
# when the response body is too large to fit in memory.
sub decode_stream {
    my ($class, $fh, $cb, %opts) = @_;
    my $chunk_size = $opts{chunk_size} // 64 * 1024;
    my $keep       = $opts{keep};
    my $decompress = $opts{decompress};
    my $buf = '';        # raw bytes from the filehandle
    my $inner = '';      # decompressed Native bytes (== $buf when !decompress)
    my $done = 0;
    until ($done) {
        # Phase 1: peel compressed-block frames out of $buf into $inner.
        if ($decompress) {
            while (length($buf) >= 25) {
                my $csize = unpack 'V', substr($buf, 17, 4);
                last if length($buf) < 16 + $csize;
                my ($plain, $consumed) = eval {
                    $class->decompress_native_block($buf)
                };
                die $@ if $@;
                $inner .= $plain;
                substr($buf, 0, $consumed, '');
            }
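The two length checks in the decompress loop above (`>= 25` before reading the size, then `16 + $csize`) follow from the layout of the compressed-block frame. The sketch below parses that header with `unpack`, assuming the layout implied by those offsets: a 16-byte checksum, a method byte, then little-endian compressed and decompressed sizes, where the compressed size counts the 9-byte method-and-sizes header. `peek_frame` is hypothetical; it neither verifies the checksum nor decompresses anything.

```perl
use strict;
use warnings;

# Assumed frame layout:
#   16 bytes  checksum of the rest of the frame (not verified here)
#    1 byte   compression method
#    4 bytes  compressed size, UInt32 LE, including this 9-byte header
#    4 bytes  decompressed size, UInt32 LE
#   then      (compressed size - 9) bytes of payload
use constant HEADER_LEN => 16 + 1 + 4 + 4;    # 25

# Hypothetical: for the first complete frame at the start of $buf,
# return (method, payload, decompressed_size, frame_len); return an
# empty list if more bytes must be read first.
sub peek_frame {
    my ($buf) = @_;
    return if length($buf) < HEADER_LEN;
    my ($method, $csize, $dsize) = unpack 'C V V', substr($buf, 16, 9);
    die "corrupt frame: compressed size $csize < 9\n" if $csize < 9;
    my $frame_len = 16 + $csize;
    return if length($buf) < $frame_len;
    return ($method, substr($buf, HEADER_LEN, $csize - 9), $dsize, $frame_len);
}

# Demo frame: zeroed checksum, arbitrary method byte, raw payload.
my $payload = 'hello';
my $frame = ("\0" x 16)
    . pack('C V V', 0x02, 9 + length($payload), length($payload))
    . $payload;
my $buf = $frame . 'next';                    # a second frame has started
while (my ($method, $body, $dsize, $flen) = peek_frame($buf)) {
    print "method=$method body=$body frame_len=$flen\n";
    substr($buf, 0, $flen, '');               # consume exactly one frame
}
```

It prints `method=2 body=hello frame_len=30` and leaves `next` in the buffer for the following read.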


=over 4

=item *

Numeric types go through C<SvIV> / C<SvUV> / C<SvNV>. Negative inputs to
unsigned types are bit-cast (standard Perl behaviour).

=item *

C<Date> / C<Date32>: integer (or integer-valued string) is interpreted as
days since the epoch; a C<YYYY-MM-DD> string is parsed. Pass DateTime /
Time::Piece / Time::Moment objects as C<< $dt->epoch / 86400 >> -- the
encoder doesn't dispatch through C<< ->epoch >> itself.

=item *

C<DateTime>: integer is Unix seconds; a C<YYYY-MM-DD HH:MM:SS> string is
parsed. ISO 8601 forms are also accepted: a C<T> separator in place of the
space, and an optional trailing timezone marker (C<Z>, C<+HH:MM>,
C<-HH:MM>, C<+HHMM>, C<+HH>, C<-HH>) that is applied to convert the value
to UTC. Pass date objects via their C<< ->epoch >>.

=item *

C<DateTime64(P)>: integer is in scaled units (i.e. ticks of
C<10^-P> seconds); a float is in seconds and scaled to ticks; a
C<YYYY-MM-DD HH:MM:SS.fff> string is parsed. For sub-second-aware
objects pass C<< $dt->hires_epoch >> (or C<< ->epoch >> if the object
is integer-only).

=item *

C<Decimal*>: a number goes through C<double> (lossy past 2^53); a
B<string> matching C<[+-]?digits[.digits]?> is parsed digit-by-digit and
scaled exactly. B<If precision matters, pass strings.>

=item *

C<Enum8> / C<Enum16>: accept either the declared name or its integer
value; mixing the two within a column is fine.

=item *

C<Nullable(T)>: C<undef> writes a null-bitmap entry plus a type-shaped
placeholder (zero scalar, empty array, or a recursive zero tuple).

=back
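The C<Date> and C<Decimal*> rules above can be sketched in plain Perl. Both helpers are hypothetical illustrations, not the encoder's XS code: C<date_to_days> uses C<timegm> from core L<Time::Local> to get days since 1970-01-01, and C<scale_decimal_string> shows what scaling a decimal string "exactly" means by building the scaled integer as a digit string, never touching a C<double>. The sketch rejects extra fractional digits rather than guess which rounding rule the encoder applies.

```perl
use strict;
use warnings;
use Time::Local qw(timegm);

# Days since 1970-01-01 for a 'YYYY-MM-DD' string, as a Date/Date32
# column stores it. timegm() at midnight UTC returns an exact multiple
# of 86400 (negative before 1970), so the division is exact.
sub date_to_days {
    my ($ymd) = @_;
    my ($y, $m, $d) = $ymd =~ /\A(\d{4})-(\d{2})-(\d{2})\z/
        or die "not YYYY-MM-DD: '$ymd'\n";
    return timegm(0, 0, 0, $d, $m - 1, $y) / 86400;
}

# Scale a '[+-]digits[.digits]' string by 10**$scale using string
# operations only, returning the scaled integer as a decimal string.
sub scale_decimal_string {
    my ($str, $scale) = @_;
    my ($sign, $int, $frac) = $str =~ /\A([+-]?)(\d+)(?:\.(\d+))?\z/
        or die "not a plain decimal: '$str'\n";
    $frac //= '';
    die "more than $scale fractional digits in '$str'\n"
        if length($frac) > $scale;
    my $digits = $int . $frac . ('0' x ($scale - length($frac)));
    $digits =~ s/\A0+(?=\d)//;          # drop leading zeros, keep one digit
    return $digits eq '0' || $sign ne '-' ? $digits : "-$digits";
}

print date_to_days('2000-01-01'), "\n";
print scale_decimal_string('-12.5', 3), "\n";
```

It prints C<10957> and C<-12500>; a C<Decimal(P, 3)> column holds the second value as the integer -12500, i.e. -12.500 scaled by 10**3.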

=head1 EXAMPLES

The F<eg/> directory ships runnable scripts:

=over 4

=item F<eg/insert_http.pl>

End-to-end insert over HTTP via L<HTTP::Tiny>. The shortest path to "insert
real data into a real ClickHouse".

=item F<eg/insert_streaming.pl>

Reuse one encoder across many batches, piping each batch to
C<clickhouse-client>. Demonstrates the intended one-encoder-many-batches
pattern.

=item F<eg/for_table.pl>

Schema discovery via C<for_table>.

=item F<eg/from_csv.pl>

Read a CSV with L<Text::CSV_XS>, map columns to a ClickHouse schema, and
insert via HTTP.

=item F<eg/insert_clickhouse_local.pl>

Server-less ETL: encode rows, pipe Native bytes into C<clickhouse-local>,
have it write a Parquet (or ORC, etc.) file.

=item F<eg/etl_dbi.pl>

Read rows from a source database via L<DBI>, encode to Native, insert into
ClickHouse via HTTP. Reuses one encoder across all fetched batches.

=item F<eg/insert_compressed.pl>

Insert with on-the-wire compression (zstd via L<Compress::Zstd>, falling
back to gzip via core L<IO::Compress::Gzip>). Sets C<Content-Encoding>
so ClickHouse decompresses transparently.

=item F<eg/insert_async_ev.pl>

Non-blocking concurrent inserts using L<EV>'s event loop with raw HTTP
sockets, paired with this encoder's L</streamer>. Demonstrates the
"many in-flight inserts without blocking on each round-trip" pattern.

=item F<eg/insert_with_lowcardinality.pl>

Measures the wire-size reduction (~50% on event/log data) and encoding
throughput of C<LowCardinality(String)> versus plain C<String> for the
typical case where a column has few distinct values that repeat across
many rows.

=item F<eg/json_lines_ingest.pl>

Reads NDJSON from STDIN or a file, maps each object's fields onto a
ClickHouse table's columns (discovered via C<for_table>), and inserts
batched blocks over HTTP.

=item F<eg/streaming_aggregate.pl>

Pre-aggregates an event stream in Perl (per minute, per key) and
flushes rolled-up counters to a C<SummingMergeTree> on a wall-clock
timer. The classic pattern when the firehose is too high-cardinality
to store as raw rows.

=item F<eg/postgres_to_clickhouse.pl>

Replicates a PostgreSQL table to ClickHouse using L<DBD::Pg> on the
source side and this encoder's streamer on the destination side.
Memory is bounded by the batch size, so it scales to hundreds of
millions of rows.

=item F<eg/clickhouse_replication.pl>

Replicates one ClickHouse table to another (potentially on a different
server) by streaming Native bytes end-to-end via a temp-file spool.

=item F<eg/parallel_loader.pl>

Forks N worker processes, each ingesting one slice of the input.
Workers share nothing; each opens its own HTTP connection. Scales
network-bound ingestion roughly linearly with worker count, until the
server or the network saturates.

=item F<eg/redis_to_clickhouse.pl>

Drains a Redis stream (XREADGROUP) or list (BRPOP) into a ClickHouse
table, with idle-flush so the destination doesn't see arbitrarily
delayed batches when the source is quiet.

=item F<eg/syslog_ingest.pl>

Reads RFC 5424 syslog lines from STDIN, parses lossily (any
unparseable line still goes through with the raw text in C<msg>),
and inserts into a fixed schema.

=item F<eg/json_streaming.pl>

NDJSON from STDIN into a C<JSON> column via the encoder's streaming
mode; one HTTP request per batch instead of one per line.

=item F<eg/json_query.pl>

Runs a C<select ... format Native> over HTTP and walks the returned blocks
via L</decode_blocks>, demonstrating the symmetric decode path for
JSON columns.

=item F<eg/json_aggregate.pl>

Sketches an aggregation pipeline that bins JSON events by path and
emits the aggregates as a second insert.

=item F<eg/migrate_table.pl>

Copies one CH table into another (possibly on a different host) by
discovering the source schema via L</for_table> and streaming Native
blocks through.

=item F<eg/native_to_jsonl.pl>

Reads a Native byte stream from STDIN and prints each row as NDJSON
on STDOUT; the dual of F<json_lines_ingest.pl>.

=item F<eg/replay.pl>

Replays a captured Native byte stream against a table, useful for
post-hoc reproduction of an ingest bug from a saved request body.

=item F<eg/select_blocks_streaming.pl>

Streaming select counterpart to F<insert_streaming.pl>: uses
L</select_blocks> to walk a select response block-by-block, with
optional column projection via C<--keep>.

=item F<eg/json_path_projection.pl>

Demo of C<keep =E<gt> {...}> projection on top of L</select_blocks>:
decodes only the requested columns and prints one row per line.

=item F<eg/csv_export.pl>

Select-to-CSV export, the counterpart to F<from_csv.pl>. Drives a CSV writer
from a streaming select, emitting the header row from the first
block's column names.

=item F<eg/migrate_with_transform.pl>

CH-to-CH migration with a row-level transform between read and
write. Discovers source schema via L</for_table>, streams the rows
via L</select_blocks>, applies a user-supplied transform coderef,
and forwards survivors through L</bulk_inserter>.

=item F<eg/replay_pcap.pl>

Replays a captured Native byte stream (e.g. saved from
C<curl --output> of a C<select ... format native> response) and
print a block-by-block summary. Off-line debugging tool.

=item F<eg/tcp_compressed_pipeline.pl>

End-to-end TCP insert pipeline that negotiates compression in
C<pack_query>, then wraps every C<pack_data> / C<pack_data_end>
in CH's compressed-block framing. Showcases the matched-pair
convention against a real ClickHouse server (protocol revision
E<lt>= 54474; see L<ClickHouse::Encoder::TCP/CAVEATS>).

=item F<eg/rowbinary_insert.pl>

Insert using the C<RowBinary> format via L</encode_row_binary>,
with a local L</decode_row_binary> round-trip check. For interop
with pipelines that speak RowBinary rather than Native.

=item F<eg/async_insert.pl>

Server-side async insert: passes C<async_insert=1> (and optionally
C<wait_for_async_insert>) through the C<settings> option, so the server
buffers the batch and flushes it in the background.

=item F<eg/geo_from_wkt.pl>

Ingest geometry given as Well-Known-Text into C<Point> / C<Polygon>
columns using L</parse_wkt> to convert each WKT string.

=item F<eg/insert_with_settings.pl>

Insert with per-query CH C<settings> and an C<insert_deduplication_token>,
showing how an identical retry under the same token is deduplicated
server-side.

=item F<eg/ping_healthcheck.pl>

A wait-for-server readiness gate built on L</ping>: retries until the
C</ping> endpoint answers, then proceeds.

=item F<eg/observability.pl>

Reads server-side stats from an insert pipeline: per-batch
C<< $bi->last_response->{ch} >> detail plus the cumulative
C<< $bi->summary >> rollup of C<X-ClickHouse-Summary> counters.

=item F<eg/schema_migrate.pl>

Fetches C<show create table>, parses it with L</parse_create_table>,
