ClickHouse-Encoder

 view release on metacpan or  search on metacpan

eg/clickhouse_replication.pl  view on Meta::CPAN

use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use ClickHouse::Encoder;
use HTTP::Tiny;
use File::Temp qw(tempfile);

# Positional arguments: the table to copy from and the table to copy into.
my ($src_table, $dst_table) = @ARGV;
die "Usage: $0 <src_table> <dst_table>\n" unless $src_table && $dst_table;

# Source and destination servers as "host:port" strings, taken from CH_SRC /
# CH_DST and defaulting to a local server on port 8123.
my $src_endpoint = $ENV{CH_SRC} // 'localhost:8123';
my $dst_endpoint = $ENV{CH_DST} // 'localhost:8123';
# Base URLs for each server's HTTP interface (plain http only).
# NOTE(review): presumably consumed by the copy logic later in this script,
# which is not visible in this chunk -- confirm.
my $src_url = "http://$src_endpoint/";
my $dst_url = "http://$dst_endpoint/";
# Optional filter from SQL_FILTER (undef when unset); presumably applied to
# the source-side select further down -- TODO confirm.
my $where   = $ENV{SQL_FILTER};

# Split the destination endpoint into the encoder's host/port options. The
# limit of 2 keeps everything after the first ':' as the port; a bare host
# with no ':' leaves $dst_port undef, which then falls back to 8123.
# NOTE(review): "host:" yields an empty-string port that //= does not
# replace -- confirm how for_table treats an empty port.
my ($dst_host, $dst_port) = split /:/, $dst_endpoint, 2;
$dst_port //= 8123;

# Validate the destination schema against the actual destination server
# (not localhost). We don't use the encoder for the data path -- rows go
# server-to-server as Native bytes.
my $enc = ClickHouse::Encoder->for_table($dst_table,
    via  => 'http',
    host => $dst_host,
    port => $dst_port,
);

eg/ping_healthcheck.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Wait for a ClickHouse server to become reachable before starting an
# ingest job. ping() hits the /ping endpoint and croaks on connection
# refused / timeout / non-2xx, so a retry loop around it is a clean
# readiness gate for orchestration scripts and container entrypoints.
#
# Usage:
#     perl eg/ping_healthcheck.pl --host=db --port=8123 [--tries=30]

use strict;
use warnings;
use Getopt::Long;
use Time::HiRes ();

eg/select_blocks_streaming.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Streaming select via select_blocks(): runs a select against a
# ClickHouse HTTP endpoint and processes each result block as it
# arrives, never buffering the full response in memory. Pairs
# naturally with insert_streaming.pl on the write side.
#
# Usage:
#     perl eg/select_blocks_streaming.pl --host=db --port=8123 \
#         --sql='select event, count() c from events group by event' \
#         --keep=event,c

use strict;
use warnings;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

# the same defaults as for_table / insert_http. UTF-8 encodes before
# percent-escaping so non-ASCII (caf%C3%A9, emoji) round-trips correctly.
sub _http_url_headers {
    my ($sql, %opts) = @_;
    require Encode;
    my $esc = sub {
        my $s = Encode::encode('UTF-8', $_[0], 0);
        $s =~ s/([^A-Za-z0-9\-_.~])/sprintf('%%%02X', ord($1))/ge;
        $s;
    };
    my ($scheme, $host, $port) = _check_endpoint(\%opts);
    my $database = $opts{database} // 'default';
    my $user     = $opts{user}     // 'default';
    my $password = $opts{password} // '';
    my $url = "$scheme://$host:$port/?database=" . $esc->($database);
    $url .= "&query=" . $esc->($sql) if length $sql;
    # Per-query settings: { max_memory_usage => '...', max_execution_time => 30 }
    if (my $s = $opts{settings}) {
        for my $k (sort keys %$s) {
            $url .= "&" . $esc->($k) . "=" . $esc->($s->{$k});
        }

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

    my %hdr = ('X-ClickHouse-User' => $user);
    $hdr{'X-ClickHouse-Key'} = $password if $password ne '';
    return ($url, \%hdr);
}

# Validate the host/port/scheme triple shared by every HTTP entry point.
#
# Takes a hashref of options and reads scheme (default 'http'), host
# (default 'localhost') and port (default 8123). Returns the validated
# ($scheme, $host, $port) list, or dies with a newline-terminated message:
#   - scheme must be exactly 'http' or 'https';
#   - host must be non-empty and free of URL-structural characters
#     (':', '/', '?', '#', '&', '@') and whitespace;
#   - port must be an ASCII decimal integer in 1..65535 with no leading zero.
# Centralised here so insert_http, bulk_inserter, ping, and select_blocks
# share a single allow-list and identical error messages.
sub _check_endpoint {
    my ($opts) = @_;
    my $scheme = $opts->{scheme} // 'http';
    my $host   = $opts->{host}   // 'localhost';
    my $port   = $opts->{port}   // 8123;
    die "endpoint: scheme must be 'http' or 'https' (got '$scheme')\n"
        unless $scheme eq 'http' || $scheme eq 'https';
    # '@' must be refused: in "scheme://x@evil:port/" the authority parses
    # as userinfo 'x' plus host 'evil', which would silently send the request
    # (and any X-ClickHouse-Key header) to a different server.
    die "endpoint: host must not contain URL-structural characters "
      . "(got '$host')\n"
        if $host eq '' || $host =~ m{[:/?#&@\s]};
    # [0-9] rather than \d: \d also matches non-ASCII Unicode digits, which
    # would pass the pattern and then numify to a different value.
    die "endpoint: port must be a positive integer (got '$port')\n"
        unless $port =~ /\A[1-9][0-9]{0,4}\z/ && $port < 65536;
    return ($scheme, $host, $port);
}

# Build an HTTP::Tiny instance honoring ssl_options (verify_SSL, SSL_ca_file,
# etc.) and keep_alive. Shared by insert_http, bulk_inserter, server_version,
# ping, select_blocks; callers pass %opts unchanged. Loading HTTP::Tiny here
# keeps the require local to HTTP code paths.
sub _http_tiny {
    my (%opts) = @_;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

            die "decode_stream: " . length($inner) . " trailing bytes "
              . "after last complete block" if length $inner;
            $done = 1;
        } else {
            $buf .= $more;
        }
    }
    return;
}

# Query the ClickHouse HTTP endpoint for its version via `select version()`.
#
# %opts is passed to _http_url_headers (endpoint, database, user, password,
# settings) and to _http_tiny; `timeout` defaults to 10 seconds. Dies with
# the HTTP status and body if the request is not successful.
#
# List context: returns exactly five values, (major, minor, patch, build,
# raw). The first four are the leading numeric groups of the version
# string: missing groups become 0 and extra groups are ignored. raw is the
# response body with trailing whitespace removed.
# Scalar context: returns { major, minor, patch, build, raw } with the same
# values.
#
# Useful for capability gating in user code (e.g. only use JSON columns if
# the server is at least 24.8). HTTP-only; native TCP not supported.
sub server_version {
    my ($class, %opts) = @_;
    my ($url, $hdr) = _http_url_headers('select version()', %opts);
    my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 10)
        ->get($url, { headers => $hdr });
    die "HTTP select version() failed (status $resp->{status}): "
      . "$resp->{content}\n"
        unless $resp->{success};
    (my $raw = $resp->{content}) =~ s/\s+\z//;
    # Normalise to exactly four components so that, in list context, $raw
    # always lands in the fifth slot no matter how many numeric groups the
    # server reports. An array slice (not a list slice) is used so that an
    # empty match still yields four undefs that default to 0.
    my @numbers = ($raw =~ /(\d+)/g);
    my ($major, $minor, $patch, $build) = map { $_ // 0 } @numbers[0 .. 3];
    return wantarray
        ? ($major, $minor, $patch, $build, $raw)
        : { major => $major, minor => $minor,
            patch => $patch, build => $build,
            raw   => $raw };
}

# Liveness probe against ClickHouse's /ping endpoint.
#
# The scheme/host/port options are validated by _check_endpoint; all of
# %opts is handed to _http_tiny, with `timeout` defaulting to 5 seconds.
# Returns 1 when the response is successful. Otherwise it dies with the
# status and body. HTTP::Tiny reports connection refused and timeouts as
# unsuccessful responses, so those failures also die here. Intended as a
# readiness gate in bootstrap scripts and bulk-load orchestration.
sub ping {
    my ($class, %opts) = @_;
    my $probe_url = sprintf '%s://%s:%s/ping', _check_endpoint(\%opts);
    my $seconds   = defined $opts{timeout} ? $opts{timeout} : 5;
    my $reply     = _http_tiny(%opts, timeout => $seconds)->get($probe_url);
    return 1 if $reply->{success};
    die "ping: HTTP $reply->{status}: $reply->{content}\n";
}

# Parse a Well-Known-Text (WKT) geometry string into the nested-arrayref
# representation that the Geo column encoders accept. Supported geometries:
# POINT, LINESTRING, MULTILINESTRING, POLYGON, MULTIPOLYGON. Coordinates

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

}

# insert_http (defined further below): issue an insert ... format native
# over HTTP using HTTP::Tiny. Returns the response hashref from HTTP::Tiny
# (->{success}, ->{status}, ->{content}). Compresses with zstd/gzip if
# `compress` is set; sends whatever the encoder produces (so `for_table`
# + rows is the typical combination). Does not retry; HTTP-level error
# policy is left to the caller.
# Prepare the URL and request headers for an `insert ... format native`
# HTTP request. Shared by insert_http and BulkInserter::new.
#
# The table name is checked by _validate_table_name. $compress must be
# 'raw', 'zstd' or 'gzip'; anything else dies. %args is forwarded untouched
# to _http_url_headers.
#
# Returns ($url, \%headers). Content-Type is always
# application/octet-stream. For zstd/gzip, Content-Encoding is also set to
# the codec name.
sub _build_insert_endpoint {
    my ($table, $compress, %args) = @_;
    _validate_table_name($table);

    # Only the two real codecs get a Content-Encoding header; 'raw' is the
    # uncompressed pass-through.
    my $is_encoded = $compress eq 'zstd' || $compress eq 'gzip';
    die "unknown compress='$compress' "
      . "(expected 'raw', 'zstd', or 'gzip')\n"
        unless $is_encoded || $compress eq 'raw';

    my ($url, $headers) = _http_url_headers(
        "insert into $table format native", %args);
    $headers->{'Content-Type'} = 'application/octet-stream';
    if ($is_encoded) {
        $headers->{'Content-Encoding'} = $compress;
    }

    return ($url, $headers);
}

# Apply zstd/gzip compression to $body in place (or pass through for 'raw').
# $compress is validated upstream by _build_insert_endpoint; we trust it
# here. $origin is the class used to resolve compressed_writer (so the
# helper works for class-method and instance callers alike).
sub _apply_compression {
    my ($origin, $compress, $body) = @_;
    return $body if $compress eq 'raw';
    my $compressed;
    my $wrap = $origin->compressed_writer(
        $compress, sub { $compressed = $_[0] });
    $wrap->($body);
    return $compressed;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

    my $enc      = $args{encoder} // do {
        my $cols = $args{columns} or die "insert_http needs columns or encoder";
        $class_or_self->new(columns => $cols);
    };
    my $rows     = $args{rows}  or die "insert_http needs rows arrayref";
    my $table    = $args{table} or die "insert_http needs table";
    my $timeout  = $args{timeout}  // 60;
    my $compress = $args{compress} // 'raw';
    my $origin   = ref $class_or_self || $class_or_self;

    my ($url, $hdr) = _build_insert_endpoint($table, $compress, %args);
    my $body = _apply_compression($origin, $compress, $enc->encode($rows));

    my $resp = _http_tiny(%args, timeout => $timeout)
        ->post($url, { headers => $hdr, content => $body });
    return _decorate_response($resp);
}

# Stream a select response: POST the SQL with default_format=Native,
# feed the response chunks into a sliding buffer, decode complete blocks
# as they arrive, and pass each one to $opts{on_block}. Memory stays

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

    my $origin_raw = delete $args{_origin};
    my $origin     = (ref $origin_raw || $origin_raw) || 'ClickHouse::Encoder';
    my $enc        = $args{encoder} // do {
        my $cols = $args{columns} or die "bulk_inserter needs columns or encoder";
        $origin->new(columns => $cols);
    };
    my $table      = $args{table} or die "bulk_inserter needs table";
    my $compress   = $args{compress} // 'raw';
    my $timeout    = $args{timeout}  // 60;

    my ($url, $hdr) = ClickHouse::Encoder::_build_insert_endpoint(
        $table, $compress, %args);

    return bless {
        enc        => $enc,
        url        => $url,
        hdr        => $hdr,
        rows       => [],
        batch_size => $args{batch_size} // 10_000,
        retries    => $args{retries}    // 3,
        retry_wait => $args{retry_wait} // 0.5,

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

C<$fh> must support Perl's C<read()> builtin (any plain filehandle
or L<IO::Handle> subclass). Raw socket descriptors that only
support C<sysread> need to be wrapped via L<IO::Socket> or read
into a buffer that is then fed to L</decode_block> directly.

=head2 ping

    ClickHouse::Encoder->ping(host => 'db', port => 8123);
    ClickHouse::Encoder->ping(scheme => 'https', host => 'db', port => 8443);

Liveness check via the ClickHouse C</ping> endpoint. Returns C<1> on
success; dies on connection refused, timeout, or any non-2xx HTTP
status. Accepts the same C<scheme>/C<host>/C<port>/C<timeout>/C<ssl_options>
options as the other HTTP entry points (C<timeout> defaults to 5 seconds).

=head2 server_version

    my $v = ClickHouse::Encoder->server_version(
        host => 'db', port => 8123);
    if ($v->{major} >= 24) { ... }

lib/ClickHouse/Encoder.pm  view on Meta::CPAN


    ClickHouse::Encoder->select_blocks(
        'select id, event, ts from events where date = today()',
        host     => 'db.example', port => 8123,
        database => 'default',    user => 'default',
        on_block => sub { my $block = shift; ... },
        keep     => { id => 1, event => 1 },  # optional projection
    );

Streaming counterpart to L</insert_http>: POSTs C<$sql> to the
ClickHouse HTTP endpoint with C<default_format=Native>, feeds the
response chunks into a sliding buffer, and invokes C<on_block> for
every complete L</decode_block>-shaped block as it arrives. Memory
stays bounded by one HTTP::Tiny chunk plus one block; this is the
right entry point for selects that return more than fits in
process memory.

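C<$sql> must not end with a C<format ...> clause. C<select_blocks>
requests Native output at the URL level (C<default_format=Native>). It
croaks when the SQL ends with a different format pin, because an
explicit trailing C<format> clause would override the Native output the
decoder expects. A C<FORMAT> token elsewhere in the query body (for
example inside a string literal) is fine.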

lib/ClickHouse/Encoder.pm  view on Meta::CPAN


=item F<eg/insert_with_settings.pl>

Inserts with per-query ClickHouse C<settings> and an
C<insert_deduplication_token>, showing that an identical retry under the
same token is deduplicated server-side.

=item F<eg/ping_healthcheck.pl>

A wait-for-server readiness gate built on L</ping> - retry until the
C</ping> endpoint answers, then proceed.

=item F<eg/observability.pl>

Reads server-side stats from an insert pipeline: per-batch
C<< $bi->last_response->{ch} >> detail plus the cumulative
C<< $bi->summary >> rollup of C<X-ClickHouse-Summary> counters.

=item F<eg/schema_migrate.pl>

Fetches C<show create table>, parses it with L</parse_create_table>,

t/insert-deduplication.t  view on Meta::CPAN


# A token must be URI-encoded so callers can pass an opaque blob
# (uuid, sha256, payload checksum) without risk of breaking the URL.
# The sample deliberately contains every character that could split or
# truncate the query string ('&', '=', '#', '+', ':', ' '). The trailing
# (?:&|\z) anchor checks that the whole value survived as one parameter,
# i.e. nothing after it was parsed as a separate key=value pair.
($url) = ClickHouse::Encoder::_http_url_headers(
    'insert into t format native',
    dedup_token => '2026-05-19T12:00:00+00:00 # comment & extra=1',
);
my $want_token = '2026-05-19T12%3A00%3A00%2B00%3A00'
               . '%20%23%20comment%20%26%20extra%3D1';
like($url, qr/insert_deduplication_token=\Q$want_token\E(?:&|\z)/,
     'dedup_token: URI-encoded so embedded "&"/"=" cannot smuggle params');

# _build_insert_endpoint feeds %args verbatim to _http_url_headers, so
# dedup_token is honored on both insert_http and bulk_inserter paths.
# The anchor rejects a token that merely starts with 'idem-key'.
my ($iurl) = ClickHouse::Encoder::_build_insert_endpoint(
    'my_table', 'raw',
    dedup_token => 'idem-key',
);
like($iurl, qr/insert_deduplication_token=idem-key(?:&|\z)/,
     '_build_insert_endpoint: dedup_token propagated');

# When dedup_token is undef, the param is omitted entirely - CH
# treats absence as "every batch is a fresh insert" which is the
# safe default.
my ($url_clean) = ClickHouse::Encoder::_http_url_headers(
    'insert into t format native',
);
unlike($url_clean, qr/insert_deduplication_token/,
       'dedup_token: omitted when not provided');

t/live.t  view on Meta::CPAN

# check for the bundled CityHash128 v1.0.2 port - if our hash diverges
# from cityhash102 in any detail the server rejects with "Checksum
# doesn't match in compressed block" before storing any rows.
SKIP: {
    eval { require HTTP::Tiny; require Compress::LZ4; 1 }
        or skip 'HTTP::Tiny or Compress::LZ4 not installed', 4;

    my $port = $http_port;
    my $http = HTTP::Tiny->new(timeout => 5);
    my $ping = $http->get("http://127.0.0.1:$port/ping");
    skip "HTTP endpoint not reachable on :$port", 4
        unless $ping->{success} && $ping->{content} =~ /Ok/;

    # Drop + create a fresh table via the existing client; then push
    # rows through the HTTP `?decompress=1` path.
    ch_query("drop table if exists test_compressed");
    ch_query("create table test_compressed
                  (id Int32, msg String, ts DateTime) engine = Memory");

    my $enc = ClickHouse::Encoder->new(columns =>
        [['id','Int32'], ['msg','String'], ['ts','DateTime']]);

t/ssl-options.t  view on Meta::CPAN

}

# Without ssl_options the builder should hand back a vanilla HTTP::Tiny
# that carries no SSL_options entry at all.
{
    my $tiny        = ClickHouse::Encoder::_http_tiny();
    my $has_ssl_key = exists $tiny->{SSL_options};
    isa_ok($tiny, 'HTTP::Tiny', 'plain builder');
    ok(!$has_ssl_key,
       'no SSL_options key when ssl_options not given');
}

# The endpoint guard must let 'https' through. Whether TLS actually works
# is HTTP::Tiny's business and requires IO::Socket::SSL at request time.
{
    my @endpoint = ClickHouse::Encoder::_check_endpoint({
        scheme => 'https',
        host   => 'db',
        port   => 8443,
    });
    is($endpoint[0], 'https', 'https scheme passes endpoint validation');
}

done_testing();



( run in 1.466 second using v1.01-cache-2.11-cpan-fe3c2283af0 )