EV-ClickHouse

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

    Core:
    - Async queries via HTTP protocol (port 8123)
    - Native TCP binary protocol (port 9000, revision 54459)
    - TLS/SSL support via OpenSSL (both protocols)
    - LZ4 compression for native protocol (with CityHash checksums)
    - Gzip compression for HTTP protocol (request and response)
    - HTTP chunked transfer encoding
    - Connection URI parsing (clickhouse[+native]://user:pass@host:port/db)
    - Per-query and connection-level ClickHouse settings
    - Parameterized queries via params => { name => value }
    - Keepalive ping timer for idle native connections
    - Auto-reconnect with exponential backoff (reconnect_delay, reconnect_max_delay)
    - Graceful drain callback (fires when all pending queries complete)
    - Query cancellation and skip_pending

    Query features:
    - TabSeparated format parser for HTTP responses
    - INSERT with TSV string or arrayref data (both protocols)
    - Raw query mode for HTTP (return unparsed response body)
    - on_data streaming callback for native protocol (per-block delivery)
    - on_progress callback for native protocol progress packets

ClickHouse.xs  view on Meta::CPAN

    SV *on_disconnect;
    int tls_skip_verify;
    double query_timeout;
    int auto_reconnect;
    uint32_t decode_flags;
    AV *native_col_names;   /* column names from last native result */
    AV *native_col_types;   /* column type strings from last native result */
    SV *on_drain;           /* callback fired when pending_count drops to 0 */
    char *last_query_id;    /* query_id of the last dispatched query */
    SV *on_trace;           /* debug trace callback */
    ev_timer ka_timer;      /* keepalive timer */
    double keepalive;       /* keepalive interval (0 = disabled) */
    int ka_timing;
    int callback_depth;
    /* error info from last SERVER_EXCEPTION or HTTP error */
    int32_t last_error_code;
    /* profile info from last SERVER_PROFILE_INFO */
    uint64_t profile_rows;
    uint64_t profile_bytes;
    uint64_t profile_rows_before_limit;
    /* totals / extremes from last native query */
    AV *native_totals;

ClickHouse.xs  view on Meta::CPAN

    SV          *on_data;        /* per-query streaming callback */
    double       query_timeout;  /* per-query timeout */
    char        *query_id;       /* query_id for tracking */
    ngx_queue_t  queue;
};

/* forward declarations */

/* libev watcher callbacks (signature: loop, watcher, revents) */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
/* keepalive timer control: arm / disarm self->ka_timer */
static void start_keepalive(ev_clickhouse_t *self);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
static void emit_error(ev_clickhouse_t *self, const char *msg);
/* printf-style: fmt is followed by its variadic arguments */
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
static void cleanup_connection(ev_clickhouse_t *self);
/* callers return immediately when this yields non-zero */
static int  cancel_pending(ev_clickhouse_t *self, const char *errmsg);

ClickHouse.xs  view on Meta::CPAN

    return (inet_pton(AF_INET, s, &a4) == 1 ||
            inet_pton(AF_INET6, s, &a6) == 1);
}

static void cleanup_connection(ev_clickhouse_t *self) {
    int was_connected = self->connected;

    if (was_connected) emit_trace(self, "disconnect");
    stop_reading(self);
    stop_writing(self);
    stop_keepalive(self);
    if (self->timing) {
        ev_timer_stop(self->loop, &self->timer);
        self->timing = 0;
    }

#ifdef HAVE_OPENSSL
    if (self->ssl) {
        SSL_shutdown(self->ssl);
        SSL_free(self->ssl);
        self->ssl = NULL;

ClickHouse.xs  view on Meta::CPAN

    pos += snprintf(req + pos, req_cap - pos,
                    "Host: %s:%u\r\n", self->host, self->port);
    if (self->user) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-User: %s\r\n", self->user);
    }
    if (self->password && self->password[0]) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-Key: %s\r\n", self->password);
    }
    pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");

    if (content_encoding)
        pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);

    if (self->compress)
        pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");

    pos += snprintf(req + pos, req_cap - pos,
                    "Content-Length: %lu\r\n\r\n", (unsigned long)body_len);

ClickHouse.xs  view on Meta::CPAN

    pos += snprintf(req + pos, req_cap - pos,
                    "Host: %s:%u\r\n", self->host, self->port);
    if (self->user) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-User: %s\r\n", self->user);
    }
    if (self->password && self->password[0]) {
        pos += snprintf(req + pos, req_cap - pos,
                        "X-ClickHouse-Key: %s\r\n", self->password);
    }
    pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");

    if (do_compress)
        pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");

    if (content_encoding)
        pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);

    pos += snprintf(req + pos, req_cap - pos,
                    "Content-Length: %lu\r\n\r\n", (unsigned long)body_len);

ClickHouse.xs  view on Meta::CPAN

/*
 * Build an HTTP "GET /ping" request for this connection.
 *
 *   self     connection object; host and port are read to form the Host
 *            header. A NULL host is rendered as an empty string instead
 *            of being handed to "%s" (which would be undefined behaviour).
 *   req_len  out: number of bytes in the returned request, not counting
 *            the terminating NUL.
 *
 * Returns a NUL-terminated buffer allocated with Newx(); release it with
 * Safefree().
 */
static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
    const char *host = self->host ? self->host : "";
    /* 128 bytes covers the fixed request text plus the formatted port. */
    size_t req_cap = 128 + strlen(host);
    size_t len;
    char *req;
    int n;

    Newx(req, req_cap, char);
    n = snprintf(req, req_cap,
                 "GET /ping HTTP/1.1\r\n"
                 "Host: %s:%u\r\n"
                 "Connection: keep-alive\r\n\r\n",
                 host, self->port);
    if (n < 0) {
        /* Formatting error: report an empty request rather than letting a
         * negative value wrap around when stored in a size_t. */
        req[0] = '\0';
        len = 0;
    } else if ((size_t)n >= req_cap) {
        /* Output was truncated; only req_cap - 1 bytes were written. */
        len = req_cap - 1;
    } else {
        len = (size_t)n;
    }
    *req_len = len;
    return req;
}

/* --- HTTP response parsing --- */

/* Find \r\n\r\n in recv_buf. Returns offset past it, or 0 if not found. */
static size_t find_header_end(const char *buf, size_t len) {

ClickHouse.xs  view on Meta::CPAN

        if (deliver_error(self, "query timeout")) return;

        /* Must reconnect — server may still be processing */
        if (cancel_pending(self, "query timeout")) return;
        cleanup_connection(self);
        if (self->auto_reconnect && self->host)
            schedule_reconnect(self);
    }
}

/* --- Keepalive timer callback --- */

/*
 * libev callback for the keepalive timer (self->ka_timer).
 *
 * Does nothing when the object's magic value does not match, when the
 * connection is not established, or when send_count is non-zero.
 * Otherwise, for the native protocol, a CLIENT_PING packet is appended to
 * the send buffer and the write watcher is started if it is not already
 * running. Nothing is sent for other protocols.
 */
static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    /* Recover the owning object from the embedded ka_timer watcher. */
    char *base = (char *)w - offsetof(ev_clickhouse_t, ka_timer);
    ev_clickhouse_t *self = (ev_clickhouse_t *)base;
    native_buf_t ping;

    (void)revents;

    if (self->magic != EV_CH_MAGIC)
        return;
    if (self->send_count > 0 || !self->connected)
        return;

    /* HTTP: no ping is sent; TCP keepalive or auto-reconnect after a
     * dropped connection is relied upon instead. */
    if (self->protocol != PROTO_NATIVE)
        return;

    /* Encode the ping packet and append it after any bytes already
     * waiting in the send buffer. */
    nbuf_init(&ping);
    nbuf_varuint(&ping, CLIENT_PING);
    ensure_send_cap(self, self->send_len + ping.len);
    Copy(ping.data, self->send_buf + self->send_len, ping.len, char);
    self->send_len += ping.len;
    Safefree(ping.data);

    if (!self->writing)
        start_writing(self);
}

/*
 * Arm the repeating keepalive timer.
 *
 * The timer is started only when a positive keepalive interval is
 * configured, the timer is not already running (ka_timing), and the
 * connection is established. Both the initial delay and the repeat
 * interval are set to self->keepalive.
 */
static void start_keepalive(ev_clickhouse_t *self) {
    if (!(self->keepalive > 0) || self->ka_timing || !self->connected)
        return;

    ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
    ev_timer_start(self->loop, &self->ka_timer);
    self->ka_timing = 1;
}

/*
 * Disarm the keepalive timer if it is currently running and clear the
 * ka_timing flag. Does nothing when the timer is not running.
 */
static void stop_keepalive(ev_clickhouse_t *self) {
    if (!self->ka_timing)
        return;

    ev_timer_stop(self->loop, &self->ka_timer);
    self->ka_timing = 0;
}

/* --- Reconnect with backoff --- */

static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -

ClickHouse.xs  view on Meta::CPAN

            if (SvTRUE(ERRSV))
                warn("EV::ClickHouse: drain callback died: %s",
                     SvPV_nolen(ERRSV));
            FREETMPS;
            LEAVE;
        }
        SvREFCNT_dec(drain_cb);
        if (check_destroyed(self)) return 1;
    }

    /* Restart keepalive timer when idle */
    if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
        && self->keepalive > 0 && !self->ka_timing) {
        start_keepalive(self);
    }

    /* send next request from queue */
    if (!ngx_queue_empty(&self->send_queue)) {
        ngx_queue_t *q = ngx_queue_head(&self->send_queue);
        ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);

        /* Stop keepalive during active query */
        stop_keepalive(self);
        emit_trace(self, "dispatch query (pending=%d)", self->pending_count);

        /* set up send buffer */
        ensure_send_cap(self, send->data_len);
        Copy(send->data, self->send_buf, send->data_len, char);
        self->send_len = send->data_len;
        self->send_pos = 0;

        /* move cb to recv queue */
        ngx_queue_remove(q);

ClickHouse.xs  view on Meta::CPAN

SV*
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_trace (debug trace) callback slot.
     * `items > 1` is true only when the caller actually passed a handler
     * argument, which lets handler_accessor distinguish an explicit
     * argument from the default NULL. The get/set behaviour itself is
     * implemented by handler_accessor. */
    RETVAL = handler_accessor(&self->on_trace, handler, items > 1);
}
OUTPUT:
    RETVAL

void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
{
    /* Record the keepalive interval. start_keepalive() arms the timer
     * only when this value is > 0, so 0 leaves keepalive disabled.
     * This setter only stores the value; it does not start, stop or
     * re-arm the timer itself. */
    self->keepalive = val;
}

void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
{
    /* Record the reconnect delay exactly as given; no validation or
     * clamping is performed here. */
    self->reconnect_delay = val;
}

void

MANIFEST  view on Meta::CPAN

Changes
cityhash.h
ClickHouse.xs
cpanfile
eg/decode_options.pl
eg/drain.pl
eg/error_handling.pl
eg/insert.pl
eg/keepalive.pl
eg/native.pl
eg/native_compress.pl
eg/params.pl
eg/ping.pl
eg/query.pl
eg/queue.pl
eg/settings.pl
eg/streaming.pl
eg/totals.pl
eg/uri.pl

eg/keepalive.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Keepalive — periodic pings keep idle connections alive
use strict;
use warnings;
use EV;
use EV::ClickHouse;

my $ch;
$ch = EV::ClickHouse->new(
    host      => $ENV{CLICKHOUSE_HOST} // '127.0.0.1',
    port      => $ENV{CLICKHOUSE_NATIVE_PORT} // 9000,
    protocol  => 'native',
    keepalive => 30,   # ping every 30 seconds when idle
    on_connect => sub {
        printf "Connected with keepalive=30s: %s\n", $ch->server_info;

        # Simulate idle time, then query
        my $t; $t = EV::timer(2, 0, sub {
            undef $t;
            printf "Still connected after 2s idle: %s\n",
                $ch->is_connected ? "yes" : "no";
            $ch->query("SELECT 1", sub {
                my ($rows, $err) = @_;
                printf "Query after idle: %s\n", $err // "ok";
                $ch->finish;

lib/EV/ClickHouse.pm  view on Meta::CPAN

    my $tls      = delete $args{tls}      // 0;
    my $tls_ca_file    = delete $args{tls_ca_file};
    my $tls_skip_verify = delete $args{tls_skip_verify} // 0;

    # options
    my $compress        = delete $args{compress}        // 0;
    my $session_id      = delete $args{session_id};
    my $connect_timeout = delete $args{connect_timeout};
    my $query_timeout   = delete $args{query_timeout};
    my $auto_reconnect    = delete $args{auto_reconnect}    // 0;
    my $keepalive         = delete $args{keepalive}          // 0;
    my $reconnect_delay   = delete $args{reconnect_delay}    // 0;
    my $reconnect_max_delay = delete $args{reconnect_max_delay} // 0;

    # decode options (native protocol)
    my $decode_datetime = delete $args{decode_datetime}  // 0;
    my $decode_decimal  = delete $args{decode_decimal}   // 0;
    my $decode_enum     = delete $args{decode_enum}      // 0;
    my $named_rows      = delete $args{named_rows}       // 0;

    die "EV::ClickHouse: unknown protocol '$protocol' (expected 'http' or 'native')\n"

lib/EV/ClickHouse.pm  view on Meta::CPAN


    $self->_set_protocol($protocol eq 'native' ? 1 : 0);
    $self->_set_compress($compress)            if $compress;
    $self->_set_session_id($session_id)        if defined $session_id;
    $self->_set_connect_timeout($connect_timeout) if $connect_timeout;
    $self->_set_query_timeout($query_timeout)  if $query_timeout;
    $self->_set_tls($tls)                      if $tls;
    $self->_set_tls_ca_file($tls_ca_file)      if defined $tls_ca_file;
    $self->_set_tls_skip_verify($tls_skip_verify) if $tls_skip_verify;
    $self->_set_auto_reconnect($auto_reconnect) if $auto_reconnect;
    $self->_set_keepalive($keepalive)          if $keepalive;
    $self->_set_reconnect_delay($reconnect_delay) if $reconnect_delay;
    $self->_set_reconnect_max_delay($reconnect_max_delay) if $reconnect_max_delay;

    # compute decode_flags bitmask
    my $decode_flags = 0;
    $decode_flags |= 1 if $decode_datetime;  # DECODE_DT_STR
    $decode_flags |= 2 if $decode_decimal;   # DECODE_DEC_SCALE
    $decode_flags |= 4 if $decode_enum;      # DECODE_ENUM_STR
    $decode_flags |= 8 if $named_rows;       # DECODE_NAMED_ROWS
    $self->_set_decode_flags($decode_flags)   if $decode_flags;

lib/EV/ClickHouse.pm  view on Meta::CPAN

When enabled, queued (unsent) queries are preserved across reconnects;
in-flight queries receive an error callback.

=item settings => \%hash

Connection-level ClickHouse settings applied to every query and insert.
Per-query settings (see L</query>, L</insert>) override these defaults.

    settings => { async_insert => 1, max_threads => 4 }

=item keepalive => $seconds

Send periodic native protocol ping packets to keep the connection alive
during idle periods. Set to C<0> (default) to disable. Only effective
with the native protocol.

=item reconnect_delay => $seconds

Initial delay for reconnect backoff when C<auto_reconnect> is enabled.
The delay doubles after each failed attempt, up to C<reconnect_max_delay>.
Set to C<0> (default) for immediate reconnect (no backoff).

=item reconnect_max_delay => $seconds

lib/EV/ClickHouse.pm  view on Meta::CPAN

        [2, undef],               # NULL
        [3, [10, 20]],            # Array column
    ], sub { ... });

The optional C<\%settings> hashref works the same as in L</query>.

=head2 ping

    $ch->ping(sub { my ($result, $err) = @_ });

Checks if the connection is alive. On success C<$result> is a true value
and C<$err> is undef.  On error: C<(undef, $error_message)>.

=head2 finish

    $ch->finish;

Disconnects. Cancels pending operations.

=head2 reset

t/13_params_uri.t  view on Meta::CPAN

        });
        # Also test UInt256
        $ch->query("SELECT toUInt256('99999999999999999999') as big", sub {
            my ($rows, $err) = @_;
            is($rows->[0][0], '99999999999999999999', 'UInt256: correct value');
        });
        $ch->drain(sub { EV::break });
    },
);

# Test 20-22: Keepalive (verify that pings sent while idle don't crash and
# that the connection stays usable afterwards).
# The keepalive interval must be shorter than the idle wait below; otherwise
# the keepalive timer never fires before the query and nothing is exercised.
SKIP: {
    skip "Native port not reachable", 3 unless $nat_ok;
    my ($ka_ch, $ka_wait);
    $ka_ch = EV::ClickHouse->new(
        host       => $host,
        port       => $nat_port,
        protocol   => 'native',
        keepalive  => 0.1,
        on_connect => sub {
            ok(1, 'keepalive: connected');
            # Stay idle long enough for several keepalive pings, then query
            $ka_wait = EV::timer(0.5, 0, sub {
                ok($ka_ch->is_connected, 'keepalive: still connected');
                $ka_ch->query("SELECT 1", sub {
                    my ($rows, $err) = @_;
                    ok(!$err, 'keepalive: query after wait ok');
                    EV::break;
                });
            });
        },
        on_error => sub { diag("KA error: $_[0]"); EV::break },
    );
    # Safety net so a hung connection cannot block the test run forever
    my $timeout = EV::timer(10, 0, sub { EV::break });
    EV::run;
    $ka_ch->finish if $ka_ch && $ka_ch->is_connected;
}



( run in 0.927 second using v1.01-cache-2.11-cpan-39bf76dae61 )