ClickHouse-Encoder
view release on metacpan or search on metacpan
compressed_writer($mode, \&writer) wrap a writer with gzip/zstd
flatten_nested(\@cols) expand Nested(...) -> flat name.field
encode_row_binary(\@rows) RowBinary body (row-major format)
decode_row_binary($bytes) decode a RowBinary byte string
HTTP insert
ClickHouse::Encoder->insert_http(host=>..., port=>..., table=>..., rows=>...)
one-shot HTTP insert (POSTs Native bytes, optional zstd/gzip).
ClickHouse::Encoder->bulk_inserter(host=>..., table=>..., columns=>...)
long-lived inserter with auto-flush at batch_size, retries on
transient errors, keep-alive, optional compression. ->summary
rolls up CH X-ClickHouse-Summary stats across batches;
->last_response gives the most recent flush's HTTP response with
parsed CH metadata attached at ->{ch}{query-id,server,summary,...}.
ClickHouse::Encoder->for_query($select_sql, host=>..., port=>...)
runs describe ($select_sql) and returns an encoder configured for
that result shape; useful when the schema isn't a real table.
ClickHouse::Encoder->ping(host=>..., port=>...)
liveness probe via /ping; returns 1 or croaks.
All HTTP entry points accept scheme=>'https' (needs IO::Socket::SSL
eg/migrate_with_transform.pl view on Meta::CPAN
# undef to drop the row. Customize this for the actual migration.
# Per-row hook applied to every source row before it is re-inserted.
# Receives one row (an arrayref of column values), may modify it in place,
# and returns the row to keep it or undef to drop it.
my $transform = sub {
    my ($record) = @_;

    # Example policy, part 1: a NULL first column means "skip this row".
    # The explicit undef is the documented drop signal, so it is returned
    # as a value rather than via a bare return.
    unless (defined $record->[0]) {
        return undef;
    }

    # Example policy, part 2: uppercase the string column at index 1,
    # leaving NULLs untouched.
    if (defined(my $text = $record->[1])) {
        $record->[1] = uc $text;
    }

    return $record;
};
# Destination sink. bulk_inserter returns a long-lived object that reuses
# one keep-alive HTTP::Tiny connection and flushes automatically whenever
# batch_size rows have accumulated.
my @sink_args = (
    host       => $dst_host,
    port       => $dst_port,
    table      => $dst_tbl,
    columns    => \@columns,
    batch_size => $batch_size,
    retries    => 3,
);
my $bi = ClickHouse::Encoder->bulk_inserter(@sink_args);
my $copied = 0;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
unless $scheme eq 'http' || $scheme eq 'https';
die "endpoint: host must not contain URL-structural characters "
. "(got '$host')\n"
if $host =~ m{[:/?#&\s]} || !length $host;
die "endpoint: port must be a positive integer (got '$port')\n"
unless $port =~ /\A[1-9]\d{0,4}\z/ && $port < 65536;
return ($scheme, $host, $port);
}
# Build an HTTP::Tiny instance honoring ssl_options (verify_SSL, SSL_ca_file,
# etc.) and keep_alive. Shared by insert_http, bulk_inserter, server_version,
# ping, select_blocks; callers pass %opts unchanged. Loading HTTP::Tiny here
# keeps the require local to HTTP code paths.
#
# Recognised options (all optional; any other keys in %opts are ignored):
#   timeout     - request timeout in seconds; 60 when absent or undef.
#   keep_alive  - forwarded as a 0/1 flag whenever it is defined. HTTP::Tiny
#                 enables keep-alive by default, so an explicit false value
#                 has to be passed through for a caller to switch it off;
#                 absent/undef leaves HTTP::Tiny's default in place.
#   ssl_options - hashref handed to HTTP::Tiny as SSL_options.
#   verify_SSL  - forwarded whenever the key is present, so verify_SSL => 0
#                 is not dropped as falsy.
#
# Returns the new HTTP::Tiny object.
sub _http_tiny {
    my (%opts) = @_;
    require HTTP::Tiny;
    my @args = (timeout => $opts{timeout} // 60);
    # Test definedness, not truth: keep_alive => 0 must reach the constructor.
    push @args, keep_alive => ($opts{keep_alive} ? 1 : 0)
        if defined $opts{keep_alive};
    push @args, SSL_options => $opts{ssl_options} if $opts{ssl_options};
    push @args, verify_SSL  => $opts{verify_SSL}  if exists $opts{verify_SSL};
    return HTTP::Tiny->new(@args);
}
# Parse a flat CH JSON object string (X-ClickHouse-Summary /
# X-ClickHouse-Progress) without depending on JSON::PP. Both are small
# flat objects of stringified integers (read_rows, written_rows,
# total_rows_to_read, elapsed_ns, ...). Returns a hashref or undef.
sub _parse_ch_kv {
lib/ClickHouse/Encoder.pm view on Meta::CPAN
. "after last complete compressed block\n"
if $decompress && length $buf;
die "select_blocks: " . length($inner_buf) . " trailing bytes "
. "after last complete block\n"
if length $inner_buf;
return;
}
# Factory for ClickHouse::Encoder::BulkInserter. The returned object offers
# ->push($row), ->push_many(\@rows), ->flush (idempotent) and ->finish.
# It keeps one HTTP::Tiny instance for all batches (so keepalive applies)
# and flushes automatically once the buffered row count reaches batch_size.
# Transient HTTP failures (5xx, network errors) are retried up to `retries`
# times with backoff between attempts (the POD documents this as
# exponential with jitter); 4xx errors die immediately.
#
# Callable as a class or instance method; the invocant is handed to the
# inserter as _origin and overrides any caller-supplied _origin.
sub bulk_inserter {
    my ($invocant, %opts) = @_;
    $opts{_origin} = $invocant;
    return ClickHouse::Encoder::BulkInserter->new(%opts);
}
package ClickHouse::Encoder::BulkInserter; ## no critic (ProhibitMultiplePackages)
lib/ClickHouse/Encoder.pm view on Meta::CPAN
enc => $enc,
url => $url,
hdr => $hdr,
rows => [],
batch_size => $args{batch_size} // 10_000,
retries => $args{retries} // 3,
retry_wait => $args{retry_wait} // 0.5,
retry_max_wait => $args{retry_max_wait} // 30,
compress => $compress,
http => ClickHouse::Encoder::_http_tiny(
%args, timeout => $timeout, keep_alive => 1),
origin => $origin,
sent_rows => 0,
sent_batches => 0,
last_response => undef,
summary => {},
}, $class;
}
sub push :method { ## no critic (ProhibitBuiltinHomonyms)
my ($self, $row) = @_;
lib/ClickHouse/Encoder.pm view on Meta::CPAN
$st->push_row($row);
$st->push_row($row);
...
$st->finish;
Returns a C<ClickHouse::Encoder::Streamer> object that buffers rows and
flushes a complete Native block to C<&writer> every C<batch_size> rows.
Call C<finish> to flush any partial last batch, or C<reset> to discard
the buffered rows without flushing (useful for error recovery after an
upstream failure). C<buffered_count> and C<is_empty> let producers
inspect the current backlog. The streamer keeps the encoder alive for
its own lifetime, so dropping the encoder reference after creating the
streamer is safe.
Options:
=over 4
=item C<batch_size =E<gt> N>
Flush every C<N> rows. Default 10_000.
lib/ClickHouse/Encoder.pm view on Meta::CPAN
names => [...],
types => [...],
rows => [[...], [...], ...],
consumed => $bytes_used,
}
Calls an XS row-major decoder (C<decode_block_rows>) that walks each
column then immediately distributes its values into the per-row
arrayrefs and frees the column AV. Peak memory holds one column's
AV plus the row AVs (vs both column- and row-major representations
fully alive, which a Perl-side transpose would entail). Throughput
is similar to L</decode_block>; the win is the tighter peak memory
on wide blocks.
=head2 decode_block_rows
my $r = ClickHouse::Encoder->decode_block_rows($bytes, $offset);
XS row-major decoder; same return shape as L</decode_rows>. Direct
entry point if you want to avoid the L</decode_rows> Perl-side
trampoline.
lib/ClickHouse/Encoder.pm view on Meta::CPAN
=head2 bulk_inserter
my $bi = ClickHouse::Encoder->bulk_inserter(
host => 'db.example', port => 8123, table => 'events',
columns => \@cols, batch_size => 5000, compress => 'zstd',
retries => 3);
$bi->push($_) for @rows;
$bi->finish;
Holds an L<HTTP::Tiny> instance with keep-alive across batches,
accumulates rows, auto-flushes at C<batch_size>, retries transient
HTTP failures (5xx and 599 network errors) with exponential backoff
and jitter. 4xx errors die immediately. Options:
=over 4
=item C<host>, C<port>, C<database>, C<user>, C<password>, C<scheme>, C<timeout>
Same as L</for_table>; passed to L<HTTP::Tiny>.
TypeInfo *type;
} Column;
/* Encoder: a pointer to Column entries paired with a column count. */
typedef struct {
Column *columns; /* NOTE(review): presumably an array with one Column per configured column - confirm against the allocator */
int num_columns; /* NOTE(review): presumably the number of entries in columns - confirm */
} Encoder;
/* Streamer: collects rows in an AV and flushes one Native block per
* batch. Holds strong refs to keep the underlying encoder/writer/buffer
* AVs alive for the streamer's lifetime. */
typedef struct {
SV *enc_sv; /* strong ref to keep the encoder alive */
Encoder *enc; /* unowned cached pointer */
SV *writer; /* strong ref to the writer CV */
AV *buffer; /* strong ref - owned */
int batch_size;
/* When non-NULL, every emitted batch is wrapped in CH's
* compressed-block framing via compress_native_block before being
* passed to writer. compress_mode is one of "lz4" / "zstd" /
* "auto" / "none"; hasher_sv (if non-NULL) overrides the default
* cityhash128. Both are owned by the Streamer. */
char *compress_mode;
t/extended_types.t view on Meta::CPAN
cmp_ok($sizes[0], '>', 50, 'first chunk has substantial size');
}
{
    # Smoke test: push 20 single-column rows through a streamer with
    # batch_size 7, call finish, and check that the writer callback
    # received a non-zero number of bytes in total.
    my $total_len = 0;
    my $sink      = sub { my ($chunk) = @_; return $total_len += length $chunk };

    my $encoder  = ClickHouse::Encoder->new(columns => [['x','UInt32']]);
    my $streamer = $encoder->streamer($sink, batch_size => 7);
    for my $value (1 .. 20) {
        $streamer->push_row([$value]);
    }
    $streamer->finish;

    cmp_ok($total_len, '>', 0, 'streamer push_row + finish writes bytes');
}
# (encoder-dropped-while-streamer-alive and writer-croak-recovery cases
# are exhaustively covered in t/streamer-edge.t.)
# Decimal256 ----------------------------------------------------------------
{
my $enc = ClickHouse::Encoder->new(columns => [['v','Decimal256(2)']]);
my $bin = $enc->encode([['12345.67'], ['-99999.99']]);
my $off = skip_header($bin);
is(unpack('Q<', substr($bin, $off, 8)), 1234567,
'Decimal256(2) lo limb for 12345.67');
is(unpack('Q<', substr($bin, $off+8, 8)), 0, 'Decimal256 limb 1');
t/ssl-options.t view on Meta::CPAN
is($t->{verify_SSL}, 1, 'verify_SSL forwarded to constructor');
}
# The ssl_options hashref (IO::Socket::SSL settings) must arrive on the
# HTTP::Tiny object under the SSL_options key.
{
    my %ssl = (SSL_ca_file => '/etc/ssl/certs/ca.pem');
    my $ua  = ClickHouse::Encoder::_http_tiny(ssl_options => \%ssl);
    is_deeply($ua->{SSL_options}, \%ssl, 'ssl_options forwarded as SSL_options');
}
# timeout and keep_alive are still forwarded when an SSL option is passed
# alongside them, and a false verify_SSL survives the trip.
{
    my @builder_args = (timeout => 17, keep_alive => 1, verify_SSL => 0);
    my $ua = ClickHouse::Encoder::_http_tiny(@builder_args);

    is($ua->{timeout}, 17, 'timeout forwarded');
    is($ua->{keep_alive}, 1, 'keep_alive forwarded');
    is($ua->{verify_SSL}, 0, 'verify_SSL=0 forwarded (not dropped as falsy)');
}
# With no arguments the builder still returns an HTTP::Tiny object, and it
# must not invent an SSL_options entry.
{
    my $ua = ClickHouse::Encoder::_http_tiny();
    isa_ok($ua, 'HTTP::Tiny', 'plain builder');

    my $has_ssl_options = exists $ua->{SSL_options};
    ok(!$has_ssl_options,
       'no SSL_options key when ssl_options not given');
}
t/streamer-edge.t view on Meta::CPAN
# A finished streamer can be reused: each push_row + finish cycle emits
# its own block, as if a new streaming session had been started.
{
    my @emitted;
    my $collect  = sub { push @emitted, $_[0] };
    my $streamer = make_enc()->streamer($collect, batch_size => 10);

    for my $value (1, 2) {
        $streamer->push_row([$value]);
        $streamer->finish;
    }

    is(scalar(@emitted), 2, 'push_row after finish reopens the streamer');
}
# Encoder dropped before streamer: streamer keeps the encoder alive
# (XS holds a refcount) and finish still works.
{
my @blocks;
my $st;
{
my $enc = make_enc();
$st = $enc->streamer(sub { push @blocks, $_[0] }, batch_size => 5);
} # $enc drops out of scope here
$st->push_row([$_]) for 1..3;
$st->finish;
( run in 3.255 seconds using v1.01-cache-2.11-cpan-df04353d9ac )