view release on metacpan or search on metacpan
eg/clickhouse_replication.pl
eg/csv_export.pl
eg/etl_dbi.pl
eg/for_table.pl
eg/from_csv.pl
eg/geo_from_wkt.pl
eg/insert_async_ev.pl
eg/insert_clickhouse_local.pl
eg/insert_compressed.pl
eg/insert_http.pl
eg/insert_streaming.pl
eg/insert_with_lowcardinality.pl
eg/insert_with_settings.pl
eg/json_aggregate.pl
eg/json_lines_ingest.pl
eg/json_path_projection.pl
eg/json_query.pl
eg/json_streaming.pl
eg/migrate_table.pl
eg/migrate_with_transform.pl
eg/native_to_jsonl.pl
eg/observability.pl
eg/parallel_loader.pl
eg/ping_healthcheck.pl
eg/postgres_to_clickhouse.pl
eg/redis_to_clickhouse.pl
eg/replay.pl
eg/replay_pcap.pl
eg/rowbinary_insert.pl
eg/schema_migrate.pl
eg/select_blocks_streaming.pl
eg/streaming_aggregate.pl
eg/syslog_ingest.pl
eg/tcp_compressed_pipeline.pl
encode.c
encode.h
Encoder.xs
json_kind.c
json_kind.h
lib/ClickHouse/Encoder.pm
lib/ClickHouse/Encoder/TCP.pm
Makefile.PL
incrementally from a filehandle - memory bounded by one block at
a time. ->decode_block($bytes, $offset, \%keep) skips data for
unwanted columns (memory win on wide select *).
DOCUMENTATION
See `perldoc ClickHouse::Encoder` after install, or the POD in
lib/ClickHouse/Encoder.pm.
EXAMPLES
eg/insert_http.pl - end-to-end insert over HTTP::Tiny
eg/insert_streaming.pl - reuse one encoder across many batches
eg/for_table.pl - schema discovery via clickhouse-client
eg/from_csv.pl - read CSV, encode, insert via HTTP
eg/insert_clickhouse_local.pl - server-less ingest to Parquet/ORC
eg/etl_dbi.pl - DBI -> Native -> insert pipeline
eg/insert_compressed.pl - zstd/gzip compression on the wire
eg/insert_async_ev.pl - non-blocking concurrent inserts via EV
eg/insert_with_lowcardinality.pl - LC(String) wire-size demo
eg/json_lines_ingest.pl - NDJSON streaming -> for_table -> insert
eg/streaming_aggregate.pl - pre-aggregate, flush to SummingMergeTree
eg/postgres_to_clickhouse.pl - DBD::Pg -> Native -> insert, streaming
eg/clickhouse_replication.pl - CH -> CH replication via Native pipe
eg/parallel_loader.pl - fork N workers, parallel partition load
eg/redis_to_clickhouse.pl - drain a Redis stream/list into a CH table
eg/syslog_ingest.pl - parse RFC 5424 syslog lines, ingest
eg/json_streaming.pl - NDJSON -> JSON column via streamer
eg/json_query.pl - select format native -> decode_blocks -> walk
eg/json_aggregate.pl - group-by aggregation pipeline over JSON
eg/migrate_table.pl - copy CH -> CH, schema auto-detected
eg/replay.pl - replay a captured Native byte stream
eg/native_to_jsonl.pl - convert Native stream to NDJSON
eg/select_blocks_streaming.pl - streaming select via select_blocks
eg/json_path_projection.pl - column projection on JSON select
eg/csv_export.pl - select Native -> CSV writer
eg/migrate_with_transform.pl - CH -> CH ETL with row transform
eg/replay_pcap.pl - off-line dump of captured Native bytes
eg/tcp_compressed_pipeline.pl - TCP insert with negotiated LZ4 compression
eg/rowbinary_insert.pl - insert via the RowBinary format
eg/async_insert.pl - server-side async insert via settings
eg/geo_from_wkt.pl - WKT geometry -> Geo columns via parse_wkt
eg/insert_with_settings.pl - per-query settings + dedup token
eg/ping_healthcheck.pl - wait-for-server readiness gate via ping
eg/clickhouse_replication.pl view on Meta::CPAN
#!/usr/bin/env perl
# Replicate one ClickHouse table to another (potentially on a different
# server) by streaming Native bytes end-to-end -- no Perl-side row
# decode at all. The encoder isn't strictly needed here since both
# sides speak Native, but for_table() validates the destination schema
# and the script demonstrates the streaming pipe pattern users build
# their own variants on (filtering, transforms, etc.).
#
# This is the "free" case: source select format native, destination
# insert format native, identical schemas. With matching schemas the
# server-side code path is a plain copy at the ColumnPtr (column) layer,
# which is the fastest cluster-reshard primitive ClickHouse offers.
#
# Usage:
# CH_SRC=src.example:8123 CH_DST=dst.example:8123 \
# perl eg/clickhouse_replication.pl src.events dst.events
eg/insert_streaming.pl view on Meta::CPAN
#!/usr/bin/env perl
# Streaming inserts: reuse one encoder across many batches and pipe each
# batch to clickhouse-client.
#
# perl eg/insert_streaming.pl # 10 batches x 10000 rows
# ROWS=50000 BATCHES=20 perl eg/insert_streaming.pl
#
# An encoder built once with `ClickHouse::Encoder->new(...)` is reusable:
# you pay the type-parsing cost upfront and then encode many batches with the
# same column layout. Pair this with any HTTP / TCP / pipe transport.
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use Time::HiRes qw(time);
use ClickHouse::Encoder;
eg/json_streaming.pl view on Meta::CPAN
#!/usr/bin/env perl
# json_streaming.pl - read NDJSON from stdin, encode into a JSON column,
# stream to ClickHouse via insert format native in batches.
#
# Usage:
# echo '{"event":"click","ts":1}' | perl eg/json_streaming.pl --table events --col j
#
# Requirements:
# - Cpanel::JSON::XS or JSON::PP for parsing input lines
# - HTTP::Tiny for POSTing to ClickHouse
# - A table created beforehand with a single JSON column, e.g.:
# create table events (j JSON) engine=MergeTree order by tuple()
# settings allow_experimental_json_type=1
use strict;
use warnings;
use Getopt::Long;
eg/select_blocks_streaming.pl view on Meta::CPAN
#!/usr/bin/env perl
# Streaming select via select_blocks(): runs a select against a
# ClickHouse HTTP endpoint and processes each result block as it
# arrives, never buffering the full response in memory. Pairs
# naturally with insert_streaming.pl on the write side.
#
# Usage:
# perl eg/select_blocks_streaming.pl --host=db --port=8123 \
# --sql='select event, count() c from events group by event' \
# --keep=event,c
use strict;
use warnings;
use Getopt::Long;
use ClickHouse::Encoder;
my $host = '127.0.0.1';
my $port = 8123;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
# same hashref shape as decode_block. Stops cleanly when bytes are
# exhausted; partial trailing bytes raise an error from XS.
# Uses the 3-arg form of decode_block (with offset) to avoid O(N^2)
# substr copies on long streams.
sub decode_blocks {
my ($class, $bytes, $cb, %opts) = @_;
my $keep = $opts{keep};
my $off = 0;
my $len = length $bytes;
# Callback form: hand each block to $cb as it's decoded; never
# accumulate. Useful for streaming selects where the full block
# list would not fit comfortably in memory.
if ($cb) {
while ($off < $len) {
my $block = $class->decode_block($bytes, $off, $keep);
$off += $block->{consumed};
$cb->($block);
}
return;
}
my @blocks;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
The F<eg/> directory ships runnable scripts:
=over 4
=item F<eg/insert_http.pl>
End-to-end insert over HTTP via L<HTTP::Tiny>. The shortest path to "insert
real data into a real ClickHouse".
=item F<eg/insert_streaming.pl>
Reuse one encoder across many batches, piping each batch to
C<clickhouse-client>. Demonstrates the intended one-encoder-many-batches
pattern.
=item F<eg/for_table.pl>
Schema discovery via C<for_table>.
=item F<eg/from_csv.pl>
lib/ClickHouse/Encoder.pm view on Meta::CPAN
throughput of C<LowCardinality(String)> versus plain C<String> for the
typical case where a column has few distinct values that repeat across
many rows.
=item F<eg/json_lines_ingest.pl>
Reads NDJSON from STDIN or a file, maps each object's fields onto a
ClickHouse table's columns (discovered via C<for_table>), and inserts
batched blocks over HTTP.
=item F<eg/streaming_aggregate.pl>
Pre-aggregates an event stream in Perl (per minute, per key) and
flushes rolled-up counters to a C<SummingMergeTree> on a wall-clock
timer. The classic pattern when the firehose is too high-cardinality
to store as raw rows.
=item F<eg/postgres_to_clickhouse.pl>
Replicates a PostgreSQL table to ClickHouse using L<DBD::Pg> on the
source side and this encoder's streamer on the destination side.
Memory is bounded by the batch size, so it scales to hundreds of
millions of rows.
=item F<eg/clickhouse_replication.pl>
Replicates one ClickHouse table to another (potentially on a different
server) by streaming Native bytes end-to-end via a temp-file spool.
=item F<eg/parallel_loader.pl>
Forks N worker processes, each ingesting one slice of the input.
Workers share nothing; each opens its own HTTP connection. Scales
network-bound ingestion linearly with worker count.
=item F<eg/redis_to_clickhouse.pl>
Drains a Redis stream (XREADGROUP) or list (BRPOP) into a ClickHouse
table, with idle-flush so the destination doesn't see arbitrarily
delayed batches when the source is quiet.
=item F<eg/syslog_ingest.pl>
Reads RFC 5424 syslog lines from STDIN, parses lossily (any
unparseable line still goes through with the raw text in C<msg>),
and inserts into a fixed schema.
=item F<eg/json_streaming.pl>
NDJSON from STDIN into a C<JSON> column via the encoder's streaming
mode; one HTTP request per batch instead of one per line.
=item F<eg/json_query.pl>
C<select ... format Native> over HTTP and walks the returned blocks
via L</decode_blocks>, demonstrating the symmetric decode path for
JSON columns.
=item F<eg/json_aggregate.pl>
Sketches an aggregation pipeline that bins JSON events by path and
emits the aggregates as a second insert.
=item F<eg/migrate_table.pl>
Copies one CH table into another (possibly on a different host) by
discovering the source schema via L</for_table> and streaming Native
blocks through.
=item F<eg/native_to_jsonl.pl>
Reads a Native byte stream from STDIN and prints each row as NDJSON
on STDOUT; the dual of F<json_lines_ingest.pl>.
=item F<eg/replay.pl>
Replays a captured Native byte stream against a table, useful for
post-hoc reproduction of an ingest bug from a saved request body.
=item F<eg/select_blocks_streaming.pl>
Streaming select counterpart to F<insert_streaming.pl>: uses
L</select_blocks> to walk a select response block-by-block, with
optional column projection via C<--keep>.
=item F<eg/json_path_projection.pl>
Demo of C<keep =E<gt> {...}> projection on top of L</select_blocks>:
decodes only the requested columns and prints one row per line.
=item F<eg/csv_export.pl>
Select-to-CSV export, the counterpart to F<from_csv.pl>. Drives a CSV
writer from a streaming select, emitting the header row from the first
block's column names.
=item F<eg/migrate_with_transform.pl>
CH-to-CH migration with a row-level transform between read and
write. Discovers source schema via L</for_table>, streams the rows
via L</select_blocks>, applies a user-supplied transform coderef,
and forwards survivors through L</bulk_inserter>.
=item F<eg/replay_pcap.pl>
lib/ClickHouse/Encoder/TCP.pm view on Meta::CPAN
L</read_packet> is provided as a convenience for blocking
C<IO::Socket>-style use; for non-blocking transports, call
L</unpack_packet> on a sliding byte buffer directly.
Out of scope:
=over 4
=item * Settings with typed values (newer flexible-setting wire form).
=item * select result streaming (this module covers only what inserts need).
=item * Server's prepared-query parameters protocol.
=back
Wire compression is supported as an opt-in: L</pack_data> /
L</pack_data_end> accept C<<< compress => 'lz4' >>> or C<'zstd'>,
and L</read_packet> accepts C<<< compressed => 1 >>>. See CAVEATS
for the negotiation handshake the caller must perform first.
lib/ClickHouse/Encoder/TCP.pm view on Meta::CPAN
=back
=head1 SEE ALSO
L<ClickHouse::Encoder> - the wire-format encoder these packets carry,
plus L<compress_native_block|ClickHouse::Encoder/compress_native_block>
/ L<decompress_native_block|ClickHouse::Encoder/decompress_native_block>
for the matching block-framing helpers.
L<EV::ClickHouse> - full async ClickHouse client (TCP + HTTP) for
select result streaming, prepared queries with parameter binding,
and chunking negotiation with modern CH server revisions.
=head1 AUTHOR
vividsnow
=head1 LICENSE
Same terms as Perl itself.
t/select-blocks.t view on Meta::CPAN
#!/usr/bin/env perl
# select_blocks is mainly a streaming HTTP wrapper; the only meaningful
# unit test we can run without a real server is to verify the
# argument validation and the SQL-format guard. The end-to-end behavior
# is exercised by t/live.t against a real ClickHouse server.
use strict;
use warnings;
use Test::More;
use lib 'blib/lib', 'blib/arch';
use ClickHouse::Encoder;
# Required on_block coderef
t/streamer-edge.t view on Meta::CPAN
{
    # Collect every block the streamer emits so we can count them.
    my @emitted;
    my $collect  = sub { push @emitted, $_[0] };
    my $streamer = make_enc()->streamer($collect, batch_size => 10);

    # One buffered row, well under batch_size: only finish flushes it.
    $streamer->push_row([1]);
    $streamer->finish;
    is(scalar(@emitted), 1, 'first finish emits the buffered batch');

    # Nothing is buffered any more, so a repeated finish must not emit.
    $streamer->finish;
    is(scalar(@emitted), 1, 'second finish on empty streamer: no extra block');
}
# push_row after finish is allowed; behaves like a fresh streaming session.
{
    # Collect every block the streamer emits so we can count them.
    my @emitted;
    my $streamer = make_enc()->streamer(
        sub { push @emitted, $_[0] },
        batch_size => 10,
    );

    # Two back-to-back sessions on the same streamer object; each
    # push_row/finish pair should yield exactly one block.
    for my $value (1, 2) {
        $streamer->push_row([$value]);
        $streamer->finish;
    }
    is(scalar(@emitted), 2, 'push_row after finish reopens the streamer');
}
# Encoder dropped before streamer: streamer keeps the encoder alive
# (XS holds a refcount) and finish still works.
t/url-helpers.t view on Meta::CPAN
is($resp->{ch}{summary}{elapsed_ns}, 1500, 'summary: elapsed_ns parsed');
}
# A response without any X-ClickHouse-* headers must not add a ch slot.
{
    # A successful response carrying no X-ClickHouse-* headers at all.
    my $plain = {
        success => 1,
        status  => 200,
        headers => {},
    };
    ClickHouse::Encoder::_decorate_response($plain);
    # The decorator must leave the response untouched: no ch key created.
    ok(!exists($plain->{ch}), 'no ch slot when no CH headers present');
}
# X-ClickHouse-Progress repeats during a streaming query;
# HTTP::Tiny collapses repeated headers into an arrayref. The last
# snapshot is the final one - the most complete - so _decorate_response
# parses that one as $resp->{ch}{progress}.
{
my $resp = {
success => 1, status => 200,
headers => {
'x-clickhouse-progress' => [
'{"read_rows":"100","total_rows_to_read":"1000"}',
'{"read_rows":"500","total_rows_to_read":"1000"}',