Hypersonic
view release on metacpan or search on metacpan
lib/Hypersonic/UA/Async.pm view on Meta::CPAN
my $backend_name = Hypersonic::Event->best_backend;
my $event_backend = Hypersonic::Event->backend($backend_name);
# Store backend for other methods to use
$opts->{event_backend} = $event_backend;
$opts->{event_backend_name} = $backend_name;
# Add required includes for networking
$builder->line('#include <sys/socket.h>')
->line('#include <netinet/in.h>')
->line('#include <netdb.h>')
->line('#include <fcntl.h>')
->line('#include <errno.h>')
->line('#include <unistd.h>');
# Add event backend includes and defines
$builder->line($event_backend->includes)
->line($event_backend->defines)
->blank;
$builder->line('#define MAX_ASYNC_CONTEXTS 1024')
->line('#ifndef MAX_EVENTS')
->line('#define MAX_EVENTS 256')
->line('#endif')
->blank
->line('#define ASYNC_STATE_CONNECTING 0')
->line('#define ASYNC_STATE_TLS 1')
->line('#define ASYNC_STATE_SENDING 2')
->line('#define ASYNC_STATE_RECEIVING 3')
->line('#define ASYNC_STATE_DONE 4')
->line('#define ASYNC_STATE_ERROR 5')
->line('#define ASYNC_STATE_CANCELLED 6')
->blank
->line('#define ASYNC_WAIT_NONE 0')
->line('#define ASYNC_WAIT_READ 1')
->line('#define ASYNC_WAIT_WRITE 2')
->blank
->line('typedef struct {')
->line(' int fd;')
->line(' void *ssl;') # Use void* instead of SSL* to avoid OpenSSL dependency
->line(' int state;')
->line(' int tls;')
->line(' char *host;')
->line(' int port;')
->line(' char *request;')
->line(' size_t request_len;')
->line(' size_t request_sent;')
->line(' char *recv_buffer;')
->line(' size_t recv_buffer_len;')
->line(' size_t recv_buffer_cap;')
->line(' SV *future_sv;')
->line(' SV *callback;')
->line(' time_t deadline;')
->line(' char *error;')
->line(' int in_use;')
->line('} AsyncContext;')
->blank
->line('static AsyncContext async_registry[MAX_ASYNC_CONTEXTS];')
->line('static int async_ev_fd = -1;') # event loop fd (kqueue/epoll) for batched polling
->blank
->comment('Async connection pool for keep-alive')
->line('#define ASYNC_POOL_SIZE 512')
->line('typedef struct {')
->line(' int fd;')
->line(' char host[256];')
->line(' int port;')
->line(' time_t expires;')
->line('} AsyncPooledConn;')
->blank
->line('static AsyncPooledConn async_conn_pool[ASYNC_POOL_SIZE];')
->blank
->comment('Get a pooled connection')
->line('static int async_pool_get(const char *host, int port) {')
->line(' int i;')
->line(' time_t now = time(NULL);')
->line(' for (i = 0; i < ASYNC_POOL_SIZE; i++) {')
->line(' if (async_conn_pool[i].fd > 0 && async_conn_pool[i].port == port &&')
->line(' strcmp(async_conn_pool[i].host, host) == 0 && async_conn_pool[i].expires > now) {')
->line(' int fd = async_conn_pool[i].fd;')
->line(' async_conn_pool[i].fd = 0;')
->line(' return fd;')
->line(' }')
->line(' }')
->line(' return -1;')
->line('}')
->blank
->comment('Return connection to pool (10 second keep-alive)')
->line('static void async_pool_put(int fd, const char *host, int port) {')
->line(' int i;')
->line(' if (fd < 0) return;')
->line(' time_t now = time(NULL);')
->line(' for (i = 0; i < ASYNC_POOL_SIZE; i++) {')
->line(' if (async_conn_pool[i].fd <= 0 || async_conn_pool[i].expires <= now) {')
->line(' if (async_conn_pool[i].fd > 0) close(async_conn_pool[i].fd);')
->line(' async_conn_pool[i].fd = fd;')
->line(' strncpy(async_conn_pool[i].host, host, 255);')
->line(' async_conn_pool[i].host[255] = 0;')
->line(' async_conn_pool[i].port = port;')
->line(' async_conn_pool[i].expires = now + 10;')
->line(' return;')
->line(' }')
->line(' }')
->line(' close(fd);')
->line('}')
->blank
->line('static int async_alloc_slot(void) {')
->line(' int i;')
->line(' for (i = 0; i < MAX_ASYNC_CONTEXTS; i++) {')
->line(' if (!async_registry[i].in_use) {')
->line(' memset(&async_registry[i], 0, sizeof(AsyncContext));')
->line(' async_registry[i].in_use = 1;')
->line(' async_registry[i].fd = -1;')
->line(' async_registry[i].future_sv = NULL;')
->line(' return i;')
->line(' }')
->line(' }')
->line(' return -1;')
->line('}')
->blank
->line('static void async_free_slot(int slot) {')
->line(' if (slot >= 0 && slot < MAX_ASYNC_CONTEXTS) {')
->line(' AsyncContext *ctx = &async_registry[slot];')
->comment(' Return connection to pool if successful')
->line(' if (ctx->fd >= 0 && ctx->state == ASYNC_STATE_DONE && ctx->host) {')
->line(' async_pool_put(ctx->fd, ctx->host, ctx->port);')
->line(' ctx->fd = -1;')
->line(' }')
->line(' if (ctx->fd >= 0) close(ctx->fd);')
->line(' if (ctx->host) free(ctx->host);')
->line(' if (ctx->request) free(ctx->request);')
->line(' if (ctx->recv_buffer) free(ctx->recv_buffer);')
->line(' if (ctx->error) free(ctx->error);')
->line(' if (ctx->callback) SvREFCNT_dec(ctx->callback);')
->line(' if (ctx->future_sv) SvREFCNT_dec(ctx->future_sv);')
->line(' memset(ctx, 0, sizeof(AsyncContext));')
->line(' }')
->line('}')
->blank;
}
sub gen_async_poll_one {
my ($class, $builder) = @_;
$builder->comment('Poll a single async context, return events needed')
->line('static int async_poll_one(int slot) {')
->line(' if (slot < 0 || slot >= MAX_ASYNC_CONTEXTS) return ASYNC_WAIT_NONE;')
->line(' AsyncContext *ctx = &async_registry[slot];')
lib/Hypersonic/UA/Async.pm view on Meta::CPAN
->line('SV *method_sv = ST(0);')
->line('SV *url_sv = ST(1);')
->line('SV *body_sv = ST(2);')
->line('SV *future_or_cb = ST(3);')
->line('SV *ua_sv = ST(4);')
->blank
->line('int slot = async_alloc_slot();')
->line('if (slot < 0) croak("Too many async requests");')
->blank
->line('AsyncContext *ctx = &async_registry[slot];')
->blank
->comment('Parse URL')
->line('STRLEN url_len;')
->line('const char *url = SvPV(url_sv, url_len);')
->blank
->line('const char *scheme_end = strstr(url, "://");')
->line('if (!scheme_end) {')
->line(' async_free_slot(slot);')
->line(' croak("Invalid URL");')
->line('}')
->blank
->line('ctx->tls = (scheme_end - url == 5 && memcmp(url, "https", 5) == 0);')
->blank
->line('const char *host_start = scheme_end + 3;')
->line('const char *host_end = host_start;')
->line('ctx->port = ctx->tls ? 443 : 80;')
->blank
->line('while (*host_end && *host_end != \':\' && *host_end != \'/\') host_end++;')
->blank
->line('int host_len = host_end - host_start;')
->line('ctx->host = (char *)malloc(host_len + 1);')
->line('memcpy(ctx->host, host_start, host_len);')
->line('ctx->host[host_len] = 0;')
->blank
->if('*host_end == \':\'')
->line('ctx->port = atoi(host_end + 1);')
->line('while (*host_end && *host_end != \'/\') host_end++;')
->endif
->blank
->line('const char *path = (*host_end == \'/\') ? host_end : "/";')
->blank
->comment('Try to get a pooled connection')
->line('int pooled_fd = async_pool_get(ctx->host, ctx->port);')
->blank
->comment('Build request')
->line('STRLEN method_len;')
->line('const char *method = SvPV(method_sv, method_len);')
->blank
->line('STRLEN body_len = 0;')
->line('const char *body = NULL;')
->if('SvOK(body_sv)')
->line('body = SvPV(body_sv, body_len);')
->endif
->blank
->line('size_t req_cap = method_len + strlen(path) + host_len + 128 + body_len;')
->line('ctx->request = (char *)malloc(req_cap);')
->blank
->line('int req_len = snprintf(ctx->request, req_cap,')
->line(' "%s %s HTTP/1.1\\r\\n"')
->line(' "Host: %s\\r\\n"')
->line(' "Connection: keep-alive\\r\\n"')
->line(' "User-Agent: Hypersonic/1.0\\r\\n",')
->line(' method, path, ctx->host);')
->blank
->if('body_len > 0')
->line('req_len += snprintf(ctx->request + req_len, req_cap - req_len,')
->line(' "Content-Length: %zu\\r\\n\\r\\n", body_len);')
->line('memcpy(ctx->request + req_len, body, body_len);')
->line('req_len += body_len;')
->else
->line('req_len += snprintf(ctx->request + req_len, req_cap - req_len, "\\r\\n");')
->endif
->blank
->line('ctx->request_len = req_len;')
->line('ctx->request_sent = 0;')
->blank
->comment('Store callback or future')
->if('SvROK(future_or_cb) && SvTYPE(SvRV(future_or_cb)) == SVt_PVCV')
->line('ctx->callback = SvREFCNT_inc(future_or_cb);')
->line('ctx->future_sv = NULL;')
->else
->comment('Store the future SV directly')
->line('ctx->future_sv = SvREFCNT_inc(future_or_cb);')
->line('ctx->callback = NULL;')
->endif
->blank
->comment('Use pooled connection if available, otherwise create new socket')
->if('pooled_fd >= 0')
->line('ctx->fd = pooled_fd;')
->line('ctx->state = ASYNC_STATE_SENDING;')
->else
->comment('Create socket and set non-blocking')
->line('ctx->fd = socket(AF_INET, SOCK_STREAM, 0);')
->line('if (ctx->fd < 0) {')
->line(' async_free_slot(slot);')
->line(' croak("socket() failed");')
->line('}')
->line('int opt = 1;')
->line('setsockopt(ctx->fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));')
->line('int flags = fcntl(ctx->fd, F_GETFL, 0);')
->line('fcntl(ctx->fd, F_SETFL, flags | O_NONBLOCK);')
->blank
->comment('Start async connect')
->line('struct hostent *he = gethostbyname(ctx->host);')
->if('!he')
->line('async_free_slot(slot);')
->line('croak("DNS resolution failed");')
->endif
->blank
->line('struct sockaddr_in addr;')
->line('memset(&addr, 0, sizeof(addr));')
->line('addr.sin_family = AF_INET;')
->line('addr.sin_port = htons(ctx->port);')
->line('memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length);')
->blank
->line('int rc = connect(ctx->fd, (struct sockaddr *)&addr, sizeof(addr));')
->if('rc < 0 && errno != EINPROGRESS')
->line('async_free_slot(slot);')
->line('croak("connect() failed");')
->endif
->blank
( run in 0.447 second using v1.01-cache-2.11-cpan-5511b514fd6 )