Net-DNS-Native

 view release on metacpan or  search on metacpan

Native.xs  view on Meta::CPAN

#undef malloc
#include "queue.h" // will be used outside of the main thread
#pragma pop_macro("free")
#pragma pop_macro("malloc")

// write() is deprecated in favor of _write() - windows way
#if defined(WIN32) && !defined(UNDER_CE)
# include <io.h>
# define write _write
# define read _read
#endif

// unnamed semaphores are not implemented in this POSIX compatible UNIX system
#ifdef PERL_DARWIN
# include "mysemaphore.h"
# define sem_t my_sem_t
# define sem_init my_sem_init
# define sem_wait my_sem_wait
# define sem_post my_sem_post
# define sem_destroy my_sem_destroy
#endif

#ifdef __linux__
# include <link.h>
// Callback with the dl_iterate_phdr() signature. `data` points to a char
// flag which is set to 1 as soon as an object whose name contains
// "libnss_files" (and which has at least one program header) is seen.
// Returns the current value of the flag.
int _dl_phdr_cb(struct dl_phdr_info *info, size_t size, void *data) {
    char *flag = (char*)data;

    // already detected on an earlier call, nothing more to inspect
    if (*flag)
        return *flag;

    // the name does not depend on the program header index, so one
    // check is enough as long as there is at least one header
    if (info->dlpi_phnum > 0 && instr(info->dlpi_name, "libnss_files") != NULL)
        *flag = 1;

    return *flag;
}
#endif

typedef struct DNS_result DNS_result;

// State of one resolver object. It is shared between the code that submits
// requests and the worker threads, which all receive a pointer to it.
typedef struct {
    // taken around accesses to the queues, fd_map and the counters below
    pthread_mutex_t mutex;
    // signalled by DNS_on_thread_finish() when active_threads_cnt drops to 0
    pthread_cond_t cv;
    // attributes passed to every pthread_create(); set to detached in new()
    pthread_attr_t thread_attrs;
#ifndef WIN32
    // signal set (filled with sigfillset()) that worker threads block
    sigset_t blocked_sig;
#endif
    // workers sleep on it; posted once for each request pushed to in_queue
    sem_t semaphore;
    // maps the descriptor handed to the caller to its DNS_result
    bstree* fd_map;
    // DNS_thread_arg items waiting to be picked up by a worker
    queue* in_queue;
    // threads started and not yet finished (see DNS_on_thread_finish())
    int active_threads_cnt;
    // number of pool workers; 0 means no pool, one thread per request
    int pool;
    // option: allow an extra thread when every pool worker is busy
    char extra_thread;
    // option: worker writes "1" to the result fd before calling getaddrinfo()
    char notify_on_begin;
    // requests currently marked as running in an extra thread
    int extra_threads_cnt;
    // pool workers currently processing a request
    int busy_threads;
    // descriptors whose results DNS_free_timedout() should release
    // (presumably requests the caller gave up on - filled outside this view)
    queue* tout_queue;
    // set in the fork child on NetBSD instead of recreating the pool at once
    char need_pool_reinit;
    // interpreter context from PERL_GET_THX, refreshed in the fork child
    PerlInterpreter *perl;
} Net_DNS_Native;

// Everything a worker needs to perform one getaddrinfo() request.
typedef struct {
    // resolver object the request belongs to
    Net_DNS_Native *self;
    // node name passed to getaddrinfo(); NULL when the caller gave an empty string
    char *host;
    // service name passed to getaddrinfo(); NULL when the caller gave an empty string
    char *service;
    // hints passed to getaddrinfo(); may be NULL
    struct addrinfo *hints;
    // 1 when the request was counted in extra_threads_cnt; the worker
    // decrements that counter when the lookup is done
    char extra;
    // 1 when the request was pushed to in_queue, 0 when it runs in its own thread
    char queued;
    // result record filled in by the worker
    DNS_result *res;
} DNS_thread_arg;

// Outcome of one request, stored in fd_map under the descriptor returned
// to the caller.
struct DNS_result {
    // descriptor the worker writes its notifications ("1" / "2") to
    int fd1;
    // return value of getaddrinfo()
    int gai_error;
    // errno saved when gai_error is EAI_SYSTEM
    int sys_error;
    // address list produced by getaddrinfo(); released with freeaddrinfo()
    struct addrinfo *hostinfo;
    // request type given at submission; returned first by _get_result()
    int type;
    // NULL while the lookup is running; set by the worker when the result is ready
    DNS_thread_arg *arg;
    // initialised to 0 at submission; NOTE(review): use not visible in this part of the file
    char dequeued;
};

// Queue of Net_DNS_Native objects walked by the fork handlers;
// created on first use in new().
queue *DNS_instances = NULL;

// Bookkeeping done by a thread right before it returns: decrements the
// counter of active threads and signals self->cv when it reaches zero.
void DNS_on_thread_finish(Net_DNS_Native *self) {
    int remaining;

    pthread_mutex_lock(&self->mutex);
    self->active_threads_cnt -= 1;
    remaining = self->active_threads_cnt;
    if (remaining == 0)
        pthread_cond_signal(&self->cv);
    pthread_mutex_unlock(&self->mutex);
}

// Performs one blocking getaddrinfo() call described by a DNS_thread_arg.
// Used both as a thread entry point (request not queued) and as a plain
// function called by the pool/extra workers (request queued).
// Writes "1" to the result descriptor before resolving when notify_on_begin
// is set, and "2" once the result is stored. Always returns NULL.
void *DNS_getaddrinfo(void *v_arg) {
    DNS_thread_arg *arg = (DNS_thread_arg *)v_arg;
    Net_DNS_Native *self = arg->self;
    // local copy, so the value stays usable after the result was published
    const char queued = arg->queued;

#ifndef WIN32
    // queued requests run inside a worker which has blocked signals already
    if (!queued) {
        pthread_sigmask(SIG_BLOCK, &self->blocked_sig, NULL);
    }
#endif

    if (self->notify_on_begin) {
        write(arg->res->fd1, "1", 1);
    }

    arg->res->gai_error = getaddrinfo(arg->host, arg->service, arg->hints, &arg->res->hostinfo);
    if (arg->res->gai_error == EAI_SYSTEM) {
        arg->res->sys_error = errno;
    }

    // publish the result: a non-NULL res->arg marks it as ready
    pthread_mutex_lock(&self->mutex);
    arg->res->arg = arg;
    if (arg->extra) {
        self->extra_threads_cnt--;
    }
    write(arg->res->fd1, "2", 1);
    pthread_mutex_unlock(&self->mutex);

    // only a thread of its own has to account for its termination here
    if (!queued) {
        DNS_on_thread_finish(self);
    }

    return NULL;
}

// Entry point of a pool thread. Sleeps on the semaphore, takes one request
// from in_queue per wake-up and resolves it. Leaves when sem_wait() fails
// or when it was woken up with nothing in the queue. Always returns NULL.
void *DNS_pool_worker(void *v_arg) {
    Net_DNS_Native *self = (Net_DNS_Native*)v_arg;
    void *job;
#ifndef WIN32
    pthread_sigmask(SIG_BLOCK, &self->blocked_sig, NULL);
#endif

    for (;;) {
        if (sem_wait(&self->semaphore) != 0)
            break;

        pthread_mutex_lock(&self->mutex);
        job = queue_shift(self->in_queue);
        if (job != NULL)
            self->busy_threads++;
        pthread_mutex_unlock(&self->mutex);

        // a wake-up without a queued request asks the thread to quit
        if (job == NULL)
            break;

        DNS_getaddrinfo(job);

        pthread_mutex_lock(&self->mutex);
        self->busy_threads--;
        pthread_mutex_unlock(&self->mutex);
    }

    DNS_on_thread_finish(self);
    return NULL;
}

// Entry point of a temporary worker. Works like a pool thread, but after
// each resolved request it checks whether it is still needed and leaves
// when in_queue is empty or a pool thread is free. Always returns NULL.
void *DNS_extra_worker(void *v_arg) {
    Net_DNS_Native *self = (Net_DNS_Native*)v_arg;
    void *job;
    int queue_drained, pool_has_idle;
#ifndef WIN32
    pthread_sigmask(SIG_BLOCK, &self->blocked_sig, NULL);
#endif

    for (;;) {
        if (sem_wait(&self->semaphore) != 0)
            break;

        pthread_mutex_lock(&self->mutex);
        job = queue_shift(self->in_queue);
        pthread_mutex_unlock(&self->mutex);

        if (job == NULL)
            break;

        DNS_getaddrinfo(job);

        pthread_mutex_lock(&self->mutex);
        queue_drained = !queue_size(self->in_queue);
        pool_has_idle = self->pool && self->busy_threads < self->pool;
        pthread_mutex_unlock(&self->mutex);

        // nothing left to do, or the pool can take over from here
        if (queue_drained || pool_has_idle)
            break;
    }

    DNS_on_thread_finish(self);
    return NULL;
}

// Walks tout_queue and releases the results of the descriptors listed there.
// A result is released when its lookup has finished (res->arg is set) or
// unconditionally when `force` is true; otherwise the entry stays in the
// queue. Entries without a record in fd_map are just removed.
// The visible callers hold self->mutex or run in the fork child.
void DNS_free_timedout(Net_DNS_Native *self, char force) {
    queue_iterator *it;

    if (!queue_size(self->tout_queue))
        return;

    it = queue_iterator_new(self->tout_queue);

    while (!queue_iterator_end(it)) {
        int fd = (intptr_t)queue_at(self->tout_queue, it);
        DNS_result *res = bstree_get(self->fd_map, fd);

        if (res != NULL && !force && !res->arg) {
            // lookup still running: keep the entry and move on
            queue_iterator_next(it);
            continue;
        }

        if (res != NULL) {
            bstree_del(self->fd_map, fd);
            if (!res->gai_error && res->hostinfo)
                freeaddrinfo(res->hostinfo);

            close(fd);
            close(res->fd1);

            if (res->arg) {
                free(res->arg->hints); // free(NULL) is a no-op
                if (res->arg->host)
                    Safefree(res->arg->host);
                if (res->arg->service)
                    Safefree(res->arg->service);
                free(res->arg);
            }
            free(res);
        }

        // released or unknown descriptor: drop it from the queue,
        // the iterator is not advanced explicitly after a deletion
        queue_del(self->tout_queue, it);
    }

    queue_iterator_destroy(it);
}

// Locks the mutex stored inside the semaphore replacement used on Darwin.
// Does nothing on other platforms.
void DNS_lock_semaphore(sem_t *s) {
#ifndef PERL_DARWIN
    (void)s;
#else
    pthread_mutex_lock(&s->lock);
#endif
}

void DNS_unlock_semaphore(sem_t *s) {

Native.xs  view on Meta::CPAN

    Net_DNS_Native *self;
    queue_iterator *it = queue_iterator_new(DNS_instances);
    while (!queue_iterator_end(it)) {
        self = queue_at(DNS_instances, it);
        pthread_mutex_lock(&self->mutex);
        if (self->pool) DNS_lock_semaphore(&self->semaphore);
        queue_iterator_next(it);
    }
    queue_iterator_destroy(it);
}

// fork() handler executed in the parent process (registered through
// pthread_atfork() in new()). For every resolver object it unlocks the
// mutex and, when a pool is configured, the semaphore.
void DNS_after_fork_handler_parent() {
    queue_iterator *it;

    if (queue_size(DNS_instances) == 0)
        return;

    for (it = queue_iterator_new(DNS_instances); !queue_iterator_end(it); queue_iterator_next(it)) {
        Net_DNS_Native *inst = queue_at(DNS_instances, it);

        pthread_mutex_unlock(&inst->mutex);
        if (inst->pool)
            DNS_unlock_semaphore(&inst->semaphore);
    }

    queue_iterator_destroy(it);
}

// Starts self->pool pool worker threads again and counts each started
// thread in active_threads_cnt. Croaks on the first thread that can't
// be created.
void DNS_reinit_pool(Net_DNS_Native *self) {
    int nth;

    for (nth = 1; nth <= self->pool; nth++) {
        pthread_t tid;
        int rc = pthread_create(&tid, &self->thread_attrs, DNS_pool_worker, (void*)self);

        if (rc != 0) {
            croak("Can't recreate thread #%d after fork: %s", nth, strerror(rc));
        }
        else {
            self->active_threads_cnt++;
        }
    }
}

// fork() handler executed in the child process (registered through
// pthread_atfork() in new()). For every resolver object it unlocks the
// mutex and semaphore, force-releases the results listed in tout_queue,
// zeroes the thread counters, stores the current interpreter context and
// brings the pool back (postponed on NetBSD).
void DNS_after_fork_handler_child() {
    queue_iterator *it;

    if (queue_size(DNS_instances) == 0)
        return;

    for (it = queue_iterator_new(DNS_instances); !queue_iterator_end(it); queue_iterator_next(it)) {
        Net_DNS_Native *inst = queue_at(DNS_instances, it);

        pthread_mutex_unlock(&inst->mutex);
        if (inst->pool)
            DNS_unlock_semaphore(&inst->semaphore);

        // start from a clean state in the new process
        DNS_free_timedout(inst, 1);

        inst->active_threads_cnt = 0;
        inst->extra_threads_cnt = 0;
        inst->busy_threads = 0;
        inst->perl = PERL_GET_THX;

        if (inst->pool) {
#ifdef __NetBSD__
            // unfortunately threads created here misbehave under NetBSD,
            // so only remember that the pool has to be recreated
            inst->need_pool_reinit = 1;
#else
            DNS_reinit_pool(inst);
#endif
        }
    }

    queue_iterator_destroy(it);
}

MODULE = Net::DNS::Native   PACKAGE = Net::DNS::Native

PROTOTYPES: DISABLE

SV*
new(char* class, ...)
    PREINIT:
        Net_DNS_Native *self;
    CODE:
        if (items % 2 == 0)
            croak("odd number of parameters");
        
        Newx(self, 1, Net_DNS_Native);
        
        int i, rc;
        self->pool = 0;
        self->notify_on_begin = 0;
        self->extra_thread = 0;
        self->active_threads_cnt = 0;
        self->extra_threads_cnt = 0;
        self->busy_threads = 0;
        self->need_pool_reinit = 0;
        self->perl = PERL_GET_THX;
#ifndef WIN32
        sigfillset(&self->blocked_sig);
#endif
        char *opt;
        
        for (i=1; i<items; i+=2) {
            opt = SvPV_nolen(ST(i));
            
            if (strEQ(opt, "pool")) {
                self->pool = SvIV(ST(i+1));
                if (self->pool < 0) self->pool = 0;
            }
            else if (strEQ(opt, "extra_thread")) {
                self->extra_thread = SvIV(ST(i+1));
            }
            else if (strEQ(opt, "notify_on_begin")) {
                self->notify_on_begin = SvIV(ST(i+1));
            }
            else {
                warn("unsupported option: %s", SvPV_nolen(ST(i)));
            }
        }
        
        char attr_ok = 0, mutex_ok = 0, cv_ok = 0, sem_ok = 0;
        
        rc = pthread_attr_init(&self->thread_attrs);
        if (rc != 0) {
            warn("pthread_attr_init(): %s", strerror(rc));
            goto FAIL;
        }
        attr_ok = 1;
        rc = pthread_attr_setdetachstate(&self->thread_attrs, PTHREAD_CREATE_DETACHED);
        if (rc != 0) {
            warn("pthread_attr_setdetachstate(): %s", strerror(rc));
            goto FAIL;
        }
        rc = pthread_mutex_init(&self->mutex, NULL);
        if (rc != 0) {
            warn("pthread_mutex_init(): %s", strerror(rc));
            goto FAIL;
        }
        mutex_ok = 1;
        
        rc = pthread_cond_init(&self->cv, NULL);
        if (rc != 0) {
            warn("pthread_cond_init(): %s", strerror(rc));
            goto FAIL;
        }
        cv_ok = 1;
        
        self->in_queue = NULL;
        
        if (DNS_instances == NULL) {
            DNS_instances = queue_new();
#ifndef WIN32
            rc = pthread_atfork(DNS_before_fork_handler, DNS_after_fork_handler_parent, DNS_after_fork_handler_child);
            if (rc != 0) {
                warn("Can't install fork handler: %s", strerror(rc));

Native.xs  view on Meta::CPAN

            if (!SvROK(sv_hints) || SvTYPE(SvRV(sv_hints)) != SVt_PVHV) {
                // not reference or not a hash inside reference
                croak("hints should be reference to hash");
            }
            
            hints = malloc(sizeof(struct addrinfo));
            hints->ai_flags = 0;
            hints->ai_family = AF_UNSPEC;
            hints->ai_socktype = 0;
            hints->ai_protocol = 0;
            hints->ai_addrlen = 0;
            hints->ai_addr = NULL;
            hints->ai_canonname = NULL;
            hints->ai_next = NULL;
            
            HV* hv_hints = (HV*)SvRV(sv_hints);
            
            SV **flags_ptr = hv_fetch(hv_hints, "flags", 5, 0);
            if (flags_ptr != NULL) {
                hints->ai_flags = SvIV(*flags_ptr);
            }
            
            SV **family_ptr = hv_fetch(hv_hints, "family", 6, 0);
            if (family_ptr != NULL) {
                hints->ai_family = SvIV(*family_ptr);
            }
            
            SV **socktype_ptr = hv_fetch(hv_hints, "socktype", 8, 0);
            if (socktype_ptr != NULL) {
                hints->ai_socktype = SvIV(*socktype_ptr);
            }
            
            SV **protocol_ptr = hv_fetch(hv_hints, "protocol", 8, 0);
            if (protocol_ptr != NULL) {
                hints->ai_protocol = SvIV(*protocol_ptr);
            }
        }
        
        DNS_result *res = malloc(sizeof(DNS_result));
        res->fd1 = fd[1];
        res->gai_error = 0;
        res->sys_error = 0;
        res->hostinfo = NULL;
        res->type = type;
        res->arg = NULL;
        res->dequeued = 0;
        
        DNS_thread_arg *arg = malloc(sizeof(DNS_thread_arg));
        arg->self = self;
        arg->host = strlen(host) ? savepv(host) : NULL;
        arg->service = strlen(service) ? savepv(service) : NULL;
        arg->hints = hints;
        arg->extra = 0;
        arg->queued  = 0;
        arg->res = res;
        
        pthread_mutex_lock(&self->mutex);
        DNS_free_timedout(self, 0);
        bstree_put(self->fd_map, fd[0], res);
        if (self->pool) {
            if (self->busy_threads == self->pool && (self->extra_thread || queue_size(self->tout_queue) > self->extra_threads_cnt)) {
                arg->extra = 1;
                self->extra_threads_cnt++;
            }
            else {
                arg->queued = 1;
                queue_push(self->in_queue, arg);
                sem_post(&self->semaphore);
            }
        }
        pthread_mutex_unlock(&self->mutex);
        
        if (!self->pool || arg->extra) {
            pthread_t tid;
            
            pthread_mutex_lock(&self->mutex);
            int rc = pthread_create(&tid, &self->thread_attrs, DNS_getaddrinfo, (void *)arg);
            if (rc == 0) {
                ++self->active_threads_cnt;
                pthread_mutex_unlock(&self->mutex);
            }
            else {
                pthread_mutex_unlock(&self->mutex);
                if (arg->host)    Safefree(arg->host);
                if (arg->service) Safefree(arg->service);
                free(arg);
                free(res);
                if (hints) free(hints);
                pthread_mutex_lock(&self->mutex);
                bstree_del(self->fd_map, fd[0]);
                pthread_mutex_unlock(&self->mutex);
                close(fd[0]);
                close(fd[1]);
                croak("pthread_create(): %s", strerror(rc));
            }
        }
        
        RETVAL = fd[0];
    OUTPUT:
        RETVAL

void
_get_result(Net_DNS_Native *self, int fd)
    PPCODE:
        pthread_mutex_lock(&self->mutex);
        DNS_result *res = bstree_get(self->fd_map, fd);
        bstree_del(self->fd_map, fd);
        pthread_mutex_unlock(&self->mutex);
        
        if (res == NULL) croak("attempt to get result which doesn't exists");
        if (!res->arg) {
            pthread_mutex_lock(&self->mutex);
            bstree_put(self->fd_map, fd, res);
            pthread_mutex_unlock(&self->mutex);
            croak("attempt to get not ready result");
        }
        
        XPUSHs(sv_2mortal(newSViv(res->type)));
        SV *err = newSV(0);
        sv_setiv(err, (IV)res->gai_error);
        sv_setpv(err, res->gai_error ? gai_strerror(res->gai_error) : "");



( run in 1.005 second using v1.01-cache-2.11-cpan-39bf76dae61 )