ClickHouse-Encoder

 view release on metacpan or  search on metacpan

encode.c  view on Meta::CPAN

                int wire_slots = wire_slot_counts[p];
                int kc = wire_slots - 1;  /* minus SharedVariant */

                buf_le64(aTHX_ b, 1);                   /* Dynamic V1 */
                buf_varint(aTHX_ b, (UV)kc);            /* max_dynamic_types */
                buf_varint(aTHX_ b, (UV)kc);            /* actual count */

                /* User variant type names in lex order (skip SharedVariant). */
                int *slots = path_slots + p*JSON_LEX_SLOTS;
                int s;
                for (s = 0; s < wire_slots; s++) {
                    int k = slots[s];
                    if (k < 0) continue;
                    const char *nm = json_kind_type_name[k];
                    buf_string(aTHX_ b, nm, strlen(nm));
                }
                buf_le64(aTHX_ b, 0);                   /* Variant BASIC */
            }

            /* Step 5b: emit typed-path column data (sorted by name).
             * Typed paths come before dynamic-path Variant data on the
             * wire; their inner types are simple-prefix so encode_column
             * emits only the column body. typed_vals is only allocated
             * when both n_typed and num_rows are positive; skip when
             * num_rows == 0 to avoid dereferencing a NULL outer array. */
            if (n_typed > 0 && num_rows > 0) {
                int tp;
                for (tp = 0; tp < n_typed; tp++)
                    encode_column(aTHX_ b, typed_vals[tp], num_rows,
                                  t->tuple[tp]);
            }

            /* Step 6: per-path Variant data (discs + per-variant values).
             * For Array(T) variants the column's wire layout is:
             *   N UInt64 LE offsets (cumulative element counts)
             *   <inner-type values concatenated>
             * For each variant we emit all row offsets first, then the
             * inner values in a second pass over the rows. */
            for (p = 0; p < num_paths; p++) {
                int wire_slots = wire_slot_counts[p];
                int *slots = path_slots + p*JSON_LEX_SLOTS;

                /* Discriminator byte per row. */
                for (r = 0; r < num_rows; r++) {
                    if (!row_hvs[r]) { buf_byte(aTHX_ b, 0xff); continue; }
                    SV **e = hv_fetch(row_hvs[r], paths[p],
                                      (I32)path_lens[p], 0);
                    if (!e || !SvOK(*e)) { buf_byte(aTHX_ b, 0xff); continue; }
                    int k = json_classify_value(aTHX_ *e);
                    buf_byte(aTHX_ b,
                             (uint8_t)json_kind_disc_in(k, slots, wire_slots));
                }

                /* Per-variant data in lex order. SharedVariant has zero
                 * rows in our encoder's output. */
                int s;
                for (s = 0; s < wire_slots; s++) {
                    int k_match = slots[s];
                    if (k_match < 0) continue;

                    /* Array(T) variants: first pass emits offsets (so the
                     * downstream offset cursor is contiguous) and counts
                     * total elements; second pass emits inner values. */
                    int is_array = (k_match >= JV_ARRAY_BOOL
                                 && k_match <= JV_ARRAY_STRING);
                    if (is_array) {
                        uint64_t offset = 0;
                        for (r = 0; r < num_rows; r++) {
                            if (!row_hvs[r]) continue;
                            SV **e = hv_fetch(row_hvs[r], paths[p],
                                              (I32)path_lens[p], 0);
                            if (!e || !SvOK(*e)) continue;
                            if (json_classify_value(aTHX_ *e) != k_match)
                                continue;
                            AV *av = (AV*)SvRV(*e);
                            SSize_t n = av_len(av) + 1;
                            offset += (uint64_t)n;
                            buf_le64(aTHX_ b, offset);
                        }
                    }

                    /* Element-value pass (Array(T)) or scalar pass (T). */
                    for (r = 0; r < num_rows; r++) {
                        if (!row_hvs[r]) continue;
                        SV **e = hv_fetch(row_hvs[r], paths[p],
                                          (I32)path_lens[p], 0);
                        if (!e || !SvOK(*e)) continue;
                        if (json_classify_value(aTHX_ *e) != k_match) continue;

                        if (is_array) {
                            AV *av = (AV*)SvRV(*e);
                            SSize_t n = av_len(av) + 1, i;
                            for (i = 0; i < n; i++) {
                                SV **elem = av_fetch(av, i, 0);
                                SV *ev = (elem && SvOK(*elem))
                                       ? *elem : &PL_sv_undef;
                                json_emit_array_elem(aTHX_ b, ev, k_match);
                            }
                        } else {
                            json_emit_scalar(aTHX_ b, *e, k_match);
                        }
                    }
                }
            }

            /* Step 7: shared data trailer (Array(Tuple(String,String))
             * with all rows empty -> N UInt64 LE zero offsets). */
            if (num_rows > 0) {
                STRLEN nbytes = (STRLEN)num_rows * 8;
                buf_grow(aTHX_ b, nbytes);
                memset(b->ptr + b->len, 0, nbytes);
                b->len += nbytes;
            }
            break;
        }

        case T_DYNAMIC: {
            /* Standalone Dynamic column: same wire format as one JSON
             * path's Dynamic sub-column (Dynamic V1 prefix + Variant
             * mode + Variant data) with no Object wrapping or shared
             * data trailer. Each row is a scalar / array / undef. */
            unsigned mask = 0;



( run in 1.459 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )