Feersum
view release on metacpan or search on metacpan
/*
 * Magic "get" callback (by its name, for the psgix.io scalar).
 *
 * Removes the ext magic from sv, calls the Perl sub
 * Feersum::Connection::_raw with (sv, fd) and, if that call did not die,
 * pushes any bytes still held in c->rbuf back onto the resulting handle's
 * input stream and stops the connection's read watcher and read timer.
 * If the call died, call_died() is invoked instead.
 *
 * Always returns 0.
 */
static int
psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
{
dSP;
struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
trace("invoking psgix.io magic for fd=%d\n", c->fd);
// Strip the ext magic before calling into Perl (presumably so this
// callback does not fire again for the same scalar).
sv_unmagic(sv, PERL_MAGIC_ext);
ENTER;
SAVETMPS;
PUSHMARK(SP);
// Arguments for _raw: the scalar itself, then the fd as a mortal IV.
XPUSHs(sv);
mXPUSHs(newSViv(c->fd));
PUTBACK;
// G_EVAL traps a die; the outcome is inspected through ERRSV below.
call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
SPAGAIN;
if (unlikely(SvTRUE(ERRSV))) {
call_died(aTHX_ c, "psgix.io magic");
}
else {
// NOTE(review): assumes _raw turned sv into a reference to a glob - confirm.
SV *io_glob = SvRV(sv);
// Store a new reference to the connection in the glob's scalar slot.
GvSV(io_glob) = newRV_inc(c->self);
// Put whatever remainder data into the socket buffer.
// Optimizes for the websocket case.
//
// TODO: For keepalive support the opposite operation is required;
// pull the data out of the socket buffer and back into feersum.
if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
STRLEN rbuf_len;
const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
IO *io = GvIOp(io_glob);
assert(io != NULL);
// NOTE(review): the return value of PerlIO_unread is not checked.
PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
// The bytes now live in the handle's buffer; empty our copy.
sv_setpvs(c->rbuf, "");
}
stop_read_watcher(c);
stop_read_timer(c);
// don't stop write watcher in case there's outstanding data.
}
PUTBACK;
FREETMPS;
LEAVE;
return 0;
}
MODULE = Feersum PACKAGE = Feersum
PROTOTYPES: ENABLE
void
set_server_name_and_port(SV *self, SV *name, SV *port)
    PPCODE:
{
    // Replace the cached server name with a private, read-only copy of
    // `name`, releasing any previously stored value first.
    if (feer_server_name != NULL) {
        SvREFCNT_dec(feer_server_name);
    }
    feer_server_name = newSVsv(name);
    SvREADONLY_on(feer_server_name);
    // Same treatment for the port.
    if (feer_server_port != NULL) {
        SvREFCNT_dec(feer_server_port);
    }
    feer_server_port = newSVsv(port);
    SvREADONLY_on(feer_server_port);
}
void
accept_on_fd(SV *self, int fd)
    PPCODE:
{
    // Prepares the server to accept connections on `fd`:
    //  - inspects the socket's address family to set is_tcp (and, where
    //    available, enables TCP_DEFER_ACCEPT on TCP sockets),
    //  - ignores SIGPIPE,
    //  - initializes and starts the prepare/check watchers on the default
    //    loop, and initializes (but does not start) the idle and accept
    //    watchers.
    struct sockaddr_storage addr;
    socklen_t addr_len = sizeof(addr);
    if (getsockname(fd, (struct sockaddr*)&addr, &addr_len) == -1) {
        // addr is indeterminate on failure, so do not look at ss_family;
        // is_tcp keeps whatever value it already had.
        perror("getsockname");
    }
    else {
        switch (addr.ss_family) {
        case AF_INET:
        case AF_INET6:
            is_tcp = 1;
#ifdef TCP_DEFER_ACCEPT
            trace("going to defer accept on %d\n",fd);
            if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &(int){1}, sizeof(int)) < 0)
                perror("setsockopt TCP_DEFER_ACCEPT");
#endif
            break;
#ifdef AF_UNIX
        case AF_UNIX:
            is_tcp = 0;
            break;
#endif
        default:
            // Unrecognized family: leave is_tcp untouched.
            break;
        }
    }
    trace("going to accept on %d\n",fd);
    feersum_ev_loop = EV_DEFAULT;
    signal(SIGPIPE, SIG_IGN);
    ev_prepare_init(&ep, prepare_cb);
    ev_prepare_start(feersum_ev_loop, &ep);
    ev_check_init(&ec, check_cb);
    ev_check_start(feersum_ev_loop, &ec);
    ev_idle_init(&ei, idle_cb);
    // NOTE(review): the accept watcher is only initialized here, not
    // started - presumably one of the callbacks starts it; confirm.
    ev_io_init(&accept_w, accept_cb, fd, EV_READ);
}
void
unlisten (SV *self)
    PPCODE:
{
    // Stops every watcher this server registered on feersum_ev_loop.
    // Each call touches a different watcher, so the order is immaterial.
    trace("stopping accept\n");
    ev_io_stop(feersum_ev_loop, &accept_w);
    ev_idle_stop(feersum_ev_loop, &ei);
    ev_check_stop(feersum_ev_loop, &ec);
    ev_prepare_stop(feersum_ev_loop, &ep);
}
void
request_handler(SV *self, SV *cb)
    PROTOTYPE: $&
    ALIAS:
        psgi_request_handler = 1
    PPCODE:
{
    // Installs `cb` as the request callback. ix is 0 when called as
    // request_handler and 1 when called through the psgi_request_handler
    // alias; it is stored in request_cb_is_psgi.
    if (unlikely(!(SvOK(cb) && SvROK(cb)))) {
        croak("can't supply an undef handler");
    }
    // Drop the previously installed handler, if any.
    if (request_cb_cv != NULL) {
        SvREFCNT_dec(request_cb_cv);
    }
    request_cb_cv = newSVsv(cb); // copy so 5.8.7 overload magic sticks.
    request_cb_is_psgi = ix;
    trace("assigned %s request handler %p\n",
        request_cb_is_psgi?"PSGI":"Feersum", request_cb_cv);
}
void
graceful_shutdown (SV *self, SV *cb)
PROTOTYPE: $&
PPCODE:
{
// Begins a graceful shutdown: records `cb` as the shutdown callback, sets
// shutting_down, stops the accept watcher and closes its fd. If no
// connections are active the callback is invoked right away and released.
// Croaks if `cb` is not a code reference or a shutdown is already underway.
if (!IsCodeRef(cb))
croak("must supply a code reference");
if (unlikely(shutting_down))
croak("already shutting down");
shutdown_cb_cv = newSVsv(cb);
trace("shutting down, handler=%p, active=%d\n", SvRV(cb), active_conns);
shutting_down = 1;
ev_io_stop(feersum_ev_loop, &accept_w);
// NOTE(review): the return value of close() is ignored.
close(accept_w.fd);
if (active_conns <= 0) {
trace("shutdown is immediate\n");
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
// No arguments are passed; G_EVAL traps a die inside the handler.
call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
PUTBACK;
trace3("called shutdown handler\n");
// The handler has run; release it and clear the global.
SvREFCNT_dec(shutdown_cb_cv);
shutdown_cb_cv = NULL;
FREETMPS;
LEAVE;
}
}
double
read_timeout (SV *self, ...)
    PROTOTYPE: $;$
    PREINIT:
        double new_read_timeout = 0.0;
    CODE:
{
    // Combined getter/setter for the global read_timeout. With a second
    // argument the value is validated and stored; the current value is
    // returned either way.
    if (items >= 2) {
        new_read_timeout = SvNV(ST(1));
        // Written as !(x > 0) so that NaN is rejected as well.
        if (!(new_read_timeout > 0.0))
            croak("must set a positive (non-zero) value for the timeout");
        trace("set timeout %f\n", new_read_timeout);
        read_timeout = new_read_timeout;
    }
    RETVAL = read_timeout;
}
    OUTPUT:
        RETVAL
void
set_keepalive (SV *self, SV *set)
    PPCODE:
{
    // Stores the truthiness of `set` in the global is_keepalive flag.
    trace("set keepalive %d\n", SvTRUE(set));
    is_keepalive = SvTRUE(set);
}
unsigned int
max_connection_reqs (SV *self, ...)
    PROTOTYPE: $;$
    PREINIT:
        unsigned int new_max_connection_reqs = 0;
    CODE:
{
    // Combined getter/setter for the global max_connection_reqs. With a
    // second argument the value is validated and stored; the current value
    // is returned either way. Croaks on a negative value.
    if (items > 1) {
        // Read into a signed IV first: testing ">= 0" on an unsigned
        // variable is always true, so a negative argument used to slip
        // through and wrap around to a huge limit.
        IV requested = SvIV(ST(1));
        if (requested < 0) {
            croak("must set a positive value");
        }
        new_max_connection_reqs = (unsigned int)requested;
        trace("set max requests per connection %u\n", new_max_connection_reqs);
        max_connection_reqs = new_max_connection_reqs;
    }
    RETVAL = max_connection_reqs;
}
    OUTPUT:
        RETVAL
void
DESTROY (SV *self)
    PPCODE:
{
    // Server destructor: releases the installed request handler.
    trace3("DESTROY server\n");
    if (request_cb_cv) {
        SvREFCNT_dec(request_cb_cv);
        // Clear the global so a later request_handler() call does not
        // decrement an already-released SV a second time.
        request_cb_cv = NULL;
    }
}
MODULE = Feersum PACKAGE = Feersum::Connection::Handle
PROTOTYPES: ENABLE
int
fileno (feer_conn_handle *hdl)
    CODE:
{
    // Returns the connection's file descriptor. `c` is not declared in
    // this XSUB (presumably the feer_conn_handle typemap provides it).
    RETVAL = c->fd;
}
    OUTPUT:
        RETVAL
void
DESTROY (SV *self)
    ALIAS:
        Feersum::Connection::Reader::DESTROY = 1
        Feersum::Connection::Writer::DESTROY = 2
    PPCODE:
{
    // Handle destructor shared by the Reader (ix == 1) and Writer (ix == 2)
    // classes. A NULL handle means it was already closed.
    feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);
    if (hdl != NULL) {
        struct feer_conn *conn = (struct feer_conn *)hdl;
        trace3("DESTROY handle fd=%d, class=%s\n", conn->fd,
            HvNAME(SvSTASH(SvRV(self))));
        // only close the writer on destruction
        if (ix == 2) {
            feersum_close_handle(aTHX_ conn, 1);
        }
    }
    else {
        trace3("DESTROY handle (closed) class=%s\n",
            HvNAME(SvSTASH(SvRV(self))));
    }
}
SV*
read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
    PROTOTYPE: $$$;$
    PPCODE:
{
    // Copies up to `len` bytes of the connection's buffered request body
    // (c->rbuf) onto the end of `buf`.
    //
    //   buf    - destination scalar; forced to an empty string if it is
    //            not already a string. Must not be read-only.
    //   len    - maximum number of bytes wanted; clamped to what is buffered.
    //   ST(3)  - optional integer offset into the buffer; a negative value
    //            is taken relative to c->received_cl.
    //
    // Returns the number of bytes appended, 0 when nothing is left and the
    // connection is in RECEIVE_SHUTDOWN, or undef with errno = EAGAIN when
    // nothing is buffered yet. Without an offset argument the consumed
    // bytes are removed from the front of c->rbuf.
    //
    // `c` is not declared in this XSUB (presumably the feer_conn_handle
    // typemap provides it).
    STRLEN buf_len = 0, src_len = 0;
    ssize_t offset;
    char *src_ptr = NULL;
    // optimizes for the "read everything" case.
    if (unlikely(items == 4) && SvOK(ST(3)) && SvIOK(ST(3)))
        offset = SvIV(ST(3));
    else
        offset = 0;
    trace("read fd=%d : request len=%"Sz_uf" off=%"Ssz_df"\n",
        c->fd, (Sz)len, (Ssz)offset);
    if (unlikely(c->receiving <= RECEIVE_HEADERS))
        // XXX as of 0.984 this is dead code
        croak("can't call read() until the body begins to arrive");
    // Reject read-only destinations before the first attempt to modify buf.
    if (unlikely(SvREADONLY(buf)))
        croak("buffer must not be read-only");
    if (!SvOK(buf) || !SvPOK(buf)) {
        // force to a PV and ensure buffer space
        sv_setpvn(buf,"",0);
        SvGROW(buf, len+1);
    }
    if (unlikely(len == 0))
        XSRETURN_IV(0); // assumes undef buffer got allocated to empty-string
    (void)SvPV(buf, buf_len);
    if (likely(c->rbuf))
        src_ptr = SvPV(c->rbuf, src_len);
    // len is unsigned, so a caller passing -1 arrives here as a huge value;
    // the clamp below reduces it to whatever is actually buffered.
    if (unlikely(offset < 0))
        offset = (-offset >= c->received_cl) ? 0 : c->received_cl + offset;
    // Clamp len to the bytes available past offset. Comparing against
    // (src_len - offset) instead of computing (len + offset) means a huge
    // len cannot wrap around, and an offset beyond the buffered data can
    // no longer underflow into an enormous length.
    if ((STRLEN)offset <= src_len && len > src_len - (STRLEN)offset)
        len = src_len - (STRLEN)offset;
    trace("read fd=%d : normalized len=%"Sz_uf" off=%"Ssz_df" src_len=%"Sz_uf"\n",
        c->fd, (Sz)len, (Ssz)offset, (Sz)src_len);
    if (unlikely(!c->rbuf || src_len == 0 || offset >= c->received_cl
                 || (STRLEN)offset > src_len)) {
        trace2("rbuf empty during read %d\n", c->fd);
        if (c->receiving == RECEIVE_SHUTDOWN) {
            XSRETURN_IV(0);
        }
        else {
            errno = EAGAIN;
            XSRETURN_UNDEF;
        }
    }
    if (likely(len == src_len && offset == 0)) {
        trace2("appending entire rbuf fd=%d\n", c->fd);
        if (likely(buf_len == 0)) {
            sv_setsv(buf, c->rbuf);
        }
        else {
            sv_catsv(buf, c->rbuf);
        }
        // The whole buffer has been handed over: release the connection's
        // reference before forgetting the pointer, otherwise the SV leaks.
        SvREFCNT_dec(c->rbuf);
        c->rbuf = NULL;
    }
    else {
        src_ptr += offset;
        trace2("appending partial rbuf fd=%d len=%"Sz_uf" off=%"Ssz_df" ptr=%p\n",
            c->fd, (Sz)len, (Ssz)offset, src_ptr);
        SvGROW(buf, SvCUR(buf) + len);
        sv_catpvn(buf, src_ptr, len);
        if (likely(items == 3)) {
            // there wasn't an offset param, throw away beginning
            sv_chop(c->rbuf, SvPVX(c->rbuf) + len);
        }
    }
    XSRETURN_IV(len);
}
STRLEN
write (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Queues `body` (a scalar or a reference to a string scalar) on the
    // connection's write buffer and returns its length in bytes. An absent
    // or undef body returns 0. Only valid while the response is in
    // streaming mode. `c` is not declared in this XSUB (presumably the
    // feer_conn_handle typemap provides it).
    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");
    SV *body = &PL_sv_undef;
    if (items == 2)
        body = ST(1);
    if (unlikely(body == NULL || !SvOK(body)))
        XSRETURN_IV(0);
    trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
    if (SvROK(body)) {
        // Only a reference to a defined string scalar is accepted.
        SV *target = SvRV(body);
        if (!(SvOK(target) && SvPOK(target)))
            croak("body must be a scalar, scalar ref or undef");
        body = target;
    }
    // Stringify solely to obtain the byte length for the return value.
    (void)SvPV(body, RETVAL);
    if (c->is_http11) {
        add_chunk_sv_to_wbuf(c, body);
    }
    else {
        add_sv_to_wbuf(c, body);
    }
    conn_write_ready(c);
}
    OUTPUT:
        RETVAL
void
write_array (feer_conn_handle *hdl, AV *abody)
    PROTOTYPE: $$
    PPCODE:
{
    // Queues every element of `abody` that fetch_av_normal() yields on the
    // connection's write buffer; the chunk-adding helper is used when
    // c->is_http11 is set. Only valid in streaming mode. `c` is not
    // declared in this XSUB (presumably the feer_conn_handle typemap
    // provides it).
    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");
    trace("write_array fd=%d c=%p, abody=%p\n", c->fd, c, abody);
    // Both values are sampled once, before the loop starts.
    const I32 last_idx = av_len(abody);
    const int chunked = c->is_http11 ? 1 : 0;
    int idx;
    for (idx = 0; idx <= last_idx; idx++) {
        SV *elem = fetch_av_normal(aTHX_ abody, idx);
        if (unlikely(!elem))
            continue;
        if (chunked)
            add_chunk_sv_to_wbuf(c, elem);
        else
            add_sv_to_wbuf(c, elem);
    }
    conn_write_ready(c);
}
int
seek (feer_conn_handle *hdl, ssize_t offset, ...)
    PROTOTYPE: $$;$
    CODE:
{
    // Discards data from the front of the read buffer. Returns 1 on
    // success and 0 for a closed handle or an unsupported seek.
    //  - offset == 0: no-op for any whence.
    //  - offset > 0 with SEEK_CUR or SEEK_SET: drop `offset` bytes (capped
    //    at the buffer length).
    //  - offset < 0 with SEEK_END: keep only the last -offset bytes; fails
    //    if that would reach before the start of the buffer.
    // `c` is not declared in this XSUB (presumably the feer_conn_handle
    // typemap provides it).
    int whence = SEEK_CUR;
    if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
        whence = SvIV(ST(2));
    trace("seek fd=%d offset=%"Ssz_df" whence=%d\n", c->fd, offset, whence);
    // Failure is the default; the branches below flip it on success.
    RETVAL = 0;
    if (likely(c->rbuf)) {
        if (offset == 0) {
            RETVAL = 1; // stay put for any whence
        }
        else if (offset > 0) {
            if (whence == SEEK_CUR || whence == SEEK_SET) {
                STRLEN avail;
                const char *start = SvPV_const(c->rbuf, avail);
                if (offset > avail)
                    offset = avail;
                sv_chop(c->rbuf, start + offset);
                RETVAL = 1;
            }
        }
        else if (whence == SEEK_END) {
            STRLEN avail;
            const char *start = SvPV_const(c->rbuf, avail);
            offset += avail; // can't be > avail since offset < 0 here
            if (offset > 0) {
                sv_chop(c->rbuf, start + offset);
                RETVAL = 1;
            }
            else if (offset == 0) {
                RETVAL = 1; // no-op, but OK
            }
            // otherwise: past beginning of string, RETVAL stays 0
        }
    }
}
    OUTPUT:
        RETVAL
int
close (feer_conn_handle *hdl)
    PROTOTYPE: $
    ALIAS:
        Feersum::Connection::Reader::close = 1
        Feersum::Connection::Writer::close = 2
    CODE:
{
    // Closes the reader (ix == 1) or writer (ix == 2) side through
    // feersum_close_handle() and then zeroes the handle SV's UV slot.
    // `c` and `hdl_sv` are not declared in this XSUB (presumably the
    // feer_conn_handle typemap provides them).
    assert(ix);
    const int closing_writer = (ix == 2);
    RETVAL = feersum_close_handle(aTHX_ c, closing_writer);
    SvUVX(hdl_sv) = 0;
}
    OUTPUT:
        RETVAL
void
_poll_cb (feer_conn_handle *hdl, SV *cb)
    PROTOTYPE: $$
    ALIAS:
        Feersum::Connection::Reader::poll_cb = 1
        Feersum::Connection::Writer::poll_cb = 2
    PPCODE:
{
    // Sets or clears the write-poll callback of a connection.
    //  - Must be called through one of the aliases; only the Writer alias
    //    (ix == 2) is supported at present.
    //  - Any previously installed callback is released first.
    //  - An undef `cb` just clears the callback; otherwise `cb` must be a
    //    code reference, which is copied and stored before
    //    conn_write_ready() is called.
    // `c` is not declared in this XSUB (presumably the feer_conn_handle
    // typemap provides it).
    if (unlikely(ix < 1 || ix > 2))
        croak("can't call _poll_cb directly");
    else if (unlikely(ix == 1))
        croak("poll_cb for reading not yet supported"); // TODO poll_read_cb
    if (c->poll_write_cb != NULL) {
        SvREFCNT_dec(c->poll_write_cb);
        c->poll_write_cb = NULL;
    }
    if (!SvOK(cb)) {
        trace("unset poll_cb ix=%d\n", ix);
        // A bare `return` in PPCODE skips the stack reset and leaves the
        // XSUB's own arguments on the stack as its return values;
        // XSRETURN_EMPTY returns an empty list properly.
        XSRETURN_EMPTY;
    }
    else if (unlikely(!IsCodeRef(cb)))
        croak("must supply a code reference to poll_cb");
    c->poll_write_cb = newSVsv(cb);
    conn_write_ready(c);
}
SV*
response_guard (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Passes the optional second argument (NULL when absent) to
    // feersum_conn_guard() and returns its result. `c` is not declared in
    // this XSUB (presumably the feer_conn_handle typemap provides it).
    SV *guard_arg = NULL;
    if (items == 2)
        guard_arg = ST(1);
    RETVAL = feersum_conn_guard(aTHX_ c, guard_arg);
}
    OUTPUT:
        RETVAL
MODULE = Feersum PACKAGE = Feersum::Connection
PROTOTYPES: ENABLE
SV *
start_streaming (struct feer_conn *c, SV *message, AV *headers)
    PROTOTYPE: $$\@
    CODE:
{
    // Starts the response with the final argument of
    // feersum_start_response() set to 1, then returns a new connection
    // handle created with flag 1.
    feersum_start_response(aTHX_ c, message, headers, 1);
    RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
}
    OUTPUT:
        RETVAL
int
is_http11 (struct feer_conn *c)
    CODE:
{
    // Returns the connection's is_http11 flag.
    RETVAL = c->is_http11;
}
    OUTPUT:
        RETVAL
size_t
send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
    PROTOTYPE: $$\@$
    CODE:
{
    // Starts the response with the final argument of
    // feersum_start_response() set to 0, then writes the whole body and
    // returns what feersum_write_whole_body() reports. Note that the
    // response is started before the body is validated.
    feersum_start_response(aTHX_ c, message, headers, 0);
    if (unlikely(!SvOK(body))) {
        croak("can't send_response with an undef body");
    }
    RETVAL = feersum_write_whole_body(aTHX_ c, body);
}
    OUTPUT:
        RETVAL
SV*
_continue_streaming_psgi (struct feer_conn *c, SV *psgi_response)
    PROTOTYPE: $\@
    CODE:
{
    // Continues a delayed PSGI response from the array-ref given to the
    // responder callback.
    //  - 3 elements: handled as a complete response; returns undef.
    //  - 2 elements (status, headers): starts the response with the final
    //    argument of feersum_start_response() set to 1 and returns a new
    //    connection handle created with flag 1.
    //  - anything else: croaks.
    AV *av = NULL;
    int len = 0;
    if (IsArrayRef(psgi_response)) {
        av = (AV*)SvRV(psgi_response);
        len = av_len(av) + 1;
    }
    if (len == 3) {
        // 0 is "don't recurse" (i.e. don't allow another code-ref)
        feersum_handle_psgi_response(aTHX_ c, psgi_response, 0);
        RETVAL = &PL_sv_undef;
    }
    else if (len == 2) {
        // av_fetch returns NULL for a slot that was never assigned (a
        // sparse array); treat such a slot as undef rather than
        // dereferencing a NULL pointer.
        SV **message_p = av_fetch(av,0,0);
        SV **headers_p = av_fetch(av,1,0);
        SV *message = message_p ? *message_p : &PL_sv_undef;
        SV *headers = headers_p ? *headers_p : &PL_sv_undef;
        if (unlikely(!IsArrayRef(headers)))
            croak("PSGI headers must be an array ref");
        feersum_start_response(aTHX_ c, message, (AV*)SvRV(headers), 1);
        RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
    }
    else {
        croak("PSGI response starter expects a 2 or 3 element array-ref");
    }
}
    OUTPUT:
        RETVAL
void
force_http10 (struct feer_conn *c)
    PROTOTYPE: $
    ALIAS:
        force_http11 = 1
    PPCODE:
{
    // ix is 0 for force_http10 and 1 for the force_http11 alias, so the
    // alias index doubles as the new flag value.
    c->is_http11 = ix;
}
SV *
env (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Wraps whatever feersum_env() returns in a new reference without
    // adding to its reference count.
    SV *env_sv = (SV*)feersum_env(aTHX_ c);
    RETVAL = newRV_noinc(env_sv);
}
    OUTPUT:
        RETVAL
SV *
method (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Returns feersum_env_method() for the connection's current request.
    RETVAL = feersum_env_method(aTHX_ c->req);
}
    OUTPUT:
        RETVAL
SV *
uri (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Returns feersum_env_uri() for the connection's current request.
    RETVAL = feersum_env_uri(aTHX_ c->req);
}
    OUTPUT:
        RETVAL
SV *
protocol (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Returns the SV from feersum_env_protocol() for the current request,
    // with its reference count incremented.
    RETVAL = SvREFCNT_inc_simple_NN(feersum_env_protocol(aTHX_ c->req));
}
    OUTPUT:
        RETVAL
SV *
path (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Returns the SV from feersum_env_path() for the current request,
    // with its reference count incremented.
    RETVAL = SvREFCNT_inc_simple_NN(feersum_env_path(aTHX_ c->req));
}
    OUTPUT:
        RETVAL
SV *
query (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Returns the SV from feersum_env_query() for the current request,
    // with its reference count incremented.
    RETVAL = SvREFCNT_inc_simple_NN(feersum_env_query(aTHX_ c->req));
}
    OUTPUT:
        RETVAL
SV *
remote_address (struct feer_conn *c)
PROTOTYPE: $
CODE:
// Returns the SV from feersum_env_addr() with its reference count
// incremented.
RETVAL = SvREFCNT_inc_simple_NN(feersum_env_addr(aTHX_ c));
PROTOTYPE: $
CODE:
// NOTE(review): a second PROTOTYPE/CODE section starts here without an
// OUTPUT section for the statement above and without a new XSUB header.
// This looks like lines lost in extraction (presumably the end of
// remote_address and the header of a content-length accessor) - restore
// from the upstream file before building.
RETVAL = feersum_env_content_length(aTHX_ c);
OUTPUT:
RETVAL
SV *
input (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
{
    // Returns a new connection handle created with flag 0 when
    // c->expected_cl is positive, and undef otherwise.
    RETVAL = likely(c->expected_cl > 0)
        ? new_feer_conn_handle(aTHX_ c, 0)
        : &PL_sv_undef;
}
    OUTPUT:
        RETVAL
SV *
headers (struct feer_conn *c, int norm = 0)
    PROTOTYPE: $;$
    CODE:
{
    // Wraps the result of feersum_env_headers() for the current request in
    // a new reference without adding to its reference count. `norm` is
    // passed straight through and defaults to 0.
    SV *hdrs = (SV*)feersum_env_headers(aTHX_ c->req, norm);
    RETVAL = newRV_noinc(hdrs);
}
    OUTPUT:
        RETVAL
SV *
header (struct feer_conn *c, SV *name)
    PROTOTYPE: $$
    CODE:
{
    // Returns feersum_env_header() for `name` on the current request.
    RETVAL = feersum_env_header(aTHX_ c->req, name);
}
    OUTPUT:
        RETVAL
int
fileno (struct feer_conn *c)
    CODE:
{
    // Returns the connection's file descriptor.
    RETVAL = c->fd;
}
    OUTPUT:
        RETVAL
bool
is_keepalive (struct feer_conn *c)
    CODE:
{
    // Returns the connection's is_keepalive flag.
    RETVAL = c->is_keepalive;
}
    OUTPUT:
        RETVAL
SV*
response_guard (struct feer_conn *c, ...)
    PROTOTYPE: $;$
    CODE:
{
    // Passes the optional second argument (NULL when absent) to
    // feersum_conn_guard() and returns its result.
    SV *guard_arg = NULL;
    if (items == 2)
        guard_arg = ST(1);
    RETVAL = feersum_conn_guard(aTHX_ c, guard_arg);
}
    OUTPUT:
        RETVAL
void
DESTROY (struct feer_conn *c)
PPCODE:
{
// Connection destructor. Releases the read buffer, every queued write
// buffer, the parsed request and its SVs, and the socket address; closes
// the connection; drops the poll callback and ext guard; and decrements
// active_conns. If a shutdown is in progress and this was the last
// connection, the loop watchers are stopped and the shutdown callback is
// invoked and released.
int i;
trace("DESTROY connection fd=%d c=%p\n", c->fd, c);
if (likely(c->rbuf)) SvREFCNT_dec(c->rbuf);
if (c->wbuf_rinq) {
struct iomatrix *m;
// Drain the write queue, releasing each SV an entry still holds.
while ((m = (struct iomatrix *)rinq_shift(&c->wbuf_rinq)) != NULL) {
for (i=0; i < m->count; i++) {
if (m->sv[i]) SvREFCNT_dec(m->sv[i]);
}
Safefree(m);
}
}
if (likely(c->req)) {
if (c->req->buf) SvREFCNT_dec(c->req->buf);
if (likely(c->req->path)) SvREFCNT_dec(c->req->path);
if (likely(c->req->query)) SvREFCNT_dec(c->req->query);
if (likely(c->req->addr)) SvREFCNT_dec(c->req->addr);
if (likely(c->req->port)) SvREFCNT_dec(c->req->port);
Safefree(c->req);
}
// NOTE(review): c->sa is released with free() rather than Safefree() -
// presumably it was allocated with plain malloc; confirm at the
// allocation site.
if (likely(c->sa)) free(c->sa);
safe_close_conn(c, "close at destruction");
if (c->poll_write_cb) SvREFCNT_dec(c->poll_write_cb);
if (c->ext_guard) SvREFCNT_dec(c->ext_guard);
active_conns--;
if (unlikely(shutting_down && active_conns <= 0)) {
ev_idle_stop(feersum_ev_loop, &ei);
ev_prepare_stop(feersum_ev_loop, &ep);
ev_check_stop(feersum_ev_loop, &ec);
trace3("... was last conn, going to try shutdown\n");
if (shutdown_cb_cv) {
// No arguments are passed; G_EVAL traps a die inside the handler.
// NOTE(review): unlike graceful_shutdown, this call is not wrapped in
// ENTER/SAVETMPS ... FREETMPS/LEAVE - confirm that is intended.
PUSHMARK(SP);
call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
PUTBACK;
trace3("... ok, called that handler\n");
// The handler has run; release it and clear the global.
SvREFCNT_dec(shutdown_cb_cv);
shutdown_cb_cv = NULL;
}
}
}
MODULE = Feersum PACKAGE = Feersum
BOOT:
{
feer_stash = gv_stashpv("Feersum", 1);
feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);
( run in 0.549 second using v1.01-cache-2.11-cpan-5511b514fd6 )