Avro
view release on metacpan or search on metacpan
lib/Avro/DataFileReader.pm view on Meta::CPAN
while ($count--) {
Avro::BinaryDecoder->skip($writer_schema, $datafile->reader);
$datafile->{object_count}--;
}
}
}
sub read_block_header {
my $datafile = shift;
my $fh = $datafile->{fh};
my $codec = $datafile->codec;
$datafile->header unless $datafile->{_header};
$datafile->{object_count} = Avro::BinaryDecoder->decode_long(
undef, undef, $fh,
);
$datafile->{block_size} = Avro::BinaryDecoder->decode_long(
undef, undef, $fh,
);
$datafile->{block_start} = tell $fh;
return if $codec eq 'null';
## we need to read the entire block into memory, to inflate it
my $nread = read $fh, my $block, $datafile->{block_size} + MARKER_SIZE
or croak "Error reading from file: $!";
## remove the marker
my $marker = substr $block, -(MARKER_SIZE), MARKER_SIZE, '';
$datafile->{block_marker} = $marker;
## this is our new reader
$datafile->{reader} = do {
if ($codec eq 'deflate') {
IO::Uncompress::RawInflate->new(\$block);
}
elsif ($codec eq 'bzip2') {
my $uncompressed;
bunzip2 \$block => \$uncompressed;
do { open $fh, '<', \$uncompressed; $fh };
}
elsif ($codec eq 'zstandard') {
do { open $fh, '<', \(decompress(\$block)); $fh };
}
};
return;
}
sub verify_marker {
my $datafile = shift;
my $marker = $datafile->{block_marker};
unless (defined $marker) {
## we are in the fh case
read $datafile->{fh}, $marker, MARKER_SIZE;
}
unless (($marker || "") eq $datafile->sync_marker) {
croak "Oops synchronization issue (marker mismatch)";
}
return;
}
sub skip_to_block_end {
my $datafile = shift;
if (my $reader = $datafile->{reader}) {
seek $reader, 0, Fcntl->SEEK_END;
return;
}
my $remaining_size = $datafile->{block_size}
+ $datafile->{block_start}
- tell $datafile->{fh};
seek $datafile->{fh}, $remaining_size, 0;
$datafile->verify_marker; ## will do a read
return 1;
}
sub read_to_block_end {
my $datafile = shift;
my $reader = $datafile->reader;
my @objs = $datafile->read_within_block( $datafile->{object_count} );
$datafile->verify_marker;
return @objs;
}
sub reader {
my $datafile = shift;
return $datafile->{reader} || $datafile->{fh};
}
## end of block
sub eob {
my $datafile = shift;
return 1 if $datafile->eof;
if ($datafile->{reader}) {
return 1 if $datafile->{reader}->eof;
}
else {
my $pos = tell $datafile->{fh};
return 1 unless $datafile->{block_start};
return 1 if $pos >= $datafile->{block_start} + $datafile->{block_size};
}
return 0;
}
sub eof {
my $datafile = shift;
if ($datafile->{reader}) {
return 0 unless $datafile->{reader}->eof;
}
return 1 if $datafile->{fh}->eof;
return 0;
}
( run in 0.864 second using v1.01-cache-2.11-cpan-39bf76dae61 )