ClickHouse-Encoder

 view release on metacpan or  search on metacpan

Encoder.xs  view on Meta::CPAN

#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include <string.h>
#include <stdint.h>

#if IVSIZE < 8
#error "ClickHouse::Encoder requires a 64-bit Perl (IVSIZE >= 8)"
#endif

#include "types.h"
#include "buffer.h"
#include "encode.h"
#include "decode.h"
#include "runtime.h"
#include "cityhash.h"

/* Read one unsigned LEB128-style varint (7-bit groups, least significant
 * group first, 0x80 = continuation) from an SV's byte buffer.
 *
 *   p        start of the byte buffer
 *   buf_len  number of readable bytes at p
 *   off      in/out cursor: decoding starts at p[*off]; *off is advanced
 *            past every byte consumed (also on the croak paths)
 *
 * Returns the decoded value. Croaks with TCP-style messages on truncation
 * ("varint: truncated at offset N") or when the encoding does not fit in
 * 64 bits ("varint exceeds 64 bits"). Used by the ClickHouse::Encoder::TCP
 * XSUBs, which operate on offsets into a Perl scalar rather than the
 * (p,end) cursor pair used by the internal native decoder.
 *
 * A 64-bit value needs at most 10 groups. The 10th byte (shift == 63) may
 * only carry bit 63. Any byte value > 1 there either has the continuation
 * bit set or payload bits that would be shifted past bit 63 and silently
 * lost; both are overflow. Checking this before the shift also keeps every
 * shift amount <= 63, so the shift is always well-defined (UV is at least
 * 64 bits because the file requires IVSIZE >= 8). */
static UV tcp_read_varint(pTHX_ const unsigned char *p, UV buf_len,
                          UV *off) {
    UV v = 0;
    int shift = 0;
    for (;;) {
        unsigned char b;
        if (*off >= buf_len)
            croak("varint: truncated at offset %lu", (unsigned long)*off);
        b = p[(*off)++];
        if (shift == 63 && b > 1)
            croak("varint exceeds 64 bits");
        v |= ((UV)(b & 0x7f)) << shift;
        if (!(b & 0x80))
            return v;
        shift += 7;
    }
}

MODULE = ClickHouse::Encoder  PACKAGE = ClickHouse::Encoder

SV*
new(class, ...)
    const char *class
CODE:
{
    Encoder *enc = NULL;
    AV *cols_av;
    SSize_t i, n;
    SV *cols_sv = NULL;

    if (items % 2 == 0)
        croak("Expected key-value pairs");

    for (i = 1; i < items; i += 2) {
        STRLEN klen;
        const char *key = SvPV(ST(i), klen);
        if (klen == 7 && memcmp(key, "columns", 7) == 0)
            cols_sv = ST(i+1);
    }

    if (!cols_sv || !SvROK(cols_sv) || SvTYPE(SvRV(cols_sv)) != SVt_PVAV)
        croak("columns required and must be arrayref");

    cols_av = (AV*)SvRV(cols_sv);
    n = av_len(cols_av) + 1;

    /* Wrap allocation in our own ENTER/LEAVE so the cleanup destructor fires
     * before this XSUB returns (XSUBs don't get implicit ENTER/LEAVE). */
    ENTER;
    Newxz(enc, 1, Encoder);
    SAVEDESTRUCTOR_X(cleanup_encoder_slot, &enc);

    Newxz(enc->columns, n, Column);
    enc->num_columns = n;

    for (i = 0; i < n; i++) {
        SV **col_sv = av_fetch(cols_av, i, 0);
        AV *col_av;
        SV **name_sv, **type_sv;
        STRLEN len;
        const char *s;

Encoder.xs  view on Meta::CPAN

{
    /* Decode a single native-format block from `bytes` and return a
     * hashref:
     *   { ncols    => UV,
     *     nrows    => UV,
     *     consumed => UV (cursor distance p - start after the last column),
     *     columns  => [ { name   => column name bytes,
     *                     type   => type string bytes,
     *                     values => result of decode_column, or a
     *                               placeholder arrayref of undefs,
     *                     skipped => 1   (only for filtered-out columns) },
     *                   ... ] }
     * NOTE(review): `start` is produced by decode_block_prologue, so whether
     * `consumed` is measured from start_offset or from the buffer start
     * depends on that helper - confirm.
     * NOTE(review): ncols/nrows are read from the input and size the
     * av_extend() calls below; this assumes decode_block_prologue bounds
     * them - confirm. */
    PERL_UNUSED_VAR(class);
    /* Optional 3rd arg: byte offset into the input. Lets callers walk
     * a concatenated multi-block stream in O(N) instead of O(N*K) - the
     * Perl-side decode_blocks wrapper relies on this to avoid substr
     * copies on each iteration. Negative offsets croak.
     *
     * Optional 4th arg: hashref of column names to KEEP (undef means no
     * filter). Columns whose name isn't in the set are still fully
     * decoded - the wire bytes must be consumed to keep the cursor
     * aligned - but the decoded values are freed immediately, so at most
     * one skipped column's values are alive at a time. Such columns get
     * a placeholder values arrayref (one undef per row) and skipped => 1.
     * Memory win on wide SELECT * when the caller wants a few columns. */
    UV start_offset = 0;
    HV *keep_set = NULL;
    if (items >= 3) {
        IV signed_off = SvIV(ST(2));
        if (signed_off < 0)
            croak("decode_block: offset must be non-negative (got %"
                  IVdf ")", signed_off);
        start_offset = (UV)signed_off;
    }
    if (items >= 4 && SvOK(ST(3))) {
        if (!SvROK(ST(3)) || SvTYPE(SvRV(ST(3))) != SVt_PVHV)
            croak("decode_block: columns filter must be a hashref "
                  "(got %s)",
                  SvROK(ST(3)) ? sv_reftype(SvRV(ST(3)), 0)
                               : "non-reference");
        keep_set = (HV *)SvRV(ST(3));
    }
    const unsigned char *start, *p, *end;
    UV ncols, nrows;
    decode_block_prologue(aTHX_ bytes, start_offset, "decode_block",
                          &start, &p, &end, &ncols, &nrows);

    /* Mortalize cols so a mid-loop croak (from decode_column,
     * dec_lenpfx_string, etc.) reclaims it instead of leaking. The
     * newRV_inc() below takes the extra reference when ownership moves
     * to the result HV. */
    AV *cols = (AV *)sv_2mortal((SV *)newAV());
    if (ncols > 0) av_extend(cols, ncols - 1);
    UV c;
    for (c = 0; c < ncols; c++) {
        /* Each column on the wire: length-prefixed name, length-prefixed
         * type string, then the column data for nrows rows. */
        const char *name; STRLEN name_len;
        const char *tstr; STRLEN tlen;
        dec_lenpfx_string(aTHX_ &p, end, &name, &name_len);
        dec_lenpfx_string(aTHX_ &p, end, &tstr, &tlen);
        /* parse_type uses heap-slot SAVEDESTRUCTOR_X for cleanup on
         * croak; on success the slot is disarmed. Savestack entries are
         * unwound when a croak propagates, so a croak inside parse_type
         * releases the partially built type.
         * NOTE(review): if the slot really is disarmed on success, a
         * croak from decode_column below would leak `t`, because
         * free_typeinfo is only reached on the success path. Confirm
         * against parse_type's cleanup. */
        TypeInfo *t = parse_type(aTHX_ tstr, tlen);

        /* Membership test uses the raw wire bytes as a non-UTF-8 key
         * (positive klen).
         * NOTE(review): filter hashes whose keys are UTF-8-flagged (e.g.
         * non-ASCII names under `use utf8`) may not match here - confirm
         * how callers build the filter. */
        int keep = 1;
        if (keep_set && !hv_exists(keep_set, name, name_len))
            keep = 0;

        SV *values;
        if (keep) {
            values = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
        } else {
            /* Decode then discard - we still must consume the wire
             * bytes to keep the cursor aligned for the next column.
             * The decoded values are freed immediately, so peak memory
             * is one column's values, not the full block's. The
             * placeholder below still allocates one undef SV per row. */
            SV *tmp = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
            SvREFCNT_dec(tmp);
            AV *placeholder = newAV();
            if (nrows > 0) {
                av_extend(placeholder, nrows - 1);
                SSize_t r;
                for (r = 0; r < (SSize_t)nrows; r++)
                    av_store(placeholder, r, newSV(0));
            }
            values = newRV_noinc((SV *)placeholder);
        }
        /* Free this column's TypeInfo eagerly to avoid piling them up
         * on the save stack for wide blocks. */
        free_typeinfo(aTHX_ t);

        /* Per-column result hash; `values` ownership moves into it. */
        HV *col_hv = newHV();
        (void)hv_stores(col_hv, "name",   newSVpvn(name, name_len));
        (void)hv_stores(col_hv, "type",   newSVpvn(tstr, tlen));
        (void)hv_stores(col_hv, "values", values);
        if (!keep) (void)hv_stores(col_hv, "skipped", newSViv(1));
        av_store(cols, c, newRV_noinc((SV *)col_hv));
    }

    HV *result = newHV();
    (void)hv_stores(result, "ncols",    newSVuv(ncols));
    (void)hv_stores(result, "nrows",    newSVuv(nrows));
    /* Transfer ownership out of the mortal: bump refcount, then the
     * mortal-stack cleanup at scope exit drops one back, leaving the
     * net refcount at 1 (owned by `result`). */
    (void)hv_stores(result, "columns",
                    newRV_inc((SV *)cols));
    (void)hv_stores(result, "consumed", newSVuv((UV)(p - start)));
    RETVAL = newRV_noinc((SV *)result);
}
OUTPUT: RETVAL

# Row-oriented decoder: same wire walk as decode_block, but values are
# distributed into row-major arrayrefs as each column is decoded, then
# the per-column AV is freed. Peak memory holds one column's values
# plus all row AVs (vs decode_rows-via-Perl which holds both
# representations + does the transpose in Perl).
SV *
decode_block_rows(class, bytes, ...)
    SV *class
    SV *bytes
CODE:
{
    PERL_UNUSED_VAR(class);
    UV start_offset = 0;
    if (items >= 3) {
        IV signed_off = SvIV(ST(2));
        if (signed_off < 0)
            croak("decode_block_rows: offset must be non-negative "
                  "(got %" IVdf ")", signed_off);
        start_offset = (UV)signed_off;
    }
    const unsigned char *start, *p, *end;
    UV ncols, nrows;



( run in 0.777 second using v1.01-cache-2.11-cpan-140bd7fdf52 )