App-cdnget
view release on metacpan or search on metacpan
lib/App/cdnget/Worker.pm view on Meta::CPAN
}
$downloader = $App::cdnget::Downloader::uids{$uid};
};
$fh->binmode(":bytes") or $self->throw($!);
do
{
local ($/, $\) = ("\r\n")x2;
my $line;
my $buf;
my $empty = 1;
while (not $self->terminating)
{
threads->yield();
my $downloaderTerminated = ! $downloader || $downloader->terminated;
$line = $fh->getline;
unless (defined($line))
{
$self->throw($!) if $fh->error;
return if $downloaderTerminated;
my $pos = $fh->tell;
usleep(1*1000);
$fh->seek($pos, 0) or $self->throw($!);
next;
}
chomp $line;
unless ($line =~ /^(Client\-)/i)
{
if (not $out->print("$line\r\n"))
{
not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
return;
}
$empty = 0;
}
last unless $line;
}
while (not $self->terminating)
{
threads->yield();
my $downloaderTerminated = ! $downloader || $downloader->terminated;
my $len = $fh->read($buf, $App::cdnget::CHUNK_SIZE);
$self->throw($!) unless defined($len);
if ($len == 0)
{
return if $downloaderTerminated;
my $pos = $fh->tell;
usleep(1*1000);
$fh->seek($pos, 0) or $self->throw($!);
next;
}
if (not $out->write($buf, $len))
{
not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
return;
}
$empty = 0;
}
if ($empty)
{
if (not $out->print("Status: 404\r\n"))
{
not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
return;
}
}
};
return;
}
sub run
{
my $self = shift;
my $tid = threads->tid();
$self->tid = $tid;
do
{
lock($self);
cond_signal($self);
};
my $spare = 1;
my $accepting = 0;
eval
{
my ($in, $out, $err) = (IO::Handle->new(), IO::Handle->new(), IO::Handle->new());
my $env = {};
my $req = FCGI::Request($in, $out, $err, $env, $socket, FCGI::FAIL_ACCEPT_ON_INTR) or $self->throw($!);
wait_accept:
while (not $self->terminating)
{
$accepterSemaphore->down_timed(1);
do
{
lock($accepterCount);
last wait_accept unless $accepterCount >= $spareCount;
};
}
$spareSemaphore->up();
$spare = 0;
accepter_loop:
while (not $self->terminating)
{
threads->yield();
$workerSemaphore->up();
$accepting = 1;
my $accept;
do
{
lock($accepterCount);
$accepterCount++;
};
eval { $accept = $req->Accept() };
do
{
lock($accepterCount);
$accepterCount--;
};
( run in 2.422 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )