ClickHouse-Encoder
view release on metacpan or search on metacpan
#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include <string.h>
#include <stdint.h>
#if IVSIZE < 8
#error "ClickHouse::Encoder requires a 64-bit Perl (IVSIZE >= 8)"
#endif
#include "types.h"
#include "buffer.h"
#include "encode.h"
#include "decode.h"
#include "runtime.h"
#include "cityhash.h"
/* Read one unsigned LEB128-style varint (7 payload bits per byte, high bit
 * set on every byte except the last) from an SV's byte buffer.
 *
 *   p       - start of the byte buffer
 *   buf_len - number of valid bytes at p
 *   off     - in/out byte offset into p; advanced past the consumed bytes
 *
 * Returns the decoded value as a UV.
 *
 * Croaks with "varint: truncated at offset N" if the buffer ends before a
 * terminating byte (high bit clear) is seen. Croaks with "varint exceeds
 * 64 bits" if the encoded value cannot be represented in 64 bits. That
 * covers both an over-long encoding (more than 10 bytes) and a 10th byte
 * carrying payload bits beyond bit 63.
 *
 * Used by the ClickHouse::Encoder::TCP XSUBs, which operate on offsets
 * into a Perl scalar rather than the (p,end) cursor pair used by the
 * internal native decoder. */
static UV tcp_read_varint(pTHX_ const unsigned char *p, UV buf_len,
                          UV *off) {
    UV v = 0;
    int shift = 0;
    while (1) {
        if (*off >= buf_len)
            croak("varint: truncated at offset %" UVuf, *off);
        unsigned char b = p[(*off)++];
        /* At shift 63 only the lowest payload bit still fits in a 64-bit
         * UV; any higher payload bit would be silently shifted out, so
         * reject it instead of returning a truncated value. */
        if (shift == 63 && (b & 0x7e))
            croak("varint exceeds 64 bits");
        v |= ((UV)(b & 0x7f)) << shift;
        if (!(b & 0x80)) break;
        shift += 7;
        /* A 64-bit value needs at most 10 bytes; a continuation bit on the
         * 10th byte would also push the shift count past the UV width. */
        if (shift >= 64) croak("varint exceeds 64 bits");
    }
    return v;
}
MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder
SV*
new(class, ...)
const char *class
CODE:
{
Encoder *enc = NULL;
AV *cols_av;
SSize_t i, n;
SV *cols_sv = NULL;
if (items % 2 == 0)
croak("Expected key-value pairs");
for (i = 1; i < items; i += 2) {
STRLEN klen;
const char *key = SvPV(ST(i), klen);
if (klen == 7 && memcmp(key, "columns", 7) == 0)
cols_sv = ST(i+1);
}
if (!cols_sv || !SvROK(cols_sv) || SvTYPE(SvRV(cols_sv)) != SVt_PVAV)
croak("columns required and must be arrayref");
cols_av = (AV*)SvRV(cols_sv);
n = av_len(cols_av) + 1;
/* Wrap allocation in our own ENTER/LEAVE so the cleanup destructor fires
* before this XSUB returns (XSUBs don't get implicit ENTER/LEAVE). */
ENTER;
Newxz(enc, 1, Encoder);
SAVEDESTRUCTOR_X(cleanup_encoder_slot, &enc);
Newxz(enc->columns, n, Column);
enc->num_columns = n;
for (i = 0; i < n; i++) {
SV **col_sv = av_fetch(cols_av, i, 0);
AV *col_av;
SV **name_sv, **type_sv;
STRLEN len;
const char *s;
{
PERL_UNUSED_VAR(class);
/* Optional 3rd arg: byte offset into the input. Lets callers walk
* a concatenated multi-block stream in O(N) instead of O(N*K) - the
* Perl-side decode_blocks wrapper relies on this to avoid substr
* copies on each iteration.
*
* Optional 4th arg: hashref of column names to KEEP. When provided,
* columns whose name isn't in the set get a placeholder values
* arrayref (one undef per row) and the wire bytes are still consumed
* for that column's data, but no SVs are allocated for the values.
* Memory win on wide SELECT * when caller wants a few columns. */
UV start_offset = 0;
HV *keep_set = NULL;
if (items >= 3) {
IV signed_off = SvIV(ST(2));
if (signed_off < 0)
croak("decode_block: offset must be non-negative (got %"
IVdf ")", signed_off);
start_offset = (UV)signed_off;
}
if (items >= 4 && SvOK(ST(3))) {
if (!SvROK(ST(3)) || SvTYPE(SvRV(ST(3))) != SVt_PVHV)
croak("decode_block: columns filter must be a hashref "
"(got %s)",
SvROK(ST(3)) ? sv_reftype(SvRV(ST(3)), 0)
: "non-reference");
keep_set = (HV *)SvRV(ST(3));
}
const unsigned char *start, *p, *end;
UV ncols, nrows;
decode_block_prologue(aTHX_ bytes, start_offset, "decode_block",
&start, &p, &end, &ncols, &nrows);
/* Mortalize cols so a mid-loop croak (from decode_column,
* dec_lenpfx_string, etc.) reclaims it instead of leaking. We
* SvREFCNT_inc when transferring ownership to the result HV. */
AV *cols = (AV *)sv_2mortal((SV *)newAV());
if (ncols > 0) av_extend(cols, ncols - 1);
UV c;
for (c = 0; c < ncols; c++) {
const char *name; STRLEN name_len;
const char *tstr; STRLEN tlen;
dec_lenpfx_string(aTHX_ &p, end, &name, &name_len);
dec_lenpfx_string(aTHX_ &p, end, &tstr, &tlen);
/* parse_type uses heap-slot SAVEDESTRUCTOR_X for cleanup on
* croak; on success the slot is disarmed. We're in the XSUB's
* implicit save scope, so a croak from any nested decode
* unwinds the type back through this cleanup. */
TypeInfo *t = parse_type(aTHX_ tstr, tlen);
int keep = 1;
if (keep_set && !hv_exists(keep_set, name, name_len))
keep = 0;
SV *values;
if (keep) {
values = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
} else {
/* Decode then discard - we still must consume the wire
* bytes to keep the cursor aligned for the next column.
* The AV is freed immediately so peak memory is one
* column's values, not the full block's. */
SV *tmp = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
SvREFCNT_dec(tmp);
AV *placeholder = newAV();
if (nrows > 0) {
av_extend(placeholder, nrows - 1);
SSize_t r;
for (r = 0; r < (SSize_t)nrows; r++)
av_store(placeholder, r, newSV(0));
}
values = newRV_noinc((SV *)placeholder);
}
/* Free this column's TypeInfo eagerly to avoid piling them up
* on the save stack for wide blocks. */
free_typeinfo(aTHX_ t);
HV *col_hv = newHV();
(void)hv_stores(col_hv, "name", newSVpvn(name, name_len));
(void)hv_stores(col_hv, "type", newSVpvn(tstr, tlen));
(void)hv_stores(col_hv, "values", values);
if (!keep) (void)hv_stores(col_hv, "skipped", newSViv(1));
av_store(cols, c, newRV_noinc((SV *)col_hv));
}
HV *result = newHV();
(void)hv_stores(result, "ncols", newSVuv(ncols));
(void)hv_stores(result, "nrows", newSVuv(nrows));
/* Transfer ownership out of the mortal: bump refcount, then the
* mortal-stack cleanup at scope exit drops one back, leaving the
* net refcount at 1 (owned by `result`). */
(void)hv_stores(result, "columns",
newRV_inc((SV *)cols));
(void)hv_stores(result, "consumed", newSVuv((UV)(p - start)));
RETVAL = newRV_noinc((SV *)result);
}
OUTPUT: RETVAL
# Row-oriented decoder: same wire walk as decode_block, but values are
# distributed into row-major arrayrefs as each column is decoded, then
# the per-column AV is freed. Peak memory holds one column's values
# plus all row AVs (vs decode_rows-via-Perl which holds both
# representations + does the transpose in Perl).
SV *
decode_block_rows(class, bytes, ...)
SV *class
SV *bytes
CODE:
{
PERL_UNUSED_VAR(class);
UV start_offset = 0;
if (items >= 3) {
IV signed_off = SvIV(ST(2));
if (signed_off < 0)
croak("decode_block_rows: offset must be non-negative "
"(got %" IVdf ")", signed_off);
start_offset = (UV)signed_off;
}
const unsigned char *start, *p, *end;
UV ncols, nrows;
( run in 0.777 second using v1.01-cache-2.11-cpan-140bd7fdf52 )