ClickHouse-Encoder
view release on metacpan or search on metacpan
lib/ClickHouse/Encoder/TCP.pm view on Meta::CPAN
# protocol packets. Built for insert pipelines: pack a Hello + Query,
# then wrap encoded Native blocks in Data packets, then signal
# end-of-insert. Transport is the caller's job (IO::Socket,
# AnyEvent::Handle, IO::Async::Stream, etc.).
#
# Varint and length-prefixed string codecs are XS (shared with the
# main encoder's buffer helpers); packet-level layout stays in Perl
# for readability since it's not a hot path.
#
# Targets protocol revision 54429: predates flexible settings,
# inter-server secret, OpenTelemetry, parallel-replica fields, and
# the recent chunking negotiation extension. Modern CH servers
# (protocol revision >= ~54475) handshake additional bytes past
# Hello that this subset doesn't respond to - prefer HTTP for
# integration with recent servers.
# XS loader handled by the parent module; ensure it's loaded so the
# XSUBs under PACKAGE = ClickHouse::Encoder::TCP are available.
use ClickHouse::Encoder ();
## no critic (ProhibitConstantPragma)
# Readability beats Readonly here - these are protocol-defined
# numeric tags, used as bare identifiers throughout the module.
use constant DEFAULT_REVISION => 54429;
# Client packet types
use constant CLIENT_HELLO => 0;
use constant CLIENT_QUERY => 1;
use constant CLIENT_DATA => 2;
use constant CLIENT_CANCEL => 3;
use constant CLIENT_PING => 4;
# Server packet types
use constant SERVER_HELLO => 0;
use constant SERVER_DATA => 1;
use constant SERVER_EXCEPTION => 2;
use constant SERVER_PROGRESS => 3;
use constant SERVER_PONG => 4;
use constant SERVER_END_OF_STREAM => 5;
use constant SERVER_PROFILE_INFO => 6;
use constant SERVER_TOTALS => 7;
use constant SERVER_EXTREMES => 8;
use constant SERVER_TABLE_COLUMNS => 11;
use constant SERVER_PROFILE_EVENTS => 14;
# Query processing stages
use constant STAGE_FETCH_COLUMNS => 0;
use constant STAGE_WITH_MERGEABLE => 1;
use constant STAGE_COMPLETE => 2;
# Compression flags in Query packet
use constant COMPRESSION_DISABLE => 0;
use constant COMPRESSION_ENABLE => 1;
# Varint and length-prefixed string codecs are XS-backed; the bound
# subs (pack_varint, unpack_varint, pack_string, unpack_string)
# live in PACKAGE = ClickHouse::Encoder::TCP inside Encoder.xs.
# ----- client packets ------------------------------------------------
# Hello packet: announces our protocol revision and credentials.
#
# pack_hello(
# client_name => 'ClickHouse::Encoder', # default
# major => 1, # default
# minor => 0, # default
# revision => 54429, # default
# database => 'default',
# user => 'default',
# password => '',
# );
sub pack_hello {
my (undef, %o) = @_;
return pack_varint(CLIENT_HELLO)
. pack_string($o{client_name} // 'ClickHouse::Encoder')
. pack_varint($o{major} // 1)
. pack_varint($o{minor} // 0)
. pack_varint($o{revision} // DEFAULT_REVISION)
. pack_string($o{database} // 'default')
. pack_string($o{user} // 'default')
. pack_string($o{password} // '');
}
# Query packet. At revision 54429 the body is:
# varint CLIENT_QUERY (=1)
# string query_id (often empty so the server generates one)
# <ClientInfo> (block - see _pack_client_info)
# string settings (\n-separated key=value pairs, empty for none)
# varint query_processing_stage (default Complete)
# varint compression flag (default Disable)
# string query SQL text
sub pack_query {
my (undef, %o) = @_;
return pack_varint(CLIENT_QUERY)
. pack_string($o{query_id} // '')
. _pack_client_info(\%o)
. _pack_settings($o{settings})
. pack_varint($o{stage} // STAGE_COMPLETE)
. pack_varint($o{compression} // COMPRESSION_DISABLE)
. pack_string($o{query} // die "pack_query: 'query' is required\n");
}
# ClientInfo subblock. At revision 54429 the fields after the
# fixed prefix are: quota_key (since 54058), version_patch (since
# 54401). We use those defaults; later-revision fields (interserver
# secret, OpenTelemetry trace context, parallel-replica metadata)
# would need a higher client revision to be negotiated.
sub _pack_client_info {
my $o = shift;
my $rev = $o->{revision} // DEFAULT_REVISION;
my $out = '';
$out .= chr($o->{query_kind} // 1); # 1 = initial query
$out .= pack_string($o->{initial_user} // '');
$out .= pack_string($o->{initial_query_id} // '');
$out .= pack_string($o->{initial_address} // '0.0.0.0:0');
# interface: 1 = TCP
$out .= chr(1);
# os_user, client_hostname, client_name
$out .= pack_string($o->{os_user} // '');
$out .= pack_string($o->{client_hostname} // 'localhost');
$out .= pack_string($o->{client_name} // 'ClickHouse::Encoder');
( run in 1.078 second using v1.01-cache-2.11-cpan-140bd7fdf52 )