EV-Websockets

 view release on metacpan or  search on metacpan

Websockets.xs  view on Meta::CPAN


#include "EVAPI.h"

#include <libwebsockets.h>
#include <string.h>
#include <stddef.h>

/*
 * Magic numbers for use-after-free detection.
 *
 * Context and connection structs carry one of these in their `magic` field.
 * Entry points compare the field against the *_MAGIC value before touching
 * the object; teardown paths overwrite it with the matching *_FREED value
 * just before the memory is released, so a stale pointer fails the check.
 */
#define EV_WS_CTX_MAGIC  0xDEADBEEF
#define EV_WS_CTX_FREED  0xFEEDFACE
#define EV_WS_CONN_MAGIC 0xCAFEBABE
#define EV_WS_CONN_FREED 0xBADC0FFE

/* Forward declarations: the context, connection and fd-watcher structs
 * point at each other, so their typedef names must exist before any of the
 * struct bodies is defined. */
typedef struct ev_ws_ctx_s ev_ws_ctx_t;
typedef struct ev_ws_conn_s ev_ws_conn_t;
typedef struct ev_ws_fd_s ev_ws_fd_t;

/* Magic values for ev_ws_server_t.magic.
 * NOTE(review): the code that sets and checks them is not part of this
 * excerpt; presumably "live" vs. "released", like the context/connection
 * magics - confirm against the listen/server code. */
#define EV_WS_SRV_MAGIC 0xFEEDCAFE
#define EV_WS_SRV_FREED 0xFEEDDEAD

/*
 * Server-side configuration record.
 *
 * NOTE(review): the producers and consumers of this struct are outside the
 * visible source, so the field notes below are inferred from names and types
 * only - confirm before relying on them.
 */
typedef struct ev_ws_server_s {
    unsigned int magic;          /* EV_WS_SRV_MAGIC / EV_WS_SRV_FREED (presumably) */
    SV* on_connect;              /* Perl callbacks; presumably handed to accepted connections */
    SV* on_message;
    SV* on_close;
    SV* on_error;
    SV* on_pong;
    SV* on_drain;
    SV* on_handshake;
    HV* response_headers; /* headers to inject into upgrade response */
    size_t max_message_size;     /* presumably the per-message size limit - TODO confirm units/0 meaning */
    char* protocol_name;         /* presumably the subprotocol name; ownership not visible here */
    struct lws_protocols vhost_protocols[2]; /* two slots; presumably one protocol plus a terminator */
} ev_ws_server_t;

/* Pointer typedefs named after the Perl classes used in the XSUB signatures
 * (xsubpp spells "EV::Websockets::Context" as EV__Websockets__Context in C).
 * NOTE(review): the typemap that converts between these and Perl objects is
 * not visible here. */
typedef ev_ws_ctx_t* EV__Websockets__Context;
typedef ev_ws_conn_t* EV__Websockets__Connection;
typedef struct ev_loop* EV__Loop;

/*
 * File descriptor watcher tracking.
 *
 * One record per socket that lws asked us to poll; records live in
 * ctx->fd_table, indexed by fd.  `io` must remain the first member: io_cb
 * receives the ev_io pointer and casts it straight back to ev_ws_fd_t*.
 */
struct ev_ws_fd_s {
    ev_io io;        /* libev watcher; its callback is io_cb */
    ev_ws_ctx_t* ctx; /* owning context, checked for validity in io_cb */
    int fd;          /* descriptor being watched (also the fd_table index) */
    int poll_events; /* registered POLLIN/POLLOUT interest mask */
};

/*
 * Send buffer node for pending writes (FAM: data follows the struct).
 *
 * Nodes form a singly linked FIFO hanging off the connection (send_head /
 * send_tail).  queue_send allocates offsetof(data) + LWS_PRE + len bytes and
 * copies the payload to data + LWS_PRE, leaving LWS_PRE bytes of headroom in
 * front of it.
 */
typedef struct ev_ws_send_s {
    struct ev_ws_send_s* next;          /* next queued node, NULL at the tail */
    size_t len;                         /* payload length, not counting LWS_PRE */
    enum lws_write_protocol write_mode; /* write mode recorded by queue_send for this frame */
    char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
} ev_ws_send_t;

/*
 * Context structure - manages lws_context and connections.
 *
 * Also owns the libev watchers (per-fd io watchers, one timer, one idle
 * watcher) that drive lws from the event loop.
 */
struct ev_ws_ctx_s {
    unsigned int magic; /* EV_WS_CTX_MAGIC while usable; DESTROY sets EV_WS_CTX_FREED */
    int refcnt;      /* lifecycle refcount: Perl + each in-flight lws_service */
    int* alive_flag; /* points to caller's stack variable during lws_service */
                     /* DESTROY writes 0 through it so that caller knows the
                        context was torn down underneath it */
    struct ev_loop* loop;       /* libev loop all watchers are registered on; NULL after DESTROY */
    struct lws_context* lws_ctx; /* NULL once destroyed; service paths check this first */
    ev_ws_conn_t* connections;  /* head of the doubly linked connection list */
    ev_ws_fd_t** fd_table;      /* watcher records indexed by fd; NULL = no watcher */
    int fd_table_size;          /* number of slots in fd_table (0 before first growth) */
    ev_timer timer;             /* one-shot timer re-armed by schedule_timeout */
    ev_idle idle;               /* started by schedule_timeout when lws wants service immediately */
};

/* Connection structure */
struct ev_ws_conn_s {
    unsigned int magic;
    ev_ws_ctx_t* ctx;
    struct lws* wsi;
    ev_ws_conn_t* next;  /* linked list */
    ev_ws_conn_t* prev;

    int refcnt;
    SV* perl_self; /* Weak-ish reference to the Perl object */

    /* Callbacks */
    SV* on_connect;
    SV* on_message;
    SV* on_close;
    SV* on_error;
    SV* on_pong;
    SV* on_drain;

    /* Custom Headers */
    HV* custom_headers;

    /* Response Headers (client: response headers; server: request headers) */
    HV* response_headers;

    /* Receive buffer for fragmented messages */
    char* recv_buf;
    size_t recv_len;
    size_t recv_alloc;
    int recv_is_binary;
    size_t max_message_size;

    /* Send queue */
    ev_ws_send_t* send_head;
    ev_ws_send_t* send_tail;
    size_t send_queue_bytes;

    /* Adopted file handle (prevents Perl from closing the fd) */
    SV* adopted_fh;

    /* Connect timeout */
    ev_timer connect_timer;
    int connect_timer_active;
    struct ev_loop* loop;

    /* Fragmented send state */
    int in_fragmented_send;

    /* Per-connection metadata */
    HV* stash;

Websockets.xs  view on Meta::CPAN

        capture_header(wsi, hv, request_hdrs[i].tok,
                       request_hdrs[i].name, request_hdrs[i].nlen);
}

/*
 * Append every key/value pair of `hv` to the HTTP header block lws is
 * building between *p and end.
 *
 * Each key is copied into a local buffer with a ':' appended before being
 * handed to lws_add_http_header_by_name.  Keys of 254 bytes or more are
 * skipped because they would not fit in that buffer together with the ':'
 * and the terminating NUL.  Iteration stops at the first header lws refuses
 * (non-zero return).
 */
static void inject_headers(struct lws *wsi, HV *hv,
                           unsigned char **p, unsigned char *end) {
    char name_buf[256];
    HE *he;

    hv_iterinit(hv);
    for (he = hv_iternext(hv); he != NULL; he = hv_iternext(hv)) {
        I32 name_len;
        STRLEN value_len;
        const char *name = hv_iterkey(he, &name_len);
        SV *value_sv = hv_iterval(hv, he);
        const char *value = SvPV(value_sv, value_len);
        int rc;

        if (name_len >= 254) {
            continue; /* no room for ':' + NUL in name_buf */
        }

        memcpy(name_buf, name, name_len);
        name_buf[name_len] = ':';
        name_buf[name_len + 1] = '\0';

        rc = lws_add_http_header_by_name(wsi, (unsigned char *)name_buf,
                                         (unsigned char *)value, value_len,
                                         p, end);
        if (rc) {
            break;
        }
    }
}

/* Debug trace: when ev_ws_debug is non-zero, print the printf-style message
 * to stderr with an "[EV::WS] " prefix and a trailing newline.  `fmt` must be
 * a string literal because it is pasted between two other literals;
 * `##__VA_ARGS__` is the GNU extension that drops the preceding comma when
 * no variadic arguments are supplied. */
#define DEBUG_LOG(fmt, ...) do { if (ev_ws_debug) fprintf(stderr, "[EV::WS] " fmt "\n", ##__VA_ARGS__); } while(0)

/* Take one additional reference on the context. */
static void ctx_ref(ev_ws_ctx_t* ctx) {
    ctx->refcnt += 1;
}

/* Drop one reference on the context; the struct itself is released with
 * Safefree once the count reaches zero.  Only the struct memory is freed
 * here - nothing it points to. */
static void ctx_unref(ev_ws_ctx_t* ctx) {
    ctx->refcnt -= 1;
    if (ctx->refcnt != 0) {
        return;
    }
    Safefree(ctx);
}

/*
 * Arm whichever watcher should trigger the next lws service pass.
 *
 * lws_service_adjust_timeout is asked with a proposed 1000 ms and returns a
 * delay in ms; 0 means work is already pending.  A non-positive delay starts
 * the idle watcher (and stops the timer) so service happens on the next loop
 * iteration; a positive delay stops the idle watcher and restarts the
 * one-shot timer with that delay.
 */
static void schedule_timeout(ev_ws_ctx_t* ctx) {
    const int wait_ms = lws_service_adjust_timeout(ctx->lws_ctx, 1000, 0);

    if (wait_ms > 0) {
        ev_idle_stop(ctx->loop, &ctx->idle);

        /* A running timer has to be stopped before its timeout is changed. */
        ev_timer_stop(ctx->loop, &ctx->timer);
        ev_timer_set(&ctx->timer, (double)wait_ms / 1000.0, 0.);
        ev_timer_start(ctx->loop, &ctx->timer);
        return;
    }

    ev_idle_start(ctx->loop, &ctx->idle);
    ev_timer_stop(ctx->loop, &ctx->timer);
}

/*
 * Run one lws_service pass on `ctx` (timeout argument 0) and re-arm the
 * timer/idle watcher afterwards.
 *
 * Does nothing for a NULL, invalid (wrong magic) or already-destroyed
 * context.  Callbacks fired from inside lws_service may destroy the context,
 * so an extra reference is held across the call, and a stack flag is
 * published through ctx->alive_flag; the context's DESTROY clears that flag.
 * If the flag was cleared, the context is not touched again except for the
 * final unref, and the cleared state is forwarded to the previously
 * published flag (if any) for an outer service call on the stack.
 */
static void do_lws_service(ev_ws_ctx_t* ctx) {
    int still_alive = 1;
    int* outer_flag;

    if (ctx == NULL) return;
    if (ctx->magic != EV_WS_CTX_MAGIC) return;
    if (ctx->lws_ctx == NULL) return;

    outer_flag = ctx->alive_flag;
    ctx->alive_flag = &still_alive;
    ctx_ref(ctx);

    lws_service(ctx->lws_ctx, 0);

    if (!still_alive) {
        /* propagate destruction up the alive_flag chain */
        if (outer_flag != NULL) {
            *outer_flag = 0;
        }
    } else {
        ctx->alive_flag = outer_flag;
        schedule_timeout(ctx);
    }

    ctx_unref(ctx);
}

/* libev idle watcher callback: service the context stored in w->data. */
static void idle_cb(EV_P_ ev_idle* w, int revents) {
    ev_ws_ctx_t* ctx = (ev_ws_ctx_t*)w->data;

    (void)loop;
    (void)revents;

    do_lws_service(ctx);
}

/* libev timer watcher callback: service the context stored in w->data. */
static void timer_cb(EV_P_ ev_timer* w, int revents) {
    ev_ws_ctx_t* ctx = (ev_ws_ctx_t*)w->data;

    (void)loop;
    (void)revents;

    do_lws_service(ctx);
}

/* Forward declarations for helpers that are referenced before their
 * definitions appear further down in this file. */
static void io_cb(EV_P_ ev_io* w, int revents);
static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events);
static void conn_ref(ev_ws_conn_t* conn);
static void conn_unref(ev_ws_conn_t* conn);

/*
 * Return a new reference SV (caller owns it) to the Perl object that
 * represents `conn`, blessed into EV::Websockets::Connection.
 *
 * If a Perl object is already cached in conn->perl_self, a fresh reference
 * to that same object is handed out.  Otherwise a new object wrapping the
 * conn pointer is created, cached in conn->perl_self, and the connection's
 * refcount is bumped on behalf of that new Perl owner.
 */
static SV* get_conn_sv(ev_ws_conn_t* conn) {
    SV* ref;

    if (conn->perl_self != NULL) {
        ref = newRV_inc(conn->perl_self);
        sv_bless(ref, gv_stashpv("EV::Websockets::Connection", 1));
        return ref;
    }

    ref = newSV(0);
    sv_setref_pv(ref, "EV::Websockets::Connection", (void*)conn);
    conn->perl_self = SvRV(ref);

    /* A new Perl owner now exists for this connection: count it, so that
       its DESTROY does not release the struct while LWS still needs it. */
    conn_ref(conn);

    return ref;
}

/*
 * Invoke the connection's on_error callback with (connection, message).
 *
 * No-op when `conn` is NULL or no on_error callback is set.  The callback
 * runs under G_EVAL; an exception thrown by it is reported with warn()
 * rather than propagated.  A connection reference is held for the duration
 * of the call.
 */
static void emit_error(ev_ws_conn_t* conn, const char* error) {
    if (conn == NULL) return;
    if (conn->on_error == NULL) return;

    dSP;
    ENTER;
    SAVETMPS;
    PUSHMARK(SP);

    /* Keep the struct alive across the callback, whatever it does. */
    conn_ref(conn);

    XPUSHs(sv_2mortal(get_conn_sv(conn)));
    XPUSHs(sv_2mortal(newSVpv(error, 0)));
    PUTBACK;

    sv_setsv(ERRSV, &PL_sv_undef);
    call_sv(conn->on_error, G_EVAL | G_DISCARD);
    {
        SV* thrown = ERRSV;
        if (SvTRUE(thrown)) {
            warn("EV::Websockets: exception in error handler: %s", SvPV_nolen(thrown));
        }
    }

    FREETMPS;
    LEAVE;

    conn_unref(conn);
}

Websockets.xs  view on Meta::CPAN


    if (conn->prev) {
        conn->prev->next = conn->next;
    } else {
        conn->ctx->connections = conn->next;
    }
    if (conn->next) {
        conn->next->prev = conn->prev;
    }
    conn->prev = NULL;
    conn->next = NULL;
    conn->ctx = NULL;
}

/*
 * Append one outgoing frame to the connection's send queue.
 *
 * A node is allocated with LWS_PRE bytes of headroom in front of the payload
 * and `len` payload bytes are copied in (nothing is copied when `data` is
 * NULL or `len` is 0).  send_queue_bytes grows by `len`.  If the queue was
 * empty beforehand and the connection still has a wsi, lws is asked for a
 * writable callback.
 */
static void queue_send(ev_ws_conn_t* conn, const char* data, size_t len, enum lws_write_protocol write_mode) {
    const int first_in_queue = (conn->send_head == NULL);
    ev_ws_send_t* node;

    node = (ev_ws_send_t*)safemalloc(offsetof(ev_ws_send_t, data) + LWS_PRE + len);
    node->next = NULL;
    node->len = len;
    node->write_mode = write_mode;
    if (data != NULL && len != 0) {
        memcpy(node->data + LWS_PRE, data, len);
    }

    /* Link at the tail; an empty queue also gets its head set. */
    if (conn->send_tail == NULL) {
        conn->send_head = node;
    } else {
        conn->send_tail->next = node;
    }
    conn->send_tail = node;
    conn->send_queue_bytes += len;

    if (first_in_queue && conn->wsi != NULL) {
        lws_callback_on_writable(conn->wsi);
    }
}

/*
 * libev io watcher callback: translate the libev event mask into a
 * struct lws_pollfd and hand it to lws_service_fd.
 *
 * `w` is the first member of an ev_ws_fd_t, so it is cast back to the
 * enclosing record.  Nothing happens if the owning context is missing,
 * invalid or already destroyed.  The service call is wrapped in the same
 * alive-flag / extra-reference protocol as do_lws_service, because lws
 * callbacks may destroy the context while it is being serviced.
 */
static void io_cb(EV_P_ ev_io* w, int revents) {
    ev_ws_fd_t* watcher = (ev_ws_fd_t*)w;
    ev_ws_ctx_t* ctx = watcher->ctx;
    struct lws_pollfd pfd;
    int still_alive = 1;
    int* outer_flag;

    (void)loop;

    if (ctx == NULL) return;
    if (ctx->magic != EV_WS_CTX_MAGIC || ctx->lws_ctx == NULL) return;

    pfd.fd = watcher->fd;
    pfd.events = watcher->poll_events;
    pfd.revents = 0;
    if (revents & EV_READ) {
        pfd.revents |= POLLIN;
    }
    if (revents & EV_WRITE) {
        pfd.revents |= POLLOUT;
    }
    if (revents & EV_ERROR) {
        pfd.revents |= POLLERR | POLLHUP;
    }

    outer_flag = ctx->alive_flag;
    ctx->alive_flag = &still_alive;
    ctx_ref(ctx);

    lws_service_fd(ctx->lws_ctx, &pfd);

    if (!still_alive) {
        /* propagate destruction up the alive_flag chain */
        if (outer_flag != NULL) {
            *outer_flag = 0;
        }
    } else {
        ctx->alive_flag = outer_flag;
        schedule_timeout(ctx);
    }

    ctx_unref(ctx);
}

/* Slot count used the first time the fd table is allocated. */
#define FD_TABLE_INIT_SIZE 64

/*
 * Enlarge ctx->fd_table so that index `needed` becomes valid.
 *
 * The size starts from the current size (or FD_TABLE_INIT_SIZE when the
 * table is still empty) and doubles until it exceeds `needed`.  Newly added
 * slots are zeroed so they read as "no watcher".
 */
static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
    const int old_size = ctx->fd_table_size;
    int grown = old_size;

    if (grown == 0) {
        grown = FD_TABLE_INIT_SIZE;
    }
    while (grown <= needed) {
        grown *= 2;
    }

    Renew(ctx->fd_table, grown, ev_ws_fd_t*);
    Zero(ctx->fd_table + old_size, grown - old_size, ev_ws_fd_t*);
    ctx->fd_table_size = grown;
}

/*
 * Register a libev io watcher for `fd` with the given poll() interest mask.
 *
 * Negative fds are ignored.  The fd table is grown on demand.  If a watcher
 * already exists for the fd, the request is forwarded to change_fd_watcher
 * instead.  A watcher with neither POLLIN nor POLLOUT interest is created
 * (initialised for EV_READ) and recorded in the table, but not started.
 */
static void add_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
    ev_ws_fd_t* entry;
    int libev_mask = 0;

    if (fd < 0) return;
    if (fd >= ctx->fd_table_size) {
        fd_table_grow(ctx, fd);
    }

    if (ctx->fd_table[fd] != NULL) {
        change_fd_watcher(ctx, fd, events);
        return;
    }

    Newxz(entry, 1, ev_ws_fd_t);
    entry->ctx = ctx;
    entry->fd = fd;
    entry->poll_events = events;

    /* Map poll() interest bits onto libev's event bits. */
    if (events & POLLIN) {
        libev_mask |= EV_READ;
    }
    if (events & POLLOUT) {
        libev_mask |= EV_WRITE;
    }

    DEBUG_LOG("add_fd_watcher: fd=%d poll_events=%d ev_events=%d", fd, events, libev_mask);

    ev_io_init(&entry->io, io_cb, fd, libev_mask ? libev_mask : EV_READ);
    if (libev_mask) {
        ev_io_start(ctx->loop, &entry->io);
    }

    ctx->fd_table[fd] = entry;
}

/*
 * Stop and free the io watcher registered for `fd`, clearing its table slot.
 * Out-of-range fds and fds without a watcher are silently ignored.
 */
static void del_fd_watcher(ev_ws_ctx_t* ctx, int fd) {
    ev_ws_fd_t* entry;

    if (fd < 0) return;
    if (fd >= ctx->fd_table_size) return;

    entry = ctx->fd_table[fd];
    if (entry != NULL) {
        ev_io_stop(ctx->loop, &entry->io);
        ctx->fd_table[fd] = NULL;
        Safefree(entry);
    }
}

static void change_fd_watcher(ev_ws_ctx_t* ctx, int fd, int events) {
    ev_ws_fd_t* fdw;
    int ev_events = 0;

Websockets.xs  view on Meta::CPAN

    }

    DEBUG_LOG("Creating context (manual integration)");

    RETVAL->lws_ctx = lws_create_context(&info);
    if (RETVAL->lws_ctx == NULL) {
        Safefree(RETVAL);
        croak("Failed to create libwebsockets context");
    }
    if (ssl_init)
        ev_ws_ssl_inited = 1;
    ev_timer_init(&RETVAL->timer, timer_cb, 0.00001, 0.);
    RETVAL->timer.data = (void*)RETVAL;

    ev_idle_init(&RETVAL->idle, idle_cb);
    RETVAL->idle.data = (void*)RETVAL;

    schedule_timeout(RETVAL);

    DEBUG_LOG("Context created successfully");
}
OUTPUT:
    RETVAL

void
DESTROY(EV::Websockets::Context self);
CODE:
{
    /*
     * Tear down a context.
     *
     * Order: mark the struct as freed, stop the timer and idle watchers,
     * release every fd watcher, detach all connections from the context
     * (dropping the reference held for their wsi), destroy the lws context,
     * then signal any in-flight service call through alive_flag and drop
     * Perl's reference.  The struct memory itself is released by ctx_unref
     * only once no service call still holds a reference.
     */
    ev_ws_conn_t* conn;
    ev_ws_conn_t* next;

    /* Not a live context (already torn down): nothing to do.
       XSRETURN_EMPTY rather than a bare `return`, so the XSUB leaves the
       Perl stack empty instead of leaving its argument there as an
       apparent return value. */
    if (self->magic != EV_WS_CTX_MAGIC) XSRETURN_EMPTY;

    self->magic = EV_WS_CTX_FREED;

    ev_timer_stop(self->loop, &self->timer);
    ev_idle_stop(self->loop, &self->idle);

    free_all_fd_watchers(self);

    /* Close all connections */
    for (conn = self->connections; conn != NULL; conn = next) {
        next = conn->next; /* fetched first: conn may be freed by the unref below */
        conn->ctx = NULL;
        conn->prev = NULL;
        conn->next = NULL;
        if (conn->wsi) {
            /* Detach the wsi from the struct in both directions. */
            lws_set_wsi_user(conn->wsi, NULL);
            conn->wsi = NULL;
        }
        conn_unref(conn); /* drop wsi ref - may free conn */
    }
    self->connections = NULL;

    if (self->lws_ctx) {
        lws_context_destroy(self->lws_ctx);
        self->lws_ctx = NULL;
    }

    self->loop = NULL;
    /* Tell an lws_service caller further up the stack that the context is gone. */
    if (self->alive_flag) *self->alive_flag = 0;
    ctx_unref(self); /* drops Perl ref; Safefree happens when refcnt==0 */
}

EV::Websockets::Connection
connect(EV::Websockets::Context self, ...);
PREINIT:
    struct lws_client_connect_info ccinfo;
    const char* url = NULL;
    const char* protocol = NULL;
    char* host = NULL;
    char* host_header = NULL; /* for IPv6: "[::1]" form */
    char* path = NULL;
    int port = 80;
    int use_ssl = 0;
    int ssl_verify = 1;
    char* url_copy = NULL;
    char* p;
    char* path_start;
    SV* on_connect = NULL;
    SV* on_message = NULL;
    SV* on_close = NULL;
    SV* on_error = NULL;
    SV* on_pong = NULL;
    SV* on_drain = NULL;
    SV* headers_hv = NULL;
    size_t max_message_size = 0;
    double connect_timeout = 0;
    int i;
CODE:
{
    if (self->magic != EV_WS_CTX_MAGIC) {
        croak("Context has been destroyed");
    }

    for (i = 1; i < items; i += 2) {
        if (i + 1 >= items) break;
        const char* key = SvPV_nolen(ST(i));
        SV* val = ST(i + 1);

        if (strcmp(key, "url") == 0) {
            url = SvPV_nolen(val);
        } else if (strcmp(key, "protocol") == 0) {
            protocol = SvPV_nolen(val);
        } else if (strcmp(key, "ssl_verify") == 0) {
            ssl_verify = SvTRUE(val);
        } else if (strcmp(key, "max_message_size") == 0) {
            max_message_size = (size_t)SvUV(val);
        } else if (strcmp(key, "connect_timeout") == 0) {
            connect_timeout = SvNV(val);
        } else if (strcmp(key, "headers") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVHV) {
            headers_hv = val;
        } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_connect = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_message = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_close = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_error = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {

Websockets.xs  view on Meta::CPAN

        const char* key = SvPV_nolen(ST(i));
        SV* val = ST(i + 1);

        if (strcmp(key, "fh") == 0) {
            fh_sv = val;
        } else if (strcmp(key, "initial_data") == 0) {
            initial_data_sv = val;
        } else if (strcmp(key, "max_message_size") == 0) {
            max_message_size = (size_t)SvUV(val);
        } else if (strcmp(key, "on_connect") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_connect = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_message") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_message = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_close") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_close = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_error") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_error = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_pong") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_pong = SvREFCNT_inc(val);
        } else if (strcmp(key, "on_drain") == 0 && SvROK(val) && SvTYPE(SvRV(val)) == SVt_PVCV) {
            on_drain = SvREFCNT_inc(val);
        }
    }

    if (fh_sv == NULL) {
        if (on_connect) SvREFCNT_dec(on_connect);
        if (on_message) SvREFCNT_dec(on_message);
        if (on_close) SvREFCNT_dec(on_close);
        if (on_error) SvREFCNT_dec(on_error);
        if (on_pong) SvREFCNT_dec(on_pong);
        if (on_drain) SvREFCNT_dec(on_drain);
        croak("fh parameter is required");
    }

    IO* io = sv_2io(fh_sv);
    PerlIO *ifp = io ? IoIFP(io) : NULL;
    if (!ifp || (fd = PerlIO_fileno(ifp)) < 0) {
        if (on_connect) SvREFCNT_dec(on_connect);
        if (on_message) SvREFCNT_dec(on_message);
        if (on_close) SvREFCNT_dec(on_close);
        if (on_error) SvREFCNT_dec(on_error);
        if (on_pong) SvREFCNT_dec(on_pong);
        if (on_drain) SvREFCNT_dec(on_drain);
        croak("Invalid filehandle");
    }

    Newxz(RETVAL, 1, ev_ws_conn_t);
    RETVAL->magic = EV_WS_CONN_MAGIC;
    RETVAL->refcnt = 2; /* wsi ref + sentinel (protects against sync WSI_DESTROY) */
    RETVAL->on_connect = on_connect;
    RETVAL->on_message = on_message;
    RETVAL->on_close = on_close;
    RETVAL->on_error = on_error;
    RETVAL->on_pong = on_pong;
    RETVAL->on_drain = on_drain;
    RETVAL->max_message_size = max_message_size;
    RETVAL->loop = self->loop;
    /* Hold a reference to the underlying glob/IO to prevent Perl
     * from closing the fd while lws owns it.  For blessed glob refs
     * (IO::Socket etc.) we ref the glob itself so framework DESTROY
     * methods see it as still alive. */
    RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
                                       : SvREFCNT_inc(fh_sv);

    link_conn(self, RETVAL);

    {
        struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
        if (!vh) {
            /* Auto-create a server vhost for adoption (no listener needed) */
            struct lws_context_creation_info vinfo;
            memset(&vinfo, 0, sizeof(vinfo));
            vinfo.port = CONTEXT_PORT_NO_LISTEN_SERVER;
            vinfo.protocols = protocols;
            vinfo.vhost_name = "server";
            vh = lws_create_vhost(self->lws_ctx, &vinfo);
        }
        if (!vh) {
            unlink_conn(RETVAL);
            free_conn_resources(RETVAL);
            RETVAL->magic = EV_WS_CONN_FREED;
            Safefree(RETVAL);
            croak("Failed to create vhost for adoption");
        }
        pending_adoption = RETVAL;
        if (initial_data_sv && SvOK(initial_data_sv)) {
            STRLEN rdlen;
            const char *rdbuf = SvPV(initial_data_sv, rdlen);
            RETVAL->wsi = lws_adopt_socket_vhost_readbuf(vh,
                (lws_sockfd_type)fd, rdbuf, rdlen);
        } else {
            RETVAL->wsi = lws_adopt_socket_vhost(vh, (lws_sockfd_type)fd);
        }
        pending_adoption = NULL;
    }

    if (RETVAL->wsi == NULL) {
        unlink_conn(RETVAL);
        if (RETVAL->perl_self == NULL) {
            free_conn_resources(RETVAL);
            RETVAL->magic = EV_WS_CONN_FREED;
            Safefree(RETVAL);
        } else {
            conn_unref(RETVAL);
        }
        croak("Failed to adopt socket");
    }
    conn_unref(RETVAL); /* drop sentinel */

    /* Kick lws to process readbuf data (needed for lws 4.5+).
     * Guard with extra refs: lws_service may synchronously fire
     * error/destroy callbacks that would free RETVAL or ctx. */
    {
        int rejected, alive = 1;
        int* prev_flag = self->alive_flag;
        conn_ref(RETVAL);
        ctx_ref(self);
        self->alive_flag = &alive;
        lws_service(self->lws_ctx, 0);
        if (alive) {
            self->alive_flag = prev_flag;
            schedule_timeout(self);
        } else if (prev_flag) {
            /* Context destroyed during inner lws_service.
               Propagate destruction up the alive_flag chain. */
            *prev_flag = 0;
        }
        rejected = (RETVAL->wsi == NULL);
        conn_unref(RETVAL);
        ctx_unref(self);
        if (rejected)
            croak("Failed to adopt socket");
    }
}
OUTPUT:
    RETVAL

void
connections(EV::Websockets::Context self);
PPCODE:
{
    /*
     * Return the list of currently connected connections of this context.
     *
     * Yields an empty list when the context is no longer valid.  Otherwise
     * walks the context's connection list and pushes one mortal
     * EV::Websockets::Connection reference for every entry whose magic is
     * intact and whose `connected` flag is set.
     */
    ev_ws_conn_t* cur;

    if (self->magic != EV_WS_CTX_MAGIC) {
        XSRETURN_EMPTY;
    }

    cur = self->connections;
    while (cur != NULL) {
        const int intact = (cur->magic == EV_WS_CONN_MAGIC);
        if (intact && cur->connected) {
            XPUSHs(sv_2mortal(get_conn_sv(cur)));
        }
        cur = cur->next;
    }
}

MODULE = EV::Websockets  PACKAGE = EV::Websockets::Connection

void
DESTROY(EV::Websockets::Connection self);
CODE:
{
    /*
     * Called when the Perl-side connection object goes away.
     *
     * Forgets the cached Perl object and drops the reference held on its
     * behalf.  The C struct is only released by conn_unref once no other
     * reference remains.
     */

    /* Not a live connection struct: nothing to do.  XSRETURN_EMPTY rather
       than a bare `return`, so the XSUB leaves the Perl stack empty instead
       of leaving its argument there as an apparent return value. */
    if (self->magic != EV_WS_CONN_MAGIC) XSRETURN_EMPTY;

    DEBUG_LOG("Perl object DESTROY: self=%p wsi=%p", self, self->wsi);

    /* Clear the cached Perl object pointer in the C struct */
    self->perl_self = NULL;

    conn_unref(self); /* drop Perl ref */
}

void
send(EV::Websockets::Connection self, SV* data);
CODE:
{
    STRLEN len;
    const char* buf;

    if (self->magic != EV_WS_CONN_MAGIC) {
        croak("Connection has been destroyed");
    }
    if (!self->wsi || !self->connected || self->closing) {
        croak("Connection is not open");
    }
    if (self->in_fragmented_send) {
        croak("Cannot send while a fragmented message is in progress; "
              "finish the fragment with send_fragment(..., is_final => 1) first");
    }

    buf = SvPV(data, len);



( run in 0.595 second using v1.01-cache-2.11-cpan-98e64b0badf )