App-cdnget
view release on metacpan or search on metacpan
lib/App/cdnget/Worker.pm view on Meta::CPAN
};
App::cdnget::log_info("Workers terminating...");
my $gracefully = 0;
while (not $gracefully and not $App::cdnget::terminating_force)
{
$gracefully = $workerSemaphore->down_timed(3, $maxCount);
}
lock($terminated);
$terminated = 1;
App::cdnget::log_info("Workers terminated".($gracefully? " gracefully": "").".");
return 1;
}
sub terminating
{
lock($terminating);
return $terminating;
}
sub terminated
{
if (@_ > 0)
{
my $self = shift;
lock($self);
return defined($self->tid)? 0: 1;
}
lock($terminated);
return $terminated;
}
sub new
{
my $class = shift;
while (not $spareSemaphore->down_timed(1))
{
if (terminating())
{
return;
}
}
while (not $workerSemaphore->down_timed(1))
{
if (terminating())
{
$spareSemaphore->up();
return;
}
}
if (terminating())
{
$spareSemaphore->up();
$workerSemaphore->up();
return;
}
my $self = $class->SUPER();
$self->tid = undef;
do
{
lock($self);
my $thr = threads->create(\&run, $self) or $self->throw($!);
cond_wait($self);
unless (defined($self->tid))
{
App::cdnget::Exception->throw($thr->join());
}
$thr->detach();
};
return $self;
}
sub DESTROY
{
my $self = shift;
$self->SUPER::DESTROY;
}
sub throw
{
my $self = shift;
my ($msg) = @_;
unless (ref($msg))
{
$msg = "Unknown" unless $msg;
$msg = "Worker ".
$msg;
}
App::cdnget::Exception->throw($msg, 1);
}
sub worker
{
my $self = shift;
my ($req) = @_;
my ($in, $out, $err) = $req->GetHandles();
my $env = $req->GetEnvironment();
my $id = $env->{CDNGET_ID};
$self->throw("Invalid ID") unless defined($id);
$id = ($id =~ /^(.*)/)[0];
$id =~ s/^\s+|\s+$//g;
$self->throw("Invalid ID") unless $id =~ /^\w+$/i;
my $origin = $env->{CDNGET_ORIGIN};
$self->throw("Invalid origin") unless defined($origin);
$origin = ($origin =~ /^(.*)/)[0];
$origin =~ s/^\s+|\s+$//g;
$origin = URI->new($origin);
$self->throw("Invalid origin scheme") unless $origin->scheme =~ /^http|https$/i;
$origin->path(substr($origin->path, 0, length($origin->path)-1)) while $origin->path and substr($origin->path, -1) eq "/";
my $uri = $env->{CDNGET_URI};
$self->throw("Invalid URI") unless defined($uri);
$uri = ($uri =~ /^(.*)/)[0];
$uri =~ s/^\s+|\s+$//g;
$uri = "/$uri" unless $uri and substr($uri, 0, 1) eq "/";
my $hook = $env->{CDNGET_HOOK};
$hook = "" unless defined($hook);
$hook = ($hook =~ /^(.*)/)[0];
$hook =~ s/^\s+|\s+$//g;
my $url = $origin->scheme."://".$origin->host_port.$origin->path.$uri;
my $digest = Digest::MD5::md5_hex("$url $hook");
my $uid = "$id/$digest";
my $path = "$cachePath/$id";
mkdir($path);
my @dirs = $digest =~ /(..)(.)$/;
my $file = $digest;
for (reverse @dirs)
{
$path .= "/$_";
mkdir($path);
}
$self->throw("Cache directory not exists") unless -d $path;
$path .= "/$file";
my $fh;
my $downloader;
do
{
lock(%App::cdnget::Downloader::uids);
$fh = FileHandle->new($path, "<");
unless ($fh)
{
return unless App::cdnget::Downloader->new($uid, $path, $url, $hook);
$fh = FileHandle->new($path, "<") or $self->throw($!);
}
$downloader = $App::cdnget::Downloader::uids{$uid};
};
$fh->binmode(":bytes") or $self->throw($!);
do
{
local ($/, $\) = ("\r\n")x2;
my $line;
my $buf;
my $empty = 1;
while (not $self->terminating)
{
threads->yield();
my $downloaderTerminated = ! $downloader || $downloader->terminated;
$line = $fh->getline;
unless (defined($line))
{
$self->throw($!) if $fh->error;
return if $downloaderTerminated;
my $pos = $fh->tell;
usleep(1*1000);
$fh->seek($pos, 0) or $self->throw($!);
next;
}
chomp $line;
unless ($line =~ /^(Client\-)/i)
{
if (not $out->print("$line\r\n"))
{
not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
return;
}
$empty = 0;
}
last unless $line;
}
while (not $self->terminating)
{
threads->yield();
my $downloaderTerminated = ! $downloader || $downloader->terminated;
my $len = $fh->read($buf, $App::cdnget::CHUNK_SIZE);
$self->throw($!) unless defined($len);
if ($len == 0)
{
return if $downloaderTerminated;
my $pos = $fh->tell;
usleep(1*1000);
$fh->seek($pos, 0) or $self->throw($!);
next;
}
if (not $out->write($buf, $len))
{
not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
return;
}
$empty = 0;
}
if ($empty)
{
if (not $out->print("Status: 404\r\n"))
{
not $! or $!{EPIPE} or $!{ECONNRESET} or $!{EPROTOTYPE} or $self->throw($!);
return;
}
}
};
return;
}
sub run
{
my $self = shift;
my $tid = threads->tid();
$self->tid = $tid;
do
{
lock($self);
cond_signal($self);
};
my $spare = 1;
my $accepting = 0;
eval
{
my ($in, $out, $err) = (IO::Handle->new(), IO::Handle->new(), IO::Handle->new());
my $env = {};
my $req = FCGI::Request($in, $out, $err, $env, $socket, FCGI::FAIL_ACCEPT_ON_INTR) or $self->throw($!);
wait_accept:
while (not $self->terminating)
{
$accepterSemaphore->down_timed(1);
do
{
lock($accepterCount);
last wait_accept unless $accepterCount >= $spareCount;
};
}
$spareSemaphore->up();
$spare = 0;
accepter_loop:
while (not $self->terminating)
{
threads->yield();
$workerSemaphore->up();
$accepting = 1;
my $accept;
do
{
lock($accepterCount);
$accepterCount++;
};
eval { $accept = $req->Accept() };
do
{
lock($accepterCount);
$accepterCount--;
};
$accepterSemaphore->up();
last unless $accept >= 0;
if ($self->terminating)
{
$req->Finish();
last;
}
$workerSemaphore->down();
$accepting = 0;
eval
{
$self->worker($req);
};
do
{
local $@;
$req->Finish();
};
if ($@)
{
die $@;
}
do
{
lock($accepterCount);
last accepter_loop if $accepterCount >= $spareCount;
};
}
};
do
{
local $@;
$workerSemaphore->up() unless $accepting;
$spareSemaphore->up() if $spare;
usleep(10*1000); #cond_wait bug
lock($self);
$self->tid = undef;
};
if ($@)
{
warn $@;
}
return;
}
( run in 0.503 second using v1.01-cache-2.11-cpan-39bf76dae61 )