EV-Websockets
view release on metacpan or search on metacpan
Queue a binary frame. Croaks if the connection is not open.
send_ping([$payload])
Queue a Ping frame. $payload is optional; if supplied it is silently
truncated to 125 bytes per RFC 6455 §5.5. Croaks if the connection is
not open.
send_pong([$payload])
Queue a Pong frame. Same payload rules as "send_ping". Most peers send
Pong automatically in response to Ping; you only need this to send an
unsolicited Pong (e.g. as a one-way keepalive).
send_fragment($data, $is_binary = 0, $is_final = 1)
Send one fragment of a streaming message. The first call starts a new
fragmented message (text or binary per $is_binary); subsequent calls
send continuation frames. Set $is_final true on the last fragment.
$conn->send_fragment("part1", 0, 0); # text, not final
$conn->send_fragment("part2", 0, 0); # continuation, not final
$conn->send_fragment("part3", 0, 1); # continuation, final
is_connecting
Returns true while "state" is "connecting". Returns false once the
connection is established, closing, closed, or destroyed.
state
Returns the current state as one of:
"connecting" - TCP/TLS handshake or HTTP upgrade in progress
"connected" - open and ready to send/receive
"closing" - close() has been called; pending sends still draining
"closed" - the underlying wsi is gone but the Perl object is still alive
"destroyed" - the C struct has been freed (further method calls will
croak)
DEBUGGING
EV::Websockets::_set_debug(1);
Enables verbose debug output from both the module and libwebsockets. In
tests, gate on $ENV{EV_WS_DEBUG}:
EV::Websockets::_set_debug(1) if $ENV{EV_WS_DEBUG};
Websockets.xs view on Meta::CPAN
struct ev_ws_send_s* next;
size_t len;
enum lws_write_protocol write_mode;
char data[1]; /* C89-compatible flexible array; LWS_PRE + payload */
} ev_ws_send_t;
/* Context structure - manages lws_context and connections */
struct ev_ws_ctx_s {
unsigned int magic;
int refcnt; /* lifecycle refcount: Perl + each in-flight lws_service */
int* alive_flag; /* points to caller's stack variable during lws_service */
struct ev_loop* loop;
struct lws_context* lws_ctx;
ev_ws_conn_t* connections;
ev_ws_fd_t** fd_table;
int fd_table_size;
ev_timer timer;
ev_idle idle;
};
/* Connection structure */
Websockets.xs view on Meta::CPAN
double delay_s = (double)delay_ms / 1000.0;
ev_timer_stop(ctx->loop, &ctx->timer);
ev_timer_set(&ctx->timer, delay_s, 0.);
ev_timer_start(ctx->loop, &ctx->timer);
}
static void do_lws_service(ev_ws_ctx_t* ctx) {
if (ctx && ctx->magic == EV_WS_CTX_MAGIC && ctx->lws_ctx) {
int alive = 1;
int* prev_flag = ctx->alive_flag;
ctx->alive_flag = &alive;
ctx_ref(ctx);
lws_service(ctx->lws_ctx, 0);
if (alive) {
ctx->alive_flag = prev_flag;
schedule_timeout(ctx);
} else if (prev_flag) {
*prev_flag = 0; /* propagate destruction up the alive_flag chain */
}
ctx_unref(ctx);
}
}
static void idle_cb(EV_P_ ev_idle* w, int revents) {
(void)loop; (void)revents;
do_lws_service((ev_ws_ctx_t*)w->data);
}
Websockets.xs view on Meta::CPAN
pollfd.fd = fdw->fd;
pollfd.events = fdw->poll_events;
pollfd.revents = 0;
if (revents & EV_READ) pollfd.revents |= POLLIN;
if (revents & EV_WRITE) pollfd.revents |= POLLOUT;
if (revents & EV_ERROR) pollfd.revents |= POLLERR | POLLHUP;
{
int alive = 1;
int* prev_flag = ctx->alive_flag;
ctx->alive_flag = &alive;
ctx_ref(ctx);
lws_service_fd(ctx->lws_ctx, &pollfd);
if (alive) {
ctx->alive_flag = prev_flag;
schedule_timeout(ctx);
} else if (prev_flag) {
*prev_flag = 0; /* propagate destruction up the alive_flag chain */
}
ctx_unref(ctx);
}
}
#define FD_TABLE_INIT_SIZE 64
static void fd_table_grow(ev_ws_ctx_t* ctx, int needed) {
int new_size = ctx->fd_table_size ? ctx->fd_table_size : FD_TABLE_INIT_SIZE;
while (new_size <= needed) new_size *= 2;
Websockets.xs view on Meta::CPAN
conn_unref(conn); /* drop wsi ref â may free conn */
}
self->connections = NULL;
if (self->lws_ctx) {
lws_context_destroy(self->lws_ctx);
self->lws_ctx = NULL;
}
self->loop = NULL;
if (self->alive_flag) *self->alive_flag = 0;
ctx_unref(self); /* drops Perl ref; Safefree happens when refcnt==0 */
}
EV::Websockets::Connection
connect(EV::Websockets::Context self, ...);
PREINIT:
struct lws_client_connect_info ccinfo;
const char* url = NULL;
const char* protocol = NULL;
char* host = NULL;
Websockets.xs view on Meta::CPAN
RETVAL->on_message = on_message;
RETVAL->on_close = on_close;
RETVAL->on_error = on_error;
RETVAL->on_pong = on_pong;
RETVAL->on_drain = on_drain;
RETVAL->max_message_size = max_message_size;
RETVAL->loop = self->loop;
/* Hold a reference to the underlying glob/IO to prevent Perl
* from closing the fd while lws owns it. For blessed glob refs
* (IO::Socket etc.) we ref the glob itself so framework DESTROY
* methods see it as still alive. */
RETVAL->adopted_fh = SvROK(fh_sv) ? newRV_inc(SvRV(fh_sv))
: SvREFCNT_inc(fh_sv);
link_conn(self, RETVAL);
{
struct lws_vhost *vh = lws_get_vhost_by_name(self->lws_ctx, "server");
if (!vh) {
/* Auto-create a server vhost for adoption (no listener needed) */
struct lws_context_creation_info vinfo;
Websockets.xs view on Meta::CPAN
conn_unref(RETVAL);
}
croak("Failed to adopt socket");
}
conn_unref(RETVAL); /* drop sentinel */
/* Kick lws to process readbuf data (needed for lws 4.5+).
* Guard with extra refs: lws_service may synchronously fire
* error/destroy callbacks that would free RETVAL or ctx. */
{
int rejected, alive = 1;
int* prev_flag = self->alive_flag;
conn_ref(RETVAL);
ctx_ref(self);
self->alive_flag = &alive;
lws_service(self->lws_ctx, 0);
if (alive) {
self->alive_flag = prev_flag;
schedule_timeout(self);
} else if (prev_flag) {
/* Context destroyed during inner lws_service.
Propagate destruction up the alive_flag chain. */
*prev_flag = 0;
}
rejected = (RETVAL->wsi == NULL);
conn_unref(RETVAL);
ctx_unref(self);
if (rejected)
croak("Failed to adopt socket");
}
}
OUTPUT:
lib/EV/Websockets.pm view on Meta::CPAN
=head3 send_ping([$payload])
Queue a Ping frame. C<$payload> is optional; if supplied it is silently
truncated to 125 bytes per RFC 6455 §5.5. Croaks if the connection is not
open.
=head3 send_pong([$payload])
Queue a Pong frame. Same payload rules as C<send_ping>. Most peers send Pong
automatically in response to Ping; you only need this to send an unsolicited
Pong (e.g. as a one-way keepalive).
=head3 send_fragment($data, $is_binary = 0, $is_final = 1)
Send one fragment of a streaming message. The first call starts a new
fragmented message (text or binary per C<$is_binary>); subsequent calls send
continuation frames. Set C<$is_final> true on the last fragment.
$conn->send_fragment("part1", 0, 0); # text, not final
$conn->send_fragment("part2", 0, 0); # continuation, not final
$conn->send_fragment("part3", 0, 1); # continuation, final
lib/EV/Websockets.pm view on Meta::CPAN
Returns the current state as one of:
=over 4
=item C<"connecting"> - TCP/TLS handshake or HTTP upgrade in progress
=item C<"connected"> - open and ready to send/receive
=item C<"closing"> - C<close()> has been called; pending sends still draining
=item C<"closed"> - the underlying wsi is gone but the Perl object is still alive
=item C<"destroyed"> - the C struct has been freed (further method calls will croak)
=back
=head1 DEBUGGING
EV::Websockets::_set_debug(1);
Enables verbose debug output from both the module and libwebsockets.
t/07-end-to-end.t view on Meta::CPAN
EV::Websockets::_set_debug(1) if $ENV{EV_WS_DEBUG};
my $ctx = EV::Websockets::Context->new();
my %results = (
server_received => '',
client_received => '',
done => 0,
);
my %keep_alive;
# 1. Native Listener (port 0 = OS-assigned)
my $port = $ctx->listen(
port => 0,
on_connect => sub {
my ($c) = @_;
diag "Server: WebSocket established";
$keep_alive{server_conn} = $c;
},
on_message => sub {
my ($c, $data) = @_;
$results{server_received} = $data;
diag "Server: Received '$data', echoing...";
$c->send("Echo: $data");
},
on_close => sub {
diag "Server: Connection closed";
delete $keep_alive{server_conn};
}
);
diag "Server: listening on port $port";
# 2. Native Client
my $timer = EV::timer(0.1, 0, sub {
diag "Client: initiating connection...";
$keep_alive{client_conn} = $ctx->connect(
url => "ws://127.0.0.1:$port",
on_connect => sub {
my ($c) = @_;
diag "Client: connected, sending greeting";
$c->send("Hello Native");
},
on_message => sub {
my ($c, $data) = @_;
$results{client_received} = $data;
diag "Client: received '$data'";
$results{done} = 1;
$c->close(1000, "Done");
},
on_close => sub {
diag "Client: closed";
delete $keep_alive{client_conn};
EV::break;
},
on_error => sub {
diag "Client Error: $_[1]";
delete $keep_alive{client_conn};
EV::break;
}
);
diag "Client: Stored conn=" . $keep_alive{client_conn};
});
# 3. Execution
my $timeout = EV::timer(10, 0, sub { diag "Test timed out"; EV::break; });
diag "Entering loop";
EV::run;
is($results{server_received}, "Hello Native", "Server received client message");
is($results{client_received}, "Echo: Hello Native", "Client received server response");
t/16-edge-cases.t view on Meta::CPAN
my $to = EV::timer(10, 0, sub { EV::break });
EV::run;
ok(!defined $closed_peer, "peer_address returns undef on closed connection");
ok(!defined $closed_proto, "get_protocol returns undef on closed connection");
}
# Drop the shared context before test 6
undef $ctx;
# 6. Context destroy from inside on_message (alive_flag test)
{
my $ctx2 = EV::Websockets::Context->new();
my ($msg_received, $destroyed_ok);
my %keep;
my $port = $ctx2->listen(
port => 0,
on_connect => sub { $keep{s} = $_[0] },
on_message => sub { $_[0]->send("ack") },
on_close => sub { delete $keep{s} },
t/18-new-features.t view on Meta::CPAN
my $srv_msg_count = 0;
my $port = $ctx->listen(
port => 0,
on_connect => sub { $keep{srv} = $_[0] },
on_message => sub {
my ($c, $data) = @_;
$srv_msg_count++;
$c->send("echo:$data");
if ($srv_msg_count == 1) {
# Send retry immediately â the client's die in on_message
# is caught by G_EVAL, so the connection stays alive
$c->send("retry");
}
},
on_close => sub { delete $keep{srv} },
);
my $phase = 0;
$keep{cli} = $ctx->connect(
url => "ws://127.0.0.1:$port",
on_connect => sub { $_[0]->send("first") },
t/24-callback-and-accessor-coverage.t view on Meta::CPAN
on_error => sub { delete $keep{cli}; EV::break },
);
my $to = EV::timer(5, 0, sub { diag "timeout"; EV::break });
EV::run;
undef $ctx;
# $conn_after is still a Perl-side EV::Websockets::Connection object;
# the wsi is gone but stash() should still work on it (state="closed").
# The destroyed-magic croak fires only after the C struct is freed,
# which happens when the last Perl ref drops. We can verify the
# already-closed-but-not-destroyed branch:
ok(eval { $conn_after->stash; 1 }, "stash on closed-but-alive conn returns hashref");
is(ref $conn_after->stash, "HASH", "stash hashref persists across close");
}
done_testing;
( run in 2.240 seconds using v1.01-cache-2.11-cpan-8f98c5d2c55 )