Net-FCP
view release on metacpan or search on metacpan
my $self = bless { @_ }, $class;
for my $job (values %job) {
if ($job->{p}{key} eq $self->{p}{key}) {
warn "job $job->{id} already works on this, not adding";
aio_unlink $self->{job};
return;
}
}
$self->{p}{title} !~ /\//
or die "$self->{p}{title}: malformed key\n";
$self->{p}{title} =~ s/\s+$//;
$self->{p}{title} =~ s/\n/_/g;
$self->{id} = ++$count;
$self->{job} ||= "$QUEUE_HOME/" . Time::HiRes::time . ":$count.j";
$job{$self->{id}} = $self;
$self->start;
$self
}
# Alternate constructor: create a job for a key that has not been looked at
# yet.  The persistent state starts out with the given key and title and the
# initial state "examine"; everything else is left to new().
sub new_from_key {
   my $class = shift;
   my ($key, $title) = @_;

   my %persistent = (
      key   => $key,
      title => $title,
      state => "examine",
   );

   return $class->new (p => \%persistent);
}
# Alternate constructor: re-create a job from a job file on disk.  $path is
# remembered as the job file name and the persistent state is whatever
# Storable::retrieve reads from that file.
sub new_from_file {
   my $class = shift;
   my ($path) = @_;

   my $persistent = Storable::retrieve ($path);

   return $class->new (job => $path, p => $persistent);
}
# Persist the job's state hash ($self->{p}) to the job file $self->{job}.
#
# The frozen state is written to "<job>~", synced, closed and only then
# renamed over the real job file, followed by a sync of $QUEUE_HOME.  Every
# step is checked: if writing, syncing, closing or renaming fails, the
# temporary file is removed and the previous job file is left untouched, so a
# full disk can no longer replace a good job file with a truncated one.
#
# Failures are reported with warn rather than die, because saving is done
# from many places that should not be aborted by it.
#
# Returns true if the new state was committed, false otherwise.
sub save {
   my ($self) = @_;

   my $tmp = "$self->{job}~";

   my $fh = aio_open ($tmp, O_CREAT|O_TRUNC|O_WRONLY, 0600);
   unless ($fh) {
      warn "$tmp: unable to create: $!\n";
      return 0;
   }

   my $data = Coro::Storable::blocking_nfreeze ($self->{p});

   # the aio_* calls are used synchronously here: aio_write yields the
   # number of bytes written, the others yield 0 on success
   my $failed;
   my $written = aio_write ($fh, 0, undef, $data, 0);

   if (!defined $written or $written != length ($data)) {
      $failed = "write: $!";
   } elsif (aio_fsync ($fh)) {
      $failed = "fsync: $!";
   }

   # always close the handle; a close error only matters if nothing
   # went wrong before it
   if (aio_close ($fh)) {
      $failed ||= "close: $!";
   }

   if (!$failed and aio_rename ($tmp, $self->{job})) {
      $failed = "rename: $!";
   }

   if ($failed) {
      aio_unlink ($tmp);
      warn "$self->{job}: unable to save job state ($failed)\n";
      return 0;
   }

   aio_pathsync ($QUEUE_HOME);

   1
}
# Retire the job: drop it from the %job registry, save its state one last
# time, move the job file to "$DONE_HOME/<title>.job" with the external "mv"
# command and finally unlink whatever is still left at the old path.
sub clean {
   my $self = shift;

   delete $job{ $self->{id} };
   $self->save;

   my $archived = "$DONE_HOME/$self->{p}{title}.job";
   system "mv", $self->{job}, $archived;

   unlink $self->{job};
}
# Abort the job: clean it up first, then cancel the coroutine stored in
# $self->{coro}.
sub kill {
   my $self = shift;

   $self->clean;

   my $coro = $self->{coro};
   $coro->cancel;
}
# Transaction queue shared by all jobs:
#   @queue        - heap of [priority, Coro::Signal] requests (see txn_begin)
#   $queue_change - signalled whenever a request is added or a slot is freed
#   $queue_alloc  - number of transaction slots currently handed out
our @queue;
our $queue_change = Coro::Signal->new;
our $queue_alloc = 0;

# Scheduler coroutine: hands out transaction slots to waiting requests.
# How many slots may be in use depends on the priority value at the head of
# the heap: above 10 every slot up to $MAX_TXN, above 1 all but three,
# anything else all but five.  When nothing more can be granted it sleeps
# until $queue_change is signalled.
async {
   while (1) {
      while (@queue) {
         my $head_pri = $queue[0][0];

         my $limit = $head_pri > 10 ? $MAX_TXN
                   : $head_pri >  1 ? $MAX_TXN - 3
                   :                  $MAX_TXN - 5;

         last unless $queue_alloc < $limit;

         # wake up the coroutine waiting in txn_begin
         my $request = pop_heap (@queue);
         $request->[1]->send;
         $queue_alloc++;
         #Coro::Timer::sleep 0.05;
      }

      $queue_change->wait;
   }
};
# Request a transaction slot and block the calling coroutine until the
# scheduler grants it.  $pri is the heap key the request is queued under.
sub txn_begin {
   my $pri = shift;

   my $granted = Coro::Signal->new;
   #warn "txn_begin $pri\n";#d#

   my $request = [$pri, $granted];
   push_heap (@queue, $request);

   # let the scheduler know there is something new to look at
   $queue_change->send;

   $granted->wait;
}
# Give a transaction slot back and wake the scheduler so it can hand the
# slot to the next waiting request.
sub txn_end {
   $queue_alloc -= 1;

   $queue_change->send;
}
# Start an asynchronous client_get transaction once a transaction slot is
# available.
#
# Named arguments:
#   pri - queue priority passed to txn_begin
#   uri - the URI to fetch
#   htl - the hops-to-live value passed on to $FCP->txn_client_get
#   cb  - code reference invoked with the arguments the transaction passes
#         to its callback, after the slot has been released
#
# If the transaction cannot be created, the slot obtained from txn_begin is
# released again before the error is re-thrown; otherwise a failing
# txn_client_get would permanently reduce the number of usable slots.
#
# Returns whatever the transaction's cb method returns.
sub txn_client_get {
   my %arg = @_;

   txn_begin ($arg{pri});

   my $txn = eval { $FCP->txn_client_get ($arg{uri}, $arg{htl}) };

   unless ($txn) {
      my $error = $@ || "txn_client_get: no transaction was created\n";
      txn_end ();
      die $error;
   }

   $txn->cb (unblock_sub {
      # free the slot before running the user callback
      txn_end ();
      $arg{cb}->(@_);
   });
}
sub fetch_uri {
my ($pri, $uri) = @_;
for(my $count = 1; ; $count += 0.3) {
for my $htl (@HTL) {
txn_begin time + $htl + $count;
my ($meta, $data) = eval { @{ $FCP->client_get ($uri, $htl) } };
txn_end;
if ($@) {
if (UNIVERSAL::isa ($@, Net::FCP::Exception::)) {
if ($@->type ("data_not_found")
|| $@->type ("route_not_found")) {
next;
}
die;
}
# Report $text for this job: always written to STDERR via warn, prefixed
# with a local timestamp, the job id and the job priority.  Unless the text
# mentions Net::FCP::Exception it is also appended to the job's persistent
# log and the job is saved.
sub log {
   my $self = shift;
   my $text = shift;

   my @now  = localtime (time);
   my $time = POSIX::strftime ("%Y-%m-%d %H:%M:%S", @now);

   warn "$time $self->{id},$self->{p}{pri}: $text\n";

   unless ($text =~ /Net::FCP::Exception/) {
      $self->{p}{log} .= "$time $text\n";
      $self->save;
   }
}
# Ask for outside input: record the current coroutine together with $prompt
# in $self->{input} and give up the CPU via Coro::schedule.  The result of
# Coro::schedule is passed back to the caller.
sub feedback {
   my $self   = shift;
   my $prompt = shift;

   my @request = ($Coro::current, $prompt);
   $self->{input} = \@request;

   Coro::schedule ();
}
# Render a human-readable summary of the job: the stored log, the id, the
# title, the number of blocks and one character per block ("+" for a block
# marked done, "-" otherwise).  A job without a block list is shown with
# zero blocks.
sub show {
   my $self = shift;

   my $blocks = $self->{p}{blk} || [];
   my $count  = @$blocks;
   my $map    = join "", map { $_->{done} ? "+" : "-" } @$blocks;

   return join "",
      "\n$self->{p}{log}\n",
      "ID: $self->{id}\n",
      "Title: $self->{p}{title}\n",
      "Blocks#: ", $count, "\n",
      "Blocks: ", $map, "\n",
      "";
}
our $id;

# Segment size limits in bytes, written as shifts:
# MAXSEG is 128 * 2**20 (128 MiB), MINSEG is six times 128 * 2**10 (768 KiB).
sub MAXSEG () { 128 << 20 }
sub MINSEG () { 6 * (128 << 10) }
# Map a size in bytes to a block size in bytes:
#   >= 64 MiB -> 1 MiB, >= 32 MiB -> 512 KiB, >= 1 MiB -> 256 KiB,
#   anything smaller -> 128 KiB.
sub blocksize($) {
   my ($size) = @_;

   my $KiB = 1024;
   my $MiB = 1024 * $KiB;

   return 1024 * $KiB if $size >= 64 * $MiB;
   return  512 * $KiB if $size >= 32 * $MiB;
   return  256 * $KiB if $size >=  1 * $MiB;

   return 128 * $KiB;
}
# Prepare the job and launch the coroutine that drives its state machine.
#
# Fills in a default priority (1) and state ("examine"), derives the data
# file name "$DATA_HOME/<name>.d" from the job file name ".../<name>.j",
# opens that data file read/write (creating it if needed) and starts an
# async block that runs the method "state_<state>" over and over, saving the
# job after every transition.
#
# Dies if the job file name does not end in ".j" or if the data file cannot
# be opened.
sub start {
my ($self) = @_;
# "||=" also replaces a stored priority of 0 or an empty state
$self->{p}{pri} ||= 1;
$self->{p}{state} ||= "examine";
# the base name of the job file (without ".j") names the data file
$self->{job} =~ /\/([^\/]*)\.j$/ or die "$self->{job}: missing .j";
$self->{file} = "$DATA_HOME/$1.d";
sysopen $self->{fh}, $self->{file}, O_RDWR|O_CREAT, 0600
#$self->{fh} = aio_open $self->{file}, O_RDWR|O_CREAT, 0600
or die "$self->{file}: $!";
$self->{status} = "starting";
$self->{coro} = async {
# persist the (possibly defaulted) state before doing any work
$self->save;
for(;;) {
# a state is either a plain name or an array ref [name, @args]
my ($state, @args) = ref $self->{p}{state} ? @{$self->{p}{state}} : $self->{p}{state};
# run the handler for this state; its return value is the next state
my $next = eval { $self->can ("state_$state")->($self, @args) };
if ($@) {
# on failure log the error and take the next state from feedback's
# return value instead
# NOTE(review): feedback parks this coroutine via Coro::schedule; what
# resumes it and what value it then returns is not visible here
$self->log ($@);
$next = $self->feedback ("continue with state: ");
}
# log the transition; the same text becomes the job's current status
$self->log ($self->{status} = "STATE CHANGE: ". join ", ", ref $next ? @$next : $next);
# store the new state and save it before the next iteration executes it
$self->{p}{state} = $next;
$self->save;
}
};
}
# Final state of a job.
#
# With a true $save the data file is synced, closed and hard-linked into
# $DONE_HOME under the job's title (replacing an existing entry of that
# name), then $DONE_HOME itself is synced.  A failing fsync, close or link
# raises an exception instead of being ignored, so the job is not cleaned up
# and its data file is not removed while the result may be incomplete.
#
# The sync/close step is skipped when the handle is already closed, which
# allows a retried finish to go straight to the link step.
#
# In every case the job is then cleaned up, its data file is unlinked from
# $DATA_HOME, the status is set to "finished", feedback is requested and the
# coroutine terminates.  This method does not return to its caller.
sub state_finish {
   my ($self, $save) = @_;

   if ($save) {
      my $target = "$DONE_HOME/$self->{p}{title}";

      if (defined fileno ($self->{fh})) {
         # aio_fsync yields 0 on success, like aio_link below
         aio_fsync ($self->{fh})
            and die "fsync: $self->{file}: $!";
         close $self->{fh}
            or die "close: $self->{file}: $!";
      }

      # remove a stale entry first, link does not overwrite
      aio_unlink ($target);
      aio_link ($self->{file}, $target)
         and die "link: $self->{file} => $target: $!";
      aio_pathsync ($DONE_HOME);
   }

   $self->clean;
   aio_unlink ($self->{file});

   $self->{status} = "finished";
   $self->feedback ("finished");

   terminate;
}
sub state_examine {
my ($self) = @_;
my $p = $self->{p};
$self->{status} = "initial fetch";
for (;;) {
$self->log ("fetching $p->{key} (=$p->{title})");
($p->{meta}, $p->{data}) = fetch_uri 100, "freenet:$p->{key}";
$self->save;
#use PApp::Util; print STDERR PApp::Util::dumpval [keys %{$meta->{document}[0]{split_file}}];
$self->log ("type $p->{meta}{document}[0]{info}{format}");
if (my $splitfile = $p->{meta}{document}[0]{split_file}) {
return "splitfile";
} elsif ((defined $p->{data}) and (length $p->{data})) {
syswrite $self->{fh}, delete $p->{data};
aio_fsync $self->{fh};
return ["finish", 1];
}
$self->log ("EMPTY, retrying in an hour");
( run in 1.160 second using v1.01-cache-2.11-cpan-df04353d9ac )