BerkeleyDB-Manager

 view release on metacpan or  search on metacpan

lib/BerkeleyDB/Manager.pm  view on Meta::CPAN

}

# txn_checkpoint
#
# Asks the managed environment to write a transaction checkpoint. It passes the
# checkpoint_kbyte and checkpoint_min attributes as thresholds and 0 as flags.
# A non-zero status returned by the environment is rethrown with die.
# Otherwise the (false) status is returned to the caller.
sub txn_checkpoint {
	my $self = shift;

	my $env    = $self->env;
	my $status = $env->txn_checkpoint( $self->checkpoint_kbyte, $self->checkpoint_min, 0 );

	die $status if $status;

	return $status;
}

# dup_cursor_stream %args
#
# Streams the entries stored under one key of a database with duplicates.
# The result is returned as a Data::Stream::Bulk.
#
# The first entry is fetched with 'callback_first'. By default this is a
# DB_SET lookup of 'key' that pushes one item: a [ key, value ] pair, the
# value, or the key, depending on 'values'/'keys'. If the cursor then reports
# more than one entry for that key (c_count > 1), further entries are fetched
# with 'callback' until the first chunk holds 'chunk_size' items. Whatever
# remains is deferred to a cursor_stream built on the same cursor and callback.
#
# Arguments: everything cursor_stream takes (cursor, db, chunk_size, init,
# values, keys, callback), plus 'key' and 'callback_first'.
# Returns: a Data::Stream::Bulk. It is nil() when the first-fetch callback
# returns false.
# Croaks: if both 'values' and 'keys' are set, or if neither 'cursor' nor
# 'db' is given.
sub dup_cursor_stream {
	my ( $self, @args ) = @_;

	my %args = @args;

	# Remove the control arguments from %args only (@args is left untouched).
	# What remains in %args is what gets passed to init below.
	my ( $init, $key, $first, $cb, $cursor, $db, $n ) = delete @args{qw(init key callback_first callback cursor db chunk_size)};

	my ( $values, $keys ) = @args{qw(values keys)};
	my $pairs = !$values && !$keys;
	croak "'values' and 'keys' are mutually exclusive" if $values && $keys;

	$key ||= '';

	# Use the caller's cursor, or open a new one on 'db'.
	$cursor ||= ( $db || croak "either 'cursor' or 'db' is a required argument" )->db_cursor;

	# Default first fetch: position the cursor on $key with DB_SET and push one
	# item in the requested shape (pair, value or key).
	$first ||= sub {
		my ( $c, $r ) = @_;
		my $v;

		my $ret;

		if ( ( $ret = $c->c_get($key, $v, DB_SET) ) == 0 ) {
			push(@$r, $pairs ? [ $key, $v ] : ( $values ? $v : $key ));
		} elsif ( $ret == DB_NOTFOUND ) {

lib/BerkeleyDB/Manager.pm  view on Meta::CPAN

		}
	};

	$n ||= $self->chunk_size;

	# init is called as a method. Its return value (e.g. a guard) is held in
	# $g until this sub returns, i.e. for the eagerly fetched first chunk.
	my $g = $init && $self->$init(%args);

	my $ret = [];
	my $bulk = Data::Stream::Bulk::Array->new( array => $ret );

	if ( $cursor->$first($ret) ) {
		# c_count stores the number of entries for the current key in $count.
		$cursor->c_count(my $count);

		if ( $count > 1 ) { # more entries for the same key

			# fetch up to $n - 1 more entries, so this chunk holds at most $n
			for ( 1 .. $n-1 ) {
				unless ( $cursor->$cb($ret) ) {
					return $bulk;
				}
			}

			# and defer the rest
			# NOTE(review): @args is the caller's original argument list (the
			# delete above only touched %args). cursor_stream therefore also gets
			# init and chunk_size, which it needs. It also gets db, key and
			# callback_first, so key and callback_first end up in the %args passed
			# to init for the deferred chunks. The trailing callback/cursor pairs
			# override any earlier ones.
			my $rest = $self->cursor_stream(@args, callback => $cb, cursor => $cursor);
			return $bulk->cat($rest);
		}

		return $bulk;
	} else {
		return nil();
	}
}

# cursor_stream %args
#
# Wraps a BerkeleyDB cursor in a Data::Stream::Bulk::Callback stream.
#
# The fetching happens inside the callback handed to
# Data::Stream::Bulk::Callback, once per block the stream produces. For each
# block:
#   1. 'init' (if given) is invoked as a method with the remaining %args.
#      Its return value is held for the duration of that block.
#   2. 'callback' is called on the cursor with an accumulator array ref, up
#      to 'chunk_size' times. 'chunk_size' falls back to the chunk_size
#      attribute.
# The first false return from the callback ends the stream. The cursor
# reference is dropped, and the partial block, if non-empty, is the last
# block.
#
# Without a 'callback', a default one is built around c_get using 'flag'
# (default DB_NEXT). Part of its body is not visible in this excerpt.
#
# Croaks: if both 'values' and 'keys' are set, or if neither 'cursor' nor
# 'db' is given.
sub cursor_stream {
	my ( $self, %args ) = @_;

	# Pull out the control arguments. What remains (e.g. values/keys) is
	# passed to init for every block.
	my ( $init, $cb, $cursor, $db, $f, $n ) = delete @args{qw(init callback cursor db flag chunk_size)};

	my ( $values, $keys ) = @args{qw(values keys)};
	my $pairs = !$values && !$keys;
	croak "'values' and 'keys' are mutually exclusive" if $values && $keys;

	# Use the caller's cursor, or open a new one on 'db'.
	$cursor ||= ( $db || croak "either 'cursor' or 'db' is a required argument" )->db_cursor;

	$f ||= DB_NEXT;

	$cb ||= do {
		# Buffers start as '' rather than undef. Presumably this avoids
		# uninitialized warnings from BDB, as the POD example notes.
		my ( $k, $v ) = ( '', '' );

		sub {
			my ( $c, $r ) = @_;

			if ( $c->c_get($k, $v, $f) == 0 ) {

lib/BerkeleyDB/Manager.pm  view on Meta::CPAN

			} else {
				die $BerkeleyDB::Error;
			}
		}
	};

	$n ||= $self->chunk_size;

	Data::Stream::Bulk::Callback->new(
		callback => sub {
			# The cursor is cleared once exhausted, so later requests return
			# nothing.
			return unless $cursor;

			# Keep init's return value (e.g. a guard) alive while this block is
			# fetched.
			my $g = $init && $self->$init(%args);

			my $ret = [];

			for ( 1 .. $n ) {
				unless ( $cursor->$cb($ret) ) {
					# we're done, this is the last block: drop the cursor and
					# return the partial block, or a false value if it is empty
					undef $cursor;
					return ( scalar(@$ret) && $ret );
				}
			}

			return $ret;
		},
	);
}

sub assert_version {

lib/BerkeleyDB/Manager.pm  view on Meta::CPAN

	);

	my $db = $m->open_db( file => "foo" ); # defaults

	$m->txn_do(sub {
		$db->db_put("foo", "bar");
		die "error!"; # rolls back
	});

	# fetch all key/value pairs as a Data::Stream::Bulk
	my $pairs = $m->cursor_stream( db => $db );

=head1 DESCRIPTION

This object provides a convenience wrapper around L<BerkeleyDB>.

=head1 ATTRIBUTES

=over 4

=item home

lib/BerkeleyDB/Manager.pm  view on Meta::CPAN

=item db_properties

Properties to pass to C<instantiate_db>. Overrides C<dup> and C<dupsort>.

=item open_dbs

The hash of currently open dbs.

=item chunk_size

See C<cursor_stream>.

Defaults to 500.

=back

=head1 METHODS

=over 4

=item open_db %args

lib/BerkeleyDB/Manager.pm  view on Meta::CPAN

Fetching on C<secondary> with a secondary key returns the value from C<primary>.

Fetching with C<pb_get> will also return the primary key.

See the BDB documentation for more details.

=item all_open_dbs

Returns a list of all the registered databases.

=item cursor_stream %args

Fetches data from a cursor, returning a L<Data::Stream::Bulk>.

If C<cursor> is not provided but C<db> is, a new cursor will be created.

If C<callback> is provided, it is invoked on the cursor with an accumulator
array reference repeatedly until it returns a false value. For example, to
extract triplets from a secondary index, you can use this callback:

	my ( $sk, $pk, $v ) = ( '', '', '' ); # to avoid uninitialized warnings from BDB

	$m->cursor_stream(
		db => $db,
		callback => sub {
			my ( $cursor, $accumulator ) = @_;

			if ( $cursor->c_pget( $sk, $pk, $v, DB_NEXT ) == 0 ) {
				push @$accumulator, [ $sk, $pk, $v ];
				return 1;
			}

			return; # nothing left
		}
	);

If it is not provided, C<c_get> will be used, returning C<[ $key, $value ]> for
each cursor position. C<flag> can be passed, and defaults to C<DB_NEXT>.

C<chunk_size> controls the number of pairs returned in each chunk. If it isn't
provided the attribute C<chunk_size> is used instead.

If C<values> or C<keys> is set to a true value then only values or keys will be
returned. These two arguments are mutually exclusive.

Lastly, C<init> is an optional callback that is invoked once before each chunk
and can be used to set up the database. It is called as a method on the manager
with the remaining arguments. Its return value is retained until the chunk is
finished, so this callback can return a L<Scope::Guard> to perform cleanup.

=item dup_cursor_stream %args

A specialization of C<cursor_stream> for fetching duplicate key entries.

Takes the same arguments as C<cursor_stream>, but adds a few more.

C<key> can be passed in to initialize the cursor with C<DB_SET>.

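To do manual initialization, C<callback_first> can be provided instead. It is
invoked on the cursor with the accumulator array reference; if it returns a
false value, an empty stream is returned.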

C<callback> is generated to use C<DB_NEXT_DUP> instead of C<DB_NEXT>, and
C<flag> is ignored.

=back

=head1 VERSION CONTROL

t/04_streams.t  view on Meta::CPAN


	$m->txn_do(sub {
		my $db = $m->open_db("streams.db");

		my @entries = qw(foo bar gorch zot oink tra la di quxx baz moose elk bunny);

		$db->db_put($_ => $_) for @entries;

		foreach my $chunk_size ( undef, 100, 1, 2, 3 ) {
			{
				my $s = $m->cursor_stream( db => $db, chunk_size => $chunk_size, keys => 1 );

				does_ok( $s, "Data::Stream::Bulk" );

				ok( !$s->is_done, "not done" );

				my @all = $s->all;

				ok( $s->is_done, "now done" );

				is( scalar(@all), scalar(@entries), "stream size is like entries size" );

t/04_streams.t  view on Meta::CPAN

				is_deeply(
					[ sort @all ],
					[ sort @entries ],
					"got all keys",
				);
			}

			{
				my ( $key, $value ) = ( '', '' );

				my $s = $m->cursor_stream(
					chunk_size => $chunk_size,
					db       => $db,
					callback => sub {
						my ( $cursor, $ret ) = @_;

						if ( $cursor->c_get( $key, $value, DB_NEXT ) == 0 ) {
							push @$ret, $key;
							return 1;
						} else {
							return;
						}
					},
				);

				does_ok( $s, "Data::Stream::Bulk" );

t/04_streams.t  view on Meta::CPAN

	$m->txn_do(sub {
		my $db = $m->open_db("streams_dup.db");

		my $i;
		my @entries = map { [ $_ => $i++ ] } qw(foo bar bar foo baz zot foo bar gorch foo foo moose);

		$db->db_put(@$_) for @entries;

		foreach my $chunk_size ( undef, 100, 1, 2, 3 ) {
			{
				my $s = $m->cursor_stream( db => $db, chunk_size => $chunk_size );

				does_ok( $s, "Data::Stream::Bulk" );

				ok( !$s->is_done, "not done" );

				my @all = $s->all;

				ok( $s->is_done, "now done" );

				is( scalar(@all), scalar(@entries), "stream size is like entries size" );

				is_deeply(
					[ sort { $a->[1] <=> $b->[1] } @all ],
					[ sort { $a->[1] <=> $b->[1] } @entries ],
					"got all pairs",
				);
			}

			{
				my $s = $m->cursor_stream( db => $db, chunk_size => $chunk_size, values => 1 );

				does_ok( $s, "Data::Stream::Bulk" );

				ok( !$s->is_done, "not done" );

				my @all = $s->all;

				ok( $s->is_done, "now done" );

				is( scalar(@all), scalar(@entries), "stream size is like entries size" );

t/04_streams.t  view on Meta::CPAN

				is_deeply(
					[ sort @all ],
					[ sort map { $_->[1] } @entries ],
					"got all pairs",
				);
			}

			{
				my ( $key, $value ) = ( '', '' );

				my $s = $m->cursor_stream(
					chunk_size => $chunk_size,
					db       => $db,
					callback => sub {
						my ( $cursor, $ret ) = @_;

						if ( $cursor->c_get( $key, $value, DB_NEXT ) == 0 ) {
							push @$ret, $key;
							return 1;
						} else {
							return;
						}
					},
				);

				does_ok( $s, "Data::Stream::Bulk" );

t/04_streams.t  view on Meta::CPAN


				is_deeply(
					[ sort @all ],
					[ sort map { $_->[0] } @entries ],
					"got all keys",
				);
			}

			{
				my @foos = grep { $_->[0] eq 'foo' } @entries;
				my $s = $m->dup_cursor_stream( db => $db, chunk_size => $chunk_size, key => "foo" );

				does_ok( $s, "Data::Stream::Bulk" );

				ok( !$s->is_done, "not done" );

				my @all = $s->all;

				ok( $s->is_done, "now done" );

				is( scalar(@all), scalar(@foos), "stream size is like foos size" );

				is_deeply(
					[ sort { $a->[1] <=> $b->[1] } @all ],
					[ sort { $a->[1] <=> $b->[1] } @foos ],
					"got all pairs of foo",
				);
			}

			{
				my $s = $m->dup_cursor_stream( db => $db, chunk_size => $chunk_size, key => "moose" );

				does_ok( $s, "Data::Stream::Bulk" );

				ok( !$s->is_done, "not done" );

				my @all = $s->all;

				ok( $s->is_done, "now done" );

				is( scalar(@all), 1, "stream size is one" );

				is_deeply(
					[ @all ],
					[ $entries[-1] ],
					"got pair",
				);
			}

			{
				my $s = $m->dup_cursor_stream( db => $db, chunk_size => $chunk_size, key => "not present" );

				does_ok( $s, "Data::Stream::Bulk" );

				my @all = $s->all;

				ok( $s->is_done, "now done" );

				is( scalar(@all), 0, "stream is empty" );
			}
		}



( run in 0.256 second using v1.01-cache-2.11-cpan-4d50c553e7e )