Argon

 view release on metacpan or  search on metacpan

lib/Argon/Worker.pm  view on Meta::CPAN

package Argon::Worker;
# ABSTRACT: Argon worker node providing capacity to an Argon::Manager
$Argon::Worker::VERSION = '0.18';

use strict;
use warnings;
use Carp;
use Moose;
use AnyEvent;
use AnyEvent::Util qw(fork_call portable_socketpair fh_nonblocking);
use Argon;
use Argon::Constants qw(:commands :defaults);
use Argon::Log;
use Argon::Marshal;
use Argon::Types;
use Argon::Util qw(K param interval);
require Argon::Channel;
require Argon::Client;
require Argon::Message;

with qw(Argon::Encryption);


has capacity => (
  is       => 'ro',
  isa      => 'Int',
  required => 1,
);


has mgr_host => (
  is       => 'ro',
  isa      => 'Str',
  required => 1,
);


has mgr_port => (
  is       => 'ro',
  isa      => 'Int',
  required => 1,
);

has timer => (
  is  => 'rw',
  isa => 'Any',
);

has tries => (
  is      => 'rw',
  isa     => 'Int',
  default => 0,
);

has intvl => (
  is       => 'ro',
  isa      => 'CodeRef',
  default  => sub { interval(1) },
  init_arg => undef,
);

has mgr => (
  is  => 'rw',
  isa => 'Argon::Client',
);

has workers => (
  is  => 'rw',
  isa => 'ArrayRef',
  default => sub {[]},
);

has assigned => (
  is  => 'rw',
  isa => 'HashRef',
  default => sub {{}},
);


sub BUILD {
  my ($self, $args) = @_;
  $AnyEvent::Util::MAX_FORKS = $self->capacity;
  $self->add_worker foreach 1 .. $self->capacity;
}


sub start {
  my $self = shift;
  $self->connect;
}


sub connect {
  my $self = shift;
  $self->timer(undef);
  $self->tries($self->tries + 1);

  log_trace 'Connecting to manager (attempt %d)', $self->tries;

  $self->mgr(Argon::Client->new(
    key    => $self->key,
    token  => $self->token,
    host   => $self->mgr_host,
    port   => $self->mgr_port,
    ready  => K('register', $self),
    closed => K('_disconnected', $self),
    notify => K('_queue', $self),
  ));

  $self->intvl->(1); # reset
}

sub _disconnected {
  my $self = shift;
  log_debug 'Manager disconnected' unless $self->timer;
  $self->reconnect;
}

sub reconnect {
  my $self = shift;
  my $intvl = $self->intvl->();
  $self->timer(AnyEvent->timer(after => $intvl, cb => K('connect', $self)));
  log_debug 'Reconection attempt in %0.4fs', $intvl;
}

sub register {
  my $self = shift;
  log_note 'Connected to manager';
  log_trace 'Registering with manager';

  my $msg = Argon::Message->new(
    cmd  => $HIRE,
    info => {capacity => $self->capacity},
  );

  $self->mgr->send($msg);
  $self->mgr->reply_cb($msg, K('_mgr_registered', $self));
}

sub _mgr_registered {
  my ($self, $msg) = @_;
  if ($msg->failed) {
    log_error 'Failed to register with manager: %s', $msg->info;
  }
  else {
    log_info 'Accepting tasks';
    log_note 'Direct code execution is permitted'
      if $Argon::ALLOW_EVAL;
  }
}

sub _queue {
  my ($self, $msg) = @_;
  if (my $worker = shift @{$self->{workers}}) {
    my ($id, $chan) = @$worker;
    $chan->send($msg);
    $self->{assigned}{$id} = $worker;
  } else {
    log_debug 'No available capacity';
    $self->mgr->send($msg->reply(cmd => $DENY, info => "No available capacity. Please try again later."));
  }
}

sub _result {
  my ($self, $id, $reply) = @_;
  push @{$self->{workers}}, delete $self->{assigned}{$id};
  $self->mgr->send($reply);
}

sub _worker_closed {
  my ($self, $id) = @_;
  delete $self->assigned->{$id};
  $self->{workers} = [ grep { $_->[0] ne $id } @{$self->{workers}} ];
  $self->add_worker;
}

sub add_worker {
  my $self = shift;
  my $id = $self->create_token;
  my $on_close = K('_worker_closed', $self, $id);

  my ($left, $right) = portable_socketpair;



( run in 0.597 second using v1.01-cache-2.11-cpan-39bf76dae61 )