App-cdnget

 view release on metacpan or  search on metacpan

lib/App/cdnget/Worker.pm  view on Meta::CPAN

		}
		$downloader = $App::cdnget::Downloader::uids{$uid};
	};
	$fh->binmode(":bytes") or $self->throw($!);

	do
	{
		local ($/, $\) = ("\r\n")x2;
		my $line;
		my $buf;
		my $empty = 1;
		while (not $self->terminating)
		{
			threads->yield();
			my $downloaderTerminated = ! $downloader || $downloader->terminated;
			$line = $fh->getline;
			unless (defined($line))
			{
				$self->throw($!) if $fh->error;
				return if $downloaderTerminated;
				my $pos = $fh->tell;
				usleep(1*1000);
				$fh->seek($pos, 0) or $self->throw($!);
				next;
			}
			chomp $line;
			unless ($line =~ /^(Client\-)/i)
			{
				if (not $out->print("$line\r\n"))
				{
					not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
					return;
				}
				$empty = 0;
			}
			last unless $line;
		}
		while (not $self->terminating)
		{
			threads->yield();
			my $downloaderTerminated = ! $downloader || $downloader->terminated;
			my $len = $fh->read($buf, $App::cdnget::CHUNK_SIZE);
			$self->throw($!) unless defined($len);
			if ($len == 0)
			{
				return if $downloaderTerminated;
				my $pos = $fh->tell;
				usleep(1*1000);
				$fh->seek($pos, 0) or $self->throw($!);
				next;
			}
			if (not $out->write($buf, $len))
			{
				not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
				return;
			}
			$empty = 0;
		}
		if ($empty)
		{
			if (not $out->print("Status: 404\r\n"))
			{
				not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
				return;
			}
		}
	};
	return;
}

sub run
{
	my $self = shift;
	my $tid = threads->tid();

	$self->tid = $tid;
	do
	{
		lock($self);
		cond_signal($self);
	};

	my $spare = 1;
	my $accepting = 0;
	eval
	{
		my ($in, $out, $err) = (IO::Handle->new(), IO::Handle->new(), IO::Handle->new());
		my $env = {};
		my $req = FCGI::Request($in, $out, $err, $env, $socket, FCGI::FAIL_ACCEPT_ON_INTR) or $self->throw($!);

		wait_accept:
		while (not $self->terminating)
		{
			$accepterSemaphore->down_timed(1);
			do
			{
				lock($accepterCount);
				last wait_accept unless $accepterCount >= $spareCount;
			};
		}
		$spareSemaphore->up();
		$spare = 0;

		accepter_loop:
		while (not $self->terminating)
		{
			threads->yield();
			$workerSemaphore->up();
			$accepting = 1;
			my $accept;
			do
			{
				lock($accepterCount);
				$accepterCount++;
			};
			eval { $accept = $req->Accept() };
			do
			{
				lock($accepterCount);
				$accepterCount--;
			};



( run in 2.422 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )