Argon

 view release on metacpan or  search on metacpan

lib/Argon/Manager.pm  view on Meta::CPAN

  if ($self->persist && $self->store->exists) {
    log_trace 'Loading message queue from saved copy';

    my $saved = $self->decode($self->load_file);
    my $queue = $saved->{queue};
    $queue->max($self->capacity);

    return $queue;
  }
  else {
    return Argon::Queue->new;
  }
}

has persist => (
  is  => 'ro',
  isa => 'Maybe[Str]',
);

has store => (
  is       => 'ro',
  isa      => 'Maybe[Path::Tiny]',
  lazy     => 1,
  builder  => '_build_store',
  init_arg => undef,
  handles  => {
    save_file => 'spew_raw',
    load_file => 'slurp_raw',
  },
);

sub _build_store { path(shift->persist) }

after configure => sub {
  my $self = shift;
  $self->handles($HIRE,  K('_hire',  $self));
  $self->handles($QUEUE, K('_queue', $self));
};

sub DEMOLISH {
  my ($self, $global) = @_;
  $self->save_queue unless $global;
}

sub save_queue {
  my $self = shift;
  return unless $self->persist;
  log_trace 'Saving copy of message queue';

  my $saved = {
    queue => $self->queue,
  };

  my $data = encode($saved);
  $self->save_file($data);
}

sub capacity     { $_[0]->tracker->{self}->capacity }
sub has_capacity { $_[0]->tracker->{self}->has_capacity }

sub update_queue_capacity {
  my $self = shift;
  $self->queue->max($self->capacity * 3);
}

sub next_worker {
  my $self = shift;

  my @workers =
    sort { $self->tracker->{$a}->load <=> $self->tracker->{$b}->load }
    grep { $self->tracker->{$_}->has_capacity }
    $self->worker_ids;

  shift @workers;
}

sub assign {
  my ($self, $id, $msg) = @_;
  $self->get_worker($id)->send($msg);
  $self->tracker->{$id}->start($msg);
  $self->tracker->{self}->start($msg);
  $self->assigned->{$msg->id} = $id;
  log_trace 'worker %s assigned %s', $id, $msg->explain;
}

sub process_queue {
  my $self = shift;

  while ($self->has_capacity && $self->has_messages) {
    $self->assign($self->next_worker, $self->next_message);
  }

  $self->save_queue;
}

sub _queue {
  my ($self, $addr, $msg) = @_;
  if ($self->queue->is_full) {
    $self->send($msg->reply(cmd => $DENY, info => "No available capacity. Please try again later."));
  }
  else {
    $self->queue_message($msg);
    $self->process_queue;
  }
}

sub _collect {
  my ($self, $msg) = @_;
  my $id = delete $self->assigned->{$msg->id};
  $self->tracker->{$id}->finish($msg);
  $self->tracker->{self}->finish($msg);
  $self->send($msg);
  $self->process_queue;
}

sub _hire {
  my ($self, $addr, $msg) = @_;
  $self->send($msg->reply(cmd => $ACK));

  my $id  = $msg->token || croak 'Missing token: ' . $msg->explain;
  my $cap = $msg->info->{capacity};



( run in 2.138 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )