Feersum

 view release on metacpan or  search on metacpan

Feersum.xs  view on Meta::CPAN


static int
psgix_io_svt_get (pTHX_ SV *sv, MAGIC *mg)
{
    dSP;

    struct feer_conn *c = sv_2feer_conn(mg->mg_obj);
    trace("invoking psgix.io magic for fd=%d\n", c->fd);

    sv_unmagic(sv, PERL_MAGIC_ext);

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    XPUSHs(sv);
    mXPUSHs(newSViv(c->fd));
    PUTBACK;

    call_pv("Feersum::Connection::_raw", G_VOID|G_DISCARD|G_EVAL);
    SPAGAIN;

    if (unlikely(SvTRUE(ERRSV))) {
        call_died(aTHX_ c, "psgix.io magic");
    }
    else {
        SV *io_glob   = SvRV(sv);
        GvSV(io_glob) = newRV_inc(c->self);

        // Put whatever remainder data into the socket buffer.
        // Optimizes for the websocket case.
        //
        // TODO: For keepalive support the opposite operation is required;
        // pull the data out of the socket buffer and back into feersum.
        if (likely(c->rbuf && SvOK(c->rbuf) && SvCUR(c->rbuf))) {
            STRLEN rbuf_len;
            const char *rbuf_ptr = SvPV(c->rbuf, rbuf_len);
            IO *io = GvIOp(io_glob);
            assert(io != NULL);
            PerlIO_unread(IoIFP(io), (const void *)rbuf_ptr, rbuf_len);
            sv_setpvs(c->rbuf, "");
        }

        stop_read_watcher(c);
        stop_read_timer(c);
        // don't stop write watcher in case there's outstanding data.
    }

    PUTBACK;
    FREETMPS;
    LEAVE;
    return 0;
}

MODULE = Feersum		PACKAGE = Feersum

PROTOTYPES: ENABLE

void
set_server_name_and_port(SV *self, SV *name, SV *port)
    PPCODE:
{
    if (feer_server_name)
        SvREFCNT_dec(feer_server_name);
    feer_server_name = newSVsv(name);
    SvREADONLY_on(feer_server_name);

    if (feer_server_port)
        SvREFCNT_dec(feer_server_port);
    feer_server_port = newSVsv(port);
    SvREADONLY_on(feer_server_port);
}

void
accept_on_fd(SV *self, int fd)
    PPCODE:
{
    struct sockaddr_storage addr;
    socklen_t addr_len = sizeof(addr);

    if (getsockname(fd, (struct sockaddr*)&addr, &addr_len) == -1) perror("getsockname");
    switch (addr.ss_family) {
        case AF_INET:
        case AF_INET6:
            is_tcp = 1;
#ifdef TCP_DEFER_ACCEPT
            trace("going to defer accept on %d\n",fd);
            if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &(int){1}, sizeof(int)) < 0)
                perror("setsockopt TCP_DEFER_ACCEPT");
#endif
            break;
#ifdef AF_UNIX
        case AF_UNIX:
            is_tcp = 0;
            break;
#endif
    }

    trace("going to accept on %d\n",fd);
    feersum_ev_loop = EV_DEFAULT;

    signal(SIGPIPE, SIG_IGN);

    ev_prepare_init(&ep, prepare_cb);
    ev_prepare_start(feersum_ev_loop, &ep);

    ev_check_init(&ec, check_cb);
    ev_check_start(feersum_ev_loop, &ec);

    ev_idle_init(&ei, idle_cb);

    ev_io_init(&accept_w, accept_cb, fd, EV_READ);
}

void
unlisten (SV *self)
    PPCODE:
{
    trace("stopping accept\n");
    ev_prepare_stop(feersum_ev_loop, &ep);
    ev_check_stop(feersum_ev_loop, &ec);
    ev_idle_stop(feersum_ev_loop, &ei);
    ev_io_stop(feersum_ev_loop, &accept_w);
}

void
request_handler(SV *self, SV *cb)
    PROTOTYPE: $&
    ALIAS:
        psgi_request_handler = 1
    PPCODE:
{
    if (unlikely(!SvOK(cb) || !SvROK(cb)))
        croak("can't supply an undef handler");
    if (request_cb_cv)
        SvREFCNT_dec(request_cb_cv);
    request_cb_cv = newSVsv(cb); // copy so 5.8.7 overload magic sticks.
    request_cb_is_psgi = ix;
    trace("assigned %s request handler %p\n",
        request_cb_is_psgi?"PSGI":"Feersum", request_cb_cv);
}

void
graceful_shutdown (SV *self, SV *cb)
    PROTOTYPE: $&
    PPCODE:
{
    if (!IsCodeRef(cb))
        croak("must supply a code reference");
    if (unlikely(shutting_down))
        croak("already shutting down");
    shutdown_cb_cv = newSVsv(cb);
    trace("shutting down, handler=%p, active=%d\n", SvRV(cb), active_conns);

    shutting_down = 1;
    ev_io_stop(feersum_ev_loop, &accept_w);
    close(accept_w.fd);

    if (active_conns <= 0) {
        trace("shutdown is immediate\n");
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
        PUTBACK;
        trace3("called shutdown handler\n");
        SvREFCNT_dec(shutdown_cb_cv);
        shutdown_cb_cv = NULL;
        FREETMPS;
        LEAVE;
    }
}

double
read_timeout (SV *self, ...)
    PROTOTYPE: $;$
    PREINIT:
        double new_read_timeout = 0.0;
    CODE:
{
    if (items > 1) {
        new_read_timeout = SvNV(ST(1));
        if (!(new_read_timeout > 0.0)) {
            croak("must set a positive (non-zero) value for the timeout");
        }
        trace("set timeout %f\n", new_read_timeout);
        read_timeout = new_read_timeout;
    }
    RETVAL = read_timeout;
}
    OUTPUT:
        RETVAL

void
set_keepalive (SV *self, SV *set)
    PPCODE:
{
    trace("set keepalive %d\n", SvTRUE(set));
    is_keepalive = SvTRUE(set);
}

unsigned int
max_connection_reqs (SV *self, ...)
    PROTOTYPE: $;$
    PREINIT:
        unsigned int new_max_connection_reqs = 0;
    CODE:
{
    if (items > 1) {
        new_max_connection_reqs = SvIV(ST(1));
        if (!(new_max_connection_reqs >= 0)) {
            croak("must set a positive value");
        }
        trace("set max requests per connection %d\n", new_max_connection_reqs);
        max_connection_reqs = new_max_connection_reqs;
    }
    RETVAL = max_connection_reqs;
}
    OUTPUT:
        RETVAL

void
DESTROY (SV *self)
    PPCODE:
{
    trace3("DESTROY server\n");
    if (request_cb_cv)
        SvREFCNT_dec(request_cb_cv);
}

MODULE = Feersum	PACKAGE = Feersum::Connection::Handle

PROTOTYPES: ENABLE

int
fileno (feer_conn_handle *hdl)
    CODE:
        RETVAL = c->fd;
    OUTPUT:
        RETVAL

void
DESTROY (SV *self)
    ALIAS:
        Feersum::Connection::Reader::DESTROY = 1
        Feersum::Connection::Writer::DESTROY = 2
    PPCODE:
{
    feer_conn_handle *hdl = sv_2feer_conn_handle(self, 0);

    if (hdl == NULL) {
        trace3("DESTROY handle (closed) class=%s\n",
            HvNAME(SvSTASH(SvRV(self))));
    }
    else {
        struct feer_conn *c = (struct feer_conn *)hdl;
        trace3("DESTROY handle fd=%d, class=%s\n", c->fd,
            HvNAME(SvSTASH(SvRV(self))));
        if (ix == 2) // only close the writer on destruction
            feersum_close_handle(aTHX_ c, 1);
    }
}

SV*
read (feer_conn_handle *hdl, SV *buf, size_t len, ...)
    PROTOTYPE: $$$;$
    PPCODE:
{
    STRLEN buf_len = 0, src_len = 0;
    ssize_t offset;
    char *buf_ptr, *src_ptr;

    // optimizes for the "read everything" case.

    if (unlikely(items == 4) && SvOK(ST(3)) && SvIOK(ST(3)))
        offset = SvIV(ST(3));
    else
        offset = 0;

    trace("read fd=%d : request    len=%"Sz_uf" off=%"Ssz_df"\n",
        c->fd, (Sz)len, (Ssz)offset);

    if (unlikely(c->receiving <= RECEIVE_HEADERS))
        // XXX as of 0.984 this is dead code
        croak("can't call read() until the body begins to arrive");

    if (!SvOK(buf) || !SvPOK(buf)) {
        // force to a PV and ensure buffer space
        sv_setpvn(buf,"",0);
        SvGROW(buf, len+1);
    }

    if (unlikely(SvREADONLY(buf)))
        croak("buffer must not be read-only");

    if (unlikely(len == 0))
        XSRETURN_IV(0); // assumes undef buffer got allocated to empty-string

    buf_ptr = SvPV(buf, buf_len);
    if (likely(c->rbuf))
        src_ptr = SvPV(c->rbuf, src_len);

    if (unlikely(len < 0))
        len = src_len;

    if (unlikely(offset < 0))
        offset = (-offset >= c->received_cl) ? 0 : c->received_cl + offset;

    if (unlikely(len + offset > src_len))
        len = src_len - offset;

    trace("read fd=%d : normalized len=%"Sz_uf" off=%"Ssz_df" src_len=%"Sz_uf"\n",
        c->fd, (Sz)len, (Ssz)offset, (Sz)src_len);

    if (unlikely(!c->rbuf || src_len == 0 || offset >= c->received_cl)) {
        trace2("rbuf empty during read %d\n", c->fd);
        if (c->receiving == RECEIVE_SHUTDOWN) {
            XSRETURN_IV(0);
        }
        else {
            errno = EAGAIN;
            XSRETURN_UNDEF;
        }
    }

    if (likely(len == src_len && offset == 0)) {
        trace2("appending entire rbuf fd=%d\n", c->fd);

Feersum.xs  view on Meta::CPAN

        if (likely(buf_len == 0)) {
            sv_setsv(buf, c->rbuf);
        }
        else {
            sv_catsv(buf, c->rbuf);
        }
        c->rbuf = NULL;
    }
    else {
        src_ptr += offset;
        trace2("appending partial rbuf fd=%d len=%"Sz_uf" off=%"Ssz_df" ptr=%p\n",
            c->fd, len, offset, src_ptr);
        SvGROW(buf, SvCUR(buf) + len);
        sv_catpvn(buf, src_ptr, len);
        if (likely(items == 3)) {
            // there wasn't an offset param, throw away beginning
            sv_chop(c->rbuf, SvPVX(c->rbuf) + len);
        }
    }

    XSRETURN_IV(len);
}

STRLEN
write (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
{
    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");

    SV *body = (items == 2) ? ST(1) : &PL_sv_undef;
    if (unlikely(!body || !SvOK(body)))
        XSRETURN_IV(0);

    trace("write fd=%d c=%p, body=%p\n", c->fd, c, body);
    if (SvROK(body)) {
        SV *refd = SvRV(body);
        if (SvOK(refd) && SvPOK(refd)) {
            body = refd;
        }
        else {
            croak("body must be a scalar, scalar ref or undef");
        }
    }
    (void)SvPV(body, RETVAL);

    if (c->is_http11)
        add_chunk_sv_to_wbuf(c, body);
    else
        add_sv_to_wbuf(c, body);

    conn_write_ready(c);
}
    OUTPUT:
        RETVAL

void
write_array (feer_conn_handle *hdl, AV *abody)
    PROTOTYPE: $$
    PPCODE:
{
    if (unlikely(c->responding != RESPOND_STREAMING))
        croak("can only call write in streaming mode");

    trace("write_array fd=%d c=%p, abody=%p\n", c->fd, c, abody);

    I32 amax = av_len(abody);
    int i;
    if (c->is_http11) {
        for (i=0; i<=amax; i++) {
            SV *sv = fetch_av_normal(aTHX_ abody, i);
            if (likely(sv)) add_chunk_sv_to_wbuf(c, sv);
        }
    }
    else {
        for (i=0; i<=amax; i++) {
            SV *sv = fetch_av_normal(aTHX_ abody, i);
            if (likely(sv)) add_sv_to_wbuf(c, sv);
        }
    }

    conn_write_ready(c);
}

int
seek (feer_conn_handle *hdl, ssize_t offset, ...)
    PROTOTYPE: $$;$
    CODE:
{
    int whence = SEEK_CUR;
    if (items == 3 && SvOK(ST(2)) && SvIOK(ST(2)))
        whence = SvIV(ST(2));

    trace("seek fd=%d offset=%"Ssz_df" whence=%d\n", c->fd, offset, whence);

    if (unlikely(!c->rbuf)) {
        // handle is effectively "closed"
        RETVAL = 0;
    }
    else if (offset == 0) {
        RETVAL = 1; // stay put for any whence
    }
    else if (offset > 0 && (whence == SEEK_CUR || whence == SEEK_SET)) {
        STRLEN len;
        const char *str = SvPV_const(c->rbuf, len);
        if (offset > len)
            offset = len;
        sv_chop(c->rbuf, str + offset);
        RETVAL = 1;
    }
    else if (offset < 0 && whence == SEEK_END) {
        STRLEN len;
        const char *str = SvPV_const(c->rbuf, len);
        offset += len; // can't be > len since block is offset<0
        if (offset == 0) {
            RETVAL = 1; // no-op, but OK
        }
        else if (offset > 0) {
            sv_chop(c->rbuf, str + offset);
            RETVAL = 1;
        }
        else {
            // past beginning of string
            RETVAL = 0;
        }
    }
    else {
        // invalid seek
        RETVAL = 0;
    }
}
    OUTPUT:
        RETVAL

int
close (feer_conn_handle *hdl)
    PROTOTYPE: $
    ALIAS:
        Feersum::Connection::Reader::close = 1
        Feersum::Connection::Writer::close = 2
    CODE:
{
    assert(ix);
    RETVAL = feersum_close_handle(aTHX_ c, (ix == 2));
    SvUVX(hdl_sv) = 0;
}
    OUTPUT:
        RETVAL

void
_poll_cb (feer_conn_handle *hdl, SV *cb)
    PROTOTYPE: $$
    ALIAS:
        Feersum::Connection::Reader::poll_cb = 1
        Feersum::Connection::Writer::poll_cb = 2
    PPCODE:
{
    if (unlikely(ix < 1 || ix > 2))
        croak("can't call _poll_cb directly");
    else if (unlikely(ix == 1))
        croak("poll_cb for reading not yet supported"); // TODO poll_read_cb

    if (c->poll_write_cb != NULL) {
        SvREFCNT_dec(c->poll_write_cb);
        c->poll_write_cb = NULL;
    }

    if (!SvOK(cb)) {
        trace("unset poll_cb ix=%d\n", ix);
        return;
    }
    else if (unlikely(!IsCodeRef(cb)))
        croak("must supply a code reference to poll_cb");

    c->poll_write_cb = newSVsv(cb);
    conn_write_ready(c);
}

SV*
response_guard (feer_conn_handle *hdl, ...)
    PROTOTYPE: $;$
    CODE:
        RETVAL = feersum_conn_guard(aTHX_ c, (items==2) ? ST(1) : NULL);
    OUTPUT:
        RETVAL

MODULE = Feersum	PACKAGE = Feersum::Connection

PROTOTYPES: ENABLE

SV *
start_streaming (struct feer_conn *c, SV *message, AV *headers)
    PROTOTYPE: $$\@
    CODE:
        feersum_start_response(aTHX_ c, message, headers, 1);
        RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
    OUTPUT:
        RETVAL

int
is_http11 (struct feer_conn *c)
    CODE:
        RETVAL = c->is_http11;
    OUTPUT:
        RETVAL

size_t
send_response (struct feer_conn *c, SV* message, AV *headers, SV *body)
    PROTOTYPE: $$\@$
    CODE:
        feersum_start_response(aTHX_ c, message, headers, 0);
        if (unlikely(!SvOK(body)))
            croak("can't send_response with an undef body");
        RETVAL = feersum_write_whole_body(aTHX_ c, body);
    OUTPUT:
        RETVAL

SV*
_continue_streaming_psgi (struct feer_conn *c, SV *psgi_response)
    PROTOTYPE: $\@
    CODE:
{
    AV *av;
    int len = 0;

    if (IsArrayRef(psgi_response)) {
        av = (AV*)SvRV(psgi_response);
        len = av_len(av) + 1;
    }

    if (len == 3) {
        // 0 is "don't recurse" (i.e. don't allow another code-ref)
        feersum_handle_psgi_response(aTHX_ c, psgi_response, 0);
        RETVAL = &PL_sv_undef;
    }
    else if (len == 2) {
        SV *message = *(av_fetch(av,0,0));
        SV *headers = *(av_fetch(av,1,0));
        if (unlikely(!IsArrayRef(headers)))
            croak("PSGI headers must be an array ref");
        feersum_start_response(aTHX_ c, message, (AV*)SvRV(headers), 1);
        RETVAL = new_feer_conn_handle(aTHX_ c, 1); // RETVAL gets mortalized
    }
    else {
        croak("PSGI response starter expects a 2 or 3 element array-ref");
    }
}
    OUTPUT:
        RETVAL

void
force_http10 (struct feer_conn *c)
    PROTOTYPE: $
    ALIAS:
        force_http11 = 1
    PPCODE:
        c->is_http11 = ix;

SV *
env (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        RETVAL = newRV_noinc((SV*)feersum_env(aTHX_ c));
    OUTPUT:
        RETVAL

SV *
method (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        struct feer_req *r = c->req;
        RETVAL = feersum_env_method(aTHX_ r);
    OUTPUT:
        RETVAL

SV *
uri (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        struct feer_req *r = c->req;
        RETVAL = feersum_env_uri(aTHX_ r);
    OUTPUT:
        RETVAL

SV *
protocol (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        struct feer_req *r = c->req;
        RETVAL = SvREFCNT_inc_simple_NN(feersum_env_protocol(aTHX_ r));
    OUTPUT:
        RETVAL

SV *
path (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        struct feer_req *r = c->req;
        RETVAL = SvREFCNT_inc_simple_NN(feersum_env_path(aTHX_ r));
    OUTPUT:
        RETVAL

SV *
query (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        struct feer_req *r = c->req;
        RETVAL = SvREFCNT_inc_simple_NN(feersum_env_query(aTHX_ r));
    OUTPUT:
        RETVAL

SV *
remote_address (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        RETVAL = SvREFCNT_inc_simple_NN(feersum_env_addr(aTHX_ c));

Feersum.xs  view on Meta::CPAN

    PROTOTYPE: $
    CODE:
        RETVAL = feersum_env_content_length(aTHX_ c);
    OUTPUT:
        RETVAL

SV *
input (struct feer_conn *c)
    PROTOTYPE: $
    CODE:
        if (likely(c->expected_cl > 0)) {
            RETVAL = new_feer_conn_handle(aTHX_ c, 0);
        } else {
            RETVAL = &PL_sv_undef;
        }
    OUTPUT:
        RETVAL

SV *
headers (struct feer_conn *c, int norm = 0)
    PROTOTYPE: $;$
    CODE:
        struct feer_req *r = c->req;
        RETVAL = newRV_noinc((SV*)feersum_env_headers(aTHX_ r, norm));
    OUTPUT:
        RETVAL

SV *
header (struct feer_conn *c, SV *name)
    PROTOTYPE: $$
    CODE:
        struct feer_req *r = c->req;
        RETVAL = feersum_env_header(aTHX_ r, name);
    OUTPUT:
        RETVAL

int
fileno (struct feer_conn *c)
    CODE:
        RETVAL = c->fd;
    OUTPUT:
        RETVAL

bool
is_keepalive (struct feer_conn *c)
    CODE:
        RETVAL = c->is_keepalive;
    OUTPUT:
        RETVAL

SV*
response_guard (struct feer_conn *c, ...)
    PROTOTYPE: $;$
    CODE:
        RETVAL = feersum_conn_guard(aTHX_ c, (items == 2) ? ST(1) : NULL);
    OUTPUT:
        RETVAL

void
DESTROY (struct feer_conn *c)
    PPCODE:
{
    int i;
    trace("DESTROY connection fd=%d c=%p\n", c->fd, c);

    if (likely(c->rbuf)) SvREFCNT_dec(c->rbuf);

    if (c->wbuf_rinq) {
        struct iomatrix *m;
        while ((m = (struct iomatrix *)rinq_shift(&c->wbuf_rinq)) != NULL) {
            for (i=0; i < m->count; i++) {
                if (m->sv[i]) SvREFCNT_dec(m->sv[i]);
            }
            Safefree(m);
        }
    }

    if (likely(c->req)) {
        if (c->req->buf) SvREFCNT_dec(c->req->buf);
        if (likely(c->req->path)) SvREFCNT_dec(c->req->path);
        if (likely(c->req->query)) SvREFCNT_dec(c->req->query);
        if (likely(c->req->addr)) SvREFCNT_dec(c->req->addr);
        if (likely(c->req->port)) SvREFCNT_dec(c->req->port);
        Safefree(c->req);
    }

    if (likely(c->sa)) free(c->sa);

    safe_close_conn(c, "close at destruction");

    if (c->poll_write_cb) SvREFCNT_dec(c->poll_write_cb);

    if (c->ext_guard) SvREFCNT_dec(c->ext_guard);

    active_conns--;

    if (unlikely(shutting_down && active_conns <= 0)) {
        ev_idle_stop(feersum_ev_loop, &ei);
        ev_prepare_stop(feersum_ev_loop, &ep);
        ev_check_stop(feersum_ev_loop, &ec);

        trace3("... was last conn, going to try shutdown\n");
        if (shutdown_cb_cv) {
            PUSHMARK(SP);
            call_sv(shutdown_cb_cv, G_EVAL|G_VOID|G_DISCARD|G_NOARGS|G_KEEPERR);
            PUTBACK;
            trace3("... ok, called that handler\n");
            SvREFCNT_dec(shutdown_cb_cv);
            shutdown_cb_cv = NULL;
        }
    }
}

MODULE = Feersum	PACKAGE = Feersum

BOOT:
    {
        feer_stash = gv_stashpv("Feersum", 1);
        feer_conn_stash = gv_stashpv("Feersum::Connection", 1);
        feer_conn_writer_stash = gv_stashpv("Feersum::Connection::Writer",0);
        feer_conn_reader_stash = gv_stashpv("Feersum::Connection::Reader",0);



( run in 0.549 second using v1.01-cache-2.11-cpan-5511b514fd6 )