EV-ClickHouse

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

0.03  2026-05-21
    - Multi-host failover and connection Pool (circuit breaker, hedged
      queries, fan-out, sessions); async DNS; TLS mutual auth.
    - retry, wait_mutation, native external tables, idempotent and
      async/aggregated insert, Streamer backpressure.
    - More introspection helpers, the EV::ClickHouse::Error class, and
      new types (BFloat16, Decimal256, Interval, Geo, JSON).
    - Reliability and memory-safety fixes; docs and tests.

0.02  2026-04-30
    - Bug fixes: keepalive PING/PONG routing; last_error_code reports
      top-level code; IPv6 URI literals; skip_pending teardown.
    - HTTP arrayref insert() croaks on nested refs.
    - POD rewrite; expanded t/, xt/, eg/.

0.01  2026-03-15
    - Initial release.
    - HTTP (8123) + native (9000, rev 54459) protocols.
    - TLS via OpenSSL; LZ4 native compression; gzip HTTP compression.
    - URI parsing; per-query + connection settings; parameterized
      queries; auto-reconnect; drain / cancel / skip_pending.

ClickHouse.xs  view on Meta::CPAN

    size_t max_recv_buffer;         /* 0 = unlimited; defensive recv ceiling */
    int http_basic_auth;            /* 0=X-ClickHouse-{User,Key} (default);
                                     * 1=Authorization: Basic ... (for proxies) */
    int auto_reconnect;
    uint32_t decode_flags;
    AV *native_col_names;   /* column names from last native result */
    AV *native_col_types;   /* column type strings from last native result */
    SV *on_drain;           /* callback fired when pending_count drops to 0 */
    char *last_query_id;    /* query_id of the last dispatched query */
    SV *on_trace;           /* debug trace callback */
    ev_timer ka_timer;      /* keepalive timer */
    double keepalive;       /* keepalive interval (0 = disabled) */
    int ka_timing;
    unsigned int ka_in_flight; /* keepalive pings sent but not yet ack'd */
    int callback_depth;
    /* error info from last SERVER_EXCEPTION or HTTP error */
    int32_t last_error_code;
    /* profile info from last SERVER_PROFILE_INFO */
    uint64_t profile_rows;
    uint64_t profile_bytes;
    uint64_t profile_rows_before_limit;
    /* totals / extremes from last native query */
    AV *native_totals;
    AV *native_extremes;

ClickHouse.xs  view on Meta::CPAN

    SV          *on_complete;    /* per-query on_query_complete override */
    double       query_timeout;  /* per-query timeout */
    char        *query_id;       /* query_id for tracking */
    ngx_queue_t  queue;
};

/* Forward declarations for helpers defined further down (or in xs/io.c)
 * but called from earlier code in this file or from xs/*.c included
 * before the definition site. */
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void stop_keepalive(ev_clickhouse_t *self);
static void schedule_reconnect(ev_clickhouse_t *self);
static void lc_free_dicts(ev_clickhouse_t *self);
static void start_reading(ev_clickhouse_t *self);
static void stop_reading(ev_clickhouse_t *self);
static void start_writing(ev_clickhouse_t *self);
static void stop_writing(ev_clickhouse_t *self);
static void emit_error(ev_clickhouse_t *self, const char *msg);
static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
static int cleanup_connection(ev_clickhouse_t *self);
static int  cancel_pending(ev_clickhouse_t *self, const char *errmsg);

ClickHouse.xs  view on Meta::CPAN

static int check_destroyed(ev_clickhouse_t *self) {
    if (self->magic == EV_CH_FREED && self->callback_depth == 0) {
        Safefree(self);
        return 1;
    }
    return 0;
}

/* Free the per-connection failover host list (allocated by setter). */
/* Free just the host-list arrays. Called from _set_failover before
 * re-populating + from failover_free below. Keeps on_failover alive. */
static void failover_free_hosts(ev_clickhouse_t *self) {
    if (self->failover_hosts) {
        for (int i = 0; i < self->failover_n; i++)
            if (self->failover_hosts[i]) Safefree(self->failover_hosts[i]);
        Safefree(self->failover_hosts);
        self->failover_hosts = NULL;
    }
    if (self->failover_ports) {
        Safefree(self->failover_ports);
        self->failover_ports = NULL;

ClickHouse.xs  view on Meta::CPAN

        FREETMPS; LEAVE;
    }
    self->callback_depth--;
    /* Reset so a subsequent fire for a never-dispatched cancelled
     * query (e.g. from cancel_pending draining send_queue) sees
     * query_start_time == 0 and reports dur = 0.0, not a stale
     * duration carried over from the previous in-flight query. */
    self->query_start_time = 0;
}

/* IS_KEEPALIVE_CB is defined in xs/macros.h; keepalive_noop_cb is the
 * sentinel that backs it (declared below this comment). */

static int deliver_error(ev_clickhouse_t *self, const char *errmsg) {
    SV *oqc = NULL;
    SV *cb = pop_cb_ex(self, &oqc);
    if (cb == NULL) {
        fire_on_query_complete_ex(self, errmsg, oqc);
        if (oqc) SvREFCNT_dec(oqc);
        return check_destroyed(self);
    }

ClickHouse.xs  view on Meta::CPAN

}

/* Tears down the socket + per-connection state, then fires on_disconnect.
 * Returns 1 if on_disconnect freed self (caller must not touch self), else 0. */
static int cleanup_connection(ev_clickhouse_t *self) {
    int was_connected = self->connected;

    if (was_connected) emit_trace(self, "disconnect");
    stop_reading(self);
    stop_writing(self);
    stop_keepalive(self);
    stop_timing(self);

#ifdef HAVE_OPENSSL
    if (self->ssl) {
        SSL_shutdown(self->ssl);
        SSL_free(self->ssl);
        self->ssl = NULL;
    }
    if (self->ssl_ctx) {
        SSL_CTX_free(self->ssl_ctx);

ClickHouse.xs  view on Meta::CPAN

     * don't touch a freed self. */
    if (was_connected && self->on_disconnect)
        return fire_zero_arg_cb(self, self->on_disconnect, "disconnect");
    return 0;
}

/* Fire user error callback + on_query_complete (per-query override or
 * connection-level — fire_on_query_complete_ex falls back). Honors the
 * documented "fires after every query (success or error)" contract for
 * cancelled queries. oqc is consumed (refcount-dec'd) here.
 * HTTP keepalive PINGs are suppressed to match the success-path behavior:
 * users instrumenting via on_query_complete shouldn't see spurious zero-
 * row completions for pings they didn't initiate. */
static void fire_err_and_complete(ev_clickhouse_t *self, SV *cb,
                                   const char *errmsg, SV *oqc) {
    /* Fire on_query_complete BEFORE the user cb to match the order
     * used by deliver_error / deliver_rows on the normal path —
     * instrumentation observers expect the global hook to run first.
     * Keepalive PINGs are suppressed to match the success-path
     * behavior so observers don't see spurious zero-row completions. */
    if (!IS_KEEPALIVE_CB(cb))
        fire_on_query_complete_ex(self, errmsg, oqc);
    if (oqc) SvREFCNT_dec(oqc);
    /* A user error callback may drop the last ref to $ch (DESTROY runs
     * deferred while callback_depth > 0). invoke_err_cb itself is safe
     * either way — caller's outer magic check handles the next loop. */
    invoke_err_cb(cb, errmsg);
}

ClickHouse.xs  view on Meta::CPAN

#include "xs/io.c"

/* --- XS interface --- */

MODULE = EV::ClickHouse  PACKAGE = EV::ClickHouse

BOOT:
{
    I_EV_API("EV::ClickHouse");
    ch_openssl_init();
    /* Permanent no-op CV used for internal callbacks (HTTP keepalive ping). */
    keepalive_noop_cb = newRV_inc((SV*)get_cv("EV::ClickHouse::__keepalive_noop", GV_ADD));
    /* Per-process rand() seed so reconnect_jitter desynchronises forks
     * (otherwise every worker generates the same sequence and the
     * jitter is uniform across the herd, defeating its purpose). */
    srand((unsigned)time(NULL) ^ (unsigned)getpid());
}

EV::ClickHouse
_new(char *class, EV::Loop loop)
CODE:
{

ClickHouse.xs  view on Meta::CPAN


void
DESTROY(EV::ClickHouse self)
CODE:
{
    if (self->magic != EV_CH_MAGIC) return;

    stop_reading(self);
    stop_writing(self);
    stop_timing(self);
    stop_keepalive(self);
    if (self->reconnect_timing) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timing = 0;
    }

    if (PL_dirty) {
        self->magic = EV_CH_FREED;
        while (!ngx_queue_empty(&self->send_queue)) {
            ngx_queue_t *q = ngx_queue_head(&self->send_queue);
            ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);

ClickHouse.xs  view on Meta::CPAN

    if (cancel_pending(self, "object destroyed"))
        return;  /* inner DESTROY already freed self */

    /* A user callback fired from cancel_pending may have called
     * $ch->reset() which re-arms watchers / opens a fresh fd.
     * Stop everything again before tearing the struct down so the
     * EV loop can't dispatch into freed memory. */
    stop_reading(self);
    stop_writing(self);
    stop_timing(self);
    stop_keepalive(self);
    if (self->reconnect_timing) {
        ev_timer_stop(self->loop, &self->reconnect_timer);
        self->reconnect_timing = 0;
    }

  #ifdef HAVE_OPENSSL
    if (self->ssl) {
        SSL_shutdown(self->ssl);
        SSL_free(self->ssl);
        self->ssl = NULL;

ClickHouse.xs  view on Meta::CPAN

SV*
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    RETVAL = handler_accessor(&self->on_trace, handler, items > 1);
}
OUTPUT:
    RETVAL

void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
{
    self->keepalive = val;
}

void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
{
    self->reconnect_delay = val;
}

void

MANIFEST  view on Meta::CPAN

eg/graceful_shutdown.pl
eg/health_dashboard.pl
eg/health_probe.pl
eg/hedged_pool.pl
eg/idempotent_insert.pl
eg/insert.pl
eg/insert_streaming.pl
eg/ipv6.pl
eg/iterate.pl
eg/json.pl
eg/keepalive.pl
eg/log_tail.pl
eg/migration_runner.pl
eg/named_rows.pl
eg/native.pl
eg/native_compress.pl
eg/on_progress.pl
eg/params.pl
eg/ping.pl
eg/pool.pl
eg/query.pl

README.md  view on Meta::CPAN

- gzip compression (HTTP) and LZ4 compression with CityHash
checksums (native)
- TLS/SSL via OpenSSL, with optional `tls_skip_verify` for
self-signed certs and `tls_ca_file` for additional roots
- Connection URIs (`clickhouse[+native]://user:pass@host:port/db`),
including bracketed IPv6 literals
- Per-query and connection-level ClickHouse settings; parameterized
queries via `params`; external tables (native) via `external`
- Auto-reconnect with exponential backoff; queued (unsent) queries
are preserved across reconnects
- Keepalive pings for idle native connections; graceful drain;
query cancellation and skip\_pending
- Streaming results via `on_data` per-block callback (native);
on\_progress for native progress packets
- Raw HTTP response mode for CSV / JSONEachRow / Parquet / etc.
- 35+ ClickHouse types including Int/UInt 8..256, Float32/64,
BFloat16, Decimal32/64/128/256, UUID, IPv4/IPv6, Nullable, Array,
Tuple, Map, LowCardinality (with cross-block dictionaries),
SimpleAggregateFunction, Nested, Geo (Point/Ring/LineString/Polygon
and the Multi variants), and JSON / Object('json') with auto-flattened
hashref leaves (Int64/Float64/Bool/String + Array variants).

README.md  view on Meta::CPAN

    misconfigured peer or client-side bug that retry would only loop on.
    Combine with `reconnect_max_attempts` for an explicit ceiling.

- settings => \\%hash

    ClickHouse settings applied to every query and insert. Per-call settings
    (see ["query"](#query), ["insert"](#insert)) override these.

        settings => { async_insert => 1, max_threads => 4 }

- keepalive => $seconds

    Send a keepalive request every N seconds while the connection is idle:
    a native CLIENT\_PING on the native protocol or a `GET /ping` on HTTP
    (some load balancers / NATs drop idle HTTP connections after a few
    seconds; TCP-level keepalive is too coarse). Default: `0` (disabled).

- reconnect\_delay => $seconds

    Initial delay for the `auto_reconnect` exponential backoff. Each failed
    attempt doubles the delay, capped at `reconnect_max_delay`. Default:
    `0` (immediate retry, no backoff).

- reconnect\_max\_delay => $seconds

    Backoff ceiling. Default: `0`, meaning no explicit cap; the implementation

README.md  view on Meta::CPAN

- `async_insert => 1`

    Enables ClickHouse server-side insert batching by setting
    `async_insert=1, wait_for_async_insert=0`. Both sub-settings can be
    overridden by passing them explicitly.

## ping

    $ch->ping(sub { my ($result, $err) = @_ });

Send a no-op round trip to verify the connection is alive. On success
`$result` is true, `$err` is `undef`. On error: `(undef, $error)`.

## is\_healthy

    $ch->is_healthy(sub { my ($ok, $err) = @_ });
    $ch->is_healthy(sub { ... }, $timeout_seconds);

Bounded health probe: wraps ["ping"](#ping) with a deadline (default 5s). The
callback receives `(1, undef)` on a successful round trip, or
`(0, $msg)` on ping error or timeout. Failure does **not** tear down the

README.md  view on Meta::CPAN


    $ch->on_query_start(sub {
        my ($query_id) = @_;
        log_metric_start($query_id);
    });

Optional connection-level hook that fires the moment a query is
dispatched to the wire (after the query\_id has been resolved, before
the first send byte). Symmetric with ["on\_query\_complete"](#on_query_complete); useful for
deriving accurate "query in flight" durations without depending on
the per-query callback closure. Keepalive PINGs are suppressed, the
same as for `on_query_complete`. Also accepted as a constructor
argument.

## on\_query\_complete

    $ch->on_query_complete(sub {
        my ($query_id, $rows, $bytes, $error_code, $duration_s, $err) = @_;
        log_metric(...);
    });

README.md  view on Meta::CPAN

aggregated - the callback always fires with a complete list. Pass
`settings => \%h` for per-query options.

**Circuit breaker:** pass `circuit_threshold => N` at construction
to enable per-member fail-fast. After N consecutive query/insert/ping
errors on a given member, that member is excluded from `_pick` for
`circuit_cooldown` seconds (default 30). A successful callback resets
the per-member fail counter. If every member is dead at pick time the
breaker is bypassed so the next attempt still has a chance to recover.
Inspect with `$pool->circuit_state` which returns one
`{ fails => N, dead_until => $epoch, alive => 0|1 }`
hashref per member.

**Graceful shutdown:** `$pool->shutdown($grace_seconds, $cb)`
drains every member, then calls `finish` on each. If `$grace_seconds`
elapses before every member drains, members still in flight are
force-finished and `$cb` receives the string
`"Pool::shutdown timed out after Ns"`. On a clean shutdown `$cb`
receives undef. `$grace_seconds` may be 0 (or undef) to wait
indefinitely. The callback fires exactly once.

README.md  view on Meta::CPAN


    Enables LZ4 (native) or gzip (HTTP). LZ4 cost is small and saves ~50-70%
    on text-heavy columns. Gzip is heavier; turn on only if you're bandwidth-bound.

- `insert_streamer` batch\_size

    Default 10\_000 is a good baseline. Smaller (1k-2k) reduces memory pressure
    on the producer; larger (50k-100k) reduces server-side merge cost on
    MergeTree. Match to your row width: ~1 MB per batch is a sweet spot.

- `keepalive`

    Enable on long-lived idle connections (HTTP behind a load balancer or
    NAT, or a native connection that may sit minutes between queries). 15-30s
    is typical.

- `reconnect_max_attempts`

    Always set in production. Default is unlimited; a permanent failure
    (wrong host, wrong port, dead server) will spin `on_error` forever
    otherwise.

eg/cancel_streaming.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Cancel a streaming select mid-flight from inside the on_data callback,
# once a condition is met. The connection stays alive for follow-up queries.
use strict;
use warnings;
use EV;
use EV::ClickHouse;

my $ch;
my $blocks_seen = 0;
$ch = EV::ClickHouse->new(
    host       => $ENV{CLICKHOUSE_HOST} // '127.0.0.1',
    port       => $ENV{CLICKHOUSE_NATIVE_PORT} // 9000,

eg/cancel_streaming.pl  view on Meta::CPAN

                $blocks_seen++;
                printf "block %d: %d rows\n", $blocks_seen, scalar @$rows;
                # Stop after we've seen a couple of blocks
                $ch->cancel if $blocks_seen >= 2;
            } },
            sub {
                # Native cancel doesn't raise — the callback simply fires
                # once the server acks with EndOfStream.
                print "Cancelled after $blocks_seen blocks\n";

                # The connection is still alive. Run a normal query.
                $ch->query("select 1 + 1", sub {
                    my ($r) = @_;
                    print "Follow-up query: ", $r->[0][0], "\n";
                    EV::break;
                });
            },
        );
    },
    on_error => sub { die "Error: $_[0]\n" },
);

eg/circuit_breaker.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Pool circuit breaker: after `circuit_threshold` consecutive query
# errors on a member, the Pool marks it dead for `circuit_cooldown`
# seconds. Subsequent _pick()s skip dead members; if all are dead,
# the breaker is bypassed so recovery attempts still go through.
#
# Inspect state via $pool->circuit_state - returns a list of
# { fails => N, dead_until => $epoch, alive => 0|1 } per member.
#
# This demo connects to a real server (queries must actually be
# dispatched to reach the breaker observer) and repeatedly runs a
# query that fails server-side. After `circuit_threshold` errors
# each member trips; once all members are dead the breaker is
# bypassed so recovery attempts still go through.
use strict;
use warnings;
use EV;
use EV::ClickHouse;

eg/circuit_breaker.pl  view on Meta::CPAN

my $issued = 0;
my $w = EV::timer(0, 0.5, sub {
    # Fire a server-side error - reaches the breaker via the async path.
    for (1 .. 3) {
        $pool->query("select * from no_such_db_$$.no_such_table_$$",
            sub { });   # errors ignored - we just want the breaker hits
    }
    $issued += 3;
    my @s = $pool->circuit_state;
    for my $i (0 .. $#s) {
        printf STDERR "[breaker] member %d  fails=%d  alive=%s\n",
                      $i, $s[$i]{fails}, ($s[$i]{alive} ? 'yes' : 'no');
    }
    EV::break if $issued >= 30;
});

EV::run;
$pool->finish;

eg/health_dashboard.pl  view on Meta::CPAN


my $probe = EV::timer(0, 5, sub {
    for my $i (0 .. $#conns) {
        $conns[$i]->ping_round_trip(sub {
            my ($s, $err) = @_;
            $rtt[$i] = $err ? undef : $s;
        });
    }
});

# Tiny HTTP server. Single-shot, no keep-alive, no streaming - just
# enough to demonstrate the JSON shape.
my $listener = IO::Socket::INET->new(
    Listen => 16, LocalAddr => '0.0.0.0', LocalPort => $dash_port,
    ReuseAddr => 1, Blocking => 0,
) or die "listen $dash_port: $!";

my $accept_io = EV::io($listener->fileno, EV::READ, sub {
    while (my $cli = $listener->accept) {
        $cli->blocking(0);
        my $buf = '';

eg/hedged_pool.pl  view on Meta::CPAN

                   $latency * 1000, $winner, $rows->[0][1];
        },
    );
});

# Report breaker + win distribution every 5s.
my $report = EV::timer(5, 5, sub {
    print "--- circuit state + wins ---\n";
    my @st = $pool->circuit_state;
    for my $i (0 .. $#st) {
        printf "  member %d: fails=%d alive=%d wins=%d\n",
               $i, $st[$i]{fails}, $st[$i]{alive}, $wins[$i];
    }
});

# Graceful drain on Ctrl-C.
my $stop = EV::signal('INT', sub {
    undef $issue; undef $report;
    print "draining\n";
    $pool->shutdown(5, sub { print "done\n"; EV::break });
});

eg/keepalive.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Keepalive — periodic pings keep idle connections alive
use strict;
use warnings;
use EV;
use EV::ClickHouse;

my $ch;
$ch = EV::ClickHouse->new(
    host      => $ENV{CLICKHOUSE_HOST} // $ENV{TEST_CLICKHOUSE_HOST} // '127.0.0.1',
    port      => $ENV{CLICKHOUSE_NATIVE_PORT} // $ENV{TEST_CLICKHOUSE_NATIVE_PORT} // 9000,
    protocol  => 'native',
    keepalive => 30,   # ping every 30 seconds when idle
    on_connect => sub {
        printf "Connected with keepalive=30s: %s\n", $ch->server_info;

        # Simulate idle time, then query
        my $t; $t = EV::timer(2, 0, sub {
            undef $t;
            printf "Still connected after 2s idle: %s\n",
                $ch->is_connected ? "yes" : "no";
            $ch->query("select 1", sub {
                my ($rows, $err) = @_;
                printf "Query after idle: %s\n", $err // "ok";
                $ch->finish;

lib/EV/ClickHouse.pm  view on Meta::CPAN


    $port //= ($protocol eq 'native') ? 9000 : 8123;

    $self->_set_protocol($protocol eq 'native' ? 1 : 0);

    # Pass-through setters. Skip only when the key was absent — explicit
    # 0/'' must reach the setter so e.g. `compress => 0` is honored, not
    # ignored. (Use `exists` rather than `defined` so a deliberate undef
    # is also passed through and rejected by the setter if invalid.)
    for my $opt (qw(compress tls tls_skip_verify auto_reconnect
                    keepalive reconnect_delay reconnect_max_delay
                    reconnect_jitter reconnect_max_attempts
                    progress_period http_basic_auth
                    connect_timeout query_timeout
                    max_query_size max_recv_buffer)) {
        next unless exists $args{$opt};
        my $val = delete $args{$opt};
        my $setter = "_set_$opt";
        $self->$setter($val);
    }
    for my $opt (qw(session_id tls_ca_file tls_cert_file tls_key_file)) {

lib/EV/ClickHouse.pm  view on Meta::CPAN

        && eval { require EV::cares; 1 }) {
        # Stash the resolver in %_failover, keyed by refaddr of the
        # resolver itself (NOT of $self). Two reasons:
        #   - never delete the resolver from inside its own callback —
        #     ares_destroy from a c-ares cb corrupts the channel heap.
        #     We defer the delete via EV::timer(0,...) so it runs from
        #     a clean stack frame.
        #   - keying by refaddr($self) was racy: A's deferred-delete
        #     could fire after A's struct was freed and B got the same
        #     refaddr, dropping B's resolver. refaddr($r) is unique
        #     while $r is alive in %_failover.
        my $r = EV::cares->new;
        my $key = refaddr($r);
        $_failover{$key} = $r;
        my $weak2 = $self; weaken $weak2;
        $self->_set_dns_pending(1);
        $r->resolve($host, sub {
            my ($status, @addrs) = @_;
            my $w; $w = EV::timer(0, 0, sub { undef $w; delete $_failover{$key} });
            # Skip if the connection has been DESTROYed or the user
            # finished it while DNS was in flight (cleanup_connection

lib/EV/ClickHouse.pm  view on Meta::CPAN

                if (defined $dur && $dur >= 0) {
                    if (@{ $ring->{buf} } < $ring->{size}) {
                        push @{ $ring->{buf} }, $dur;
                    } else {
                        $ring->{buf}[ $ring->{pos} ] = $dur;
                        $ring->{pos} = ($ring->{pos} + 1) % $ring->{size};
                    }
                }
            }
            $prev->(@_) if $prev;
            $guard;     # keep the guard alive for the closure's lifetime
        });
    } else {
        # Resize: linearize chronological order (oldest at $st->{pos}
        # once the ring is full), keep the newest min(N, $size) samples.
        # Plain shift would drop by physical index instead of by age.
        my $buf = $st->{buf};
        if (@$buf >= $st->{size}) {
            # Ring full: chronological is buf[pos..end] then buf[0..pos-1].
            my $pos = $st->{pos} % $st->{size};
            @$buf = (@{$buf}[$pos .. $#$buf], @{$buf}[0 .. $pos - 1]);

lib/EV/ClickHouse.pm  view on Meta::CPAN

    die "hedged_query: unknown options: " . join(', ', sort keys %opts)
        if %opts;
    my @c = @{ $self->{conns} };
    die "hedged_query: no members" unless @c;
    # Filter out circuit-broken members. If the breaker tripped everywhere
    # fall back to the full set so the caller still hears something — but
    # in that fallback skip _cb_observer too, otherwise every failed hedge
    # extends each member's dead_until (resetting cooldown indefinitely
    # under load).
    my $now = EV::time();
    my @alive = grep { $self->{cb_state}[$_]{dead_until} <= $now } 0 .. $#c;
    my $all_dead = !@alive;
    @alive = (0 .. $#c) if $all_dead;
    $hedge_n = @alive if $hedge_n > @alive;
    $hedge_n = 1      if $hedge_n < 1;
    # Reservoir-style shuffle for distinct random picks.
    my @pool = @alive;
    my @idx;
    while (@idx < $hedge_n) { push @idx, splice(@pool, int(rand(scalar @pool)), 1) }
    my $fired   = 0;
    my $pending = scalar @idx;
    my $first_err;
    for my $i (@idx) {
        my $ch = $c[$i];
        my $inner = sub {
            my ($rows, $err) = @_;
            $pending--;

lib/EV/ClickHouse.pm  view on Meta::CPAN

        # Pass $i (not $ch) so _cb_observer can index cb_state directly
        # instead of walking conns via _slot_for.
        my $obs = $self->_cb_observer($i, $inner, !$all_dead);
        if ($settings) { $ch->query($sql, $settings, $obs) }
        else           { $ch->query($sql, $obs) }
    }
    return;
}

# Circuit breaker introspection: per-member state for monitoring.
# Returns ({ fails => N, dead_until => $epoch_seconds, alive => 0|1 }, ...).
sub circuit_state {
    my $self = shift;
    my $now  = EV::time();
    map +{ %$_, alive => $_->{dead_until} <= $now },
        @{ $self->{cb_state} };
}

# Aggregate stats
sub size           { scalar @{ $_[0]{conns} } }
sub pending_count  { my $t = 0; $t += $_->pending_count for @{ $_[0]{conns} }; $t }
sub conns          { @{ $_[0]{conns} } }

# Apply a code ref to every pool member. The callback receives
# ($conn, $idx) per call. Useful for warm-up (preload dictionaries,

lib/EV/ClickHouse.pm  view on Meta::CPAN


=item * Connection URIs (C<clickhouse[+native]://user:pass@host:port/db>),
including bracketed IPv6 literals

=item * Per-query and connection-level ClickHouse settings; parameterized
queries via C<params>; external tables (native) via C<external>

=item * Auto-reconnect with exponential backoff; queued (unsent) queries
are preserved across reconnects

=item * Keepalive pings for idle native connections; graceful drain;
query cancellation and skip_pending

=item * Streaming results via C<on_data> per-block callback (native);
on_progress for native progress packets

=item * Raw HTTP response mode for CSV / JSONEachRow / Parquet / etc.

=item * 35+ ClickHouse types including Int/UInt 8..256, Float32/64,
BFloat16, Decimal32/64/128/256, UUID, IPv4/IPv6, Nullable, Array,
Tuple, Map, LowCardinality (with cross-block dictionaries),

lib/EV/ClickHouse.pm  view on Meta::CPAN

misconfigured peer or client-side bug that retry would only loop on.
Combine with C<reconnect_max_attempts> for an explicit ceiling.

=item settings => \%hash

ClickHouse settings applied to every query and insert. Per-call settings
(see L</query>, L</insert>) override these.

    settings => { async_insert => 1, max_threads => 4 }

=item keepalive => $seconds

Send a keepalive request every N seconds while the connection is idle:
a native CLIENT_PING on the native protocol or a C<GET /ping> on HTTP
(some load balancers / NATs drop idle HTTP connections after a few
seconds; TCP-level keepalive is too coarse). Default: C<0> (disabled).

=item reconnect_delay => $seconds

Initial delay for the C<auto_reconnect> exponential backoff. Each failed
attempt doubles the delay, capped at C<reconnect_max_delay>. Default:
C<0> (immediate retry, no backoff).

=item reconnect_max_delay => $seconds

Backoff ceiling. Default: C<0>, meaning no explicit cap; the implementation

lib/EV/ClickHouse.pm  view on Meta::CPAN

Enables ClickHouse server-side insert batching by setting
C<async_insert=1, wait_for_async_insert=0>. Both sub-settings can be
overridden by passing them explicitly.

=back

=head2 ping

    $ch->ping(sub { my ($result, $err) = @_ });

Send a no-op round trip to verify the connection is alive. On success
C<$result> is true, C<$err> is C<undef>. On error: C<(undef, $error)>.

=head2 is_healthy

    $ch->is_healthy(sub { my ($ok, $err) = @_ });
    $ch->is_healthy(sub { ... }, $timeout_seconds);

Bounded health probe: wraps L</ping> with a deadline (default 5s). The
callback receives C<(1, undef)> on a successful round trip, or
C<(0, $msg)> on ping error or timeout. Failure does B<not> tear down the

lib/EV/ClickHouse.pm  view on Meta::CPAN


    $ch->on_query_start(sub {
        my ($query_id) = @_;
        log_metric_start($query_id);
    });

Optional connection-level hook that fires the moment a query is
dispatched to the wire (after the query_id has been resolved, before
the first send byte). Symmetric with L</on_query_complete>; useful for
deriving accurate "query in flight" durations without depending on
the per-query callback closure. Keepalive PINGs are suppressed, the
same as for C<on_query_complete>. Also accepted as a constructor
argument.

=head2 on_query_complete

    $ch->on_query_complete(sub {
        my ($query_id, $rows, $bytes, $error_code, $duration_s, $err) = @_;
        log_metric(...);
    });

lib/EV/ClickHouse.pm  view on Meta::CPAN

aggregated - the callback always fires with a complete list. Pass
C<settings =E<gt> \%h> for per-query options.

B<Circuit breaker:> pass C<circuit_threshold =E<gt> N> at construction
to enable per-member fail-fast. After N consecutive query/insert/ping
errors on a given member, that member is excluded from C<_pick> for
C<circuit_cooldown> seconds (default 30). A successful callback resets
the per-member fail counter. If every member is dead at pick time the
breaker is bypassed so the next attempt still has a chance to recover.
Inspect with C<$pool-E<gt>circuit_state> which returns one
C<{ fails =E<gt> N, dead_until =E<gt> $epoch, alive =E<gt> 0|1 }>
hashref per member.

B<Graceful shutdown:> C<<< $pool->shutdown($grace_seconds, $cb) >>>
drains every member, then calls C<finish> on each. If C<$grace_seconds>
elapses before every member drains, members still in flight are
force-finished and C<$cb> receives the string
C<"Pool::shutdown timed out after Ns">. On a clean shutdown C<$cb>
receives undef. C<$grace_seconds> may be 0 (or undef) to wait
indefinitely. The callback fires exactly once.

lib/EV/ClickHouse.pm  view on Meta::CPAN


Enables LZ4 (native) or gzip (HTTP). LZ4 cost is small and saves ~50-70%
on text-heavy columns. Gzip is heavier; turn on only if you're bandwidth-bound.

=item C<insert_streamer> batch_size

Default 10_000 is a good baseline. Smaller (1k-2k) reduces memory pressure
on the producer; larger (50k-100k) reduces server-side merge cost on
MergeTree. Match to your row width: ~1 MB per batch is a sweet spot.

=item C<keepalive>

Enable on long-lived idle connections (HTTP behind a load balancer or
NAT, or a native connection that may sit minutes between queries). 15-30s
is typical.

=item C<reconnect_max_attempts>

Always set in production. Default is unlimited; a permanent failure
(wrong host, wrong port, dead server) will spin C<on_error> forever
otherwise.

t/13_params_uri.t  view on Meta::CPAN

        });
        # Also test UInt256
        $ch->query("select toUInt256('99999999999999999999') as big", sub {
            my ($rows, $err) = @_;
            is($rows->[0][0], '99999999999999999999', 'UInt256: correct value');
        });
        $ch->drain(sub { EV::break });
    },
);

# Test 20-22: Keepalive (just verify it doesn't crash and connection stays alive)
SKIP: {
    skip "Native port not reachable", 3 unless $nat_ok;
    my ($ka_ch, $ka_wait);
    $ka_ch = EV::ClickHouse->new(
        host       => $host,
        port       => $nat_port,
        protocol   => 'native',
        keepalive  => 1,
        on_connect => sub {
            ok(1, 'keepalive: connected');
            # Wait a bit, then query
            $ka_wait = EV::timer(0.5, 0, sub {
                ok($ka_ch->is_connected, 'keepalive: still connected');
                $ka_ch->query("select 1", sub {
                    my ($rows, $err) = @_;
                    ok(!$err, 'keepalive: query after wait ok');
                    EV::break;
                });
            });
        },
        on_error => sub { diag("KA error: $_[0]"); EV::break },
    );
    my $timeout = EV::timer(10, 0, sub { EV::break });
    EV::run;
    $ka_ch->finish if $ka_ch && $ka_ch->is_connected;
}

t/20_reconnect.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;
use EV;
use EV::ClickHouse;

# auto_reconnect coverage:
# - basic plumbing (flag set, simple query)
# - pre-connect queue drain
# - real mid-session reconnect: HTTP keep_alive_timeout closes the server
#   side after a few idle seconds; the client must recover and dispatch the
#   next query on a fresh socket.
#
# ClickHouse native protocol has no SQL-level disconnect primitive in 26.x
# (no SYSTEM drop CONNECTION; KILL QUERY only kills queries), so a true
# native disconnect-recovery test would need a TCP proxy harness or
# tcp_close_connection_after_queries_seconds in server config — left as
# integration testing.

my $host      = $ENV{TEST_CLICKHOUSE_HOST} || '127.0.0.1';

t/20_reconnect.t  view on Meta::CPAN


    is(scalar @results, 3, "queued-pre-connect: all 3 callbacks fired");
    is_deeply([sort @results], [1, 2, 3],
        "queued-pre-connect: results match queries");

    $ch->finish if $ch && $ch->is_connected;
}

}  # end native SKIP block

# 5-7: real mid-session reconnect via HTTP keep_alive_timeout (default 3s).
# Run Q1, wait long enough for the server to drop the socket on idle, then
# run Q2 — auto_reconnect must re-establish on the fly. We arm the idle
# timer only once (on_connect fires on every reconnect, so guarding with a
# flag prevents us from looping forever on Q1).
SKIP: {
    skip "HTTP port not reachable", 3 unless $http_ok;

    my $ch;
    my ($q1_err, $q2_err, $q2_val);
    my $disconnects = 0;

t/20_reconnect.t  view on Meta::CPAN

        reconnect_delay     => 0.1,
        reconnect_max_delay => 1,
        on_disconnect       => sub { $disconnects++ },
        on_error            => sub { diag("HTTP reconnect error: $_[0]") },
        on_connect          => sub {
            return unless $first_connect;
            $first_connect = 0;
            $ch->query("select 1 format TabSeparated", sub {
                (undef, $q1_err) = @_;

                # Idle past keep_alive_timeout so the server hangs up,
                # then send Q2 — auto_reconnect must dispatch it onto a
                # fresh socket.
                $idle_timer = EV::timer(4, 0, sub {
                    $ch->query("select 2 format TabSeparated", sub {
                        my ($rows, $err) = @_;
                        $q2_err = $err;
                        $q2_val = ($rows && @$rows) ? $rows->[0][0] : undef;
                        EV::break;
                    });
                });

t/25_features.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;
use EV;
use EV::ClickHouse;

# Tests for the 0.03 feature batch:
# - max_reconnect_attempts
# - HTTP keepalive PING
# - progress_period coalescing
# - for_table schema helper
# - insert_streamer streaming insert
# - cancel during on_data
# - on_disconnect not firing on connect-phase failures

my $host      = $ENV{TEST_CLICKHOUSE_HOST} || '127.0.0.1';
my $http_port = $ENV{TEST_CLICKHOUSE_PORT} || 8123;
my $nat_port  = $ENV{TEST_CLICKHOUSE_NATIVE_PORT} || 9000;

t/25_features.t  view on Meta::CPAN

        },
    );
    run_with_timeout(5);
    cmp_ok(scalar @errors, '>=', 3,
        "max_reconnect_attempts: at least N+1 on_error fires");
    ok((grep /max reconnect attempts exceeded/, @errors),
        "max_reconnect_attempts: terminal error message fires");
    $ch->finish if $ch->is_connected;
}

# 2-3: HTTP keepalive PING — set a tiny keepalive, wait, ensure pending_count
# stays at 0 (the noop ping clears itself) and the connection stays alive.
SKIP: {
    skip "HTTP port not reachable", 2 unless $http_ok;
    my ($ch, $err);
    $ch = EV::ClickHouse->new(
        host       => $host,
        port       => $http_port,
        keepalive  => 0.2,
        on_connect => sub { },
        on_error   => sub { $err = $_[0] },
    );
    # Wait long enough for at least 2 keepalive pings to fire.
    my $t = EV::timer(0.6, 0, sub { EV::break });
    EV::run;
    ok(!$err, "HTTP keepalive: no errors") or diag "err=$err";
    $ch->query("select 1 format TabSeparated", sub {
        my ($rows) = @_;
        is($rows && @$rows ? $rows->[0][0] : undef, 1,
           "HTTP keepalive: connection still usable for queries");
        EV::break;
    });
    run_with_timeout(5);
    $ch->finish if $ch->is_connected;
}

# 4: for_table returns column metadata.
SKIP: {
    skip "Native port not reachable", 1 unless $nat_ok;
    my ($ch, $info, $err);

t/28_pass2_coverage.t  view on Meta::CPAN

#!/usr/bin/env perl
# Coverage for corner cases the earlier test files miss:
#   - on_query_complete (success + error path)
#   - HTTP keepalive PING does NOT fire on_query_complete (IS_KEEPALIVE_CB)
#   - query_log_comment is applied to INSERT and on HTTP
#   - DNS failure with pre-queued queries delivers errors
#   - Pool: cancel / skip_pending / reset broadcast
#   - Iterator timeout returns undef without setting error
#   - Streamer high_water fires when buffered count crosses watermark
use strict;
use warnings;
use Test::More;
use EV;
use EV::ClickHouse;

t/28_pass2_coverage.t  view on Meta::CPAN

        },
        on_query_complete   => sub { $completed = [@_] },
        on_error            => sub { },
    );
    EV::run;
    ok $completed && $completed->[5], 'on_query_complete fired with err msg on failure';
    cmp_ok $completed->[3], '>', 0, '  error_code populated';
    $ch->finish;
}

# 3. HTTP keepalive PING does not fire on_query_complete
SKIP: {
    skip 'HTTP not reachable', 1
        unless IO::Socket::INET->new(PeerAddr => $host, PeerPort => $hport, Timeout => 2);
    my $count = 0;
    my $ch; $ch = EV::ClickHouse->new(
        host => $host, port => $hport, protocol => 'http',
        keepalive => 0.2,
        on_connect          => sub { },
        on_query_complete   => sub { $count++ },
        on_error            => sub { },
    );
    # let two keepalive pings fly
    run_with_timeout(0.6);
    is $count, 0, 'HTTP keepalive PING did not fire on_query_complete';
    $ch->finish;
}

# 4. query_log_comment is applied to INSERT (and is well-formed enough that
#    the server accepts it). We can't read system.query_log on every test
#    setup, so we verify the insert succeeds with the comment in place.
{
    my $err;
    my $ch; $ch = EV::ClickHouse->new(
        host => $host, port => $nport, protocol => 'native',

xs/io.c  view on Meta::CPAN

/* --- Async TCP connect, I/O dispatch, timers, keepalive, reconnect,
 *     pipeline advance, and OpenSSL one-time init. ---
 *
 * Forward decls for symbols defined later in this file but called from
 * earlier in the same file (cross-file callers use the forwards in
 * ClickHouse.xs).
 */
static void io_cb(EV_P_ ev_io *w, int revents);
static void on_connect_done(ev_clickhouse_t *self);

#ifdef HAVE_OPENSSL

xs/io.c  view on Meta::CPAN


        int gen = self->connect_gen;
        /* Must reconnect — server may still be processing */
        if (teardown_after_deliver(self, "query timeout", "query timeout")) return;
        if (self->connect_gen != gen) return;
        if (self->auto_reconnect && self->host)
            schedule_reconnect(self);
    }
}

/* --- Keepalive timer callback --- */

static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
        offsetof(ev_clickhouse_t, ka_timer));
    (void)revents;

    if (self->magic != EV_CH_MAGIC) return;
    if (!self->connected || self->send_count > 0) return;

    /* Native: append PING bytes directly; the SERVER_PONG handler tracks

xs/io.c  view on Meta::CPAN

        char *ping = build_native_ping(&ping_len);
        ensure_send_cap(self, self->send_len + ping_len);
        Copy(ping, self->send_buf + self->send_len, ping_len, char);
        self->send_len += ping_len;
        self->ka_in_flight++;
        Safefree(ping);
        start_writing(self);
    } else {
        /* HTTP: enqueue a real ping with a no-op callback. ClickHouse's
         * HTTP server closes idle connections after a few seconds, so
         * relying on TCP keepalive (kernel default ~2h) is not enough. */
        size_t req_len;
        char *req = build_http_ping_request(self, &req_len);
        ev_ch_send_t *s = alloc_send();
        s->data = req;
        s->data_len = req_len;
        s->cb = SvREFCNT_inc(keepalive_noop_cb);
        enqueue_send(self, s);
    }
}

static void start_keepalive(ev_clickhouse_t *self) {
    if (self->keepalive > 0 && !self->ka_timing && self->connected) {
        ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
        ev_timer_start(self->loop, &self->ka_timer);
        self->ka_timing = 1;
    }
}

static void stop_keepalive(ev_clickhouse_t *self) {
    if (self->ka_timing) {
        ev_timer_stop(self->loop, &self->ka_timer);
        self->ka_timing = 0;
    }
}

/* --- Reconnect with backoff --- */

static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
    ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -

xs/io.c  view on Meta::CPAN

        }
        /* Dropping the last ref to the drain CV can free a closure that
         * captured $ch — DESTROY then runs. Guard with callback_depth so
         * the free is deferred, then detect it before touching self. */
        self->callback_depth++;
        SvREFCNT_dec(drain_cb);
        self->callback_depth--;
        if (check_destroyed(self)) return 1;
    }

    /* Restart keepalive timer when idle (start_keepalive is a no-op if already
     * timing or if keepalive disabled) */
    if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0)
        start_keepalive(self);

    /* send next request from queue */
    if (!ngx_queue_empty(&self->send_queue)) {
        /* Stop keepalive during active query */
        stop_keepalive(self);
        emit_trace(self, "dispatch query (pending=%d)", self->pending_count);

        /* on_trace is user code: it may have called finish()/reset() or
         * dropped the last reference (DESTROY), any of which drains or
         * frees the send queue. Re-validate the connection and re-derive
         * the head entry AFTER the trace so we never touch a send struct
         * that cancel_pending has already released to the freelist. */
        if (self->magic != EV_CH_MAGIC || !self->connected
            || ngx_queue_empty(&self->send_queue))
            return check_destroyed(self);

xs/io.c  view on Meta::CPAN

                          send->on_data, send->on_complete,
                          send->query_timeout);
        CLEAR_SV(send->on_data);
        send->on_complete = NULL;       /* ownership transferred to cbt */
        /* Track query_id + dispatch start time (used by on_query_complete). */
        CLEAR_STR(self->last_query_id);
        if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }
        self->query_start_time = ev_now(self->loop);

        /* on_query_start: fire with the resolved query_id, just before
         * the write side runs. Suppressed for keepalive PINGs to match
         * on_query_complete semantics. */
        if (self->on_query_start && !IS_KEEPALIVE_CB(dispatched_cb)) {
            dSP;
            ENTER; SAVETMPS; PUSHMARK(SP);
            EXTEND(SP, 1);
            PUSHs(self->last_query_id
                  ? sv_2mortal(newSVpv(self->last_query_id, 0))
                  : &PL_sv_undef);
            PUTBACK;
            int gen_before = self->connect_gen;

xs/macros.h  view on Meta::CPAN

/* Common idioms used across the XS — kept here so the main translation
 * unit and the file-local-to-the-TU includes can share them without
 * duplicating definitions.
 *
 * IS_KEEPALIVE_CB depends on `keepalive_noop_cb` which is defined later
 * in the .xs; this header only declares the macro so the dependency is
 * resolved at expansion time, not at parse time. */

#ifndef EV_CH_MACROS_H
#define EV_CH_MACROS_H

/* Drop an owned C string and reset the slot to NULL. */
#define CLEAR_STR(p) do { if (p) { Safefree(p); (p) = NULL; } } while (0)

/* Drop a refcounted SV and reset the slot to NULL. */

xs/macros.h  view on Meta::CPAN

 * callback threw. Use only at sites that must continue work after the
 * call (e.g. still need SPAGAIN/FREETMPS or further dispatches); helpers
 * that return immediately can omit the clear. */
#define WARN_AND_CLEAR_ERRSV(label) do { \
    if (SvTRUE(ERRSV)) { \
        warn("EV::ClickHouse: exception in " label ": %s", SvPV_nolen(ERRSV)); \
        sv_setsv(ERRSV, &PL_sv_undef); \
    } \
} while (0)

/* HTTP keepalive uses keepalive_noop_cb as a sentinel — suppress the
 * on_query_complete (and on_query_start) fire for it so observers don't
 * see spurious zero-row "completions" they didn't initiate. The native
 * keepalive path bypasses cb_queue entirely, so this only matters for
 * HTTP. The sentinel is defined later in the .xs; this macro just
 * forward-references it. */
#define IS_KEEPALIVE_CB(cb) ((cb) && (cb) == keepalive_noop_cb)

#endif /* EV_CH_MACROS_H */

xs/proto_http.c  view on Meta::CPAN

                        "Authorization: Basic %s\r\n", buf + cred_len);
        Safefree(buf);
    } else {
        if (self->user)
            pos += snprintf(req + pos, req_cap - pos,
                            "X-ClickHouse-User: %s\r\n", self->user);
        if (self->password && self->password[0])
            pos += snprintf(req + pos, req_cap - pos,
                            "X-ClickHouse-Key: %s\r\n", self->password);
    }
    pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
    if (self->compress)
        pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
    if (content_encoding)
        pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
    pos += snprintf(req + pos, req_cap - pos,
                    "Content-Length: %lu\r\n\r\n", (unsigned long)body_len);

    /* body */
    if (body_len > 0) {
        if (pos + body_len > req_cap) {

xs/proto_http.c  view on Meta::CPAN

/* Build HTTP GET /ping request */
static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
    char *req;
    size_t req_cap = 128 + (self->host ? strlen(self->host) : 0);
    size_t pos = 0;

    Newx(req, req_cap, char);
    pos = snprintf(req, req_cap,
                   "GET /ping HTTP/1.1\r\n"
                   "Host: %s:%u\r\n"
                   "Connection: keep-alive\r\n\r\n",
                   self->host, self->port);
    if (pos >= req_cap) pos = req_cap - 1;
    *req_len = pos;
    return req;
}

/* --- HTTP response parsing --- */

/* Length-bounded uint parser. Stops at the first non-digit OR at `len`,
 * whichever comes first. Used to parse X-ClickHouse-Summary values out

xs/proto_native_parse.c  view on Meta::CPAN

                self->native_rows = NULL;
                if (deliver_rows(self, rows)) return;
            }

            /* advance pipeline — may free self via try_write error */
            pipeline_advance(self);
            return;
        }

        if (rc == 3) {
            /* Pong — ack a keepalive ping, or deliver to user's ping() cb */
            self->native_state = NATIVE_IDLE;
            if (self->ka_in_flight > 0) {
                /* Keepalive ack: not tied to send_count or any user cb */
                self->ka_in_flight--;
                continue;
            }
            stop_timing(self);
            if (self->send_count > 0) self->send_count--;
            AV *rows = newAV();
            if (deliver_rows(self, rows)) return;
            pipeline_advance(self);
            return;
        }

xs/queues.c  view on Meta::CPAN

/* --- freelist for cb_queue entries + send_queue entries ---
 *
 * Both use a singly-linked-list-in-the-record-itself stash: the first
 * sizeof(void*) bytes of a released entry are repurposed as the "next
 * free" pointer. Reuse the same struct without rebuilding it from
 * scratch.
 *
 * Also lives here: the singleton sentinels (keepalive_noop_cb,
 * iter_timeout_cb) and the small helpers that munge per-query
 * settings into per-send fields.
 *
 * This file is #include'd from ClickHouse.xs as part of the single
 * translation unit; symbols stay file-local-to-the-TU.
 */

static ev_ch_cb_t *cbt_freelist = NULL;

static ev_ch_cb_t* alloc_cbt(void) {

xs/queues.c  view on Meta::CPAN

}

static ev_ch_send_t *send_freelist = NULL;

/* Iterator timeout watcher cb: just break the loop the iterator drove. */
static void iter_timeout_cb(EV_P_ ev_timer *w, int revents) {
    (void)w; (void)revents;
    ev_break(EV_A, EVBREAK_ONE);
}

/* No-op CV reference used as the callback for HTTP keepalive pings;
 * initialised once at BOOT and shared by all connections. */
static SV *keepalive_noop_cb = NULL;

static ev_ch_send_t* alloc_send(void) {
    ev_ch_send_t *s;
    if (send_freelist) {
        s = send_freelist;
        send_freelist = *(ev_ch_send_t **)s;
    } else {
        Newx(s, 1, ev_ch_send_t);
    }
    s->data = NULL;



( run in 3.572 seconds using v1.01-cache-2.11-cpan-d8267643d1d )