ClickHouse-Encoder

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

    compressed_writer($mode, \&writer)    wrap a writer with gzip/zstd
    flatten_nested(\@cols)                expand Nested(...) -> flat name.field
    encode_row_binary(\@rows)             RowBinary body (row-major format)
    decode_row_binary($bytes)             decode a RowBinary byte string

HTTP insert
    ClickHouse::Encoder->insert_http(host=>..., port=>..., table=>..., rows=>...)
    one-shot HTTP insert (POSTs Native bytes, optional zstd/gzip).
    ClickHouse::Encoder->bulk_inserter(host=>..., table=>..., columns=>...)
    long-lived inserter with auto-flush at batch_size, retries on
    transient errors, keep-alive, optional compression. ->summary
    rolls up CH X-ClickHouse-Summary stats across batches;
    ->last_response gives the most recent flush's HTTP response with
    parsed CH metadata attached at ->{ch}{query-id,server,summary,...}.
    ClickHouse::Encoder->for_query($select_sql, host=>..., port=>...)
    runs DESCRIBE ($select_sql) and returns an encoder configured for
    that result shape; useful when the schema isn't a real table.
    ClickHouse::Encoder->ping(host=>..., port=>...)
    liveness probe via /ping; returns 1 or croaks.

    All HTTP entry points accept scheme=>'https' (needs IO::Socket::SSL

eg/migrate_with_transform.pl  view on Meta::CPAN

# undef to drop the row. Customize this for the actual migration.
my $transform = sub {
    my ($row) = @_;

    # Example policy, part 1: a NULL (undef) first column means "drop this
    # row". An explicit undef is returned on purpose -- that is the
    # documented drop signal for this callback.
    unless (defined $row->[0]) {
        return undef;
    }

    # Example policy, part 2: the column at index 1 is a known string
    # column; uppercase it in place, leaving NULLs untouched.
    if (defined $row->[1]) {
        $row->[1] = uc $row->[1];
    }

    # Hand back the same (possibly modified) arrayref that was passed in.
    return $row;
};

# Long-lived sink: bulk_inserter pools an HTTP::Tiny with keep-alive
# and auto-flushes at batch_size.
# NOTE(review): $dst_host, $dst_port, $dst_tbl, @columns and $batch_size
# are expected to be set up earlier in this script (not visible here).
my $bi = ClickHouse::Encoder->bulk_inserter(
    host       => $dst_host, port => $dst_port,
    table      => $dst_tbl,
    columns    => \@columns,
    batch_size => $batch_size,
    # Retry budget for transient failures; matches the constructor's
    # documented default of 3 but is spelled out for the reader.
    retries    => 3,
);

my $copied = 0;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

        unless $scheme eq 'http' || $scheme eq 'https';
    die "endpoint: host must not contain URL-structural characters "
      . "(got '$host')\n"
        if $host =~ m{[:/?#&\s]} || !length $host;
    die "endpoint: port must be a positive integer (got '$port')\n"
        unless $port =~ /\A[1-9]\d{0,4}\z/ && $port < 65536;
    return ($scheme, $host, $port);
}

# Build an HTTP::Tiny instance honoring ssl_options (verify_SSL, SSL_ca_file,
# etc.) and keep_alive. Shared by insert_http, bulk_inserter, server_version,
# ping, select_blocks. Loading HTTP::Tiny here keeps the require local to
# HTTP code paths.
#
# Takes a flat key/value list; only these keys are looked at, so callers
# may pass their whole argument hash through (optionally with overrides
# appended -- later pairs win in the hash assignment):
#   timeout     - seconds; 60 when absent or undef
#   keep_alive  - when defined, forwarded normalised to 1/0; when absent
#                 or undef, HTTP::Tiny's own default applies
#   ssl_options - hashref, forwarded as HTTP::Tiny's SSL_options when true
#   verify_SSL  - forwarded whenever the key exists, so an explicit 0 is
#                 not dropped as falsy
#
# Returns the new HTTP::Tiny object.
sub _http_tiny {
    my (%opts) = @_;
    require HTTP::Tiny;
    my @args = (timeout => $opts{timeout} // 60);
    # HTTP::Tiny turns keep-alive on by default, so forwarding the flag
    # only when it is true would make keep_alive => 0 a silent no-op.
    push @args, keep_alive  => ($opts{keep_alive} ? 1 : 0)
        if defined $opts{keep_alive};
    push @args, SSL_options => $opts{ssl_options} if $opts{ssl_options};
    push @args, verify_SSL  => $opts{verify_SSL}  if exists $opts{verify_SSL};
    return HTTP::Tiny->new(@args);
}

# Parse a flat CH JSON object string (X-ClickHouse-Summary /
# X-ClickHouse-Progress) without depending on JSON::PP. Both are small
# flat objects of stringified integers (read_rows, written_rows,
# total_rows_to_read, elapsed_ns, ...). Returns a hashref or undef.
sub _parse_ch_kv {

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

      . "after last complete compressed block\n"
        if $decompress && length $buf;
    die "select_blocks: " . length($inner_buf) . " trailing bytes "
      . "after last complete block\n"
        if length $inner_buf;
    return;
}

# Factory for a ClickHouse::Encoder::BulkInserter. The returned object
# exposes ->push($row), ->push_many(\@rows), ->flush (idempotent) and
# ->finish. It holds a single HTTP::Tiny instance across batches (so
# keepalive applies) and auto-flushes when the accumulated row count
# crosses batch_size. Transient HTTP failures (5xx, network errors) are
# retried up to retries times with a backoff between attempts; 4xx
# errors die immediately.
# NOTE(review): the POD describes the backoff as exponential with jitter
# (see the retry_wait / retry_max_wait options) -- confirm against the
# retry loop, which lives in the BulkInserter package.
#
# Callable as a class or an instance method; the invocant is handed to
# the inserter's constructor as _origin, after the caller's own args.
sub bulk_inserter {
    my $invocant = shift;
    my %args     = @_;
    return ClickHouse::Encoder::BulkInserter->new(
        %args,
        _origin => $invocant,
    );
}

package ClickHouse::Encoder::BulkInserter;  ## no critic (ProhibitMultiplePackages)

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

        enc        => $enc,
        url        => $url,
        hdr        => $hdr,
        rows       => [],
        batch_size => $args{batch_size} // 10_000,
        retries    => $args{retries}    // 3,
        retry_wait => $args{retry_wait} // 0.5,
        retry_max_wait => $args{retry_max_wait} // 30,
        compress   => $compress,
        http       => ClickHouse::Encoder::_http_tiny(
            %args, timeout => $timeout, keep_alive => 1),
        origin     => $origin,
        sent_rows  => 0,
        sent_batches => 0,
        last_response => undef,
        summary    => {},
    }, $class;
}

sub push :method {  ## no critic (ProhibitBuiltinHomonyms)
    my ($self, $row) = @_;

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

    $st->push_row($row);
    $st->push_row($row);
    ...
    $st->finish;

Returns a C<ClickHouse::Encoder::Streamer> object that buffers rows and
flushes a complete Native block to C<&writer> every C<batch_size> rows.
Call C<finish> to flush any partial last batch, or C<reset> to discard
the buffered rows without flushing (useful for error recovery after an
upstream failure). C<buffered_count> and C<is_empty> let producers
inspect the current backlog. The streamer keeps the encoder alive for
its own lifetime, so dropping the encoder reference after creating the
streamer is safe.

Options:

=over 4

=item C<batch_size =E<gt> N>

Flush every C<N> rows. Default 10_000.

lib/ClickHouse/Encoder.pm  view on Meta::CPAN

        names => [...],
        types => [...],
        rows  => [[...], [...], ...],
        consumed => $bytes_used,
    }

Calls an XS row-major decoder (C<decode_block_rows>) that walks each
column then immediately distributes its values into the per-row
arrayrefs and frees the column AV. Peak memory holds one column's
AV plus the row AVs (vs both column- and row-major representations
fully alive, which a Perl-side transpose would entail). Throughput
is similar to L</decode_block>; the win is the tighter peak memory
on wide blocks.

=head2 decode_block_rows

    my $r = ClickHouse::Encoder->decode_block_rows($bytes, $offset);

XS row-major decoder; same return shape as L</decode_rows>. Direct
entry point if you want to avoid the L</decode_rows> Perl-side
trampoline.

lib/ClickHouse/Encoder.pm  view on Meta::CPAN


=head2 bulk_inserter

    my $bi = ClickHouse::Encoder->bulk_inserter(
        host => 'db.example', port => 8123, table => 'events',
        columns => \@cols, batch_size => 5000, compress => 'zstd',
        retries => 3);
    $bi->push($_) for @rows;
    $bi->finish;

Holds an L<HTTP::Tiny> instance with keep-alive across batches,
accumulates rows, auto-flushes at C<batch_size>, retries transient
HTTP failures (5xx and 599 network errors) with exponential backoff
and jitter. 4xx errors die immediately. Options:

=over 4

=item C<host>, C<port>, C<database>, C<user>, C<password>, C<scheme>, C<timeout>

Same as L</for_table>; passed to L<HTTP::Tiny>.

runtime.h  view on Meta::CPAN

    TypeInfo *type;
} Column;

/* Encoder: the set of Column descriptors one encoder instance works with. */
typedef struct {
    Column *columns;   /* presumably an array of num_columns entries - TODO confirm against the allocator */
    int num_columns;   /* element count that pairs with columns */
} Encoder;

/* Streamer: collects rows in an AV and flushes one Native block per
 * batch. Holds strong refs to keep the underlying encoder/writer/buffer
 * AVs alive for the streamer's lifetime. */
typedef struct {
    SV *enc_sv;        /* strong ref to keep the encoder alive */
    Encoder *enc;      /* unowned cached pointer */
    SV *writer;        /* strong ref to the writer CV */
    AV *buffer;        /* strong ref - owned */
    int batch_size;
    /* When non-NULL, every emitted batch is wrapped in CH's
     * compressed-block framing via compress_native_block before being
     * passed to writer. compress_mode is one of "lz4" / "zstd" /
     * "auto" / "none"; hasher_sv (if non-NULL) overrides the default
     * cityhash128. Both are owned by the Streamer. */
    char *compress_mode;

t/extended_types.t  view on Meta::CPAN

    cmp_ok($sizes[0], '>', 50, 'first chunk has substantial size');
}
{
    # Smoke test: 20 rows pushed through a streamer with batch_size 7,
    # then finish; the writer must have received a non-zero byte count.
    my $total_len = 0;
    my $writer    = sub { $total_len += length($_[0]) };
    my $encoder   = ClickHouse::Encoder->new(columns => [ [ 'x', 'UInt32' ] ]);
    my $streamer  = $encoder->streamer($writer, batch_size => 7);
    for my $value (1 .. 20) {
        $streamer->push_row([$value]);
    }
    $streamer->finish;
    cmp_ok($total_len, '>', 0, 'streamer push_row + finish writes bytes');
}
# (encoder-dropped-while-streamer-alive and writer-croak-recovery cases
#  are exhaustively covered in t/streamer-edge.t.)

# Decimal256 ----------------------------------------------------------------
{
    my $enc = ClickHouse::Encoder->new(columns => [['v','Decimal256(2)']]);
    my $bin = $enc->encode([['12345.67'], ['-99999.99']]);
    my $off = skip_header($bin);
    is(unpack('Q<', substr($bin, $off,    8)), 1234567,
       'Decimal256(2) lo limb for 12345.67');
    is(unpack('Q<', substr($bin, $off+8,  8)), 0, 'Decimal256 limb 1');

t/ssl-options.t  view on Meta::CPAN

    is($t->{verify_SSL}, 1, 'verify_SSL forwarded to constructor');
}

# A caller-supplied ssl_options hashref (IO::Socket::SSL settings) must
# end up on the HTTP::Tiny object under SSL_options.
{
    my %ssl = (SSL_ca_file => '/etc/ssl/certs/ca.pem');
    my $ua  = ClickHouse::Encoder::_http_tiny(ssl_options => \%ssl);
    is_deeply($ua->{SSL_options}, \%ssl, 'ssl_options forwarded as SSL_options');
}

# The non-SSL knobs (timeout, keep_alive) keep working when an SSL
# setting is passed in the same call.
{
    my %wanted = (timeout => 17, keep_alive => 1, verify_SSL => 0);
    my $ua     = ClickHouse::Encoder::_http_tiny(%wanted);
    is($ua->{timeout},    17, 'timeout forwarded');
    is($ua->{keep_alive}, 1,  'keep_alive forwarded');
    is($ua->{verify_SSL}, 0,  'verify_SSL=0 forwarded (not dropped as falsy)');
}

# With no arguments at all the builder still yields an HTTP::Tiny, and
# it must not invent an SSL_options entry.
{
    my $ua = ClickHouse::Encoder::_http_tiny();
    isa_ok($ua, 'HTTP::Tiny', 'plain builder');
    my $has_ssl_options = exists $ua->{SSL_options};
    ok(!$has_ssl_options,
       'no SSL_options key when ssl_options not given');
}

t/streamer-edge.t  view on Meta::CPAN


# A finished streamer may be reused: every push_row + finish pair emits
# its own block, as if a fresh streaming session had been started.
{
    my @emitted;
    my $collect = sub { push @emitted, $_[0] };
    my $st      = make_enc()->streamer($collect, batch_size => 10);
    for my $value (1, 2) {
        $st->push_row([$value]);
        $st->finish;
    }
    is(scalar @emitted, 2, 'push_row after finish reopens the streamer');
}

# Encoder dropped before streamer: streamer keeps the encoder alive
# (XS holds a refcount) and finish still works.
{
    my @blocks;
    my $st;
    {
        my $enc = make_enc();
        $st = $enc->streamer(sub { push @blocks, $_[0] }, batch_size => 5);
    }   # $enc drops out of scope here
    $st->push_row([$_]) for 1..3;
    $st->finish;



( run in 1.852 second using v1.01-cache-2.11-cpan-df04353d9ac )