EV-ClickHouse

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

    - Parameterized queries via params => { name => value }
    - Keepalive ping timer for idle native connections
    - Auto-reconnect with exponential backoff (reconnect_delay, reconnect_max_delay)
    - Graceful drain callback (fires when all pending queries complete)
    - Query cancellation and skip_pending

    Query features:
    - TabSeparated format parser for HTTP responses
    - INSERT with TSV string or arrayref data (both protocols)
    - Raw query mode for HTTP (return unparsed response body)
    - on_data streaming callback for native protocol (per-block delivery)
    - on_progress callback for native protocol progress packets
    - query_id tracking via last_query_id accessor
    - query_timeout per-query with automatic cancellation

    Native protocol type support:
    - Int8/16/32/64/128/256, UInt8/16/32/64/128/256
    - Float32/Float64, Bool
    - String, FixedString(N)
    - Date, Date32, DateTime, DateTime64 (with timezone support)
    - Decimal32/64/128 (with optional scaling via decode_decimal)

ClickHouse.xs  view on Meta::CPAN

    int reconnect_timing;
    /* LowCardinality cross-block dictionary state */
    SV ***lc_dicts;           /* array of dictionaries, one per column */
    uint64_t *lc_dict_sizes;  /* size of each dictionary */
    int lc_num_cols;          /* number of columns with LC state */
};

struct ev_ch_cb_s {
    SV          *cb;
    int          raw;        /* return raw response body instead of parsed rows */
    SV          *on_data;    /* per-query streaming callback (fires per block) */
    double       query_timeout;  /* per-query timeout (0=use default) */
    ngx_queue_t  queue;
};

struct ev_ch_send_s {
    char        *data;      /* full HTTP request or native packet */
    size_t       data_len;
    SV          *cb;
    char        *insert_data;     /* deferred TSV data for native INSERT */
    size_t       insert_data_len;
    SV          *insert_av;      /* deferred AV* data for native INSERT */
    int          raw;            /* return raw response body */
    SV          *on_data;        /* per-query streaming callback */
    double       query_timeout;  /* per-query timeout */
    char        *query_id;       /* query_id for tracking */
    ngx_queue_t  queue;
};

/* forward declarations */
static void io_cb(EV_P_ ev_io *w, int revents);
static void timer_cb(EV_P_ ev_timer *w, int revents);
static void ka_timer_cb(EV_P_ ev_timer *w, int revents);
static void start_keepalive(ev_clickhouse_t *self);

ClickHouse.xs  view on Meta::CPAN

                    if (num_cols > 0)
                        av_extend(row, num_cols - 1);
                    for (c = 0; c < num_cols; c++) {
                        av_push(row, columns[c][r]);
                    }
                    av_push(*target, newRV_noinc((SV*)row));
                }
            }
            }

            /* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */
            {
                SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL;
                if (on_data && self->native_rows) {
                    self->callback_depth++;
                    {
                        dSP;
                        ENTER; SAVETMPS;
                        PUSHMARK(SP);
                        PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows)));
                        PUTBACK;

MANIFEST  view on Meta::CPAN

eg/error_handling.pl
eg/insert.pl
eg/keepalive.pl
eg/native.pl
eg/native_compress.pl
eg/params.pl
eg/ping.pl
eg/query.pl
eg/queue.pl
eg/settings.pl
eg/streaming.pl
eg/totals.pl
eg/uri.pl
lib/EV/ClickHouse.pm
LICENSE
Makefile.PL
MANIFEST			This list of files
MANIFEST.SKIP
META.json
META.yml
ngx_queue.h

lib/EV/ClickHouse.pm  view on Meta::CPAN

        my ($body, $err) = @_;
        # $body is the raw CSV text
    });

Not supported with the native protocol (croaks).

=item C<query_timeout> — per-query timeout in seconds, overriding the
connection-level C<query_timeout>.

=item C<on_data> — native protocol only. A code ref called for each data
block as it arrives. Enables streaming: rows are delivered incrementally
and not accumulated.

    $ch->query("SELECT * FROM big_table",
        { on_data => sub { my ($rows) = @_; process_batch($rows) } },
        sub { my (undef, $err) = @_; ... }  # final callback
    );

=back

B<Native protocol type notes:> With the native protocol, column values

t/11_new_features.t  view on Meta::CPAN

    cb    => sub {
        $ch->query("SELECT nonexistent_column FROM system.one", sub {
            my ($rows, $err) = @_;
            ok($err, 'error_code: got error');
            like($err, qr/Code: \d+/, 'error_code: contains Code: N');
            EV::break;
        });
    },
);

# Test 24-26: streaming on_data callback (native)
with_native(
    tests => 3,
    cb    => sub {
        my @blocks;
        $ch->query(
            "SELECT number FROM numbers(100)",
            { on_data => sub { push @blocks, $_[0] } },
            sub {
                my ($rows, $err) = @_;
                ok(!$err, 'on_data: no error');



( run in 2.024 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )