AnyMongo

 view release on metacpan or  search on metacpan

lib/AnyMongo/Connection.pm  view on Meta::CPAN

    is       => 'rw',
    isa      => 'Str',
    required => 1,
    default  => 'admin',
);

has query_timeout => (
    is       => 'rw',
    isa      => 'Int',
    required => 1,
    default  => sub { return $AnyMongo::Cursor::timeout; },
);

has auto_connect => (
    is       => 'ro',
    isa      => 'Bool',
    required => 1,
    default  => 1,
);

has auto_reconnect => (
    is       => 'ro',
    isa      => 'Bool',
    required => 1,
    default  => 1,
);

has host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
    default  => 'mongodb://localhost:27017',
);

has w => (
    is      => 'rw',
    isa     => 'Int',
    default => 1,
);


has wtimeout => (
    is      => 'rw',
    isa     => 'Int',
    default => 1000,
);

has timeout => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
    default  => 20000,
);

has username => (
    is       => 'rw',
    isa      => 'Str',
    required => 0,
);

has password => (
    is       => 'rw',
    isa      => 'Str',
    required => 0,
);

has connected => (
    isa => 'Bool',
    is  => 'rw',
    default  => 0,
);

has cv => (
    isa => 'AnyEvent::CondVar',
    is  => 'ro',
    lazy_build => 1,
    clearer  => 'clear_cv',
);

sub _build_cv {
    my ($self) = @_;
    AE::cv;
}

has _connection_error => (
    isa => 'Bool',
    is  => 'rw',
    default  => 0,
);


sub CLONE_SKIP { 1 }

sub BUILD { shift->_init }

sub _init {
    my ($self) = @_;
    eval "use ${_}" # no Any::Moose::load_class becase the namespaces already have symbols from the xs bootstrap
        for qw/AnyMongo::Database AnyMongo::Cursor AnyMongo::BSON::OID/;
    $self->_parse_servers();
    if ($self->auto_connect) {
        $self->connect;
        # if (defined $self->username && defined $self->password) {
        #     $self->authenticate($self->db_name, $self->username, $self->password);
        # }
    }

}

sub connect {
    my ($self,%args) = @_;
    return if $self->connected || $self->{_trying_connect};
    # warn "connect...\n";
    $self->{_trying_connect} = 1;
    #setup connection timeout watcher
    my $timer; $timer = AnyEvent->timer( after => 5 ,cb => sub {
        undef $timer;
        unless ( $self->connected ) {
            $self->{_trying_connect} = 0;
            $self->cv->croak('Failed to connect to any mongodb servers,timeout');
        }
    });
    $self->cv->begin( sub {
        undef $timer;
        if ($self->connected) {
            $self->_connection_error(0);
            shift->send;
        }
        else {
            shift->croak("Failed to connect to any mongodb servers");
        }
    });

    my $servers = $self->{mongo_servers};
    my $seed_queue = $self->{_seed_queue} = [ keys %{ $servers } ];
    my $seed_tried = $self->{_seed_tried} = {};
    while (!$self->connected && @{$seed_queue} ) {
        while (my $h = shift @{$seed_queue}) {
            $seed_tried->{$h} = 1;
            $self->_check_master($h);
        }
    }
    $self->cv->recv;
    $self->{_trying_connect} = 0;
    # warn "connect done.\n";
}


sub _set_master {
    my ($self,$server_id,$h) = @_;
    $self->master_handle($h);
    $self->{mongo_servers}->{$server_id}->{is_master} = 1;
    $self->{master_id} = $server_id;
    $self->connected(1);
    # warn "master_id:$server_id\n";
}


sub _is_master {
    my ($config) = @_;
    $config && (ref($config) ne 'SCALAR') && ($config->{ismaster});
}

sub _check_master {

lib/AnyMongo/Connection.pm  view on Meta::CPAN

    $hd->{cv} = $cv;
    my $data = $cv->recv;
    delete $hd->{cv};
    $data;
}


sub _receive_header {
    my ($self,$hd) = @_;
    my $header_buf = $self->receive_data(STANDARD_HEADER_SIZE,$hd);
    croak 'Short read for DB response header; length:'.length($header_buf) unless length $header_buf == STANDARD_HEADER_SIZE;
    return unpack('V4',$header_buf);
}

sub _receive_response_header {
    my ($self,$hd) = @_;
    my $header_buf = $self->receive_data(RESPONSE_HEADER_SIZE,$hd);
    croak 'Short read for DB response header' unless length $header_buf == RESPONSE_HEADER_SIZE;
    my ($response_flags) = unpack 'V',substr($header_buf,0,BSON_INT32);
    my ($cursor_id) = unpack 'j',substr($header_buf,BSON_INT32,BSON_INT64);
    my ($starting_from,$number_returned) = unpack 'V2',substr($header_buf,BSON_INT32+BSON_INT64);
    return ($response_flags,$cursor_id,$starting_from,$number_returned);
}

sub _read_documents {
    my ($self,$doc_message_length,$cursor_id,$hd) = @_;
    my $remaining = $doc_message_length;
    my $bson_buf;
    # do {
    #     my $buf_len = $remaining > 4096? 4096:$remaining;
    #     $bson_buf .= $self->receive_data($buf_len);
    #     $remaining -= $buf_len;
    # } while ($remaining >0 );
    $bson_buf = $self->receive_data($doc_message_length,$hd);
    return unless $bson_buf;
    # warn "#_read_documents:bson_buf size:".length($bson_buf);
    # my $docs = decode_bson_documents($bson_buf,length($bson_buf));
    # warn '#_read_documents decode_bson_documents ...';
    my $docs = decode_bson_documents($bson_buf);
    # warn "docs:$docs";
    # warn "#_read_documents:".Dumper($docs)."\n";
    return $docs;
}

sub database_names {
    my ($self) = @_;
    my $ret = $self->admin->run_command({ listDatabases => 1 });
    return map { $_->{name} } @{ $ret->{databases} };
}


sub get_database {
    my ($self, $database_name) = @_;
    return AnyMongo::Database->new(
        _connection => $self,
        name        => $database_name,
    );
}

sub authenticate {
    my ($self, $dbname, $username, $password, $is_digest) = @_;
    my $hash = $password;

    # create a hash if the password isn't yet encrypted
    if (!$is_digest) {
        $hash = Digest::MD5::md5_hex("${username}:mongo:${password}");
    }

    # get the nonce
    my $db = $self->get_database($dbname);
    my $result = $db->run_command({getnonce => 1});
    if (!$result->{'ok'}) {
        return $result;
    }

    my $nonce = $result->{'nonce'};
    my $digest = Digest::MD5::md5_hex($nonce.$username.$hash);

    # run the login command
    my $login = tie(my %hash, 'Tie::IxHash');
    %hash = (authenticate => 1,
             user => $username,
             nonce => $nonce,
             key => $digest);
    $result = $db->run_command($login);

    return $result;
}

sub admin { shift->get_database('admin') }

sub disconnect {
    my ($self) = @_;
    $self->clear_master_handle;
    my $guards = $self->{_guards};
    my $handles = $self->{_handles};
    map { delete $guards->{$_} } keys %{ $guards };
    map { (delete $handles->{$_})->destroy } keys %{$handles};
    $self->{mongo_servers} = {};
    $self->{_is_connected} = 0;
    $self->connected(0);
}

sub DEMOLISH {
    shift->disconnect;
}

__PACKAGE__->meta->make_immutable;

1;


=pod

=head1 NAME

AnyMongo::Connection - Asynchronous MongoDB::Connection

=head1 VERSION

version 0.03

=head1 SYNOPSIS

=head1 DESCRIPTION



( run in 0.610 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )