EV-Websockets
view release on metacpan or search on metacpan
Websockets.xs view on Meta::CPAN
#include "EVAPI.h"
#include <libwebsockets.h>
#include <string.h>
#include <stddef.h>
/* Magic numbers for use-after-free detection.
 * A live object carries its *_MAGIC value in its `magic` field; entry points
 * compare against it before touching the object. Teardown overwrites the
 * field with the matching *_FREED value before the memory is released, so a
 * stale pointer fails the comparison instead of being dereferenced further. */
#define EV_WS_CTX_MAGIC 0xDEADBEEF
#define EV_WS_CTX_FREED 0xFEEDFACE
#define EV_WS_CONN_MAGIC 0xCAFEBABE
#define EV_WS_CONN_FREED 0xBADC0FFE
/* Forward declarations (the structs reference each other) */
typedef struct ev_ws_ctx_s ev_ws_ctx_t;
typedef struct ev_ws_conn_s ev_ws_conn_t;
typedef struct ev_ws_fd_s ev_ws_fd_t;
/* Magic values for ev_ws_server_t.
 * NOTE(review): presumably used the same way as the CTX/CONN pair above;
 * the code that sets/checks them is outside this excerpt -- confirm. */
#define EV_WS_SRV_MAGIC 0xFEEDCAFE
#define EV_WS_SRV_FREED 0xFEEDDEAD
/* Per-server (listener) state.
 * NOTE(review): only the declaration is visible in this part of the file;
 * the field descriptions below are inferred from the names and from the
 * parallel fields of ev_ws_conn_s -- confirm against the listen code. */
typedef struct ev_ws_server_s {
unsigned int magic; /* presumably EV_WS_SRV_MAGIC while live -- confirm */
/* Perl callbacks (one SV per event kind) */
SV* on_connect;
SV* on_message;
SV* on_close;
SV* on_error;
SV* on_pong;
SV* on_drain;
SV* on_handshake;
HV* response_headers; /* headers to inject into upgrade response */
size_t max_message_size;
char* protocol_name;
/* Two slots: presumably one protocol entry plus the zeroed terminator
 * that lws protocol arrays require -- TODO confirm */
struct lws_protocols vhost_protocols[2];
} ev_ws_server_t;
/* C-side names for the XS parameter types: the XSUBs below declare their
 * arguments as EV::Websockets::Context / EV::Websockets::Connection and
 * dereference them as ev_ws_ctx_t* / ev_ws_conn_t*.
 * NOTE(review): the typemap that performs the mapping is not part of this
 * file -- confirm there. */
typedef ev_ws_ctx_t* EV__Websockets__Context;
typedef ev_ws_conn_t* EV__Websockets__Connection;
typedef struct ev_loop* EV__Loop;
/* File descriptor watcher tracking.
 * One instance per fd that lws asked us to poll; instances are stored in
 * ev_ws_ctx_s.fd_table, indexed by the fd number. */
struct ev_ws_fd_s {
ev_io io; /* MUST stay the first member: io_cb casts the ev_io* it receives back to ev_ws_fd_t* */
ev_ws_ctx_t* ctx; /* owning context (validated by io_cb before use) */
int fd;
int poll_events; /* registered POLLIN/POLLOUT interest mask */
};
/* Send buffer node for pending writes, kept in a singly-linked FIFO
 * (ev_ws_conn_s.send_head / send_tail).
 * The node is over-allocated by queue_send() as
 *   offsetof(ev_ws_send_t, data) + LWS_PRE + len
 * so `data` really holds LWS_PRE bytes of headroom followed by the payload. */
typedef struct ev_ws_send_s {
struct ev_ws_send_s* next;
size_t len; /* payload length only; does not include the LWS_PRE headroom */
enum lws_write_protocol write_mode;
char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
} ev_ws_send_t;
/* Context structure - manages lws_context and connections.
 * The struct is reference counted (ctx_ref / ctx_unref) and is released
 * with Safefree when the count reaches zero, which may be later than the
 * Perl-level DESTROY if an lws service call is still on the C stack. */
struct ev_ws_ctx_s {
unsigned int magic; /* EV_WS_CTX_MAGIC while usable; EV_WS_CTX_FREED once DESTROY has run */
int refcnt; /* lifecycle refcount: Perl + each in-flight lws_service */
int* alive_flag; /* points to caller's stack variable during lws_service */
struct ev_loop* loop; /* libev loop the watchers run on; set to NULL by DESTROY */
struct lws_context* lws_ctx; /* NULL after DESTROY has destroyed it */
ev_ws_conn_t* connections; /* head of the doubly-linked connection list */
ev_ws_fd_t** fd_table; /* watcher per fd, indexed by fd number; NULL slots are unused */
int fd_table_size; /* number of slots in fd_table */
ev_timer timer; /* one-shot timer armed by schedule_timeout(); its data points back to this ctx */
ev_idle idle; /* idle watcher used when lws reports pending work; its data points back to this ctx */
};
/* Connection structure */
struct ev_ws_conn_s {
unsigned int magic;
ev_ws_ctx_t* ctx;
struct lws* wsi;
ev_ws_conn_t* next; /* linked list */
ev_ws_conn_t* prev;
int refcnt;
SV* perl_self; /* Weak-ish reference to the Perl object */
/* Callbacks */
SV* on_connect;
SV* on_message;
SV* on_close;
SV* on_error;
SV* on_pong;
SV* on_drain;
/* Custom Headers */
HV* custom_headers;
/* Response Headers (client: response headers; server: request headers) */
HV* response_headers;
/* Receive buffer for fragmented messages */
char* recv_buf;
size_t recv_len;
size_t recv_alloc;
int recv_is_binary;
size_t max_message_size;
/* Send queue */
ev_ws_send_t* send_head;
ev_ws_send_t* send_tail;
size_t send_queue_bytes;
/* Adopted file handle (prevents Perl from closing the fd) */
SV* adopted_fh;
/* Connect timeout */
ev_timer connect_timer;
int connect_timer_active;
struct ev_loop* loop;
/* Fragmented send state */
int in_fragmented_send;
/* Per-connection metadata */
HV* stash;
Websockets.xs view on Meta::CPAN
capture_header(wsi, hv, request_hdrs[i].tok,
request_hdrs[i].name, request_hdrs[i].nlen);
}
/* Append every key/value pair of `hv` to the HTTP header block being built
 * by lws.
 *
 *   wsi    - connection whose headers are being written
 *   hv     - hash of header name => value (values are stringified)
 *   p, end - lws output cursor and end-of-buffer, forwarded unchanged to
 *            lws_add_http_header_by_name()
 *
 * lws expects the header name with a trailing ':', so each key is copied
 * into a stack buffer and suffixed. Keys too long for that buffer are
 * skipped; iteration stops at the first header lws refuses (non-zero
 * return). */
static void inject_headers(struct lws *wsi, HV *hv,
                           unsigned char **p, unsigned char *end) {
    char name_buf[256];
    HE *he;

    hv_iterinit(hv);
    for (he = hv_iternext(hv); he != NULL; he = hv_iternext(hv)) {
        I32 name_len;
        STRLEN value_len;
        int rc;
        const char *name = hv_iterkey(he, &name_len);
        SV *value_sv = hv_iterval(hv, he);
        const char *value = SvPV(value_sv, value_len);

        /* name + ':' + NUL must fit in name_buf */
        if (name_len >= (I32)sizeof(name_buf) - 2)
            continue;

        memcpy(name_buf, name, name_len);
        name_buf[name_len] = ':';
        name_buf[name_len + 1] = '\0';

        rc = lws_add_http_header_by_name(wsi, (unsigned char *)name_buf,
                                         (unsigned char *)value, value_len,
                                         p, end);
        if (rc)
            break;
    }
}
/* printf-style debug trace: writes "[EV::WS] <message>\n" to stderr, but
 * only when the ev_ws_debug flag (declared elsewhere in this file) is
 * non-zero. `fmt` must be a string literal because it is concatenated with
 * the prefix at compile time. Uses the `##__VA_ARGS__` comma-swallowing
 * extension so it can be called with no variadic arguments. */
#define DEBUG_LOG(fmt, ...) do { if (ev_ws_debug) fprintf(stderr, "[EV::WS] " fmt "\n", ##__VA_ARGS__); } while(0)
/* Take one additional lifecycle reference on the context. */
static void ctx_ref(ev_ws_ctx_t* ctx) {
    ctx->refcnt += 1;
}
/* Release one lifecycle reference; the struct's memory is freed when the
 * last reference goes away. The caller must not touch `ctx` afterwards. */
static void ctx_unref(ev_ws_ctx_t* ctx) {
    ctx->refcnt -= 1;
    if (ctx->refcnt != 0)
        return;
    Safefree(ctx);
}
/* Arm the context's wake-up source for the next lws service pass.
 *
 * lws_service_adjust_timeout() reports how long (in ms, capped at 1000
 * here) lws can wait; a result <= 0 means work is already pending. In that
 * case the idle watcher is started so the loop services lws as soon as
 * possible; otherwise the idle watcher is stopped and the one-shot timer is
 * re-armed for the reported delay. */
static void schedule_timeout(ev_ws_ctx_t* ctx) {
    const int wait_ms = lws_service_adjust_timeout(ctx->lws_ctx, 1000, 0);

    if (wait_ms > 0) {
        ev_idle_stop(ctx->loop, &ctx->idle);
        /* stop before ev_timer_set: the timer is re-configured below */
        ev_timer_stop(ctx->loop, &ctx->timer);
        ev_timer_set(&ctx->timer, (double)wait_ms / 1000.0, 0.);
        ev_timer_start(ctx->loop, &ctx->timer);
        return;
    }

    /* work pending: poll again on the next idle iteration */
    ev_idle_start(ctx->loop, &ctx->idle);
    ev_timer_stop(ctx->loop, &ctx->timer);
}
/* Run one non-blocking lws service pass for `ctx` and re-arm its timers.
 *
 * lws callbacks fired from lws_service() may end up destroying the context.
 * Two guards cover that: an extra reference keeps the struct's memory valid
 * until this function returns, and ctx->alive_flag points at a local that
 * the context's DESTROY clears. If the flag was cleared, the context is not
 * touched again except to drop the reference, and the enclosing frame's
 * flag (if any) is cleared too so nested callers also learn about it.
 * Does nothing for a NULL, already-destroyed or lws-less context. */
static void do_lws_service(ev_ws_ctx_t* ctx) {
    int still_alive = 1;
    int* outer_flag;

    if (ctx == NULL)
        return;
    if (ctx->magic != EV_WS_CTX_MAGIC)
        return;
    if (ctx->lws_ctx == NULL)
        return;

    outer_flag = ctx->alive_flag;
    ctx->alive_flag = &still_alive;
    ctx_ref(ctx);

    lws_service(ctx->lws_ctx, 0);

    if (!still_alive) {
        /* propagate destruction up the alive_flag chain */
        if (outer_flag != NULL)
            *outer_flag = 0;
    } else {
        ctx->alive_flag = outer_flag;
        schedule_timeout(ctx);
    }

    ctx_unref(ctx);
}
/* libev idle callback: the watcher's data pointer is the owning context;
 * service lws for it. */
static void idle_cb(EV_P_ ev_idle* w, int revents) {
    ev_ws_ctx_t* owner = (ev_ws_ctx_t*)w->data;

    (void)revents;
    (void)loop;
    do_lws_service(owner);
}
/* libev timer callback: the watcher's data pointer is the owning context;
 * service lws for it. */
static void timer_cb(EV_P_ ev_timer* w, int revents) {
    ev_ws_ctx_t* owner = (ev_ws_ctx_t*)w->data;

    (void)revents;
    (void)loop;
    do_lws_service(owner);
}
/* Forward declarations */
static void io_cb(EV_P_ ev_io* w, int revents);
static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
static void conn_ref(ev_ws_conn_t* conn);
static void conn_unref(ev_ws_conn_t* conn);
/* Return a new blessed reference (class EV::Websockets::Connection) for
 * `conn`. The caller owns the returned SV.
 *
 * If a Perl object for this connection already exists (conn->perl_self),
 * the result is another reference to that same object. Otherwise a fresh
 * object wrapping the C pointer is created, remembered in conn->perl_self,
 * and the connection gains one reference on behalf of that new Perl owner
 * so the struct outlives it even if lws lets go first. */
static SV* get_conn_sv(ev_ws_conn_t* conn) {
    SV* ref;

    if (conn->perl_self == NULL) {
        ref = newSV(0);
        sv_setref_pv(ref, "EV::Websockets::Connection", (void*)conn);
        conn->perl_self = SvRV(ref);
        /* the new Perl object holds its own reference on the connection */
        conn_ref(conn);
        return ref;
    }

    ref = newRV_inc(conn->perl_self);
    sv_bless(ref, gv_stashpv("EV::Websockets::Connection", 1));
    return ref;
}
/* Invoke the connection's on_error Perl callback as
 *   $cb->($connection, $error_string)
 *
 * No-op when `conn` is NULL or has no on_error handler. The callback runs
 * under G_EVAL: an exception thrown by the handler is reported with warn()
 * and not propagated. $@ is cleared before the call. The connection is
 * pinned with an extra reference for the duration of the call so the
 * handler cannot free it underneath us. */
static void emit_error(ev_ws_conn_t* conn, const char* error) {
    if (!conn)
        return;
    if (!conn->on_error)
        return;

    {
        dSP;
        SV* self_sv;
        SV* msg_sv;

        ENTER;
        SAVETMPS;
        PUSHMARK(SP);

        conn_ref(conn);

        self_sv = sv_2mortal(get_conn_sv(conn));
        XPUSHs(self_sv);
        msg_sv = sv_2mortal(newSVpv(error, 0));
        XPUSHs(msg_sv);
        PUTBACK;

        sv_setsv(ERRSV, &PL_sv_undef);
        call_sv(conn->on_error, G_DISCARD | G_EVAL);
        if (SvTRUE(ERRSV)) {
            warn("EV::Websockets: exception in error handler: %s", SvPV_nolen(ERRSV));
        }

        FREETMPS;
        LEAVE;
    }

    conn_unref(conn);
}
Websockets.xs view on Meta::CPAN
if (conn->prev) {
conn->prev->next = conn->next;
} else {
conn->ctx->connections = conn->next;
}
if (conn->next) {
conn->next->prev = conn->prev;
}
conn->prev = NULL;
conn->next = NULL;
conn->ctx = NULL;
}
/* Append a message to the connection's outgoing FIFO.
 *
 *   conn       - connection to queue on
 *   data, len  - payload to copy (data may be NULL / len 0 for an empty frame)
 *   write_mode - lws write protocol flag stored with the node
 *
 * The node is allocated with LWS_PRE bytes of headroom in front of the
 * payload. If the queue was empty and the connection still has a wsi, lws
 * is asked for a writable callback; when the queue was already non-empty
 * no request is made here. */
static void queue_send(ev_ws_conn_t* conn, const char* data, size_t len, enum lws_write_protocol write_mode) {
    const int queue_was_idle = (conn->send_head == NULL);
    ev_ws_send_t* node =
        (ev_ws_send_t*)safemalloc(offsetof(ev_ws_send_t, data) + LWS_PRE + len);

    if (len > 0 && data != NULL)
        memcpy(node->data + LWS_PRE, data, len);
    node->next = NULL;
    node->len = len;
    node->write_mode = write_mode;

    /* link at the tail */
    if (conn->send_tail != NULL)
        conn->send_tail->next = node;
    else
        conn->send_head = node;
    conn->send_tail = node;

    conn->send_queue_bytes += len;

    if (queue_was_idle && conn->wsi != NULL)
        lws_callback_on_writable(conn->wsi);
}
/* libev I/O callback: translate the libev event mask into a poll-style
 * lws_pollfd and hand it to lws_service_fd().
 *
 * `w` is the ev_io embedded at the start of an ev_ws_fd_t, so it is cast
 * back to the containing struct. Events for a missing, destroyed or
 * lws-less context are ignored. The alive-flag / extra-reference dance is
 * the same as in do_lws_service(): callbacks run by lws may destroy the
 * context, in which case it is not touched again apart from dropping our
 * reference, and the enclosing frame's flag is cleared as well. */
static void io_cb(EV_P_ ev_io* w, int revents) {
    ev_ws_fd_t* watcher = (ev_ws_fd_t*)w;
    ev_ws_ctx_t* owner = watcher->ctx;
    struct lws_pollfd pfd;
    int still_alive = 1;
    int* outer_flag;

    (void)loop;

    if (owner == NULL)
        return;
    if (owner->magic != EV_WS_CTX_MAGIC || owner->lws_ctx == NULL)
        return;

    pfd.fd = watcher->fd;
    pfd.events = watcher->poll_events;
    pfd.revents = 0;
    if (revents & EV_READ)
        pfd.revents |= POLLIN;
    if (revents & EV_WRITE)
        pfd.revents |= POLLOUT;
    if (revents & EV_ERROR)
        pfd.revents |= POLLERR | POLLHUP;

    outer_flag = owner->alive_flag;
    owner->alive_flag = &still_alive;
    ctx_ref(owner);

    lws_service_fd(owner->lws_ctx, &pfd);

    if (!still_alive) {
        /* propagate destruction up the alive_flag chain */
        if (outer_flag != NULL)
            *outer_flag = 0;
    } else {
        owner->alive_flag = outer_flag;
        schedule_timeout(owner);
    }

    ctx_unref(owner);
}
/* Initial number of slots in a context's fd table */
#define FD_TABLE_INIT_SIZE 64
/* Enlarge ctx->fd_table so that index `needed` is valid.
 * Capacity starts at the current size (or FD_TABLE_INIT_SIZE for an empty
 * table) and doubles until it exceeds `needed`; the newly added slots are
 * zeroed so they read as "no watcher". */
static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
    const int old_cap = ctx->fd_table_size;
    int cap = (old_cap != 0) ? old_cap : FD_TABLE_INIT_SIZE;

    for (; cap <= needed; cap *= 2) {
        /* keep doubling */
    }

    Renew(ctx->fd_table, cap, ev_ws_fd_t*);
    Zero(ctx->fd_table + old_cap, cap - old_cap, ev_ws_fd_t*);
    ctx->fd_table_size = cap;
}
/* Start tracking `fd` for the poll(2)-style interest mask `events`.
 *
 * Negative fds are ignored. If the fd already has a watcher the call is
 * forwarded to change_fd_watcher(). Otherwise a new watcher is allocated,
 * registered in the fd table, and started on the context's loop -- unless
 * the mask contains neither POLLIN nor POLLOUT, in which case the watcher
 * is initialised (with EV_READ as a placeholder) but left stopped. */
static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
    ev_ws_fd_t* entry;
    int libev_mask;

    if (fd < 0)
        return;
    if (fd >= ctx->fd_table_size)
        fd_table_grow(ctx, fd);

    if (ctx->fd_table[fd] != NULL) {
        change_fd_watcher(ctx, fd, events);
        return;
    }

    Newxz(entry, 1, ev_ws_fd_t);
    entry->ctx = ctx;
    entry->fd = fd;
    entry->poll_events = events;

    libev_mask = ((events & POLLIN) ? EV_READ : 0)
               | ((events & POLLOUT) ? EV_WRITE : 0);

    DEBUG_LOG("add_fd_watcher: fd=%d poll_events=%d ev_events=%d", fd, events, libev_mask);

    ev_io_init(&entry->io, io_cb, fd, libev_mask ? libev_mask : EV_READ);
    if (libev_mask) {
        ev_io_start(ctx->loop, &entry->io);
    }
    ctx->fd_table[fd] = entry;
}
/* Stop and free the watcher registered for `fd`, clearing its table slot.
 * Out-of-range fds and fds without a watcher are silently ignored. */
static void del_fd_watcher(ev_ws_ctx_t* ctx, int fd) {
    ev_ws_fd_t* entry = NULL;

    if (fd >= 0 && fd < ctx->fd_table_size)
        entry = ctx->fd_table[fd];
    if (entry == NULL)
        return;

    ev_io_stop(ctx->loop, &entry->io);
    ctx->fd_table[fd] = NULL;
    Safefree(entry);
}
static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
ev_ws_fd_t* fdw;
int ev_events = 0;
Websockets.xs view on Meta::CPAN
}
DEBUG_LOG("Creating context (manual integration)");
RETVAL->lws_ctx = lws_create_context(&info);
if (RETVAL->lws_ctx == NULL) {
Safefree(RETVAL);
croak("Failed to create libwebsockets context");
}
if (ssl_init)
ev_ws_ssl_inited = 1;
ev_timer_init(&RETVAL->timer, timer_cb, 0.00001, 0.);
RETVAL->timer.data = (void*)RETVAL;
ev_idle_init(&RETVAL->idle, idle_cb);
RETVAL->idle.data = (void*)RETVAL;
schedule_timeout(RETVAL);
DEBUG_LOG("Context created successfully");
}
OUTPUT:
RETVAL
void
DESTROY(EV::Websockets::Context self);
CODE:
{
    /* Tear down a context: stop its watchers, detach every connection,
     * destroy the lws context, then drop the Perl-side reference. The
     * struct itself is freed by ctx_unref() once no lws service call
     * higher up the C stack still holds a reference. */
    ev_ws_conn_t* conn;
    ev_ws_conn_t* next;
    /* Already torn down: leave through the XS epilogue macro (as
     * connections() does) rather than a bare `return`, which would skip
     * the stack reset and leave the argument on the Perl stack. */
    if (self->magic != EV_WS_CTX_MAGIC) XSRETURN_EMPTY;
    /* Flip the magic first so do_lws_service()/io_cb(), which check it,
     * ignore this context from here on. */
    self->magic = EV_WS_CTX_FREED;
    ev_timer_stop(self->loop, &self->timer);
    ev_idle_stop(self->loop, &self->idle);
    /* defined elsewhere in this file; presumably stops and frees every
     * entry of fd_table -- confirm */
    free_all_fd_watchers(self);
    /* Close all connections */
    for (conn = self->connections; conn != NULL; conn = next) {
        next = conn->next; /* saved first: conn may be freed below */
        conn->ctx = NULL;
        conn->prev = NULL;
        conn->next = NULL;
        if (conn->wsi) {
            /* detach so later lws callbacks for this wsi see no user data */
            lws_set_wsi_user(conn->wsi, NULL);
            conn->wsi = NULL;
        }
        conn_unref(conn); /* drop wsi ref -- may free conn */
    }
    self->connections = NULL;
    if (self->lws_ctx) {
        lws_context_destroy(self->lws_ctx);
        self->lws_ctx = NULL;
    }
    self->loop = NULL;
    /* Tell an in-flight service frame (do_lws_service, io_cb, ...) that
     * the context is gone so it skips rescheduling. */
    if (self->alive_flag) *self->alive_flag = 0;
    ctx_unref(self); /* drops Perl ref; Safefree happens when refcnt==0 */
}
EV::Websockets::Connection
connect(EV::Websockets::Context self, ...);
PREINIT:
struct lws_client_connect_info ccinfo;
const char* url = NULL;
const char* protocol = NULL;
char* host = NULL;
char* host_header = NULL; /* for IPv6: "[::1]" form */
char* path = NULL;
int port = 80;
int use_ssl = 0;
int ssl_verify = 1;
char* url_copy = NULL;
char* p;
char* path_start;
SV* on_connect = NULL;
SV* on_message = NULL;
SV* on_close = NULL;
SV* on_error = NULL;
SV* on_pong = NULL;
SV* on_drain = NULL;
SV* headers_hv = NULL;
size_t max_message_size = 0;
double connect_timeout = 0;
int i;
CODE:
{
if (self->magic != EV_WS_CTX_MAGIC) {
croak("Context has been destroyed");
}
for (i = 1; i < items; i += 2) {
if (i + 1 >= items) break;
const char* key = SvPV_nolen(ST(i));
SV* val = ST(i + 1);
if (strcmp(key, "url") == 0) {
url = SvPV_nolen(val);
} else if (strcmp(key, "protocol") == 0) {
protocol = SvPV_nolen(val);
} else if (strcmp(key, "ssl_verify") == 0) {
ssl_verify = SvTRUE(val);
} else if (strcmp(key, "max_message_size") == 0) {
max_message_size = (size_t)SvUV(val);
} else if (strcmp(key, "connect_timeout") == 0) {
connect_timeout = SvNV(val);
} else if (strcmp(key, "headers") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV) {
headers_hv = val;
} else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_connect = SvREFCNT_inc(val);
} else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_message = SvREFCNT_inc(val);
} else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_close = SvREFCNT_inc(val);
} else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_error = SvREFCNT_inc(val);
} else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
Websockets.xs view on Meta::CPAN
const char* key = SvPV_nolen(ST(i));
SV* val = ST(i + 1);
if (strcmp(key, "fh") == 0) {
fh_sv = val;
} else if (strcmp(key, "initial_data") == 0) {
initial_data_sv = val;
} else if (strcmp(key, "max_message_size") == 0) {
max_message_size = (size_t)SvUV(val);
} else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_connect = SvREFCNT_inc(val);
} else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_message = SvREFCNT_inc(val);
} else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_close = SvREFCNT_inc(val);
} else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_error = SvREFCNT_inc(val);
} else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_pong = SvREFCNT_inc(val);
} else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
on_drain = SvREFCNT_inc(val);
}
}
if (fh_sv == NULL) {
if (on_connect) SvREFCNT_dec(on_connect);
if (on_message) SvREFCNT_dec(on_message);
if (on_close) SvREFCNT_dec(on_close);
if (on_error) SvREFCNT_dec(on_error);
if (on_pong) SvREFCNT_dec(on_pong);
if (on_drain) SvREFCNT_dec(on_drain);
croak("fh parameter is required");
}
IO* io = sv_2io(fh_sv);
PerlIO *ifp = io ? IoIFP(io) : NULL;
if (!ifp || (fd = PerlIO_fileno(ifp)) < 0) {
if (on_connect) SvREFCNT_dec(on_connect);
if (on_message) SvREFCNT_dec(on_message);
if (on_close) SvREFCNT_dec(on_close);
if (on_error) SvREFCNT_dec(on_error);
if (on_pong) SvREFCNT_dec(on_pong);
if (on_drain) SvREFCNT_dec(on_drain);
croak("Invalid filehandle");
}
Newxz(RETVAL, 1, ev_ws_conn_t);
RETVAL->magic = EV_WS_CONN_MAGIC;
RETVAL->refcnt = 2; /* wsi ref + sentinel (protects against sync WSI_DESTROY) */
RETVAL->on_connect = on_connect;
RETVAL->on_message = on_message;
RETVAL->on_close = on_close;
RETVAL->on_error = on_error;
RETVAL->on_pong = on_pong;
RETVAL->on_drain = on_drain;
RETVAL->max_message_size = max_message_size;
RETVAL->loop = self->loop;
/* Hold a reference to the underlying glob/IO to prevent Perl
* from closing the fd while lws owns it. For blessed glob refs
* (IO::Socket etc.) we ref the glob itself so framework DESTROY
* methods see it as still alive. */
RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
: SvREFCNT_inc(fh_sv);
link_conn(self, RETVAL);
{
struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
if (!vh) {
/* Auto-create a server vhost for adoption (no listener needed) */
struct lws_context_creation_info vinfo;
memset(&vinfo, 0, sizeof(vinfo));
vinfo.port = CONTEXT_PORT_NO_LISTEN_SERVER;
vinfo.protocols = protocols;
vinfo.vhost_name = "server";
vh = lws_create_vhost(self->lws_ctx, &vinfo);
}
if (!vh) {
unlink_conn(RETVAL);
free_conn_resources(RETVAL);
RETVAL->magic = EV_WS_CONN_FREED;
Safefree(RETVAL);
croak("Failed to create vhost for adoption");
}
pending_adoption = RETVAL;
if (initial_data_sv && SvOK(initial_data_sv)) {
STRLEN rdlen;
const char *rdbuf = SvPV(initial_data_sv, rdlen);
RETVAL->wsi = lws_adopt_socket_vhost_readbuf(vh,
(lws_sockfd_type)fd, rdbuf, rdlen);
} else {
RETVAL->wsi = lws_adopt_socket_vhost(vh, (lws_sockfd_type)fd);
}
pending_adoption = NULL;
}
if (RETVAL->wsi == NULL) {
unlink_conn(RETVAL);
if (RETVAL->perl_self == NULL) {
free_conn_resources(RETVAL);
RETVAL->magic = EV_WS_CONN_FREED;
Safefree(RETVAL);
} else {
conn_unref(RETVAL);
}
croak("Failed to adopt socket");
}
conn_unref(RETVAL); /* drop sentinel */
/* Kick lws to process readbuf data (needed for lws 4.5+).
* Guard with extra refs: lws_service may synchronously fire
* error/destroy callbacks that would free RETVAL or ctx. */
{
int rejected, alive = 1;
int* prev_flag = self->alive_flag;
conn_ref(RETVAL);
ctx_ref(self);
self->alive_flag = &alive;
lws_service(self->lws_ctx, 0);
if (alive) {
self->alive_flag = prev_flag;
schedule_timeout(self);
} else if (prev_flag) {
/* Context destroyed during inner lws_service.
Propagate destruction up the alive_flag chain. */
*prev_flag = 0;
}
rejected = (RETVAL->wsi == NULL);
conn_unref(RETVAL);
ctx_unref(self);
if (rejected)
croak("Failed to adopt socket");
}
}
OUTPUT:
RETVAL
void
connections(EV::Websockets::Context self);
PPCODE:
{
    /* Return the list of connection objects that are currently live
     * (valid magic) and marked connected. Returns an empty list for a
     * destroyed context. Each returned SV is mortal. */
    ev_ws_conn_t* cur;
    if (self->magic != EV_WS_CTX_MAGIC) XSRETURN_EMPTY;
    cur = self->connections;
    while (cur != NULL) {
        if (cur->magic == EV_WS_CONN_MAGIC && cur->connected) {
            XPUSHs(sv_2mortal(get_conn_sv(cur)));
        }
        cur = cur->next;
    }
}
MODULE = EV::Websockets PACKAGE = EV::Websockets::Connection
void
DESTROY(EV::Websockets::Connection self);
CODE:
{
    /* Perl-side destructor: forget the cached Perl object and drop the
     * reference it held. The C struct may outlive this call if other
     * references (e.g. the wsi's) remain. */
    /* Stale or already-freed struct: leave through the XS epilogue macro
     * rather than a bare `return`, which would skip the stack reset and
     * leave the argument on the Perl stack. */
    if (self->magic != EV_WS_CONN_MAGIC) XSRETURN_EMPTY;
    DEBUG_LOG("Perl object DESTROY: self=%p wsi=%p", self, self->wsi);
    /* Clear the cached Perl object pointer in the C struct so
     * get_conn_sv() creates a fresh object next time instead of
     * referencing the one being destroyed. */
    self->perl_self = NULL;
    conn_unref(self); /* drop Perl ref */
}
void
send(EV::Websockets::Connection self, SV* data);
CODE:
{
STRLEN len;
const char* buf;
if (self->magic != EV_WS_CONN_MAGIC) {
croak("Connection has been destroyed");
}
if (!self->wsi || !self->connected || self->closing) {
croak("Connection is not open");
}
if (self->in_fragmented_send) {
croak("Cannot send while a fragmented message is in progress; "
"finish the fragment with send_fragment(..., is_final => 1) first");
}
buf = SvPV(data, len);
( run in 0.595 second using v1.01-cache-2.11-cpan-98e64b0badf )