ClickHouse-Encoder
view release on metacpan or search on metacpan
eg/migrate_with_transform.pl view on Meta::CPAN
#!/usr/bin/env perl
# CH-to-CH migration with a row-level transform between read and
# write. The source schema is discovered via for_table, the target
# encoder mirrors it (or you can override with --target-columns),
# and the transform coderef gets a chance to mutate every row
# between decode and re-encode. Useful for column renames,
# type coercions, or dropping rows.
#
# Usage:
# perl eg/migrate_with_transform.pl \
# --src-host=src.db --src-port=8123 \
# --dst-host=dst.db --dst-port=8123 \
# --src-table=events_old --dst-table=events_new
use strict;
use warnings;
use Getopt::Long;
use ClickHouse::Encoder;
my ($src_host, $src_port, $src_tbl) = ('127.0.0.1', 8123, 'src_t');
my ($dst_host, $dst_port, $dst_tbl) = ('127.0.0.1', 8123, 'dst_t');
my $batch_size = 5000;
GetOptions(
'src-host=s' => \$src_host,
'src-port=i' => \$src_port,
'src-table=s' => \$src_tbl,
'dst-host=s' => \$dst_host,
'dst-port=i' => \$dst_port,
'dst-table=s' => \$dst_tbl,
'batch-size=i' => \$batch_size,
) or die "bad options\n";
# Discover the source schema. Tweak the column list here to point
# at the destination's shape if it diverges (rename / type change).
my $src_enc = ClickHouse::Encoder->for_table($src_tbl,
via => 'http', host => $src_host, port => $src_port);
my @columns = @{ $src_enc->columns };
# Per-row transform: receives a row (arrayref aligned with @columns)
# and returns either a modified row (or the same one) to forward, or
# undef to drop the row. Customize this for the actual migration.
my $transform = sub {
my $row = shift;
# Example: drop rows where the first column is NULL; uppercase a
# known string column at index 1.
return undef if !defined $row->[0];
$row->[1] = uc($row->[1]) if defined $row->[1];
return $row;
};
# Long-lived sink: bulk_inserter pools an HTTP::Tiny with keep-alive
# and auto-flushes at batch_size.
my $bi = ClickHouse::Encoder->bulk_inserter(
host => $dst_host, port => $dst_port,
table => $dst_tbl,
columns => \@columns,
batch_size => $batch_size,
retries => 3,
);
my $copied = 0;
my $dropped = 0;
ClickHouse::Encoder->select_blocks(
"select * from $src_tbl",
host => $src_host, port => $src_port,
on_block => sub {
my $blk = shift;
for my $r (0 .. $blk->{nrows} - 1) {
my @row = map $_->{values}[$r], @{ $blk->{columns} };
my $out = $transform->(\@row);
if (defined $out) {
$bi->push($out);
$copied++;
} else {
$dropped++;
}
}
},
);
my $info = $bi->finish;
warn "# copied $copied rows ($dropped dropped) in "
. "$info->{batches} batches\n";
( run in 1.526 second using v1.01-cache-2.11-cpan-df04353d9ac )