ClickHouse-Encoder

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

eg/clickhouse_replication.pl
eg/csv_export.pl
eg/etl_dbi.pl
eg/for_table.pl
eg/from_csv.pl
eg/geo_from_wkt.pl
eg/insert_async_ev.pl
eg/insert_clickhouse_local.pl
eg/insert_compressed.pl
eg/insert_http.pl
eg/insert_streaming.pl
eg/insert_with_lowcardinality.pl
eg/insert_with_settings.pl
eg/json_aggregate.pl
eg/json_lines_ingest.pl
eg/json_path_projection.pl
eg/json_query.pl
eg/json_streaming.pl
eg/migrate_table.pl
eg/migrate_with_transform.pl
eg/native_to_jsonl.pl
eg/observability.pl
eg/parallel_loader.pl
eg/ping_healthcheck.pl
eg/postgres_to_clickhouse.pl
eg/redis_to_clickhouse.pl
eg/replay.pl
eg/replay_pcap.pl
eg/rowbinary_insert.pl
eg/schema_migrate.pl
eg/select_blocks_streaming.pl
eg/streaming_aggregate.pl
eg/syslog_ingest.pl
eg/tcp_compressed_pipeline.pl
encode.c
encode.h
Encoder.xs
json_kind.c
json_kind.h
lib/ClickHouse/Encoder.pm
lib/ClickHouse/Encoder/TCP.pm
Makefile.PL

README  view on Meta::CPAN

    incrementally from a filehandle - memory bounded by one block at
    a time. ->decode_block($bytes, $offset, \%keep) skips data for
    unwanted columns (memory win on wide select *).

DOCUMENTATION
    See `perldoc ClickHouse::Encoder` after install, or the POD in
    lib/ClickHouse/Encoder.pm.

EXAMPLES
    eg/insert_http.pl              - end-to-end insert over HTTP::Tiny
    eg/insert_streaming.pl         - reuse one encoder across many batches
    eg/for_table.pl                - schema discovery via clickhouse-client
    eg/from_csv.pl                 - read CSV, encode, insert via HTTP
    eg/insert_clickhouse_local.pl  - server-less ingest to Parquet/ORC
    eg/etl_dbi.pl                  - DBI -> Native -> insert pipeline
    eg/insert_compressed.pl        - zstd/gzip compression on the wire
    eg/insert_async_ev.pl          - non-blocking concurrent inserts via EV
    eg/insert_with_lowcardinality.pl - LC(String) wire-size demo
    eg/json_lines_ingest.pl          - NDJSON streaming -> for_table -> insert
    eg/streaming_aggregate.pl        - pre-aggregate, flush to SummingMergeTree
    eg/postgres_to_clickhouse.pl     - DBD::Pg -> Native -> insert, streaming
    eg/clickhouse_replication.pl     - CH -> CH replication via Native pipe
    eg/parallel_loader.pl            - fork N workers, parallel partition load
    eg/redis_to_clickhouse.pl        - drain a Redis stream/list into a CH table
    eg/syslog_ingest.pl              - parse RFC 5424 syslog lines, ingest
    eg/json_streaming.pl             - NDJSON -> JSON column via streamer
    eg/json_query.pl                 - select format native -> decode_blocks -> walk
    eg/json_aggregate.pl             - group-by aggregation pipeline over JSON
    eg/migrate_table.pl              - copy CH -> CH, schema auto-detected
    eg/replay.pl                     - replay a captured Native byte stream
    eg/native_to_jsonl.pl            - convert Native stream to NDJSON
    eg/select_blocks_streaming.pl    - streaming select via select_blocks
    eg/json_path_projection.pl       - column projection on JSON select
    eg/csv_export.pl                 - select Native -> CSV writer
    eg/migrate_with_transform.pl     - CH -> CH ETL with row transform
    eg/replay_pcap.pl                - off-line dump of captured Native bytes
    eg/tcp_compressed_pipeline.pl    - TCP insert with negotiated LZ4 compression
    eg/rowbinary_insert.pl           - insert via the RowBinary format
    eg/async_insert.pl               - server-side async insert via settings
    eg/geo_from_wkt.pl               - WKT geometry -> Geo columns via parse_wkt
    eg/insert_with_settings.pl       - per-query settings + dedup token
    eg/ping_healthcheck.pl           - wait-for-server readiness gate via ping

eg/clickhouse_replication.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Replicate one ClickHouse table to another (potentially on a different
# server) by streaming Native bytes end-to-end -- no Perl-side row
# decode at all. The encoder isn't strictly needed here since both
# sides speak Native, but for_table() validates the destination schema
# and the script demonstrates the streaming pipe pattern users build
# their own variants on (filtering, transforms, etc.).
#
# This is the "free" case: source select format native, destination
# insert format native, identical schemas. With matching schemas the
# server-side codepath is a copy through the column ColumnPtr layer,
# which is the fastest cluster-reshard primitive ClickHouse offers.
#
# Usage:
#     CH_SRC=src.example:8123 CH_DST=dst.example:8123 \
#     perl eg/clickhouse_replication.pl src.events dst.events

eg/insert_streaming.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Streaming inserts: reuse one encoder across many batches and pipe each
# batch to clickhouse-client.
#
#   perl eg/insert_streaming.pl                # 10 batches x 10000 rows
#   ROWS=50000 BATCHES=20 perl eg/insert_streaming.pl
#
# An encoder built once with `ClickHouse::Encoder->new(...)` is reusable:
# you pay the type-parsing cost upfront and then encode many batches with the
# same column layout. Pair this with any HTTP / TCP / pipe transport.

use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use Time::HiRes qw(time);
use ClickHouse::Encoder;

eg/json_streaming.pl  view on Meta::CPAN

#!/usr/bin/env perl
# json_streaming.pl - read NDJSON from stdin, encode into a JSON column,
# stream to ClickHouse via insert format native in batches.
#
# Usage:
#   echo '{"event":"click","ts":1}' | json_streaming.pl --table events --col j
#
# Requirements:
#   - Cpanel::JSON::XS or JSON::PP for parsing input lines
#   - HTTP::Tiny for POSTing to ClickHouse
#   - A table created beforehand with a single JSON column, e.g.:
#       create table events (j JSON) engine=MergeTree order by tuple()
#         settings allow_experimental_json_type=1
use strict;
use warnings;
use Getopt::Long;

eg/select_blocks_streaming.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Streaming select via select_blocks(): runs a select against a
# ClickHouse HTTP endpoint and processes each result block as it
# arrives, never buffering the full response in memory. Pairs
# naturally with insert_streaming.pl on the write side.
#
# Usage:
#     perl eg/select_blocks_streaming.pl --host=db --port=8123 \
#         --sql='select event, count() c from events group by event' \
#         --keep=event,c

use strict;
use warnings;
use Getopt::Long;
use ClickHouse::Encoder;

my $host = '127.0.0.1';
my $port = 8123;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

# same hashref shape as decode_block. Stops cleanly when bytes are
# exhausted; partial trailing bytes raise an error from XS.
# Uses the 3-arg form of decode_block (with offset) to avoid O(N^2)
# substr copies on long streams.
sub decode_blocks {
    my ($class, $bytes, $cb, %opts) = @_;
    my $keep = $opts{keep};
    my $off = 0;
    my $len = length $bytes;
    # Callback form: hand each block to $cb as it's decoded; never
    # accumulate. Useful for streaming selects where the full block
    # list would not fit comfortably in memory.
    if ($cb) {
        while ($off < $len) {
            my $block = $class->decode_block($bytes, $off, $keep);
            $off += $block->{consumed};
            $cb->($block);
        }
        return;
    }
    my @blocks;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN


The F<eg/> directory ships runnable scripts:

=over 4

=item F<eg/insert_http.pl>

End-to-end insert over HTTP via L<HTTP::Tiny>. The shortest path to "insert
real data into a real ClickHouse".

=item F<eg/insert_streaming.pl>

Reuse one encoder across many batches, piping each batch to
C<clickhouse-client>. Demonstrates the intended one-encoder-many-batches
pattern.

=item F<eg/for_table.pl>

Schema discovery via C<for_table>.

=item F<eg/from_csv.pl>

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

throughput of C<LowCardinality(String)> versus plain C<String> for the
typical case where a column has few distinct values that repeat across
many rows.

=item F<eg/json_lines_ingest.pl>

Reads NDJSON from STDIN or a file, maps each object's fields onto a
ClickHouse table's columns (discovered via C<for_table>), and inserts
batched blocks over HTTP.

=item F<eg/streaming_aggregate.pl>

Pre-aggregates an event stream in Perl (per minute, per key) and
flushes rolled-up counters to a C<SummingMergeTree> on a wall-clock
timer. The classic pattern when the firehose is too high-cardinality
to store as raw rows.

=item F<eg/postgres_to_clickhouse.pl>

Replicates a PostgreSQL table to ClickHouse using L<DBD::Pg> on the
source side and this encoder's streamer on the destination side.
Memory is bounded by the batch size, so it scales to hundreds of
millions of rows.

=item F<eg/clickhouse_replication.pl>

Replicates one ClickHouse table to another (potentially on a different
server) by streaming Native bytes end-to-end via a temp-file spool.

=item F<eg/parallel_loader.pl>

Forks N worker processes, each ingesting one slice of the input.
Workers share nothing; each opens its own HTTP connection. Scales
network-bound ingestion linearly with worker count.

=item F<eg/redis_to_clickhouse.pl>

Drains a Redis stream (XREADGROUP) or list (BRPOP) into a ClickHouse
table, with idle-flush so the destination doesn't see arbitrarily
delayed batches when the source is quiet.

=item F<eg/syslog_ingest.pl>

Reads RFC 5424 syslog lines from STDIN, parses lossily (any
unparseable line still goes through with the raw text in C<msg>),
and inserts into a fixed schema.

=item F<eg/json_streaming.pl>

NDJSON from STDIN into a C<JSON> column via the encoder's streaming
mode; one HTTP request per batch instead of one per line.

=item F<eg/json_query.pl>

Runs C<select ... format Native> over HTTP and walks the returned
blocks via L</decode_blocks>, demonstrating the symmetric decode path
for JSON columns.

=item F<eg/json_aggregate.pl>

Sketches an aggregation pipeline that bins JSON events by path and
emits the aggregates as a second insert.

=item F<eg/migrate_table.pl>

Copies one CH table into another (possibly on a different host) by
discovering the source schema via L</for_table> and streaming Native
blocks through.

=item F<eg/native_to_jsonl.pl>

Reads a Native byte stream from STDIN and prints each row as NDJSON
on STDOUT; the dual of F<json_lines_ingest.pl>.

=item F<eg/replay.pl>

Replays a captured Native byte stream against a table, useful for
post-hoc reproduction of an ingest bug from a saved request body.

=item F<eg/select_blocks_streaming.pl>

Streaming select counterpart to F<insert_streaming.pl>: uses
L</select_blocks> to walk a select response block-by-block, with
optional column projection via C<--keep>.

=item F<eg/json_path_projection.pl>

Demo of C<keep =E<gt> {...}> projection on top of L</select_blocks>:
decodes only the requested columns and prints one row per line.

=item F<eg/csv_export.pl>

select to CSV: counterpart to F<from_csv.pl>. Drives a CSV writer
from a streaming select, emitting the header row from the first
block's column names.

=item F<eg/migrate_with_transform.pl>

CH-to-CH migration with a row-level transform between read and
write. Discovers source schema via L</for_table>, streams the rows
via L</select_blocks>, applies a user-supplied transform coderef,
and forwards survivors through L</bulk_inserter>.

=item F<eg/replay_pcap.pl>

lib/ClickHouse/Encoder/TCP.pm  view on Meta::CPAN

L</read_packet> is provided as a convenience for blocking
C<IO::Socket>-style use; for non-blocking transports, call
L</unpack_packet> on a sliding byte buffer directly.

Out of scope:

=over 4

=item * Settings with typed values (newer flexible-setting wire form).

=item * select result streaming (this module covers only what's needed for inserts).

=item * Server's prepared-query parameters protocol.

=back

Wire compression is supported as an opt-in: L</pack_data> /
L</pack_data_end> accept C<<< compress => 'lz4' >>> or C<'zstd'>,
and L</read_packet> accepts C<<< compressed => 1 >>>. See CAVEATS
for the negotiation handshake the caller must perform first.

lib/ClickHouse/Encoder/TCP.pm  view on Meta::CPAN

=back

=head1 SEE ALSO

L<ClickHouse::Encoder> - the wire-format encoder these packets carry,
plus L<compress_native_block|ClickHouse::Encoder/compress_native_block>
/ L<decompress_native_block|ClickHouse::Encoder/decompress_native_block>
for the matching block-framing helpers.

L<EV::ClickHouse> - full async ClickHouse client (TCP + HTTP) for
select result streaming, prepared queries with parameter binding,
and chunking-negotiation against modern CH revisions.

=head1 AUTHOR

vividsnow

=head1 LICENSE

Same terms as Perl itself.

t/select-blocks.t  view on Meta::CPAN

#!/usr/bin/env perl
# select_blocks is mainly a streaming HTTP wrapper; the meaningful
# unit-test we can do without a real server is to verify the
# argument validation and SQL-format guard. The end-to-end behavior
# is exercised by t/live.t against a real ClickHouse server.
use strict;
use warnings;
use Test::More;
use lib 'blib/lib', 'blib/arch';
use ClickHouse::Encoder;

# Required on_block coderef

t/streamer-edge.t  view on Meta::CPAN

{
    # finish() must flush what is buffered exactly once: a second call
    # with nothing pending must not hand another block to the callback.
    my @emitted;
    my $streamer = make_enc()->streamer(
        sub { push @emitted, $_[0] },
        batch_size => 10,
    );

    $streamer->push_row([1]);
    $streamer->finish;
    is(scalar(@emitted), 1, 'first finish emits the buffered batch');

    $streamer->finish;
    is(scalar(@emitted), 1, 'second finish on empty streamer: no extra block');
}

# push_row after finish is allowed; behaves like a fresh streaming session.
{
    my @emitted;
    my $streamer = make_enc()->streamer(
        sub { push @emitted, $_[0] },
        batch_size => 10,
    );

    # First session: one row, flushed by finish.
    $streamer->push_row([1]);
    $streamer->finish;

    # Same streamer object, pushed to again after finish.
    $streamer->push_row([2]);
    $streamer->finish;

    is(scalar(@emitted), 2, 'push_row after finish reopens the streamer');
}

# Encoder dropped before streamer: streamer keeps the encoder alive
# (XS holds a refcount) and finish still works.

t/url-helpers.t  view on Meta::CPAN

    is($resp->{ch}{summary}{elapsed_ns}, 1500,      'summary: elapsed_ns parsed');
}

# A response without any X-ClickHouse-* headers must not add a ch slot.
{
    # Minimal response hashref with an empty header set.
    my $resp = {
        success => 1,
        status  => 200,
        headers => {},
    };

    ClickHouse::Encoder::_decorate_response($resp);

    ok(!exists $resp->{ch}, 'no ch slot when no CH headers present');
}

# X-ClickHouse-Progress repeats during a streaming query;
# HTTP::Tiny collapses repeated headers into an arrayref. The last
# snapshot is the final one - the most complete - so _decorate_response
# parses that one as $resp->{ch}{progress}.
{
    my $resp = {
        success => 1, status => 200,
        headers => {
            'x-clickhouse-progress' => [
                '{"read_rows":"100","total_rows_to_read":"1000"}',
                '{"read_rows":"500","total_rows_to_read":"1000"}',



( run in 1.431 second using v1.01-cache-2.11-cpan-140bd7fdf52 )