AnyMongo
view release on metacpan or search on metacpan
lib/AnyMongo/Connection.pm view on Meta::CPAN
delete $guards->{$server_id};
if ($self->{master_id} eq $server_id) {
delete $self->{master_id};
$self->clear_master_handle;
$self->connected(0);
}
$h->{cv}->croak($msg) if $h->{cv};
$h->destroy();
},
);
$self->{_handles}->{$server_id} = $h;
}
# return.
if (ref $cb eq 'CODE') {
$cb->($h);
}
else {
$self->cv->end;
}
};
}
sub _parse_servers {
my ($self) = @_;
my $str = $self->host;
$str = substr $self->host, 10 if $str =~ /^mongodb:\/\//;
my @pairs = split ",", $str;
my $servers = {};
my $server_seeds_cnt = 0;
for my $h (@pairs) {
my ($host,$port) = split ':',$h;
$port ||= 27017;
$servers->{$host.':'.$port} = {
connected => 0,
handle => undef,
host => $host,
port => $port,
is_master => 0,
};
}
# $self->_servers($servers);
$self->{mongo_servers} = $servers;
}
sub send_message {
my ($self,$data,$hd) = @_;
croak 'connection lost' unless $hd or $self->_check_connection;
$hd ||= $self->master_handle;
$hd->push_write($data);
}
sub _check_connection {
my ($self) = @_;
$self->connected or ($self->auto_reconnect and $self->connect);
$self->connected;
}
sub recv_message {
my ($self,$hd) = @_;
my ($message_length,$request_id,$response_to,$op) = $self->_receive_header($hd);
my ($response_flags,$cursor_id,$starting_from,$number_returned) = $self->_receive_response_header($hd);
$self->_check_respone_flags($response_flags);
my $results = $self->_read_documents($message_length-36,$cursor_id,$hd);
return ($number_returned,$cursor_id,$results);
}
sub _check_respone_flags {
my ($self,$flags) = @_;
if (($flags & REPLY_CURSOR_NOT_FOUND) != 0) {
croak("cursor not found");
}
}
sub receive_data {
my ($self,$size,$hd) = @_;
$hd ||= $self->master_handle;
croak 'connection lost' unless $hd or $self->_check_connection;
my $cv = AE::cv;
my $timer; $timer = AnyEvent->timer( after => $self->query_timeout ,cb => sub {
undef $timer;
$cv->croak('receive_data timeout');
});
$hd->push_read(chunk => $size, sub {
my ($hdl, $bytes) = @_;
$cv->send($_[1]);
});
$hd->{cv} = $cv;
my $data = $cv->recv;
delete $hd->{cv};
$data;
}
sub _receive_header {
my ($self,$hd) = @_;
my $header_buf = $self->receive_data(STANDARD_HEADER_SIZE,$hd);
croak 'Short read for DB response header; length:'.length($header_buf) unless length $header_buf == STANDARD_HEADER_SIZE;
return unpack('V4',$header_buf);
}
sub _receive_response_header {
my ($self,$hd) = @_;
my $header_buf = $self->receive_data(RESPONSE_HEADER_SIZE,$hd);
croak 'Short read for DB response header' unless length $header_buf == RESPONSE_HEADER_SIZE;
my ($response_flags) = unpack 'V',substr($header_buf,0,BSON_INT32);
my ($cursor_id) = unpack 'j',substr($header_buf,BSON_INT32,BSON_INT64);
my ($starting_from,$number_returned) = unpack 'V2',substr($header_buf,BSON_INT32+BSON_INT64);
return ($response_flags,$cursor_id,$starting_from,$number_returned);
}
sub _read_documents {
my ($self,$doc_message_length,$cursor_id,$hd) = @_;
my $remaining = $doc_message_length;
my $bson_buf;
# do {
# my $buf_len = $remaining > 4096? 4096:$remaining;
# $bson_buf .= $self->receive_data($buf_len);
# $remaining -= $buf_len;
# } while ($remaining >0 );
$bson_buf = $self->receive_data($doc_message_length,$hd);
return unless $bson_buf;
# warn "#_read_documents:bson_buf size:".length($bson_buf);
# my $docs = decode_bson_documents($bson_buf,length($bson_buf));
# warn '#_read_documents decode_bson_documents ...';
my $docs = decode_bson_documents($bson_buf);
# warn "docs:$docs";
# warn "#_read_documents:".Dumper($docs)."\n";
return $docs;
}
sub database_names {
my ($self) = @_;
my $ret = $self->admin->run_command({ listDatabases => 1 });
return map { $_->{name} } @{ $ret->{databases} };
}
sub get_database {
my ($self, $database_name) = @_;
return AnyMongo::Database->new(
_connection => $self,
name => $database_name,
);
}
sub authenticate {
my ($self, $dbname, $username, $password, $is_digest) = @_;
my $hash = $password;
# create a hash if the password isn't yet encrypted
if (!$is_digest) {
$hash = Digest::MD5::md5_hex("${username}:mongo:${password}");
}
# get the nonce
my $db = $self->get_database($dbname);
my $result = $db->run_command({getnonce => 1});
if (!$result->{'ok'}) {
return $result;
}
my $nonce = $result->{'nonce'};
my $digest = Digest::MD5::md5_hex($nonce.$username.$hash);
# run the login command
my $login = tie(my %hash, 'Tie::IxHash');
%hash = (authenticate => 1,
user => $username,
nonce => $nonce,
key => $digest);
$result = $db->run_command($login);
( run in 0.594 second using v1.01-cache-2.11-cpan-39bf76dae61 )