App-Netdisco

 view release on metacpan or  search on metacpan

lib/App/Netdisco/JobQueue/PostgreSQL.pm  view on Meta::CPAN

    );

    my $gone = $jobs->search({
      status => 'queued',
      -and => [
        %job_properties,
        -or => [{
          job => { '<' => $job->id },
        },{
          job => $job->id,
          -exists => $jobs->search({
            job => { '>' => $job->id },
            status => 'queued',
            backend => { '!=' => undef },
            started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
            %job_properties,
          })->as_query,
        }],
      ],
    }, { for => 'update' })
        ->update({ status => 'info', log => (sprintf 'duplicate of %s', $job->id) });

    debug sprintf 'getsome: cancelled %s duplicate(s) of job %s', ($gone || 0), $job->id;
    push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
  }

  return @returned;
}

# Return the jobs this backend currently holds: Admin rows that are still
# 'queued', whose backend column equals our workers BACKEND setting, and
# whose 'started' timestamp is more recent than the jobs_stale_after
# interval. Each row is returned as an App::Netdisco::Backend::Job built
# from the row's columns.
sub jq_locked {
  my $booked = schema(vars->{'tenant'})->resultset('Admin')->search({
    status  => 'queued',
    backend => setting('workers')->{'BACKEND'},
    started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
  });

  my @jobs = ();
  foreach my $row ($booked->all) {
    push @jobs, App::Netdisco::Backend::Job->new({ $row->get_columns });
  }

  return @jobs;
}

# List the devices that have a queued job of the given action.
#
# Takes the action name and returns the 'device' column values of the
# matching Admin rows; rows with no device are excluded.
sub jq_queued {
  my ($job_type) = @_;

  my %wanted = (
    status => 'queued',
    action => $job_type,
    device => { '!=' => undef },
  );

  my $queued = schema(vars->{'tenant'})->resultset('Admin')->search(\%wanted);

  # the column accessor is called in the caller's context, as before
  return $queued->get_column('device')->all;
}

# Claim a queued job for this backend.
#
# Takes a job object. Returns true when the job has no ID (nothing to lock)
# or when at least one queued row was updated; returns false when no row
# matched or the database call threw (the exception is logged via error).
sub jq_lock {
  my ($job) = @_;
  return true unless $job->id;

  my $locked = false;

  try {
    # select the row FOR UPDATE, only if it is still waiting in the queue
    my $candidate = schema(vars->{'tenant'})->resultset('Admin')->search(
      { job => $job->id, status => 'queued' },
      { for => 'update' },
    );

    # stamp it with our backend name and the time we picked it up
    my $rows = $candidate->update({
      status  => 'queued',
      backend => setting('workers')->{'BACKEND'},
      started => \"LOCALTIMESTAMP",
    });

    $locked = true if $rows > 0;
  }
  catch {
    error $_;
  };

  return $locked;
}

# Put a job back on the queue after a failed attempt, counting the failure
# against the device for this backend where applicable.
#
# Takes a job object. Returns true when the locked transaction completed,
# false when it threw (the exception is logged via error).
sub jq_defer {
  my ($job) = @_;
  my $deferred = false;

  # Deferrals are counted per device and backend, not per action, so they
  # taint every action on the device: if macsuck and arpnip are both allowed
  # and macsuck fails 10 times, arpnip (and all other actions) will be
  # prevented on that device too.
  #
  # As defer is only triggered by an SNMP connect failure, that behaviour
  # seems reasonable (or even desirable).
  #
  # The deferrable_actions setting is the workaround should any action need
  # one: a per-device action listed there does not increment the deferrals
  # count and is simply tried again.

  try {
    schema(vars->{'tenant'})->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
      if ($job->device) {
        # number of deferrable_actions entries naming this job's action
        my $exempt = grep { $job->action eq $_ }
                          @{ setting('deferrable_actions') || [] };

        if (not $exempt) {
          schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create(
            { backend => setting('workers')->{'BACKEND'}, device => $job->device },
            { key => 'device_skip_pkey' },
          )->increment_deferrals;
        }
      }

      my $job_label = $job->id || 'unknown';
      debug sprintf 'defer: job %s', $job_label;

      # lock the job's row, then release it so it can be picked up again
      my $row = schema(vars->{'tenant'})->resultset('Admin')
        ->search({ job => $job->id }, { for => 'update' });

      $row->update({
        device  => $job->device, # if job had alias this sets to canonical
        status  => 'queued',
        backend => undef,
        started => undef,
        log     => $job->log,
      });
    });

    $deferred = true;
  }
  catch {
    error $_;
  };

  return $deferred;
}

sub jq_complete {
  my $job = shift;
  my $happy = false;



( run in 1.090 second using v1.01-cache-2.11-cpan-39bf76dae61 )