AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

use AC::Cache;
use AC::Import;
use Digest::SHA 'sha1_base64';
use strict;

# names offered to code that uses this module
# (presumably picked up by AC::Import - confirm against that module)
our @EXPORT = qw(encode_version decode_version encode_shard decode_shard);

# argument given to AC::Cache->new when a store's merkle node cache is created
# (presumably the maximum number of cached nodes - confirm against AC::Cache)
my $CACHESIZE = 256;

# create the in-memory cache that sits in front of the stored merkle nodes
# and attach it to the object as {merkle_cache}
sub merkle_init {
    my($self) = @_;

    my $cache = AC::Cache->new( $CACHESIZE );
    return $self->{merkle_cache} = $cache;
}

# cache statistics, file-scoped and therefore shared by every object using these methods:
#   $cachechk  - number of _mcget lookups
#   $cachemiss - number of those lookups not answered by the cache
#   $cacheT    - value of $^T when get_merkle last logged the stats
my($cachechk, $cachemiss, $cacheT);

# fetch the stored data of one merkle node
#   mk - node key
# a true value found in the cache is returned directly; anything else is
# counted as a miss and read from the 'merkle' sub-db instead
sub _mcget {
    my($self, $nodekey) = @_;

    $cachechk++;
    my $cached = $self->{merkle_cache}->fetch( $nodekey );

    unless( $cached ){
        $cachemiss++;
        return $self->{db}->get($self->{name}, 'merkle', $nodekey);
    }

    return $cached;
}

# store the data of one merkle node
#   mk - node key
#   d  - node data
# writes the 'merkle' sub-db first, then the cache
sub _mcput {
    my($self, $nodekey, $data) = @_;

    $self->{db}->put($self->{name}, 'merkle', $nodekey, $data);
    return $self->{merkle_cache}->store( $nodekey, $data );
}

# forget one merkle node
#   mk - node key
# removes it from the 'merkle' sub-db first, then from the cache
sub _mcdel {
    my($self, $nodekey) = @_;

    $self->{db}->del($self->{name}, 'merkle', $nodekey);
    return $self->{merkle_cache}->remove( $nodekey );
}

# read one merkle node and expand it into a list of records
#   shard, ver - identify the node (run through encode_shard / encode_version)
#   lev        - level of the node; {merkle_height} is the leaf level
# returns nothing if lev is beyond the leaf level or no data is found,
# otherwise an arrayref of hashes:
#   leaf node:     { version, key, count => 1, shard }          - one per stored key
#   non-leaf node: { version, level, hash, count, shard }       - one per child node
sub get_merkle {
    my($self, $shard, $ver, $lev) = @_;

    my $height = $self->{merkle_height};
    return if $lev > $height;

    my $nodekey = $self->_mkey(encode_shard($shard), encode_version($ver), $lev);
    debug("getting merkle for $nodekey");

    my $raw = $self->_mcget( $nodekey );
    return unless $raw;
    my @field = split /\0/, $raw;

    # log the cache counters once $^T has advanced more than 60 past the previous report
    if( $^T - $cacheT > 60 ){
        debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
        $cacheT = $^T;
    }

    my @res;

    if( $lev != $height ){
        # fields alternate: child mkey, "hash count", child mkey, "hash count", ...
        my %child = @field;
        for my $ckey (keys %child){
            my($l, $s, $v) = $self->_decode_mkey( $ckey );
            my($h, $c)     = split /\s/, $child{$ckey};
            push @res, { version => decode_version($v), level => hex($l), hash => $h, count => $c, shard => decode_shard($s) };
        }
    }else{
        # every field is an lkey
        @res = map {
            my($s, $v, $k) = $self->_decode_lkey($_);
            +{ version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
        } @field;
    }

    return \@res;
}

################################################################
# we maintain a 16-ary merkle tree of all of the <key,version>s we have stored

# apply a change to the tree: any number of removals plus at most one addition
#   add - { shard, key, version } to insert, or undef
#   del - list of { shard, key, version } to remove
# the leaves are updated first, then every changed node is pushed into its
# parent, one level at a time, until the root or until nothing changed
sub merkle {
    my($self, $add, @del) = @_;

    # changed nodes waiting to be folded into their parent, keyed by "shard ver"
    my %dirty;
    my $mark = sub {
        my($ns, $nv, $h, $c) = @_;
        $dirty{"$ns $nv"} = { ver => $nv, hash => $h, count => $c, shard => $ns };
    };

    # leaf level: removals, then the addition
    for my $rm (@del){
        $mark->( $self->_merkle_leaf_del(encode_shard($rm->{shard}), $rm->{key}, encode_version($rm->{version})) );
    }
    if( defined $add ){
        $mark->( $self->_merkle_leaf_add(encode_shard($add->{shard}), $add->{key}, encode_version($add->{version})) );
    }

    # walk upwards from the leaf level
    my $level = $self->{merkle_height};

    while($level >= 0 && keys %dirty){
        my %batch = %dirty;
        %dirty = ();

        for my $node (values %batch){
            # fold this node into level - 1; a changed parent joins the next round
            my($ns, $nv, $h, $c) = $self->_merkle_update( $node->{shard}, $level, $node->{ver}, $node->{hash}, $node->{count} );
            next unless defined $nv;
            $mark->( $ns, $nv, $h, $c );
        }
        $level --;
    }
}

# non-leaf node:
#  list of (ver => hash+count)
#  of up to 16 next-level-down vers
#  \0 delimited, sorted by ver

# update merkle node
#   shard, ver - identify the child node that changed
#   lev        - level of that child; its parent lives at lev - 1
#   hash       - new hash of the child, false if the child is now empty
#   count      - new count of the child
# records the child in its parent (or removes it) and stores the parent.
# returns (shard, ver, hash, count) of the parent if the parent changed,
# (shard, ver, undef) if the parent became empty and was deleted,
# and nothing if the parent is unchanged or lev is 0 (root)
sub _merkle_update {
    my $me    = shift;
    my $shard = shift;
    my $lev   = shift;
    my $ver   = shift;
    my $hash  = shift;
    my $count = shift;

    unless( $lev ){
        # root hash - not used
        # handled before any key is built: there is no level -1 parent
        debug("updating merkle node root => $hash");
        $me->_mcput( 'root', $hash );
        return;
    }

    my $k0 = $me->_mkey($shard, $ver, $lev);		# the child
    my $k1 = $me->_mkey($shard, $ver, $lev - 1);	# its parent

    my(undef, $nextshard, $nextver) = $me->_decode_mkey($k1);

    # get node
    my $d = $me->_mcget( $k1 );
    # sha1_base64 of nothing is still a non-empty string, so remember
    # separately whether the parent held any data before this update
    my $existed = defined($d) && length($d);
    my $oldh    = $existed ? sha1_base64($d) : undef;
    my %d       = $existed ? split(/\0/, $d) : ();

    if($hash){
        # add/update
        debug("updating merkle node $k1 + { $k0 => $hash, $count }");
        $d{$k0} = "$hash $count";
    }else{
        # remove
        debug("updating merkle node $k1 - { $k0 => empty }");
        delete $d{$k0};
    }

    if( keys %d ){
        # mkey, "hash count" pairs - \0 delimited, sorted by mkey
        $d = join("\0", map {"$_\0$d{$_}"} (sort keys %d));
        $me->_mcput( $k1, $d );
        my $newh = sha1_base64($d);
        return if defined($oldh) && $newh eq $oldh;	# unchanged
        return ($nextshard, $nextver, $newh, scalar keys %d);
    }else{
        $me->_mcdel( $k1 );
        return unless $existed;		# unchanged - there was nothing to remove
        return ($nextshard, $nextver, undef);
    }
}

# leaf nodes:
#   list of all "ver/key"
#   \0 delimited. sorted by "ver/key"

# add new <key,version> to merkle leaf
#   shard, key, ver - the item; shard and ver are expected already encoded
# merges the item's lkey into the leaf node for ver, stores the node
# (sorted, without duplicates) and returns (shard, ver, hash, count) of the leaf
sub _merkle_leaf_add {
    my($self, $shard, $key, $ver) = @_;

    my $leafkey = $self->_mkey($shard, $ver, $self->{merkle_height});
    my $entry   = $self->_lkey($key, $ver, $shard);

    debug("adding to merkle leaf $leafkey - $entry");

    # what is stored now + the new entry; hash keys take care of duplicates
    my $stored = $self->_mcget( $leafkey );
    my %seen;
    $seen{$_} = undef for split(/\0/, $stored), $entry;

    my @sorted = sort keys %seen;
    my $packed = join("\0", @sorted);
    $self->_mcput( $leafkey, $packed );

    my $digest = sha1_base64($packed);
    return ($shard, $ver, $digest, scalar @sorted);
}

# remove <key,version> from merkle leaf
#   shard, key, ver - the item; shard and ver are expected already encoded
# drops the item's lkey from the leaf node for ver.
# returns (shard, ver, hash, count) of the remaining leaf, or
# (shard, ver, undef) once the leaf is empty and has been deleted
sub _merkle_leaf_del {
    my($self, $shard, $key, $ver) = @_;

    my $leafkey = $self->_mkey($shard, $ver, $self->{merkle_height});
    my $entry   = $self->_lkey($key, $ver, $shard);

    debug("removing from merkle leaf $leafkey - $entry");

    # everything currently in the leaf, except the entry being removed
    my $stored = $self->_mcget( $leafkey );
    my @keep   = grep { $_ ne $entry } split /\0/, $stored;

    unless( @keep ){
        # empty node => empty hash
        $self->_mcdel( $leafkey );
        return ($shard, $ver, undef);
    }

    my $packed = join("\0", @keep);
    $self->_mcput( $leafkey, $packed );

    my $digest = sha1_base64($packed);
    return ($shard, $ver, $digest, scalar @keep);
}

################################################################

# list the keys that are really stored for one version
#   shard - if defined, only keys whose 'vers' record names this shard are returned
#   ver   - version to look up (not encoded)
# returns the list of keys
sub _get_actual_keys {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    my $db = $me->{db};

    # get range on data
    # each record's {k} has everything up to its last '/' stripped
    # NOTE(review): a key that itself contains '/' is cut short here - confirm keys cannot
    my @key = map {
        my $k = $_->{k}; $k =~ s|.*/||; $k
    } $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));
    debug("actual key: @key");

    return @key unless defined $shard;

    # get vers list to filter on shard
    my $sh = encode_shard($shard);
    return grep {
        my $k = $_;

        my $vl = $db->get($me->{name}, 'vers', $k);
        # the shard is whatever follows the ';' in the vers record
        my($s) = defined($vl) ? $vl =~ /;\s*(.*)/ : ();

        # encoded shards are strings (presumably hex, like encoded versions):
        # a numeric == would treat every value with the same leading digits as equal
        defined($s) && $s eq $sh;
    } @key;
}

# check merkle leaf node against actual data
#   shard, ver - which leaf to check
# every key found in the actual data but not listed in the merkle leaf
# is added to the tree; keys listed only in the leaf are left alone
sub merkle_scrub {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    debug("scrub $me->{name} $shard/$ver");

    # get list of keys from merkle leaf node
    my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];
    my %mkey;
    $mkey{ $_->{key} } = 1 for @$mlist;

    # get list of keys from actual data
    my @akey  = $me->_get_actual_keys( $shard, $ver );

    # compare lists

    for my $k (@akey){
        # test for presence, not truth: a key such as "0" is a perfectly good key
        next if exists $mkey{$k};
        debug("missing key in merkle tree: $shard/$ver/$k");
        $me->merkle( { key => $k, shard => $shard, version => $ver } );
    }
}


################################################################

# build the key of a merkle node: 2 hex digits of level, '/', version trimmed to that level
# the shard argument is accepted but does not appear in the key
sub _mkey {
    my($self, $shard, $ver, $lev) = @_;

    # 10/000484D6594DB72B
    my($prefix) = $self->_ver_lev($ver, $lev);
    return sprintf('%02X/%s', $lev, $prefix);
}

# take a node key apart at its first '/'
# returns (level, shard, version) - the shard slot is always undef
sub _decode_mkey {
    my($self, $mk) = @_;

    # 10/000484D6594DB72B
    my @part = split m|/|, $mk, 2;
    return ($part[0], undef, $part[1]);
}

# build a leaf entry from key, version and shard: version, '/', key
# the shard argument is accepted but does not appear in the entry
sub _lkey {
    my($self, $k, $v, $s) = @_;

    # 000484D6594DB72B/foobar
    return join('/', $v, $k);
}

# take a leaf entry apart at its first '/'
# returns (shard, version, key) - the shard slot is always undef
sub _decode_lkey {
    my($self, $lk) = @_;

    # 000484D6594DB72B/foobar
    my @part = split m|/|, $lk, 2;
    return (undef, $part[0], $part[1]);
}

# reduce an encoded version to the part that matters at a given level:
# its first lev characters, followed by one '0' for each level
# between lev and {merkle_height}
sub _ver_lev {
    my($self, $ver, $lev) = @_;

    my $pad  = $self->{merkle_height} - $lev;
    my $head = substr($ver, 0, $lev);

    return $head . ('0' x $pad);
}

################################################################

sub encode_version {
    my $v = shift;
    return x64_number_to_hex($v);



( run in 1.899 second using v1.01-cache-2.11-cpan-39bf76dae61 )