ClickHouse-Encoder

 view release on metacpan or  search on metacpan

lib/ClickHouse/Encoder/TCP.pm  view on Meta::CPAN

# protocol packets. Built for insert pipelines: pack a Hello + Query,
# then wrap encoded Native blocks in Data packets, then signal
# end-of-insert. Transport is the caller's job (IO::Socket,
# AnyEvent::Handle, IO::Async::Stream, etc.).
#
# Varint and length-prefixed string codecs are XS (shared with the
# main encoder's buffer helpers); packet-level layout stays in Perl
# for readability since it's not a hot path.
#
# Targets protocol revision 54429: predates flexible settings,
# inter-server secret, OpenTelemetry, parallel-replica fields, and
# the recent chunking negotiation extension. Modern CH servers
# (protocol revision >= ~54475) exchange additional handshake bytes
# after Hello that this subset does not handle; prefer the HTTP
# interface when integrating with recent servers.

# XS loader handled by the parent module; ensure it's loaded so the
# XSUBs under PACKAGE = ClickHouse::Encoder::TCP are available.
use ClickHouse::Encoder ();

## no critic (ProhibitConstantPragma)
# Readability beats Readonly here - these are protocol-defined
# numeric tags, used as bare identifiers throughout the module.
use constant {
    # Protocol revision used as the default by pack_hello and
    # _pack_client_info when the caller passes no 'revision'.
    DEFAULT_REVISION => 54429,

    # Client -> server packet type tags.
    CLIENT_HELLO  => 0,
    CLIENT_QUERY  => 1,
    CLIENT_DATA   => 2,
    CLIENT_CANCEL => 3,
    CLIENT_PING   => 4,

    # Server -> client packet type tags (only the ones this module
    # names; 9, 10, 12 and 13 have no constant here).
    SERVER_HELLO          => 0,
    SERVER_DATA           => 1,
    SERVER_EXCEPTION      => 2,
    SERVER_PROGRESS       => 3,
    SERVER_PONG           => 4,
    SERVER_END_OF_STREAM  => 5,
    SERVER_PROFILE_INFO   => 6,
    SERVER_TOTALS         => 7,
    SERVER_EXTREMES       => 8,
    SERVER_TABLE_COLUMNS  => 11,
    SERVER_PROFILE_EVENTS => 14,

    # Query processing stage values written into the Query packet.
    STAGE_FETCH_COLUMNS  => 0,
    STAGE_WITH_MERGEABLE => 1,
    STAGE_COMPLETE       => 2,

    # Compression flag values written into the Query packet.
    COMPRESSION_DISABLE => 0,
    COMPRESSION_ENABLE  => 1,
};

# Varint and length-prefixed string codecs are XS-backed; the bound
# subs (pack_varint, unpack_varint, pack_string, unpack_string)
# live in PACKAGE = ClickHouse::Encoder::TCP inside Encoder.xs.

# ----- client packets ------------------------------------------------

# Build the client Hello packet that opens the handshake: it carries
# the client identity, its version triple, the protocol revision and
# the login credentials. Called as a class method; the first argument
# (the invocant) is discarded.
#
#   ClickHouse::Encoder::TCP->pack_hello(
#       client_name => 'ClickHouse::Encoder',  # default
#       major       => 1,                       # default
#       minor       => 0,                       # default
#       revision    => 54429,                   # default (DEFAULT_REVISION)
#       database    => 'default',               # default
#       user        => 'default',               # default
#       password    => '',                      # default
#   );
#
# Returns the packet as a byte string: the CLIENT_HELLO tag as a varint,
# then the fields above in that order, strings length-prefixed via
# pack_string and integers encoded via pack_varint.
sub pack_hello {
    my (undef, %opt) = @_;

    my $packet = pack_varint(CLIENT_HELLO);
    $packet .= pack_string($opt{client_name} // 'ClickHouse::Encoder');
    $packet .= pack_varint($opt{major}       // 1);
    $packet .= pack_varint($opt{minor}       // 0);
    $packet .= pack_varint($opt{revision}    // DEFAULT_REVISION);
    $packet .= pack_string($opt{database}    // 'default');
    $packet .= pack_string($opt{user}        // 'default');
    $packet .= pack_string($opt{password}    // '');

    return $packet;
}

# Query packet: asks the server to run one SQL statement. Called as a
# class method (the invocant is discarded) with named options:
#
#   query        SQL text (required; dies if undefined)
#   query_id     defaults to '' so the server generates one
#   settings     handed to _pack_settings as-is
#   stage        query processing stage, default STAGE_COMPLETE
#   compression  compression flag, default COMPRESSION_DISABLE
#
# The whole option hash is also forwarded to _pack_client_info, so
# ClientInfo overrides (revision, client_name, os_user, ...) are passed
# through the same call.
#
# Wire layout at revision 54429:
#   varint  CLIENT_QUERY (=1)
#   string  query_id
#   <ClientInfo>      (block - see _pack_client_info)
#   <Settings>        (block - see _pack_settings)
#   varint  query_processing_stage
#   varint  compression flag
#   string  query SQL text
#
# NOTE(review): at this revision the server reads settings as a list of
# (name, flags, value) entries terminated by an empty name, not as one
# \n-separated string. An empty settings list encodes as a single empty
# string either way; confirm _pack_settings matches for non-empty input.
sub pack_query {
    my (undef, %o) = @_;

    # Check the only mandatory option before any packing happens, so a
    # missing query fails fast instead of after ClientInfo and settings
    # have already been serialized. The message is kept unchanged for
    # callers that match on it.
    die "pack_query: 'query' is required\n" unless defined $o{query};

    return pack_varint(CLIENT_QUERY)
         . pack_string($o{query_id} // '')
         . _pack_client_info(\%o)
         . _pack_settings($o{settings})
         . pack_varint($o{stage}       // STAGE_COMPLETE)
         . pack_varint($o{compression} // COMPRESSION_DISABLE)
         . pack_string($o{query});
}

# ClientInfo subblock. At revision 54429 the fields after the
# fixed prefix are: quota_key (since 54058), version_patch (since
# 54401). We use those defaults; later-revision fields (interserver
# secret, OpenTelemetry trace context, parallel-replica metadata)
# would need a higher client revision to be negotiated.
sub _pack_client_info {
    my $o = shift;
    my $rev = $o->{revision} // DEFAULT_REVISION;
    my $out = '';
    $out .= chr($o->{query_kind} // 1);   # 1 = initial query
    $out .= pack_string($o->{initial_user}     // '');
    $out .= pack_string($o->{initial_query_id} // '');
    $out .= pack_string($o->{initial_address}  // '0.0.0.0:0');
    # interface: 1 = TCP
    $out .= chr(1);
    # os_user, client_hostname, client_name
    $out .= pack_string($o->{os_user}         // '');
    $out .= pack_string($o->{client_hostname} // 'localhost');
    $out .= pack_string($o->{client_name}     // 'ClickHouse::Encoder');



( run in 1.078 second using v1.01-cache-2.11-cpan-140bd7fdf52 )