Argon
view release on metacpan or search on metacpan
lib/Argon/Manager.pm view on Meta::CPAN
# Hand queued messages to workers for as long as both are available.
#
# While has_capacity and has_messages both report true, the next worker and
# the next message are fetched (worker first, then message) and passed to
# assign(). Once either condition fails, save_queue is called; this happens
# even when nothing was assigned.
#
# Returns whatever save_queue returns.
sub process_queue {
    my ($self) = @_;

    until (!$self->has_capacity || !$self->has_messages) {
        # Built as a list so both accessors run in list context, worker first.
        my @pairing = ($self->next_worker, $self->next_message);
        $self->assign(@pairing);
    }

    $self->save_queue;
}
# Handler for an incoming request to queue a task.
#
# Arguments:
#   $addr - received but not used by this handler
#   $msg  - the request message
#
# When the queue has room the message is enqueued and the queue is processed
# straight away. When the queue reports itself full, a $DENY reply asking the
# sender to retry later is sent instead and nothing is enqueued.
sub _queue {
    my ($self, $addr, $msg) = @_;

    unless ($self->queue->is_full) {
        $self->queue_message($msg);
        return $self->process_queue;
    }

    # No room left: turn the request away rather than queueing it.
    $self->send(
        $msg->reply(
            cmd  => $DENY,
            info => 'No available capacity. Please try again later.',
        )
    );
}
# Handle a task result delivered by a worker.
#
# Arguments:
#   $msg - the result message; $msg->id is the key looked up in
#          $self->assigned to find the identity of the worker that held it.
#
# Normal path: the assignment record is removed, the worker's tracker and the
# aggregate tracker (key 'self') are both told the task finished, the message
# is passed to send(), and the queue is processed again. Returns whatever
# process_queue returns.
#
# A result with no assignment record, or whose worker no longer has a tracker,
# cannot be accounted for. It is logged and dropped instead of crashing on a
# method call against an undefined tracker. Returns nothing in that case.
sub _collect {
    my ($self, $msg) = @_;

    my $id      = delete $self->assigned->{$msg->id};
    my $tracker = defined $id ? $self->tracker->{$id} : undef;

    unless (defined $tracker) {
        log_info('Discarding result for unassigned task %s', $msg->id);
        return;
    }

    $tracker->finish($msg);
    $self->tracker->{self}->finish($msg);
    $self->send($msg);
    $self->process_queue;
}
# Handler for a worker asking to be registered with this manager.
#
# Arguments:
#   $addr - key under which the requesting connection is stored in
#           $self->client
#   $msg  - the request; $msg->token becomes the worker's identity and
#           $msg->info->{capacity} the capacity it contributes
#
# The request is validated before it is acknowledged, so a worker never
# receives an ACK for a registration that then fails. After the ACK the
# connection gets its callbacks, the worker is added under its identity, a
# tracker with its capacity is created, the same capacity is added to the
# aggregate tracker (key 'self'), and the queue capacity is refreshed.
#
# Croaks when the message carries no token or when no client connection is
# stored for $addr.
sub _hire {
    my ($self, $addr, $msg) = @_;

    my $id     = $msg->token || croak('Missing token: ' . $msg->explain);
    my $cap    = $msg->info->{capacity};
    my $worker = $self->client->{$addr}
        || croak("No client connection registered for address $addr");

    $self->send($msg->reply(cmd => $ACK));

    # K presumably turns a method name plus invocant/arguments into a
    # callback -- confirm against its definition. Results are routed to
    # _collect and a closed connection to _fire with this worker's identity
    # and capacity.
    $worker->on_msg(K('_collect', $self));
    $worker->on_close(K('_fire', $self, $id, $cap));

    $self->add_worker($id, $worker);
    $self->tracker->{$id} = Argon::Tracker->new(capacity => $cap);
    $self->tracker->{self}->add_capacity($cap);
    $self->update_queue_capacity;

    log_info(
        'New worker with identity %s added %d capacity (%d total)',
        $id, $cap, $self->capacity,
    );
}
# Remove a worker whose connection has closed.
#
# Arguments:
#   $worker   - identity of the worker (the key used in $self->tracker and
#               the value stored in $self->assigned)
#   $capacity - capacity the worker contributed, removed from the aggregate
#               tracker (key 'self')
#
# The worker and its tracker are removed and the queue capacity is refreshed.
# Each message still recorded as assigned to this worker is answered with an
# error message carrying that message's id. Those assignment records are also
# deleted, so they do not accumulate and are not reported a second time if a
# worker with the same identity disconnects later.
sub _fire {
    my ($self, $worker, $capacity) = @_;

    $self->tracker->{self}->remove_capacity($capacity);
    $self->del_worker($worker);
    delete $self->tracker->{$worker};
    $self->update_queue_capacity;

    my $assigned = $self->assigned;
    my @msgids   = grep { defined $assigned->{$_} && $assigned->{$_} eq $worker }
                   keys %$assigned;

    if (@msgids) {
        # These tasks will never be collected; forget them before reporting.
        delete @{$assigned}{@msgids};

        my $reason = 'The worker assigned to this task disconnected before completion.';
        $self->send(Argon::Message->error($reason, id => $_)) for @msgids;
    }

    log_info(
        'Worker %s disconnected; capacity is down to %d',
        $worker, $self->capacity,
    );
}
# Finalize this class through its metaclass once all subs are defined
# (presumably Moose's make_immutable -- confirm against the file's `use` lines).
__PACKAGE__->meta->make_immutable;
# A module must end with a true value so that require/use of the file succeeds.
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Argon::Manager - Entry-point Argon service providing intelligent task routing
=head1 VERSION
version 0.18
=head1 SYNOPSIS
use Argon::Manager;
use AnyEvent;
my $cv = AnyEvent->condvar;
my $mgr = Argon::Manager->new(
host => 'localhost',
port => 8000,
keyfile => 'path/to/secret',
);
$cv->recv;
=head1 DESCRIPTION
The entry point service with which workers are registered, to which clients
connect, and which provides task queueing services for the system.
For most use cases, this class need not be accessed directly; instead,
L<bin/ar-manager> provides a command-line interface to control the manager
process.
=head1 PUBLIC INTERFACE
Most relevant attributes and methods are documented in L<Argon::Server>.
=head1 AUTHOR
Jeff Ober <sysread@fastmail.fm>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2017 by Jeff Ober.
( run in 1.819 second using v1.01-cache-2.11-cpan-ceb78f64989 )