ClickHouse-Encoder
view release on metacpan or search on metacpan
t/streamer-edge.t view on Meta::CPAN
my $st = make_enc()->streamer(sub { push @blocks, $_[0] }, batch_size => 5);
$st->push_row([$_]) for 1..6;
is(scalar @blocks, 1, '6 rows / batch=5 => 1 auto block (leftover buffered)');
is($st->buffered_count, 1, 'buffered_count = 1 leftover');
ok(!$st->is_empty, 'is_empty false with leftover');
$st->finish;
is(scalar @blocks, 2, 'finish flushes the leftover block');
is($st->buffered_count, 0, 'buffered_count = 0 after finish');
ok($st->is_empty, 'is_empty true after finish');
}
# Writer failure in the middle of a stream.
# The writer below dies the first time it is invoked (presumably when the
# second row fills the batch of 2); the die aborts the push loop, so the
# remaining rows are never pushed. The checks: the exception reaches the
# caller, and the streamer reports an empty buffer afterwards, i.e. the
# failed batch is not kept around to be replayed.
{
    my $calls = 0;
    my $failing_writer = sub {
        die "writer failed" if ++$calls == 1;
    };
    my $broken = make_enc()->streamer($failing_writer, batch_size => 2);

    eval {
        for my $n (1 .. 3) {
            $broken->push_row([$n]);
        }
    };
    like($@, qr/writer failed/, 'writer croak propagated');

    diag "buffered=" . $broken->buffered_count
        unless is($broken->buffered_count, 0,
            'streamer recovers after writer croak: empty buffer (not replaying)');

    # Replacing the writer is not supported (it is bound when the streamer
    # is created), so recovery means either starting over with a new
    # streamer or accepting the error. Here a brand-new streamer is used to
    # confirm that pushes proceed cleanly after the earlier failure.
    my @delivered;
    my $collect = sub { push @delivered, $_[0] };
    my $fresh   = make_enc()->streamer($collect, batch_size => 2);
    for my $n (1 .. 2) {
        $fresh->push_row([$n]);
    }
    is(scalar @delivered, 1, 'fresh streamer post-error: works as normal');
}
# reset(): buffered rows are dropped, not flushed.
# Five rows are queued under a batch size large enough that nothing is
# emitted automatically; after reset the buffer must read as empty and the
# writer must not have been called. The same streamer is then reused to
# confirm that finish emits exactly one block for the new rows.
{
    my @flushed;
    my $collect  = sub { push @flushed, $_[0] };
    my $streamer = make_enc()->streamer($collect, batch_size => 100);

    for my $n (1 .. 5) {
        $streamer->push_row([$n]);
    }
    is($streamer->buffered_count, 5, '5 rows buffered before reset');

    $streamer->reset;
    is($streamer->buffered_count, 0, 'reset clears buffer');
    is(scalar @flushed, 0, 'reset emitted no block');

    for my $n (1 .. 3) {
        $streamer->push_row([$n]);
    }
    $streamer->finish;
    is(scalar @flushed, 1, 'streamer reusable after reset, finish flushes the new batch');
}
# Calling finish twice in a row must be safe.
# One row is buffered, then finish is invoked twice; the number of blocks
# seen by the writer has to be 1 after each call (the second finish has
# nothing left to emit).
{
    my @seen;
    my $streamer = make_enc()->streamer(sub { push @seen, $_[0] }, batch_size => 10);
    $streamer->push_row([1]);

    for my $expectation (
        'first finish emits the buffered batch',
        'second finish on empty streamer: no extra block',
    ) {
        $streamer->finish;
        is(scalar @seen, 1, $expectation);
    }
}
# A finished streamer can be pushed to again.
# Two push/finish rounds are run on the same streamer; each round is
# expected to hand one block to the writer, for two blocks in total.
{
    my @seen;
    my $collect  = sub { push @seen, $_[0] };
    my $streamer = make_enc()->streamer($collect, batch_size => 10);

    for my $value (1, 2) {
        $streamer->push_row([$value]);
        $streamer->finish;
    }

    is(scalar @seen, 2, 'push_row after finish reopens the streamer');
}
# The streamer must outlive the encoder variable that created it.
# The encoder lives only inside the do-block, so the caller's only handle
# on it is gone by the time rows are pushed. Per the original note, the XS
# side is expected to hold its own reference to the encoder; the test just
# verifies that push_row/finish still deliver a block.
{
    my @seen;
    my $streamer = do {
        my $enc = make_enc();
        $enc->streamer(sub { push @seen, $_[0] }, batch_size => 5);
    };    # $enc is out of scope from here on

    for my $n (1 .. 3) {
        $streamer->push_row([$n]);
    }
    $streamer->finish;

    is(scalar @seen, 1, 'encoder dropped: streamer still flushes a block');
}
# streamer(compress => 'lz4'): the writer should receive compressed-block
# frames instead of raw Native bytes.
# Skipped entirely (5 assertions) when Compress::LZ4 cannot be loaded.
SKIP: {
    skip 'Compress::LZ4 not installed', 5
        unless eval { require Compress::LZ4; 1 };

    my @frames;
    my $encoder  = ClickHouse::Encoder->new(columns => [['x','Int32']]);
    my $streamer = $encoder->streamer(
        sub { push @frames, $_[0] },
        batch_size => 3,
        compress   => 'lz4',
    );

    # 7 rows with batch_size 3: two full batches plus the remainder that
    # finish flushes.
    for my $n (1 .. 7) {
        $streamer->push_row([$n]);
    }
    $streamer->finish;
    is(scalar @frames, 3, 'three compressed batches emitted');

    # Every frame is checked for the method byte 0x82 at offset 16, then
    # decompressed and decoded so the ids can be collected in emission
    # order and compared against the rows that were pushed.
    my @decoded_ids;
    my $index = 0;
    for my $frame (@frames) {
        my $method_byte = ord(substr($frame, 16, 1));
        is($method_byte, 0x82, "emitted block $index carries LZ4 method tag");

        my $native = ClickHouse::Encoder->decompress_native_block($frame);
        my $block  = ClickHouse::Encoder->decode_block($native);
        push @decoded_ids, @{ $block->{columns}[0]{values} };

        $index++;
    }
    is_deeply(\@decoded_ids, [1..7], 'compressed streamer: ids round-trip in order');
}
# streamer(compress => 'none') is the same as no compress option:
# the writer sees raw Native bytes (no compressed-block framing).
{
my $enc = ClickHouse::Encoder->new(columns => [['x','Int32']]);
my @emitted;
my $st = $enc->streamer(sub { push @emitted, $_[0] },
batch_size => 2, compress => 'none');
$st->push_row([10]);
$st->push_row([20]);
$st->finish;
is(scalar @emitted, 1, 'compress=none: one batch emitted');
( run in 0.686 second using v1.01-cache-2.11-cpan-140bd7fdf52 )