Argon

 view release on metacpan or  search on metacpan

lib/Argon/Manager.pm  view on Meta::CPAN


# Drains the queue for as long as the manager reports both spare capacity
# and pending messages, handing each next message to the next worker via
# assign(). Once the loop stops, save_queue is called and its result is
# returned to the caller.
sub process_queue {
  my ($self) = @_;

  # Statement-modifier form: the condition is tested before every
  # assignment, and has_messages is only consulted when has_capacity is true.
  $self->assign($self->next_worker, $self->next_message)
    while $self->has_capacity && $self->has_messages;

  return $self->save_queue;
}

# Accepts an incoming message for queueing.
#
#   $addr - passed positionally but not used by this handler
#   $msg  - the message to queue
#
# When the queue reports that it is full, a $DENY reply is sent back for
# the message. Otherwise the message is queued and the queue is processed
# straight away.
sub _queue {
  my ($self, $addr, $msg) = @_;

  # Common path first: room in the queue, so enqueue and try to dispatch.
  if (!$self->queue->is_full) {
    $self->queue_message($msg);
    return $self->process_queue;
  }

  return $self->send(
    $msg->reply(cmd => $DENY, info => "No available capacity. Please try again later.")
  );
}

# Handles a message coming back for a previously assigned task (installed
# as the worker's on_msg callback in _hire).
#
# The assignment entry for the message id is removed, finish() is called on
# the tracker stored under the assigned worker id and then on the tracker
# stored under the 'self' key, the message is passed to send(), and the
# queue is processed again. Returns whatever process_queue returns.
sub _collect {
  my ($self, $msg) = @_;

  # delete hands back the worker id the message was assigned to.
  my $worker_id = delete $self->assigned->{$msg->id};

  # Per-worker tracker first, then the tracker kept under 'self'.
  for my $key ($worker_id, 'self') {
    $self->tracker->{$key}->finish($msg);
  }

  $self->send($msg);

  return $self->process_queue;
}

# Registers a newly connected worker.
#
#   $addr - key under which the worker's connection is stored in
#           $self->client
#   $msg  - the registration message; its token becomes the worker's
#           identity and its info->{capacity} is the capacity it adds
#
# The message is acknowledged with $ACK before anything else happens.
# Croaks when the message carries no token. The connection then gets its
# on_msg/on_close callbacks, the worker is added under its identity, a
# tracker is created for it, and its capacity is added to the tracker kept
# under 'self' before the queue capacity is updated.
sub _hire {
  my ($self, $addr, $msg) = @_;

  # The acknowledgement goes out first, before the token is validated.
  $self->send($msg->reply(cmd => $ACK));

  my $identity = $msg->token;
  croak('Missing token: ' . $msg->explain) unless $identity;

  my $capacity = $msg->info->{capacity};

  # Route the connection's messages to _collect and its close event to
  # _fire, passing the identity and capacity it contributed.
  my $connection = $self->client->{$addr};
  $connection->on_msg(K('_collect', $self));
  $connection->on_close(K('_fire', $self, $identity, $capacity));

  $self->add_worker($identity, $connection);

  # Per-worker tracker plus the tracker stored under the 'self' key.
  $self->tracker->{$identity} = Argon::Tracker->new(capacity => $capacity);
  $self->tracker->{self}->add_capacity($capacity);
  $self->update_queue_capacity;

  log_info(
    'New worker with identity %s added %d capacity (%d total)',
    $identity, $capacity, $self->capacity,
  );
}

# Removes a worker whose connection has closed (installed as the worker's
# on_close callback in _hire).
#
#   $worker   - identity under which the worker was registered
#   $capacity - capacity the worker contributed when it was hired
#
# The worker's capacity is removed from the tracker kept under 'self', the
# worker and its own tracker are dropped, and the queue capacity is
# updated. Every message still assigned to the worker is answered with an
# error message carrying that message's id, and its assignment entry is
# removed.
sub _fire {
  my ($self, $worker, $capacity) = @_;

  $self->tracker->{self}->remove_capacity($capacity);
  $self->del_worker($worker);
  delete $self->tracker->{$worker};

  $self->update_queue_capacity;

  # Ids of the messages that were still assigned to the departed worker.
  my $assigned = $self->assigned;
  my @msgids   = grep { $assigned->{$_} eq $worker } keys %{$assigned};

  if (@msgids) {
    # Forget these assignments. Leaving them behind would let the hash grow
    # without bound, and would fail the same tasks a second time if a worker
    # with this identity reconnects and later disconnects again.
    # NOTE(review): the tracker under 'self' is not told that these tasks
    # ended (finish() needs the message object) - confirm whether it should be.
    delete @{$assigned}{@msgids};

    my $msg = 'The worker assigned to this task disconnected before completion.';
    for my $msgid (@msgids) {
      $self->send(Argon::Message->error($msg, id => $msgid));
    }
  }

  log_info 'Worker %s disconnected; capacity is down to %d',
    $worker,
    $self->capacity;
}

__PACKAGE__->meta->make_immutable;

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Argon::Manager - Entry-point Argon service providing intelligent task routing

=head1 VERSION

version 0.18

=head1 SYNOPSIS

  use Argon::Manager;
  use AnyEvent;

  my $cv = AnyEvent->condvar;

  my $mgr = Argon::Manager->new(
    host    => 'localhost',
    port    => 8000,
    keyfile => 'path/to/secret',
  );

  $cv->recv;

=head1 DESCRIPTION

The entry point service with which workers are registered, to which clients
connect, and which provides task queueing services for the system.

For most use cases, this class need not be accessed directly; instead,
L<bin/ar-manager> provides a command-line interface to control the manager
process.

=head1 PUBLIC INTERFACE

Most relevant attributes and methods are documented in L<Argon::Server>.

=head1 AUTHOR

Jeff Ober <sysread@fastmail.fm>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2017 by Jeff Ober.



( run in 1.819 second using v1.01-cache-2.11-cpan-ceb78f64989 )