Hypersonic

 view release on metacpan or  search on metacpan

lib/Hypersonic/UA/Async.pm  view on Meta::CPAN

    my $backend_name = Hypersonic::Event->best_backend;
    my $event_backend = Hypersonic::Event->backend($backend_name);
    
    # Store backend for other methods to use
    $opts->{event_backend} = $event_backend;
    $opts->{event_backend_name} = $backend_name;

    # Add required includes for networking
    $builder->line('#include <sys/socket.h>')
      ->line('#include <netinet/in.h>')
      ->line('#include <netdb.h>')
      ->line('#include <fcntl.h>')
      ->line('#include <errno.h>')
      ->line('#include <unistd.h>');
    
    # Add event backend includes and defines
    $builder->line($event_backend->includes)
      ->line($event_backend->defines)
      ->blank;
    
    $builder->line('#define MAX_ASYNC_CONTEXTS 1024')
      ->line('#ifndef MAX_EVENTS')
      ->line('#define MAX_EVENTS 256')
      ->line('#endif')
      ->blank
      ->line('#define ASYNC_STATE_CONNECTING 0')
      ->line('#define ASYNC_STATE_TLS        1')
      ->line('#define ASYNC_STATE_SENDING    2')
      ->line('#define ASYNC_STATE_RECEIVING  3')
      ->line('#define ASYNC_STATE_DONE       4')
      ->line('#define ASYNC_STATE_ERROR      5')
      ->line('#define ASYNC_STATE_CANCELLED  6')
      ->blank
      ->line('#define ASYNC_WAIT_NONE  0')
      ->line('#define ASYNC_WAIT_READ  1')
      ->line('#define ASYNC_WAIT_WRITE 2')
      ->blank
      ->line('typedef struct {')
      ->line('    int fd;')
      ->line('    void *ssl;')  # Use void* instead of SSL* to avoid OpenSSL dependency
      ->line('    int state;')
      ->line('    int tls;')
      ->line('    char *host;')
      ->line('    int port;')
      ->line('    char *request;')
      ->line('    size_t request_len;')
      ->line('    size_t request_sent;')
      ->line('    char *recv_buffer;')
      ->line('    size_t recv_buffer_len;')
      ->line('    size_t recv_buffer_cap;')
      ->line('    SV *future_sv;')
      ->line('    SV *callback;')
      ->line('    time_t deadline;')
      ->line('    char *error;')
      ->line('    int in_use;')
      ->line('} AsyncContext;')
      ->blank
      ->line('static AsyncContext async_registry[MAX_ASYNC_CONTEXTS];')
      ->line('static int async_ev_fd = -1;')  # event loop fd (kqueue/epoll) for batched polling
      ->blank
      ->comment('Async connection pool for keep-alive')
      ->line('#define ASYNC_POOL_SIZE 512')
      ->line('typedef struct {')
      ->line('    int fd;')
      ->line('    char host[256];')
      ->line('    int port;')
      ->line('    time_t expires;')
      ->line('} AsyncPooledConn;')
      ->blank
      ->line('static AsyncPooledConn async_conn_pool[ASYNC_POOL_SIZE];')
      ->blank
      ->comment('Get a pooled connection')
      ->line('static int async_pool_get(const char *host, int port) {')
      ->line('    int i;')
      ->line('    time_t now = time(NULL);')
      ->line('    for (i = 0; i < ASYNC_POOL_SIZE; i++) {')
      ->line('        if (async_conn_pool[i].fd > 0 && async_conn_pool[i].port == port &&')
      ->line('            strcmp(async_conn_pool[i].host, host) == 0 && async_conn_pool[i].expires > now) {')
      ->line('            int fd = async_conn_pool[i].fd;')
      ->line('            async_conn_pool[i].fd = 0;')
      ->line('            return fd;')
      ->line('        }')
      ->line('    }')
      ->line('    return -1;')
      ->line('}')
      ->blank
      ->comment('Return connection to pool (10 second keep-alive)')
      ->line('static void async_pool_put(int fd, const char *host, int port) {')
      ->line('    int i;')
      ->line('    if (fd < 0) return;')
      ->line('    time_t now = time(NULL);')
      ->line('    for (i = 0; i < ASYNC_POOL_SIZE; i++) {')
      ->line('        if (async_conn_pool[i].fd <= 0 || async_conn_pool[i].expires <= now) {')
      ->line('            if (async_conn_pool[i].fd > 0) close(async_conn_pool[i].fd);')
      ->line('            async_conn_pool[i].fd = fd;')
      ->line('            strncpy(async_conn_pool[i].host, host, 255);')
      ->line('            async_conn_pool[i].host[255] = 0;')
      ->line('            async_conn_pool[i].port = port;')
      ->line('            async_conn_pool[i].expires = now + 10;')
      ->line('            return;')
      ->line('        }')
      ->line('    }')
      ->line('    close(fd);')
      ->line('}')
      ->blank
      ->line('static int async_alloc_slot(void) {')
      ->line('    int i;')
      ->line('    for (i = 0; i < MAX_ASYNC_CONTEXTS; i++) {')
      ->line('        if (!async_registry[i].in_use) {')
      ->line('            memset(&async_registry[i], 0, sizeof(AsyncContext));')
      ->line('            async_registry[i].in_use = 1;')
      ->line('            async_registry[i].fd = -1;')
      ->line('            async_registry[i].future_sv = NULL;')
      ->line('            return i;')
      ->line('        }')
      ->line('    }')
      ->line('    return -1;')
      ->line('}')
      ->blank
      ->line('static void async_free_slot(int slot) {')
      ->line('    if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS) {')
      ->line('        AsyncContext *ctx = &async_registry[slot];')
      ->comment('        Return connection to pool if successful')
      ->line('        if (ctx->fd >= 0 && ctx->state == ASYNC_STATE_DONE && ctx->host) {')
      ->line('            async_pool_put(ctx->fd, ctx->host, ctx->port);')
      ->line('            ctx->fd = -1;')
      ->line('        }')
      ->line('        if (ctx->fd >= 0) close(ctx->fd);')
      ->line('        if (ctx->host) free(ctx->host);')
      ->line('        if (ctx->request) free(ctx->request);')
      ->line('        if (ctx->recv_buffer) free(ctx->recv_buffer);')
      ->line('        if (ctx->error) free(ctx->error);')
      ->line('        if (ctx->callback) SvREFCNT_dec(ctx->callback);')
      ->line('        if (ctx->future_sv) SvREFCNT_dec(ctx->future_sv);')
      ->line('        memset(ctx, 0, sizeof(AsyncContext));')
      ->line('    }')
      ->line('}')
      ->blank;
}

sub gen_async_poll_one {
    my ($class, $builder) = @_;

    $builder->comment('Poll a single async context, return events needed')
      ->line('static int async_poll_one(int slot) {')
      ->line('    if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) return ASYNC_WAIT_NONE;')
      ->line('    AsyncContext *ctx = &async_registry[slot];')

lib/Hypersonic/UA/Async.pm  view on Meta::CPAN

      ->line('SV *method_sv = ST(0);')
      ->line('SV *url_sv = ST(1);')
      ->line('SV *body_sv = ST(2);')
      ->line('SV *future_or_cb = ST(3);')
      ->line('SV *ua_sv = ST(4);')
      ->blank
      ->line('int slot = async_alloc_slot();')
      ->line('if (slot < 0) croak("Too many async requests");')
      ->blank
      ->line('AsyncContext *ctx = &async_registry[slot];')
      ->blank
      ->comment('Parse URL')
      ->line('STRLEN url_len;')
      ->line('const char *url = SvPV(url_sv, url_len);')
      ->blank
      ->line('const char *scheme_end = strstr(url, "://");')
      ->line('if (!scheme_end) {')
      ->line('    async_free_slot(slot);')
      ->line('    croak("Invalid URL");')
      ->line('}')
      ->blank
      ->line('ctx->tls = (scheme_end - url == 5 && memcmp(url, "https", 5) == 0);')
      ->blank
      ->line('const char *host_start = scheme_end + 3;')
      ->line('const char *host_end = host_start;')
      ->line('ctx->port = ctx->tls ? 443 : 80;')
      ->blank
      ->line('while (*host_end && *host_end != \':\' && *host_end != \'/\') host_end++;')
      ->blank
      ->line('int host_len = host_end - host_start;')
      ->line('ctx->host = (char *)malloc(host_len + 1);')
      ->line('memcpy(ctx->host, host_start, host_len);')
      ->line('ctx->host[host_len] = 0;')
      ->blank
      ->if('*host_end == \':\'')
        ->line('ctx->port = atoi(host_end + 1);')
        ->line('while (*host_end && *host_end != \'/\') host_end++;')
      ->endif
      ->blank
      ->line('const char *path = (*host_end == \'/\') ? host_end : "/";')
      ->blank
      ->comment('Try to get a pooled connection')
      ->line('int pooled_fd = async_pool_get(ctx->host, ctx->port);')
      ->blank
      ->comment('Build request')
      ->line('STRLEN method_len;')
      ->line('const char *method = SvPV(method_sv, method_len);')
      ->blank
      ->line('STRLEN body_len = 0;')
      ->line('const char *body = NULL;')
      ->if('SvOK(body_sv)')
        ->line('body = SvPV(body_sv, body_len);')
      ->endif
      ->blank
      ->line('size_t req_cap = method_len + strlen(path) + host_len + 128 + body_len;')
      ->line('ctx->request = (char *)malloc(req_cap);')
      ->blank
      ->line('int req_len = snprintf(ctx->request, req_cap,')
      ->line('    "%s %s HTTP/1.1\\r\\n"')
      ->line('    "Host: %s\\r\\n"')
      ->line('    "Connection: keep-alive\\r\\n"')
      ->line('    "User-Agent: Hypersonic/1.0\\r\\n",')
      ->line('    method, path, ctx->host);')
      ->blank
      ->if('body_len > 0')
        ->line('req_len += snprintf(ctx->request + req_len, req_cap - req_len,')
        ->line('    "Content-Length: %zu\\r\\n\\r\\n", body_len);')
        ->line('memcpy(ctx->request + req_len, body, body_len);')
        ->line('req_len += body_len;')
      ->else
        ->line('req_len += snprintf(ctx->request + req_len, req_cap - req_len, "\\r\\n");')
      ->endif
      ->blank
      ->line('ctx->request_len = req_len;')
      ->line('ctx->request_sent = 0;')
      ->blank
      ->comment('Store callback or future')
      ->if('SvROK(future_or_cb) && SvTYPE(SvRV(future_or_cb)) == SVt_PVCV')
        ->line('ctx->callback = SvREFCNT_inc(future_or_cb);')
        ->line('ctx->future_sv = NULL;')
      ->else
        ->comment('Store the future SV directly')
        ->line('ctx->future_sv = SvREFCNT_inc(future_or_cb);')
        ->line('ctx->callback = NULL;')
      ->endif
      ->blank
      ->comment('Use pooled connection if available, otherwise create new socket')
      ->if('pooled_fd >= 0')
        ->line('ctx->fd = pooled_fd;')
        ->line('ctx->state = ASYNC_STATE_SENDING;')
      ->else
        ->comment('Create socket and set non-blocking')
        ->line('ctx->fd = socket(AF_INET, SOCK_STREAM, 0);')
        ->line('if (ctx->fd < 0) {')
        ->line('    async_free_slot(slot);')
        ->line('    croak("socket() failed");')
        ->line('}')
        ->line('int opt = 1;')
        ->line('setsockopt(ctx->fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));')
        ->line('int flags = fcntl(ctx->fd, F_GETFL, 0);')
        ->line('fcntl(ctx->fd, F_SETFL, flags | O_NONBLOCK);')
        ->blank
        ->comment('Start async connect')
        ->line('struct hostent *he = gethostbyname(ctx->host);')
        ->if('!he')
          ->line('async_free_slot(slot);')
          ->line('croak("DNS resolution failed");')
        ->endif
        ->blank
        ->line('struct sockaddr_in addr;')
        ->line('memset(&addr, 0, sizeof(addr));')
        ->line('addr.sin_family = AF_INET;')
        ->line('addr.sin_port = htons(ctx->port);')
        ->line('memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length);')
        ->blank
        ->line('int rc = connect(ctx->fd, (struct sockaddr *)&addr, sizeof(addr));')
        ->if('rc < 0 && errno != EINPROGRESS')
          ->line('async_free_slot(slot);')
          ->line('croak("connect() failed");')
        ->endif
        ->blank



( run in 0.447 second using v1.01-cache-2.11-cpan-5511b514fd6 )