App-MtAws

 view release on metacpan or  search on metacpan

lib/App/MtAws/QueueEngine.pm  view on Meta::CPAN

sub init { confess "Unimplemented" }
sub queue { confess "Unimplemented" }

sub add_worker
{
	my ($self, $worker_id) = @_;
	$self->{workers}{$worker_id} = {};
}

sub unqueue_task
{
	my ($self, $worker_id) = @_;
	my $task_id = delete $self->{workers}{$worker_id}{task};
	my $task = delete $self->{tasks}{$task_id} or confess;
	push @{ $self->{freeworkers} }, $worker_id;
	return $task;
}

sub _next_task_id
{
	my ($self) = @_;
	my $next_id = ++$self->{task_inc};
	$next_id > 0 or confess;
	$next_id;
}

sub process
{
	my ($self, $job) = @_;
	confess "code is not reentrant" if defined $self->{tasks};
	$self->{tasks} = {};
	@{$self->{freeworkers}} = keys %{$self->{workers}};
	while () {
		if (@{ $self->{freeworkers} }) {
			my $res = $job->next;
			if ($res->{code} eq JOB_OK) {
				my $task_id = $self->_next_task_id;

				my $worker_id = shift @{ $self->{freeworkers} };
				my $task = $res->{task};

				$task->{_id} = $task_id;
				$self->queue($worker_id, $task);

				$self->{tasks}{$task_id} = $task;
				$self->{workers}{$worker_id}{task} = $task_id;

			} elsif ($res->{code} eq JOB_WAIT) {
				$self->wait_worker();
			} elsif ($res->{code} eq JOB_DONE) {
				return $job
			} else {
				confess;
			}
		} else {
			$self->wait_worker();
		}
	}
}

sub get_busy_workers_ids
{
	my ($self) = @_;
	grep { $self->{workers}{$_}{task} } keys %{ $self->{workers}};
}

1;



( run in 0.549 second using v1.01-cache-2.11-cpan-39bf76dae61 )