Argon
view release on metacpan or search on metacpan
lib/Argon/Manager.pm view on Meta::CPAN
if ($self->persist && $self->store->exists) {
log_trace 'Loading message queue from saved copy';
my $saved = $self->decode($self->load_file);
my $queue = $saved->{queue};
$queue->max($self->capacity);
return $queue;
}
else {
return Argon::Queue->new;
}
}
has persist => (
is => 'ro',
isa => 'Maybe[Str]',
);
has store => (
is => 'ro',
isa => 'Maybe[Path::Tiny]',
lazy => 1,
builder => '_build_store',
init_arg => undef,
handles => {
save_file => 'spew_raw',
load_file => 'slurp_raw',
},
);
sub _build_store { path(shift->persist) }
after configure => sub {
my $self = shift;
$self->handles($HIRE, K('_hire', $self));
$self->handles($QUEUE, K('_queue', $self));
};
sub DEMOLISH {
my ($self, $global) = @_;
$self->save_queue unless $global;
}
sub save_queue {
my $self = shift;
return unless $self->persist;
log_trace 'Saving copy of message queue';
my $saved = {
queue => $self->queue,
};
my $data = encode($saved);
$self->save_file($data);
}
sub capacity { $_[0]->tracker->{self}->capacity }
sub has_capacity { $_[0]->tracker->{self}->has_capacity }
sub update_queue_capacity {
my $self = shift;
$self->queue->max($self->capacity * 3);
}
sub next_worker {
my $self = shift;
my @workers =
sort { $self->tracker->{$a}->load <=> $self->tracker->{$b}->load }
grep { $self->tracker->{$_}->has_capacity }
$self->worker_ids;
shift @workers;
}
sub assign {
my ($self, $id, $msg) = @_;
$self->get_worker($id)->send($msg);
$self->tracker->{$id}->start($msg);
$self->tracker->{self}->start($msg);
$self->assigned->{$msg->id} = $id;
log_trace 'worker %s assigned %s', $id, $msg->explain;
}
sub process_queue {
my $self = shift;
while ($self->has_capacity && $self->has_messages) {
$self->assign($self->next_worker, $self->next_message);
}
$self->save_queue;
}
sub _queue {
my ($self, $addr, $msg) = @_;
if ($self->queue->is_full) {
$self->send($msg->reply(cmd => $DENY, info => "No available capacity. Please try again later."));
}
else {
$self->queue_message($msg);
$self->process_queue;
}
}
sub _collect {
my ($self, $msg) = @_;
my $id = delete $self->assigned->{$msg->id};
$self->tracker->{$id}->finish($msg);
$self->tracker->{self}->finish($msg);
$self->send($msg);
$self->process_queue;
}
sub _hire {
my ($self, $addr, $msg) = @_;
$self->send($msg->reply(cmd => $ACK));
my $id = $msg->token || croak 'Missing token: ' . $msg->explain;
my $cap = $msg->info->{capacity};
( run in 2.138 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )