Parallel-Pvm-Scheduler

 view release on metacpan or  search on metacpan

Scheduler.pm  view on Meta::CPAN

  {
        $prm->submit("user-defined task description", $args[$i]);
  }

  # Wait for the tasks to complete
  $prm->recaptureHosts(1);


=head1 DESCRIPTION

Parallel-Pvm-Scheduler is a module designed to allow one to distribute a large
number of independent jobs across a cluster using PVM.  It first queries the number of
available machines in the cluster, then submits exactly one job per available machine.
When a job is finished, it recaptures the machine and submits the next job. It repeats
this until all the jobs are complete.

If all the available machines are in use, it will sleep until a running task completes
and one machine becomes free before submitting the next job.

=head2 EXPORT

None by default.

=head1 SEE ALSO

Parallel::Pvm perl module

=head1 AUTHOR

Ryan Golhar, E<lt>golharam@umdnj.eduE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright 2006 by Ryan Golhar

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself. 

=cut

# PVM message tag passed to Parallel::Pvm::probe and Parallel::Pvm::recv when
# checking for and collecting the output of a submitted task.
my $MSGTAG = 1;
# NOTE(review): not referenced in the visible part of this file; presumably the
# tag used for task-exit notifications -- confirm against the rest of the module.
my $MSGTASKEXIT = 999;

=head2 new

 Title   : new
 Usage   : my $prm = new Parallel::Pvm::Scheduler();
 Function: Creates and initializes a PVM resource manager
 Returns : Parallel::Pvm::Scheduler

=cut

# Constructor: queries the PVM configuration and builds the scheduler state.
#
# Args    : $class - class name (or an existing object) the constructor is
#                    invoked on; any further arguments are accepted but unused.
# Returns : a blessed hash reference with the keys
#             CONF      - array ref of the host records returned by
#                         Parallel::Pvm::config
#             FREEHOSTS - number of hosts currently free (initially all hosts)
#             HOSTS     - hash ref: host id => { name => ..., busy => 0|1 }
#             TID       - array ref of outstanding task ids (initially empty)
#             TIDCMD    - hash ref: task id => task description (initially empty)
sub new {
	my ($class, @args) = @_;

	# Bless into the class the caller asked for so that subclasses work.
	# Fall back to this package when called as a plain function, which is
	# what the previous one-argument bless always did.
	$class = ref($class) || $class;
	$class = __PACKAGE__ unless defined $class;

	# Explicit parentheses make this a sub call even if Parallel::Pvm::config
	# was not predeclared when this file was compiled.
	# NOTE(review): $info is presumably a PVM status code; it is not inspected
	# here -- confirm whether a negative value should be treated as an error.
	my ($info, @CONF) = Parallel::Pvm::config();

	# Index the hosts by host id; every host starts out idle.
	my %HOSTS;
	for my $node (@CONF) {
		$HOSTS{ $node->{'hi_tid'} } = {
			'name' => $node->{'hi_name'},
			'busy' => 0,
		};
	}

	my @TID;
	my %TIDcmd;

	my $self = {
		CONF      => \@CONF,
		FREEHOSTS => scalar(@CONF),
		HOSTS     => \%HOSTS,
		TID       => \@TID,
		TIDCMD    => \%TIDcmd,
	};

	return bless $self, $class;
}

# Destructor: invoked by Perl when the scheduler object is destroyed.
# Calls Parallel::Pvm::exit (presumably detaching this process from PVM --
# confirm against the Parallel::Pvm documentation).
sub DESTROY {
	my ($self) = @_;

	Parallel::Pvm::exit();
	return;
}

=head2 getHostCount

 Title   : getHostCount
 Usage   : $hostcount = $prm->getHostCount();
 Function: Returns the number of hosts within the PVM
 Returns : integer

=cut

# Returns the number of host records stored in the object's CONF list,
# i.e. the hosts reported when the scheduler was constructed.
sub getHostCount
{
	my $self = shift;

	# scalar() forces a count even when the caller uses list context.
	return scalar @{ $self->{CONF} };
}

=head2 getFreeHostCount

 Title   : getFreeHostCount
 Usage   : $freehostcount = $prm->getFreeHostCount();
 Function: Returns the number of free hosts within the PVM
 Returns : integer

=cut

# Returns the current value of the object's FREEHOSTS counter, which is
# decremented when a host is allocated and incremented when it is released.
sub getFreeHostCount
{
	my $self = shift;

	return $self->{FREEHOSTS};
}

=head2 submit

 Title   : submit
 Usage   : $prm->submit($taskdesc, @argv);
 Function: Submits a task described by an argument vector
 Args    : $taskdesc is a taskdescription.  It can be anything set by the user
 	   @argv is an argument vector.  The first element is the program to execute
	   the remaining elements are the parameters to the program.  
 Notes   : To execute multiple programs in serial for a single task, make a perl script
 	   and submit the perl script as the task.
 
=cut

sub submit {
	my ($self, $text, @pvmargv) = @_;

Scheduler.pm  view on Meta::CPAN


	my $HOSTS_ref = $self->{HOSTS};
	my $TID_ref = $self->{TID};
	my $TIDcmd_ref = $self->{TIDCMD};
	my $CONF_ref = $self->{CONF};
	
	$block = 0 if (!defined($block));
		
	# Iterate through the taskid and check to see if its done
	# If its done, process and remove it
	my $arrayindex = 0;
	my $childtid;
	
	while ($arrayindex < scalar(@$TID_ref)) {
		$childtid = @$TID_ref[$arrayindex];
		
		if ($childtid < 0) {
			print STDERR "Error with PVM Task: $childtid\n";
		} else {
			my $bufid = Parallel::Pvm::probe($childtid, $MSGTAG);
			if ($bufid < 0) {
				print STDERR "There was an error probing on $childtid: $bufid!\n";
			} elsif ($bufid > 0) {
				my $hostid = Parallel::Pvm::tidtohost($childtid);
				print STDERR "Recieved message from $childtid on ".$HOSTS_ref->{$hostid}{'name'}."\n";			
			
				$bufid = Parallel::Pvm::recv($childtid, $MSGTAG);
				my $output = Parallel::Pvm::unpack;
				print "($childtid) ", $TIDcmd_ref->{$childtid}, "\n";
				print "$output\n" if (defined($output));

				_deallocateHost($self, $hostid);
				
				# remove this child taskid
				splice(@$TID_ref, $arrayindex, 1);
				delete($TIDcmd_ref->{$childtid});
				
				# we don't want to increment the arrayindex or else 
				# we'll skip over the next task id.
				next;	
			} else {
				# $bufid == 0
				# wait until this tasks completes
				if ($block == 1) {
					sleep(3);
					next;
				}
			}
		}
		$arrayindex++;
	}
	
	if ($block == 1) {
		if ($self->{FREEHOSTS} != scalar(@$CONF_ref)) {
			print STDERR "There was a problem reconciling freehosts with pvm configuration!!!\n";
			
			foreach my $node (@$CONF_ref) {
				my $hostid = $node->{'hi_tid'};
				my $hostname = $node->{'hi_name'};

				if ($HOSTS_ref->{$hostid}{'busy'} != 0) {
					print STDERR "$hostname is not free\n";
				}
			}
		}
	}
}

=head2 _allocateHost

 Title   : _allocateHost
 Function: Internal Function: Used to allocate which host runs the next task

=cut

# Internal: picks the first host whose busy flag is 0 (in hash iteration
# order), marks it busy, decrements the FREEHOSTS counter and returns the
# host's name. Dies if every host is busy.
sub _allocateHost {
	my ($self) = @_;
	my $hosts = $self->{HOSTS};

	for my $id (keys %$hosts) {
		# Skip hosts that are already running a task.
		next if $hosts->{$id}{'busy'} != 0;

		my $hostname = $hosts->{$id}{'name'};

		$hosts->{$id}{'busy'} = 1;
		$self->{FREEHOSTS}--;

		print STDERR "Allocating host $hostname\n";

		return $hostname;
	}

	# Falling out of the loop means no host was free.
	die "Unable to locate free host!\n";
}

=head2 _deallocateHost

 Title   : _deallocateHost
 Function: Internal Function: Frees a host making it available for another task

=cut

# Internal: releases a previously allocated host.
#
# Args   : $hostid - key of the host in the object's HOSTS table
# Effect : clears the host's busy flag and increments FREEHOSTS
# Dies   : if $hostid is undefined or not in the HOSTS table, or if the
#          host is not currently marked busy
sub _deallocateHost {
	my ($self, $hostid) = @_;
	my $HOSTS_ref = $self->{HOSTS};

	# Reject unknown ids before touching the table. Looking up
	# $HOSTS_ref->{$hostid}{'busy'} directly would autovivify a phantom
	# entry whose undefined busy flag later compares equal to 0 and so
	# looks like a free host to _allocateHost.
	if (!defined($hostid) || !exists($HOSTS_ref->{$hostid})) {
		my $shown = defined($hostid) ? $hostid : 'undef';
		die "Host not allocated: unknown host id $shown!\n";
	}

	my $host = $HOSTS_ref->{$hostid};

	if ($host->{'busy'} == 0) {
		die "Host not allocated: ". $host->{'name'} ."!\n";
	}

	print STDERR "Deallocating host ". $host->{'name'} ."\n";
	$host->{'busy'} = 0;
	$self->{FREEHOSTS}++;
}

1;
__END__



( run in 1.252 second using v1.01-cache-2.11-cpan-39bf76dae61 )