AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 19:21 (EDT)
# Function: storage maps
#
# $Id$
package AC::Yenta::Store::Map;
use AC::Yenta::Store::File;
use AC::Yenta::Store::Merkle;
use AC::Yenta::Debug 'map';
use AC::Yenta::Conf;
use AC::Cache;
use strict;
our @ISA = 'AC::Yenta::Store::Merkle';
# Backend name used by new() when none is passed in and the dbfile
# extension does not name a registered backend.
my $DEFAULT = 'bdb';
# Registered storage backends, keyed by name; filled in by add_backend().
# new() calls ->new($name, $conf) on the stored value.
my %BACKEND ;
# Size argument handed to AC::Cache->new for each map's version-list cache.
my $CACHESIZE = 1024; # gives us ~90% cache hit rate
# Register a storage backend implementation under a name.
#   $class - invocant (not used)
#   $name  - backend name that new() will look up
#   $impl  - implementation; new() calls ->new($name, $conf) on it
# Returns the implementation that was registered.
sub add_backend {
    my( $class, $name, $impl ) = @_;

    return $BACKEND{$name} = $impl;
}
# Construct a map object on top of a registered storage backend.
#   $class - class to bless into
#   $name  - map name; passed to the backend and file-store constructors
#   $bkend - backend name; when false it is taken from the extension of
#            $conf->{dbfile}, falling back to $DEFAULT
#   $conf  - map configuration hashref
# Returns the new object, or nothing (after reporting a problem) when the
# selected backend has not been registered.
sub new {
    my $class = shift;
    my $name  = shift;
    my $bkend = shift;
    my $conf  = shift;

    unless( $bkend ){
        # from extension, or default
        # only the final extension counts: no dots or slashes inside it,
        # so "/var/db.d/map.bdb" gives "bdb" and "/var/db.d/map" gives nothing
        my $dbfile = defined $conf->{dbfile} ? $conf->{dbfile} : '';
        my($ext)   = $dbfile =~ /\.([^.\/]+)$/;
        $bkend = $ext if defined $ext && $BACKEND{$ext};
    }

    # name the backend actually looked up, so the error is never blank
    my $which = $bkend || $DEFAULT;
    my $c     = $BACKEND{$which};
    unless( $c ){
        problem("invalid storage backend: $which - ignoring map");
        return ;
    }

    debug("configuring map $name with $c");

    my $db = $c->new( $name, $conf );
    my $fs = AC::Yenta::Store::File->new( $name, $conf );

    my $me = bless {
        name          => $name,
        conf          => $conf,
        db            => $db,
        fs            => $fs,
        merkle_height => 16,
        vers_cache    => AC::Cache->new( $CACHESIZE ),
    }, $class;

    $me->merkle_init();
    return $me;
}
# Version-cache statistics, file-scoped and therefore shared by every map
# object: number of lookups, number of misses (both counted in _versget),
# and the $^T value recorded by want() at its last periodic stats report.
my($cachechk, $cachemiss, $cacheT);
# Fetch the version list for a key, going through the version cache.
#   $key - record key
# Returns the list of versions (empty list when none are stored).
# On a cache miss the whitespace-separated list is read from the 'vers'
# sub-map and the result (even an empty one) is cached.
sub _versget {
    my( $me, $key ) = @_;

    my $cache = $me->{vers_cache};

    $cachechk ++;
    if( my $hit = $cache->fetch( $key ) ){
        return @$hit;
    }

    $cachemiss ++;
    my( $packed ) = $me->{db}->get($me->{name}, 'vers', $key);
    my @list = split /\s+/, $packed;
    $cache->store( $key, \@list );
    return @list;
}
# Save the version list for a key to the 'vers' sub-map and to the cache.
#   $key - record key
#   @_   - remaining arguments are the versions, in the order to store
# Returns whatever the cache's store() returns.
sub _versput {
    my $me  = shift;
    my $key = shift;
    # take a private copy: the elements of @_ alias the caller's variables,
    # so caching \@_ would let later changes in the caller leak into the cache
    my @versions = @_;

    my $db = $me->{db};
    $db->put($me->{name}, 'vers', $key, join(' ', @versions));
    $me->{vers_cache}->store( $key, \@versions );
}
# Drop a key's version list from the version cache.
# Only the cache is touched; the stored 'vers' record is left alone.
#   $key - record key
sub _versdel {
    my( $me, $key ) = @_;

    return $me->{vers_cache}->remove( $key );
}
# Look up a key, optionally at a specific version.
#   $key - record key
#   $ver - external version to fetch; when false the first (newest) entry
#          of the version list is used
# Returns nothing when the key has no versions or the requested version is
# not in its history.  In scalar context returns the data.  In list context
# returns (data, version, file, meta); when the version is in the history
# but its data is not stored, returns (undef, version, undef, undef).
sub get {
    my $me  = shift;
    my $key = shift;
    my $ver = shift;

    my $db = $me->{db};
    my @versions = $me->_versget( $key );
    return unless @versions;
    debug("found ver: @versions");

    if( $ver ){
        $ver = encode_version($ver);
        return unless grep { $_ eq $ver } @versions;
    }else{
        $ver = $versions[0];
    }

    my $vk     = $me->vkey($key, $ver);
    my $extver = decode_version($ver);
    my($data, $founddat) = $db->get($me->{name}, 'data', $vk);

    return $data unless wantarray;

    unless( $founddat ){
        # we don't have data, but we have it in history; fake it.
        return (undef, $extver, undef, undef);
    }

    my $meta = $db->get($me->{name}, 'meta', $vk);
    # declare first, assign conditionally: "my $x = ... if COND" has
    # undefined behaviour when COND is false
    my $file;
    $file = $me->{fs}->get($data) if $data;
    return( $data, $extver, $file, $meta );
}
# someone sent me something, do I want it?
# Decide whether an offered key/version should be accepted by this map.
#   $shard - shard of the offered record
#   $key   - record key
#   $ver   - external version of the offered record
# Returns 1 when the record is wanted, nothing otherwise.
sub want {
    my( $me, $shard, $key, $ver ) = @_;

    my $cf = $me->{conf};
    my $v  = encode_version($ver);

    # not ours to keep if it lives on another shard
    if( $me->is_sharded() ){
        return unless $me->is_my_shard($shard);
    }

    my @versions = $me->_versget( $key );

    # report cache statistics once $^T has advanced more than 60 seconds
    if( $^T - $cacheT > 60 ){
        if( $cachechk > 1 ){
            debug("cache stats: check: $cachechk, miss: $cachemiss");
        }
        $cacheT = $^T;
    }

    # this exact version is already in the history
    for my $have (@versions){
        return if $have eq $v;
    }

    # older than the configured expiry
    if( my $ttl = $cf->{expire} ){
        return if $ver < timet_to_yenta_version($^T - $ttl);
    }

    # no history limit: take everything
    my $limit = $cf->{history};
    return 1 unless $limit;

    # history has a free slot
    return 1 if @versions < $limit;

    # newer than the last entry in the history, which can be pushed out
    return 1 if $v gt $versions[-1];

    # otherwise it would just be thrown away
    return;
}
# Store a new version of a key and maintain its version history.
#   $shard - shard of the record (recorded only when is_sharded() is true)
#   $key   - record key
#   $ver   - external version; encoded with encode_version() for storage
#   $data  - value written to the 'data' sub-map; when $file is given it is
#            also passed to the file store as the first argument
#   $file  - optional file content reference handed to the file store
#   $meta  - optional metadata, written only when it has non-zero length
# Returns 1 on success, nothing when the version is already present or the
# file store refuses the file.
sub put {
    my $me    = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;
    my $data  = shift;
    my $file  = shift;	# reference
    my $meta  = shift;

    my $cf = $me->{conf};
    my $db = $me->{db};
    my $v  = encode_version($ver);
    my $vk = $me->vkey($key, $v);

    debug("storing $vk");

    # get version history
    my @deletehist;
    my %deletedata;
    my @versions = $me->_versget( $key );

    return if grep { $_ eq $v } @versions;	# dupe!

    # is this the newest version? should we save this data?
    if( !@versions || ($v gt $versions[0]) || $cf->{keepold} ){

        # save file; data is filename
        if( $file ){
            my $r = $me->{fs}->put($data, $file);
            return unless $r;
        }

        # put meta + data
        $db->put($me->{name}, 'meta', $vk, $meta) if length $meta;
        $db->put($me->{name}, 'data', $vk, $data);

        unless( $cf->{keepold} ){
            # unless we are keeping old data, remove previous version
            $deletedata{$versions[0]} = 1 if @versions;
        }
    }

    # add new version to list. newest 1st
    @versions = sort {$b cmp $a} (@versions, $v);

    if( $cf->{history} && @versions > $cf->{history} ){
        # trim list: everything past the configured history length goes
        my @rm = splice @versions, $cf->{history};
        push @deletehist, (map { ({version => decode_version($_), key => $key, shard => $shard}) } @rm);
        # the trimmed versions' data must go too.  (@_ is empty here, all
        # arguments having been shifted off, so it cannot be the list to use)
        $deletedata{$_} = 1 for @rm;
    }

    if( $me->is_sharded() ){
        # QQQ - shard changed?
        $db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
    }

    my $dd = join(' ', map { $_->{version} } @deletehist);
    debug("version list: @versions [delete: $dd]");
    $me->_versput( $key, @versions );

    # update merkles
    $me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);

    # delete old data
    for my $rm (keys %deletedata){
        debug("removing old version $key/$rm");
        my $rmvk = $me->vkey($key, $rm);
        $db->del($me->{name}, 'data', $rmvk);
        $db->del($me->{name}, 'meta', $rmvk);
    }

    $db->sync();
    return 1;
}
# Remove one version of a key, update the merkle tree, and sync the backend.
#   $key - record key
#   $ver - external version to remove
# Always returns 1.
sub remove {
    my( $me, $key, $ver ) = @_;

    # _remove hands back the stored (encoded) shard of the key
    my $cshard = $me->_remove( $key, $ver );

    my %gone = ( shard => decode_shard($cshard), key => $key, version => $ver );
    $me->merkle( undef, \%gone );

    $me->{db}->sync();
    return 1;
}
# NB: does not update merkle tree
# Remove one version of a key from the version list and delete its stored
# data and meta records.  The merkle tree is left for the caller to update.
#   $key - record key
#   $ver - external version to remove
# Returns the value stored for the key in the 'shard' sub-map, read before
# anything is deleted.
sub _remove {
    my $me  = shift;
    my $key = shift;
    my $ver = shift;

    my $db = $me->{db};
    my $v  = encode_version($ver);

    my $cshard   = $db->get($me->{name}, 'shard', $key);
    my @versions = grep { $_ ne $v } $me->_versget( $key );
    debug("new ver list: @versions");

    if( @versions ){
        $me->_versput( $key, @versions );
    }else{
        # last version gone: drop the bookkeeping records as well
        $db->del($me->{name}, 'vers',  $key);
        $db->del($me->{name}, 'shard', $key);
        $me->_versdel( $key );
    }

    # data and meta are keyed by the ENCODED version, as in put() and get();
    # building the key from the raw $ver would miss the stored records
    my $vk = $me->vkey($key, $v);
    $db->del($me->{name}, 'data', $vk);
    $db->del($me->{name}, 'meta', $vk);

    return $cshard;
}
################################################################
# Query the backend for a range of this map's 'vers' sub-map.
#   $start, $end - range bounds, passed through to the backend unchanged
# Returns whatever the backend's range() returns.
sub range {
    my( $me, $start, $end ) = @_;

    return $me->{db}->range( $me->{name}, 'vers', $start, $end );
}
################################################################
# Read a value from this map's 'internal' sub-map.
#   $key - record key
# Returns the first value the backend's get() yields in list context.
sub get_internal {
    my( $me, $key ) = @_;

    my( $value ) = $me->{db}->get( $me->{name}, 'internal', $key );
    return $value;
}
# Write a value to this map's 'internal' sub-map.
#   $key - record key
#   $val - value to store
# Returns whatever the backend's put() returns.
sub set_internal {
    my( $me, $key, $val ) = @_;

    return $me->{db}->put( $me->{name}, 'internal', $key, $val );
}
################################################################
# Remove every key/version whose version is not greater than $expire.
#   $expire - cutoff; merkle entries with version > $expire are skipped
# The merkle tree is walked breadth-first from the root to collect the
# records, each is removed, then the tree is updated and the backend synced.
sub expire {
    my( $me, $expire ) = @_;

    debug("expiring $me->{name}");
    my $db = $me->{db};

    # walk merkle tree, find all k/v to remove
    my @delete;
    my @queue = ( { level => 0, version => 0, shard => 0 } );

    # FIFO queue: nodes are visited in the same level-by-level order
    while( @queue ){
        my $node = shift @queue;
        my $res  = $me->get_merkle( $node->{shard}, $node->{version}, $node->{level} );

        for my $r (@$res){
            next if $r->{version} > $expire;

            unless( $r->{key} ){
                # interior node: descend into it later
                push @queue, $r;
                next;
            }

            push @delete, { map { $_ => $r->{$_} } qw(key version shard) };
        }
    }

    # remove k/v
    for my $r (@delete){
        debug("expiring $r->{key}/$r->{version}");
        $me->_remove( $r->{key}, $r->{version} );
    }

    # update merkle
    $me->merkle(undef, @delete);

    return $db->sync();
}
################################################################
# Build the storage key for one version of a record.
#   $k - record key
#   $v - version, in the form it is stored under
# Returns "version/key".
sub vkey {
    my( $me, $k, $v ) = @_;

    return join '/', $v, $k;
}
################################################################
# Whether this map is sharded.  This implementation always answers 0.
sub is_sharded { return 0 }
# Whether a shard belongs to this server.  This implementation always
# answers 1, whatever arguments it is given.
sub is_my_shard { return 1 }
1;
=head1 NAME

AC::Yenta::Store::Map - persistent storage for yenta maps

=head1 SYNOPSIS

your code:

    AC::Yenta::Store::Map->add_backend( postgres => 'Local::Yenta::Postgres' );

your config:

    map mappyfoo {
        backend postgres
        # ...
    }

=cut
( run in 0.864 second using v1.01-cache-2.11-cpan-39bf76dae61 )