Hypersonic
view release on metacpan or search on metacpan
lib/Hypersonic/Stream.pm view on Meta::CPAN
package Hypersonic::Stream;
use strict;
use warnings;
use 5.010;
our $VERSION = '0.19';
use constant {
STATE_INIT => 0,
STATE_STARTED => 1,
STATE_FINISHED => 2,
STATE_ABORTED => 3,
};
use constant MAX_STREAMS => 65536;
# Class method for streaming handler detection (only Perl code needed)
sub is_streaming_handler {
my ($class, $handler, $opts) = @_;
return 1 if $opts && $opts->{streaming};
my $proto = prototype($handler);
return 1 if defined $proto && $proto =~ /stream/i;
eval {
require B::Deparse;
my $deparser = B::Deparse->new('-p', '-sC');
my $code = $deparser->coderef2text($handler);
return 1 if $code =~ /\$stream\s*->/;
};
return 0;
}
# ============================================================
# XS Code Generation - ALL instance methods generated in C
# ============================================================
sub generate_c_code {
my ($class, $builder, $opts) = @_;
$opts //= {};
my $max = $opts->{max_streams} // MAX_STREAMS;
# sys/uio.h is POSIX-only (writev + struct iovec live there).
# On Windows we fall back to multiple send() calls (see
# stream_write_chunk_http1 below). Winsock headers are pulled in
# by Hypersonic::JIT::Util / Hypersonic::Socket already.
$builder->line('#ifndef _WIN32')
->line('#include <sys/uio.h>')
->line('#endif')
->blank;
$class->gen_stream_registry($builder, $max);
$class->gen_status_text($builder);
$class->gen_stream_start_c($builder);
$class->gen_stream_write_chunk_c($builder);
$class->gen_stream_end_c($builder);
$class->gen_stream_reset_c($builder);
# XS instance methods
$class->gen_xs_new($builder);
$class->gen_xs_fd($builder);
$class->gen_xs_protocol($builder);
$class->gen_xs_state($builder);
$class->gen_xs_chunks_sent($builder);
$class->gen_xs_is_started($builder);
$class->gen_xs_is_finished($builder);
$class->gen_xs_headers($builder);
$class->gen_xs_content_type($builder);
$class->gen_xs_write($builder);
$class->gen_xs_end($builder);
$class->gen_xs_abort($builder);
return $builder;
}
sub gen_stream_registry {
my ($class, $builder, $max) = @_;
$builder->comment('Stream registry - O(1) lookup by fd')
->line('#define STREAM_MAX ' . $max)
->line('#define STREAM_STATE_INIT 0')
->line('#define STREAM_STATE_STARTED 1')
lib/Hypersonic/Stream.pm view on Meta::CPAN
->xs_end
->blank;
}
sub gen_xs_chunks_sent {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_chunks_sent')
->xs_preamble
->check_items(1, 1, '$stream->chunks_sent')
->line('int fd = SvIV(SvRV(ST(0)));')
->if('fd < 0 || fd >= STREAM_MAX')
->line('XSRETURN_IV(0);')
->endif
->line('XSRETURN_IV(stream_registry[fd].chunks_sent);')
->xs_end
->blank;
}
sub gen_xs_is_started {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_is_started')
->xs_preamble
->check_items(1, 1, '$stream->is_started')
->line('int fd = SvIV(SvRV(ST(0)));')
->if('stream_registry[fd].state >= STREAM_STATE_STARTED')
->line('XSRETURN_YES;')
->else
->line('XSRETURN_NO;')
->endif
->xs_end
->blank;
}
sub gen_xs_is_finished {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_is_finished')
->xs_preamble
->check_items(1, 1, '$stream->is_finished')
->line('int fd = SvIV(SvRV(ST(0)));')
->if('stream_registry[fd].state >= STREAM_STATE_FINISHED')
->line('XSRETURN_YES;')
->else
->line('XSRETURN_NO;')
->endif
->xs_end
->blank;
}
sub gen_xs_headers {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_headers')
->xs_preamble
->line('int fd = SvIV(SvRV(ST(0)));')
->line('StreamState* s = &stream_registry[fd];')
->blank
->if('s->state != STREAM_STATE_INIT')
->line('croak("Cannot set headers after streaming started");')
->endif
->blank
->if('items >= 2')
->line('s->status = SvIV(ST(1));')
->endif
->blank
->line('s->extra_headers[0] = \'\\0\';')
->if('items >= 3 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV')
->line('HV* hv = (HV*)SvRV(ST(2));')
->line('int extra_pos = 0;')
->blank
->comment('Extract Content-Type')
->line('SV** ct = hv_fetchs(hv, "Content-Type", 0);')
->if('!ct')
->line('ct = hv_fetchs(hv, "content-type", 0);')
->endif
->if('ct && *ct')
->line('STRLEN len;')
->line('const char* val = SvPV(*ct, len);')
->if('len < sizeof(s->content_type)')
->line('memcpy(s->content_type, val, len);')
->line('s->content_type[len] = \'\\0\';')
->endif
->endif
->blank
->comment('Extract other headers (Cache-Control, Connection, X-Accel-Buffering)')
->line('SV** cc = hv_fetchs(hv, "Cache-Control", 0);')
->if('cc && *cc')
->line('STRLEN len;')
->line('const char* val = SvPV(*cc, len);')
->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
->line(' sizeof(s->extra_headers) - extra_pos, "Cache-Control: %s\\r\\n", val);')
->endif
->line('SV** conn = hv_fetchs(hv, "Connection", 0);')
->if('conn && *conn')
->line('STRLEN len;')
->line('const char* val = SvPV(*conn, len);')
->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
->line(' sizeof(s->extra_headers) - extra_pos, "Connection: %s\\r\\n", val);')
->endif
->line('SV** xab = hv_fetchs(hv, "X-Accel-Buffering", 0);')
->if('xab && *xab')
->line('STRLEN len;')
->line('const char* val = SvPV(*xab, len);')
->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
->line(' sizeof(s->extra_headers) - extra_pos, "X-Accel-Buffering: %s\\r\\n", val);')
->endif
->endif
->blank
->line('ST(0) = ST(0);')
->line('XSRETURN(1);')
->xs_end
->blank;
}
sub gen_xs_content_type {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_content_type')
->xs_preamble
->if('items != 2')
->line('croak("Usage: $stream->content_type(type)");')
->endif
->line('int fd = SvIV(SvRV(ST(0)));')
->line('StreamState* s = &stream_registry[fd];')
->blank
->if('s->state != STREAM_STATE_INIT')
->line('croak("Cannot set content_type after streaming started");')
->endif
->blank
->line('STRLEN len;')
->line('const char* ct = SvPV(ST(1), len);')
->if('len < sizeof(s->content_type)')
->line('memcpy(s->content_type, ct, len);')
->line('s->content_type[len] = \'\\0\';')
->endif
->blank
->line('ST(0) = ST(0);')
->line('XSRETURN(1);')
->xs_end
->blank;
}
sub gen_xs_write {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_write')
->xs_preamble
->if('items != 2')
->line('croak("Usage: $stream->write(data)");')
->endif
->line('int fd = SvIV(SvRV(ST(0)));')
->line('StreamState* s = &stream_registry[fd];')
->blank
->line('STRLEN len;')
->line('const char* data = SvPV(ST(1), len);')
->if('len == 0')
->line('ST(0) = ST(0);')
->line('XSRETURN(1);')
->endif
->blank
->if('s->state == STREAM_STATE_INIT && !s->http2')
->line('stream_start_http1(fd);')
->endif
->blank
->if('!s->http2')
->line('stream_write_chunk_http1(fd, data, len);')
->endif
->blank
->line('ST(0) = ST(0);')
->line('XSRETURN(1);')
->xs_end
->blank;
}
sub gen_xs_end {
my ($class, $builder) = @_;
$builder->xs_function('xs_stream_end')
->xs_preamble
->line('int fd = SvIV(SvRV(ST(0)));')
->line('StreamState* s = &stream_registry[fd];')
->blank
->if('s->state >= STREAM_STATE_FINISHED')
->line('ST(0) = ST(0);')
->line('XSRETURN(1);')
->endif
->blank
( run in 1.587 second using v1.01-cache-2.11-cpan-140bd7fdf52 )