AnyMongo

 view release on metacpan or  search on metacpan

lib/AnyMongo/Cursor.pm  view on Meta::CPAN

package AnyMongo::Cursor;
# Assign the package version at compile time, before the rest of the file is
# compiled.
# NOTE(review): this BEGIN/VERSION stanza looks like it is inserted by the
# release tooling - confirm before editing it by hand.
BEGIN {
  $AnyMongo::Cursor::VERSION = '0.03';
}
#ABSTRACT: An asynchronous cursor/iterator for Mongo query results
use strict;
use warnings;
use namespace::autoclean;
use boolean;
use Tie::IxHash;
use AnyMongo::MongoSupport;
use Any::Moose;
use Carp qw(croak confess);

# Package-wide switch: send_initial_query folds this value into the option
# flags of every query it builds, alongside the per-cursor slave_okay
# attribute.
$AnyMongo::Cursor::slave_okay = 0;
# Package-wide timeout setting. It is not read by the code visible in this
# file; presumably milliseconds and consumed elsewhere - TODO confirm.
$AnyMongo::Cursor::timeout = 30000;

# Connection object every message of this cursor is sent and received
# through; it must be supplied at construction time.
has _connection => (
    isa      => 'AnyMongo::Connection',
    is       => 'ro',
    required => 1,
);

# Handle passed to the connection's send_message/recv_message calls. Built on
# first use by _build__socket_handle.
has _socket_handle => (
    is         => 'rw',
    isa        => 'Maybe[AnyEvent::Handle]',
    lazy_build => 1,
);

# Lazy builder for the _socket_handle attribute: returns whatever the
# connection reports as its master handle.
sub _build__socket_handle {
    my $self = shift;
    my $connection = $self->_connection;
    return $connection->master_handle;
}

# When true, send_initial_query sets bit 1 of the query option flags.
has tailable => (
    isa      => 'Bool',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# Batch size handed to build_get_more_message on every refill of the
# result cache.
has batch_size => (
    isa      => 'Int',
    is       => 'rw',
    default  => 0,
    required => 1,
);

# Namespace string. count() splits it at the first dot into a database name
# and a collection name.
has _ns => (
    isa      => 'Str',
    is       => 'ro',
    required => 1,
);

# Query document passed to build_query_message. _ensure_special replaces it
# with a { query => <original> } wrapper the first time a modifier needs one.
has _query => (
    is       => 'rw',
    required => 1,
);

# Optional field-selection hash passed to build_query_message.
has _fields => (
    isa      => 'HashRef',
    is       => 'rw',
    required => 0,
);

# Limit passed to build_query_message; next() also enforces it client-side
# when it is greater than zero.
has _limit => (
    isa      => 'Int',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# Number of documents to skip, passed to build_query_message.
has _skip => (
    isa      => 'Int',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# Declared Int slot. The methods visible in this file track the server
# cursor id in $self->{cursor_id} rather than through this accessor.
has _cursor_id => (
    isa      => 'Int',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# Not referenced by the methods visible in this file.
has _num_remain => (
    isa      => 'Int',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# Documents received from the server that have not been handed out yet.
has _result_cache => (
    isa      => 'ArrayRef',
    is       => 'rw',
    default  => sub { [] },
    required => 0,
);

# When true, send_initial_query sets bit 4 of the query option flags.
has immortal => (
    isa      => 'Bool',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# When true, send_initial_query sets bit 2 of the query option flags.
has slave_okay => (
    isa      => 'Bool',
    is       => 'rw',
    default  => 0,
    required => 0,
);

# Workaround flag for the database's inconsistent handling of queries:
# records whether _query has already been wrapped by _ensure_special.
has _grrrr => (
    isa     => 'Bool',
    is      => 'rw',
    default => 0,
);

# Most recent request id produced by _next_request_id.
has _request_id => (
    isa     => 'Int',
    is      => 'rw',
    default => 0,
);

# Debug switch; in the code visible here it is only consulted by
# commented-out warn statements.
has _print_debug => (
    isa     => 'Bool',
    is      => 'rw',
    default => 0,
);

# Returning true tells Perl not to clone objects of this class when a new
# interpreter thread is created.
sub CLONE_SKIP {
    return 1;
}

# Construction hook: puts the freshly built object into its initial cursor
# state.
sub BUILD {
    my ($self) = @_;
    return $self->_init_cursor;
}

# Put the cursor into its pristine state: query not yet run, not closed, no
# server-side cursor id, and read position zero.
sub _init_cursor {
    my $self = shift;
    @{$self}{qw(query_run closed cursor_id)} = (0, 0, 0);
    return $self->{at} = 0;
}

# Make sure _query has the wrapped form { query => <original query> } so that
# modifiers such as 'orderby' can be stored next to it. The wrapping happens
# once; _grrrr remembers that it has been done.
sub _ensure_special {
    my $self = shift;

    return if $self->_grrrr;

    $self->_grrrr(1);
    my $original = $self->_query;
    return $self->_query({ 'query' => $original });
}

# Set the field-selection hash for the query.
#   $selection - plain hash reference; anything else is rejected with confess.
# Dies (via _check_modifiable) once the query has run or the cursor is closed.
# Returns the cursor itself so calls can be chained.
sub fields {
    my ($self, $selection) = @_;

    $self->_check_modifiable;
    ref($selection) eq 'HASH'
        or confess 'not a hash reference';

    $self->_fields($selection);
    return $self;
}

# Set the sort order of the query.
#   $order - a plain hash reference or a Tie::IxHash object; anything else is
#            rejected with confess.
# The order is stored under the 'orderby' key of the wrapped query.
# Returns the cursor itself so calls can be chained.
sub sort {
    my ($self, $order) = @_;

    $self->_check_modifiable;

    my $kind = ref $order;
    confess 'not a hash reference'
        if $kind ne 'HASH' && $kind ne 'Tie::IxHash';

    $self->_ensure_special;
    my $spec = $self->_query;
    $spec->{'orderby'} = $order;

    return $self;
}

# Set the maximum number of documents the query should return.
# Dies (via _check_modifiable) once the query has run or the cursor is closed.
# Returns the cursor itself so calls can be chained.
sub limit {
    my $self  = shift;
    my $count = shift;

    $self->_check_modifiable;
    $self->_limit($count);

    return $self;
}

# Set the number of documents the query should skip.
# Dies (via _check_modifiable) once the query has run or the cursor is closed.
# Returns the cursor itself so calls can be chained.
sub skip {
    my $self   = shift;
    my $offset = shift;

    $self->_check_modifiable;
    $self->_skip($offset);

    return $self;
}

# Request snapshot mode by setting the '$snapshot' key of the wrapped query
# to 1.
# Dies (via _check_modifiable) once the query has run or the cursor is closed.
# Returns the cursor itself so calls can be chained.
sub snapshot {
    my $self = shift;

    $self->_check_modifiable;
    $self->_ensure_special;

    my $spec = $self->_query;
    $spec->{'$snapshot'} = 1;

    return $self;
}

sub hint {
    my ($self, $index) = @_;
    $self->_check_modifiable;
    confess 'not a hash reference' unless ref $index eq 'HASH';

lib/AnyMongo/Cursor.pm  view on Meta::CPAN

    $self->_ensure_special;
    $self->_query->{'$explain'} = boolean::true;

    my $retval = $self->reset->next;
    $self->reset->limit($temp);

    return $retval;
}

# Ask the server how many documents match this cursor's query.
#   $all - when true, the cursor's non-zero limit and skip are sent along with
#          the count command; otherwise they are left out.
# Returns the 'n' field of the command result, or 0 when the result is not a
# hash reference (the server answers "ns missing" if the collection does not
# exist).
sub count {
    my ($self, $all) = @_;

    # Namespace is "<database>.<collection>"; split at the first dot.
    my ($db_name, $coll_name) = $self->_ns =~ m/^([^\.]+)\.(.*)/;

    my $command = Tie::IxHash->new(count => $coll_name);

    # Once _ensure_special has wrapped the query, the real filter sits under
    # the 'query' key.
    my $filter = $self->_grrrr ? $self->_query->{'query'} : $self->_query;
    $command->Push(query => $filter);

    if ($all) {
        my $limit = $self->_limit;
        $command->Push(limit => $limit) if $limit;

        my $skip = $self->_skip;
        $command->Push(skip => $skip) if $skip;
    }

    my $database = $self->_connection->get_database($db_name);
    my $reply    = $database->run_command($command);

    return ref $reply eq 'HASH' ? $reply->{'n'} : 0;
}

# Drain the cursor: keep calling next() until it returns a false value and
# return every document collected, in the order received.
sub all {
    my $self = shift;

    my @documents;
    for (my $doc = $self->next; $doc; $doc = $self->next) {
        push @documents, $doc;
    }

    return @documents;
}
# Run the query the first time a document is requested from the wire.
#
# Builds the query message from the cursor's namespace, option flags, skip,
# limit, query and field selection, sends it over the cursor's socket handle,
# and stores the reply: returned documents are appended to the result cache,
# and the received count and server cursor id are recorded.
#
# Returns 1 when this call actually sent the query, 0 when the query had
# already been run.
sub send_initial_query {
    my ($self) = @_;

    return 0 if $self->{query_run};

    # Query option flags. In the MongoDB OP_QUERY flags field bit 0 is
    # reserved, bit 1 is TailableCursor, bit 2 is SlaveOk and bit 4 is
    # NoCursorTimeout. The package-wide $AnyMongo::Cursor::slave_okay switch
    # therefore has to end up on bit 2 together with the per-cursor
    # attribute, not on the reserved bit 0. Each flag is normalised to 0/1
    # first so that a false-but-non-numeric Bool ('' or undef) does not warn
    # when shifted.
    my $slave_okay = ($AnyMongo::Cursor::slave_okay || $self->slave_okay) ? 1 : 0;
    my $tailable   = $self->tailable ? 1 : 0;
    my $immortal   = $self->immortal ? 1 : 0;
    my $opts       = ($tailable << 1) | ($slave_okay << 2) | ($immortal << 4);

    my $query = AnyMongo::MongoSupport::build_query_message($self->_next_request_id,$self->_ns,
        $opts, $self->_skip, $self->_limit, $self->_query, $self->_fields);
    $self->_connection->send_message($query,$self->_socket_handle);
    my ($number_received,$cursor_id,$result) = $self->_connection->recv_message($self->_socket_handle);

    push @{$self->{_result_cache}},@{$result} if $result;
    $self->{number_received} = $number_received;
    $self->{cursor_id} = $cursor_id;
    $self->{query_run} = 1;
    $self->close_cursor_if_query_complete;

    return 1;
}


# Return the next document of the result set, or nothing (undef in scalar
# context) when the cursor is exhausted or the client-side limit has been
# reached.
#
# A positive _limit caps the number of documents handed out; a zero or
# negative _limit is not enforced here.
sub next {
    my ($self) = @_;

    # Check the limit before has_next: has_next may have to go to the server
    # for another batch, which is a wasted blocking round trip once the
    # cursor has already delivered everything it is allowed to.
    my $limit = $self->{_limit};
    return if $limit > 0 && $self->{at} >= $limit;

    return $self->next_document if $self->has_next;

    return;
}

# Take the next document off the front of the result cache and advance the
# read position.
# Croaks with "query error: ..." when the document carries an '$err' field.
# Returns the document (undef if the cache stayed empty).
sub next_document {
    my $self = shift;

    # num_remaining already attempts one refill when the cache is empty;
    # if nothing arrived, ask the server once more.
    unless ($self->num_remaining) {
        $self->refill_via_get_more;
    }

    my $document = shift @{ $self->{_result_cache} };

    if ($document && $document->{'$err'}) {
        # todo:"not master"
        my $err = $document->{'$err'};
        Carp::croak "query error: $err";
    }

    $self->{at} += 1;
    return $document;
}

# True when at least one more document is available; num_remaining may
# fetch another batch to find out.
sub has_next {
    my ($self) = @_;
    return $self->num_remaining > 0;
}

# Rewind the cursor to its pre-query state so the query can be modified and
# run again: clears the run/closed flags, kills any open server-side cursor,
# rewinds the read position and empties the result cache.
# Returns the cursor itself so calls can be chained.
sub reset {
    my $self = shift;

    @{$self}{qw(query_run closed)} = (0, 0);
    $self->kill_cursor;
    $self->{at}            = 0;
    $self->{_result_cache} = [];

    return $self;
}

# Fetch the next batch of documents into the result cache.
#
# If the query has not been run yet, running it is the refill and nothing
# else happens. If the server no longer holds a cursor (cursor id 0) there is
# nothing to fetch. Otherwise a get-more message is sent for the current
# cursor id and the reply is stored the same way send_initial_query stores
# its reply.
sub refill_via_get_more {
    my $self = shift;

    return if $self->send_initial_query;
    return if $self->{cursor_id} == 0;

    my $request_id = $self->_next_request_id;
    my $message    = AnyMongo::MongoSupport::build_get_more_message(
        $request_id,
        $self->_ns,
        $self->{cursor_id},
        $self->batch_size,
    );

    my $handle = $self->_socket_handle;
    $self->_connection->send_message($message, $handle);

    my ($received, $server_cursor_id, $documents)
        = $self->_connection->recv_message($self->_socket_handle);

    push @{ $self->{_result_cache} }, @{$documents} if $documents;
    @{$self}{qw(number_received cursor_id query_run)}
        = ($received, $server_cursor_id, 1);

    return $self->close_cursor_if_query_complete;
}

# Close the cursor when a positive limit is set and the count recorded from
# the latest reply has reached that limit.
sub close_cursor_if_query_complete {
    my $self = shift;

    my $limit         = $self->_limit;
    my $limit_reached = $limit > 0 && $self->{number_received} >= $limit;

    $self->close if $limit_reached;
}

# Number of documents currently waiting in the result cache. When the cache
# is empty one refill is attempted first, so this may talk to the server.
sub num_remaining {
    my $self = shift;

    $self->refill_via_get_more unless @{ $self->{_result_cache} };

    return scalar @{ $self->{_result_cache} };
}

# Close the cursor: kill the server-side cursor if one is still open and mark
# this object as closed, which makes _check_modifiable refuse further query
# changes until reset() is called.
#
# The read position ($self->{at}) is deliberately left untouched. close is
# also invoked automatically by close_cursor_if_query_complete while
# documents from the last reply are still waiting in the result cache;
# rewinding the position at that point would let next() hand out more than
# _limit documents. reset() is the place where the position is rewound.
#
# Returns 0.
sub close {
    my ($self) = @_;

    $self->kill_cursor if $self->{cursor_id};
    $self->{cursor_id} = 0;
    $self->{closed} = 1;

    return 0;
}

# Tell the server to discard this cursor. Does nothing when no server-side
# cursor id is held; otherwise sends a kill-cursor message and forgets the id.
# No reply is read for the message.
sub kill_cursor {
    my $self = shift;

    my $cursor_id = $self->{cursor_id} or return;

    my $message = AnyMongo::MongoSupport::build_kill_cursor_message(
        $self->_next_request_id,
        $cursor_id,
    );
    $self->_connection->send_message($message, $self->_socket_handle);

    return $self->{cursor_id} = 0;
}

# Guard used by the query-modifying methods: confesses when the query has
# already been run or the cursor has been closed.
sub _check_modifiable {
    my $self = shift;

    my $frozen = $self->{query_run} || $self->{closed};
    confess 'Cannot modify the query once it has been run or closed.'
        if $frozen;
}

# Obtain a fresh request id from AnyMongo::MongoSupport, remember it in
# _request_id, and return the stored value.
sub _next_request_id {
    my $self = shift;

    my $fresh_id = AnyMongo::MongoSupport::make_request_id();
    $self->_request_id($fresh_id);

    return $self->_request_id;
}

# Finalise the class definition through the metaclass once all attributes
# and methods above have been declared.
__PACKAGE__->meta->make_immutable;
# A module file has to end by returning a true value.
1;


=pod

=head1 NAME

AnyMongo::Cursor - An asynchronous cursor/iterator for Mongo query results

=head1 VERSION

version 0.03

=head1 SYNOPSIS

=head1 DESCRIPTION

=head1 AUTHORS

=over 4

=item *

Pan Fan(nightsailer) <nightsailer at gmail.com>

=item *

Kristina Chodorow <kristina at 10gen.com>

=back

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2010 by Pan Fan(nightsailer).

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


__END__



( run in 0.450 second using v1.01-cache-2.11-cpan-39bf76dae61 )