AnyMongo
view release on metacpan or search on metacpan
lib/AnyMongo/Connection.pm view on Meta::CPAN
is => 'rw',
isa => 'Str',
required => 1,
default => 'admin',
);
# Query timeout (integer). The default is read from the package variable
# $AnyMongo::Cursor::timeout each time the default is needed.
# NOTE(review): units are not visible here - presumably milliseconds; confirm.
has query_timeout => (
is => 'rw',
isa => 'Int',
required => 1,
default => sub { return $AnyMongo::Cursor::timeout; },
);
# When true (the default), _init calls connect() while the object is being built.
has auto_connect => (
is => 'ro',
isa => 'Bool',
required => 1,
default => 1,
);
# Read-only flag, true by default. It is not consulted in the visible part of
# this file - presumably it enables reconnecting after a lost connection; confirm.
has auto_reconnect => (
is => 'ro',
isa => 'Bool',
required => 1,
default => 1,
);
# Server specification string; defaults to a local server on port 27017.
# NOTE(review): presumably consumed by _parse_servers - confirm.
has host => (
is => 'ro',
isa => 'Str',
required => 1,
default => 'mongodb://localhost:27017',
);
# Integer, default 1.
# NOTE(review): presumably the MongoDB write-concern "w" value - confirm.
has w => (
is => 'rw',
isa => 'Int',
default => 1,
);
# Integer, default 1000.
# NOTE(review): presumably the write-concern timeout in milliseconds - confirm.
has wtimeout => (
is => 'rw',
isa => 'Int',
default => 1000,
);
# Integer timeout, default 20000.
# NOTE(review): connect() uses its own hard-coded 5 second timer and does not
# read this attribute in the visible code - confirm where it is applied.
has timeout => (
is => 'ro',
isa => 'Int',
required => 1,
default => 20000,
);
# Optional credentials. The automatic authenticate() call in _init that would
# use them is currently commented out.
has username => (
is => 'rw',
isa => 'Str',
required => 0,
);
has password => (
is => 'rw',
isa => 'Str',
required => 0,
);
# Connection state flag: set to 1 by _set_master and back to 0 by disconnect.
has connected => (
isa => 'Bool',
is => 'rw',
default => 0,
);
# AnyEvent condition variable that connect() blocks on. Built lazily by
# _build_cv; clear_cv discards the current one.
has cv => (
isa => 'AnyEvent::CondVar',
is => 'ro',
lazy_build => 1,
clearer => 'clear_cv',
);
# Builder for the lazy `cv` attribute: hands back a fresh AnyEvent condition
# variable. The invocant is not needed.
sub _build_cv {
    return AE::cv();
}
# Private boolean error flag, false by default. connect() resets it to 0 once
# a connection has been established.
has _connection_error => (
isa => 'Bool',
is => 'rw',
default => 0,
);
# Always true: tells Perl's ithreads not to clone objects of this class into
# new threads.
sub CLONE_SKIP {
    return 1;
}
# Construction hook: delegates all post-construction setup to _init.
# Any further arguments passed to BUILD are not forwarded.
sub BUILD {
    my ($self) = @_;
    return $self->_init;
}
# Post-construction setup, called from BUILD:
#   1. load the helper classes,
#   2. parse the configured servers,
#   3. connect right away when auto_connect is set.
sub _init {
    my ($self) = @_;

    # A string-eval `use` is used instead of Any::Moose::load_class because
    # these namespaces already have symbols from the XS bootstrap.
    # Load failures are ignored here.
    for (qw/AnyMongo::Database AnyMongo::Cursor AnyMongo::BSON::OID/) {
        eval "use ${_}";
    }

    $self->_parse_servers();

    # Automatic authentication after connecting is currently disabled:
    # if (defined $self->username && defined $self->password) {
    #     $self->authenticate($self->db_name, $self->username, $self->password);
    # }
    $self->connect if $self->auto_connect;
}
# Connect to the MongoDB master, blocking until the attempt succeeds or fails.
#
# Every known server (the keys of $self->{mongo_servers}) is handed to
# _check_master, then the call blocks on the `cv` condition variable.
#
# Returns an empty list immediately when already connected or when another
# connect attempt is still in progress; returns 0 after a successful attempt.
# Dies (via the condition variable's croak) when no server could be reached
# or when the 5 second connection timer expires first.
#
# Extra arguments are accepted for compatibility but ignored.
sub connect {
    my ($self) = @_;
    return if $self->connected || $self->{_trying_connect};
    # warn "connect...\n";
    $self->{_trying_connect} = 1;

    # Connection timeout watcher: fail the condition variable if we are still
    # not connected after 5 seconds.
    my $timer; $timer = AnyEvent->timer( after => 5, cb => sub {
        undef $timer;
        unless ( $self->connected ) {
            $self->{_trying_connect} = 0;
            $self->cv->croak('Failed to connect to any mongodb servers,timeout');
        }
    });

    # Completion callback: runs once every begin() on the cv has been matched
    # by an end(), and reports the overall outcome.
    $self->cv->begin( sub {
        undef $timer;
        if ($self->connected) {
            $self->_connection_error(0);
            shift->send;
        }
        else {
            shift->croak("Failed to connect to any mongodb servers");
        }
    });

    # The probing and the blocking wait run inside eval so that the
    # "attempt in progress" flag is always reset. Previously an exception from
    # recv (or from _check_master) skipped the reset and left the flag set,
    # which made every later connect() call return silently without trying.
    my $ok = eval {
        my $servers = $self->{mongo_servers};
        my $seed_queue = $self->{_seed_queue} = [ keys %{ $servers } ];
        my $seed_tried = $self->{_seed_tried} = {};
        # The outer loop re-checks the queue after each pass over it.
        while (!$self->connected && @{$seed_queue} ) {
            while (my $h = shift @{$seed_queue}) {
                $seed_tried->{$h} = 1;
                $self->_check_master($h);
            }
        }
        $self->cv->recv;
        1;
    };
    my $error = $@;

    # Cancel the timeout watcher (a no-op if a callback already did so) so it
    # cannot fire after this attempt has finished.
    undef $timer;
    $self->{_trying_connect} = 0;
    die $error unless $ok;
    # warn "connect done.\n";
    return 0;
}
# Record the given server as the current master and mark the connection as up.
#
#   $server_id - key of the server inside $self->{mongo_servers}
#   $handle    - handle object to store through master_handle()
#
# Returns whatever the connected(1) setter call returns.
sub _set_master {
    my ($self, $server_id, $handle) = @_;

    $self->master_handle($handle);
    $self->{master_id} = $server_id;
    $self->{mongo_servers}->{$server_id}->{is_master} = 1;
    # warn "master_id:$server_id\n";
    return $self->connected(1);
}
# Plain function (not a method): decide whether a server reply marks the
# server as master.
#
# Returns the reply itself when it is false, a false value when the reply is
# a reference to a scalar, and otherwise the reply's `ismaster` entry.
sub _is_master {
    my ($reply) = @_;

    return $reply unless $reply;

    my $not_scalar_ref = ref($reply) ne 'SCALAR';
    return $not_scalar_ref unless $not_scalar_ref;

    return $reply->{ismaster};
}
sub _check_master {
lib/AnyMongo/Connection.pm view on Meta::CPAN
$hd->{cv} = $cv;
my $data = $cv->recv;
delete $hd->{cv};
$data;
}
# Read one standard message header from the handle $hd.
#
# Fetches STANDARD_HEADER_SIZE bytes through receive_data and croaks when a
# different number of bytes came back. Returns the header decoded as four
# unsigned 32-bit little-endian integers.
sub _receive_header {
    my ($self, $hd) = @_;

    my $raw = $self->receive_data(STANDARD_HEADER_SIZE, $hd);
    if (length($raw) != STANDARD_HEADER_SIZE) {
        croak 'Short read for DB response header; length:' . length($raw);
    }

    return unpack('V4', $raw);
}
# Read the reply-specific header that follows the standard header.
#
# Fetches RESPONSE_HEADER_SIZE bytes through receive_data and croaks on a
# short read. The buffer is decoded in three parts:
#   - response flags  : unsigned 32-bit little-endian, at offset 0
#   - cursor id       : BSON_INT64 bytes starting at offset BSON_INT32
#   - starting offset and number of returned documents: two unsigned 32-bit
#     little-endian integers taken from the remainder of the buffer
#
# Returns ($response_flags, $cursor_id, $starting_from, $number_returned).
sub _receive_response_header {
    my ($self, $hd) = @_;

    my $raw = $self->receive_data(RESPONSE_HEADER_SIZE, $hd);
    croak 'Short read for DB response header'
        if length($raw) != RESPONSE_HEADER_SIZE;

    my $flags_bytes  = substr($raw, 0, BSON_INT32);
    my $cursor_bytes = substr($raw, BSON_INT32, BSON_INT64);
    my $tail_bytes   = substr($raw, BSON_INT32+BSON_INT64);

    my ($response_flags) = unpack 'V', $flags_bytes;
    # NOTE(review): 'j' decodes a native Perl IV, so width and byte order
    # follow the local perl build - confirm this is intended for the cursor id.
    my ($cursor_id) = unpack 'j', $cursor_bytes;
    my ($starting_from, $number_returned) = unpack 'V2', $tail_bytes;

    return ($response_flags, $cursor_id, $starting_from, $number_returned);
}
# Read the document payload of a reply and decode it.
#
#   $doc_message_length - number of payload bytes to fetch
#   $cursor_id          - accepted but not used here
#   $hd                 - handle passed through to receive_data
#
# The whole payload is fetched with a single receive_data call (an earlier
# variant that read it in chunks of at most 4096 bytes is no longer used).
# Returns nothing when the payload is empty or otherwise false; otherwise
# returns the result of decode_bson_documents.
sub _read_documents {
    my ($self, $doc_message_length, $cursor_id, $hd) = @_;

    my $payload = $self->receive_data($doc_message_length, $hd);
    if ($payload) {
        my $docs = decode_bson_documents($payload);
        return $docs;
    }
    return;
}
# List the database names known to the server.
#
# Runs the `listDatabases` command against the admin database and returns the
# `name` entry of every element in the reply's `databases` array.
sub database_names {
    my ($self) = @_;

    my $reply = $self->admin->run_command({ listDatabases => 1 });
    my $databases = $reply->{databases};
    my @names = map { $_->{name} } @{ $databases };

    return @names;
}
# Build an AnyMongo::Database object for $database_name that is bound to this
# connection.
sub get_database {
    my ($self, $database_name) = @_;

    my @ctor_args = (
        _connection => $self,
        name        => $database_name,
    );
    return AnyMongo::Database->new(@ctor_args);
}
# Authenticate against database $dbname with a nonce-based MD5 login.
#
#   $dbname    - database to log in to
#   $username  - user name
#   $password  - plain password, or an already computed digest
#   $is_digest - true when $password is already the digest
#
# Steps: derive the password digest (MD5 hex of "user:mongo:password") unless
# one was supplied, fetch a nonce with `getnonce`, then send `authenticate`
# with key = MD5 hex of nonce . username . digest.
#
# Returns the reply of the `getnonce` command when its `ok` field is false,
# otherwise the reply of the `authenticate` command.
sub authenticate {
    my ($self, $dbname, $username, $password, $is_digest) = @_;

    # Only hash the password when the caller did not pass a digest.
    my $password_digest = $is_digest
        ? $password
        : Digest::MD5::md5_hex("${username}:mongo:${password}");

    my $db = $self->get_database($dbname);

    my $nonce_reply = $db->run_command({getnonce => 1});
    return $nonce_reply unless $nonce_reply->{'ok'};

    my $nonce = $nonce_reply->{'nonce'};
    my $key = Digest::MD5::md5_hex($nonce.$username.$password_digest);

    # The command is built in an ordered hash so `authenticate` stays the
    # first key; the tied object itself is what gets sent.
    my $login = tie(my %login_fields, 'Tie::IxHash');
    %login_fields = (authenticate => 1,
                     user => $username,
                     nonce => $nonce,
                     key => $key);

    my $login_reply = $db->run_command($login);
    return $login_reply;
}
# Shortcut for the `admin` database of this connection.
sub admin {
    my ($self) = @_;
    return $self->get_database('admin');
}
# Tear the connection down.
#
# Clears the master handle, releases every stored guard, destroys every
# stored handle, forgets the known servers and marks the connection as down.
# Returns whatever the connected(0) setter call returns.
sub disconnect {
    my ($self) = @_;

    $self->clear_master_handle;

    my $guards  = $self->{_guards};
    my $handles = $self->{_handles};

    # Drop the guards one by one.
    delete $guards->{$_} for keys %{ $guards };

    # Remove each handle from the table, then destroy it.
    for my $id (keys %{$handles}) {
        my $handle = delete $handles->{$id};
        $handle->destroy;
    }

    $self->{mongo_servers} = {};
    $self->{_is_connected} = 0;
    return $self->connected(0);
}
# Destruction hook: make sure the connection is torn down when the object
# goes away.
sub DEMOLISH {
    my ($self) = @_;
    return $self->disconnect;
}
# Make the metaclass immutable now that all attributes and methods above are
# declared.
__PACKAGE__->meta->make_immutable;
# A module file has to end with a true value.
1;
=pod
=head1 NAME
AnyMongo::Connection - Asynchronous MongoDB::Connection
=head1 VERSION
version 0.03
=head1 SYNOPSIS
=head1 DESCRIPTION
( run in 0.610 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )