EV-Websockets

 view release on metacpan or  search on metacpan

Websockets.xs  view on Meta::CPAN

/*
 * Node of a singly linked chain of send buffers (linked through `next`).
 * NOTE(review): the code that allocates, queues and writes these nodes is
 * outside this excerpt; the field notes below that go beyond the
 * declarations themselves are assumptions to confirm there.
 */
typedef struct ev_ws_send_s {
    /* Next node in the chain. */
    struct ev_ws_send_s* next;
    /* Byte count for `data` — presumably the payload length without the
     * LWS_PRE prefix; TODO confirm against the allocation site. */
    size_t len;
    /* libwebsockets write-protocol value kept with this buffer. */
    enum lws_write_protocol write_mode;
    char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
} ev_ws_send_t;

/*
 * Context structure - manages lws_context and connections.
 *
 * Callers validate `magic` against EV_WS_CTX_MAGIC before using the
 * struct.  Because callbacks fired from lws_service()/lws_service_fd()
 * can destroy the context, callers of those functions point `alive_flag`
 * at a local int first; teardown clears that int before freeing the
 * struct, and the caller must not touch the context once it reads 0.
 */
struct ev_ws_ctx_s {
    /* Validity marker, compared against EV_WS_CTX_MAGIC. */
    unsigned int magic;
    int* alive_flag; /* points to caller's stack variable during lws_service */
    /* libev loop the watchers below are started on. */
    struct ev_loop* loop;
    /* libwebsockets context handed to lws_service()/lws_service_fd(). */
    struct lws_context* lws_ctx;
    /* Connections owned by this context; reset to NULL during teardown. */
    ev_ws_conn_t* connections;
    /* Table of per-fd watcher records; fd_table_size is its current
     * capacity (0 until first grown).
     * NOTE(review): presumably indexed by file descriptor — confirm. */
    ev_ws_fd_t** fd_table;
    int fd_table_size;
    /* One-shot timer re-armed after every service pass. */
    ev_timer timer;
    /* Idle watcher; NOTE(review): presumably the one whose callback runs
     * a service pass with w->data set to this context — confirm. */
    ev_idle idle;
};

/* Connection structure */

Websockets.xs  view on Meta::CPAN


    double delay_s = (double)delay_ms / 1000.0;

    ev_timer_stop(ctx->loop, &ctx->timer);
    ev_timer_set(&ctx->timer, delay_s, 0.);
    ev_timer_start(ctx->loop, &ctx->timer);
}

/*
 * Run one lws_service() pass for `ctx`, then call schedule_timeout().
 *
 * Does nothing when ctx is NULL, its magic does not match
 * EV_WS_CTX_MAGIC, or it has no lws context.
 *
 * A callback fired from inside lws_service() may destroy the context.
 * The teardown code clears *ctx->alive_flag before freeing the struct,
 * so a stack-local flag is installed for the duration of the call and
 * ctx is only touched again if that flag is still set.
 *
 * Service calls can nest, which is why the previous flag pointer is
 * saved and restored.  Teardown only clears the innermost flag, so when
 * this frame finds the context gone it forwards that to the frame it
 * interrupted; otherwise that outer frame would still believe the
 * context is alive and write to freed memory.  prev_flag, when non-NULL,
 * is assumed to belong to a caller that is still on the stack.
 */
static void do_lws_service(ev_ws_ctx_t* ctx) {
    int alive = 1;
    int* prev_flag;

    if (!ctx || ctx->magic != EV_WS_CTX_MAGIC || !ctx->lws_ctx)
        return;

    prev_flag = ctx->alive_flag;
    ctx->alive_flag = &alive;
    lws_service(ctx->lws_ctx, 0);

    if (!alive) {
        /* ctx was freed during the call: leave it alone and tell the
         * enclosing service frame, if any, that it is gone too. */
        if (prev_flag)
            *prev_flag = 0;
        return;
    }

    ctx->alive_flag = prev_flag;
    schedule_timeout(ctx);
}

/*
 * libev idle-watcher callback.  The watcher's data pointer carries the
 * owning context; hand it to do_lws_service(), which validates it.
 */
static void idle_cb(EV_P_ ev_idle* w, int revents) {
    ev_ws_ctx_t* owner = (ev_ws_ctx_t*)w->data;

    /* Neither the loop nor the event mask is needed here. */
    (void)loop;
    (void)revents;

    do_lws_service(owner);
}

Websockets.xs  view on Meta::CPAN


    pollfd.fd = fdw->fd;
    pollfd.events = fdw->poll_events;
    pollfd.revents = 0;

    if (revents & EV_READ)  pollfd.revents |= POLLIN;
    if (revents & EV_WRITE) pollfd.revents |= POLLOUT;
    if (revents & EV_ERROR) pollfd.revents |= POLLERR | POLLHUP;

    {
        int alive = 1;
        int* prev_flag = ctx->alive_flag;
        ctx->alive_flag = &alive;
        lws_service_fd(ctx->lws_ctx, &pollfd);
        if (alive) {
            ctx->alive_flag = prev_flag;
            schedule_timeout(ctx);
        }
    }
}

/* Starting capacity fd_table_grow() uses while ctx->fd_table_size is still 0. */
#define FD_TABLE_INIT_SIZE 64

static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
    int new_size = ctx->fd_table_size ? ctx->fd_table_size : FD_TABLE_INIT_SIZE;
    while (new_size <= needed) new_size *= 2;

Websockets.xs  view on Meta::CPAN

        conn_unref(conn); /* drop wsi ref — may free conn */
    }
    self->connections = NULL;

    if (self->lws_ctx) {
        lws_context_destroy(self->lws_ctx);
        self->lws_ctx = NULL;
    }

    self->loop = NULL;
    if (self->alive_flag) *self->alive_flag = 0;
    Safefree(self);
}

EV::Websockets::Connection
connect(EV::Websockets::Context self, ...);
PREINIT:
    struct lws_client_connect_info ccinfo;
    const char* url = NULL;
    const char* protocol = NULL;
    char* host = NULL;

Websockets.xs  view on Meta::CPAN

    RETVAL->on_message = on_message;
    RETVAL->on_close = on_close;
    RETVAL->on_error = on_error;
    RETVAL->on_pong = on_pong;
    RETVAL->on_drain = on_drain;
    RETVAL->max_message_size = max_message_size;
    RETVAL->loop = self->loop;
    /* Hold a reference to the underlying glob/IO to prevent Perl
     * from closing the fd while lws owns it.  For blessed glob refs
     * (IO::Socket etc.) we ref the glob itself so framework DESTROY
     * methods see it as still alive. */
    RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
                                       : SvREFCNT_inc(fh_sv);

    link_conn(self, RETVAL);

    {
        struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
        if (!vh) {
            /* Auto-create a server vhost for adoption (no listener needed) */
            struct lws_context_creation_info vinfo;

Websockets.xs  view on Meta::CPAN

            conn_unref(RETVAL);
        }
        croak("Failed to adopt socket");
    }
    conn_unref(RETVAL); /* drop sentinel */

    /* Kick lws to process readbuf data (needed for lws 4.5+).
     * Guard with extra ref: lws_service may synchronously fire
     * error/destroy callbacks that would free RETVAL. */
    {
        int rejected, alive = 1;
        int* prev_flag = self->alive_flag;
        conn_ref(RETVAL);
        self->alive_flag = &alive;
        lws_service(self->lws_ctx, 0);
        if (alive) {
            self->alive_flag = prev_flag;
            schedule_timeout(self);
        }
        rejected = (RETVAL->wsi == NULL);
        conn_unref(RETVAL);
        if (rejected)
            croak("Failed to adopt socket");
    }
}
OUTPUT:
    RETVAL

t/07-end-to-end.t  view on Meta::CPAN

# Turn on the module's debug output when EV_WS_DEBUG is set in the environment.
if ($ENV{EV_WS_DEBUG}) {
    EV::Websockets::_set_debug(1);
}

my $ctx = EV::Websockets::Context->new;

# What each side saw during the exchange; written by the callbacks.
my %results = (
    server_received => q{},
    client_received => q{},
    done            => 0,
);

# Keeps connection objects referenced while the event loop is running.
my %keep_alive;

# 1. Native listener. Port 0 asks the OS to pick a port; the value that
#    listen() returns is the one the client connects to.
my $port = $ctx->listen(
    port       => 0,
    on_connect => sub {
        my ($conn) = @_;
        diag 'Server: WebSocket established';
        $keep_alive{server_conn} = $conn;
    },
    on_message => sub {
        my ($conn, $payload) = @_;
        $results{server_received} = $payload;
        diag "Server: Received '$payload', echoing...";
        $conn->send("Echo: $payload");
    },
    on_close => sub {
        diag 'Server: Connection closed';
        delete $keep_alive{server_conn};
    },
);

diag "Server: listening on port $port";

# 2. Native client, opened from a timer callback 0.1s after the loop starts.
#    Either on_close or on_error drops the stored connection and leaves
#    the event loop.
my $timer = EV::timer(
    0.1, 0,
    sub {
        diag 'Client: initiating connection...';
        $keep_alive{client_conn} = $ctx->connect(
            url        => "ws://127.0.0.1:$port",
            on_connect => sub {
                my ($conn) = @_;
                diag 'Client: connected, sending greeting';
                $conn->send('Hello Native');
            },
            on_message => sub {
                my ($conn, $payload) = @_;
                $results{client_received} = $payload;
                diag "Client: received '$payload'";
                $results{done} = 1;
                $conn->close(1000, 'Done');
            },
            on_close => sub {
                diag 'Client: closed';
                delete $keep_alive{client_conn};
                EV::break;
            },
            on_error => sub {
                diag "Client Error: $_[1]";
                delete $keep_alive{client_conn};
                EV::break;
            },
        );
        diag "Client: Stored conn=$keep_alive{client_conn}";
    },
);

# 3. Execution: run the loop under a 10-second watchdog, then check what
#    each side received.
my $timeout = EV::timer(
    10, 0,
    sub {
        diag 'Test timed out';
        EV::break;
    },
);

diag 'Entering loop';
EV::run;

is(
    $results{server_received},
    'Hello Native',
    'Server received client message',
);
is(
    $results{client_received},
    'Echo: Hello Native',
    'Client received server response',
);

t/16-edge-cases.t  view on Meta::CPAN

    my $to = EV::timer(10, 0, sub { EV::break });
    EV::run;

    ok(!defined $closed_peer, "peer_address returns undef on closed connection");
    ok(!defined $closed_proto, "get_protocol returns undef on closed connection");
}

# Drop the shared context before test 6
undef $ctx;

# 6. Context destroy from inside on_message (alive_flag test)
{
    my $ctx2 = EV::Websockets::Context->new();
    my ($msg_received, $destroyed_ok);
    my %keep;

    my $port = $ctx2->listen(
        port => 0,
        on_connect => sub { $keep{s} = $_[0] },
        on_message => sub { $_[0]->send("ack") },
        on_close => sub { delete $keep{s} },

t/18-new-features.t  view on Meta::CPAN

    my $srv_msg_count = 0;
    my $port = $ctx->listen(
        port => 0,
        on_connect => sub { $keep{srv} = $_[0] },
        on_message => sub {
            my ($c, $data) = @_;
            $srv_msg_count++;
            $c->send("echo:$data");
            if ($srv_msg_count == 1) {
                # Send retry immediately — the client's die in on_message
                # is caught by G_EVAL, so the connection stays alive
                $c->send("retry");
            }
        },
        on_close => sub { delete $keep{srv} },
    );

    my $phase = 0;
    $keep{cli} = $ctx->connect(
        url => "ws://127.0.0.1:$port",
        on_connect => sub { $_[0]->send("first") },



( run in 1.462 second using v1.01-cache-2.11-cpan-39bf76dae61 )