ClickHouse-Encoder
view release on metacpan or search on metacpan
eg/clickhouse_replication.pl view on Meta::CPAN
#!/usr/bin/env perl
# Replicate one ClickHouse table to another (potentially on a different
# server) by streaming Native bytes end-to-end -- no Perl-side row
# decode at all. The encoder isn't strictly needed here since both
# sides speak Native, but for_table() validates the destination schema
# and the script demonstrates the streaming pipe pattern users build
# their own variants on (filtering, transforms, etc.).
#
# This is the "free" case: source select format native, destination
# insert format native, identical schemas. With matching schemas the
# server-side codepath is a copy through the column ColumnPtr layer,
# which is the fastest cluster-reshard primitive ClickHouse offers.
#
# Usage:
# CH_SRC=src.example:8123 CH_DST=dst.example:8123 \
# perl eg/clickhouse_replication.pl src.events dst.events
#
# # With a where clause:
# SQL_FILTER="dt >= today() - 7" perl eg/clickhouse_replication.pl ...
use strict;
use warnings;
use lib 'blib/lib', 'blib/arch';
use ClickHouse::Encoder;
use HTTP::Tiny;
use File::Temp qw(tempfile);
my ($src_table, $dst_table) = @ARGV;
die "Usage: $0 <src_table> <dst_table>\n" unless $src_table && $dst_table;
my $src_endpoint = $ENV{CH_SRC} // 'localhost:8123';
my $dst_endpoint = $ENV{CH_DST} // 'localhost:8123';
my $src_url = "http://$src_endpoint/";
my $dst_url = "http://$dst_endpoint/";
my $where = $ENV{SQL_FILTER};
my ($dst_host, $dst_port) = split /:/, $dst_endpoint, 2;
$dst_port //= 8123;
# Validate the destination schema against the actual destination server
# (not localhost). We don't use the encoder for the data path -- rows go
# server-to-server as Native bytes.
my $enc = ClickHouse::Encoder->for_table($dst_table,
via => 'http',
host => $dst_host,
port => $dst_port,
);
print STDERR "destination schema validates: ", scalar(@{ $enc->columns }),
" columns\n";
# Pull from source as Native, spool to a temp file, post to destination.
# HTTP::Tiny is synchronous so we can't pipe one connection straight into
# the other; the temp file keeps process memory bounded by HTTP::Tiny's
# chunk buffer (Perl-side), trading bandwidth-disk-bandwidth for RAM.
# For tables that don't fit on local disk, partition the source query
# (where on a key range) and run this script per partition.
my $select = "select * from $src_table"
. ($where ? " where $where" : '')
. " format native";
my $insert = "insert into $dst_table format native";
my $http = HTTP::Tiny->new(timeout => 600);
my ($spool, $spool_path) = tempfile('ch-repl-XXXXXX', UNLINK => 1, TMPDIR => 1);
binmode $spool;
my $total = 0;
my $resp = $http->get($src_url . '?query=' . _esc($select), {
data_callback => sub {
print $spool $_[0] or die "spool write: $!";
$total += length $_[0];
print STDERR " fetched $total bytes...\r" if $total % (1024*1024) < 8192;
},
});
die "source select failed (status $resp->{status}): $resp->{content}"
unless $resp->{success};
close $spool or die "spool close: $!";
print STDERR "\nfetched $total bytes total, posting to destination...\n";
# Stream the spool back to the destination via a content generator;
# HTTP::Tiny calls this until it returns "" (EOF).
open my $rfh, '<', $spool_path or die "open spool $spool_path: $!";
binmode $rfh;
my $resp2 = $http->post($dst_url . '?query=' . _esc($insert), {
content => sub {
my $buf;
my $n = read($rfh, $buf, 64 * 1024);
return defined $n && $n > 0 ? $buf : '';
},
headers => {
'Content-Type' => 'application/octet-stream',
'Content-Length' => $total,
},
});
close $rfh;
die "destination insert failed (status $resp2->{status}): $resp2->{content}"
( run in 0.824 second using v1.01-cache-2.11-cpan-0d23b851a93 )