AtteanX-Store-LMDB

 view release on metacpan or  search on metacpan

lib/AtteanX/Store/LMDB.pm  view on Meta::CPAN

=head1 NAME

AtteanX::Store::LMDB - LMDB-based RDF store

=head1 VERSION

This document describes AtteanX::Store::LMDB version 0.001

=head1 SYNOPSIS

 use AtteanX::Store::LMDB;

=head1 DESCRIPTION

AtteanX::Store::LMDB provides a persistent quad-store based on LMDB.

=cut

use v5.14;
use warnings;

package AtteanX::Store::LMDB {
our $VERSION	= '0.001';
use Moo;
use Type::Tiny::Role;
use Types::Standard qw(Bool Str InstanceOf HashRef);
use LMDB_File qw(:flags :cursor_op);
use Digest::SHA qw(sha256 sha256_hex);
use Scalar::Util qw(refaddr reftype blessed);
use Math::Cartesian::Product;
use List::Util qw(any all first);
use File::Path qw(make_path);
use DateTime::Format::W3CDTF;
use Devel::Peek;
use Encode qw(encode_utf8 decode_utf8);
use namespace::clean;

with 'Attean::API::QuadStore';
with 'Attean::API::MutableQuadStore';

=head1 METHODS

Beyond the methods documented below, this class consumes the
L<Attean::API::QuadStore> and L<Attean::API::MutableQuadStore> roles and
inherits the methods those roles provide.

=over 4

=item C<< new () >>

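Returns a new LMDB-backed store object. The required C<filename> attribute
names the LMDB environment directory (created if it does not exist); pass
C<< initialize => 1 >> to create the store's databases.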

=cut

has initialize => (is => 'ro', isa => Bool, default => 0);
has filename => (is => 'ro', isa => Str, required => 1);
has env		=> (is => 'rw', isa => InstanceOf['LMDB::Env']);
has indexes	=> (is => 'rw', isa => HashRef, default => sub { +{} });

sub BUILD {
	my $self	= shift;
	my $path	= $self->filename;

	# The LMDB environment lives in a directory; create it on first use.
	make_path($path) unless (-d $path);

	$self->env(LMDB::Env->new($path, {
		mapsize	=> 100 * 1024 * 1024 * 1024,	# 100 GiB maximum map size
		maxdbs	=> 20,	# allow up to 20 named databases
		mode	=> 0640,
	}));

	if ($self->initialize) {
		# Create every named database and seed the bookkeeping records.
		# NOTE(review): the id counters are overwritten with 1 even if the
		# databases already exist and hold data; confirm callers only pass
		# initialize => 1 for a fresh store.
		my $wtxn	= $self->env->BeginTxn();
		my %db;
		$db{$_}	= $wtxn->OpenDB({ dbname => $_, flags => MDB_CREATE })
			for (qw(quads stats fullIndexes term_to_id id_to_term graphs prefixes));

		my $stats	= $db{'stats'};
		$stats->put("Diomede-Version", '0.0.13');
		$stats->put("Last-Modified", DateTime::Format::W3CDTF->new()->format_datetime(DateTime->now()));
		$stats->put($_, pack('Q>', 1)) for (qw(next_unassigned_term_id next_unassigned_quad_id));

		# Each full index is its own database; 'fullIndexes' maps the index
		# name to the packed quad positions (s=0, p=1, o=2, g=3) in key order.
		my %slot	= (s => 0, p => 1, o => 2, g => 3);
		for my $name (qw(spog pogs gops)) {
			$wtxn->OpenDB({ dbname => $name, flags => MDB_CREATE });
			$db{'fullIndexes'}->put($name, pack('Q>4', @slot{ split(//, $name) }));
		}

		$wtxn->commit();
	}

	# Load the index layouts (name => position order); they are consulted
	# whenever quads are written to or looked up in the index databases.
	my $rtxn		= $self->env->BeginTxn(MDB_RDONLY);
	my $layout_db	= $rtxn->OpenDB({ dbname => 'fullIndexes' });
	$self->iterate_database($layout_db, sub {
		my ($name, $packed)	= @_;
		$self->indexes->{$name}	= [ unpack('Q>4', $packed) ];
	});
}

sub iterate_database {
	my ($self, $db, $handler)	= @_;
	my $cursor	= $db->Cursor;
	# Walk every key/value pair in key order, calling $handler->($key, $value).
	# LMDB errors are returned as codes (die_on_err = 0); any non-zero code,
	# including "not found" at the end, stops the walk.
	eval {
		local($LMDB_File::die_on_err)	= 0;
		my ($key, $value);
		my $rc	= $cursor->get($key, $value, MDB_FIRST);
		until ($rc) {
			$handler->($key, $value);
			$rc	= $cursor->get($key, $value, MDB_NEXT);
		}
	};
}

sub iterate_database_range {
	my ($self, $db, $from, $to, $handler)	= @_;
	my $cursor	= $db->Cursor;
	# Call $handler->($key, $value) for each key in the half-open range
	# [$from, $to). Keys are compared byte-wise. Any non-zero LMDB return
	# code (die_on_err = 0) ends the walk.
	eval {
		use bytes;
		local($LMDB_File::die_on_err)	= 0;
		my $key	= $from;
		my $value;
		my $rc	= $cursor->get($key, $value, MDB_SET_RANGE);
		until ($rc or ($key cmp $to) >= 0) {
			$handler->($key, $value);
			$rc	= $cursor->get($key, $value, MDB_NEXT);
		}
	};
}

=item C<< size >>

Returns the number of quads in the store.

=cut

sub size {
	my $self	= shift;
	# Each stored quad is one record in the 'quads' database, so its entry
	# count is the store size; a missing stat counts as an empty store.
	my $read_txn	= $self->env->BeginTxn(MDB_RDONLY);
	my $quads_db	= $read_txn->OpenDB({ dbname => 'quads', });
	my $info		= $quads_db->stat;
	return 0 unless (defined($info) and defined($info->{'entries'}));
	return $info->{'entries'};
}

=item C<< get_quads ( $subject, $predicate, $object, $graph ) >>

Returns a stream object of all statements matching the specified subject,
predicate, object, and graph. Any of the arguments may be undef to match any value.

=cut

sub get_quads {
	my $self	= shift;
	my @pattern	= @_;

	# Simple pattern (every position a single term or undef): one lookup.
	return $self->_get_quads(@pattern) unless (any { ref($_) eq 'ARRAY' } @pattern);

	# Some positions hold a list of alternatives: query every combination
	# and chain the resulting iterators together.
	my @choices	= map { ref($_) eq 'ARRAY' ? $_ : [$_] } @pattern;
	my @iters;
	cartesian { push(@iters, $self->_get_quads(@_)) } @choices;
	return $iters[0] if (scalar(@iters) == 1);
	return Attean::IteratorSequence->new( iterators => \@iters, item_type => 'Attean::API::Quad' );
}

sub _encode_term {
	my $self	= shift;
	my $term	= shift;
	my $value	= $term->value;
	if ($term->isa('Attean::IRI')) {
		return encode_utf8('I"' . $value);
	} elsif ($term->isa('Attean::Literal')) {
		if (my $lang = $term->language) {
				return encode_utf8('L' . $lang . '"' . $value);
		} elsif (my $dt = $term->datatype) {
			if ($dt->value eq 'http://www.w3.org/2001/XMLSchema#string') {
				return encode_utf8('S"' . $value);
			} else {
				return encode_utf8('D' . $dt->value . '"' . $value);
			}
		} else {
			return encode_utf8('S"' . $value);

lib/AtteanX/Store/LMDB.pm  view on Meta::CPAN

	my $txn		= shift;
	my $qp		= shift;
	
	my $exists	= $self->_get_quads($qp->values);
	if (my $q = $exists->next) {
		return 1;
	}
	return 0;
}

sub _exists {
	my $self	= shift;
	my $qp		= shift;
	# Returns 1 if at least one stored quad matches the pattern's values,
	# otherwise 0; only the first result of the query is consumed.
	return $self->get_quads($qp->values)->next ? 1 : 0;
}

=item C<< add_quad ( $quad ) >>

Adds the specified C<$quad> to the underlying model.

=cut
	
	sub add_quad {
		my $self	= shift;
		my $st		= shift;

		# The store holds a set of quads: re-adding an existing quad is a no-op.
		if ($self->_exists($st)) {
			return;
		}

		my $txn		= $self->env->BeginTxn();
		my $stats	= $txn->OpenDB({ dbname => 'stats', });
		my $t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
		my $i2t		= $txn->OpenDB({ dbname => 'id_to_term', });
		my $graphs	= $txn->OpenDB({ dbname => 'graphs', });

		my @ids		= map { $self->_get_or_create_term_id($_, $txn, $stats, $t2i, $i2t) } $st->values;
		if (any { not defined($_) } @ids) {
			# A term could not be given an id: roll back anything this
			# transaction wrote instead of leaving the write txn open.
			$txn->abort();
			return;
		}

		# Allocate the next quad id from the persistent counter in 'stats'.
		my $next_quad	= 'next_unassigned_quad_id';
		my ($next)		= unpack('Q>', $stats->get($next_quad));
		my $qid_value	= $next++;

		my $qid			= pack('Q>', $qid_value);
		my $qids		= pack('Q>4', @ids);
		my $gid			= pack('Q>', $ids[3]);

		my $quads_dbi	= $txn->open('quads');
		my $stats_dbi	= $txn->open('stats');
		$txn->put($quads_dbi, $qid, $qids);

		# Record the quad's graph in 'graphs' unless it is already there.
		# MDB_SET_RANGE positions the cursor at the first key >= $gid, so the
		# graph is present only when that lookup succeeds with exactly $gid.
		my $graphs_cursor = $graphs->Cursor;
		my $key	= $gid;
		my $empty	= '';
		eval {
			local($LMDB_File::die_on_err)	= 0;
			if (my $err = $graphs_cursor->get($key, $empty, MDB_SET_RANGE)) {
				$graphs_cursor->put($gid, $empty);
			} else {
				if ($key ne $gid) {
					$graphs_cursor->put($gid, $empty);
				}
			}
		};

		$self->_add_quad_to_indexes($qid, \@ids, $txn);
		$txn->put($stats_dbi, $next_quad, pack('Q>', $next));
		$txn->commit();
	}
	
	sub add_quad_with_txn {
		my $self	= shift;
		my $txn		= shift;
		my $st		= shift;
		# Adds $st using the caller's write transaction $txn. The transaction
		# is not committed here; that is left to the caller.

		# Skip quads that are already stored (checked inside the same txn).
		if ($self->_exists_with_txn($txn, $st)) {
			return;
		}

		my $stats	= $txn->OpenDB({ dbname => 'stats', });
		my $t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
		my $i2t		= $txn->OpenDB({ dbname => 'id_to_term', });

		my @ids		= map { $self->_get_or_create_term_id($_, $txn, $stats, $t2i, $i2t) } $st->values;
		if (any { not defined($_) } @ids) {
			return;
		}

		# Allocate the next quad id from the persistent counter in 'stats'.
		my $next_quad	= 'next_unassigned_quad_id';
		my ($next)		= unpack('Q>', $stats->get($next_quad));
		my $qid_value	= $next++;

		my $qid			= pack('Q>', $qid_value);
		my $qids		= pack('Q>4', @ids);
		my $gid			= pack('Q>', $ids[3]);

		my $quads_dbi	= $txn->open('quads');
		my $stats_dbi	= $txn->open('stats');
		my $graphs_dbi	= $txn->open('graphs');
		$txn->put($quads_dbi, $qid, $qids);

		# Register the quad's graph, as add_quad and add_iter do; without this
		# a graph populated only through this method would be missing from
		# 'graphs'. Graph records always have an empty value, so re-putting an
		# existing graph leaves the table unchanged.
		$txn->put($graphs_dbi, $gid, '');

		$self->_add_quad_to_indexes($qid, \@ids, $txn);
		$txn->put($stats_dbi, $next_quad, pack('Q>', $next));
	}
	
	sub _add_quad_to_indexes {
		my $self	= shift;
		my $qid		= shift;
		my $ids		= shift;
		my @ids		= @$ids;
		my $txn		= shift;
		# Writes one entry per full index: the key is the quad's four term ids
		# packed in that index's position order, and the value is $qid.
		#
		# Iterate over keys() rather than each(): each() shares the hash's
		# internal iterator, so a previous loop over the indexes that died
		# part-way would make this call silently skip some indexes.
		my $indexes	= $self->indexes;
		foreach my $name (keys %$indexes) {
			my $order	= $indexes->{$name};
			my $index	= $txn->open($name);
			my $qids	= pack('Q>4', @ids[@$order]);
			$txn->put($index, $qids, $qid);
		}
	}
	
	sub _remove_quad_to_indexes {
		my $self	= shift;
		my $qid		= shift;
		my $ids		= shift;
		my @ids		= @$ids;
		my $txn		= shift;
		# Deletes the quad's key (its four term ids packed in each index's
		# position order) from every full index. $qid is accepted for symmetry
		# with _add_quad_to_indexes but is not used.
		#
		# Iterate over keys() rather than each(): each() shares the hash's
		# internal iterator, so a previous loop that died part-way would make
		# this call skip indexes and leave stale entries behind.
		my $indexes	= $self->indexes;
		foreach my $name (keys %$indexes) {
			my $order	= $indexes->{$name};
			my $index	= $txn->OpenDB({ dbname => $name });
			my $qids	= pack('Q>4', @ids[@$order]);
			$index->del($qids);
		}
	}

=item C<< remove_quad ( $statement ) >>

Removes the specified C<$statement> from the underlying model.

=cut

	sub remove_quad {
		my $self	= shift;
		my $st		= shift;
		my $txn		= $self->env->BeginTxn();
		my $t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
		my $quads	= $txn->OpenDB({ dbname => 'quads', });
		my $graphs	= $txn->OpenDB({ dbname => 'graphs', });

		# If any term of the quad has no id, the quad cannot be stored.
		my @remove_ids		= map { $self->_get_term_id($_, $t2i) } $st->values;
		unless (scalar(@remove_ids) == 4 and all { defined($_) } @remove_ids) {
			$txn->abort();
			return;
		}

		# Scan the 'quads' table for the record holding these four ids.
		my $committed	= 0;
		my $cursor		= $quads->Cursor;
		my ($key, $value);
		eval {
			local($LMDB_File::die_on_err)	= 0;
			unless ($cursor->get($key, $value, MDB_FIRST)) {
				QUAD: while (1) {
					my $qid		= unpack('Q>', $key);
					my (@ids)	= unpack('Q>4', $value);
					if ($ids[0] == $remove_ids[0] and $ids[1] == $remove_ids[1] and $ids[2] == $remove_ids[2] and $ids[3] == $remove_ids[3]) {
						my $g		= $ids[3];
						$self->_remove_quad_to_indexes($qid, \@ids, $txn);
						$cursor->del();

						unless ($self->_graph_id_exists_with_txn($txn, $quads, $t2i, $g)) {
							# no more quads with this graph, so delete it from the graphs table
							my $graphs_cursor	= $graphs->Cursor;
							my $gid			= pack('Q>', $g);
							my $graph_key	= $gid;
							my $empty		= '';
							unless ($graphs_cursor->get($graph_key, $empty, MDB_SET_RANGE)) {
								if ($gid eq $graph_key) {
									$graphs_cursor->del();
								}
							}
						}

						$txn->commit();
						$committed	= 1;
						return;	# leaves the eval block
					}
				} continue {
					last if $cursor->get($key, $value, MDB_NEXT);
				}
			}
		};
		# Nothing was committed (quad not stored, or an error inside the
		# eval): end the write transaction explicitly instead of leaving it open.
		$txn->abort() unless ($committed);
		return;
	}

=item C<< create_graph( $graph ) >>

This is a no-op for this quad-store: a graph is recorded automatically when a
quad in it is added.

=cut

	sub create_graph {
		# Nothing to do: graphs come into being when quads are added to them.
		return;
	}

=item C<< drop_graph( $graph ) >>

Removes all quads with the given C<< $graph >>.

=cut

	sub drop_graph {
		my ($self, @graph)	= @_;
		# Dropping a graph is the same operation as clearing it in this store.
		return $self->clear_graph(@graph);
	}

=item C<< clear_graph( $graph ) >>

Removes all quads with the given C<< $graph >>.

=cut

	sub clear_graph {
		my $self	= shift;
		my $graph	= shift;
		# Removes every quad whose graph is $graph. The matching quads are
		# collected first and removed afterwards, so the removals cannot
		# disturb the iterator that is producing them.
		my $iter	= $self->get_quads(undef, undef, undef, $graph);
		my @quads;
		while (my $q = $iter->next) {
			push(@quads, $q);
		}
		foreach my $q (@quads) {
			$self->remove_quad($q);
		}
	}

	sub add_iter {
		my $BULK_LOAD	= 1;

		my $self	= shift;
		my $iter	= shift;
		my $type	= $iter->item_type;
		die "Iterator type $type isn't quads" unless (Role::Tiny::does_role($type, 'Attean::API::Quad'));

		unless ($BULK_LOAD) {
			# Slow path: one write transaction per quad.
			while (my $q = $iter->next) {
				$self->add_quad($q);
			}
			return;
		}

		# Bulk path: load every quad of the iterator in one write transaction.
		my $txn			= $self->env->BeginTxn();
		my $stats		= $txn->OpenDB({ dbname => 'stats', });
		my $t2i			= $txn->OpenDB({ dbname => 'term_to_id', });
		my $i2t			= $txn->OpenDB({ dbname => 'id_to_term', });
		my $graphs_db	= $txn->OpenDB({ dbname => 'graphs', });
		my $quads_dbi	= $txn->open('quads');
		my $stats_dbi	= $txn->open('stats');

		my $next_quad	= 'next_unassigned_quad_id';
		my ($next)		= unpack('Q>', $stats->get($next_quad));
		my %graphs;
		while (my $q = $iter->next) {
			next if ($self->_quad_exists_with_txn($txn, $quads_dbi, $t2i, $q->values));

			my @ids		= map { $self->_get_or_create_term_id($_, $txn, $stats, $t2i, $i2t) } $q->values;
			# Skip a quad whose terms cannot be given ids, as add_quad does,
			# rather than returning and silently discarding the whole batch.
			next if (any { not defined($_) } @ids);

			my $qid		= pack('Q>', $next++);
			$txn->put($quads_dbi, $qid, pack('Q>4', @ids));
			$self->_add_quad_to_indexes($qid, \@ids, $txn);
			$graphs{ $ids[3] }	= pack('Q>', $ids[3]);
		}

		# Register each graph seen in this batch unless it is already present.
		# MDB_SET_RANGE positions at the first key >= $gid; the graph exists
		# only if that lookup succeeds with exactly $gid.
		my $empty	= '';
		my $graphs_cursor = $graphs_db->Cursor;
		foreach my $gid (values %graphs) {
			my $key	= $gid;
			eval {
				local($LMDB_File::die_on_err)	= 0;
				if (my $err = $graphs_cursor->get($key, $empty, MDB_SET_RANGE)) {
					$graphs_cursor->put($gid, $empty);
				} else {
					if ($key ne $gid) {
						$graphs_cursor->put($gid, $empty);
					}
				}
			};
		}
		$txn->put($stats_dbi, $next_quad, pack('Q>', $next));
		$txn->commit();
	}

	sub _quad_exists_with_txn {
		my ($self, $txn, $quads, $t2i, @nodes)	= @_;
		# Returns the number of stored quads matching all four @nodes, looked
		# up inside $txn; 0 if any node has no term id.

		my %bound;
		for my $pos (0 .. 3) {
			# A term that was never assigned an id cannot appear in any quad.
			my $id	= $self->_get_term_id($nodes[$pos], $t2i) or return 0;
			$bound{$pos}	= $id;
		}

		my $best	= $self->_best_index(\%bound, $txn);
		my $quadids;
		if ($best) {
			# Range-scan the chosen index over the bound key prefix:
			# [prefix, prefix with its last component incremented).
			my ($index, $score)	= @$best;
			my @prefix	= @bound{ @{ $self->indexes->{$index} }[0 .. $score - 1] };
			my @upper	= @prefix;
			$upper[-1]++;
			$quadids	= $self->_get_ordered_matching_quads($txn, \%bound, $index, [@prefix], \@upper);
		} else {
			$quadids	= $self->_get_unordered_matching_quads($quads, \%bound);
		}
		return scalar(@$quadids);
	}


	sub _graph_id_exists_with_txn {
		my $self	= shift;
		my $txn		= shift;
		my $quads	= shift;
		my $t2i		= shift;
		my $gid		= shift;
		my %bound	= (3 => $gid);
		if (my $best = $self->_best_index(\%bound, $txn)) {
			my ($index, $score)	= @$best;
			my $order		= $self->indexes->{$index};
			my @positions	= @$order[0..$score-1];
			my @prefix		= map { $bound{$_} } @positions;
			my @lower		= @prefix;
			my @upper		= @prefix;
			$upper[-1]++;
			my $quadids	= $self->_get_ordered_matching_quads($txn, \%bound, $index, \@lower, \@upper);
			return scalar(@$quadids);



( run in 1.710 second using v1.01-cache-2.11-cpan-39bf76dae61 )