Avro

 view release on metacpan or  search on metacpan

lib/Avro/DataFileReader.pm  view on Meta::CPAN

        while ($count--) {
            Avro::BinaryDecoder->skip($writer_schema, $datafile->reader);
            $datafile->{object_count}--;
        }
    }
}

## Read the header of the next data block and set up the block reader.
##
## Decodes the block's object count and byte size from the file handle and
## records the offset at which the block payload starts. With the 'null'
## codec nothing else is done. With any other codec the whole block plus
## its trailing sync marker is read into memory: the marker is kept in
## $datafile->{block_marker} and a handle over the decompressed payload is
## stored in $datafile->{reader}.
##
## Croaks on read errors, truncated blocks, decompression failures and
## unsupported codecs. Returns nothing.
sub read_block_header {
    my $datafile = shift;
    my $fh = $datafile->{fh};
    my $codec = $datafile->codec;

    $datafile->header unless $datafile->{_header};

    $datafile->{object_count} = Avro::BinaryDecoder->decode_long(
        undef, undef, $fh,
    );
    $datafile->{block_size} = Avro::BinaryDecoder->decode_long(
        undef, undef, $fh,
    );
    $datafile->{block_start} = tell $fh;

    return if $codec eq 'null';

    ## we need to read the entire block into memory, to inflate it
    my $wanted = $datafile->{block_size} + MARKER_SIZE;
    my $nread  = read $fh, my $block, $wanted;
    croak "Error reading from file: $!" unless $nread;

    ## a short read means the file ends in the middle of a block
    croak "Truncated block: expected $wanted bytes, got $nread"
        unless $nread == $wanted;

    ## remove the marker
    my $marker = substr $block, -(MARKER_SIZE), MARKER_SIZE, '';
    $datafile->{block_marker} = $marker;

    ## this is our new reader
    ##
    ## The in-memory handles are opened on a fresh lexical. Passing the
    ## already-open $fh to open() would re-open the data file's own handle
    ## onto the buffer and make every following block unreachable.
    my $reader;
    if ($codec eq 'deflate') {
        $reader = IO::Uncompress::RawInflate->new(\$block)
            or croak "Error inflating block: "
                   . "$IO::Uncompress::RawInflate::RawInflateError";
    }
    elsif ($codec eq 'bzip2') {
        my $uncompressed;
        bunzip2(\$block => \$uncompressed)
            or croak "Error decompressing bzip2 block: "
                   . "$IO::Uncompress::Bunzip2::Bunzip2Error";
        open $reader, '<', \$uncompressed
            or croak "Error opening decompressed block: $!";
    }
    elsif ($codec eq 'zstandard') {
        my $uncompressed = decompress(\$block);
        croak "Error decompressing zstandard block"
            unless defined $uncompressed;
        open $reader, '<', \$uncompressed
            or croak "Error opening decompressed block: $!";
    }
    else {
        ## the block has already been consumed from the file handle, so
        ## carrying on without a reader would decode from the wrong offset
        croak "Unsupported codec: $codec";
    }
    $datafile->{reader} = $reader;

    return;
}

## Compare the sync marker that closes the current block with the sync
## marker of the file, and croak when they differ.
##
## The marker is taken from $datafile->{block_marker} when one is defined;
## otherwise MARKER_SIZE bytes are read from the file handle.
## Returns nothing.
sub verify_marker {
    my ($self) = @_;

    my $seen = $self->{block_marker};
    if (!defined $seen) {
        ## no marker stored on the object: it is still waiting on the fh
        read $self->{fh}, $seen, MARKER_SIZE;
    }

    my $expected = $self->sync_marker;
    croak "Oops synchronization issue (marker mismatch)"
        if ($seen || "") ne $expected;

    return;
}

## Move past whatever is left of the current block.
##
## When a block reader is set, that reader is moved to its end and nothing
## is returned. Otherwise the file handle is positioned at the end of the
## block payload (block_start + block_size), the sync marker that follows
## is verified, and 1 is returned.
##
## Croaks when the file handle cannot be repositioned.
sub skip_to_block_end {
    my $datafile = shift;

    if (my $reader = $datafile->{reader}) {
        ## NOTE(review): IO::Uncompress handles may refuse SEEK_END --
        ## confirm this works for the deflate codec.
        seek $reader, 0, Fcntl->SEEK_END;
        return;
    }

    my $fh        = $datafile->{fh};
    my $block_end = $datafile->{block_start} + $datafile->{block_size};

    ## whence 0 is SEEK_SET, so the target has to be the absolute offset
    ## of the block end, not the number of bytes left in the block
    seek $fh, $block_end, 0
        or croak "Error seeking to end of block: $!";

    $datafile->verify_marker; ## will do a read
    return 1;
}

## Read every object still pending in the current block, then verify the
## sync marker that closes the block.
##
## Returns the list produced by read_within_block for
## $datafile->{object_count} objects. Marker verification happens after
## the objects have been read.
sub read_to_block_end {
    my $datafile = shift;

    my @objs = $datafile->read_within_block( $datafile->{object_count} );
    $datafile->verify_marker;
    return @objs;
}

## Return the handle objects should be read from: the block reader stored
## in $datafile->{reader} when it is set (true), otherwise the file handle
## in $datafile->{fh}.
sub reader {
    my ($self) = @_;

    my $block_reader = $self->{reader};
    return $block_reader if $block_reader;

    return $self->{fh};
}

## end of block
##
## Returns 1 when nothing is left to read in the current block, else 0:
##   - the datafile as a whole is at eof, or
##   - a block reader is set and reports eof, or
##   - no block reader is set and either no block start is recorded or
##     the file handle sits at or past block_start + block_size.
sub eob {
    my ($self) = @_;

    return 1 if $self->eof;

    my $block_reader = $self->{reader};
    if ($block_reader) {
        return $block_reader->eof ? 1 : 0;
    }

    ## reading straight from the file handle
    return 1 if !$self->{block_start};

    my $block_end = $self->{block_start} + $self->{block_size};
    return tell($self->{fh}) >= $block_end ? 1 : 0;
}

## Tell whether the datafile is exhausted.
##
## Returns 0 while a block reader is set and has not reached its eof.
## Otherwise returns 1 when the underlying file handle is at eof and 0
## when it is not.
sub eof {
    my ($self) = @_;

    my $block_reader = $self->{reader};
    return 0 if $block_reader && !$block_reader->eof;

    return $self->{fh}->eof ? 1 : 0;
}



( run in 0.864 second using v1.01-cache-2.11-cpan-39bf76dae61 )