EV-ClickHouse
view release on metacpan or search on metacpan
lib/EV/ClickHouse.pm view on Meta::CPAN
use XSLoader;
XSLoader::load __PACKAGE__, $VERSION;
}
# Convenience aliases: each glob assignment makes the short name share the
# very same code as the full method it points at.
*disconnect = \&finish;    # $ch->disconnect is $ch->finish
*reconnect  = \&reset;     # $ch->reconnect  is $ch->reset
*q          = \&query;     # $ch->q(...)     is $ch->query(...)
# Decode %XX percent-escapes (hex digits in either case) in a single URI
# component. Each escape becomes the one character with that code (0-255).
# Anything that is not a well-formed %XX sequence, including '+', is
# passed through unchanged.
sub _uri_unescape {
    my ($encoded) = @_;
    (my $decoded = $encoded) =~ s{%([0-9A-Fa-f]{2})}{chr(hex($1))}eg;
    return $decoded;
}
# Construct a client, configure it, and call connect() before returning.
#
# Arguments are name => value pairs (documented in the POD below). A
# connection URI may be given instead of, or in addition to, the discrete
# arguments:
#
#   uri => 'clickhouse[+PROTOCOL]://[USER[:PASSWORD]@]HOST[:PORT][/DATABASE][?KEY=VALUE&...]'
#
# Explicitly passed arguments win over anything taken from the URI (every
# URI-derived value is applied with //=). User, password, database and
# query-string values are percent-decoded; the host is taken verbatim.
#
# When no port is given, the default depends on protocol and tls:
# http 8123, https 8443, native 9000, native+tls 9440.
#
# Dies on a URI that does not match the form above (any password is masked
# in the message) and on a protocol other than 'http' or 'native'. Warns
# about arguments left unconsumed. Returns the new object.
sub new {
    my ($class, %args) = @_;

    # Connection URI: clickhouse://user:pass@host:port/database?opt=val
    if (my $uri = delete $args{uri}) {
        if ($uri =~ m{
            ^ clickhouse (?: \+ (\w+) )? ://        # scheme with optional +protocol
            (?: ([^:@]*?) (?: : ([^@]*) )? \@ )?    # optional user[:password]@
            ([^/:?]+)                               # host; '?' excluded so a bare
                                                    #   host may carry a query string
            (?: : (\d+) )?                          # optional :port
            (?: / ([^?]*) )?                        # optional /database
            (?: \? (.*) )?                          # optional ?key=value&...
            $
        }x) {
            # Copy captures immediately, before any other match can reset them.
            my ($proto, $u, $pw, $h, $p, $db, $qs) = ($1, $2, $3, $4, $5, $6, $7);
            $args{protocol} //= $proto if $proto;
            $args{user}     //= _uri_unescape($u)  if defined $u && $u ne '';
            $args{password} //= _uri_unescape($pw) if defined $pw;
            $args{host}     //= $h;
            $args{port}     //= $p if defined $p;
            $args{database} //= _uri_unescape($db) if defined $db && $db ne '';
            if (defined $qs) {
                for my $pair (split /&/, $qs) {
                    my ($k, $v) = split /=/, $pair, 2;
                    # Pairs without '=' (or empty pairs) are ignored.
                    $args{$k} //= _uri_unescape($v) if defined $k && defined $v;
                }
            }
        }
        else {
            # Mask the password (leading user:password@ part) so credentials
            # do not leak into logs via the error message.
            (my $shown = $uri) =~ s{\A((?:[^:/?#]*://)?[^:@/]*):[^@]*\@}{$1:***\@};
            die "EV::ClickHouse: invalid URI '$shown'\n";
        }
    }

    my $loop = delete $args{loop} || EV::default_loop;
    my $self = $class->_new($loop);

    # Callbacks. on_error always gets a handler; the default dies.
    $self->on_error(exists $args{on_error} ? delete $args{on_error} : sub { die @_ });
    $self->on_connect(delete $args{on_connect})       if exists $args{on_connect};
    $self->on_progress(delete $args{on_progress})     if exists $args{on_progress};
    $self->on_disconnect(delete $args{on_disconnect}) if exists $args{on_disconnect};
    $self->on_trace(delete $args{on_trace})           if exists $args{on_trace};

    # Connection parameters. 'db' is accepted as an alias for 'database'.
    my $host            = delete $args{host} // '127.0.0.1';
    my $port            = delete $args{port};
    my $protocol        = delete $args{protocol} // 'http';
    my $user            = delete $args{user} // 'default';
    my $password        = delete $args{password} // '';
    my $db_alias        = delete $args{db};
    my $database        = delete $args{database} // $db_alias // 'default';
    my $tls             = delete $args{tls} // 0;
    my $tls_ca_file     = delete $args{tls_ca_file};
    my $tls_skip_verify = delete $args{tls_skip_verify} // 0;

    # options
    my $compress            = delete $args{compress} // 0;
    my $session_id          = delete $args{session_id};
    my $connect_timeout     = delete $args{connect_timeout};
    my $query_timeout       = delete $args{query_timeout};
    my $auto_reconnect      = delete $args{auto_reconnect} // 0;
    my $keepalive           = delete $args{keepalive} // 0;
    my $reconnect_delay     = delete $args{reconnect_delay} // 0;
    my $reconnect_max_delay = delete $args{reconnect_max_delay} // 0;

    # decode options (native protocol)
    my $decode_datetime = delete $args{decode_datetime} // 0;
    my $decode_decimal  = delete $args{decode_decimal} // 0;
    my $decode_enum     = delete $args{decode_enum} // 0;
    my $named_rows      = delete $args{named_rows} // 0;

    die "EV::ClickHouse: unknown protocol '$protocol' (expected 'http' or 'native')\n"
        unless $protocol eq 'http' || $protocol eq 'native';
    my $native = $protocol eq 'native';

    # ClickHouse's standard listening ports: 8123 (HTTP) / 8443 (HTTPS),
    # 9000 (native TCP) / 9440 (native TCP over TLS). Pick the secure port
    # when TLS is requested, so a TLS handshake is not attempted against a
    # plaintext listener.
    $port //= $tls ? ($native ? 9440 : 8443) : ($native ? 9000 : 8123);

    $self->_set_protocol($native ? 1 : 0);
    $self->_set_compress($compress)                       if $compress;
    $self->_set_session_id($session_id)                   if defined $session_id;
    $self->_set_connect_timeout($connect_timeout)         if $connect_timeout;
    $self->_set_query_timeout($query_timeout)             if $query_timeout;
    $self->_set_tls($tls)                                 if $tls;
    $self->_set_tls_ca_file($tls_ca_file)                 if defined $tls_ca_file;
    $self->_set_tls_skip_verify($tls_skip_verify)         if $tls_skip_verify;
    $self->_set_auto_reconnect($auto_reconnect)           if $auto_reconnect;
    $self->_set_keepalive($keepalive)                     if $keepalive;
    $self->_set_reconnect_delay($reconnect_delay)         if $reconnect_delay;
    $self->_set_reconnect_max_delay($reconnect_max_delay) if $reconnect_max_delay;

    # compute decode_flags bitmask
    my $decode_flags = 0;
    $decode_flags |= 1 if $decode_datetime;    # DECODE_DT_STR
    $decode_flags |= 2 if $decode_decimal;     # DECODE_DEC_SCALE
    $decode_flags |= 4 if $decode_enum;        # DECODE_ENUM_STR
    $decode_flags |= 8 if $named_rows;         # DECODE_NAMED_ROWS
    $self->_set_decode_flags($decode_flags) if $decode_flags;

    # Connection-level ClickHouse settings applied to every query/insert.
    my $settings = delete $args{settings};
    $self->_set_settings($settings) if $settings;

    # Anything still in %args was not recognised (typo or unsupported key).
    warn "EV::ClickHouse->new: unknown parameter(s): " . join(', ', sort keys %args) . "\n"
        if %args;

    $self->connect($host, $port, $user, $password, $database);
    return $self;
}
# A module file must end by evaluating to a true value so that
# `use`/`require` of it succeeds.
1;
__END__
=head1 NAME
EV::ClickHouse - Async ClickHouse client using EV
=head1 SYNOPSIS
use EV;
use EV::ClickHouse;
my $ch = EV::ClickHouse->new(
host => '127.0.0.1',
port => 8123,
protocol => 'http', # or 'native'
user => 'default',
password => '',
database => 'default',
settings => { max_threads => 4 }, # connection-level defaults
on_connect => sub { print "connected\n" },
on_error => sub { warn "error: $_[0]\n" },
);
# simple query
$ch->query("select * from system.one", sub {
my ($rows, $err) = @_;
if ($err) { warn $err; return }
for my $row (@$rows) {
print join(", ", @$row), "\n";
}
});
# query with per-query settings
$ch->query("select 1", { max_execution_time => 30 }, sub {
my ($rows, $err) = @_;
});
lib/EV/ClickHouse.pm view on Meta::CPAN
=over 4
=item on_connect => sub { }
Called when the connection is established.
=item on_error => sub { my ($message) = @_ }
Called on connection-level errors. Default: C<sub { die @_ }>.
=item on_progress => sub { my ($rows, $bytes, $total_rows, $written_rows, $written_bytes) = @_ }
Called on native protocol progress packets. Not fired for HTTP.
=item on_disconnect => sub { }
Called when the connection is closed (either by C<finish()>, server disconnect,
or error). Useful for reconnect logic or cleanup.
=item on_trace => sub { my ($message) = @_ }
Debug trace callback. Called with internal state machine messages
(e.g. query dispatch). Useful for debugging protocol issues.
=back
B<Options:>
=over 4
=item compress => 0 | 1
Enable compression. Default: C<0>.
=item session_id => $id
HTTP session ID for stateful operations.
=item connect_timeout => $seconds
Connection timeout in seconds.
=item query_timeout => $seconds
Default query timeout applied to all queries. Can be overridden per-query
via the C<query_timeout> key in the settings hashref.
=item auto_reconnect => 0 | 1
Automatically reconnect on connection loss. Default: C<0>.
When enabled, queued (unsent) queries are preserved across reconnects;
in-flight queries receive an error callback.
=item settings => \%hash
Connection-level ClickHouse settings applied to every query and insert.
Per-query settings (see L</query>, L</insert>) override these defaults.
settings => { async_insert => 1, max_threads => 4 }
=item keepalive => $seconds
Send periodic native protocol ping packets to keep the connection alive
during idle periods. Set to C<0> (default) to disable. Only effective
with the native protocol.
=item reconnect_delay => $seconds
Initial delay for reconnect backoff when C<auto_reconnect> is enabled.
The delay doubles after each failed attempt, up to C<reconnect_max_delay>.
Set to C<0> (default) for immediate reconnect (no backoff).
=item reconnect_max_delay => $seconds
Maximum reconnect delay. Default: C<0> (no cap).
=back
B<Decode options (native protocol only):>
These options control how column values are formatted when returned from
the native protocol. All are opt-in and default to C<0> (returning raw
numeric values for backward compatibility).
=over 4
=item decode_datetime => 0 | 1
Return C<Date>, C<Date32>, C<DateTime>, and C<DateTime64> columns as
formatted strings (e.g. C<"2024-01-15">, C<"2024-01-15 10:30:00">) instead
of raw integer values. Uses UTC by default; if the column has an explicit
timezone (e.g. C<DateTime('America/New_York')>), values are converted to
that timezone.
=item decode_decimal => 0 | 1
Return C<Decimal32>/C<Decimal64>/C<Decimal128> columns as scaled
floating-point numbers instead of unscaled integers.
=item decode_enum => 0 | 1
Return C<Enum8>/C<Enum16> columns as string labels instead of numeric codes.
=item named_rows => 0 | 1
Return each row as a hashref (keyed by column name) instead of an arrayref.
my $ch = EV::ClickHouse->new(named_rows => 1, ...);
$ch->query("SELECT 1 as n", sub {
my ($rows, $err) = @_;
print $rows->[0]{n}; # 1
});
=back
=head1 METHODS
=head2 query
$ch->query($sql, sub { my ($rows, $err) = @_ });
$ch->query($sql, \%settings, sub { my ($rows, $err) = @_ });
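Executes a SQL query. For a SELECT, the callback receives C<($rows, $err)>, where C<$rows> is an arrayref of rows (each row an arrayref, or a hashref when C<named_rows> is enabled).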
lib/EV/ClickHouse.pm view on Meta::CPAN
=item C<query_timeout> - per-query timeout in seconds, overriding the
connection-level C<query_timeout>.
=item C<on_data> - native protocol only. A code ref called for each data
block as it arrives. Enables streaming: rows are delivered incrementally
and not accumulated.
$ch->query("SELECT * FROM big_table",
{ on_data => sub { my ($rows) = @_; process_batch($rows) } },
sub { my (undef, $err) = @_; ... } # final callback
);
=back
B<Native protocol type notes:> With the native protocol, column values
are returned as typed Perl scalars by default. C<Date> and C<DateTime>
columns return integer values (days since epoch and Unix timestamps);
enable C<decode_datetime> for formatted strings. C<Enum> columns return
numeric codes; enable C<decode_enum> for string labels. C<Decimal>
columns return unscaled integers; enable C<decode_decimal> for scaled
floats. C<SimpleAggregateFunction> columns are transparently decoded as
their inner type. C<Nested> columns are decoded as arrays of tuples.
C<LowCardinality> columns work across multi-block results with shared
dictionaries.
=head2 insert
$ch->insert($table, $data, sub { my (undef, $err) = @_ });
$ch->insert($table, $data, \%settings, sub { my (undef, $err) = @_ });
C<$data> can be either:
=over 4
=item * A string in TabSeparated format (tab-separated columns, newline-separated rows)
=item * An arrayref of arrayrefs: C<[ [$col1, $col2, ...], ... ]>
=back
When using arrayrefs, values are encoded directly without TSV escaping:
C<undef> maps to NULL, strings may contain tabs and newlines freely,
arrayrefs encode Array/Tuple columns, and hashrefs encode Map columns.
# TSV string (existing)
$ch->insert("my_table", "1\thello\n2\tworld\n", sub { ... });
# Arrayref (new) - no escaping needed
$ch->insert("my_table", [
[1, "hello\tworld"], # embedded tab
[2, undef], # NULL
[3, [10, 20]], # Array column
], sub { ... });
The optional C<\%settings> hashref works the same as in L</query>.
=head2 ping
$ch->ping(sub { my ($result, $err) = @_ });
Checks if the connection is alive. On success C<$result> is a true value
and C<$err> is undef. On error: C<(undef, $error_message)>.
=head2 finish
$ch->finish;
Disconnects. Cancels pending operations.
=head2 reset
$ch->reset;
Disconnects and reconnects using original parameters.
=head2 drain
$ch->drain(sub { ... });
Registers a callback to be invoked when all pending queries have completed.
If no queries are pending, the callback fires immediately (synchronously).
Useful for graceful shutdown: queue your final queries, then call C<drain>
with a callback that calls C<finish>.
$ch->query("SELECT 1", sub { ... });
$ch->query("SELECT 2", sub { ... });
$ch->drain(sub {
print "all done\n";
$ch->finish;
});
=head2 cancel
$ch->cancel;
Cancels the currently running query. For the native protocol, sends a
CLIENT_CANCEL packet. For HTTP, closes the connection. Pending callbacks
receive an error.
=head2 skip_pending
$ch->skip_pending;
Cancels all pending operations. Each pending callback is invoked
with C<(undef, $error_message)>. If a request is currently in flight,
the connection is closed (subsequent queries require reconnection
via C<reset>).
=head1 ACCESSORS
=over 4
=item is_connected
Returns true if the connection is established.
=item pending_count
Number of pending (queued + in-flight) operations.
=item server_info
( run in 1.004 second using v1.01-cache-2.11-cpan-39bf76dae61 )