App-MtAws
view release on metacpan or search on metacpan
lib/App/MtAws/QueueEngine.pm view on Meta::CPAN
sub init { confess "Unimplemented" }
sub queue { confess "Unimplemented" }
sub add_worker
{
my ($self, $worker_id) = @_;
$self->{workers}{$worker_id} = {};
}
sub unqueue_task
{
my ($self, $worker_id) = @_;
my $task_id = delete $self->{workers}{$worker_id}{task};
my $task = delete $self->{tasks}{$task_id} or confess;
push @{ $self->{freeworkers} }, $worker_id;
return $task;
}
sub _next_task_id
{
my ($self) = @_;
my $next_id = ++$self->{task_inc};
$next_id > 0 or confess;
$next_id;
}
sub process
{
my ($self, $job) = @_;
confess "code is not reentrant" if defined $self->{tasks};
$self->{tasks} = {};
@{$self->{freeworkers}} = keys %{$self->{workers}};
while () {
if (@{ $self->{freeworkers} }) {
my $res = $job->next;
if ($res->{code} eq JOB_OK) {
my $task_id = $self->_next_task_id;
my $worker_id = shift @{ $self->{freeworkers} };
my $task = $res->{task};
$task->{_id} = $task_id;
$self->queue($worker_id, $task);
$self->{tasks}{$task_id} = $task;
$self->{workers}{$worker_id}{task} = $task_id;
} elsif ($res->{code} eq JOB_WAIT) {
$self->wait_worker();
} elsif ($res->{code} eq JOB_DONE) {
return $job
} else {
confess;
}
} else {
$self->wait_worker();
}
}
}
sub get_busy_workers_ids
{
my ($self) = @_;
grep { $self->{workers}{$_}{task} } keys %{ $self->{workers}};
}
1;
( run in 0.549 second using v1.01-cache-2.11-cpan-39bf76dae61 )