EV-Websockets
view release on metacpan or search on metacpan
Websockets.xs view on Meta::CPAN
/*
 * Self-linked node that stores a libwebsockets write mode together with a
 * variable-length data buffer (LWS_PRE + payload, see data[] below).
 * NOTE(review): presumably one entry of a per-connection outgoing queue --
 * confirm against the code that allocates and drains these nodes.
 */
typedef struct ev_ws_send_s {
/* Link to another node of the same type. */
struct ev_ws_send_s* next;
/* NOTE(review): presumably the payload byte count, not counting LWS_PRE -- confirm. */
size_t len;
/* libwebsockets write-protocol value kept alongside the buffered data. */
enum lws_write_protocol write_mode;
/*
 * Declared with one element but used as a trailing variable-length buffer,
 * so the node has to be allocated larger than sizeof(ev_ws_send_t).
 */
char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
} ev_ws_send_t;
/* Context structure - manages lws_context and connections */
struct ev_ws_ctx_s {
/* Compared against EV_WS_CTX_MAGIC before the context is serviced. */
unsigned int magic;
/*
 * Set around each lws_service()/lws_service_fd() call. The teardown path
 * writes 0 through this pointer before freeing the context, which tells
 * the servicing frame that the context must not be touched any more.
 */
int* alive_flag; /* points to caller's stack variable during lws_service */
/* libev loop on which the timer watcher is stopped and started; set to NULL on teardown. */
struct ev_loop* loop;
/* libwebsockets context handed to lws_service(); destroyed and set to NULL on teardown. */
struct lws_context* lws_ctx;
/* NOTE(review): presumably the head of the list of live connections -- confirm; set to NULL on teardown. */
ev_ws_conn_t* connections;
/* Array of ev_ws_fd_t pointers. NOTE(review): presumably indexed by file descriptor -- confirm. */
ev_ws_fd_t** fd_table;
/* Current size of fd_table; 0 makes fd_table_grow start from FD_TABLE_INIT_SIZE. */
int fd_table_size;
/* Timer watcher that is re-armed as a one-shot (repeat 0) with the service delay. */
ev_timer timer;
/* Idle watcher. NOTE(review): presumably the watcher delivered to idle_cb -- confirm. */
ev_idle idle;
};
/* Connection structure */
Websockets.xs view on Meta::CPAN
double delay_s = (double)delay_ms / 1000.0;
ev_timer_stop(ctx->loop, &ctx->timer);
ev_timer_set(&ctx->timer, delay_s, 0.);
ev_timer_start(ctx->loop, &ctx->timer);
}
/*
 * Run one non-blocking lws_service() pass on ctx and then call
 * schedule_timeout(ctx).
 *
 * Does nothing when ctx is NULL, when its magic does not match
 * EV_WS_CTX_MAGIC, or when it has no lws_context.
 *
 * Callbacks fired from inside lws_service() may destroy the context. The
 * teardown path writes 0 through ctx->alive_flag before freeing it, so ctx
 * is only touched again when the local flag still reads 1.
 *
 * Only the innermost flag is registered in the context at any time. When
 * guarded service passes nest, teardown therefore reaches just the inner
 * frame's flag. An inner frame that observes the destruction forwards it to
 * the flag it displaced (prev_flag), otherwise the enclosing frame would go
 * on to write ctx->alive_flag and re-arm the timer on freed memory.
 */
static void do_lws_service(ev_ws_ctx_t* ctx) {
    int alive = 1;
    int* prev_flag;

    if (!ctx || ctx->magic != EV_WS_CTX_MAGIC || !ctx->lws_ctx)
        return;

    prev_flag = ctx->alive_flag;
    ctx->alive_flag = &alive;
    lws_service(ctx->lws_ctx, 0);

    if (alive) {
        /* Context survived: restore the enclosing guard and re-arm. */
        ctx->alive_flag = prev_flag;
        schedule_timeout(ctx);
    } else if (prev_flag) {
        /* Context is gone; prev_flag points into a caller frame that is
         * still on the stack, so it is safe to write through it. */
        *prev_flag = 0;
    }
}
/*
 * libev idle-watcher callback: takes the context stored in the watcher's
 * data pointer and runs one service pass on it. The loop argument and the
 * event mask are not used.
 */
static void idle_cb(EV_P_ ev_idle* w, int revents) {
    ev_ws_ctx_t* owner = (ev_ws_ctx_t*)w->data;

    (void)revents;
    (void)loop;

    do_lws_service(owner);
}
Websockets.xs view on Meta::CPAN
pollfd.fd = fdw->fd;
pollfd.events = fdw->poll_events;
pollfd.revents = 0;
if (revents & EV_READ) pollfd.revents |= POLLIN;
if (revents & EV_WRITE) pollfd.revents |= POLLOUT;
if (revents & EV_ERROR) pollfd.revents |= POLLERR | POLLHUP;
{
int alive = 1;
int* prev_flag = ctx->alive_flag;
ctx->alive_flag = &alive;
lws_service_fd(ctx->lws_ctx, &pollfd);
if (alive) {
ctx->alive_flag = prev_flag;
schedule_timeout(ctx);
}
}
}
#define FD_TABLE_INIT_SIZE 64
static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
int new_size = ctx->fd_table_size ? ctx->fd_table_size : FD_TABLE_INIT_SIZE;
while (new_size <= needed) new_size *= 2;
Websockets.xs view on Meta::CPAN
conn_unref(conn); /* drop wsi ref — may free conn */
}
self->connections = NULL;
if (self->lws_ctx) {
lws_context_destroy(self->lws_ctx);
self->lws_ctx = NULL;
}
self->loop = NULL;
if (self->alive_flag) *self->alive_flag = 0;
Safefree(self);
}
EV::Websockets::Connection
connect(EV::Websockets::Context self, ...);
PREINIT:
struct lws_client_connect_info ccinfo;
const char* url = NULL;
const char* protocol = NULL;
char* host = NULL;
Websockets.xs view on Meta::CPAN
RETVAL->on_message = on_message;
RETVAL->on_close = on_close;
RETVAL->on_error = on_error;
RETVAL->on_pong = on_pong;
RETVAL->on_drain = on_drain;
RETVAL->max_message_size = max_message_size;
RETVAL->loop = self->loop;
/* Hold a reference to the underlying glob/IO to prevent Perl
* from closing the fd while lws owns it. For blessed glob refs
* (IO::Socket etc.) we ref the glob itself so framework DESTROY
* methods see it as still alive. */
RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
: SvREFCNT_inc(fh_sv);
link_conn(self, RETVAL);
{
struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
if (!vh) {
/* Auto-create a server vhost for adoption (no listener needed) */
struct lws_context_creation_info vinfo;
Websockets.xs view on Meta::CPAN
conn_unref(RETVAL);
}
croak("Failed to adopt socket");
}
conn_unref(RETVAL); /* drop sentinel */
/* Kick lws to process readbuf data (needed for lws 4.5+).
* Guard with extra ref: lws_service may synchronously fire
* error/destroy callbacks that would free RETVAL. */
{
int rejected, alive = 1;
int* prev_flag = self->alive_flag;
conn_ref(RETVAL);
self->alive_flag = &alive;
lws_service(self->lws_ctx, 0);
if (alive) {
self->alive_flag = prev_flag;
schedule_timeout(self);
}
rejected = (RETVAL->wsi == NULL);
conn_unref(RETVAL);
if (rejected)
croak("Failed to adopt socket");
}
}
OUTPUT:
RETVAL
t/07-end-to-end.t view on Meta::CPAN
# Call _set_debug(1) when EV_WS_DEBUG is set to a true value in the
# environment (presumably this enables the module's debug logging).
EV::Websockets::_set_debug(1) if $ENV{EV_WS_DEBUG};
# A single context serves both the listener and the client in this test.
my $ctx = EV::Websockets::Context->new();
# What each side observed; the two *_received entries are asserted on
# after the event loop returns.
my %results = (
server_received => '',
client_received => '',
done => 0,
);
# The callbacks store connection objects here and delete them on close/error.
# NOTE(review): presumably this keeps the connections from being destroyed
# while the exchange is in flight -- confirm.
my %keep_alive;
# 1. Native Listener (port 0 = OS-assigned)
# The value returned by listen() is the port the client URL is built from.
my $port = $ctx->listen(
port => 0,
on_connect => sub {
my ($c) = @_;
diag "Server: WebSocket established";
# Hold on to the server-side connection until on_close fires.
$keep_alive{server_conn} = $c;
},
on_message => sub {
my ($c, $data) = @_;
# Record what arrived, then send it back with an "Echo: " prefix.
$results{server_received} = $data;
diag "Server: Received '$data', echoing...";
$c->send("Echo: $data");
},
on_close => sub {
diag "Server: Connection closed";
delete $keep_alive{server_conn};
}
);
diag "Server: listening on port $port";
# 2. Native Client
# The connection is opened from a one-shot 0.1s timer, so connect() is
# called from within the running event loop.
my $timer = EV::timer(0.1, 0, sub {
diag "Client: initiating connection...";
$keep_alive{client_conn} = $ctx->connect(
url => "ws://127.0.0.1:$port",
on_connect => sub {
my ($c) = @_;
diag "Client: connected, sending greeting";
$c->send("Hello Native");
},
on_message => sub {
my ($c, $data) = @_;
# Record the reply, mark the exchange as done and close with code 1000.
$results{client_received} = $data;
diag "Client: received '$data'";
$results{done} = 1;
$c->close(1000, "Done");
},
# Both on_close and on_error drop the stored connection and leave the loop.
on_close => sub {
diag "Client: closed";
delete $keep_alive{client_conn};
EV::break;
},
on_error => sub {
# NOTE(review): $_[1] is presumably the error message -- confirm.
diag "Client Error: $_[1]";
delete $keep_alive{client_conn};
EV::break;
}
);
diag "Client: Stored conn=" . $keep_alive{client_conn};
});
# 3. Execution
# Safety net: leave the loop after 10 seconds if the exchange never completes.
my $timeout = EV::timer(10, 0, sub { diag "Test timed out"; EV::break; });
diag "Entering loop";
EV::run;
# The loop has been left (client close, client error or timeout); check
# both directions of the echo round trip.
is($results{server_received}, "Hello Native", "Server received client message");
is($results{client_received}, "Echo: Hello Native", "Client received server response");
t/16-edge-cases.t view on Meta::CPAN
my $to = EV::timer(10, 0, sub { EV::break });
EV::run;
ok(!defined $closed_peer, "peer_address returns undef on closed connection");
ok(!defined $closed_proto, "get_protocol returns undef on closed connection");
}
# Drop the shared context before test 6
undef $ctx;
# 6. Context destroy from inside on_message (alive_flag test)
{
my $ctx2 = EV::Websockets::Context->new();
my ($msg_received, $destroyed_ok);
my %keep;
my $port = $ctx2->listen(
port => 0,
on_connect => sub { $keep{s} = $_[0] },
on_message => sub { $_[0]->send("ack") },
on_close => sub { delete $keep{s} },
t/18-new-features.t view on Meta::CPAN
my $srv_msg_count = 0;
my $port = $ctx->listen(
port => 0,
on_connect => sub { $keep{srv} = $_[0] },
on_message => sub {
my ($c, $data) = @_;
$srv_msg_count++;
$c->send("echo:$data");
if ($srv_msg_count == 1) {
# Send retry immediately — the client's die in on_message
# is caught by G_EVAL, so the connection stays alive
$c->send("retry");
}
},
on_close => sub { delete $keep{srv} },
);
my $phase = 0;
$keep{cli} = $ctx->connect(
url => "ws://127.0.0.1:$port",
on_connect => sub { $_[0]->send("first") },
( run in 1.462 second using v1.01-cache-2.11-cpan-39bf76dae61 )