Hypersonic

 view release on metacpan or  search on metacpan

lib/Hypersonic/SSE.pm  view on Meta::CPAN

package Hypersonic::SSE;
use strict;
use warnings;
use 5.010;

# Hypersonic::SSE - High-level Server-Sent Events API
#
# Wraps the streaming infrastructure to provide a clean SSE interface.
# Automatically handles headers, event formatting, and keepalives.
# Uses JIT-compiled XS for performance.

our $VERSION = '0.17';

use constant {
    STATE_INIT     => 0,
    STATE_STARTED  => 1,
    STATE_FINISHED => 2,
};
use constant MAX_SSE_INSTANCES => 65536;
use constant DEFAULT_KEEPALIVE => 30;

use Hypersonic::Protocol::SSE;

=head1 NAME

Hypersonic::SSE - Server-Sent Events streaming interface

=head1 SYNOPSIS

    $app->get('/events' => sub {
        my ($req, $stream) = @_;

        my $sse = Hypersonic::SSE->new($stream);

        $sse->event(
            type => 'message',
            data => 'Hello World!',
        );

        $sse->event(
            type => 'update',
            data => '{"count": 42}',
            id   => '123',
        );

        $sse->close();
    }, { streaming => 1 });

=head1 DESCRIPTION

Hypersonic::SSE provides a high-level API for sending Server-Sent Events.
It wraps a Hypersonic::Stream object and handles SSE-specific formatting,
headers, and keepalives.

=cut

# ============================================================
# XS Code Generation - ALL instance methods generated in C
# ============================================================

sub generate_c_code {
    my ($class, $builder, $opts) = @_;
    $opts //= {};
    my $max = $opts->{max_sse_instances} // MAX_SSE_INSTANCES;

    $builder->line('#include <time.h>')
      ->blank;

    $class->gen_sse_registry($builder, $max);
    $class->gen_sse_reset($builder);
    $class->gen_sse_format_event($builder);
    $class->gen_sse_format_keepalive($builder);
    $class->gen_sse_format_retry($builder);
    $class->gen_sse_format_comment($builder);

    # XS instance methods
    $class->gen_xs_new($builder);
    $class->gen_xs_stream($builder);
    $class->gen_xs_is_started($builder);
    $class->gen_xs_event_count($builder);
    $class->gen_xs_last_event_time($builder);
    $class->gen_xs_needs_keepalive($builder);
    $class->gen_xs_event($builder);
    $class->gen_xs_data($builder);
    $class->gen_xs_retry($builder);
    $class->gen_xs_keepalive($builder);
    $class->gen_xs_comment($builder);
    $class->gen_xs_close($builder);

    return $builder;
}

sub gen_sse_registry {
    my ($class, $builder, $max) = @_;

    $builder->comment('SSE instance registry - stores SSE state')
      ->line('#define SSE_MAX ' . $max)
      ->line('#define SSE_STATE_INIT     0')
      ->line('#define SSE_STATE_STARTED  1')
      ->line('#define SSE_STATE_FINISHED 2')
      ->blank
      ->line('typedef struct {')
      ->line('    SV* stream_sv;')
      ->line('    int state;')
      ->line('    int event_count;')
      ->line('    time_t last_event_time;')
      ->line('    int keepalive_interval;')
      ->line('} SSEState;')
      ->blank
      ->line('static SSEState sse_registry[SSE_MAX];')
      ->line('static int sse_next_id = 0;')
      ->blank;
}

sub gen_sse_reset {
    my ($class, $builder) = @_;

    $builder->line('static void sse_reset(int id) {')
      ->line('    if (sse_registry[id].stream_sv) {')
      ->line('        SvREFCNT_dec(sse_registry[id].stream_sv);')
      ->line('    }')
      ->line('    memset(&sse_registry[id], 0, sizeof(SSEState));')
      ->line('    sse_registry[id].last_event_time = time(NULL);')
      ->line('    sse_registry[id].keepalive_interval = 30;')
      ->line('}')
      ->blank;
}

sub gen_sse_format_event {
    my ($class, $builder) = @_;

    $builder->comment('SSE: Format an event into buffer')
      ->comment('Returns bytes written')
      ->line('static size_t sse_format_event(char* buf, size_t buf_size,')
      ->line('                               const char* event_type,')
      ->line('                               const char* data,')
      ->line('                               const char* id) {')
      ->line('    size_t pos = 0;')
      ->blank
      ->comment('Event type (optional)')
      ->if('event_type && event_type[0]')
        ->line('pos += snprintf(buf + pos, buf_size - pos, "event: %s\\n", event_type);')
      ->endif
      ->blank
      ->comment('ID (optional)')
      ->if('id && id[0]')
        ->line('pos += snprintf(buf + pos, buf_size - pos, "id: %s\\n", id);')
      ->endif
      ->blank
      ->comment('Data (required) - handle multiline')
      ->if('data')
        ->line('const char* line_start = data;')
        ->line('const char* p = data;')
        ->while('*p')
          ->if('*p == \'\\n\'')
            ->line('pos += snprintf(buf + pos, buf_size - pos, "data: %.*s\\n",')
            ->line('               (int)(p - line_start), line_start);')
            ->line('line_start = p + 1;')
          ->endif
          ->line('p++;')
        ->endloop
        ->comment('Last line (or only line if no newlines)')
        ->if('line_start <= p && *line_start')
          ->line('pos += snprintf(buf + pos, buf_size - pos, "data: %s\\n", line_start);')
        ->elsif('line_start == data')
          ->comment('Empty string - still need data line')
          ->line('pos += snprintf(buf + pos, buf_size - pos, "data: \\n");')
        ->endif
      ->endif
      ->blank
      ->comment('End of event (blank line)')
      ->if('pos < buf_size')
        ->line('buf[pos++] = \'\\n\';')
      ->endif
      ->blank
      ->line('return pos;')
      ->line('}')
      ->blank;

    return $builder;
}

sub gen_sse_format_keepalive {
    my ($class, $builder) = @_;

    $builder->comment('SSE: Format keepalive comment')
      ->line('static size_t sse_format_keepalive(char* buf, size_t buf_size) {')
      ->line('    return snprintf(buf, buf_size, ": keepalive\\n\\n");')
      ->line('}')
      ->blank;

    return $builder;
}

sub gen_sse_format_retry {
    my ($class, $builder) = @_;

    $builder->comment('SSE: Format retry directive')
      ->line('static size_t sse_format_retry(char* buf, size_t buf_size, int ms) {')
      ->line('    return snprintf(buf, buf_size, "retry: %d\\n\\n", ms);')
      ->line('}')
      ->blank;

    return $builder;
}

sub gen_sse_format_comment {
    my ($class, $builder) = @_;

    $builder->comment('SSE: Format comment')
      ->line('static size_t sse_format_comment(char* buf, size_t buf_size, const char* text) {')
      ->line('    return snprintf(buf, buf_size, ": %s\\n\\n", text);')
      ->line('}')
      ->blank;

    return $builder;
}

# XS: new($stream, %opts) - returns blessed scalar
sub gen_xs_new {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_new')
      ->xs_preamble
      ->line('int id = sse_next_id;')
      ->line('sse_next_id = (sse_next_id + 1) % SSE_MAX;')
      ->blank
      ->line('sse_reset(id);')
      ->blank
      ->comment('First arg after class is the stream object')
      ->if('items >= 2')
        ->line('sse_registry[id].stream_sv = newSVsv(ST(1));')
      ->else
        ->line('croak("Hypersonic::SSE->new requires a stream argument");')
      ->endif
      ->blank
      ->comment('Parse optional hash args: keepalive => N')
      ->for('int i = 2', 'i < items', 'i += 2')
        ->if('i + 1 < items')
          ->line('STRLEN klen;')
          ->line('const char* key = SvPV(ST(i), klen);')
          ->if('klen == 9 && strncmp(key, "keepalive", 9) == 0')
            ->line('sse_registry[id].keepalive_interval = SvIV(ST(i + 1));')
          ->endif
        ->endif
      ->endfor
      ->blank
      ->line('SV* id_sv = newSViv(id);')
      ->line('SV* ref = newRV_noinc(id_sv);')
      ->line('sv_bless(ref, gv_stashpv("Hypersonic::SSE", GV_ADD));')
      ->line('ST(0) = sv_2mortal(ref);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_stream {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_stream')
      ->xs_preamble
      ->check_items(1, 1, '$sse->stream')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('XSRETURN_UNDEF;')
      ->endif
      ->if('sse_registry[id].stream_sv')
        ->line('ST(0) = sv_2mortal(newSVsv(sse_registry[id].stream_sv));')
        ->line('XSRETURN(1);')
      ->endif
      ->line('XSRETURN_UNDEF;')
      ->xs_end
      ->blank;
}

sub gen_xs_is_started {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_is_started')
      ->xs_preamble
      ->check_items(1, 1, '$sse->is_started')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('XSRETURN_NO;')
      ->endif
      ->if('sse_registry[id].state >= SSE_STATE_STARTED')
        ->line('XSRETURN_YES;')
      ->else
        ->line('XSRETURN_NO;')
      ->endif
      ->xs_end
      ->blank;
}

sub gen_xs_event_count {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_event_count')
      ->xs_preamble
      ->check_items(1, 1, '$sse->event_count')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('XSRETURN_IV(0);')
      ->endif
      ->line('XSRETURN_IV(sse_registry[id].event_count);')
      ->xs_end
      ->blank;
}

sub gen_xs_last_event_time {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_last_event_time')
      ->xs_preamble
      ->check_items(1, 1, '$sse->last_event_time')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('XSRETURN_IV(0);')
      ->endif
      ->line('XSRETURN_IV((IV)sse_registry[id].last_event_time);')
      ->xs_end
      ->blank;
}

sub gen_xs_needs_keepalive {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_needs_keepalive')
      ->xs_preamble
      ->check_items(1, 1, '$sse->needs_keepalive')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('XSRETURN_NO;')
      ->endif
      ->blank
      ->comment('Check if stream is finished')
      ->line('SSEState* s = &sse_registry[id];')
      ->if('s->state >= SSE_STATE_FINISHED')
        ->line('XSRETURN_NO;')
      ->endif
      ->blank
      ->line('time_t now = time(NULL);')
      ->line('time_t elapsed = now - s->last_event_time;')
      ->if('elapsed >= s->keepalive_interval')
        ->line('XSRETURN_YES;')
      ->else
        ->line('XSRETURN_NO;')
      ->endif
      ->xs_end
      ->blank;
}

sub gen_xs_event {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_event')
      ->xs_preamble
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->line('SSEState* s = &sse_registry[id];')
      ->blank
      ->comment('Check if stream is finished by calling is_finished method')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('int count = call_method("is_finished", G_SCALAR);')
        ->line('SPAGAIN;')
        ->line('int is_finished = 0;')
        ->if('count > 0')
          ->comment('SvTRUE is a multi-evaluation macro on older perls; pop into a temp first')
          ->line('SV* _ret_sv = POPs;')
          ->line('is_finished = SvTRUE(_ret_sv);')
        ->endif
        ->line('PUTBACK;')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->if('is_finished')
          ->line('ST(0) = ST(0);')
          ->line('XSRETURN(1);')
        ->endif
      ->endif
      ->blank
      ->comment('Start SSE if not started - call headers method on stream')
      ->if('s->state == SSE_STATE_INIT && s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSViv(200)));')
        ->comment('Create headers hash')
        ->line('HV* hv = newHV();')
        ->line('(void)hv_store(hv, "Content-Type", 12, newSVpv("text/event-stream", 0), 0);')
        ->line('(void)hv_store(hv, "Cache-Control", 13, newSVpv("no-cache", 0), 0);')
        ->line('(void)hv_store(hv, "Connection", 10, newSVpv("keep-alive", 0), 0);')
        ->line('(void)hv_store(hv, "X-Accel-Buffering", 17, newSVpv("no", 0), 0);')
        ->line('XPUSHs(sv_2mortal(newRV_noinc((SV*)hv)));')
        ->line('PUTBACK;')
        ->line('call_method("headers", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->line('s->state = SSE_STATE_STARTED;')
      ->endif
      ->blank
      ->comment('Parse event options from hash args')
      ->line('const char* event_type = NULL;')
      ->line('const char* data = "";')
      ->line('const char* event_id = NULL;')
      ->blank
      ->for('int i = 1', 'i < items', 'i += 2')
        ->if('i + 1 < items')
          ->line('STRLEN klen;')
          ->line('const char* key = SvPV(ST(i), klen);')
          ->if('klen == 4 && strncmp(key, "type", 4) == 0')
            ->line('event_type = SvPV_nolen(ST(i + 1));')
          ->endif
          ->if('klen == 4 && strncmp(key, "data", 4) == 0')
            ->line('data = SvPV_nolen(ST(i + 1));')
          ->endif
          ->if('klen == 2 && strncmp(key, "id", 2) == 0')
            ->line('event_id = SvPV_nolen(ST(i + 1));')
          ->endif
        ->endif
      ->endfor
      ->blank
      ->comment('Format event')
      ->line('char buf[8192];')
      ->line('size_t len = sse_format_event(buf, sizeof(buf), event_type, data, event_id);')
      ->blank
      ->comment('Write to stream')
      ->if('s->stream_sv && len > 0')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSVpvn(buf, len)));')
        ->line('PUTBACK;')
        ->line('call_method("write", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
      ->endif
      ->blank
      ->line('s->event_count++;')
      ->line('s->last_event_time = time(NULL);')
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_data {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_data')
      ->xs_preamble
      ->if('items != 2')
        ->line('croak("Usage: $sse->data(payload)");')
      ->endif
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->line('SSEState* s = &sse_registry[id];')
      ->blank
      ->comment('Check if stream is finished')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('int count = call_method("is_finished", G_SCALAR);')
        ->line('SPAGAIN;')
        ->line('int is_finished = 0;')
        ->if('count > 0')
          ->comment('SvTRUE is a multi-evaluation macro on older perls; pop into a temp first')
          ->line('SV* _ret_sv = POPs;')
          ->line('is_finished = SvTRUE(_ret_sv);')
        ->endif
        ->line('PUTBACK;')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->if('is_finished')
          ->line('ST(0) = ST(0);')
          ->line('XSRETURN(1);')
        ->endif
      ->endif
      ->blank
      ->comment('Start SSE if not started')
      ->if('s->state == SSE_STATE_INIT && s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSViv(200)));')
        ->line('HV* hv = newHV();')
        ->line('(void)hv_store(hv, "Content-Type", 12, newSVpv("text/event-stream", 0), 0);')
        ->line('(void)hv_store(hv, "Cache-Control", 13, newSVpv("no-cache", 0), 0);')
        ->line('(void)hv_store(hv, "Connection", 10, newSVpv("keep-alive", 0), 0);')
        ->line('(void)hv_store(hv, "X-Accel-Buffering", 17, newSVpv("no", 0), 0);')
        ->line('XPUSHs(sv_2mortal(newRV_noinc((SV*)hv)));')
        ->line('PUTBACK;')
        ->line('call_method("headers", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->line('s->state = SSE_STATE_STARTED;')
      ->endif
      ->blank
      ->comment('Format data-only event')
      ->line('const char* data = SvPV_nolen(ST(1));')
      ->line('char buf[8192];')
      ->line('size_t len = sse_format_event(buf, sizeof(buf), NULL, data, NULL);')
      ->blank
      ->comment('Write to stream')
      ->if('s->stream_sv && len > 0')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSVpvn(buf, len)));')
        ->line('PUTBACK;')
        ->line('call_method("write", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
      ->endif
      ->blank
      ->line('s->event_count++;')
      ->line('s->last_event_time = time(NULL);')
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_retry {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_retry')
      ->xs_preamble
      ->if('items != 2')
        ->line('croak("Usage: $sse->retry(milliseconds)");')
      ->endif
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->line('SSEState* s = &sse_registry[id];')
      ->blank
      ->comment('Check if stream is finished')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('int count = call_method("is_finished", G_SCALAR);')
        ->line('SPAGAIN;')
        ->line('int is_finished = 0;')
        ->if('count > 0')
          ->comment('SvTRUE is a multi-evaluation macro on older perls; pop into a temp first')
          ->line('SV* _ret_sv = POPs;')
          ->line('is_finished = SvTRUE(_ret_sv);')
        ->endif
        ->line('PUTBACK;')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->if('is_finished')
          ->line('ST(0) = ST(0);')
          ->line('XSRETURN(1);')
        ->endif
      ->endif
      ->blank
      ->comment('Start SSE if not started')
      ->if('s->state == SSE_STATE_INIT && s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSViv(200)));')
        ->line('HV* hv = newHV();')
        ->line('(void)hv_store(hv, "Content-Type", 12, newSVpv("text/event-stream", 0), 0);')
        ->line('(void)hv_store(hv, "Cache-Control", 13, newSVpv("no-cache", 0), 0);')
        ->line('(void)hv_store(hv, "Connection", 10, newSVpv("keep-alive", 0), 0);')
        ->line('(void)hv_store(hv, "X-Accel-Buffering", 17, newSVpv("no", 0), 0);')
        ->line('XPUSHs(sv_2mortal(newRV_noinc((SV*)hv)));')
        ->line('PUTBACK;')
        ->line('call_method("headers", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->line('s->state = SSE_STATE_STARTED;')
      ->endif
      ->blank
      ->comment('Format retry directive')
      ->line('int ms = SvIV(ST(1));')
      ->line('char buf[64];')
      ->line('size_t len = sse_format_retry(buf, sizeof(buf), ms);')
      ->blank
      ->comment('Write to stream')
      ->if('s->stream_sv && len > 0')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSVpvn(buf, len)));')
        ->line('PUTBACK;')
        ->line('call_method("write", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
      ->endif
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_keepalive {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_keepalive')
      ->xs_preamble
      ->check_items(1, 1, '$sse->keepalive')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->line('SSEState* s = &sse_registry[id];')
      ->blank
      ->comment('Check if stream is finished')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('int count = call_method("is_finished", G_SCALAR);')
        ->line('SPAGAIN;')
        ->line('int is_finished = 0;')
        ->if('count > 0')
          ->comment('SvTRUE is a multi-evaluation macro on older perls; pop into a temp first')
          ->line('SV* _ret_sv = POPs;')
          ->line('is_finished = SvTRUE(_ret_sv);')
        ->endif
        ->line('PUTBACK;')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->if('is_finished')
          ->line('ST(0) = ST(0);')
          ->line('XSRETURN(1);')
        ->endif
      ->endif
      ->blank
      ->comment('Start SSE if not started')
      ->if('s->state == SSE_STATE_INIT && s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSViv(200)));')
        ->line('HV* hv = newHV();')
        ->line('(void)hv_store(hv, "Content-Type", 12, newSVpv("text/event-stream", 0), 0);')
        ->line('(void)hv_store(hv, "Cache-Control", 13, newSVpv("no-cache", 0), 0);')
        ->line('(void)hv_store(hv, "Connection", 10, newSVpv("keep-alive", 0), 0);')
        ->line('(void)hv_store(hv, "X-Accel-Buffering", 17, newSVpv("no", 0), 0);')
        ->line('XPUSHs(sv_2mortal(newRV_noinc((SV*)hv)));')
        ->line('PUTBACK;')
        ->line('call_method("headers", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->line('s->state = SSE_STATE_STARTED;')
      ->endif
      ->blank
      ->comment('Format keepalive')
      ->line('char buf[32];')
      ->line('size_t len = sse_format_keepalive(buf, sizeof(buf));')
      ->blank
      ->comment('Write to stream')
      ->if('s->stream_sv && len > 0')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSVpvn(buf, len)));')
        ->line('PUTBACK;')
        ->line('call_method("write", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
      ->endif
      ->blank
      ->line('s->last_event_time = time(NULL);')
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_comment {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_comment')
      ->xs_preamble
      ->if('items != 2')
        ->line('croak("Usage: $sse->comment(text)");')
      ->endif
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->line('SSEState* s = &sse_registry[id];')
      ->blank
      ->comment('Check if stream is finished')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('int count = call_method("is_finished", G_SCALAR);')
        ->line('SPAGAIN;')
        ->line('int is_finished = 0;')
        ->if('count > 0')
          ->comment('SvTRUE is a multi-evaluation macro on older perls; pop into a temp first')
          ->line('SV* _ret_sv = POPs;')
          ->line('is_finished = SvTRUE(_ret_sv);')
        ->endif
        ->line('PUTBACK;')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->if('is_finished')
          ->line('ST(0) = ST(0);')
          ->line('XSRETURN(1);')
        ->endif
      ->endif
      ->blank
      ->comment('Start SSE if not started')
      ->if('s->state == SSE_STATE_INIT && s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSViv(200)));')
        ->line('HV* hv = newHV();')
        ->line('(void)hv_store(hv, "Content-Type", 12, newSVpv("text/event-stream", 0), 0);')
        ->line('(void)hv_store(hv, "Cache-Control", 13, newSVpv("no-cache", 0), 0);')
        ->line('(void)hv_store(hv, "Connection", 10, newSVpv("keep-alive", 0), 0);')
        ->line('(void)hv_store(hv, "X-Accel-Buffering", 17, newSVpv("no", 0), 0);')
        ->line('XPUSHs(sv_2mortal(newRV_noinc((SV*)hv)));')
        ->line('PUTBACK;')
        ->line('call_method("headers", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->line('s->state = SSE_STATE_STARTED;')
      ->endif
      ->blank
      ->comment('Format comment')
      ->line('const char* text = SvPV_nolen(ST(1));')
      ->line('char buf[4096];')
      ->line('size_t len = sse_format_comment(buf, sizeof(buf), text);')
      ->blank
      ->comment('Write to stream')
      ->if('s->stream_sv && len > 0')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('XPUSHs(sv_2mortal(newSVpvn(buf, len)));')
        ->line('PUTBACK;')
        ->line('call_method("write", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
      ->endif
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub gen_xs_close {
    my ($class, $builder) = @_;

    $builder->xs_function('xs_sse_close')
      ->xs_preamble
      ->check_items(1, 1, '$sse->close')
      ->line('int id = SvIV(SvRV(ST(0)));')
      ->if('id < 0 || id >= SSE_MAX')
        ->line('ST(0) = ST(0);')
        ->line('XSRETURN(1);')
      ->endif
      ->blank
      ->line('SSEState* s = &sse_registry[id];')
      ->blank
      ->comment('Check if stream is finished')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('int count = call_method("is_finished", G_SCALAR);')
        ->line('SPAGAIN;')
        ->line('int is_finished = 0;')
        ->if('count > 0')
          ->comment('SvTRUE is a multi-evaluation macro on older perls; pop into a temp first')
          ->line('SV* _ret_sv = POPs;')
          ->line('is_finished = SvTRUE(_ret_sv);')
        ->endif
        ->line('PUTBACK;')
        ->line('FREETMPS;')
        ->line('LEAVE;')
        ->if('is_finished')
          ->line('ST(0) = ST(0);')
          ->line('XSRETURN(1);')
        ->endif
      ->endif
      ->blank
      ->comment('Call end on stream')
      ->if('s->stream_sv')
        ->line('dSP;')
        ->line('ENTER;')
        ->line('SAVETMPS;')
        ->line('PUSHMARK(SP);')
        ->line('XPUSHs(s->stream_sv);')
        ->line('PUTBACK;')
        ->line('call_method("end", G_DISCARD);')
        ->line('FREETMPS;')
        ->line('LEAVE;')
      ->endif
      ->blank
      ->line('s->state = SSE_STATE_FINISHED;')
      ->blank
      ->line('ST(0) = ST(0);')
      ->line('XSRETURN(1);')
      ->xs_end
      ->blank;
}

sub get_xs_functions {
    return {
        'Hypersonic::SSE::new'             => { source => 'xs_sse_new', is_xs_native => 1 },
        'Hypersonic::SSE::stream'          => { source => 'xs_sse_stream', is_xs_native => 1 },
        'Hypersonic::SSE::is_started'      => { source => 'xs_sse_is_started', is_xs_native => 1 },
        'Hypersonic::SSE::event_count'     => { source => 'xs_sse_event_count', is_xs_native => 1 },
        'Hypersonic::SSE::last_event_time' => { source => 'xs_sse_last_event_time', is_xs_native => 1 },
        'Hypersonic::SSE::needs_keepalive' => { source => 'xs_sse_needs_keepalive', is_xs_native => 1 },
        'Hypersonic::SSE::event'           => { source => 'xs_sse_event', is_xs_native => 1 },
        'Hypersonic::SSE::data'            => { source => 'xs_sse_data', is_xs_native => 1 },
        'Hypersonic::SSE::retry'           => { source => 'xs_sse_retry', is_xs_native => 1 },
        'Hypersonic::SSE::keepalive'       => { source => 'xs_sse_keepalive', is_xs_native => 1 },
        'Hypersonic::SSE::comment'         => { source => 'xs_sse_comment', is_xs_native => 1 },
        'Hypersonic::SSE::close'           => { source => 'xs_sse_close', is_xs_native => 1 },
    };
}

1;

__END__

=head1 CLIENT EXAMPLE

JavaScript:

    const events = new EventSource('/events');
    
    events.onmessage = (e) => {
        console.log('Message:', e.data);
    };
    
    events.addEventListener('update', (e) => {
        console.log('Update:', JSON.parse(e.data));
        console.log('Event ID:', e.lastEventId);
    });
    
    events.onerror = (e) => {
        if (e.target.readyState === EventSource.CLOSED) {
            console.log('Connection closed');
        } else {
            console.log('Error, will auto-reconnect');
        }
    };

=head1 RECONNECTION

The browser automatically reconnects when the connection drops.
Use the C<id> field to enable resumption:

    $app->get('/events' => sub {
        my ($req, $stream) = @_;
        
        my $last_id = $req->header('Last-Event-ID') // 0;
        my $sse = Hypersonic::SSE->new($stream);
        
        for my $id (($last_id + 1) .. 100) {
            $sse->event(
                type => 'update',
                data => "Event $id",
                id   => $id,
            );
        }
        
        $sse->close();
    }, { streaming => 1 });

=head1 SEE ALSO

L<Hypersonic::Stream>, L<Hypersonic::Protocol::SSE>

=head1 AUTHOR



( run in 1.596 second using v1.01-cache-2.11-cpan-5b529ec07f3 )