ClickHouse-Encoder

 view release on metacpan or  search on metacpan

t/streamer-edge.t  view on Meta::CPAN

    my $st = make_enc()->streamer(sub { push @blocks, $_[0] }, batch_size => 5);
    $st->push_row([$_]) for 1..6;
    is(scalar @blocks, 1, '6 rows / batch=5 => 1 auto block (leftover buffered)');
    is($st->buffered_count, 1, 'buffered_count = 1 leftover');
    ok(!$st->is_empty, 'is_empty false with leftover');
    $st->finish;
    is(scalar @blocks, 2, 'finish flushes the leftover block');
    is($st->buffered_count, 0, 'buffered_count = 0 after finish');
    ok($st->is_empty, 'is_empty true after finish');
}

# Writer croaks mid-batch: streamer state is consistent (empty buffer).
# A subsequent push_row must not replay the failed batch.
{
    # Writer that throws the first time it is invoked and succeeds afterwards.
    my $calls = 0;
    my $failing_writer = sub {
        die "writer failed" if ++$calls == 1;
    };

    my $broken = make_enc()->streamer($failing_writer, batch_size => 2);
    eval {
        for my $n (1 .. 3) {
            $broken->push_row([$n]);
        }
    };
    like($@, qr/writer failed/, 'writer croak propagated');

    # After the exception the streamer must not be holding on to the rows
    # of the batch whose write failed.
    diag "buffered=" . $broken->buffered_count
        unless is($broken->buffered_count, 0,
            'streamer recovers after writer croak: empty buffer (not replaying)');

    # The writer is bound when the streamer is created and cannot be
    # swapped afterwards, so the recovery path exercised here is a brand
    # new streamer with a well-behaved writer.
    my @ok_blocks;
    my $collect = sub { push @ok_blocks, $_[0] };
    my $fresh   = make_enc()->streamer($collect, batch_size => 2);
    for my $n (1 .. 2) {
        $fresh->push_row([$n]);
    }
    is(scalar(@ok_blocks), 1, 'fresh streamer post-error: works as normal');
}

# reset() discards buffered rows without flushing.
{
    # batch_size is far larger than the row count, so nothing is emitted
    # automatically; only finish() can produce a block here.
    my @emitted;
    my $collect = sub { push @emitted, $_[0] };
    my $stream  = make_enc()->streamer($collect, batch_size => 100);

    for my $n (1 .. 5) {
        $stream->push_row([$n]);
    }
    is($stream->buffered_count, 5, '5 rows buffered before reset');

    $stream->reset;
    is($stream->buffered_count, 0, 'reset clears buffer');
    is(scalar(@emitted), 0, 'reset emitted no block');

    # The same streamer must remain usable after reset().
    for my $n (1 .. 3) {
        $stream->push_row([$n]);
    }
    $stream->finish;
    is(scalar(@emitted), 1, 'streamer reusable after reset, finish flushes the new batch');
}

# Double-finish is harmless.
{
    my @emitted;
    my $collect = sub { push @emitted, $_[0] };
    my $stream  = make_enc()->streamer($collect, batch_size => 10);

    $stream->push_row([1]);

    # First finish() flushes the single buffered row as one block.
    $stream->finish;
    is(scalar(@emitted), 1, 'first finish emits the buffered batch');

    # Second finish() has nothing left to flush.
    $stream->finish;
    is(scalar(@emitted), 1, 'second finish on empty streamer: no extra block');
}

# push_row after finish is allowed; behaves like a fresh streaming session.
{
    my @emitted;
    my $collect = sub { push @emitted, $_[0] };
    my $stream  = make_enc()->streamer($collect, batch_size => 10);

    # Two push/finish cycles on the same streamer: each cycle should
    # emit its own block.
    for my $value (1, 2) {
        $stream->push_row([$value]);
        $stream->finish;
    }
    is(scalar(@emitted), 2, 'push_row after finish reopens the streamer');
}

# Encoder dropped before streamer: streamer keeps the encoder alive
# (XS holds a refcount) and finish still works.
{
    my @emitted;

    # The encoder lives only inside the do-block; once it returns, the
    # streamer is the sole handle left in this scope.
    my $stream = do {
        my $enc = make_enc();
        $enc->streamer(sub { push @emitted, $_[0] }, batch_size => 5);
    };

    for my $n (1 .. 3) {
        $stream->push_row([$n]);
    }
    $stream->finish;
    is(scalar(@emitted), 1, 'encoder dropped: streamer still flushes a block');
}

# streamer(compress => 'lz4'): each emitted batch goes through
# compress_native_block before reaching the writer.
SKIP: {
    # Five assertions follow: one batch count, one tag check per emitted
    # block (three blocks), and one round-trip comparison.
    skip 'Compress::LZ4 not installed', 5
        unless eval { require Compress::LZ4; 1 };

    my $encoder = ClickHouse::Encoder->new(columns => [['x', 'Int32']]);
    my @emitted;
    my $stream = $encoder->streamer(
        sub { push @emitted, $_[0] },
        batch_size => 3,
        compress   => 'lz4',
    );
    for my $n (1 .. 7) {
        $stream->push_row([$n]);
    }
    $stream->finish;
    is(scalar(@emitted), 3, 'three compressed batches emitted');

    # Every blob handed to the writer should be a compressed-block frame
    # whose method byte (offset 16) is 0x82.  Decompress each one, decode
    # the resulting Native bytes, and gather the column values in order.
    my @all_ids;
    my $i = 0;
    for my $blob (@emitted) {
        my $method_byte = ord(substr($blob, 16, 1));
        is($method_byte, 0x82, "emitted block $i carries LZ4 method tag");

        my $native  = ClickHouse::Encoder->decompress_native_block($blob);
        my $decoded = ClickHouse::Encoder->decode_block($native);
        push @all_ids, @{ $decoded->{columns}[0]{values} };
        $i++;
    }
    is_deeply(\@all_ids, [1 .. 7], 'compressed streamer: ids round-trip in order');
}

# streamer(compress => 'none') is the same as no compress option:
# the writer sees raw Native bytes (no compressed-block framing).
{
    my $enc = ClickHouse::Encoder->new(columns => [['x','Int32']]);
    my @emitted;
    my $st = $enc->streamer(sub { push @emitted, $_[0] },
        batch_size => 2, compress => 'none');
    $st->push_row([10]);
    $st->push_row([20]);
    $st->finish;
    is(scalar @emitted, 1, 'compress=none: one batch emitted');



( run in 0.686 second using v1.01-cache-2.11-cpan-140bd7fdf52 )