Hypersonic

 view release on metacpan or  search on metacpan

lib/Hypersonic/Stream.pm  view on Meta::CPAN

package Hypersonic::Stream;
use strict;
use warnings;
use 5.010;

our $VERSION = '0.19';

use constant {
    STATE_INIT     => 0,
    STATE_STARTED  => 1,
    STATE_FINISHED => 2,
    STATE_ABORTED  => 3,
};
use constant MAX_STREAMS => 65536;

# Class method for streaming handler detection (only Perl code needed)
sub is_streaming_handler {
    my ($class, $handler, $opts) = @_;
    return 1 if $opts && $opts->{streaming};
    my $proto = prototype($handler);
    return 1 if defined $proto && $proto =~ /stream/i;
    eval {
        require B::Deparse;
        my $deparser = B::Deparse->new('-p', '-sC');
        my $code = $deparser->coderef2text($handler);
        return 1 if $code =~ /\$stream\s*->/;
    };
    return 0;
}

# ============================================================
# XS Code Generation - ALL instance methods generated in C
# ============================================================

sub generate_c_code {
    my ($class, $builder, $opts) = @_;
    $opts //= {};
    my $max = $opts->{max_streams} // MAX_STREAMS;
    
    # sys/uio.h is POSIX-only (writev + struct iovec live there).
    # On Windows we fall back to multiple send() calls (see
    # stream_write_chunk_http1 below). Winsock headers are pulled in
    # by Hypersonic::JIT::Util / Hypersonic::Socket already.
    $builder->line('#ifndef _WIN32')
      ->line('#include <sys/uio.h>')
      ->line('#endif')
      ->blank;
    
    $class->gen_stream_registry($builder, $max);
    $class->gen_status_text($builder);
    $class->gen_stream_start_c($builder);
    $class->gen_stream_write_chunk_c($builder);
    $class->gen_stream_end_c($builder);
    $class->gen_stream_reset_c($builder);
    
    # XS instance methods
    $class->gen_xs_new($builder);
    $class->gen_xs_fd($builder);
    $class->gen_xs_protocol($builder);
    $class->gen_xs_state($builder);
    $class->gen_xs_chunks_sent($builder);
    $class->gen_xs_is_started($builder);
    $class->gen_xs_is_finished($builder);
    $class->gen_xs_headers($builder);
    $class->gen_xs_content_type($builder);
    $class->gen_xs_write($builder);
    $class->gen_xs_end($builder);
    $class->gen_xs_abort($builder);
    
    return $builder;
}

sub gen_stream_registry {
    my ($class, $builder, $max) = @_;
    
    $builder->comment('Stream registry - O(1) lookup by fd')
      ->line('#define STREAM_MAX ' . $max)
      ->line('#define STREAM_STATE_INIT     0')
      ->line('#define STREAM_STATE_STARTED  1')

lib/Hypersonic/Stream.pm  view on Meta::CPAN

      ->xs_end
      ->blank;
}

sub gen_xs_chunks_sent {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_chunks_sent')
      ->xs_preamble
      ->check_items(1, 1, '$stream->chunks_sent')
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->if('fd < 0 || fd >= STREAM_MAX')
        ->line('XSRETURN_IV(0);')
      ->endif
      ->line('XSRETURN_IV(stream_registry[fd].chunks_sent);')
      ->xs_end
      ->blank;
}

sub gen_xs_is_started {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_is_started')
      ->xs_preamble
      ->check_items(1, 1, '$stream->is_started')
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->if('stream_registry[fd].state >= STREAM_STATE_STARTED')
        ->line('XSRETURN_YES;')
      ->else
        ->line('XSRETURN_NO;')
      ->endif
      ->xs_end
      ->blank;
}

sub gen_xs_is_finished {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_is_finished')
      ->xs_preamble
      ->check_items(1, 1, '$stream->is_finished')
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->if('stream_registry[fd].state >= STREAM_STATE_FINISHED')
        ->line('XSRETURN_YES;')
      ->else
        ->line('XSRETURN_NO;')
      ->endif
      ->xs_end
      ->blank;
}

sub gen_xs_headers {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_headers')
      ->xs_preamble
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->line('StreamState* s = &stream_registry[fd];')
      ->blank
      ->if('s->state != STREAM_STATE_INIT')
        ->line('croak("Cannot set headers after streaming started");')
      ->endif
      ->blank
      ->if('items >= 2')
        ->line('s->status = SvIV(ST(1));')
      ->endif
      ->blank
      ->line('s->extra_headers[0] = \'\\0\';')
      ->if('items >= 3 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV')
        ->line('HV* hv = (HV*)SvRV(ST(2));')
        ->line('int extra_pos = 0;')
        ->blank
        ->comment('Extract Content-Type')
        ->line('SV** ct = hv_fetchs(hv, "Content-Type", 0);')
        ->if('!ct')
          ->line('ct = hv_fetchs(hv, "content-type", 0);')
        ->endif
        ->if('ct && *ct')
          ->line('STRLEN len;')
          ->line('const char* val = SvPV(*ct, len);')
          ->if('len < sizeof(s->content_type)')
            ->line('memcpy(s->content_type, val, len);')
            ->line('s->content_type[len] = \'\\0\';')
          ->endif
        ->endif
        ->blank
        ->comment('Extract other headers (Cache-Control, Connection, X-Accel-Buffering)')
        ->line('SV** cc = hv_fetchs(hv, "Cache-Control", 0);')
        ->if('cc && *cc')
          ->line('STRLEN len;')
          ->line('const char* val = SvPV(*cc, len);')
          ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
          ->line('    sizeof(s->extra_headers) - extra_pos, "Cache-Control: %s\\r\\n", val);')
        ->endif
        ->line('SV** conn = hv_fetchs(hv, "Connection", 0);')
        ->if('conn && *conn')
          ->line('STRLEN len;')
          ->line('const char* val = SvPV(*conn, len);')
          ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
          ->line('    sizeof(s->extra_headers) - extra_pos, "Connection: %s\\r\\n", val);')
        ->endif
        ->line('SV** xab = hv_fetchs(hv, "X-Accel-Buffering", 0);')
        ->if('xab && *xab')
          ->line('STRLEN len;')
          ->line('const char* val = SvPV(*xab, len);')
          ->line('extra_pos += snprintf(s->extra_headers + extra_pos,')
          ->line('    sizeof(s->extra_headers) - extra_pos, "X-Accel-Buffering: %s\\r\\n", val);')
        ->endif
      ->endif
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_content_type {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_content_type')
      ->xs_preamble
      ->if('items != 2')
        ->line('croak("Usage: $stream->content_type(type)");')
      ->endif
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->line('StreamState* s = &stream_registry[fd];')
      ->blank
      ->if('s->state != STREAM_STATE_INIT')
        ->line('croak("Cannot set content_type after streaming started");')
      ->endif
      ->blank
      ->line('STRLEN len;')
      ->line('const char* ct = SvPV(ST(1), len);')
      ->if('len < sizeof(s->content_type)')
        ->line('memcpy(s->content_type, ct, len);')
        ->line('s->content_type[len] = \'\\0\';')
      ->endif
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_write {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_write')
      ->xs_preamble
      ->if('items != 2')
        ->line('croak("Usage: $stream->write(data)");')
      ->endif
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->line('StreamState* s = &stream_registry[fd];')
      ->blank
      ->line('STRLEN len;')
      ->line('const char* data = SvPV(ST(1), len);')
      ->if('len == 0')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->if('s->state == STREAM_STATE_INIT && !s->http2')
        ->line('stream_start_http1(fd);')
      ->endif
      ->blank
      ->if('!s->http2')
        ->line('stream_write_chunk_http1(fd, data, len);')
      ->endif
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_end {
    my ($class, $builder) = @_;
    
    $builder->xs_function('xs_stream_end')
      ->xs_preamble
      ->line('int fd = SvIV(SvRV(ST(0)));')
      ->line('StreamState* s = &stream_registry[fd];')
      ->blank
      ->if('s->state >= STREAM_STATE_FINISHED')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank



( run in 1.587 second using v1.01-cache-2.11-cpan-140bd7fdf52 )