view release on metacpan or search on metacpan
#include "buffer.h"
#include "encode.h"
#include "decode.h"
#include "runtime.h"
#include "cityhash.h"
/* Read one varint from an SV's byte buffer starting at *off; advances
* *off past the consumed bytes. Croaks with TCP-style messages on
* truncation or overflow. Used by the ClickHouse::Encoder::TCP XSUBs
* which operate on offsets into a Perl scalar rather than the (p,end)
* cursor pair used by the internal native decoder. */
static UV tcp_read_varint(pTHX_ const unsigned char *p, UV buf_len,
UV *off) {
UV v = 0;
int shift = 0;
while (1) {
if (*off >= buf_len)
croak("varint: truncated at offset %lu", (unsigned long)*off);
unsigned char b = p[(*off)++];
v |= ((UV)(b & 0x7f)) << shift;
if (!(b & 0x80)) break;
int keep = 1;
if (keep_set && !hv_exists(keep_set, name, name_len))
keep = 0;
SV *values;
if (keep) {
values = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
} else {
/* Decode then discard - we still must consume the wire
* bytes to keep the cursor aligned for the next column.
* The AV is freed immediately so peak memory is one
* column's values, not the full block's. */
SV *tmp = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
SvREFCNT_dec(tmp);
AV *placeholder = newAV();
if (nrows > 0) {
av_extend(placeholder, nrows - 1);
SSize_t r;
for (r = 0; r < (SSize_t)nrows; r++)
av_store(placeholder, r, newSV(0));
#include "types.h"
#include "decimal.h"
#include "datetime.h"
#include "json_kind.h"
#include "decode.h"
/* ===== DECODER ============================================================
* Symmetric counterpart to encode_column. Reads raw Native bytes through
* a (cursor, end) pair, recursively building SVs. parse_type() returns
* the same TypeInfo* used on the encode side, so the type tree is shared. */
/* Subtraction form: the more obvious `(*p) + (needed) > end` form
* overflows the pointer when `needed` is attacker-controlled via a
* crafted varint (CH varints can encode up to ~2^63). All call sites
* maintain the invariant `*p <= end`, so `end - *p` is a safe pointer
* difference yielding a non-negative `ptrdiff_t` we can compare against
* `needed` as a UV. */
#define DEC_NEED(needed) \
do { \
const unsigned char *end,
const char **out_s, STRLEN *out_len) {
UV len = dec_varint(aTHX_ p, end);
DEC_NEED(len);
*out_s = (const char *)(*p);
*out_len = (STRLEN)len;
*p += len;
}
/* Shared prologue for decode_block / decode_block_rows: validate the
* input SV, position the cursor at the requested offset, read the
* block header (ncols + nrows), and run bounds checks. `fname` is
* embedded in croak messages so each XSUB reports its own name.
* Returns ncols/nrows by out-param; the cursor pair (p, end) is set
* up so the caller can resume column-by-column decoding. */
void decode_block_prologue(pTHX_ SV *bytes, UV start_offset,
const char *fname,
const unsigned char **out_start,
const unsigned char **out_p,
const unsigned char **out_end,
UV *out_ncols, UV *out_nrows) {
/* Materialize lvalue / magical SVs (e.g. the PVLV returned by 2-arg
* substr) before inspecting. SvOK on a fresh substr-LV returns
* false until SvGETMAGIC has run; SvPVbyte itself triggers the
/* Defensive: total elements must fit into the remaining buffer
* (1+ bytes per element minimum). Catches corrupted offset
* lists before they trigger huge AV allocations. */
if (total > (uint64_t)(end - *p))
croak("decode JSON: Array variant total=%lu exceeds remaining "
"buffer (%lu bytes)",
(unsigned long)total, (unsigned long)(end - *p));
for (k = 0; k < nv_rows; k++)
av_store(sub, k, newRV_noinc((SV*)newAV()));
/* Inner cursor walks through elements while row_idx advances
* each time we hit the cumulative offset boundary. */
uint64_t prev = 0;
SSize_t row_idx = 0;
AV *inner = (AV*)SvRV(*av_fetch(sub, 0, 0));
if (offs[0] > 0) av_extend(inner, (SSize_t)offs[0] - 1);
SSize_t inner_cursor = 0;
STRLEN per_elem = (kind == JV_ARRAY_BOOL) ? 1 : 8;
/* `total` is attacker-controlled (sum of wire offsets); use the
* division-form check to avoid the multiplication overflowing. */
if (kind != JV_ARRAY_STRING
&& total > (uint64_t)(end - *p) / per_elem)
croak("decode: buffer truncated (need %lu more bytes)",
(unsigned long)(per_elem * total));
uint64_t i;
for (i = 0; i < total; i++) {
while (inner_cursor >= (SSize_t)(offs[row_idx] - prev)) {
prev = offs[row_idx];
row_idx++;
/* If a corrupted offset list has trailing zero-length
* rows that the outer total didn't cover, row_idx
* could walk past the populated entries. Bail before
* av_fetch returns NULL and we deref it. */
if (row_idx >= nv_rows)
croak("decode: array variant offsets advanced past "
"nv_rows=%ld (corrupted block)", (long)nv_rows);
inner_cursor = 0;
inner = (AV*)SvRV(*av_fetch(sub, row_idx, 0));
uint64_t n2 = offs[row_idx] - prev;
if (n2 > 0) av_extend(inner, (SSize_t)n2 - 1);
}
SV *ev;
switch (kind) {
case JV_ARRAY_BOOL: {
unsigned char b = *(*p)++;
ev = make_json_bool_sv(aTHX_ b);
break;
break;
}
case JV_ARRAY_STRING: {
const char *vs; STRLEN vl;
dec_lenpfx_string(aTHX_ p, end, &vs, &vl);
ev = newSVpvn(vs, vl);
break;
}
default: ev = newSV(0); /* unreachable */
}
av_store(inner, inner_cursor++, ev);
}
return;
}
croak("decode %s: internal: unknown kind %d", ctx, kind);
}
/* Helpers that bulk-read same-size scalars into the array, since the per-
* row dispatch overhead of unpack-style XS loops dwarfs the data read. */
#define DEC_SCALAR_LOOP(av, nrows, sv_expr) do { \
SSize_t r; \
/* Decode each sub-column in wire (alphabetical) order; the
* decl index of wire position w is t->variant_wire_to_decl[w]. */
SV **subcols;
Newx(subcols, nvar, SV*);
SAVEFREEPV(subcols);
int w;
for (w = 0; w < nvar; w++) {
int decl = t->variant_wire_to_decl[w];
subcols[w] = decode_column(aTHX_ p, end, t->tuple[decl], counts[w]);
}
SSize_t *cursors;
Newxz(cursors, nvar, SSize_t);
SAVEFREEPV(cursors);
for (r = 0; r < nrows; r++) {
unsigned char wd = wire_disc[r];
if (wd == 255) {
av_store(av, r, newSV(0));
} else {
int decl = t->variant_wire_to_decl[wd];
SV **elem = av_fetch((AV *)SvRV(subcols[wd]), cursors[wd]++, 0);
AV *pair = newAV();
av_extend(pair, 1);
av_store(pair, 0, newSViv(decl));
av_store(pair, 1, elem ? SvREFCNT_inc(*elem) : newSV(0));
av_store(av, r, newRV_noinc((SV*)pair));
}
}
for (w = 0; w < nvar; w++) SvREFCNT_dec(subcols[w]);
break;
}
SSize_t nv_rows = var_counts[slot];
AV *sub = newAV();
var_avs[slot] = sub;
sv_2mortal((SV*)sub);
if (nv_rows > 0) av_extend(sub, nv_rows - 1);
decode_dynamic_variant_slot(aTHX_ p, end, sub,
slot_to_kind_or_shared[slot], nv_rows, "JSON");
}
/* Distribute values into per-row hashes. */
SSize_t *cursors;
Newxz(cursors, wire_slots, SSize_t);
SAVEFREEPV(cursors);
for (r = 0; r < nrows; r++) {
unsigned char d = discs[r];
if (d == 0xff) continue;
SV **e = av_fetch(var_avs[d], cursors[d]++, 0);
if (!e) continue;
SV *row_rv = *av_fetch(av, r, 0);
HV *row_hv = (HV*)SvRV(row_rv);
SV *val = SvREFCNT_inc(*e);
hv_store(row_hv, paths[pi], (I32)path_lens[pi], val, 0);
}
}
/* Trailing shared data: N UInt64 offsets, then if final
* offset > 0, offsets[N-1] key strings + value strings.
for (slot = 0; slot < wire_slots; slot++) {
SSize_t nv_rows = var_counts[slot];
AV *sub = newAV();
var_avs[slot] = sub;
sv_2mortal((SV*)sub);
if (nv_rows > 0) av_extend(sub, nv_rows - 1);
decode_dynamic_variant_slot(aTHX_ p, end, sub,
slot_to_kind[slot], nv_rows, "Dynamic");
}
SSize_t *cursors;
Newxz(cursors, wire_slots, SSize_t);
SAVEFREEPV(cursors);
for (r = 0; r < nrows; r++) {
unsigned char d = discs[r];
if (d == 0xff) { av_store(av, r, newSV(0)); continue; }
SV **e = av_fetch(var_avs[d], cursors[d]++, 0);
av_store(av, r, e ? SvREFCNT_inc(*e) : newSV(0));
}
break;
}
default:
croak("decode: unhandled type code %d", t->code);
}
return newRV_noinc((SV *)av);
}
#ifndef CHE_DECODE_H
#define CHE_DECODE_H
/* See buffer.h for the include-order convention (EXTERN.h + perl.h +
* XSUB.h must be included by the caller before this header). */
#include "types.h"
/* Read one LEB128 varint from the byte stream, advancing *p. Croaks
* on truncation or 64-bit overflow. The cursor pair (p, end) lets
* the caller share one bounds check across many calls. */
UV dec_varint(pTHX_ const unsigned char **p, const unsigned char *end);
/* Read a length-prefixed string: varint length, then `length` raw bytes.
* `*out_s` points into the same buffer (NOT copied); `*out_len` is the
* byte length. Caller must not free the returned pointer. */
void dec_lenpfx_string(pTHX_ const unsigned char **p,
const unsigned char *end,
const char **out_s, STRLEN *out_len);
/* Shared prologue for decode_block / decode_block_rows: validate the
* input SV, position the cursor at the requested offset, read the
* block header (ncols + nrows), and run bounds checks. `fname` is
* embedded in croak messages so each XSUB reports its own name. */
void decode_block_prologue(pTHX_ SV *bytes, UV start_offset,
const char *fname,
const unsigned char **out_start,
const unsigned char **out_p,
const unsigned char **out_end,
UV *out_ncols, UV *out_nrows);
/* Decode a single column of `nrows` values of type `t`. Returns a
eg/json_path_projection.pl view on Meta::CPAN
#!/usr/bin/env perl
# Project a small subset of columns out of a wide select - decode
# only the columns we care about; the rest have their wire bytes
# consumed (so the cursor stays aligned) but are not materialized
# into SVs. Memory stays bounded by the kept-columns subset.
#
# Usage:
# perl eg/json_path_projection.pl \
# --host=db --port=8123 \
# --table=events \
# --keep=id,event_type
#
# Pin: the response is a Native stream; we walk it via select_blocks
# with `keep`, demonstrating that the projection is honored across
eg/postgres_to_clickhouse.pl view on Meta::CPAN
#!/usr/bin/env perl
# Replicate a PostgreSQL table to ClickHouse: discover the destination
# schema via for_table(), select the matching columns from Pg, and
# stream rows through this encoder's streamer to a chunked HTTP insert.
# Memory is bounded by the batch size, so the script handles tables
# with millions of rows.
#
# This uses plain DBI fetchrow_arrayref. For very large source tables
# you typically also want server-side cursors -- DBD::Pg supports them
# via $dbh->{pg_server_prepare} = 1 and ordinary selects become
# cursor-driven; or use the pg COPY protocol explicitly via
# pg_putcopydata / pg_getcopydata for the lowest per-row overhead.
#
# Usage:
# PG_DSN='dbi:Pg:host=h;dbname=src' PG_USER=... PG_PASS=...
# CH_PORT=8123 \
# perl eg/postgres_to_clickhouse.pl src_schema.events dest_table
#
# Both sides must have the same column order; types must be compatible
# (Pg int4 -> CH Int32, etc.). The script discovers the destination
# schema via for_table() and assumes the source has matching column
}
/* Per-variant data in lex order. SharedVariant has zero
* rows in our encoder's output. */
int s;
for (s = 0; s < wire_slots; s++) {
int k_match = slots[s];
if (k_match < 0) continue;
/* Array(T) variants: first pass emits offsets (so the
* downstream offset cursor is contiguous) and counts
* total elements; second pass emits inner values. */
int is_array = (k_match >= JV_ARRAY_BOOL
&& k_match <= JV_ARRAY_STRING);
if (is_array) {
uint64_t offset = 0;
for (r = 0; r < num_rows; r++) {
if (!row_hvs[r]) continue;
SV **e = hv_fetch(row_hvs[r], paths[p],
(I32)path_lens[p], 0);
if (!e || !SvOK(*e)) continue;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
The 3-arg form avoids the O(N) C<substr> copy per call that
C<<< substr($bytes, $offset) >>> would entail.
An optional fourth-argument hashref filters which columns to keep:
my $block = ClickHouse::Encoder->decode_block(
$bytes, 0, { id => 1, ts => 1 });
Columns whose name isn't in the filter are still decoded, because
their wire bytes must be consumed to keep the cursor aligned, but the
decoded values are freed immediately: the column's C<values> array is
replaced with N C<undef>s and the column hashref carries a
C<<< skipped => 1 >>> marker. Only the kept columns' values are
retained, so peak memory holds the kept columns plus one skipped
column at a time; useful on wide C<select *> responses.
XS implementation: walks the Native byte stream using the same type
parser the encoder uses, so symmetric round-trips are guaranteed for
every type C<encode> handles (C<BFloat16>, alphabetical Variant
remapping, C<LowCardinality> dict indirection,
C<SimpleAggregateFunction> passthrough, JSON typed paths, etc.).
lib/ClickHouse/Encoder.pm view on Meta::CPAN
useful for very long selects where the full block list wouldn't fit
comfortably in memory.
Uses the 3-arg form of L</decode_block> (with explicit offset) to
keep total work O(N) regardless of block count. Stops cleanly when
bytes are exhausted; partial trailing bytes croak.
The optional C<keep =E<gt> \%names> hashref forwards a column filter
to L</decode_block> for every block in the stream, matching the
same semantics: present keys are decoded and kept; absent ones are
still decoded so their bytes are consumed (keeping the cursor
aligned), but their values are discarded immediately and their column
hash carries C<skipped =E<gt> 1> with N C<undef> placeholders.
Useful for big-fan-out select responses where only a few columns
of a wide row matter.
ClickHouse::Encoder->decode_blocks($bytes, $cb,
keep => { id => 1, event => 1 });
=head2 decode_blocks_iter
my $iter = ClickHouse::Encoder->decode_blocks_iter($bytes);
lib/ClickHouse/Encoder.pm view on Meta::CPAN
keep => { id => 1, event => 1 });
Pull bytes incrementally from a filehandle (or any read-able IO
handle), yielding each complete block to the callback as it
arrives. Uses a sliding buffer; on a truncated decode it reads more
bytes and retries. Memory stays bounded by C<chunk_size> + one
block, so this is the right entry point for select responses too
large to buffer in full. Croaks on partial trailing bytes.
The C<keep> filter is the same one L</decode_block> accepts:
unwanted columns are still decoded so their bytes are consumed
(keeping the cursor aligned), but each one's values are freed as
soon as it has been decoded, so peak memory stays bounded by the
kept columns plus one skipped column at a time.
With C<decompress =E<gt> 1>, C<$fh> is expected to deliver a stream
of compressed-block-framed Native blocks (the format CH's HTTP
C<?compress=1> response uses, or a captured native-TCP Data stream
under compression). C<decode_stream> peels each compressed block
via L</decompress_native_block> before feeding the resulting raw
Native bytes into L</decode_block>.
lib/ClickHouse/Encoder.pm view on Meta::CPAN
right entry point for selects that return more than fits in
process memory.
C<$sql> must NOT end with a C<format ...> clause - C<select_blocks>
appends C<format Native> at the URL level and croaks when the SQL
trails with a different format pin. A C<FORMAT> token inside the
query body (for example a column literal) is fine.
The optional C<keep =E<gt> \%names> hashref forwards a column
filter to L</decode_block>: skipped columns are still decoded so
their bytes are consumed (keeping the cursor aligned), but their
values are freed immediately instead of being returned. Useful when
you only need a few of many select-list columns.
With C<decompress =E<gt> 1> the URL is augmented with
C<?compress=1> so ClickHouse wraps each response Native block in
its compressed-block framing (16-byte CityHash128 + 9-byte header
+ LZ4 payload). The HTTP body is then a stream of compressed blocks
which C<select_blocks> peels and decompresses block-by-block via
L</decompress_native_block> before feeding the result to
L</decode_block>. Memory stays bounded by one HTTP chunk plus one
lib/ClickHouse/Encoder.pm view on Meta::CPAN
C<Compress::Zstd> for C<'zstd'>. Both are listed as runtime
C<recommends>.
=head2 decompress_native_block
my ($plain, $consumed) = ClickHouse::Encoder->decompress_native_block(
$framed); # default hasher = bundled
my $plain = ClickHouse::Encoder->decompress_native_block(
$framed, hasher => undef); # skip checksum verification
my ($plain, $n) = ClickHouse::Encoder->decompress_native_block(
$stream, offset => $cursor); # walk a multi-block stream
Inverse of L</compress_native_block>: verifies the checksum (unless
C<hasher =E<gt> undef>), unpacks the payload by method tag, and
returns the raw Native bytes. In list context also returns the
number of bytes consumed from the input buffer (16 + 9 + payload length),
so the caller can advance an offset cursor through a stream of
back-to-back compressed blocks.
=head1 TYPES
=head2 Supported
=over 4
=item *
t/decode-projections.t view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use Test::More;
use lib 'blib/lib', 'blib/arch';
use ClickHouse::Encoder;
# Column projections: pass a hashref of column names to KEEP; columns
# not in the set still decode (to advance the cursor) but their values
# array is replaced with one undef per row and the column carries a
# `skipped => 1` marker.
my $enc = ClickHouse::Encoder->new(columns => [
['id', 'UInt64'],
['user', 'String'],
['tags', 'Array(String)'],
['ts', 'DateTime'],
]);
my $bytes = $enc->encode([
t/decode-projections.t view on Meta::CPAN
'skipped col: placeholder undefs');
}
# Filter that matches nothing -> all columns skipped, but bytes consumed.
# Skipped columns are still decoded (and discarded) so the cursor walks
# the whole block; `consumed` must therefore equal the full input length.
{
    my $block = ClickHouse::Encoder->decode_block(
        $bytes, 0, { nonexistent => 1 });
    is_deeply([map $_->{skipped}, @{ $block->{columns} }],
        [1, 1, 1, 1],
        'no matches: every column skipped');
    is($block->{consumed}, length $bytes, 'cursor advanced fully');
}
# Empty filter hashref -> same as no-match: every column is skipped AND
# the cursor still advances over the whole block. Checking `consumed`
# here too ensures an empty (but defined) filter is not mistaken for
# "stop decoding".
{
    my $block = ClickHouse::Encoder->decode_block($bytes, 0, {});
    is_deeply([map $_->{skipped}, @{ $block->{columns} }],
        [1, 1, 1, 1], 'empty filter: all skipped');
    is($block->{consumed}, length $bytes,
        'empty filter: cursor advanced fully');
}
# undef filter -> full decode (3rd arg is optional)
t/json-stress.t view on Meta::CPAN
# documented. The structural invariant (hashref-per-row, paths preserved
# if no collisions) is the real assertion.
my $vals = $block->{columns}[0]{values};
my $all_hash = !grep { ref $_ ne 'HASH' } @$vals;
ok($all_hash, "iter $iter: all rows decode as hashref");
# Spot-check leaf values that DO round-trip identically:
# - String leaves (no type collapse)
# - Int64 leaves (when input was an integer)
# - Array(String) leaves (homogeneous strings)
# This catches off-by-one cursor bugs and path-permutation bugs that
# the bare hashref check would miss.
for my $r (0..$#rows) {
my $orig = $rows[$r][0];
my $got = $vals->[$r];
_check_stable_leaves($orig, $got, "iter $iter row $r");
}
}
sub _check_stable_leaves {
my ($orig, $got, $tag) = @_;
t/roundtrip.t view on Meta::CPAN
for (1..$nrows) { push @wire_disc, ord(substr($$buf, $$off, 1)); $$off += 1 }
# wire_disc is in alphabetical-order space; map to declaration idx.
my @parts = @{ $type->{parts} };
my $wire_to_decl = $type->{wire_to_decl};
my @counts; for my $w (@wire_disc) { $counts[$w]++ if $w != 255 }
my @subcols; # subcols[wire_idx] = decoded values for that wire arm
for my $w (0 .. $#parts) {
my $decl = $wire_to_decl->[$w];
$subcols[$w] = _decode_column($buf, $off, $parts[$decl], $counts[$w] // 0);
}
my @cursors = (0) x scalar @parts;
for my $r (0 .. $nrows - 1) {
my $w = $wire_disc[$r];
if ($w == 255) { push @vals, undef; next }
push @vals, [$wire_to_decl->[$w], $subcols[$w][ $cursors[$w]++ ]];
}
}
elsif ($code eq 'LowCardinality') {
my $version = unpack('Q<', substr($$buf, $$off, 8)); $$off += 8;
my $flags = unpack('Q<', substr($$buf, $$off, 8)); $$off += 8;
my $dict_n = unpack('Q<', substr($$buf, $$off, 8)); $$off += 8;
die "LC version != 1" unless $version == 1;
my $idx_type = $flags & 0xff;
my $inner = $type->{inner};
$inner = $inner->{inner} if $inner->{code} eq 'Nullable';
t/rowbinary.t view on Meta::CPAN
# Guard rails for decode_row_binary misuse.
#
# 1) The method reads column types off the encoder object, so invoking
#    it on the bare class name has no schema to decode against and must
#    croak rather than silently return garbage.
{
    my $invocant = 'ClickHouse::Encoder';
    local $@;
    eval { $invocant->decode_row_binary("\x01\x02"); 1 };
    like($@, qr/must be called on an encoder instance/,
        'decode_row_binary as class method croaks');
}
# 2) With zero columns there is no per-column work to move the cursor
#    forward, so a non-empty buffer would otherwise spin forever. An
#    empty buffer is the only input that can legitimately decode.
{
    my $columnless = ClickHouse::Encoder->new(columns => []);
    is_deeply($columnless->decode_row_binary(''), [],
        'zero columns + empty buffer -> no rows');
    local $@;
    eval { $columnless->decode_row_binary("\x01"); 1 };
    like($@, qr/no columns but/, 'zero columns + non-empty buffer croaks');
}
done_testing();