Acme-Parataxis
view release on metacpan or search on metacpan
t/009_http_tiny.t view on Meta::CPAN
use blib;
use Acme::Parataxis;
use HTTP::Tiny;
use IO::Socket::INET;
use Time::HiRes qw[time];
use Socket qw[SHUT_WR];
use POSIX ();
$|++;
#
package Acme::Parataxis::Test::HTTP {
use parent 'HTTP::Tiny';
sub _open_handle {
my ( $self, $request, $scheme, $host, $port, $peer ) = @_;
my $handle = Acme::Parataxis::Test::HTTP::Handle->new(
timeout => $self->{timeout},
SSL_options => $self->{SSL_options},
verify_SSL => $self->{verify_SSL},
);
return $handle->connect( $scheme, $host, $port, $peer );
}
sub request {
my ( $self, $method, $url, $args ) = @_;
$args //= {};
my $orig_cb = $args->{data_callback};
my $content = '';
$args->{data_callback} = sub {
my ( $data, $response ) = @_;
# diag 'Progress: Received ' . length($data) . " bytes for $url";
if ($orig_cb) {
return $orig_cb->( $data, $response );
}
$content .= $data;
return 1;
};
my $res = $self->SUPER::request( $method, $url, $args );
$res->{content} = $content unless $orig_cb;
return $res;
}
}
{
package Acme::Parataxis::Test::HTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout};
if ( $self->{fh} ) {
my $start = time();
while (1) {
# Immediate check using original select (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
# Check for overall timeout
return 0 if ( time() - $start ) > $timeout;
# Suspend fiber and wait for background I/O check.
# await_* submits a job and yields 'WAITING'.
if ( $type eq 'read' ) {
Acme::Parataxis->await_read( $self->{fh}, 500 );
}
else {
Acme::Parataxis->await_write( $self->{fh}, 500 );
}
}
}
return $self->SUPER::_do_timeout( $type, 0 );
}
}
Acme::Parataxis::run(
sub {
my $listener = IO::Socket::INET->new(
Listen => 10,
LocalAddr => '127.0.0.1', # Force IPv4
LocalPort => 0, # Auto-assign port
Proto => 'tcp',
ReuseAddr => 1,
Blocking => 0
) or
die "Could not create listener: $!";
my $server_port = $listener->sockport;
diag "Mock server listening on 127.0.0.1:$server_port";
#
Acme::Parataxis->spawn(
sub {
while (1) {
# Wait for a connection
Acme::Parataxis->await_read($listener);
my $client = $listener->accept();
next unless $client;
$client->blocking(0);
# SPAWN a new fiber per connection for true concurrency
Acme::Parataxis->spawn(
sub {
# Drain the request headers from client
my $buffer = '';
while (1) {
my $bytes = sysread( $client, $buffer, 4096, length($buffer) );
last if $buffer =~ /\r?\n\r?\n/; # End of headers
if ( !defined $bytes ) {
last if $! != POSIX::EAGAIN && $! != POSIX::EWOULDBLOCK;
Acme::Parataxis->await_read( $client, 100 );
}
last if defined $bytes && $bytes == 0; # EOF
}
#
my $response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 2\r\nConnection: close\r\n\r\nHI";
my $offset = 0;
my $len = length($response);
while ( $offset < $len ) {
my $written = syswrite( $client, $response, $len - $offset, $offset );
if ( defined $written ) {
$offset += $written;
}
elsif ( $! != POSIX::EAGAIN && $! != POSIX::EWOULDBLOCK ) {
( run in 2.881 seconds using v1.01-cache-2.11-cpan-f56aa216473 )