Net-FCP

 view release on metacpan or  search on metacpan

bin/fmd  view on Meta::CPAN

   my $self = bless { @_ }, $class;

   for my $job (values %job) {
      if ($job->{p}{key} eq $self->{p}{key}) {
         warn "job $job->{id} already works on this, not adding";
         aio_unlink $self->{job};
         return;
      }
   }

   $self->{p}{title} !~ /\//
      or die "$self->{p}{title}: malformed key\n";

   $self->{p}{title} =~ s/\s+$//;
   $self->{p}{title} =~ s/\n/_/g;

   $self->{id} = ++$count;
   $self->{job} ||= "$QUEUE_HOME/" . Time::HiRes::time . ":$count.j";

   $job{$self->{id}} = $self;

   $self->start;
   $self
}

# Create a job for a key that has not been examined yet.
#
#   $class - class to construct (its ->new is called)
#   $key   - key to fetch, stored as p.key
#   $title - title of the job, stored as p.title
#
# The persistent part of the job starts out in state "examine". Returns
# whatever $class->new returns.
sub new_from_key {
   my ($class, $key, $title) = @_;

   my %persistent = (
      key   => $key,
      title => $title,
      state => "examine",
   );

   return $class->new (p => \%persistent);
}

# Recreate a job from a job file on disk.
#
#   $class - class to construct (its ->new is called)
#   $path  - path of the job file; becomes the job's {job} entry
#
# save() writes the output of nfreeze, i.e. an in-memory Storable image,
# while Storable::retrieve only accepts file images (the ones written by
# store/nstore, which carry a file magic). So the file contents are first
# fed to thaw; only when that does not yield anything is the file handed to
# retrieve, which keeps files written by store/nstore loadable.
#
# Dies (via Storable::retrieve) when the file can be read by neither method.
# Returns whatever $class->new returns.
sub new_from_file {
   my ($class, $path) = @_;

   my $p = eval {
      open my $fh, "<", $path
         or die "$path: $!\n";
      binmode $fh;
      my $image = do { local $/; <$fh> };
      Storable::thaw ($image)
   };

   # not a frozen image (or unreadable): let retrieve try, and report
   $p ||= Storable::retrieve ($path);

   $class->new (job => $path, p => $p);
}

# Persist the job's state ($self->{p}) to its job file.
#
# The frozen state is written to "<job>~", synced and closed, and only then
# renamed over the real job file, followed by a sync of $QUEUE_HOME. The
# rename happens only when every byte was written and both fsync and close
# reported success; otherwise the temporary file is removed and the previous
# job file is left as it was, so a failed write can no longer replace a good
# job file with a truncated one.
#
# Problems are reported with warn. Returns 1 when the new state was renamed
# into place, nothing otherwise.
sub save {
   my ($self) = @_;

   my $tmp = "$self->{job}~";

   my $fh = aio_open ($tmp, O_CREAT|O_TRUNC|O_WRONLY, 0600);
   unless ($fh) {
      warn "$tmp: unable to open for writing: $!\n";
      return;
   }

   my $data = Coro::Storable::blocking_nfreeze ($self->{p});

   my $error;
   my $written = aio_write ($fh, 0, undef, $data, 0);
   if (!defined $written or $written != length $data) {
      $error = "write failed or incomplete: $!";
   } elsif (aio_fsync ($fh)) {   # the aio status is 0 on success
      $error = "fsync failed: $!";
   }

   # always close, but keep the first error that happened
   if (aio_close ($fh) and !$error) {
      $error = "close failed: $!";
   }

   if ($error) {
      warn "$tmp: $error, keeping previous job file\n";
      aio_unlink ($tmp);
      return;
   }

   if (aio_rename ($tmp, $self->{job})) {
      warn "$tmp: unable to rename to $self->{job}: $!\n";
      return;
   }
   aio_pathsync ($QUEUE_HOME);

   1
}

# Take the job out of the active set and move its job file away.
#
# The job is removed from %job, saved one last time, and its job file is
# moved to "$DONE_HOME/<title>.job" with the external mv command. The final
# unlink removes the job file in case it is still there after the mv.
sub clean {
   my $self = shift;

   delete $job{$self->{id}};
   $self->save;

   my $archived = "$DONE_HOME/$self->{p}{title}.job";
   system "mv", $self->{job}, $archived;

   unlink $self->{job};
}

# Abort the job: clean it up first (see clean), then cancel the coroutine
# stored in $self->{coro}. Returns the result of the cancel call.
sub kill {
   my $self = shift;

   $self->clean;

   my $coro = $self->{coro};
   return $coro->cancel;
}

# Transaction queue.
#
#   @queue        - heap of [$pri, $signal] pairs, filled by txn_begin
#   $queue_change - signalled whenever the queue or $queue_alloc changed
#   $queue_alloc  - number of slots currently handed out; incremented below,
#                   decremented by txn_end
our @queue;
our $queue_change = Coro::Signal->new;
our $queue_alloc = 0;

# Scheduler coroutine: hands out slots to waiting transactions.
#
# How many slots may be in use depends on the priority value at the top of
# the heap: above 10 all $MAX_TXN slots may be used, above 1 three slots are
# held back, and for everything else five slots are held back.
# NOTE(review): which end of the priority range pop_heap delivers first is
# decided by the heap helpers, which are not part of this excerpt.
async {
   while (1) {
      while (@queue) {
         my $top = $queue[0][0];
         my $limit = $top > 10 ? $MAX_TXN
                   : $top >  1 ? $MAX_TXN - 3
                   :             $MAX_TXN - 5;

         last unless $queue_alloc < $limit;

         # wake up the waiter and account for its slot
         my $entry = pop_heap @queue;
         $entry->[1]->send;
         $queue_alloc++;
         #Coro::Timer::sleep 0.05;
      }

      $queue_change->wait;
   }
};

# Wait for a transaction slot.
#
#   $pri - priority value the request is queued under
#
# Queues a fresh signal together with $pri, pokes the scheduler and blocks
# the current coroutine until the scheduler sends that signal. Every
# successful txn_begin has to be paired with a txn_end.
sub txn_begin {
   my ($pri) = @_;

   my $granted = Coro::Signal->new;

   push_heap @queue, [$pri, $granted];
   $queue_change->send;

   $granted->wait;
}

# Give back a transaction slot obtained with txn_begin and notify the
# scheduler so it can hand the slot to the next waiter.
sub txn_end {
   $queue_alloc -= 1;

   return $queue_change->send;
}

# Start an asynchronous client_get transaction once a slot is available.
#
# Named arguments:
#   pri - priority value passed to txn_begin
#   uri - passed to $FCP->txn_client_get
#   htl - passed to $FCP->txn_client_get
#   cb  - code reference, called with the arguments the transaction
#         callback receives, after the slot has been released
#
# Blocks in txn_begin until a slot is granted. If creating the transaction
# throws, the slot is released before the exception is re-thrown, so a
# failed start does not permanently use up a slot.
#
# Returns the result of registering the callback on the transaction.
sub txn_client_get {
   my %arg = @_;

   txn_begin ($arg{pri});

   my $txn;
   my $started = eval {
      $txn = $FCP->txn_client_get ($arg{uri}, $arg{htl});
      1
   };

   unless ($started) {
      my $error = $@; # txn_end might clobber $@
      txn_end ();
      die $error;
   }

   $txn->cb (unblock_sub {
      txn_end ();

      $arg{cb}->(@_);
   });
}

sub fetch_uri {
   my ($pri, $uri) = @_;

   for(my $count = 1; ; $count += 0.3) {
      for my $htl (@HTL) {
         txn_begin time + $htl + $count;
         my ($meta, $data) = eval { @{ $FCP->client_get ($uri, $htl) } };
         txn_end;
         if ($@) {
            if (UNIVERSAL::isa ($@, Net::FCP::Exception::)) {
               if ($@->type ("data_not_found")
                   || $@->type ("route_not_found")) {
                  next;
               }

bin/fmd  view on Meta::CPAN


   die;
}

# Log a message for this job.
#
#   $text - the message (may be an exception, it is used as a string)
#
# The message always goes to warn, prefixed with a local timestamp, the job
# id and the job priority. Unless the text mentions Net::FCP::Exception it
# is also appended to the job's persistent log, and the job is saved.
sub log {
   my ($self, $text) = @_;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);
   warn "$stamp $self->{id},$self->{p}{pri}: $text\n";

   unless ($text =~ /Net::FCP::Exception/) {
      my $p = $self->{p};
      $p->{log} .= "$stamp $text\n";
      $self->save;
   }
}

# Ask for input and suspend the calling coroutine.
#
#   $prompt - text describing what is being asked for
#
# Stores the current coroutine together with the prompt in $self->{input}
# and then gives up the CPU with Coro::schedule. The return value is
# whatever Coro::schedule returns; start uses it as the next state.
# NOTE(review): the code that answers the prompt and wakes the coroutine up
# again is not part of this excerpt.
sub feedback {
   my ($self, $prompt) = @_;

   my @request = ($Coro::current, $prompt);
   $self->{input} = \@request;

   Coro::schedule;
}

# Return a multi-line, human-readable description of the job: its log, id
# and title, the number of blocks, and one character per block ("+" for a
# block marked done, "-" otherwise). A job without a block list is shown
# with zero blocks.
sub show {
   my ($self) = @_;

   my $blocks = $self->{p}{blk} || [];
   my $count  = @$blocks;
   my $marks  = join "", map { $_->{done} ? "+" : "-" } @$blocks;

   return join "",
      "\n$self->{p}{log}\n",
      "ID: $self->{id}\n",
      "Title: $self->{p}{title}\n",
      "Blocks#: $count\n",
      "Blocks: $marks\n";
}

our $id;

# Size limits in bytes, as inlinable constants.
# NOTE(review): the names suggest the largest and smallest segment size;
# the code using them is not part of this excerpt.
sub MAXSEG () { 134_217_728 } # 128 * 1024 * 1024
sub MINSEG () {     786_432 } #   6 *  128 * 1024

# Map a size in bytes to a block size in bytes:
#
#   size >= 64 MiB => 1 MiB
#   size >= 32 MiB => 512 KiB
#   size >=  1 MiB => 256 KiB
#   otherwise      => 128 KiB
sub blocksize($) {
   my ($size) = @_;

   return 1024*1024 if $size >= 64*1024*1024;
   return  512*1024 if $size >= 32*1024*1024;
   return  256*1024 if $size >=    1024*1024;

   return 128*1024;
}

# Prepare the job and start the coroutine that drives its state machine.
#
# Fills in the default priority (1) and state ("examine"), derives the data
# file name from the job file name (NAME.j => $DATA_HOME/NAME.d) and opens
# the data file read/write, creating it if necessary. Dies if the job file
# name does not end in ".j" or the data file cannot be opened.
#
# The coroutine saves the job and then loops forever: it calls the method
# "state_<name>" for the current state (a state is either a plain name or
# an array of name plus arguments) and treats the return value as the next
# state. When the state method throws, the error is logged and the next
# state is taken from feedback instead. Every transition is logged, stored
# in the job and saved.
#
# Returns the coroutine, which is also kept in $self->{coro}.
sub start {
   my ($self) = @_;

   $self->{p}{pri}   ||= 1;
   $self->{p}{state} ||= "examine";

   $self->{job} =~ /\/([^\/]*)\.j$/
      or die "$self->{job}: missing .j";
   $self->{file} = "$DATA_HOME/$1.d";

   sysopen $self->{fh}, $self->{file}, O_RDWR|O_CREAT, 0600
      or die "$self->{file}: $!";

   $self->{status} = "starting";
   $self->{coro} = async {
      $self->save;

      while (1) {
         my $current = $self->{p}{state};
         my ($state, @args) = ref $current ? @$current : $current;

         # the method lookup is inside the eval, so an unknown state
         # ends up in the error path as well
         my $next = eval { $self->can ("state_$state")->($self, @args) };
         if ($@) {
            $self->log ($@);
            $next = $self->feedback ("continue with state: ");
         }

         my $summary = join ", ", ref $next ? @$next : $next;
         $self->{status} = "STATE CHANGE: $summary";
         $self->log ($self->{status});

         $self->{p}{state} = $next;
         $self->save;
      }
   };
}

# Final state of a job.
#
#   $save - when true, the data file is published in $DONE_HOME first
#
# Publishing means: sync and close the data file handle, remove any existing
# "$DONE_HOME/<title>", hard-link the data file to that name (dying if the
# link fails) and sync $DONE_HOME. In either case the job is then cleaned
# up (see clean), the data file is unlinked, and the job reports "finished"
# via feedback. Once feedback returns, the coroutine terminates itself, so
# this state has no successor.
sub state_finish {
   my ($self, $save) = @_;

   if ($save) {
      aio_fsync $self->{fh};
      close $self->{fh};

      # replace whatever was published under this title before
      my $published = "$DONE_HOME/$self->{p}{title}";
      aio_unlink $published;
      aio_link $self->{file}, $published
         and die "link: $self->{file} => $DONE_HOME/$self->{p}{title}: $!";
      aio_pathsync $DONE_HOME;
   }

   $self->clean;
   aio_unlink $self->{file};

   $self->{status} = "finished";
   $self->feedback ("finished");

   terminate;
}

sub state_examine {
   my ($self) = @_;
   my $p = $self->{p};

   $self->{status} = "initial fetch";

   for (;;) {
      $self->log ("fetching $p->{key} (=$p->{title})");

      ($p->{meta}, $p->{data}) = fetch_uri 100, "freenet:$p->{key}";
      $self->save;
      #use PApp::Util; print STDERR PApp::Util::dumpval [keys %{$meta->{document}[0]{split_file}}];
      $self->log ("type $p->{meta}{document}[0]{info}{format}");

      if (my $splitfile = $p->{meta}{document}[0]{split_file}) {
         return "splitfile";
      } elsif ((defined $p->{data}) and (length $p->{data})) {
         syswrite $self->{fh}, delete $p->{data};
         aio_fsync $self->{fh};
         return ["finish", 1];
      }

      $self->log ("EMPTY, retrying in an hour");



( run in 1.160 second using v1.01-cache-2.11-cpan-df04353d9ac )