Chandra

 view release on metacpan or  search on metacpan

xs/socket_connection.xs  view on Meta::CPAN

            PUTBACK;
            FREETMPS;
            LEAVE;
        }

        RETVAL = newSViv(success);
    }
}
OUTPUT:
    RETVAL

SV *
reply(self, orig_msg_sv, data_sv)
    SV *self
    SV *orig_msg_sv
    SV *data_sv
CODE:
{
    /* Answer a previously received message by invoking
       $self->send(channel, data_sv, { _reply_to => <original "_id"> }).
       Yields integer 0 when orig_msg_sv is not a hash reference or has
       no defined "_id" entry; otherwise yields the scalar that send()
       returned. */
    HV *msg_hv = NULL;
    SV **msg_id = NULL;

    /* Only look for "_id" once we know we were handed a hash ref. */
    if (SvOK(orig_msg_sv) && SvROK(orig_msg_sv) &&
        SvTYPE(SvRV(orig_msg_sv)) == SVt_PVHV) {
        msg_hv = (HV *)SvRV(orig_msg_sv);
        msg_id = hv_fetchs(msg_hv, "_id", 0);
    }

    if (msg_id == NULL || !SvOK(*msg_id)) {
        RETVAL = newSViv(0);
    } else {
        SV **channel_slot = hv_fetchs(msg_hv, "channel", 0);
        HV *reply_meta = newHV();
        SV *channel_sv = &PL_sv_undef;
        int n_ret;

        (void)hv_stores(reply_meta, "_reply_to", newSVsv(*msg_id));

        /* A missing or undefined channel is passed on as undef. */
        if (channel_slot && SvOK(*channel_slot)) {
            channel_sv = *channel_slot;
        }

        {
            dSP;
            ENTER;
            SAVETMPS;
            PUSHMARK(SP);
            XPUSHs(self);
            XPUSHs(channel_sv);
            XPUSHs(data_sv);
            /* The mortal reference owns reply_meta; FREETMPS below
               releases both. */
            XPUSHs(sv_2mortal(newRV_noinc((SV *)reply_meta)));
            PUTBACK;
            n_ret = call_method("send", G_SCALAR);
            SPAGAIN;
            if (n_ret > 0) {
                /* Extra reference keeps the result alive past FREETMPS. */
                RETVAL = SvREFCNT_inc(POPs);
            } else {
                RETVAL = newSViv(0);
            }
            PUTBACK;
            FREETMPS;
            LEAVE;
        }
    }
}
OUTPUT:
    RETVAL

void
recv(self)
    SV *self
PPCODE:
{
    /* Delegate entirely to the Perl helper _xs_do_recv (called with
       self in list context) and return copies of everything it
       returned, in the same order. */
    int count, i;
    SV **results = NULL;

    {
        dSP;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        XPUSHs(self);
        PUTBACK;
        count = call_pv("Chandra::Socket::Connection::_xs_do_recv", G_ARRAY);
        SPAGAIN;

        if (count > 0) {
            /* Copy the results into a temp array: the originals are
               mortals that FREETMPS below will release. Popping walks
               the list back to front, so fill from the end. */
            Newx(results, count, SV *);
            for (i = count - 1; i >= 0; i--) {
                results[i] = newSVsv(POPs);
            }
        }
        PUTBACK;
        FREETMPS;
        LEAVE;
    }

    /* The callback may have reallocated the argument stack, which would
       leave the SP captured at XSUB entry dangling. The inner block put
       PL_stack_sp back on top of our own arguments, so re-derive the
       PPCODE return position from it before pushing anything. */
    SPAGAIN;
    SP -= items;

    /* Now push saved results onto the PPCODE return stack */
    if (count > 0) {
        EXTEND(SP, count);
        for (i = 0; i < count; i++) {
            PUSHs(sv_2mortal(results[i]));
        }
        Safefree(results);
    }
}

void
close(self)
    SV *self
CODE:
{
    /* Mark the connection as disconnected and, when a "socket" entry is
       present and defined, call its close() method and replace the
       entry with undef. Does nothing when self is not a hash reference
       (for example when invoked as a class method), since there is no
       per-connection state to tear down in that case. */
    if (SvROK(self) && SvTYPE(SvRV(self)) == SVt_PVHV) {
        HV *hv = (HV *)SvRV(self);
        SV **sock_svp;

        /* Flag first, so the object already reads as disconnected while
           the socket's own close() runs. */
        (void)hv_stores(hv, "_connected", newSViv(0));

        sock_svp = hv_fetchs(hv, "socket", 0);
        if (sock_svp && SvOK(*sock_svp)) {
            {
                dSP;
                ENTER;
                SAVETMPS;
                PUSHMARK(SP);
                XPUSHs(*sock_svp);
                PUTBACK;
                /* G_DISCARD: any return value is dropped by perl, so no
                   SPAGAIN/POPs is needed afterwards. */
                call_method("close", G_DISCARD);
                FREETMPS;
                LEAVE;
            }
            (void)hv_stores(hv, "socket", newSVsv(&PL_sv_undef));
        }
    }
}

SV *
encode_frame(class, msg_sv)
    SV *class
    SV *msg_sv
CODE:
{
    /* Serialise msg_sv through the Perl helper _xs_json_encode and
       return one wire frame: a 4-byte big-endian payload length
       followed by the payload bytes. Croaks when the payload length
       does not fit in the 32-bit length prefix. */
    SV *json_sv;
    const char *json_str;
    STRLEN json_len;
    unsigned char header[4];

    /* JSON encode */
    {
        dSP;
        int count;
        ENTER;
        SAVETMPS;
        PUSHMARK(SP);
        XPUSHs(msg_sv);
        PUTBACK;
        count = call_pv("Chandra::Socket::Connection::_xs_json_encode",
            G_SCALAR);
        SPAGAIN;
        /* Copy: the helper's return value is released by FREETMPS. */
        json_sv = (count > 0) ? newSVsv(POPs) : newSVpvs("");
        PUTBACK;
        FREETMPS;
        LEAVE;
    }
    /* Mortalise only after LEAVE so the copy outlives the scope above
       but is still reclaimed if we croak below. */
    sv_2mortal(json_sv);

    json_str = SvPV(json_sv, json_len);

    /* The length prefix is 32 bits wide; a longer payload would be
       silently truncated in the header and corrupt the stream. */
    if ((UV)json_len > (UV)0xFFFFFFFFUL) {
        croak("Chandra::Socket::Connection: frame payload too large (%" UVuf " bytes)",
            (UV)json_len);
    }

    /* Build frame: 4-byte big-endian length + JSON payload */
    header[0] = (unsigned char)((json_len >> 24) & 0xFF);
    header[1] = (unsigned char)((json_len >> 16) & 0xFF);
    header[2] = (unsigned char)((json_len >> 8) & 0xFF);
    header[3] = (unsigned char)(json_len & 0xFF);

    RETVAL = newSV(4 + json_len);
    SvPOK_on(RETVAL);
    Copy(header, SvPVX(RETVAL), 4, char);
    Copy(json_str, SvPVX(RETVAL) + 4, json_len, char);
    SvCUR_set(RETVAL, 4 + json_len);
    /* newSV() reserves room for a trailing NUL but does not write it. */
    *SvEND(RETVAL) = '\0';
}
OUTPUT:
    RETVAL

void
decode_frames(class, data_sv)
    SV *class
    SV *data_sv
PPCODE:
{
    /* Split data_sv into length-prefixed frames (4-byte big-endian
       length, then payload), decode each payload through the Perl
       helper _xs_json_decode, and return the decoded messages as a
       list. Parsing stops at the first incomplete frame; payloads that
       decode to undef are skipped. An undefined data_sv gives an empty
       list. */

    /* Mortal, so the collected messages are released even if the
       decoder callback dies part-way through. */
    AV *result = (AV *)sv_2mortal((SV *)newAV());
    I32 ri, result_count;

    if (SvOK(data_sv)) {
        const char *buf;
        STRLEN buf_len;
        STRLEN consumed = 0;

        buf = SvPV(data_sv, buf_len);

        while (buf_len - consumed >= 4) {
            const unsigned char *p =
                (const unsigned char *)(buf + consumed);
            UV frame_len = ((UV)p[0] << 24) | ((UV)p[1] << 16) |
                           ((UV)p[2] << 8)  | (UV)p[3];

            /* Compare against what is left after the header instead of
               computing 4 + frame_len, which can wrap when UV is 32
               bits wide. The loop condition guarantees the subtraction
               cannot underflow. */
            if (buf_len - consumed - 4 < frame_len) break;

            /* JSON decode payload */
            {
                SV *msg_sv = NULL;
                dSP;
                int count;
                ENTER;
                SAVETMPS;
                PUSHMARK(SP);
                XPUSHs(sv_2mortal(newSVpvn(
                    buf + consumed + 4, frame_len)));
                PUTBACK;
                count = call_pv(
                    "Chandra::Socket::Connection::_xs_json_decode",
                    G_SCALAR);
                SPAGAIN;
                /* Copy: the helper's return value dies at FREETMPS. */
                if (count > 0) msg_sv = newSVsv(POPs);
                PUTBACK;
                FREETMPS;
                LEAVE;

                if (msg_sv && SvOK(msg_sv)) {
                    av_push(result, msg_sv);
                } else {
                    if (msg_sv) SvREFCNT_dec(msg_sv);
                }
            }

            consumed += 4 + (STRLEN)frame_len;
        }
    }

    /* The callbacks above may have reallocated the argument stack,
       which would leave the SP captured at XSUB entry dangling. Each
       iteration put PL_stack_sp back on top of our own arguments, so
       re-derive the PPCODE return position from it. */
    SPAGAIN;
    SP -= items;

    /* Push all decoded messages onto Perl stack */
    result_count = av_len(result) + 1;
    EXTEND(SP, result_count);
    for (ri = 0; ri < result_count; ri++) {
        SV **svp = av_fetch(result, ri, 0);
        if (svp) PUSHs(sv_2mortal(SvREFCNT_inc(*svp)));
    }
}

int

xs/socket_connection.xs  view on Meta::CPAN

        XPUSHs(msg_sv);
        PUTBACK;
        count = call_method("encode", G_SCALAR);
        SPAGAIN;
        payload = (count > 0) ? newSVsv(POPs) : newSVpvs("");
        PUTBACK;
        FREETMPS; LEAVE;
    }

    payload_len = SvCUR(payload);

    /* pack('N', length) . payload - build frame in C */
    len_buf[0] = (payload_len >> 24) & 0xFF;
    len_buf[1] = (payload_len >> 16) & 0xFF;
    len_buf[2] = (payload_len >> 8) & 0xFF;
    len_buf[3] = payload_len & 0xFF;

    frame = newSVpvn((char *)len_buf, 4);
    sv_catsv(frame, payload);
    SvREFCNT_dec(payload);
    frame_len = SvCUR(frame);

    /* Ignore SIGPIPE during syswrite - portable across OS */
    {
        SigpipeGuard spg;
        SV *written_sv;
        sigpipe_ignore(&spg);

        /* $socket->syswrite($frame) */
        {
            dSP;
            int count;
            ENTER; SAVETMPS;
            PUSHMARK(SP);
            XPUSHs(socket_sv);
            XPUSHs(frame);
            PUTBACK;
            count = call_method("syswrite", G_SCALAR);
            SPAGAIN;
            written_sv = (count > 0) ? POPs : &PL_sv_undef;
            if (SvOK(written_sv) && SvIV(written_sv) == (IV)frame_len) {
                RETVAL = 1;
            } else {
                RETVAL = 0;
            }
            PUTBACK;
            FREETMPS; LEAVE;
        }

        sigpipe_restore(&spg);
    }

    SvREFCNT_dec(frame);
}
OUTPUT:
    RETVAL

void
_xs_do_recv(self)
    SV *self
PPCODE:
{
    HV *hv = (HV *)SvRV(self);
    SV **connected_svp = hv_fetchs(hv, "_connected", 0);
    SV **sock_svp = hv_fetchs(hv, "socket", 0);
    SV **buf_svp;
    AV *messages = newAV();
    I32 mi, msg_count;

    if (!connected_svp || !SvTRUE(*connected_svp) ||
        !sock_svp || !SvOK(*sock_svp)) {
        goto return_messages;
    }

    /* sysread */
    {
        SV *read_buf = newSV(65536);
        SvPOK_on(read_buf);
        SvCUR_set(read_buf, 0);

        {
            dSP;
            int count;
            SV *read_result;
            ENTER; SAVETMPS;
            PUSHMARK(SP);
            XPUSHs(*sock_svp);
            XPUSHs(read_buf);
            XPUSHs(sv_2mortal(newSViv(65536)));
            PUTBACK;
            count = call_method("sysread", G_SCALAR);
            SPAGAIN;
            read_result = (count > 0) ? POPs : &PL_sv_undef;

            if (SvOK(read_result) && SvIV(read_result) > 0) {
                /* Append to buffer */
                buf_svp = hv_fetchs(hv, "_buf", 0);
                if (buf_svp && SvOK(*buf_svp)) {
                    sv_catsv(*buf_svp, read_buf);
                } else {
                    (void)hv_stores(hv, "_buf", newSVsv(read_buf));
                }

                buf_svp = hv_fetchs(hv, "_buf", 0);
                if (buf_svp && SvCUR(*buf_svp) > 64 * 1024 * 1024) {
                    warn("Chandra::Socket::Connection: buffer overflow, disconnecting\n");
                    (void)hv_stores(hv, "_connected", newSViv(0));
                    (void)hv_stores(hv, "_buf", newSVpvs(""));
                    SvREFCNT_dec(read_buf);
                    PUTBACK; FREETMPS; LEAVE;
                    goto return_messages;
                }
            } else if (!SvOK(read_result)) {
                /* Check errno for EAGAIN/EWOULDBLOCK */
                int err_no = errno;
#ifdef EAGAIN
                if (err_no == EAGAIN) { PUTBACK; FREETMPS; LEAVE; SvREFCNT_dec(read_buf); goto parse_frames; }
#endif
#ifdef EWOULDBLOCK
                if (err_no == EWOULDBLOCK) { PUTBACK; FREETMPS; LEAVE; SvREFCNT_dec(read_buf); goto parse_frames; }
#endif



( run in 0.697 second using v1.01-cache-2.11-cpan-71847e10f99 )