App-Netdisco
view release on metacpan or search on metacpan
lib/App/Netdisco/JobQueue/PostgreSQL.pm view on Meta::CPAN
);
my $gone = $jobs->search({
status => 'queued',
-and => [
%job_properties,
-or => [{
job => { '<' => $job->id },
},{
job => $job->id,
-exists => $jobs->search({
job => { '>' => $job->id },
status => 'queued',
backend => { '!=' => undef },
started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
%job_properties,
})->as_query,
}],
],
}, { for => 'update' })
->update({ status => 'info', log => (sprintf 'duplicate of %s', $job->id) });
debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id;
push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
}
return @returned;
}
sub jq_locked {
  # Return, as App::Netdisco::Backend::Job objects, the Admin rows that are
  # still marked 'queued', carry this worker's BACKEND value, and whose
  # 'started' timestamp is more recent than LOCALTIMESTAMP minus the
  # 'jobs_stale_after' setting.
  my $locked_rs = schema(vars->{'tenant'})->resultset('Admin')->search({
    status  => 'queued',
    backend => setting('workers')->{'BACKEND'},
    started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
  });

  my @returned;
  for my $row ($locked_rs->all) {
    # copy the row's columns into a plain job object
    my %columns = $row->get_columns;
    push @returned, App::Netdisco::Backend::Job->new({ %columns });
  }

  return @returned;
}
sub jq_queued {
  # Takes an action name and returns the 'device' column values of every
  # Admin row that is queued for that action and has a non-NULL device.
  my ($job_type) = @_;

  my $queued_rs = schema(vars->{'tenant'})->resultset('Admin')->search({
    status => 'queued',
    action => $job_type,
    device => { '!=' => undef },
  });

  return $queued_rs->get_column('device')->all;
}
sub jq_lock {
  # Mark a queued job as picked up by this backend.
  #
  # Returns true straight away when the job has no id. Otherwise returns
  # true only if the update touched at least one row; a database error is
  # reported through error() and results in false.
  my $job = shift;
  return true unless $job->id;

  my $locked = false;

  try {
    # select the still-queued row for this job with a row lock ...
    my $queued_row = schema(vars->{'tenant'})->resultset('Admin')->search(
      { job => $job->id, status => 'queued' },
      { for => 'update' },
    );

    # ... and stamp it with this backend and the current time
    my $rows_changed = $queued_row->update({
      status  => 'queued',
      backend => setting('workers')->{'BACKEND'},
      started => \"LOCALTIMESTAMP",
    });

    $locked = true if $rows_changed > 0;
  }
  catch {
    error $_;
  };

  return $locked;
}
sub jq_defer {
  # Put a job back on the queue and, for device jobs, count a deferral
  # against the device for this backend. Returns true when the transaction
  # completed, false (after logging via error()) when it threw.
  my $job = shift;
  my $deferred = false;

  # Note that the deferral counter is per device, not per action, so it
  # taints every action on the device. For example if both macsuck and
  # arpnip are allowed but macsuck fails 10 times, then arpnip (and every
  # other action) will be prevented on the device. As defer is only
  # triggered by an SNMP connect failure, this seems reasonable (perhaps
  # even desirable).
  #
  # The deferrable_actions setting is the workaround should an action need
  # it: a job whose action is listed there is still re-queued, but does not
  # increment the deferrals count.

  try {
    schema(vars->{'tenant'})->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
      if ($job->device) {
        my @exempt = grep { $job->action eq $_ }
                          @{ setting('deferrable_actions') || [] };

        if (not @exempt) {
          my $skip_row = schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create(
            { backend => setting('workers')->{'BACKEND'}, device => $job->device },
            { key => 'device_skip_pkey' },
          );
          $skip_row->increment_deferrals;
        }
      }

      debug sprintf 'defer: job %s', ($job->id || 'unknown');

      # lock the job's row and reset it so the job is available again
      schema(vars->{'tenant'})->resultset('Admin')
        ->search({ job => $job->id }, { for => 'update' })
        ->update({
          device  => $job->device, # if job had alias this sets to canonical
          status  => 'queued',
          backend => undef,
          started => undef,
          log     => $job->log,
        });
    });

    $deferred = true;
  }
  catch {
    error $_;
  };

  return $deferred;
}
sub jq_complete {
my $job = shift;
my $happy = false;
( run in 1.090 second using v1.01-cache-2.11-cpan-39bf76dae61 )