PDL-Parallel-MPI

 view release on metacpan or  search on metacpan

MPI.xs  view on Meta::CPAN

send_test(double d, int dest)
	PREINIT:
		int initialized;
		int rc;
		const int tag = 0;
	CODE:
		/* Smoke test: send the single double `d` to rank `dest` on
		 * MPI_COMM_WORLD using tag 0.  Croaks if MPI_Initialized()
		 * reports that MPI has not been initialized. */
		MPIpm_errhandler("MPI_Initialized", MPI_Initialized(&initialized));
		if (!initialized)
			croak("send_test: MPI not initialized !\n");
		/* croak() does not return, so from here on MPI is initialized. */
		fputs("intalized ok from inside MPI.xs -- send_test\n", stdout);
		rc = MPI_Send(&d, 1, MPI_DOUBLE, dest, tag, MPI_COMM_WORLD);
		MPIpm_errhandler("send_test", rc);

double
receive_test(int source)
	PREINIT:
		double value;
		int initialized;
		int rc;
		const int tag = 0;
	CODE:
		/* Smoke test: receive one double from rank `source` on
		 * MPI_COMM_WORLD (tag 0) and return it to Perl.  The receive
		 * status is stored in the file-level global_status.  Croaks if
		 * MPI_Initialized() reports that MPI has not been initialized. */
		MPIpm_errhandler("MPI_Initialized", MPI_Initialized(&initialized));
		if (!initialized)
			croak("receive_test: MPI not initialized !\n");
		/* croak() does not return, so from here on MPI is initialized. */
		fputs("intalized ok from inside MPI.xs -- receive_test\n", stdout);
		rc = MPI_Recv(&value, 1, MPI_DOUBLE, source, tag,
		              MPI_COMM_WORLD, &global_status);
		MPIpm_errhandler("receive_test", rc);
		RETVAL = value;
	OUTPUT:
		RETVAL

void
xs_send(SV *sv,int dest,int tag=0,MPI_Comm comm=MPI_COMM_WORLD)
	PREINIT:
		pdl *p;
		int rc;
	CODE:
		/* Send the data buffer of a piddle to rank `dest`.
		 *
		 * sv   - scalar whose integer value is the address of a pdl struct
		 * dest - destination rank
		 * tag  - message tag (defaults to 0)
		 * comm - communicator (defaults to MPI_COMM_WORLD)
		 *
		 * All p->nvals elements are sent; the MPI datatype is derived
		 * from p->datatype via pdl_mpi_typemap(). */
		p = (pdl *) SvIV(sv);
		PDL_CHKMAGIC(p);
		rc = MPI_Send(p->data, p->nvals,
		              pdl_mpi_typemap(p->datatype), dest, tag, comm);
		MPIpm_errhandler("&PDL::Parallel::MPI::send", rc);

void
xs_receive(SV *sv,int source,int tag=0,MPI_Comm comm=MPI_COMM_WORLD)
	PREINIT:
		pdl *p;
		int rc;
	CODE:
		/* Receive a message from rank `source` directly into the data
		 * buffer of a piddle.
		 *
		 * sv     - scalar whose integer value is the address of a pdl struct
		 * source - sending rank
		 * tag    - message tag (defaults to 0)
		 * comm   - communicator (defaults to MPI_COMM_WORLD)
		 *
		 * Up to p->nvals elements of the type given by
		 * pdl_mpi_typemap(p->datatype) are received.  The receive status
		 * is written to the file-level global_status. */
		p = (pdl *) SvIV(sv);
		PDL_CHKMAGIC(p);
		rc = MPI_Recv(p->data, p->nvals,
		              pdl_mpi_typemap(p->datatype),
		              source, tag, comm, &global_status);
		MPIpm_errhandler("&PDL::Parallel::MPI::receive", rc);

void
get_status_list()
	PPCODE:
	    /* Return the contents of the file-level global_status to Perl as
	     * a 4 element list: (count, MPI_SOURCE, MPI_TAG, MPI_ERROR). */
	    {
		IV fields[4];
		int i;
		fields[0] = global_status.count;
		fields[1] = global_status.MPI_SOURCE;
		fields[2] = global_status.MPI_TAG;
		fields[3] = global_status.MPI_ERROR;
		/* Push each field, in order, as a mortal integer SV. */
		for (i = 0; i < 4; i++)
		    XPUSHs(sv_2mortal(newSViv(fields[i])));
	    }

void
xs_broadcast(SV * sv,int root=0, MPI_Comm comm = MPI_COMM_WORLD)
	PREINIT:
		pdl *p;
		int rc;
	CODE:
		/* Broadcast the data buffer of a piddle from rank `root` to the
		 * other members of `comm`.
		 *
		 * sv   - scalar whose integer value is the address of a pdl struct
		 * root - broadcasting rank (defaults to 0)
		 * comm - communicator (defaults to MPI_COMM_WORLD)
		 *
		 * p->nvals elements of the type given by
		 * pdl_mpi_typemap(p->datatype) are transferred in place. */
		p = (pdl *) SvIV(sv);
		PDL_CHKMAGIC(p);
		rc = MPI_Bcast(p->data, p->nvals,
		               pdl_mpi_typemap(p->datatype), root, comm);
		MPIpm_errhandler("&PDL::Parallel::MPI::broadcast", rc);

void
xs_scatter(SV * source_sv, SV * dest_sv, int root=0, MPI_Comm comm = MPI_COMM_WORLD)
	PREINIT:
		int rank;
		pdl * source_piddle;
		pdl * dest_piddle;
		int err_code;
		/* Only the root has a send buffer.  Start from NULL so that
		 * non-root ranks pass a well-defined pointer to MPI_Scatter
		 * instead of an uninitialized one. */
		void * source_data = NULL;
		MPI_Datatype sendtype;
	CODE:
		/* Scatter the root's source piddle across the members of `comm`.
		 *
		 * source_sv - scalar whose integer value is the address of the
		 *             source pdl; only dereferenced on the root rank
		 * dest_sv   - scalar whose integer value is the address of the
		 *             destination pdl; used on every rank
		 * root      - scattering rank (defaults to 0)
		 * comm      - communicator (defaults to MPI_COMM_WORLD)
		 *
		 * Each rank receives dest_piddle->nvals elements into
		 * dest_piddle->data. */
		MPIpm_errhandler("MPI_Comm_rank", MPI_Comm_rank(comm,&rank));

		dest_piddle = (pdl *) SvIV(dest_sv);
		PDL_CHKMAGIC(dest_piddle);

		if (rank == root) {
			source_piddle = (pdl *) SvIV(source_sv);
			PDL_CHKMAGIC(source_piddle);
			source_data = source_piddle->data;
			sendtype = pdl_mpi_typemap(source_piddle->datatype);
		} else {
			/* The MPI documentation says sendtype is significant
			 * only at the root, but the original author found that
			 * it had to be valid on the other ranks as well.  As a
			 * workaround, reuse the destination piddle's type here.
			 */
			sendtype = pdl_mpi_typemap(dest_piddle->datatype);
		}

		err_code = MPI_Scatter(
			source_data, dest_piddle->nvals, sendtype,
			dest_piddle->data, dest_piddle->nvals,
			pdl_mpi_typemap(dest_piddle->datatype),
			root, comm);
		MPIpm_errhandler("scatter-MPI_Scatter", err_code);

void 
xs_gather(SV * source_sv, SV * dest_sv, int root=0, MPI_Comm comm = MPI_COMM_WORLD)
	PREINIT:
		int rank;

MPI.xs  view on Meta::CPAN

	AV *stuff = (AV*) SvRV(ref);
        if(count > (av_len(stuff)+1)) {
            printf("MPI_Send: count param is larger than given array.  Using "
                 "array length.\n");
            count = av_len(stuff)+1;
        }
#ifdef SEND_DEBUG
	printf("[%d] av_len=%d\n",getpid(),av_len(stuff));
 	for (i=0 ; i <= av_len(stuff) ; i++) {
	    SV **sv_tmp = av_fetch(stuff,i,0);
	    if (sv_tmp == NULL)
		printf("[%d] $stuff[%d]=undef\n",getpid(),i);
	    else {
		printf("[%d] $stuff[%d]=%s\n",getpid(),i,SvPV(*sv_tmp, PL_na));
	    }
	}
#endif /* SEND_DEBUG */
        len = MPIpm_packarray(&buf, stuff, datatype, count);
#ifdef SEND_DEBUG
        printf("[%d] len=%d\n[%d] ", getpid(), len, getpid());
	for(i=0;i<len;i++) {
	    printf("%02x ", (unsigned char)((char*)buf)[i]);
	    if((i!=0) && (i%16) == 0) printf("\n[%d] ", getpid());
	}
	printf("\n");
#endif /* SEND_DEBUG */
        if(datatype == MPI_STRING)
	    MPIpm_errhandler("MPI_Send",
		             MPI_Send(&len, 1, MPI_INT, dest, tag, comm));
	MPIpm_errhandler("MPI_Send",
	                 MPI_Send(buf, len, MPI_CHAR, dest, tag, comm));
    } else {
	count = 1;
        if (datatype == MPI_CHAR)
	    count = (SvCUR(SvRV(ref)) + 1) * sizeof(char);
	buf = (void*) malloc(MPIpm_bufsize(datatype,SvRV(ref),count));
	MPIpm_packscalar(buf,SvRV(ref),datatype);

	ret = MPI_Send(buf,count,datatype,dest,tag,comm);

	free(buf);
	MPIpm_errhandler("MPI_Send",ret);
    }


void
MPI_Recv(ref, count, datatype, source, tag, comm)
	SV *	ref
	int     count
	MPI_Datatype	datatype
	int	source
	int	tag
	MPI_Comm comm
      PREINIT:
        void* buf;
        int ret;
	MPI_Status status;
#ifdef SEND_DEBUG
        int i;
#endif
      PPCODE:
	if (! SvROK(ref)) 
            croak("MPI_Recv: First argument is not a reference!");

	if (SvTYPE(SvRV(ref)) == SVt_PVHV) {
            croak("MPI_Recv: Hashes are not supported yet.");
	} else if (SvTYPE(SvRV(ref)) == SVt_PVAV) {
	    int len;
	    AV *stuff = (AV*) SvRV(ref);
            switch(datatype) {
	      case MPI_STRING:
	        MPI_Recv(&len, 1, MPI_INT, source, tag, comm, &status);
		break;
	      case MPI_INT:
                len = count * sizeof(int);
                break;
#ifndef FLOAT_HACK
	      case MPI_FLOAT:
                len = count * sizeof(float);
                break;
#endif
	      case MPI_DOUBLE:
                len = count * sizeof(double);
                break;
            }
#ifdef SEND_DEBUG
	    printf("[%d] len=%d\n", getpid(), len);
#endif
            buf = (char *) malloc(len);
            ret = MPI_Recv(buf, len, MPI_CHAR, source, tag, comm, &status);
#ifdef SEND_DEBUG
	    printf("[%d] len=%d\n[%d] ", getpid(), len, getpid());
	    for(i=0;i<len;i++) {
		printf("%02x ", (unsigned char)((char*)buf)[i]);
		if((i!=0) && (i%16) == 0) printf("\n[%d] ", getpid());
	    }
	    printf("\n");
#endif /* SEND_DEBUG */
            MPIpm_unpackarray(buf, &stuff, datatype, count);
	    MPIpm_errhandler("MPI_Recv",ret);
	    /* return the status as a 4 element array:
	     * (count,MPI_SOURCE,MPI_TAG,MPI_ERROR) */
	    XPUSHs(sv_2mortal(newSViv(status.count)));
	    XPUSHs(sv_2mortal(newSViv(status.MPI_SOURCE)));
	    XPUSHs(sv_2mortal(newSViv(status.MPI_TAG)));
	    XPUSHs(sv_2mortal(newSViv(status.MPI_ERROR)));
	} else {
	  buf = (void*) malloc(MPIpm_bufsize(datatype,NULL,count));

	  ret = MPI_Recv(buf,count,datatype,source,tag,comm,&status);

	  MPIpm_unpackscalar(buf,SvRV(ref),datatype);
	  free(buf);
	  MPIpm_errhandler("MPI_Recv",ret);

	  /* return the status as a 4 element array:
	   * (count,MPI_SOURCE,MPI_TAG,MPI_ERROR) */
	  XPUSHs(sv_2mortal(newSViv(status.count)));
	  XPUSHs(sv_2mortal(newSViv(status.MPI_SOURCE)));
	  XPUSHs(sv_2mortal(newSViv(status.MPI_TAG)));
	  XPUSHs(sv_2mortal(newSViv(status.MPI_ERROR)));

MPI.xs  view on Meta::CPAN

	  MPIpm_unpackscalar(recvbuf,SvRV(recvref),recvtype);
#ifdef WHY_DOES_THIS_MAKE_IT_SEGFAULT
	  free(sendbuf);
	  free(recvbuf);
#endif
	  MPIpm_errhandler("MPI_Scatter",ret);
	}


int
MPI_Gather(sendref, sendcnt, sendtype, recvref, recvcnt, recvtype, root, comm)
	SV *    sendref
        int     sendcnt
        MPI_Datatype 	sendtype
	SV *    recvref
        int     recvcnt
        MPI_Datatype 	recvtype
	int	root
	MPI_Comm	comm
      PREINIT:
        void *sendbuf = NULL;
        void *recvbuf = NULL;
        size_t sendsz;
        size_t recvsz;
        int rank;
        int nprocs;
        int ret;
      CODE:
	/* Perl wrapper around MPI_Gather for scalar references.
	 *
	 * sendref / recvref must both be references; array references are
	 * rejected because they are not implemented.  The scalar behind
	 * sendref is packed into a temporary buffer, gathered to `root`,
	 * and on the root the received buffer is unpacked into the scalar
	 * behind recvref.  On other ranks that scalar is left untouched,
	 * because MPI only fills the receive buffer at the root.
	 *
	 * Returns the MPI_Gather return code to Perl. */
	if (! SvROK(sendref) || ! SvROK(recvref))
            croak("MPI_Gather: First and Fourth arguments must be references!");

	if (SvTYPE(SvRV(sendref)) == SVt_PVAV ||
            SvTYPE(SvRV(recvref)) == SVt_PVAV)
	    croak("MPI_Gather: Arrays are not implemented yet.");

	MPIpm_errhandler("MPI_Comm_rank", MPI_Comm_rank(comm, &rank));
	MPIpm_errhandler("MPI_Comm_size", MPI_Comm_size(comm, &nprocs));

	/* The root receives recvcnt elements from EVERY rank, so its
	 * buffer must be nprocs times the size of one contribution.
	 * NOTE(review): assumes MPIpm_bufsize() returns a byte count for
	 * `count` elements -- confirm against its definition. */
	sendsz = (size_t) MPIpm_bufsize(sendtype, SvRV(sendref), sendcnt);
	recvsz = (size_t) MPIpm_bufsize(recvtype, SvRV(recvref), recvcnt)
	         * (size_t) nprocs;

	sendbuf = malloc(sendsz);
	if (sendbuf == NULL && sendsz != 0)
	    croak("MPI_Gather: out of memory");

	if (rank == root) {
	    recvbuf = malloc(recvsz);
	    if (recvbuf == NULL && recvsz != 0) {
		free(sendbuf);
		croak("MPI_Gather: out of memory");
	    }
	}

	MPIpm_packscalar(sendbuf, SvRV(sendref), sendtype);

	ret = MPI_Gather(sendbuf, sendcnt, sendtype,
	                 recvbuf, recvcnt, recvtype, root, comm);

	/* Only the root has meaningful received data, and only on success. */
	if (rank == root && ret == MPI_SUCCESS)
	    MPIpm_unpackscalar(recvbuf, SvRV(recvref), recvtype);

	/* Release the buffers before the error handler runs, so nothing
	 * is leaked if it does not return. */
	free(sendbuf);
	free(recvbuf);
	MPIpm_errhandler("MPI_Gather", ret);
	RETVAL = ret;
      OUTPUT:
	RETVAL

int
MPI_Sendrecv(sendref, sendcount, sendtype, dest, sendtag, recvref, recvcount, recvtype, source, recvtag, comm)
	SV *	sendref
	int	sendcount
	MPI_Datatype	sendtype
	int	dest
	int	sendtag
	SV *	recvref
	int	recvcount
	MPI_Datatype	recvtype
	int	source
	int	recvtag
	MPI_Comm	comm
      PREINIT:
        void *sendbuf = NULL;
        void *recvbuf = NULL;
        int ret;
	MPI_Status status;
      PPCODE:
	/* Perl wrapper around MPI_Sendrecv.
	 *
	 * sendref and recvref must both be references.  When both refer
	 * to arrays the array pack/unpack helpers are used; every other
	 * combination goes through the scalar helpers.
	 * NOTE(review): a mixed array/scalar pair therefore reaches the
	 * scalar helpers with an AV -- confirm that is intended.
	 *
	 * Returns the receive status to Perl as a 4 element list:
	 * (count, MPI_SOURCE, MPI_TAG, MPI_ERROR). */
	if (! SvROK(sendref) || ! SvROK(recvref))
            croak("MPI_Sendrecv: First and Sixth arguments must be references!");

	if (SvTYPE(SvRV(sendref)) == SVt_PVAV &&
            SvTYPE(SvRV(recvref)) == SVt_PVAV)
	{
            AV* array = (AV*) SvRV(recvref);

	    recvbuf = malloc(MPIpm_bufsize(recvtype, SvRV(recvref), recvcount));
	    /* The packed length returned by the helper is not needed here. */
	    (void) MPIpm_packarray(&sendbuf, (AV*)SvRV(sendref), sendtype, sendcount);
	    ret = MPI_Sendrecv(sendbuf, sendcount, sendtype, dest, sendtag,
                               recvbuf, recvcount, recvtype, source, recvtag,
                               comm, &status);

	    MPIpm_unpackarray(recvbuf,&array,recvtype,recvcount);
	    /* recvbuf was malloc'ed above and is done with once unpacked;
	     * free it before the error handler, as the scalar branch does.
	     * NOTE(review): sendbuf is produced by MPIpm_packarray(); who
	     * owns it is not visible here, so it is deliberately not freed
	     * -- confirm against that helper. */
	    free(recvbuf);
	    MPIpm_errhandler("MPI_Sendrecv",ret);
	} else {
	  sendbuf = (void*)malloc(MPIpm_bufsize(sendtype,SvRV(sendref),sendcount));
	  recvbuf = (void*)malloc(MPIpm_bufsize(recvtype,SvRV(recvref),recvcount));
	  MPIpm_packscalar(sendbuf,SvRV(sendref),sendtype);
	  /* The receive buffer is pre-filled from the current value of
	   * the receive scalar before the exchange. */
          MPIpm_packscalar(recvbuf,SvRV(recvref),recvtype);

	  ret = MPI_Sendrecv(sendbuf, sendcount, sendtype, dest,
                             sendtag, recvbuf, recvcount, recvtype,
			     source, recvtag, comm, &status);

	  MPIpm_unpackscalar(recvbuf,SvRV(recvref),recvtype);
	  free(sendbuf);
	  free(recvbuf);
	  MPIpm_errhandler("MPI_Sendrecv",ret);
	}

	/* return the status as a 4 element array:
	 * (count,MPI_SOURCE,MPI_TAG,MPI_ERROR) */
	XPUSHs(sv_2mortal(newSViv(status.count)));
	XPUSHs(sv_2mortal(newSViv(status.MPI_SOURCE)));
	XPUSHs(sv_2mortal(newSViv(status.MPI_TAG)));
	XPUSHs(sv_2mortal(newSViv(status.MPI_ERROR)));



( run in 0.561 second using v1.01-cache-2.11-cpan-5511b514fd6 )