App-Netdisco

 view release on metacpan or  search on metacpan

lib/App/Netdisco/JobQueue/PostgreSQL.pm  view on Meta::CPAN

use Try::Tiny;

# Export policy: nothing is exported by default (@EXPORT is empty). Each
# jq_* function listed in @EXPORT_OK may be imported by name, or all of
# them at once through the ':all' tag.
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/
  jq_warm_thrusters
  jq_getsome
  jq_locked
  jq_queued
  jq_lock
  jq_defer
  jq_complete
  jq_log
  jq_userlog
  jq_insert
  jq_delete
/;
# ':all' refers to the same array as @EXPORT_OK, so the two cannot drift apart.
our %EXPORT_TAGS = ( all => \@EXPORT_OK );

# Tidy up this backend's DeviceSkip rows. Everything runs inside a single
# transaction and only touches rows whose backend column equals the
# 'BACKEND' entry of the 'workers' setting:
#
#   1. every row has its actionset emptied;
#   2. rows (other than the 255.255.255.255 row) whose deferrals exceed
#      max_deferrals - 1 are lowered to that value;
#   3. rows with an empty actionset and zero deferrals are deleted;
#   4. the 255.255.255.255 row is deleted if its actionset is empty.
#
# Takes no arguments. The return value is whatever txn_do() hands back.
sub jq_warm_thrusters {
  my $schema   = schema(vars->{'tenant'});
  my $skiplist = $schema->resultset('DeviceSkip');

  $schema->txn_do(sub {
    my $backend     = setting('workers')->{'BACKEND'};
    my $hint_device = '255.255.255.255';
    my $row_lock    = { for => 'update' };

    # step 1: forget every previously recorded denied action
    $skiplist->search({ backend => $backend }, $row_lock)
             ->update({ actionset => [] });

    # step 2: on backend restart, devices which had reached the retry
    # limit (max_deferrals) are wound back by one so they get one more try
    my $ceiling = setting('workers')->{'max_deferrals'} - 1;
    my %over_limit = (
      backend   => $backend,
      device    => { '!=' => $hint_device },
      deferrals => { '>' => $ceiling },
    );
    $skiplist->search(\%over_limit, $row_lock)
             ->update({ deferrals => $ceiling });

    # step 3: drop rows that now carry no information at all
    my %nothing_left = (
      backend   => $backend,
      actionset => { -value => [] }, # special syntax for matching empty ARRAY
      deferrals => 0,
    );
    $skiplist->search(\%nothing_left)->delete;

    # step 4: also clean out any previous backend hint;
    # primeskiplist action will then run to recreate it
    my %stale_hint = (
      backend   => $backend,
      device    => $hint_device,
      actionset => { -value => [] }, # special syntax for matching empty ARRAY
    );
    $skiplist->search(\%stale_hint)->delete;
  });
}

sub jq_getsome {
  my $num_slots = shift;
  return () unless $num_slots and $num_slots > 0;

  my $jobs = schema(vars->{'tenant'})->resultset('Admin');
  my @returned = ();

  my $tasty = schema(vars->{'tenant'})->resultset('Virtual::TastyJobs')
    ->search(undef,{ bind => [
      setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'},
      setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'},
      setting('workers')->{'retry_after'}, $num_slots,
    ]});

  while (my $job = $tasty->next) {
    if ($job->device
      and not scalar grep {$job->action eq $_} @{ setting('job_targets_prefix') }) {

      # need to handle device discovered since backend daemon started
      # and the skiplist was primed. these should be checked against
      # the various acls and have device_skip entry added if needed,
      # and return false if it should have been skipped.
      my @badactions = get_denied_actions($job->device);
      if (scalar @badactions) {
        schema(vars->{'tenant'})->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
            schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create({
              backend => setting('workers')->{'BACKEND'}, device => $job->device,
            },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
        });

        # will now not be selected in a future _getsome()
        next if scalar grep {$_ eq $job->action} @badactions;
      }
    }

    # remove any duplicate jobs, including possibly this job if there
    # is already an equivalent job running

    # note that the self-removal of a job has an unhelpful log: it is
    # reported as a duplicate of itself! however what's happening is that
    # netdisco has seen another running job with same params (but the query
    # cannot see that ID to use it in the message).

    my %job_properties = (
      action => $job->action,
      port   => $job->port,
      subaction => $job->subaction,
      -or => [
        { device => $job->device },
        ($job->device_key ? ({ device_key => $job->device_key }) : ()),
      ],
      # never de-duplicate user-submitted jobs
      username => { '=' => undef },
      userip   => { '=' => undef },
    );

    my $gone = $jobs->search({
      status => 'queued',
      -and => [
        %job_properties,
        -or => [{
          job => { '<' => $job->id },
        },{
          job => $job->id,
          -exists => $jobs->search({
            job => { '>' => $job->id },
            status => 'queued',
            backend => { '!=' => undef },
            started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],



( run in 0.788 second using v1.01-cache-2.11-cpan-2398b32b56e )