view release on metacpan or search on metacpan
eg/clickhouse_replication.pl view on Meta::CPAN
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use ClickHouse::Encoder;
use HTTP::Tiny;
use File::Temp qw(tempfile);
# Positional arguments: source table, then destination table. If either is
# missing or false (e.g. '' or '0'), die with the usage line.
my ($src_table, $dst_table) = @ARGV;
die "Usage: $0 <src_table> <dst_table>\n" unless $src_table && $dst_table;
# Endpoints are "host:port" strings read from CH_SRC / CH_DST, each
# defaulting to localhost:8123. Only plain http:// base URLs are built here.
my $src_endpoint = $ENV{CH_SRC} // 'localhost:8123';
my $dst_endpoint = $ENV{CH_DST} // 'localhost:8123';
my $src_url = "http://$src_endpoint/";
my $dst_url = "http://$dst_endpoint/";
# Optional filter from SQL_FILTER (undef when unset). Presumably spliced
# into the source select as a where clause further down - TODO confirm.
my $where = $ENV{SQL_FILTER};
# Split the destination endpoint into host and port for for_table(). A bare
# "host" leaves the port undef, which falls back to 8123 on the next line.
# NOTE(review): "host:" (trailing colon) yields an empty-string port that
# //= does not replace; presumably the encoder's endpoint validation then
# rejects it - confirm that is the intended failure mode.
my ($dst_host, $dst_port) = split /:/, $dst_endpoint, 2;
$dst_port //= 8123;
# Validate the destination schema against the actual destination server
# (not localhost). We don't use the encoder for the data path -- rows go
# server-to-server as Native bytes.
my $enc = ClickHouse::Encoder->for_table($dst_table,
via => 'http',
host => $dst_host,
port => $dst_port,
);
eg/ping_healthcheck.pl view on Meta::CPAN
#!/usr/bin/env perl
# Wait for a ClickHouse server to become reachable before starting an
# ingest job. ping() hits the /ping endpoint and croaks on connection
# refused / timeout / non-2xx, so a retry loop around it is a clean
# readiness gate for orchestration scripts and container entrypoints.
#
# Usage:
# perl eg/ping_healthcheck.pl --host=db --port=8123 [--tries=30]
use strict;
use warnings;
use Getopt::Long;
use Time::HiRes ();
eg/select_blocks_streaming.pl view on Meta::CPAN
#!/usr/bin/env perl
# Streaming select via select_blocks(): runs a select against a
# ClickHouse HTTP endpoint and processes each result block as it
# arrives, never buffering the full response in memory. Pairs
# naturally with insert_streaming.pl on the write side.
#
# Usage:
# perl eg/select_blocks_streaming.pl --host=db --port=8123 \
# --sql='select event, count() c from events group by event' \
# --keep=event,c
use strict;
use warnings;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
# the same defaults as for_table / insert_http. UTF-8 encodes before
# percent-escaping so non-ASCII (caf%C3%A9, emoji) round-trips correctly.
sub _http_url_headers {
my ($sql, %opts) = @_;
require Encode;
my $esc = sub {
my $s = Encode::encode('UTF-8', $_[0], 0);
$s =~ s/([^A-Za-z0-9\-_.~])/sprintf('%%%02X', ord($1))/ge;
$s;
};
my ($scheme, $host, $port) = _check_endpoint(\%opts);
my $database = $opts{database} // 'default';
my $user = $opts{user} // 'default';
my $password = $opts{password} // '';
my $url = "$scheme://$host:$port/?database=" . $esc->($database);
$url .= "&query=" . $esc->($sql) if length $sql;
# Per-query settings: { max_memory_usage => '...', max_execution_time => 30 }
if (my $s = $opts{settings}) {
for my $k (sort keys %$s) {
$url .= "&" . $esc->($k) . "=" . $esc->($s->{$k});
}
lib/ClickHouse/Encoder.pm view on Meta::CPAN
my %hdr = ('X-ClickHouse-User' => $user);
$hdr{'X-ClickHouse-Key'} = $password if $password ne '';
return ($url, \%hdr);
}
# Validate the host/port/scheme triple shared by every HTTP entry point.
#
#   my ($scheme, $host, $port) = _check_endpoint(\%opts);
#
# Defaults: scheme 'http', host 'localhost', port 8123. Dies (message ends
# in "\n") when:
#   - the scheme is anything other than 'http' or 'https';
#   - the host is empty or contains a URL-structural character
#     (':/?#&@') or whitespace;
#   - the port is not an ASCII-digit integer in 1..65535.
# Centralised here so insert_http, bulk_inserter, ping, and select_blocks
# share a single allow-list and identical error messages.
sub _check_endpoint {
    my ($opts) = @_;
    my $scheme = $opts->{scheme} // 'http';
    my $host   = $opts->{host}   // 'localhost';
    my $port   = $opts->{port}   // 8123;

    die "endpoint: scheme must be 'http' or 'https' (got '$scheme')\n"
        unless $scheme eq 'http' || $scheme eq 'https';

    # '@' must be refused as well: HTTP::Tiny treats everything before an
    # '@' in the authority as userinfo, so 'db@evil.example' would quietly
    # connect to evil.example instead of the intended host.
    die "endpoint: host must not contain URL-structural characters "
        . "(got '$host')\n"
        if !length $host || $host =~ m{[:/?#&@\s]};

    # ASCII digits only: \d also matches Unicode digits, which would pass
    # the regex, numify to a different value for the range check, and then
    # be interpolated verbatim into the URL.
    die "endpoint: port must be a positive integer (got '$port')\n"
        unless $port =~ /\A[1-9][0-9]{0,4}\z/ && $port < 65536;

    return ($scheme, $host, $port);
}
# Build an HTTP::Tiny instance honoring ssl_options (verify_SSL, SSL_ca_file,
# etc.) and keep_alive. Shared by insert_http, bulk_inserter, server_version,
# ping, select_blocks; callers pass %opts unchanged. Loading HTTP::Tiny here
# keeps the require local to HTTP code paths.
sub _http_tiny {
my (%opts) = @_;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
die "decode_stream: " . length($inner) . " trailing bytes "
. "after last complete block" if length $inner;
$done = 1;
} else {
$buf .= $more;
}
}
return;
}
# Query the ClickHouse HTTP endpoint for its version via `select version()`.
#
# List context: returns exactly five values, (major, minor, patch, build,
# raw). Missing numeric components are padded with 0 and any beyond the
# fourth are dropped, so the raw string is always the fifth element.
# Scalar context: returns a hashref with keys major/minor/patch/build/raw.
#
# Options are forwarded to _http_url_headers / _http_tiny; timeout defaults
# to 10 seconds. Dies with the HTTP status and body when the request fails.
# Useful for capability gating in user code (e.g. only use JSON columns if
# the server is at least 24.8). HTTP-only; native TCP not supported.
sub server_version {
    my ($class, %opts) = @_;
    my ($url, $hdr) = _http_url_headers('select version()', %opts);
    my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 10)
        ->get($url, { headers => $hdr });
    die "HTTP select version() failed (status $resp->{status}): "
        . "$resp->{content}\n"
        unless $resp->{success};
    (my $raw = $resp->{content}) =~ s/\s+\z//;

    # Normalise to exactly four numeric fields so list-context callers can
    # unpack positionally without the raw string sliding into 'patch' or
    # 'build' on short version strings.
    my @found = $raw =~ /([0-9]+)/g;
    my @parts = map { $found[$_] // 0 } 0 .. 3;

    return wantarray
        ? (@parts, $raw)
        : { major => $parts[0], minor => $parts[1],
            patch => $parts[2], build => $parts[3],
            raw   => $raw };
}
# Liveness probe against ClickHouse's /ping endpoint.
#
#   ClickHouse::Encoder->ping(host => 'db', port => 8123, timeout => 2);
#
# scheme/host/port are validated by _check_endpoint; timeout defaults to
# 5 seconds, and the full %opts is forwarded to _http_tiny. Returns 1 when
# HTTP::Tiny reports success. Otherwise dies with
# "ping: HTTP <status>: <body>\n", which covers connection refused and
# timeouts as well as non-2xx replies. Handy as a readiness gate in
# bootstrap scripts and bulk-load orchestration.
sub ping {
    my ($class, %opts) = @_;
    my @endpoint = _check_endpoint(\%opts);
    my $ping_url = sprintf '%s://%s:%s/ping', @endpoint;
    my $wait     = defined $opts{timeout} ? $opts{timeout} : 5;
    my $client   = _http_tiny(%opts, timeout => $wait);
    my $reply    = $client->get($ping_url);
    return 1 if $reply->{success};
    die "ping: HTTP $reply->{status}: $reply->{content}\n";
}
# Parse a Well-Known-Text (WKT) geometry string into the nested-arrayref
# representation that the Geo column encoders accept. Supported geometries:
# POINT, LINESTRING, MULTILINESTRING, POLYGON, MULTIPOLYGON. Coordinates
lib/ClickHouse/Encoder.pm view on Meta::CPAN
}
# insert_http (defined further below, not the helper that follows) issues
# an insert ... format native over HTTP using HTTP::Tiny and returns the
# HTTP::Tiny response hashref (->{success}, ->{status}, ->{content}) after
# passing it through _decorate_response. It compresses with zstd/gzip if
# `compress` is set and sends whatever the encoder produces (so `for_table`
# + rows is the typical combination). It does not retry; the caller owns
# HTTP-level error policy.
# Build the URL and header set for an `insert ... format native` HTTP
# request. Shared by insert_http and BulkInserter::new.
#
#   my ($url, $hdr) = _build_insert_endpoint($table, $compress, %args);
#
# $table is checked by _validate_table_name before anything else.
# $compress must be 'raw', 'zstd' or 'gzip' (dies otherwise). The two
# compressed modes are echoed in a Content-Encoding header; 'raw' sends
# none. %args goes unchanged to _http_url_headers (database, user,
# password, settings, ...). The body is always declared as
# application/octet-stream.
sub _build_insert_endpoint {
    my ($table, $compress, %args) = @_;
    _validate_table_name($table);

    # Value: whether this mode needs a Content-Encoding header.
    my %needs_encoding_header = (raw => 0, zstd => 1, gzip => 1);
    exists $needs_encoding_header{$compress}
        or die "unknown compress='$compress' "
             . "(expected 'raw', 'zstd', or 'gzip')\n";

    my $sql = "insert into $table format native";
    my ($url, $hdr) = _http_url_headers($sql, %args);
    $hdr->{'Content-Type'} = 'application/octet-stream';
    $hdr->{'Content-Encoding'} = $compress
        if $needs_encoding_header{$compress};
    return ($url, $hdr);
}
# Apply zstd/gzip compression to $body in place (or pass through for 'raw').
# $compress is validated upstream by _build_insert_endpoint; we trust it
# here. $origin is the class used to resolve compressed_writer (so the
# helper works for class-method and instance callers alike).
sub _apply_compression {
my ($origin, $compress, $body) = @_;
return $body if $compress eq 'raw';
my $compressed;
my $wrap = $origin->compressed_writer(
$compress, sub { $compressed = $_[0] });
$wrap->($body);
return $compressed;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
my $enc = $args{encoder} // do {
my $cols = $args{columns} or die "insert_http needs columns or encoder";
$class_or_self->new(columns => $cols);
};
my $rows = $args{rows} or die "insert_http needs rows arrayref";
my $table = $args{table} or die "insert_http needs table";
my $timeout = $args{timeout} // 60;
my $compress = $args{compress} // 'raw';
my $origin = ref $class_or_self || $class_or_self;
my ($url, $hdr) = _build_insert_endpoint($table, $compress, %args);
my $body = _apply_compression($origin, $compress, $enc->encode($rows));
my $resp = _http_tiny(%args, timeout => $timeout)
->post($url, { headers => $hdr, content => $body });
return _decorate_response($resp);
}
# Stream a select response: POST the SQL with default_format=Native,
# feed the response chunks into a sliding buffer, decode complete blocks
# as they arrive, and pass each one to $opts{on_block}. Memory stays
lib/ClickHouse/Encoder.pm view on Meta::CPAN
my $origin_raw = delete $args{_origin};
my $origin = (ref $origin_raw || $origin_raw) || 'ClickHouse::Encoder';
my $enc = $args{encoder} // do {
my $cols = $args{columns} or die "bulk_inserter needs columns or encoder";
$origin->new(columns => $cols);
};
my $table = $args{table} or die "bulk_inserter needs table";
my $compress = $args{compress} // 'raw';
my $timeout = $args{timeout} // 60;
my ($url, $hdr) = ClickHouse::Encoder::_build_insert_endpoint(
$table, $compress, %args);
return bless {
enc => $enc,
url => $url,
hdr => $hdr,
rows => [],
batch_size => $args{batch_size} // 10_000,
retries => $args{retries} // 3,
retry_wait => $args{retry_wait} // 0.5,
lib/ClickHouse/Encoder.pm view on Meta::CPAN
C<$fh> must support Perl's C<read()> builtin (any plain filehandle
or L<IO::Handle> subclass). Raw socket descriptors that only
support C<sysread> need to be wrapped via L<IO::Socket> or read
into a buffer that is then fed to L</decode_block> directly.
=head2 ping
ClickHouse::Encoder->ping(host => 'db', port => 8123);
ClickHouse::Encoder->ping(scheme => 'https', host => 'db', port => 8443);
Liveness check via ClickHouse's C</ping> endpoint. Returns C<1> on
success; dies on connection refused, timeout, or any non-2xx HTTP status,
with the HTTP status and response body in the message.
Accepts the same C<scheme>/C<host>/C<port>/C<timeout>/C<ssl_options>
options as the rest of the HTTP entry points; C<timeout> defaults to
5 seconds.
=head2 server_version
my $v = ClickHouse::Encoder->server_version(
host => 'db', port => 8123);
if ($v->{major} >= 24) { ... }
lib/ClickHouse/Encoder.pm view on Meta::CPAN
ClickHouse::Encoder->select_blocks(
'select id, event, ts from events where date = today()',
host => 'db.example', port => 8123,
database => 'default', user => 'default',
on_block => sub { my $block = shift; ... },
keep => { id => 1, event => 1 }, # optional projection
);
Streaming counterpart to L</insert_http>: POSTs C<$sql> to the
ClickHouse HTTP endpoint with C<default_format=Native>, feeds the
response chunks into a sliding buffer, and invokes C<on_block> for
every complete L</decode_block>-shaped block as it arrives. Memory
stays bounded by one HTTP::Tiny chunk plus one block; this is the
right entry point for selects that return more than fits in
process memory.
C<$sql> must NOT end with a C<format ...> clause: C<select_blocks>
requests Native output itself via C<default_format=Native> on the URL
and croaks when the SQL trails with a different format pin. A C<FORMAT>
token inside the query body (for example a column literal) is fine.
lib/ClickHouse/Encoder.pm view on Meta::CPAN
=item F<eg/insert_with_settings.pl>
Inserts with per-query ClickHouse C<settings> and an
C<insert_deduplication_token>, showing how an identical retry under the
same token is deduplicated server-side.
=item F<eg/ping_healthcheck.pl>
A wait-for-server readiness gate built on L</ping> - retry until the
C</ping> endpoint answers, then proceed.
=item F<eg/observability.pl>
Reads server-side stats from an insert pipeline: per-batch
C<< $bi->last_response->{ch} >> detail plus the cumulative
C<< $bi->summary >> rollup of C<X-ClickHouse-Summary> counters.
=item F<eg/schema_migrate.pl>
Fetches C<show create table>, parses it with L</parse_create_table>,
t/insert-deduplication.t view on Meta::CPAN
# A token must be URI-encoded so callers can pass an opaque blob
# (uuid, sha256, payload checksum) without risk of breaking the URL.
# The sample token deliberately contains ':', '+', ' ', '#' and '&'.
# Left unescaped, '&' would start a new query parameter and '#' would
# truncate the URL at a fragment.
($url) = ClickHouse::Encoder::_http_url_headers(
    'insert into t format native',
    dedup_token => '2026-05-19T12:00:00+00:00 # comment & extra',
);
like($url, qr/insert_deduplication_token=2026-05-19T12%3A00%3A00%2B00%3A00%20%23%20comment%20%26%20extra/,
    'dedup_token: URI-encoded so embedded "&"/"#" cannot smuggle params');
# _build_insert_endpoint feeds %args verbatim to _http_url_headers, so
# dedup_token is honored on both insert_http and bulk_inserter paths.
my ($iurl) = ClickHouse::Encoder::_build_insert_endpoint(
    'my_table', 'raw',
    dedup_token => 'idem-key',
);
like($iurl, qr/insert_deduplication_token=idem-key/,
    '_build_insert_endpoint: dedup_token propagated');
# When dedup_token is not provided, the param is omitted entirely - CH
# treats absence as "every batch is a fresh insert" which is the
# safe default.
my ($url_clean) = ClickHouse::Encoder::_http_url_headers(
    'insert into t format native',
);
unlike($url_clean, qr/insert_deduplication_token/,
    'dedup_token: omitted when not provided');
# check for the bundled CityHash128 v1.0.2 port - if our hash diverges
# from cityhash102 in any detail the server rejects with "Checksum
# doesn't match in compressed block" before storing any rows.
SKIP: {
eval { require HTTP::Tiny; require Compress::LZ4; 1 }
or skip 'HTTP::Tiny or Compress::LZ4 not installed', 4;
my $port = $http_port;
my $http = HTTP::Tiny->new(timeout => 5);
my $ping = $http->get("http://127.0.0.1:$port/ping");
skip "HTTP endpoint not reachable on :$port", 4
unless $ping->{success} && $ping->{content} =~ /Ok/;
# Drop + create a fresh table via the existing client; then push
# rows through the HTTP `?decompress=1` path.
ch_query("drop table if exists test_compressed");
ch_query("create table test_compressed
(id Int32, msg String, ts DateTime) engine = Memory");
my $enc = ClickHouse::Encoder->new(columns =>
[['id','Int32'], ['msg','String'], ['ts','DateTime']]);
t/ssl-options.t view on Meta::CPAN
}
# Called with no arguments, the builder must still hand back an HTTP::Tiny
# client, and that client must not carry any SSL_options entry.
{
    my $client = ClickHouse::Encoder::_http_tiny();
    isa_ok($client, 'HTTP::Tiny', 'plain builder');
    my $ssl_key_present = exists $client->{SSL_options};
    ok(!$ssl_key_present, 'no SSL_options key when ssl_options not given');
}

# The endpoint guard lets the https scheme through; actually speaking TLS
# is left to HTTP::Tiny, which needs IO::Socket::SSL at request time.
{
    my @endpoint = ClickHouse::Encoder::_check_endpoint(
        { scheme => 'https', host => 'db', port => 8443 });
    is($endpoint[0], 'https', 'https scheme passes endpoint validation');
}

done_testing();