Chandra
view release on metacpan or search on metacpan
xs/socket_connection.xs view on Meta::CPAN
PUTBACK;
FREETMPS;
LEAVE;
}
RETVAL = newSViv(success);
}
}
OUTPUT:
RETVAL
SV *
reply(self, orig_msg_sv, data_sv)
    SV *self
    SV *orig_msg_sv
    SV *data_sv
CODE:
{
    /* Answer a previously received message.
     *
     * Reads "_id" (and, if present, "channel") from the hash referenced
     * by orig_msg_sv and then invokes
     *     $self->send($channel_or_undef, $data_sv, { _reply_to => $id })
     * in scalar context.
     *
     * Returns whatever send() returned, or 0 when orig_msg_sv is not a
     * hash reference or has no defined "_id" entry. */
    HV  *msg_hv   = NULL;
    SV **reply_id = NULL;

    if (SvOK(orig_msg_sv) && SvROK(orig_msg_sv)
        && SvTYPE(SvRV(orig_msg_sv)) == SVt_PVHV) {
        msg_hv   = (HV *)SvRV(orig_msg_sv);
        reply_id = hv_fetchs(msg_hv, "_id", 0);
    }

    if (reply_id == NULL || !SvOK(*reply_id)) {
        /* Nothing to correlate the reply with. */
        RETVAL = newSViv(0);
    }
    else {
        SV **channel = hv_fetchs(msg_hv, "channel", 0);
        HV  *options = newHV();
        int  returned;

        /* The options hash carries a private copy of the original id. */
        (void)hv_stores(options, "_reply_to", newSVsv(*reply_id));

        {
            dSP;

            ENTER;
            SAVETMPS;

            PUSHMARK(SP);
            XPUSHs(self);
            if (channel && SvOK(*channel)) {
                XPUSHs(*channel);
            }
            else {
                XPUSHs(&PL_sv_undef);
            }
            XPUSHs(data_sv);
            /* The mortal reference owns the options hash from here on. */
            XPUSHs(sv_2mortal(newRV_noinc((SV *)options)));
            PUTBACK;

            returned = call_method("send", G_SCALAR);

            SPAGAIN;
            if (returned > 0) {
                /* Extra reference keeps the value alive past FREETMPS. */
                RETVAL = SvREFCNT_inc(POPs);
            }
            else {
                RETVAL = newSViv(0);
            }
            PUTBACK;

            FREETMPS;
            LEAVE;
        }
    }
}
OUTPUT:
    RETVAL
void
recv(self)
    SV *self
PPCODE:
{
    /* Delegate entirely to the Perl-level helper
     * Chandra::Socket::Connection::_xs_do_recv, called in list context
     * with self as its only argument, and return the list it produced.
     *
     * Each returned value is copied before the callee's temporaries are
     * released, then handed back to the caller as a mortal. */
    int   count;
    int   i;
    SV  **results = NULL;

    {
        /* This dSP declares a stack pointer local to the call; it
         * shadows the one PPCODE uses for the return list. */
        dSP;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        XPUSHs(self);
        PUTBACK;

        count = call_pv("Chandra::Socket::Connection::_xs_do_recv", G_ARRAY);

        SPAGAIN;
        if (count > 0) {
            /* Save copies: the originals may be freed by FREETMPS. */
            Newx(results, count, SV *);
            for (i = count - 1; i >= 0; i--) {
                results[i] = newSVsv(POPs);
            }
        }
        PUTBACK;

        FREETMPS;
        LEAVE;
    }

    /* call_pv may have reallocated the Perl stack, which would leave the
     * PPCODE stack pointer dangling.  Re-read it from the interpreter
     * (it now points at the last incoming argument again) and step back
     * over the arguments, exactly as PPCODE did on entry. */
    SPAGAIN;
    SP -= items;

    EXTEND(SP, count);
    for (i = 0; i < count; i++) {
        PUSHs(sv_2mortal(results[i]));
    }
    Safefree(results);
}
void
close(self)
    SV *self
CODE:
{
    /* Mark the connection as closed and release its socket.
     *
     * Always sets $self->{_connected} to 0.  When $self->{socket} holds
     * a defined value, calls ->close on it (result discarded) and then
     * replaces the slot with a fresh undef. */
    HV  *self_hv = (HV *)SvRV(self);
    SV **handle;

    (void)hv_stores(self_hv, "_connected", newSViv(0));

    handle = hv_fetchs(self_hv, "socket", 0);
    if (!handle || !SvOK(*handle)) {
        /* No socket to close. */
        XSRETURN_EMPTY;
    }

    {
        dSP;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        XPUSHs(*handle);
        PUTBACK;

        call_method("close", G_DISCARD);

        FREETMPS;
        LEAVE;
    }

    (void)hv_stores(self_hv, "socket", newSVsv(&PL_sv_undef));
}
SV *
encode_frame(class, msg_sv)
    SV *class
    SV *msg_sv
CODE:
{
    /* Serialise msg_sv into one wire frame.
     *
     * The payload is obtained by calling the Perl-level helper
     * Chandra::Socket::Connection::_xs_json_encode(msg_sv) in scalar
     * context.  The frame is a 4-byte big-endian payload length
     * followed by the payload bytes.
     *
     * Returns the frame as a new string SV.  Croaks if the payload is
     * too long for its length to fit in the 4-byte header. */
    SV            *json_sv;
    const char    *json_str;
    STRLEN         json_len;
    unsigned char *out;

    /* JSON encode */
    {
        dSP;
        int count;

        ENTER;
        SAVETMPS;

        PUSHMARK(SP);
        XPUSHs(msg_sv);
        PUTBACK;

        count = call_pv("Chandra::Socket::Connection::_xs_json_encode",
                        G_SCALAR);

        SPAGAIN;
        /* Copy the result so it outlives FREETMPS. */
        json_sv = (count > 0) ? newSVsv(POPs) : newSVpvs("");
        PUTBACK;

        FREETMPS;
        LEAVE;
    }

    json_str = SvPV(json_sv, json_len);

    /* The header can only describe 2^32 - 1 bytes; refuse to emit a
     * frame whose length field would be silently truncated. */
    if ((UV)json_len > (UV)0xFFFFFFFFUL) {
        SvREFCNT_dec(json_sv);
        croak("Chandra::Socket::Connection: payload of %" UVuf
              " bytes is too large to frame", (UV)json_len);
    }

    /* Build frame: 4-byte big-endian length + JSON payload.
     * newSV(n) reserves n + 1 bytes, leaving room for the trailing NUL. */
    RETVAL = newSV(4 + json_len);
    SvPOK_on(RETVAL);

    out    = (unsigned char *)SvPVX(RETVAL);
    out[0] = (unsigned char)((json_len >> 24) & 0xFF);
    out[1] = (unsigned char)((json_len >> 16) & 0xFF);
    out[2] = (unsigned char)((json_len >> 8) & 0xFF);
    out[3] = (unsigned char)(json_len & 0xFF);
    Copy(json_str, out + 4, json_len, char);

    SvCUR_set(RETVAL, 4 + json_len);
    /* Perl expects every PV buffer to be NUL-terminated. */
    *SvEND(RETVAL) = '\0';

    SvREFCNT_dec(json_sv);
}
OUTPUT:
    RETVAL
void
decode_frames(class, data_sv)
    SV *class
    SV *data_sv
PPCODE:
{
    /* Split data_sv into length-prefixed frames and decode each one.
     *
     * A frame is a 4-byte big-endian payload length followed by that
     * many payload bytes.  Every complete frame's payload is passed to
     * the Perl-level helper
     * Chandra::Socket::Connection::_xs_json_decode in scalar context.
     * Parsing stops at the first incomplete frame.
     *
     * Returns the list of decoded values; a payload whose decoded value
     * is undefined is dropped.  An undefined data_sv yields an empty
     * list. */

    /* Mortal, so the array is released even if the decoder dies. */
    AV  *decoded = (AV *)sv_2mortal((SV *)newAV());
    I32  idx;
    I32  total;

    if (SvOK(data_sv)) {
        STRLEN      buf_len;
        STRLEN      consumed = 0;
        const char *buf      = SvPV(data_sv, buf_len);

        while (buf_len - consumed >= 4) {
            const unsigned char *p =
                (const unsigned char *)(buf + consumed);
            UV frame_len = ((UV)p[0] << 24) | ((UV)p[1] << 16) |
                           ((UV)p[2] << 8)  |  (UV)p[3];

            /* Compare against the bytes remaining after the header
             * instead of computing 4 + frame_len, which can wrap
             * around when UV is 32 bits wide. */
            if (frame_len > (UV)(buf_len - consumed - 4)) break;

            /* JSON decode payload */
            {
                SV *msg_sv = NULL;
                int count;
                /* Stack pointer local to this call; it shadows the one
                 * PPCODE uses for the return list. */
                dSP;

                ENTER;
                SAVETMPS;

                PUSHMARK(SP);
                XPUSHs(sv_2mortal(newSVpvn(buf + consumed + 4,
                                           (STRLEN)frame_len)));
                PUTBACK;

                count = call_pv(
                    "Chandra::Socket::Connection::_xs_json_decode",
                    G_SCALAR);

                SPAGAIN;
                /* Copy the result so it outlives FREETMPS. */
                if (count > 0) msg_sv = newSVsv(POPs);
                PUTBACK;

                FREETMPS;
                LEAVE;

                if (msg_sv && SvOK(msg_sv)) {
                    av_push(decoded, msg_sv);
                }
                else if (msg_sv) {
                    SvREFCNT_dec(msg_sv);
                }
            }

            consumed += 4 + (STRLEN)frame_len;
        }
    }

    /* The calls above may have reallocated the Perl stack, which would
     * leave the PPCODE stack pointer dangling.  Re-read it from the
     * interpreter (it points at the last incoming argument) and step
     * back over the arguments, exactly as PPCODE did on entry. */
    SPAGAIN;
    SP -= items;

    /* Push all decoded messages onto the Perl stack. */
    total = av_len(decoded) + 1;
    EXTEND(SP, total);
    for (idx = 0; idx < total; idx++) {
        SV **svp = av_fetch(decoded, idx, 0);
        /* Each value gets its own mortal reference, so it survives the
         * array being freed. */
        if (svp) PUSHs(sv_2mortal(SvREFCNT_inc(*svp)));
    }
}
int
xs/socket_connection.xs view on Meta::CPAN
XPUSHs(msg_sv);
PUTBACK;
count = call_method("encode", G_SCALAR);
SPAGAIN;
payload = (count > 0) ? newSVsv(POPs) : newSVpvs("");
PUTBACK;
FREETMPS; LEAVE;
}
payload_len = SvCUR(payload);
/* pack('N', length) . payload - build frame in C */
len_buf[0] = (payload_len >> 24) & 0xFF;
len_buf[1] = (payload_len >> 16) & 0xFF;
len_buf[2] = (payload_len >> 8) & 0xFF;
len_buf[3] = payload_len & 0xFF;
frame = newSVpvn((char *)len_buf, 4);
sv_catsv(frame, payload);
SvREFCNT_dec(payload);
frame_len = SvCUR(frame);
/* Ignore SIGPIPE during syswrite - portable across OS */
{
SigpipeGuard spg;
SV *written_sv;
sigpipe_ignore(&spg);
/* $socket->syswrite($frame) */
{
dSP;
int count;
ENTER; SAVETMPS;
PUSHMARK(SP);
XPUSHs(socket_sv);
XPUSHs(frame);
PUTBACK;
count = call_method("syswrite", G_SCALAR);
SPAGAIN;
written_sv = (count > 0) ? POPs : &PL_sv_undef;
if (SvOK(written_sv) && SvIV(written_sv) == (IV)frame_len) {
RETVAL = 1;
} else {
RETVAL = 0;
}
PUTBACK;
FREETMPS; LEAVE;
}
sigpipe_restore(&spg);
}
SvREFCNT_dec(frame);
}
OUTPUT:
RETVAL
void
_xs_do_recv(self)
SV *self
PPCODE:
{
HV *hv = (HV *)SvRV(self);
SV **connected_svp = hv_fetchs(hv, "_connected", 0);
SV **sock_svp = hv_fetchs(hv, "socket", 0);
SV **buf_svp;
AV *messages = newAV();
I32 mi, msg_count;
if (!connected_svp || !SvTRUE(*connected_svp) ||
!sock_svp || !SvOK(*sock_svp)) {
goto return_messages;
}
/* sysread */
{
SV *read_buf = newSV(65536);
SvPOK_on(read_buf);
SvCUR_set(read_buf, 0);
{
dSP;
int count;
SV *read_result;
ENTER; SAVETMPS;
PUSHMARK(SP);
XPUSHs(*sock_svp);
XPUSHs(read_buf);
XPUSHs(sv_2mortal(newSViv(65536)));
PUTBACK;
count = call_method("sysread", G_SCALAR);
SPAGAIN;
read_result = (count > 0) ? POPs : &PL_sv_undef;
if (SvOK(read_result) && SvIV(read_result) > 0) {
/* Append to buffer */
buf_svp = hv_fetchs(hv, "_buf", 0);
if (buf_svp && SvOK(*buf_svp)) {
sv_catsv(*buf_svp, read_buf);
} else {
(void)hv_stores(hv, "_buf", newSVsv(read_buf));
}
buf_svp = hv_fetchs(hv, "_buf", 0);
if (buf_svp && SvCUR(*buf_svp) > 64 * 1024 * 1024) {
warn("Chandra::Socket::Connection: buffer overflow, disconnecting\n");
(void)hv_stores(hv, "_connected", newSViv(0));
(void)hv_stores(hv, "_buf", newSVpvs(""));
SvREFCNT_dec(read_buf);
PUTBACK; FREETMPS; LEAVE;
goto return_messages;
}
} else if (!SvOK(read_result)) {
/* Check errno for EAGAIN/EWOULDBLOCK */
int err_no = errno;
#ifdef EAGAIN
if (err_no == EAGAIN) { PUTBACK; FREETMPS; LEAVE; SvREFCNT_dec(read_buf); goto parse_frames; }
#endif
#ifdef EWOULDBLOCK
if (err_no == EWOULDBLOCK) { PUTBACK; FREETMPS; LEAVE; SvREFCNT_dec(read_buf); goto parse_frames; }
#endif
( run in 0.697 second using v1.01-cache-2.11-cpan-71847e10f99 )