App-Netdisco
view release on metacpan or search on metacpan
lib/App/Netdisco/JobQueue/PostgreSQL.pm view on Meta::CPAN
use Try::Tiny;
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/
jq_warm_thrusters
jq_getsome
jq_locked
jq_queued
jq_lock
jq_defer
jq_complete
jq_log
jq_userlog
jq_insert
jq_delete
/;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );
sub jq_warm_thrusters {
my $rs = schema(vars->{'tenant'})->resultset('DeviceSkip');
schema(vars->{'tenant'})->txn_do(sub {
$rs->search({
backend => setting('workers')->{'BACKEND'},
}, { for => 'update' }, )->update({ actionset => [] });
#Â on backend restart, allow one retry of all devices which have
#Â reached max retry (max_deferrals)
my $deferrals = setting('workers')->{'max_deferrals'} - 1;
$rs->search({
backend => setting('workers')->{'BACKEND'},
device => { '!=' => '255.255.255.255' },
deferrals => { '>' => $deferrals },
}, { for => 'update' }, )->update({ deferrals => $deferrals });
$rs->search({
backend => setting('workers')->{'BACKEND'},
actionset => { -value => [] }, #Â special syntax for matching empty ARRAY
deferrals => 0,
})->delete;
#Â also clean out any previous backend hint
#Â primeskiplist action will then run to recreate it
$rs->search({
backend => setting('workers')->{'BACKEND'},
device => '255.255.255.255',
actionset => { -value => [] }, #Â special syntax for matching empty ARRAY
})->delete;
});
}
sub jq_getsome {
my $num_slots = shift;
return () unless $num_slots and $num_slots > 0;
my $jobs = schema(vars->{'tenant'})->resultset('Admin');
my @returned = ();
my $tasty = schema(vars->{'tenant'})->resultset('Virtual::TastyJobs')
->search(undef,{ bind => [
setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'},
setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'},
setting('workers')->{'retry_after'}, $num_slots,
]});
while (my $job = $tasty->next) {
if ($job->device
and not scalar grep {$job->action eq $_} @{ setting('job_targets_prefix') }) {
# need to handle device discovered since backend daemon started
# and the skiplist was primed. these should be checked against
# the various acls and have device_skip entry added if needed,
# and return false if it should have been skipped.
my @badactions = get_denied_actions($job->device);
if (scalar @badactions) {
schema(vars->{'tenant'})->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create({
backend => setting('workers')->{'BACKEND'}, device => $job->device,
},{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
});
# will now not be selected in a future _getsome()
next if scalar grep {$_ eq $job->action} @badactions;
}
}
# remove any duplicate jobs, incuding possibly this job if there
# is already an equivalent job running
# note that the self-removal of a job has an unhelpful log: it is
# reported as a duplicate of itself! however what's happening is that
# netdisco has seen another running job with same params (but the query
# cannot see that ID to use it in the message).
my %job_properties = (
action => $job->action,
port => $job->port,
subaction => $job->subaction,
-or => [
{ device => $job->device },
($job->device_key ? ({ device_key => $job->device_key }) : ()),
],
#Â never de-duplicate user-submitted jobs
username => { '=' => undef },
userip => { '=' => undef },
);
my $gone = $jobs->search({
status => 'queued',
-and => [
%job_properties,
-or => [{
job => { '<' => $job->id },
},{
job => $job->id,
-exists => $jobs->search({
job => { '>' => $job->id },
status => 'queued',
backend => { '!=' => undef },
started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
( run in 0.788 second using v1.01-cache-2.11-cpan-2398b32b56e )