MongoDB-Async

 view release on metacpan or  search on metacpan

mongo_link.c  view on Meta::CPAN

/*
 *  Copyright 2009 10gen, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

#include "mongo_link.h"
#include "perl_mongo.h"

static int mongo_link_sockaddr(struct sockaddr_in *addr, char *host, int port);


/*
 * Invokes the I_CORO_API and I_EV_API macros on behalf of "MongoDB::Async".
 *
 * NOTE(review): these are presumably the API-import macros from CoroAPI.h and
 * EVAPI.h, which make the CORO_* and ev_* calls used elsewhere in this file
 * usable; the string is presumably the module name reported if the import
 * fails -- confirm against those headers. Presumably this has to run once
 * before any of the asynchronous I/O helpers in this file are called.
 */
void mongo_get_coro_ev_api(void) {
	I_CORO_API ("MongoDB::Async");
	I_EV_API ("MongoDB::Async");
}

/*
 * libev I/O callback: transfers the next chunk (at most 4096 bytes) of the
 * operation described by the mongo_async_sockwatcher_state that embeds `w`.
 *
 * The cast from `w` to the state struct assumes the ev_io watcher is the
 * first member of mongo_async_sockwatcher_state.
 *
 * loop    - event loop that fired the watcher (unused; the watcher is
 *           stopped on EV_DEFAULT, the loop it is started on in
 *           _make_async_mongo_io)
 * w       - the watcher, i.e. the start of the state struct
 * revents - EV_WRITE selects send(); anything else is treated as a read
 *
 * On success the buffer pointer and `done` counter advance by the number of
 * bytes moved. On failure both `len` and `done` are set to -1 so that the
 * waiting coroutine sees a -1 result. Once the transfer is complete or has
 * failed, the watcher is stopped, `len` is reset to 0, the waiting coroutine
 * is made ready and the reference taken on it is dropped.
 */
static void ev_sockwatcher_cb (struct ev_loop *loop, ev_io *w, int revents){
	mongo_async_sockwatcher_state *state = (mongo_async_sockwatcher_state *)w;
	// windows gives a WSAEFAULT if you try to get more bytes
	int temp_len = (state->len - state->done) > 4096 ? 4096 : (state->len - state->done);
	int is_write = (revents & EV_WRITE) != 0;
	int num;

	if(is_write){
		num = send(w->fd, (char*)state->buffer, temp_len, 0);
	}else{
		num = recv(w->fd, (char*)state->buffer, temp_len, 0);
	}

	if(num < 0 || (num == 0 && !is_write)){
		// Socket error, or the peer closed the connection while we were
		// still expecting data. A recv() of 0 must end the transfer:
		// otherwise the socket stays readable forever, this callback spins
		// and the coroutine is never woken. The buffer pointer and counter
		// are deliberately left untouched here.
		state->len = -1;
		state->done = -1;
	}else{
		state->buffer = (char*)state->buffer + num;
		state->done += num;
	}

	if( state->done >= state->len ){
		// finished (or failed, since -1 >= -1): wake the waiting coroutine
		state->len = 0;
		ev_io_stop(EV_DEFAULT, w);
		CORO_READY(state->coro);
		SvREFCNT_dec(state->coro);
	}
}



/**
 * Waits "timeout" ms for the socket to be ready.  Returns 1 on success, 0 on
 * failure.
 */
static int mongo_link_timeout(int socket, time_t timeout);

/*
 * Applies send and receive timeouts to `socket` via SO_RCVTIMEO/SO_SNDTIMEO.
 *
 * socket  - descriptor to configure
 * timeout - only honoured on WIN32, where it is multiplied by 1000 and
 *           passed as a DWORD; on every other platform the value is ignored
 *           and a fixed timeout of one second is installed
 *
 * The results of setsockopt() are not checked.
 */
static void set_timeout(int socket, time_t timeout) {
#ifdef WIN32
  DWORD tv = (DWORD)timeout * 1000;
  const char *opt = (const char*)&tv;
#else
  struct timeval tv;
  const void *opt = (void*)&tv;

  // fixed one second timeout, regardless of the argument
  tv.tv_usec = 0;
  tv.tv_sec = 1;
#endif

  setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, opt, sizeof(tv));
  setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, opt, sizeof(tv));
}


/*
 * Connects `link` and installs the matching send/receive callbacks.
 *
 * When built with MONGO_SSL and link->ssl is set, the SSL connect routine and
 * the ssl_send/ssl_recv pair are used; in every other case the plain
 * connect routine and the non_ssl_send/non_ssl_recv pair are used.
 */
void perl_mongo_connect(mongo_link* link) {
#ifdef MONGO_SSL
  if (link->ssl) {
    ssl_connect(link);
    link->sender = ssl_send;
    link->receiver = ssl_recv;
  }
  else
#endif
  {
    non_ssl_connect(link);
    link->sender = non_ssl_send;
    link->receiver = non_ssl_recv;
  }
}

/*
 * Returns -1 on failure, the socket fh on success.
 *
 * Note: this cannot return 0 on failure, because reconnecting sometimes makes
 * the fh 0 (briefly).
 */

 // ^^^ The comment above is from an old version and is no longer accurate: the function returns nothing; the socket fh is stored in the link structure.
 
void non_ssl_connect(mongo_link* link) {
  int sock, status, connected = 0;
  struct sockaddr_in addr;

#ifdef WIN32
  WORD version;
  WSADATA wsaData;
  int error;
  u_long no = 0;
  const char yes = 1;

mongo_link.c  view on Meta::CPAN


    // Connect the SSL struct to our connection
    if(!SSL_set_fd(link->ssl_handle, link->master->socket)){
      ERR_print_errors_fp(stderr);
    }

    // Initiate SSL handshake
    if(SSL_connect (link->ssl_handle) != 1){
      ERR_print_errors_fp(stderr);
    }

    SSL_CTX_set_timeout(link->ssl_context, (long)link->timeout);

    link->master->connected = 1;
  }
}

int ssl_send(void* link, const char* buffer, size_t len){
  return SSL_write(((mongo_link*)link)->ssl_handle, buffer, len);
}

/*
 * Reads up to `len` bytes from the SSL handle of `link` into `dest`, in
 * chunks of at most 4096 bytes.
 *
 * Returns the number of bytes actually read, which is less than `len` when
 * SSL_read() returns 0 before everything arrived, or -1 as soon as
 * SSL_read() returns a negative value.
 */
int ssl_recv(mongo_link* link, void *dest, int len) {
	char *cursor = (char*)dest;
	int total = 0;

	while (total < len) {
		int remaining = len - total;
		// windows gives a WSAEFAULT if you try to get more bytes
		int chunk = remaining > 4096 ? 4096 : remaining;
		int got = SSL_read(((mongo_link*)link)->ssl_handle, (void*)cursor, chunk);

		if (got < 0) {
			return -1;
		}
		if (got == 0) {
			// no more data from the db: report what we have so far
			break;
		}

		cursor += got;
		total += got;
	}

	return total;
}


#endif


static int _make_async_mongo_io(void* link, const char* buffer, size_t len, int revent){
	mongo_async_sockwatcher_state *state = &( ((mongo_link*)link)->master->sockwatcher );

	if(state->buffer){
		croak("Two queries to one connection at same time. Use MongoDB::Async::Pool");
	}
	
	state->buffer = (char *)buffer;
	state->len = len;
	state->done = 0;
	
	if(state->len){
		ev_io_init (& ((mongo_link*)link)->master->sockwatcher.w, ev_sockwatcher_cb, ((mongo_link*)link)->master->socket, revent);
		ev_io_start (EV_DEFAULT, &state->w);
		
		state->coro = CORO_CURRENT;
		SvREFCNT_inc(state->coro);
	};
	
	while(state->len > 0 && state->done >= 0){
		CORO_SCHEDULE;
	}
	
	state->buffer = (char *) 0;
	
	return state->done;
}


/*
 * Sends `len` bytes from `buffer` over the plain (non-SSL) socket of `link`
 * by running an EV_WRITE transfer through _make_async_mongo_io(), and
 * returns its result.
 */
int non_ssl_send(void* link, const char* buffer, size_t len){
	// TODO (carried over from the original; this replaced a direct send() call)
	int sent = _make_async_mongo_io(link, buffer, len, EV_WRITE);

	return sent;
}


/*
 * Receives `len` bytes into `buffer` from the plain (non-SSL) socket of
 * `link` by running an EV_READ transfer through _make_async_mongo_io(), and
 * returns its result. Note that `buffer` is written to even though it is
 * declared const.
 */
int non_ssl_recv(void* link, const char* buffer, size_t len){
	int received = _make_async_mongo_io(link, buffer, len, EV_READ);

	return received;
}


static int mongo_link_timeout(int sock, time_t to) {
  struct timeval timeout, now, prev;

  if (to <= 0) {
    return 1;
  }

  timeout.tv_sec = to > 0 ? ((long)to / 1000) : 20;
  timeout.tv_usec = to > 0 ? ((to % 1000) * 1000) : 0;

  // initialize prev, in case we get interrupted
  if (gettimeofday(&prev, 0) == -1) {
    return 0;
  }

  while (1) {
    fd_set rset, wset, eset;
    int sock_status;

    FD_ZERO(&rset);
    FD_SET(sock, &rset);
    FD_ZERO(&wset);
    FD_SET(sock, &wset);
    FD_ZERO(&eset);
    FD_SET(sock, &eset);

    sock_status = select(sock+1, &rset, &wset, &eset, &timeout);

    // error
    if (sock_status == -1) {

#ifdef WIN32
      errno = WSAGetLastError();
#endif

      if (errno == EINTR) {
        if (gettimeofday(&now, 0) == -1) {

mongo_link.c  view on Meta::CPAN

  if (link->receiver(link, cursor->buf.pos, cursor->header.length) == -1) {
#ifdef WIN32
    croak("WSA error getting database response: %d\n", WSAGetLastError());
#else
    croak("error getting database response: %s\n", strerror(errno));
#endif
    return 0;
  }

  cursor->num += num_returned;
  return num_returned > 0;
}


/*
 * Closes the master socket of the connection behind `link_sv` and sets its
 * "connected" flag to 0. Does nothing when there is no master or it is
 * already marked as disconnected.
 *
 * For a link flagged as a copy, the master pointer is also cleared and the
 * Perl-level "_master" method is called with a false value.
 */
void set_disconnected(SV *link_sv) {
  mongo_link *conn =
    (mongo_link*)perl_mongo_get_ptr_from_instance(link_sv, &connection_vtbl);

  // check if there's nothing to do
  if (!conn->master || !conn->master->connected) {
    return;
  }

#ifdef WIN32
  shutdown(conn->master->socket, 2);
  closesocket(conn->master->socket);
  WSACleanup();
#else
  close(conn->master->socket);
#endif

#ifdef MONGO_SSL
  // NOTE(review): this runs after the socket has already been closed; if
  // ssl_disconnect() still needs to talk to the peer, the order may need to
  // be swapped -- confirm against its implementation.
  if (conn->ssl) {
    ssl_disconnect(conn);
  }
#endif

  conn->master->connected = 0;

  // TODO: set $self->_master to 0?
  if (conn->copy) {
    conn->master = 0;
    perl_mongo_call_method(link_sv, "_master", G_DISCARD, 1, &PL_sv_no);
  }
}

/*
 * Returns the socket of the current master for the connection behind
 * `link_sv`, or -1 if no connected master could be obtained.
 *
 * link_sv        - Perl connection object
 * auto_reconnect - when non-zero (and the link itself has auto_reconnect
 *                  set), a real connection that is down gets one reconnect
 *                  attempt through the Perl-level "connect" method
 *
 * A link flagged as a copy does not reconnect itself: it asks the Perl-level
 * "get_master" method for the master connection and copies that link's
 * master pointer, SSL fields and sender/receiver callbacks.
 */
int perl_mongo_master(SV *link_sv, int auto_reconnect) {
  mongo_link *conn =
    (mongo_link*)perl_mongo_get_ptr_from_instance(link_sv, &connection_vtbl);
  SV *master_sv;

  // already connected: nothing to do
  if (conn->master && conn->master->connected) {
    return conn->master->socket;
  }

  //TODO: Make coro threads for calling connect and get_master, and here just set those threads to the ready state. Calling perl functions here may break perl's stack when the perl function switches coros and returns into this function in another thread.

  if (conn->copy) {
    // connection holder: borrow everything from the current master link
    mongo_link *src;

    master_sv = perl_mongo_call_method(link_sv, "get_master", 0, 0);
    if (!SvROK(master_sv)) {
      conn->master = 0;
      return -1;
    }

    src = (mongo_link*)perl_mongo_get_ptr_from_instance(master_sv, &connection_vtbl);
    conn->copy = 1;
    conn->master = src->master;
    conn->ssl = src->ssl;
#ifdef MONGO_SSL
    conn->ssl_handle = src->ssl_handle;
    conn->ssl_context = src->ssl_context;
#endif
    conn->sender = src->sender;
    conn->receiver = src->receiver;

    return conn->master->socket;
  }

  // real connection without a live master: try to reconnect if allowed
  if (auto_reconnect && conn->auto_reconnect) {
    perl_mongo_call_method(link_sv, "connect", G_DISCARD, 0);
    if (conn->master && conn->master->connected) {
      return conn->master->socket;
    }
  }

  return -1;
}

#ifdef MONGO_SSL
// Establish a regular tcp connection
void tcp_setup(mongo_link* link){
  int error, handle;
  struct hostent *host;
  struct sockaddr_in server;

  host = gethostbyname (link->master->host);
  handle = socket (AF_INET, SOCK_STREAM, 0);
  if (handle == -1){
    handle = 0;
  }
  else {
    server.sin_family = AF_INET;
    server.sin_port = htons (link->master->port);
    server.sin_addr = *((struct in_addr *) host->h_addr);
    bzero (&(server.sin_zero), 8);

    error = connect(handle, (struct sockaddr *) &server, sizeof (struct sockaddr));
    if (error == -1){
      handle = 0;
    }
  }



( run in 1.102 second using v1.01-cache-2.11-cpan-39bf76dae61 )