MongoDB-Async
view release on metacpan or search on metacpan
mongo_link.c view on Meta::CPAN
/*
* Copyright 2009 10gen, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "mongo_link.h"
#include "perl_mongo.h"
static int mongo_link_sockaddr(struct sockaddr_in *addr, char *host, int port);
void mongo_get_coro_ev_api(void) {
I_CORO_API ("MongoDB::Async");
I_EV_API ("MongoDB::Async");
}
static void ev_sockwatcher_cb (struct ev_loop *loop, ev_io *w, int revents){
mongo_async_sockwatcher_state *state = (mongo_async_sockwatcher_state *)w;
int temp_len = (state->len - state->done) > 4096 ? 4096 : (state->len - state->done);
int num;
// windows gives a WSAEFAULT if you try to get more bytes
//do {
if(revents & EV_WRITE){
num = send(w->fd, (char*)state->buffer, temp_len, 0);
}else{
num = recv(w->fd, (char*)state->buffer, temp_len, 0);
}
state->buffer = (char*)state->buffer + num;
state->done += num;
//} while( (num == 4096) && (state->done < state->len) );
if(num < 0){
state->len = -1;
state->done = -1;
}
if( state->done >= state->len ){
state->len = 0;
ev_io_stop(EV_DEFAULT, w);
// printf("recv %d\n",state->done);
CORO_READY(state->coro);
SvREFCNT_dec(state->coro);
}
}
/**
* Waits "timeout" ms for the socket to be ready. Returns 1 on success, 0 on
* failure.
*/
static int mongo_link_timeout(int socket, time_t timeout);
static void set_timeout(int socket, time_t timeout) {
#ifdef WIN32
DWORD tv = (DWORD)timeout * 1000;
const char *tv_ptr = (const char*)&tv;
#else
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
const void *tv_ptr = (void*)&tv;
#endif
setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, tv_ptr, sizeof(tv));
setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, tv_ptr, sizeof(tv));
}
void perl_mongo_connect(mongo_link* link) {
#ifdef MONGO_SSL
if(link->ssl){
ssl_connect(link);
link->sender = ssl_send;
link->receiver = ssl_recv;
return;
}
#endif
non_ssl_connect(link);
link->sender = non_ssl_send;
link->receiver = non_ssl_recv;
}
/*
* Returns -1 on failure, the socket fh on success.
*
* Note: this cannot return 0 on failure, because reconnecting sometimes makes
* the fh 0 (briefly).
*/
// ^^^ this is from old version and it's a lie. Function returns always returns 0, socket fh stored in link strusture
void non_ssl_connect(mongo_link* link) {
int sock, status, connected = 0;
struct sockaddr_in addr;
#ifdef WIN32
WORD version;
WSADATA wsaData;
int error;
u_long no = 0;
const char yes = 1;
mongo_link.c view on Meta::CPAN
// Connect the SSL struct to our connection
if(!SSL_set_fd(link->ssl_handle, link->master->socket)){
ERR_print_errors_fp(stderr);
}
// Initiate SSL handshake
if(SSL_connect (link->ssl_handle) != 1){
ERR_print_errors_fp(stderr);
}
SSL_CTX_set_timeout(link->ssl_context, (long)link->timeout);
link->master->connected = 1;
}
}
int ssl_send(void* link, const char* buffer, size_t len){
return SSL_write(((mongo_link*)link)->ssl_handle, buffer, len);
}
int ssl_recv(mongo_link* link, void *dest, int len) {
int num = 1, read = 0;
// this can return FAILED if there is just no more data from db
while (read < len && num > 0) {
int temp_len = (len - read) > 4096 ? 4096 : (len - read);
// windows gives a WSAEFAULT if you try to get more bytes
num = SSL_read(((mongo_link*)link)->ssl_handle, (void*)dest, temp_len);
if (num < 0) {
return -1;
}
dest = (char*)dest + num;
read += num;
}
return read;
}
#endif
static int _make_async_mongo_io(void* link, const char* buffer, size_t len, int revent){
mongo_async_sockwatcher_state *state = &( ((mongo_link*)link)->master->sockwatcher );
if(state->buffer){
croak("Two queries to one connection at same time. Use MongoDB::Async::Pool");
}
state->buffer = (char *)buffer;
state->len = len;
state->done = 0;
if(state->len){
ev_io_init (& ((mongo_link*)link)->master->sockwatcher.w, ev_sockwatcher_cb, ((mongo_link*)link)->master->socket, revent);
ev_io_start (EV_DEFAULT, &state->w);
state->coro = CORO_CURRENT;
SvREFCNT_inc(state->coro);
};
while(state->len > 0 && state->done >= 0){
CORO_SCHEDULE;
}
state->buffer = (char *) 0;
return state->done;
}
int non_ssl_send(void* link, const char* buffer, size_t len){
// return send(((mongo_link*)link)->master->socket, buffer, len, 0);
return _make_async_mongo_io(link, buffer, len, EV_WRITE ); //TODO
}
int non_ssl_recv(void* link, const char* buffer, size_t len){
return _make_async_mongo_io(link, buffer, len, EV_READ );
}
static int mongo_link_timeout(int sock, time_t to) {
struct timeval timeout, now, prev;
if (to <= 0) {
return 1;
}
timeout.tv_sec = to > 0 ? ((long)to / 1000) : 20;
timeout.tv_usec = to > 0 ? ((to % 1000) * 1000) : 0;
// initialize prev, in case we get interrupted
if (gettimeofday(&prev, 0) == -1) {
return 0;
}
while (1) {
fd_set rset, wset, eset;
int sock_status;
FD_ZERO(&rset);
FD_SET(sock, &rset);
FD_ZERO(&wset);
FD_SET(sock, &wset);
FD_ZERO(&eset);
FD_SET(sock, &eset);
sock_status = select(sock+1, &rset, &wset, &eset, &timeout);
// error
if (sock_status == -1) {
#ifdef WIN32
errno = WSAGetLastError();
#endif
if (errno == EINTR) {
if (gettimeofday(&now, 0) == -1) {
mongo_link.c view on Meta::CPAN
if (link->receiver(link, cursor->buf.pos, cursor->header.length) == -1) {
#ifdef WIN32
croak("WSA error getting database response: %d\n", WSAGetLastError());
#else
croak("error getting database response: %s\n", strerror(errno));
#endif
return 0;
}
cursor->num += num_returned;
return num_returned > 0;
}
/*
* closes sockets and sets "connected" to 0
*/
void set_disconnected(SV *link_sv) {
mongo_link *link;
link = (mongo_link*)perl_mongo_get_ptr_from_instance(link_sv, &connection_vtbl);
// check if there's nothing to do
if (link->master == 0 || link->master->connected == 0) {
return;
}
#ifdef WIN32
shutdown(link->master->socket, 2);
closesocket(link->master->socket);
WSACleanup();
#else
close(link->master->socket);
#endif
#ifdef MONGO_SSL
if(link->ssl){
ssl_disconnect(link);
}
#endif
link->master->connected = 0;
// TODO: set $self->_master to 0?
if (link->copy) {
link->master = 0;
perl_mongo_call_method(link_sv, "_master", G_DISCARD, 1, &PL_sv_no);
}
}
int perl_mongo_master(SV *link_sv, int auto_reconnect) {
SV *master;
mongo_link *link;
link = (mongo_link*)perl_mongo_get_ptr_from_instance(link_sv, &connection_vtbl);
if (link->master && link->master->connected) {
return link->master->socket;
}
//TODO: Make coro threads for calling connect and get_master, here just set threads to ready state. Because callings perl functions may break perl's stack when perl function switches coro's and returns in this function but in other thread.
// if we didn't have a connection above and this isn't a connection holder
if (!link->copy) {
// if this is a real connection, try to reconnect
if (auto_reconnect && link->auto_reconnect) {
perl_mongo_call_method(link_sv, "connect", G_DISCARD, 0);
if (link->master && link->master->connected) {
return link->master->socket;
}
}
return -1;
}
master = perl_mongo_call_method(link_sv, "get_master", 0, 0);
if (SvROK(master)) {
mongo_link *m_link;
m_link = (mongo_link*)perl_mongo_get_ptr_from_instance(master, &connection_vtbl);
link->copy = 1;
link->master = m_link->master;
link->ssl = m_link->ssl;
#ifdef MONGO_SSL
link->ssl_handle = m_link->ssl_handle;
link->ssl_context = m_link->ssl_context;
#endif
link->sender = m_link->sender;
link->receiver = m_link->receiver;
return link->master->socket;
}
link->master = 0;
return -1;
}
#ifdef MONGO_SSL
// Establish a regular tcp connection
void tcp_setup(mongo_link* link){
int error, handle;
struct hostent *host;
struct sockaddr_in server;
host = gethostbyname (link->master->host);
handle = socket (AF_INET, SOCK_STREAM, 0);
if (handle == -1){
handle = 0;
}
else {
server.sin_family = AF_INET;
server.sin_port = htons (link->master->port);
server.sin_addr = *((struct in_addr *) host->h_addr);
bzero (&(server.sin_zero), 8);
error = connect(handle, (struct sockaddr *) &server, sizeof (struct sockaddr));
if (error == -1){
handle = 0;
}
}
( run in 1.102 second using v1.01-cache-2.11-cpan-39bf76dae61 )