AtteanX-Store-LMDB

 view release on metacpan or  search on metacpan

lib/AtteanX/Store/LMDB.pm  view on Meta::CPAN

	
	my $txn		= $self->env->BeginTxn(MDB_RDONLY);
	my $quads	= $txn->OpenDB({ dbname => 'quads', });
	if ($bound == 0) {
		my $quadids	= $self->_get_all_unordered_quads($quads);
		return $self->_materialize_quads($quadids);
	} else {
		if (my $best = $self->_best_index(\%bound, $txn)) {
			my ($index, $score)	= @$best;
			my $order		= $self->indexes->{$index};
			my @positions	= @$order[0..$score-1];
			my @prefix		= map { $bound{$_} } @positions;
			my @lower		= @prefix;
			my @upper		= @prefix;
			$upper[-1]++;
			my $quadids	= $self->_get_ordered_matching_quads($txn, \%bound, $index, \@lower, \@upper);
			return $self->_materialize_quads($quadids);
		}
		my $quadids	= $self->_get_unordered_matching_quads($quads, \%bound);
		return $self->_materialize_quads($quadids);
	}
}

# Choose the index whose key order best matches a set of bound quad positions.
#
# Arguments:
#   $bound - hash reference keyed by quad position; only the presence of a
#            key is examined here
#   $txn   - accepted for interface compatibility; not used
#
# An index's score is the number of leading positions of its key order that
# are all present in $bound. Returns an array reference [ $name, $score ] for
# the highest-scoring index (ties are resolved by index name so the choice is
# repeatable), or undef when no index has a bound leading position or no
# indexes are configured. Callers treat a false result as "use the unordered
# scan".
sub _best_index {
	my $self	= shift;
	my $bound	= shift;
	my $txn		= shift;	# unused
	my $indexes	= $self->indexes;
	my @scores;
	# keys() restarts the hash's shared iterator, so an each() loop that was
	# abandoned elsewhere cannot cause indexes to be skipped here.
	foreach my $name (keys %$indexes) {
		my $order	= $indexes->{$name};
		my $score	= 0;
		foreach my $pos (@$order) {
			last unless (exists $bound->{$pos});
			$score++;
		}
		# A zero-length prefix cannot be used as a key range: callers build
		# the upper bound with $upper[-1]++, which dies on an empty list.
		next unless ($score > 0);
		push(@scores, [$name, $score]);
	}

	@scores	= sort { $b->[1] <=> $a->[1] or $a->[0] cmp $b->[0] } @scores;
	return shift(@scores);
}

=item C<< get_graphs >>

Returns an iterator over the Attean::API::Term objects comprising
the set of graphs of the stored quads.

=cut

sub get_graphs {
	my $self	= shift;

	# Read every graph id up front from the keys of the 'graphs' database;
	# each key is unpacked as a big-endian 64-bit integer.
	my $txn		= $self->env->BeginTxn(MDB_RDONLY);
	my $graphs	= $txn->OpenDB({ dbname => 'graphs', });
	# This handle is not used in this scope; the generator below opens its own.
	my $i2t		= $txn->OpenDB({ dbname => 'id_to_term', });
	my @graph_ids;
	$self->iterate_database($graphs, sub {
		my ($key, $value)	= @_;
		my ($gid)	= unpack('Q>', $key);
		push(@graph_ids, $gid);
	});
	
	# Generator: each call starts its own read-only transaction, consumes ids
	# from the collected list and returns the next graph term that can be
	# loaded and parsed. Returns nothing once the list is exhausted.
	my $sub	= sub {
		my $txn		= $self->env->BeginTxn(MDB_RDONLY);
		my $i2t		= $txn->OpenDB({ dbname => 'id_to_term', });
		# Test definedness rather than truth so that an id of 0 would not
		# end the iteration early.
		GRAPH: while (defined(my $gid = shift(@graph_ids))) {
			my $key	= pack('Q>', $gid);
			my $termdata = $i2t->get($key);
			# Skip ids with no stored term. The label must name this loop;
			# the previous 'next QUAD' referred to a label that does not exist.
			next GRAPH unless ($termdata);
			my $term	= $self->_parse_term($termdata);
			next GRAPH unless ($term);
			return $term;
		}
		return;
	};
	return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Term' );
}

# Report whether the store already holds a quad matching $qp.
#
# Arguments:
#   $txn - accepted by the interface but not passed on to the lookup
#   $qp  - object whose values() list is handed to _get_quads
#
# Returns 1 when the iterator from _get_quads yields an item, otherwise 0.
sub _exists_with_txn {
	my ($self, $txn, $qp)	= @_;
	my $matches	= $self->_get_quads($qp->values);
	return ($matches->next) ? 1 : 0;
}

# Report whether the store already holds a quad matching $qp.
#
# The values() of $qp are passed to get_quads; the result is 1 when the
# returned iterator yields an item and 0 otherwise.
sub _exists {
	my ($self, $qp)	= @_;
	my $matches	= $self->get_quads($qp->values);
	return ($matches->next) ? 1 : 0;
}

=item C<< add_quad ( $quad ) >>

Adds the specified C<$quad> to the underlying model.

=cut
	
	sub add_quad {
		my ($self, $st)	= @_;

		# Nothing to do when an equal quad is already stored.
		return if ($self->_exists($st));

		# All writes below go through a single transaction that is
		# committed at the end of this sub.
		my $txn		= $self->env->BeginTxn();
		my $stats	= $txn->OpenDB({ dbname => 'stats', });
		my $t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
		my $i2t		= $txn->OpenDB({ dbname => 'id_to_term', });
		my $graphs	= $txn->OpenDB({ dbname => 'graphs', });

		# Resolve each term of the quad to an id; give up (without
		# committing) if any term could not be assigned one.
		my @ids		= map { $self->_get_or_create_term_id($_, $txn, $stats, $t2i, $i2t) } $st->values;
		return if (grep { not defined($_) } @ids);

		# Take the next free quad id from the stats table and advance the
		# counter; the new counter value is written back further down.
		my $next_quad	= 'next_unassigned_quad_id';
		my ($next_id)	= unpack('Q>', $stats->get($next_quad));
		my $qid			= pack('Q>', $next_id++);
		my $qids		= pack('Q>4', @ids);
		my $gid			= pack('Q>', $ids[3]);

		my $graphs_dbi	= $txn->open('graphs');
		my $quads_dbi	= $txn->open('quads');
		my $stats_dbi	= $txn->open('stats');
		$txn->put($quads_dbi, $qid, $qids);

		# Make sure the quad's graph id is present in the graphs table. The
		# cursor lookup updates $found_key (and $data) in place, so the id is
		# written when the lookup fails or lands on a different key.
		my $graphs_cursor = $graphs->Cursor;
		my $found_key	= $gid;
		my $data	= '';
		eval {
			local($LMDB_File::die_on_err)	= 0;
			my $lookup_failed	= $graphs_cursor->get($found_key, $data, MDB_SET_RANGE);
			if ($lookup_failed or $found_key ne $gid) {
				$graphs_cursor->put($gid, $data);
			}
		};

		$self->_add_quad_to_indexes($qid, \@ids, $txn);
		$txn->put($stats_dbi, $next_quad, pack('Q>', $next_id));
		$txn->commit();
	}
	
	# Add a quad using a transaction supplied by the caller.
	#
	# Arguments:
	#   $txn - transaction used for every read and write here; this sub does
	#          not commit it
	#   $st  - the quad; its values() are resolved to term ids
	#
	# Returns early (writing nothing) when the quad already exists or when a
	# term id could not be obtained. Unlike add_quad, this sub does not write
	# to the 'graphs' database.
	sub add_quad_with_txn {
		my ($self, $txn, $st)	= @_;

		return if ($self->_exists_with_txn($txn, $st));

		my $stats	= $txn->OpenDB({ dbname => 'stats', });
		my $t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
		my $i2t		= $txn->OpenDB({ dbname => 'id_to_term', });

		my @ids		= map { $self->_get_or_create_term_id($_, $txn, $stats, $t2i, $i2t) } $st->values;
		return if (grep { not defined($_) } @ids);

		# Take the next free quad id and advance the counter.
		my $next_quad	= 'next_unassigned_quad_id';
		my ($next_id)	= unpack('Q>', $stats->get($next_quad));
		my $qid			= pack('Q>', $next_id++);
		my $qids		= pack('Q>4', @ids);

		my $quads_dbi	= $txn->open('quads');
		my $stats_dbi	= $txn->open('stats');
		$txn->put($quads_dbi, $qid, $qids);
		$self->_add_quad_to_indexes($qid, \@ids, $txn);
		# Persist the advanced counter.
		$txn->put($stats_dbi, $next_quad, pack('Q>', $next_id));
	}
	
	# Write one entry for a quad into every configured index database.
	#
	# Arguments:
	#   $qid - value stored under each index key (callers in this file pass
	#          the packed quad id)
	#   $ids - array reference of the quad's term ids
	#   $txn - transaction used to open each index and perform the put
	#
	# For each index, the ids are reordered by that index's position list and
	# packed with 'Q>4' to form the key.
	sub _add_quad_to_indexes {
		my $self	= shift;
		my $qid		= shift;
		my $ids		= shift;
		my $txn		= shift;
		my @ids		= @$ids;
		my $indexes	= $self->indexes;
		# Iterate with keys() rather than each(): if a put dies part-way
		# through, each() would leave the hash's shared iterator mid-way and
		# later loops over the same hash would silently skip indexes.
		foreach my $name (keys %$indexes) {
			my $order	= $indexes->{$name};
			my $index	= $txn->open($name);
			my $key		= pack('Q>4', @ids[@$order]);
			$txn->put($index, $key, $qid);
		}
	}
	
	# Delete a quad's entry from every configured index database.
	#
	# Arguments:
	#   $qid - accepted for symmetry with _add_quad_to_indexes; not used,
	#          because entries are deleted by key only
	#   $ids - array reference of the quad's term ids
	#   $txn - transaction used to open each index database
	#
	# For each index, the ids are reordered by that index's position list and
	# packed with 'Q>4' to form the key that is deleted.
	sub _remove_quad_to_indexes {
		my $self	= shift;
		my $qid		= shift;	# unused
		my $ids		= shift;
		my $txn		= shift;
		my @ids		= @$ids;
		my $indexes	= $self->indexes;
		# Iterate with keys() rather than each(): if a delete dies part-way
		# through, each() would leave the hash's shared iterator mid-way and
		# later loops over the same hash would silently skip indexes.
		foreach my $name (keys %$indexes) {
			my $order	= $indexes->{$name};
			my $index	= $txn->OpenDB({ dbname => $name });
			my $key		= pack('Q>4', @ids[@$order]);
			$index->del($key);
		}
	}

=item C<< remove_quad ( $statement ) >>

Removes the specified C<$statement> from the underlying model.

=cut

	# Remove the stored quad whose four term ids equal those of $st.
	#
	# The quads database is scanned record by record from the first entry;
	# the first record whose four ids all match is deleted, its index
	# entries are removed, its graph is dropped from the graphs table when
	# no other quad uses it, and the transaction is committed.
	#
	# On every path that does not find a match (including the early returns
	# below) commit() is never called on the transaction opened here.
	sub remove_quad {
		my $self	= shift;
		my $st		= shift;
		my $txn		= $self->env->BeginTxn();
		my $t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
		my $quads	= $txn->OpenDB({ dbname => 'quads', });
		my $graphs	= $txn->OpenDB({ dbname => 'graphs', });

		# Look up (without creating) the id of each term; a quad with a
		# missing or unknown term cannot be stored, so there is nothing to do.
		my @remove_ids		= map { $self->_get_term_id($_, $t2i) } $st->values;
		unless (scalar(@remove_ids) == 4) {
			return;
		}
		unless (all { defined($_) } @remove_ids) {
			return;
		}

		my $cursor	= $quads->Cursor;
		my ($key, $value);
		# Exceptions raised inside this eval are discarded: $@ is never
		# inspected afterwards.
		eval {
			# With die_on_err disabled the cursor calls report their outcome
			# through the return value; a true value is treated below as
			# "no (more) records".
			local($LMDB_File::die_on_err)	= 0;
			unless ($cursor->get($key, $value, MDB_FIRST)) {
				QUAD: while (1) {
					# Key is the quad id, value is the four term ids.
					my $qid		= unpack('Q>', $key);
					my (@ids)	= unpack('Q>4', $value);
					if ($ids[0] == $remove_ids[0] and $ids[1] == $remove_ids[1] and $ids[2] == $remove_ids[2] and $ids[3] == $remove_ids[3]) {
						my $g		= $ids[3];
						$self->_remove_quad_to_indexes($qid, \@ids, $txn);
						$cursor->del();
						
						unless ($self->_graph_id_exists_with_txn($txn, $quads, $t2i, $g)) {
							# no more quads with this graph, so delete it from the graphs table
							my $graphs_cursor = $graphs->Cursor;
							my $gid = pack('Q>', $g);
							# This $key shadows the scan key above; the lookup
							# overwrites it with the key actually found.
							my $key	= $gid;
							my $empty	= '';
							unless ($graphs_cursor->get($key, $empty, MDB_SET_RANGE)) {
								# Only delete when the cursor landed exactly on this graph id.
								if ($gid eq $key) {
									$graphs_cursor->del();
								}
							}
						}

						$txn->commit();
						# This return leaves the eval block only; no statement
						# follows the eval, so the sub ends as well.
						return;
					}
				} continue {
					# Advance to the next record; stop scanning when there is none.
					last if $cursor->get($key, $value, MDB_NEXT);
				}
			}
		};
	}

=item C<< create_graph( $graph ) >>

This is a no-op function for the LMDB quad-store.

=cut

	sub create_graph {
		# Intentionally does nothing: graphs need no explicit creation in a
		# quad-store.
		return;
	}

=item C<< drop_graph( $graph ) >>

Removes all quads with the given C<< $graph >>.

=cut

	sub drop_graph {
		my ($self, @args)	= @_;
		# Dropping a graph is implemented as clearing it; all arguments are
		# forwarded unchanged.
		return $self->clear_graph(@args);
	}

=item C<< clear_graph( $graph ) >>

Removes all quads with the given C<< $graph >>.

=cut

	sub clear_graph {
		my ($self, $graph)	= @_;
		# Fetch the quads of $graph (subject, predicate and object are left
		# unbound) and remove each one via remove_quad.
		my $iter	= $self->get_quads(undef, undef, undef, $graph);
		for (my $quad = $iter->next; $quad; $quad = $iter->next) {
			$self->remove_quad($quad);
		}
	}

	# Add every quad produced by an iterator.
	#
	# Arguments:
	#   $iter - iterator whose item_type must do the Attean::API::Quad role;
	#           otherwise this sub dies
	#
	# With $BULK_LOAD set (it is hard-coded to 1 below) all quads are written
	# inside one transaction that is committed once at the end. Quads that
	# already exist are skipped.
	sub add_iter {
		my $BULK_LOAD	= 1;

		my $self	= shift;
		my $iter	= shift;
		my $type	= $iter->item_type;
		die "Iterator type $type isn't quads" unless (Role::Tiny::does_role($type, 'Attean::API::Quad'));

		my ($txn, $stats, $t2i, $i2t, $quads_dbi, $stats_dbi, $graphs_dbi);
		if ($BULK_LOAD) {
			$txn		= $self->env->BeginTxn();
			$stats	= $txn->OpenDB({ dbname => 'stats', });
			$t2i		= $txn->OpenDB({ dbname => 'term_to_id', });
			$i2t		= $txn->OpenDB({ dbname => 'id_to_term', });
			# Despite the name, this holds the object returned by OpenDB (it
			# is used for its Cursor below), unlike the two open() results.
			$graphs_dbi	= $txn->OpenDB({ dbname => 'graphs', });
			$quads_dbi	= $txn->open('quads');
			$stats_dbi	= $txn->open('stats');
		}

		my $next_quad	= 'next_unassigned_quad_id';
		# NOTE(review): $stats is only assigned in the $BULK_LOAD branch
		# above, so this call would fail on undef if $BULK_LOAD were false.
		my ($next)		= unpack('Q>', $stats->get($next_quad));
		# Packed graph ids of the quads added in this call, keyed by numeric id.
		my %graphs;
		while (my $q = $iter->next) {
			if ($BULK_LOAD) {
				if ($self->_quad_exists_with_txn($txn, $quads_dbi, $t2i, $q->values)) {
					next;
				}
		
				my @ids		= map { $self->_get_or_create_term_id($_, $txn, $stats, $t2i, $i2t) } $q->values;
				# NOTE(review): this leaves the sub before commit() is reached,
				# so the quads written earlier in this call are not committed
				# here - confirm that abandoning the whole load is intended.
				if (any { not defined($_) } @ids) {
					return;
				}

		# 		warn "next quad id: $next\n";
				# Use the current counter value as this quad's id, then advance it.
				my $qid_value	= $next++;

				my $qid			= pack('Q>', $qid_value);
				my $qids		= pack('Q>4', @ids);
				my $g			= $ids[3];
				$graphs{$g}		= pack('Q>', $g);

				$txn->put($quads_dbi, $qid, $qids);
				$self->_add_quad_to_indexes($qid, \@ids, $txn);
			} else {
				$self->add_quad($q);
			}
		}
		if ($BULK_LOAD) {
			my $empty	= '';
			my $graphs_cursor = $graphs_dbi->Cursor;
			# Make sure each graph id seen above is present in the graphs
			# table. The lookup updates $key in place, so the id is written
			# when the lookup fails or lands on a different key.
			foreach my $gid (values %graphs) {
				my $key	= $gid;
				eval {
					local($LMDB_File::die_on_err)	= 0;
					if (my $err = $graphs_cursor->get($key, $empty, MDB_SET_RANGE)) {
						$graphs_cursor->put($gid, $empty);
					} else {
						if ($key ne $gid) {
							$graphs_cursor->put($gid, $empty);
						}
					}
				};
			}
			# Persist the advanced quad id counter and commit everything.
			$txn->put($stats_dbi, $next_quad, pack('Q>', $next));
			$txn->commit();
		}
	}

	# Count the stored quads that match four fully specified terms.
	#
	# Arguments:
	#   $txn   - transaction passed on to the index lookup
	#   $quads - handle passed on to the unordered scan
	#   $t2i   - handle passed to _get_term_id for each term
	#   @nodes - the four terms, in quad position order 0..3
	#
	# Returns 0 as soon as a term has no (true) id, since such a quad cannot
	# be stored; otherwise returns the number of matching quad ids.
	sub _quad_exists_with_txn {
		my ($self, $txn, $quads, $t2i, @nodes)	= @_;

		# Map each of the four positions to its term id.
		my %bound;
		for my $pos (0 .. 3) {
			my $id	= $self->_get_term_id($nodes[ $pos ], $t2i) or return 0;
			$bound{ $pos }	= $id;
		}

		my $best	= $self->_best_index(\%bound, $txn);
		my $matches;
		if ($best) {
			# Range scan: the lower bound is the bound prefix in index order,
			# the upper bound is the same prefix with its last id incremented.
			my ($index, $score)	= @$best;
			my @lower	= map { $bound{$_} } @{ $self->indexes->{$index} }[0 .. $score-1];
			my @upper	= @lower;
			$upper[-1]++;
			$matches	= $self->_get_ordered_matching_quads($txn, \%bound, $index, \@lower, \@upper);
		} else {
			# No index available: scan the quads table.
			$matches	= $self->_get_unordered_matching_quads($quads, \%bound);
		}
		return scalar(@$matches);
	}


	# Count the stored quads whose graph position holds the given id.
	#
	# Arguments:
	#   $txn   - transaction passed on to the index lookup
	#   $quads - handle passed on to the unordered scan
	#   $t2i   - accepted for interface compatibility; not used
	#   $gid   - graph term id, bound to quad position 3
	#
	# Returns the number of matching quad ids (0 when the graph is unused).
	sub _graph_id_exists_with_txn {
		my $self	= shift;
		my $txn		= shift;
		my $quads	= shift;
		my $t2i		= shift;	# unused
		my $gid		= shift;
		my %bound	= (3 => $gid);
		my $best	= $self->_best_index(\%bound, $txn);
		# Only use an index that has at least one bound leading position.
		# With a score of 0 the prefix is empty and $upper[-1]++ would die,
		# so fall through to the unordered scan instead.
		if ($best and $best->[1] > 0) {
			my ($index, $score)	= @$best;
			my $order		= $self->indexes->{$index};
			my @positions	= @$order[0..$score-1];
			my @prefix		= map { $bound{$_} } @positions;
			my @lower		= @prefix;
			my @upper		= @prefix;
			# Upper bound of the range: same prefix with its last id incremented.
			$upper[-1]++;
			my $quadids	= $self->_get_ordered_matching_quads($txn, \%bound, $index, \@lower, \@upper);
			return scalar(@$quadids);
		}
		my $quadids	= $self->_get_unordered_matching_quads($quads, \%bound);
		return scalar(@$quadids);
	}
}

1;

__END__

=back

=head1 BUGS

Please report any bugs or feature requests through the GitHub web interface
at L<https://github.com/kasei/perlrdf2/issues>.

=head1 AUTHOR

Gregory Todd Williams  C<< <gwilliams@cpan.org> >>

=head1 SEE ALSO

=over 4

=item * L<Diomede|https://github.com/kasei/diomede/> is a Swift LMDB-based quadstore that shares the same file format as this module.

=back

=head1 COPYRIGHT

Copyright (c) 2020 Gregory Todd Williams. This
program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut



( run in 2.118 seconds using v1.01-cache-2.11-cpan-ceb78f64989 )