Gtk2-CV

 view release on metacpan or  search on metacpan

lib/Gtk2/CV/Jobber.pm  view on Meta::CPAN

=head1 NAME

Gtk2::CV::Jobber - a job queue mechanism for Gtk2::CV

=head1 SYNOPSIS

  use Gtk2::CV::Jobber;

=head1 DESCRIPTION

=over 4

=cut

package Gtk2::CV::Jobber;

use common::sense;
use POSIX ();

use Scalar::Util ();
use IO::AIO;
use Fcntl ();
use Guard ();
use AnyEvent::Fork;

use Gtk2::CV::Progress;

=item %Gtk2::CV::Jobber::job [READ-ONLY]

Global variable containing all jobs, indexed by full path.

=cut

our %jobs;
our @jobs; # global job order
our %busy; # exists if this path is executing a job
our %hide; # which paths to hide

our %type;
our @type; # type order

our $TEMPLATE; # the AnyEvent::Fork object

my %type_hide;
my $disabled;

my $MAXFORK = int 1 + do {
   local $/;
   open my $fh, "<", "/proc/cpuinfo"
      or return 1;
   scalar (() = <$fh> =~ /^processor/mg) || 1
};

my %class_limit = (
   other => 32,
   stat  => 16,
   read  =>  2,
   fork  => $MAXFORK,
);

my @idle_slave;

my $progress;

sub scheduler {
job:
   for my $idx (1 .. (@jobs < 10 ? @jobs : 10)) {
      return if $disabled;

      my $path = $jobs[-$idx];

      next if exists $busy{$path};

      my $types = $jobs{$path};
      my @types = keys %$types;

      if (@types) {
         for my $type (@type) {
            next unless exists $types->{$type};

            my $class = $type{$type}{class};

            if ($class_limit{$class}) {
               $class_limit{$class}--;

               my $job = bless delete $types->{$type}, Gtk2::CV::Jobber::Job::;

               $job->{path} = $path;
               $job->{type} = $type;

               undef $busy{$path};

               $job->run;
            }

            next job;
         }

         die "FATAL: unknown job type <@types> encountered, aborting.\n";
      } else {
         delete $jobs{splice @jobs, -$idx, 1, ()}
            and $progress->increment;

         goto job;
      }
   }

   return if @jobs;

   undef $progress;

#   (pop @idle_slave)->destroy while @idle_slave;
}

sub set_template {
   $TEMPLATE = shift;
}

=item Gtk2::CV::Jobber::define $type, [option => $value, ...], $cb

Register a new job type identified by $type. The callback will be called
with ($cont, $path, $type), and has to call &$cont once it has finished
processing.

 pri     => number
 read    => whether reading the file contents ahead of time is useful
 stat    => whether stating the object ahead of time is useful
 fork    => (lots of restrictions)
 class   =>
 maxread =>
 hide    => true if the file(spec) will go away via this job
 cb      => callback

=cut

sub define($@) {
   my ($type, @opt) = @_;

   my $cb = pop @opt;
   my %opt = @opt;

   $opt{cb}    = $cb;
   $opt{type}  = $type;

   $opt{maxread} ||= 1024*1024*2;

   $opt{class}   ||= "fork" if $opt{fork};
   $opt{class}   ||= "read" if $opt{read};
   $opt{class}   ||= "stat" if $opt{stat};
   $opt{class}   ||= "other";

lib/Gtk2/CV/Jobber.pm  view on Meta::CPAN

         aio_read $job->{fh}, 0, $type->{maxread}, $job->{contents}, 0, sub {
            $job->run;
         };
      };
   } elsif ($type->{stat} && !$job->{stat}) {
      aioreq_pri -3;
      aio_stat $job->{path}, sub {
         $_[0] and return $job->finish; # don't run job if stat error
         $job->{stat} = [stat _];
         $job->run;
      };
   } elsif ($type->{fork}) {
      my $slave = (pop @idle_slave) || new Gtk2::CV::Jobber::Slave;

      $slave->send ($job);
   } else {
      $type->{cb}->($job);
   }
}

our (@EVENT_BATCH, $EVENT_TIMER);

sub client_update {
   push @EVENT_BATCH, @_;

   $EVENT_TIMER ||= AE::timer 0.1, 0, sub {
      for my $client (grep $_, values %Gtk2::CV::Jobber::client) {
         my $update = $client->can ("jobber_update");
         $update->($client, $_) for @EVENT_BATCH;
      }

      @EVENT_BATCH = ();
      undef $EVENT_TIMER;
   };
}

sub Gtk2::CV::Jobber::Job::event {
   my ($job, $type, $path, $data, %arg) = @_;

   $arg{type} = $type;
   $arg{path} = $path;
   $arg{data} = $data;

   if (my $slave = $job->{slave}) {
      $slave->event (\%arg);
   } else {
      client_update \%arg;
   }
}

sub Gtk2::CV::Jobber::Job::finish {
   my ($job) = @_;

   if (my $slave = delete $job->{slave}) {
      $slave->finish ($job);
   } else {
      unless (delete $job->{event}) {
         my $type = $type{$job->{type}};
         ++$class_limit{$type->{class}};
         delete $hide{$job->{path}} if $type->{hide};
         delete $busy{$job->{path}};

         scheduler;
      }

      client_update $_[0];
   }
}

package Gtk2::CV::Jobber::Client;

=back

=head2 The Gtk2::CV::Jobber::Client class

=over 4

=item $self->jobber_register

To be called when creating a new object instance that wants to listen to
jobber updates.

=cut

sub jobber_register {
   my ($self) = @_;

   Scalar::Util::weaken ($Gtk2::CV::Jobber::client{$self} = $self);

   # nuke all invalid references
   delete $Gtk2::CV::Jobber::client{$_}
      for grep !$Gtk2::CV::Jobber::client{$_}, keys %Gtk2::CV::Jobber::client;
}

=item $self->jobber_update ($job)

The given job has finished.

=cut

sub jobber_update {
   my ($self, $job) = @_;
}

package Gtk2::CV::Jobber::Slave;

use Socket ();

sub new {
   my $class = shift;
   my $self = bless { @_ }, $class;

   $TEMPLATE ||= AnyEvent::Fork->new;

   my $rlen;
   my $rbuf;

   $TEMPLATE->fork->run ("Gtk2::CV::Jobber::Slave::run", sub {
      my ($fh) = @_;
      $self->{fh} = $fh;
      delete $self->{ww};



( run in 1.560 second using v1.01-cache-2.11-cpan-39bf76dae61 )