Gtk2-CV
view release on metacpan or search on metacpan
lib/Gtk2/CV/Jobber.pm view on Meta::CPAN
=head1 NAME
Gtk2::CV::Jobber - a job queue mechanism for Gtk2::CV
=head1 SYNOPSIS
use Gtk2::CV::Jobber;
=head1 DESCRIPTION
=over 4
=cut
package Gtk2::CV::Jobber;
use common::sense;
use POSIX ();
use Scalar::Util ();
use IO::AIO;
use Fcntl ();
use Guard ();
use AnyEvent::Fork;
use Gtk2::CV::Progress;
=item %Gtk2::CV::Jobber::job [READ-ONLY]
Global variable containing all jobs, indexed by full path.
=cut
our %jobs;
our @jobs; # global job order
our %busy; # exists if this path is executing a job
our %hide; # which paths to hide
our %type;
our @type; # type order
our $TEMPLATE; # the AnyEvent::Fork object
my %type_hide;
my $disabled;
my $MAXFORK = int 1 + do {
local $/;
open my $fh, "<", "/proc/cpuinfo"
or return 1;
scalar (() = <$fh> =~ /^processor/mg) || 1
};
my %class_limit = (
other => 32,
stat => 16,
read => 2,
fork => $MAXFORK,
);
my @idle_slave;
my $progress;
sub scheduler {
job:
for my $idx (1 .. (@jobs < 10 ? @jobs : 10)) {
return if $disabled;
my $path = $jobs[-$idx];
next if exists $busy{$path};
my $types = $jobs{$path};
my @types = keys %$types;
if (@types) {
for my $type (@type) {
next unless exists $types->{$type};
my $class = $type{$type}{class};
if ($class_limit{$class}) {
$class_limit{$class}--;
my $job = bless delete $types->{$type}, Gtk2::CV::Jobber::Job::;
$job->{path} = $path;
$job->{type} = $type;
undef $busy{$path};
$job->run;
}
next job;
}
die "FATAL: unknown job type <@types> encountered, aborting.\n";
} else {
delete $jobs{splice @jobs, -$idx, 1, ()}
and $progress->increment;
goto job;
}
}
return if @jobs;
undef $progress;
# (pop @idle_slave)->destroy while @idle_slave;
}
sub set_template {
$TEMPLATE = shift;
}
=item Gtk2::CV::Jobber::define $type, [option => $value, ...], $cb
Register a new job type identified by $type. The callback will be called
with ($cont, $path, $type), and has to call &$cont once it has finished
processing.
pri => number
read => whether reading the file contents ahead of time is useful
stat => whether stating the object ahead of time is useful
fork => (lots of restrictions)
class =>
maxread =>
hide => true if the file(spec) will go away via this job
cb => callback
=cut
sub define($@) {
my ($type, @opt) = @_;
my $cb = pop @opt;
my %opt = @opt;
$opt{cb} = $cb;
$opt{type} = $type;
$opt{maxread} ||= 1024*1024*2;
$opt{class} ||= "fork" if $opt{fork};
$opt{class} ||= "read" if $opt{read};
$opt{class} ||= "stat" if $opt{stat};
$opt{class} ||= "other";
lib/Gtk2/CV/Jobber.pm view on Meta::CPAN
aio_read $job->{fh}, 0, $type->{maxread}, $job->{contents}, 0, sub {
$job->run;
};
};
} elsif ($type->{stat} && !$job->{stat}) {
aioreq_pri -3;
aio_stat $job->{path}, sub {
$_[0] and return $job->finish; # don't run job if stat error
$job->{stat} = [stat _];
$job->run;
};
} elsif ($type->{fork}) {
my $slave = (pop @idle_slave) || new Gtk2::CV::Jobber::Slave;
$slave->send ($job);
} else {
$type->{cb}->($job);
}
}
our (@EVENT_BATCH, $EVENT_TIMER);
sub client_update {
push @EVENT_BATCH, @_;
$EVENT_TIMER ||= AE::timer 0.1, 0, sub {
for my $client (grep $_, values %Gtk2::CV::Jobber::client) {
my $update = $client->can ("jobber_update");
$update->($client, $_) for @EVENT_BATCH;
}
@EVENT_BATCH = ();
undef $EVENT_TIMER;
};
}
sub Gtk2::CV::Jobber::Job::event {
my ($job, $type, $path, $data, %arg) = @_;
$arg{type} = $type;
$arg{path} = $path;
$arg{data} = $data;
if (my $slave = $job->{slave}) {
$slave->event (\%arg);
} else {
client_update \%arg;
}
}
sub Gtk2::CV::Jobber::Job::finish {
my ($job) = @_;
if (my $slave = delete $job->{slave}) {
$slave->finish ($job);
} else {
unless (delete $job->{event}) {
my $type = $type{$job->{type}};
++$class_limit{$type->{class}};
delete $hide{$job->{path}} if $type->{hide};
delete $busy{$job->{path}};
scheduler;
}
client_update $_[0];
}
}
package Gtk2::CV::Jobber::Client;
=back
=head2 The Gtk2::CV::Jobber::Client class
=over 4
=item $self->jobber_register
To be called when creating a new object instance that wants to listen to
jobber updates.
=cut
sub jobber_register {
my ($self) = @_;
Scalar::Util::weaken ($Gtk2::CV::Jobber::client{$self} = $self);
# nuke all invalid references
delete $Gtk2::CV::Jobber::client{$_}
for grep !$Gtk2::CV::Jobber::client{$_}, keys %Gtk2::CV::Jobber::client;
}
=item $self->jobber_update ($job)
The given job has finished.
=cut
sub jobber_update {
my ($self, $job) = @_;
}
package Gtk2::CV::Jobber::Slave;
use Socket ();
sub new {
my $class = shift;
my $self = bless { @_ }, $class;
$TEMPLATE ||= AnyEvent::Fork->new;
my $rlen;
my $rbuf;
$TEMPLATE->fork->run ("Gtk2::CV::Jobber::Slave::run", sub {
my ($fh) = @_;
$self->{fh} = $fh;
delete $self->{ww};
( run in 1.560 second using v1.01-cache-2.11-cpan-39bf76dae61 )