MongoDB-Async

 view release on metacpan or  search on metacpan

benchmark_compare.pl  view on Meta::CPAN

		"somename5" => "somedatasomedatasomedatasomedatasomedata",
	},
	
	_id => 0
};




use Benchmark ':all';
async { # use thread and run EV. You can use MongoDB::Async not in Coro threads, but it's ineffective, because coro must run EV every time you waiting for data.
	
	print "Save 20000 docs\n";
	cmpthese ( 20000, {

		'MongoDB::Async save' => sub {$dba->save($doc); $doc->{_id}++ },		
		'MongoDB save' => sub {$db->save($doc); $doc->{_id}++ },
				
	});

	print "\nGet 20000 docs\n";

lib/MongoDB/Async.pm  view on Meta::CPAN


L<MongoDB::Async::Pool> - pool of persistent connects

Added ->data method to L<MongoDB::Async::Cursor>. Same as ->all, but returns array ref. 

dt_type is now the global variable $MongoDB::Async::BSON::dt_type, not a property of the connection object.

inflate_dbrefs is now the global variable $MongoDB::Async::Cursor::inflate_dbrefs.


This module is 20-100% (in a single-(coro)threaded test; multithreaded will be even faster) faster than the original L<MongoDB>. See benchmark L<http://pastebin.com/vFWENzW7> or run benchmark_compare.pl from the archive. It might be 1-5% slower than original o...

This driver is NOT ithreads-safe.

SASL and SSL are unsupported (SSL may work in blocking mode, but this has not been tested).

PLEASE DON'T USE the documentation of this module; refer to the documentation of the original MongoDB module of the corresponding version instead, because I'm only porting features here and am too lazy to copy-paste the docs.

Don't work with this module inside Coro::unblock_sub, it leaks memory. Use separate coro thread to work with database, and if you need callback interface you need to write it yourself.

Please report bugs/suggestions to I<nyaknyan@gmail.com> or cpan's RT.


TODO:

Make the connection asynchronous - currently it may block for some time while trying to connect to a node which is down.

Implement SSL support using normal SSL module object. 

lib/MongoDB/Async/Cursor.pm  view on Meta::CPAN

    $self->{_request_id} = $info->{'request_id'};

    # $self->{_client}->send($query); # now this in XS too
	return $query;
	
	
    # $self->_client->recv($self);
	
	#Weird shit happens with perl stack if we call perl from XS and try to switch Coro threads in recv. 
	#I think problem is in macros that used to call perl sub from xs.
	#Maybe it's some "static" global(per file) var in one of perl .h files used by those macro, and when coro switches stacks: 
	#  -> Cursor xs uses macros (with global var) [point 1] to call perl do_query sub -> perl sub calls XS recv() sub -> recv() switches coro threads while waiting response -> another coro perl thread calls Cursorxs sub thath calls do_query from XS usin...
	#But i'm not sure about it, maybe i'm wrong. 
	#So call recv from Cursor.xs
	

    # $self->{started_iterating}(1);
	# now XS sets this
}


sub fields {

lib/MongoDB/Async/Pool.pm  view on Meta::CPAN

use Devel::GlobalDestruction;
use EV;
use Coro;

=head1 NAME

MongoDB::Async::Pool - Pool of connections

=head1 SYNOPSIS

You can't use one connection in several coros, because one coro may try to send a query while another coro is waiting for the response to an already running query. Creating a new connection for every coroutine might be slow, so this module caches connections for you....

	
	my $pool = MongoDB::Async::Pool->new({}, { timeout => 0, max_conns => 50 });
	
	async { 
		my $big_data = $pool->get->testdb->testcoll->find({ giveme => 'large dataset'})->data;
	};
	
	async { 
		my $big_data = $pool->get->testdb->testcoll->find({ giveme => 'another large dataset'})->data;

lib/MongoDB/Async/Pool.pm  view on Meta::CPAN

	

=head1 METHODS

=head2 new({ MongoDB::Async::MongoClient args }, { MongoDB::Async::Pool attrs });

Creates pool of L<MongoDB::Async::Connection> objects

=head2 $pool->get;

Returns a L<MongoDB::Async::MongoClient> object from the pool, or creates a new connection if the pool is empty. Might block the current coro until some other coro returns a connection, if the pool is empty and "max_conns" connections are currently in use. You needn't think about retu...

ATTRIBUTES:

=head3 ->max_conns , ->max_conns(new value)

Max connection count. ->get will block the current coroutine when max_conns is reached. Changing it at runtime is not recommended. Default: 0 - no limit

=head3 ->timeout , ->timeout(new value)

One unused connection will be closed every "timeout" seconds. 0 - don't close. Default: 10. Requires a running EV::loop

=head3 ->connections_in_use

=head3 ->connections_in_pool

=cut

mongo_link.c  view on Meta::CPAN

 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

#include "mongo_link.h"
#include "perl_mongo.h"

static int mongo_link_sockaddr(struct sockaddr_in *addr, char *host, int port);


/*
 * Initialises access to the Coro and EV C-level APIs for this extension.
 *
 * The rest of this file relies on CORO_CURRENT / CORO_SCHEDULE / CORO_READY
 * and on ev_io_* calls, so this has to run before any of them is used; the
 * XS boot code calls it once.
 *
 * NOTE(review): I_CORO_API and I_EV_API are presumably the import macros
 * from CoroAPI.h and EVAPI.h, with the string argument naming the importing
 * module for diagnostics - confirm against those headers.
 */
void mongo_get_coro_ev_api(void) {
	I_CORO_API ("MongoDB::Async");
	I_EV_API ("MongoDB::Async");
}

static void ev_sockwatcher_cb (struct ev_loop *loop, ev_io *w, int revents){
	mongo_async_sockwatcher_state *state = (mongo_async_sockwatcher_state *)w;
	int temp_len = (state->len - state->done) > 4096 ? 4096 : (state->len - state->done);
	int num;
    // windows gives a WSAEFAULT if you try to get more bytes
	 //do {

mongo_link.c  view on Meta::CPAN

	
	if(num < 0){
		state->len = -1;
		state->done = -1;
	}
	
	if( state->done >= state->len ){
		state->len = 0;
		ev_io_stop(EV_DEFAULT, w);
		// printf("recv %d\n",state->done);
		CORO_READY(state->coro);
		SvREFCNT_dec(state->coro);
	}
}



/**
 * Waits "timeout" ms for the socket to be ready.  Returns 1 on success, 0 on
 * failure.
 */
static int mongo_link_timeout(int socket, time_t timeout);

mongo_link.c  view on Meta::CPAN

	}
	
	state->buffer = (char *)buffer;
	state->len = len;
	state->done = 0;
	
	if(state->len){
		ev_io_init (& ((mongo_link*)link)->master->sockwatcher.w, ev_sockwatcher_cb, ((mongo_link*)link)->master->socket, revent);
		ev_io_start (EV_DEFAULT, &state->w);
		
		state->coro = CORO_CURRENT;
		SvREFCNT_inc(state->coro);
	};
	
	while(state->len > 0 && state->done >= 0){
		CORO_SCHEDULE;
	}
	
	state->buffer = (char *) 0;
	
	return state->done;
}

mongo_link.c  view on Meta::CPAN

int perl_mongo_master(SV *link_sv, int auto_reconnect) {
  SV *master;
  mongo_link *link;

  link = (mongo_link*)perl_mongo_get_ptr_from_instance(link_sv, &connection_vtbl);

  if (link->master && link->master->connected) {
      return link->master->socket;
  }
  
  //TODO: Make coro threads for calling connect and get_master, here just set threads to ready state. Because callings perl functions may break perl's stack when perl function switches coro's and returns in this function but in other thread. 
  
  // if we didn't have a connection above and this isn't a connection holder
  if (!link->copy) {
      // if this is a real connection, try to reconnect
      if (auto_reconnect && link->auto_reconnect) {
          perl_mongo_call_method(link_sv, "connect", G_DISCARD, 0);
          if (link->master && link->master->connected) {
              return link->master->socket;
          }
      }

mongo_link.h  view on Meta::CPAN

 * socket is the actual socket the connection is using
 * connected is a boolean indicating if the socket is connected or not
 */
 

/*
 * State for an asynchronous socket read.
 *
 * NOTE(review): nothing in the visible code references this type; it looks
 * like an earlier, read-only variant of mongo_async_sockwatcher_state
 * (same layout, "read" instead of "done") - confirm whether it is still
 * needed.
 */
typedef struct {
	ev_io w;        /* libev I/O watcher; presumably first so an ev_io* can be cast to this struct - TODO confirm */
	char *buffer;   /* buffer the data is read into (assumed from the name) */
	int len;        /* presumably the number of bytes wanted */
	int read;       /* presumably the number of bytes read so far */
	SV *coro;       /* presumably the coroutine waiting for the read */
} mongo_async_sock_reader_state;

/*
 * Per-connection state shared between the coroutine that starts a socket
 * transfer and ev_sockwatcher_cb, the libev callback that performs it.
 */
typedef struct {
	/* libev I/O watcher. ev_sockwatcher_cb casts the ev_io* it is handed
	 * straight back to this struct, so w must remain the first member. */
	ev_io w;
	/* Buffer for the transfer in progress; set before the watcher is
	 * started and reset to 0 once the waiting coroutine resumes. */
	char *buffer;
	/* Total number of bytes to transfer. The callback sets it to 0 when the
	 * transfer is complete and to -1 on a socket error. */
	int len;
	/* Bytes transferred so far (starts at 0); set to -1 on a socket error.
	 * This is the value handed back to the caller of the transfer. */
	int done;
	/* Coroutine waiting for the transfer. Its refcount is incremented when
	 * the wait starts; the callback readies it and drops the reference. */
	SV *coro;
} mongo_async_sockwatcher_state;

 
typedef struct _mongo_server {
  char *host;
  int port;
  int socket;
  int connected;
  
  mongo_async_sockwatcher_state sockwatcher;

reconnect_stress_test.pl  view on Meta::CPAN

new_threads() for 1...100;
# Spawns one Coro thread for the reconnect stress test.
#
# The thread loops forever: each pass opens a brand new connection to the
# local server, runs two full reads of test.test over it, and then lets the
# connection go out of scope so that the next pass has to connect again.
# Progress is reported through the file-level $reads / $writes counters.
sub new_threads {
	print "$reads reads and $writes writes\r";

	async {
		# Never terminates on its own; the script is stopped from outside.
		while (1) {
			# A fresh connection per pass is the whole point of this test.
			my $connection = MongoDB::Async::Connection->new({"host" => "mongodb://127.0.0.1" });

			foreach (1 .. 2) {
				# Fetch every document; the result itself is discarded.
				$connection->get_database('test')->get_collection( 'test' )->find({})->all;
				$reads++;
				print "$reads reads and $writes writes. Ready coros: ".Coro::nready."\r";
			}
		}
	};
}




EV::loop;

t/pool.t  view on Meta::CPAN

	$pool->get->driver_test_db->d_test_coll->find()->data;
	$queries_running--;
	
	$queries_completed++;
	
	if($queries_completed == 5){
		is($queries_completed, 5, 'all async queries compleated');
	
		$pool->get->driver_test_db->d_test_coll->drop();
		
		# is($max_running, 3, 'max_conn blocks coro');
		# TODO: write tests for testing timeout and max_conns. I tested it manually and it seems OK
		
		
		# global destruction not detected when running make test. Cleanup
		$_->{_parent_pool} = undef for @{$pool->{pool}};
		exit 0;
	}
	
	# warn 'done';
} for (1..5);

threading_stress_test.pl  view on Meta::CPAN


new_threads() for 1...1000;
# Spawns one Coro thread for the threading stress test.
#
# The thread opens a single connection and then alternates forever between
# a safe save of the shared $doc (under a random _id below $numofdoc) and a
# full read of test.test, bumping the file-level $writes / $reads counters
# and printing a status line after each operation.
sub new_threads {
	print "$reads reads and $writes writes\r";

	async {
		my $connection = MongoDB::Async::Connection->new({"host" => "mongodb://127.0.0.1"});

		# Status line shared by the write and the read step.
		my $show_progress = sub {
			print "$reads reads and $writes writes. Ready coros: ".Coro::nready."\r";
		};

		for (;;) {
			# Write step: safe mode waits for the server reply, which switches coroutines.
			$doc->{_id} = int rand($numofdoc);
			$connection->get_database('test')->get_collection( 'test' )->save($doc, {safe => 1 });
			$writes++;
			$show_progress->();

			# Read step: fetch every document; the result itself is discarded.
			$connection->get_database('test')->get_collection( 'test' )->find({})->all;
			$reads++;
			$show_progress->();

			# Optional throttle, disabled by default:
			# Coro::AnyEvent::sleep 0.01;
		}
	};
}




xs/Mongo.xs  view on Meta::CPAN

	PERL_MONGO_CALL_BOOT (boot_MongoDB__Async__Cursor);
	PERL_MONGO_CALL_BOOT (boot_MongoDB__Async__OID);
        gv_fetchpv("MongoDB::Async::Cursor::_request_id",  GV_ADDMULTI, SVt_IV);
        gv_fetchpv("MongoDB::Async::Cursor::slave_okay",  GV_ADDMULTI, SVt_IV);
        gv_fetchpv("MongoDB::Async::BSON::looks_like_number",  GV_ADDMULTI, SVt_IV);
        gv_fetchpv("MongoDB::Async::BSON::char",  GV_ADDMULTI, SVt_IV);
        gv_fetchpv("MongoDB::Async::BSON::utf8_flag_on",  GV_ADDMULTI, SVt_IV);
        gv_fetchpv("MongoDB::Async::BSON::use_boolean",  GV_ADDMULTI, SVt_IV);
        gv_fetchpv("MongoDB::Async::BSON::use_binary",  GV_ADDMULTI, SVt_IV);
        
		mongo_get_coro_ev_api();
		
		

void
write_query(ns, opts, skip, limit, query, fields = 0)
         char *ns
         int opts
         int skip
         int limit
         SV *query



( run in 0.346 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )