AnyMongo

 view release on metacpan or  search on metacpan

lib/AnyMongo/Connection.pm  view on Meta::CPAN

                    delete $guards->{$server_id};
                    if ($self->{master_id} eq $server_id) {
                        delete $self->{master_id};
                        $self->clear_master_handle;
                        $self->connected(0);
                    }
                    $h->{cv}->croak($msg) if $h->{cv};
                    $h->destroy();
                },
            );
            $self->{_handles}->{$server_id} = $h;
        }
        # return.
        if (ref $cb eq 'CODE') {
            $cb->($h);
        }
        else {
            $self->cv->end;
        }
    };
}

# Parse the connection string held in $self->host into a table of server
# seeds.
#
# Accepts a comma-separated "host[:port]" list, optionally prefixed with
# "mongodb://". A missing (or false) port defaults to 27017. Whitespace
# around an entry is ignored and blank entries (as in "a,,b") are skipped
# instead of being registered as a host-less ":27017" seed.
#
# Side effect: stores the table in $self->{mongo_servers}.
# Returns: the same hashref, keyed by "host:port"; each value carries the
# initial per-server state (connected, handle, host, port, is_master).
sub _parse_servers {
    my ($self) = @_;

    my $str = $self->host;
    $str = '' unless defined $str;
    $str =~ s{^mongodb://}{};

    my $servers = {};
    for my $seed (split ",", $str) {
        $seed =~ s/^\s+|\s+$//g;
        my ($host, $port) = split ':', $seed;
        # An empty entry has no host to connect to; drop it.
        next unless defined $host and length $host;
        $port ||= 27017;
        $servers->{$host.':'.$port} = {
            connected => 0,
            handle    => undef,
            host      => $host,
            port      => $port,
            is_master => 0,
        };
    }

    $self->{mongo_servers} = $servers;
    return $servers;
}

# Queue $data for writing on a handle.
#
#   $data - the message bytes to write
#   $hd   - optional handle; when omitted the connection is checked first
#           and the master handle is used
#
# Croaks with 'connection lost' when no handle was given and
# _check_connection reports failure. Returns whatever push_write returns.
sub send_message {
    my ($self, $data, $hd) = @_;

    # An explicitly supplied handle bypasses the connection check.
    unless ($hd) {
        croak 'connection lost' unless $self->_check_connection;
        $hd = $self->master_handle;
    }

    return $hd->push_write($data);
}

# Report whether the connection is usable, attempting one reconnect first
# when it is down and auto_reconnect is enabled.
#
# Returns the value of $self->connected as read after any reconnect attempt.
sub _check_connection {
    my ($self) = @_;

    if (!$self->connected && $self->auto_reconnect) {
        $self->connect;
    }

    return $self->connected;
}

# Read one complete reply from $hd: the standard header, the response
# header, then the document section.
#
# Croaks (via _check_respone_flags) when the response flags signal an error.
# Returns the list ($number_returned, $cursor_id, $documents).
sub recv_message {
    my ($self, $hd) = @_;

    # Of the standard header fields only the total message length is needed.
    my ($total_length) = $self->_receive_header($hd);

    my ($flags, $cursor_id, undef, $returned_count)
        = $self->_receive_response_header($hd);
    $self->_check_respone_flags($flags);

    # NOTE(review): 36 is presumably the combined size of the two headers
    # already read (STANDARD_HEADER_SIZE + RESPONSE_HEADER_SIZE) - confirm.
    my $body_length = $total_length - 36;
    my $documents = $self->_read_documents($body_length, $cursor_id, $hd);

    return ($returned_count, $cursor_id, $documents);
}

# Validate the flags word of a reply.
#
# Croaks with "cursor not found" when any bit of REPLY_CURSOR_NOT_FOUND is
# set in $flags; otherwise does nothing.
sub _check_respone_flags {
    my ($self, $flags) = @_;
    croak("cursor not found") if ($flags & REPLY_CURSOR_NOT_FOUND) != 0;
}

# Read exactly $size bytes from a handle, blocking on a condition variable
# until the chunk arrives or the query timeout fires.
#
#   $size - number of bytes to read
#   $hd   - optional handle; when omitted the connection is checked first
#           (which may reconnect) and the master handle is used
#
# Returns the bytes read.
# Croaks with 'connection lost' when no usable handle is available, and
# with 'receive_data timeout' when nothing arrives within
# $self->query_timeout.
sub receive_data {
    my ($self,$size,$hd) = @_;

    # Resolve the handle only AFTER the connection check, mirroring
    # send_message: a reconnect replaces the master handle, so fetching it
    # first would leave us holding undef.
    unless ($hd) {
        croak 'connection lost' unless $self->_check_connection;
        $hd = $self->master_handle;
        croak 'connection lost' unless $hd;
    }

    my $cv = AE::cv();
    # The callback does not capture $timer, so dropping our reference below
    # is enough to cancel it.
    my $timer = AnyEvent->timer( after => $self->query_timeout, cb => sub {
        $cv->croak('receive_data timeout');
    });
    $hd->push_read(chunk => $size, sub {
        my ($hdl, $bytes) = @_;
        $cv->send($bytes);
    });

    # Expose the condvar on the handle so the handle's error callback can
    # abort this wait.
    $hd->{cv} = $cv;

    my $data;
    my $ok  = eval { $data = $cv->recv; 1 };
    my $err = $@;

    # Always clean up, whether the read succeeded, timed out or failed.
    undef $timer;
    delete $hd->{cv};

    die $err unless $ok;
    return $data;
}


# Read the fixed-size standard message header from $hd.
#
# Croaks when fewer than STANDARD_HEADER_SIZE bytes come back.
# Returns the header unpacked as four 32-bit little-endian unsigned
# integers ('V4').
sub _receive_header {
    my ($self, $hd) = @_;

    my $raw = $self->receive_data(STANDARD_HEADER_SIZE, $hd);
    if (length($raw) != STANDARD_HEADER_SIZE) {
        croak 'Short read for DB response header; length:' . length($raw);
    }

    return unpack('V4', $raw);
}

# Read the fixed-size response header from $hd and split it into fields.
#
# Croaks when fewer than RESPONSE_HEADER_SIZE bytes come back.
# Returns the list ($response_flags, $cursor_id, $starting_from,
# $number_returned).
sub _receive_response_header {
    my ($self, $hd) = @_;

    my $raw = $self->receive_data(RESPONSE_HEADER_SIZE, $hd);
    croak 'Short read for DB response header'
        if length($raw) != RESPONSE_HEADER_SIZE;

    # Layout: BSON_INT32 bytes of flags, BSON_INT64 bytes of cursor id,
    # then two more 32-bit little-endian values.
    my $cursor_at = BSON_INT32;
    my $counts_at = BSON_INT32 + BSON_INT64;

    my ($flags) = unpack 'V', substr($raw, 0, $cursor_at);
    # NOTE(review): 'j' is Perl's native IV, so its width and byte order
    # depend on the perl build - confirm this matches how cursor ids are
    # packed again elsewhere before changing it.
    my ($cursor_id) = unpack 'j', substr($raw, $cursor_at, BSON_INT64);
    my ($first_index, $returned_count) = unpack 'V2', substr($raw, $counts_at);

    return ($flags, $cursor_id, $first_index, $returned_count);
}

# Read the document section of a reply and decode it.
#
#   $doc_message_length - number of bytes to read
#   $cursor_id          - accepted for the caller's convenience; not used here
#   $hd                 - handle passed through to receive_data
#
# Returns nothing (empty list / undef) when the read yields a false value,
# otherwise the scalar result of decode_bson_documents.
sub _read_documents {
    my ($self, $doc_message_length, $cursor_id, $hd) = @_;

    # The whole section is fetched with a single read.
    my $payload = $self->receive_data($doc_message_length, $hd);
    if (!$payload) {
        return;
    }

    my $decoded = decode_bson_documents($payload);
    return $decoded;
}

# List the names of the databases on the server.
#
# Runs { listDatabases => 1 } through $self->admin and returns the 'name'
# of every entry in the reply's 'databases' array.
sub database_names {
    my ($self) = @_;

    my $admin_db = $self->admin;
    my $reply    = $admin_db->run_command({ listDatabases => 1 });
    my $entries  = $reply->{databases};

    return map $_->{name}, @{ $entries };
}


# Build an AnyMongo::Database object for $database_name that is bound to
# this connection.
sub get_database {
    my ($self, $database_name) = @_;

    my @ctor_args = (
        _connection => $self,
        name        => $database_name,
    );

    return AnyMongo::Database->new(@ctor_args);
}

sub authenticate {
    my ($self, $dbname, $username, $password, $is_digest) = @_;
    my $hash = $password;

    # create a hash if the password isn't yet encrypted
    if (!$is_digest) {
        $hash = Digest::MD5::md5_hex("${username}:mongo:${password}");
    }

    # get the nonce
    my $db = $self->get_database($dbname);
    my $result = $db->run_command({getnonce => 1});
    if (!$result->{'ok'}) {
        return $result;
    }

    my $nonce = $result->{'nonce'};
    my $digest = Digest::MD5::md5_hex($nonce.$username.$hash);

    # run the login command
    my $login = tie(my %hash, 'Tie::IxHash');
    %hash = (authenticate => 1,
             user => $username,
             nonce => $nonce,
             key => $digest);
    $result = $db->run_command($login);



( run in 0.594 second using v1.01-cache-2.11-cpan-39bf76dae61 )