PDL-Parallel-MPI
view release on metacpan or search on metacpan
send_test(double d, int dest)
  PREINIT:
    int initialized;
    int rc;
  CODE:
    /* Test helper: sends the single double `d` to rank `dest` with tag 0
     * on MPI_COMM_WORLD.  Croaks if MPI_Initialized reports that MPI has
     * not been initialized; otherwise prints a diagnostic line to stdout.
     * Every MPI return code is handed to MPIpm_errhandler. */
    MPIpm_errhandler("MPI_Initialized", MPI_Initialized(&initialized));
    if (initialized) {
        fprintf(stdout, "intalized ok from inside MPI.xs -- send_test\n");
    } else {
        croak("send_test: MPI not initialized !\n");
    }
    rc = MPI_Send(&d, 1, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
    MPIpm_errhandler("send_test", rc);
double
receive_test(int source)
  PREINIT:
    double value;
    int initialized;
    int rc;
  CODE:
    /* Test helper: receives a single double from rank `source` with tag 0
     * on MPI_COMM_WORLD and returns it.  The receive status is written to
     * global_status (declared outside this block).  Croaks if
     * MPI_Initialized reports that MPI has not been initialized; otherwise
     * prints a diagnostic line to stdout. */
    MPIpm_errhandler("MPI_Initialized", MPI_Initialized(&initialized));
    if (initialized) {
        fprintf(stdout, "intalized ok from inside MPI.xs -- receive_test\n");
    } else {
        croak("receive_test: MPI not initialized !\n");
    }
    rc = MPI_Recv(&value, 1, MPI_DOUBLE, source, 0, MPI_COMM_WORLD,
                  &global_status);
    MPIpm_errhandler("receive_test", rc);
    RETVAL = value;
  OUTPUT:
    RETVAL
void
xs_send(SV *sv,int dest,int tag=0,MPI_Comm comm=MPI_COMM_WORLD)
  PREINIT:
    pdl *p;
    int rc;
  CODE:
    /* Sends the data buffer of the piddle whose address is stored in the
     * IV of `sv`: p->nvals elements of the MPI type that pdl_mpi_typemap
     * returns for p->datatype, to rank `dest` with `tag` on `comm`. */
    p = (pdl *) SvIV(sv);
    PDL_CHKMAGIC(p);
    rc = MPI_Send(p->data, p->nvals, pdl_mpi_typemap(p->datatype),
                  dest, tag, comm);
    MPIpm_errhandler("&PDL::Parallel::MPI::send", rc);
void
xs_receive(SV *sv,int source,int tag=0,MPI_Comm comm=MPI_COMM_WORLD)
  PREINIT:
    pdl *p;
    int rc;
  CODE:
    /* Receives directly into the data buffer of the piddle whose address
     * is stored in the IV of `sv`: p->nvals elements of the MPI type that
     * pdl_mpi_typemap returns for p->datatype, from rank `source` with
     * `tag` on `comm`.  The receive status lands in global_status
     * (declared outside this block). */
    p = (pdl *) SvIV(sv);
    PDL_CHKMAGIC(p);
    rc = MPI_Recv(p->data, p->nvals, pdl_mpi_typemap(p->datatype),
                  source, tag, comm, &global_status);
    MPIpm_errhandler("&PDL::Parallel::MPI::receive", rc);
void
get_status_list()
  PPCODE:
    /* Returns four integers taken from global_status (declared outside
     * this block), in this order:
     *   (count, MPI_SOURCE, MPI_TAG, MPI_ERROR)
     * Stack space for all four is reserved once up front. */
    EXTEND(SP, 4);
    PUSHs(sv_2mortal(newSViv(global_status.count)));
    PUSHs(sv_2mortal(newSViv(global_status.MPI_SOURCE)));
    PUSHs(sv_2mortal(newSViv(global_status.MPI_TAG)));
    PUSHs(sv_2mortal(newSViv(global_status.MPI_ERROR)));
void
xs_broadcast(SV * sv,int root=0, MPI_Comm comm = MPI_COMM_WORLD)
  PREINIT:
    pdl *p;
    int rc;
  CODE:
    /* Calls MPI_Bcast on the data buffer of the piddle whose address is
     * stored in the IV of `sv`: p->nvals elements of the MPI type that
     * pdl_mpi_typemap returns for p->datatype, with `root` as the
     * broadcasting rank on `comm`. */
    p = (pdl *) SvIV(sv);
    PDL_CHKMAGIC(p);
    rc = MPI_Bcast(p->data, p->nvals, pdl_mpi_typemap(p->datatype),
                   root, comm);
    MPIpm_errhandler("&PDL::Parallel::MPI::broadcast", rc);
void
xs_scatter(SV * source_sv, SV * dest_sv, int root=0, MPI_Comm comm = MPI_COMM_WORLD)
  PREINIT:
    int rank;
    pdl * source_piddle;
    pdl * dest_piddle;
    int err_code;
    /* Only the root rank has a source piddle.  Start from NULL so that
     * the other ranks pass a well-defined pointer to MPI_Scatter instead
     * of reading an uninitialized variable. */
    void * source_data = NULL;
    MPI_Datatype sendtype;
  CODE:
    /* Scatters the root's source piddle across `comm`: every rank
     * receives dest_piddle->nvals elements into its own dest piddle.
     * `source_sv` is only dereferenced on the rank equal to `root`;
     * `dest_sv` must hold a piddle address on every rank. */
    MPIpm_errhandler("MPI_Comm_rank", MPI_Comm_rank(comm,&rank));
    dest_piddle = (pdl *) SvIV(dest_sv);
    PDL_CHKMAGIC(dest_piddle);
    if (rank == root) {
        source_piddle=(pdl *) SvIV(source_sv);
        PDL_CHKMAGIC(source_piddle);
        source_data = source_piddle->data;
        sendtype=pdl_mpi_typemap(source_piddle->datatype);
    } else {
        /* The MPI documentation says sendtype is significant only at
         * root, but the original author found that this did not hold
         * in practice.  As a workaround the destination piddle's type
         * is passed as sendtype on non-root ranks so the argument is
         * always a valid datatype.
         */
        sendtype= pdl_mpi_typemap(dest_piddle->datatype);
    }
    /* The per-rank send count and the receive count are both the size
     * of the destination piddle. */
    err_code = MPI_Scatter(
        source_data, dest_piddle->nvals, sendtype,
        dest_piddle->data, dest_piddle->nvals, pdl_mpi_typemap(dest_piddle->datatype),
        root, comm);
    MPIpm_errhandler("scatter-MPI_Scatter", err_code);
void
xs_gather(SV * source_sv, SV * dest_sv, int root=0, MPI_Comm comm = MPI_COMM_WORLD)
PREINIT:
int rank;
AV *stuff = (AV*) SvRV(ref);
if(count > (av_len(stuff)+1)) {
printf("MPI_Send: count param is larger than given array. Using "
"array length.\n");
count = av_len(stuff)+1;
}
#ifdef SEND_DEBUG
printf("[%d] av_len=%d\n",getpid(),av_len(stuff));
for (i=0 ; i <= av_len(stuff) ; i++) {
SV **sv_tmp = av_fetch(stuff,i,0);
if (sv_tmp == NULL)
printf("[%d] $stuff[%d]=undef\n",getpid(),i);
else {
printf("[%d] $stuff[%d]=%s\n",getpid(),i,SvPV(*sv_tmp, PL_na));
}
}
#endif /* SEND_DEBUG */
len = MPIpm_packarray(&buf, stuff, datatype, count);
#ifdef SEND_DEBUG
printf("[%d] len=%d\n[%d] ", getpid(), len, getpid());
for(i=0;i<len;i++) {
printf("%02x ", (unsigned char)((char*)buf)[i]);
if((i!=0) && (i%16) == 0) printf("\n[%d] ", getpid());
}
printf("\n");
#endif /* SEND_DEBUG */
if(datatype == MPI_STRING)
MPIpm_errhandler("MPI_Send",
MPI_Send(&len, 1, MPI_INT, dest, tag, comm));
MPIpm_errhandler("MPI_Send",
MPI_Send(buf, len, MPI_CHAR, dest, tag, comm));
} else {
count = 1;
if (datatype == MPI_CHAR)
count = (SvCUR(SvRV(ref)) + 1) * sizeof(char);
buf = (void*) malloc(MPIpm_bufsize(datatype,SvRV(ref),count));
MPIpm_packscalar(buf,SvRV(ref),datatype);
ret = MPI_Send(buf,count,datatype,dest,tag,comm);
free(buf);
MPIpm_errhandler("MPI_Send",ret);
}
void
MPI_Recv(ref, count, datatype, source, tag, comm)
SV * ref
int count
MPI_Datatype datatype
int source
int tag
MPI_Comm comm
PREINIT:
void* buf;
int ret;
MPI_Status status;
#ifdef SEND_DEBUG
int i;
#endif
PPCODE:
if (! SvROK(ref))
croak("MPI_Recv: First argument is not a reference!");
if (SvTYPE(SvRV(ref)) == SVt_PVHV) {
croak("MPI_Recv: Hashes are not supported yet.");
} else if (SvTYPE(SvRV(ref)) == SVt_PVAV) {
int len;
AV *stuff = (AV*) SvRV(ref);
switch(datatype) {
case MPI_STRING:
MPI_Recv(&len, 1, MPI_INT, source, tag, comm, &status);
break;
case MPI_INT:
len = count * sizeof(int);
break;
#ifndef FLOAT_HACK
case MPI_FLOAT:
len = count * sizeof(float);
break;
#endif
case MPI_DOUBLE:
len = count * sizeof(double);
break;
}
#ifdef SEND_DEBUG
printf("[%d] len=%d\n", getpid(), len);
#endif
buf = (char *) malloc(len);
ret = MPI_Recv(buf, len, MPI_CHAR, source, tag, comm, &status);
#ifdef SEND_DEBUG
printf("[%d] len=%d\n[%d] ", getpid(), len, getpid());
for(i=0;i<len;i++) {
printf("%02x ", (unsigned char)((char*)buf)[i]);
if((i!=0) && (i%16) == 0) printf("\n[%d] ", getpid());
}
printf("\n");
#endif /* SEND_DEBUG */
MPIpm_unpackarray(buf, &stuff, datatype, count);
MPIpm_errhandler("MPI_Recv",ret);
/* return the status as a 4 element array:
* (count,MPI_SOURCE,MPI_TAG,MPI_ERROR) */
XPUSHs(sv_2mortal(newSViv(status.count)));
XPUSHs(sv_2mortal(newSViv(status.MPI_SOURCE)));
XPUSHs(sv_2mortal(newSViv(status.MPI_TAG)));
XPUSHs(sv_2mortal(newSViv(status.MPI_ERROR)));
} else {
buf = (void*) malloc(MPIpm_bufsize(datatype,NULL,count));
ret = MPI_Recv(buf,count,datatype,source,tag,comm,&status);
MPIpm_unpackscalar(buf,SvRV(ref),datatype);
free(buf);
MPIpm_errhandler("MPI_Recv",ret);
/* return the status as a 4 element array:
* (count,MPI_SOURCE,MPI_TAG,MPI_ERROR) */
XPUSHs(sv_2mortal(newSViv(status.count)));
XPUSHs(sv_2mortal(newSViv(status.MPI_SOURCE)));
XPUSHs(sv_2mortal(newSViv(status.MPI_TAG)));
XPUSHs(sv_2mortal(newSViv(status.MPI_ERROR)));
MPIpm_unpackscalar(recvbuf,SvRV(recvref),recvtype);
#ifdef WHY_DOES_THIS_MAKE_IT_SEGFAULT
free(sendbuf);
free(recvbuf);
#endif
MPIpm_errhandler("MPI_Scatter",ret);
}
int
MPI_Gather(sendref, sendcnt, sendtype, recvref, recvcnt, recvtype, root, comm)
    SV * sendref
    int sendcnt
    MPI_Datatype sendtype
    SV * recvref
    int recvcnt
    MPI_Datatype recvtype
    int root
    MPI_Comm comm
  PREINIT:
    void* sendbuf, *recvbuf;
    int ret;
  CODE:
    /* Scalar-only wrapper around MPI_Gather.  Both `sendref` and
     * `recvref` must be references; array references are rejected.
     * The referenced send scalar is packed into a temporary buffer,
     * gathered, and the receive buffer is unpacked into the scalar
     * behind `recvref`.
     *
     * NOTE(review): the XSUB is declared to return int, but RETVAL is
     * never assigned and there is no OUTPUT: section, so the value seen
     * by the Perl caller is not defined by this code.  Left as is to
     * avoid changing what existing callers observe -- confirm intent. */
    if (! SvROK(sendref) || ! SvROK(recvref))
        croak("MPI_Gather: First and Fourth arguments must be references!");
    if (SvTYPE(SvRV(sendref)) == SVt_PVAV ||
        SvTYPE(SvRV(recvref)) == SVt_PVAV)
    {
        croak("MPI_Gather: Arrays are not implemented yet.");
    } else {
        sendbuf = (void*)malloc(MPIpm_bufsize(sendtype,SvRV(sendref),sendcnt));
        recvbuf = (void*)malloc(MPIpm_bufsize(recvtype,SvRV(recvref),recvcnt));
        /* Do not hand a failed allocation to the pack routine or to MPI. */
        if (sendbuf == NULL || recvbuf == NULL) {
            free(sendbuf);
            free(recvbuf);
            croak("MPI_Gather: out of memory");
        }
        MPIpm_packscalar(sendbuf,SvRV(sendref),sendtype);
        ret = MPI_Gather(sendbuf,sendcnt,sendtype,recvbuf,recvcnt,recvtype,root,comm);
        MPIpm_unpackscalar(recvbuf,SvRV(recvref),recvtype);
        /* Release the buffers before reporting, so they are not left
         * allocated if the error handler does not return. */
        free(sendbuf);
        free(recvbuf);
        MPIpm_errhandler("MPI_Gather",ret);
    }
int
MPI_Sendrecv(sendref, sendcount, sendtype, dest, sendtag, recvref, recvcount, recvtype, source, recvtag, comm)
    SV * sendref
    int sendcount
    MPI_Datatype sendtype
    int dest
    int sendtag
    SV * recvref
    int recvcount
    MPI_Datatype recvtype
    int source
    int recvtag
    MPI_Comm comm
  PREINIT:
    void* sendbuf, *recvbuf;
    int ret;
    MPI_Status status;
  PPCODE:
    /* Wrapper around MPI_Sendrecv.  Both `sendref` and `recvref` must be
     * references.  When BOTH refer to arrays the array pack/unpack
     * helpers are used; in every other case the referents are handled
     * with the scalar helpers.  Returns the receive status to Perl as a
     * four-element list (see the end of the body). */
    if (! SvROK(sendref) || ! SvROK(recvref))
        croak("MPI_Sendrecv: First and Fourth arguments must be references!");
    if (SvTYPE(SvRV(sendref)) == SVt_PVAV &&
        SvTYPE(SvRV(recvref)) == SVt_PVAV)
    {
        AV* array = (AV*) SvRV(recvref);
        recvbuf = malloc(MPIpm_bufsize(recvtype, SvRV(recvref), recvcount));
        if (recvbuf == NULL)
            croak("MPI_Sendrecv: out of memory");
        /* The packed length returned by MPIpm_packarray is not needed
         * here: the exchange below is expressed in sendcount/sendtype.
         * NOTE(review): who owns the buffer stored in sendbuf is not
         * visible from this block, so it is deliberately not freed
         * here -- confirm against MPIpm_packarray. */
        (void) MPIpm_packarray(&sendbuf, (AV*)SvRV(sendref), sendtype, sendcount);
        ret = MPI_Sendrecv(sendbuf, sendcount, sendtype, dest, sendtag,
                           recvbuf, recvcount, recvtype, source, recvtag,
                           comm, &status);
        MPIpm_unpackarray(recvbuf,&array,recvtype,recvcount);
        /* recvbuf was allocated above and has been unpacked; release it
         * the same way the scalar branch does. */
        free(recvbuf);
        MPIpm_errhandler("MPI_Sendrecv",ret);
    } else {
        sendbuf = (void*)malloc(MPIpm_bufsize(sendtype,SvRV(sendref),sendcount));
        recvbuf = (void*)malloc(MPIpm_bufsize(recvtype,SvRV(recvref),recvcount));
        /* Do not hand a failed allocation to the pack routines or to MPI. */
        if (sendbuf == NULL || recvbuf == NULL) {
            free(sendbuf);
            free(recvbuf);
            croak("MPI_Sendrecv: out of memory");
        }
        MPIpm_packscalar(sendbuf,SvRV(sendref),sendtype);
        /* The receive buffer is pre-filled from the current value of the
         * receive scalar before the exchange, as in the original code. */
        MPIpm_packscalar(recvbuf,SvRV(recvref),recvtype);
        ret = MPI_Sendrecv(sendbuf, sendcount, sendtype, dest,
                           sendtag, recvbuf, recvcount, recvtype,
                           source, recvtag, comm, &status);
        MPIpm_unpackscalar(recvbuf,SvRV(recvref),recvtype);
        free(sendbuf);
        free(recvbuf);
        MPIpm_errhandler("MPI_Sendrecv",ret);
    }
    /* return the status as a 4 element array:
     * (count,MPI_SOURCE,MPI_TAG,MPI_ERROR) */
    XPUSHs(sv_2mortal(newSViv(status.count)));
    XPUSHs(sv_2mortal(newSViv(status.MPI_SOURCE)));
    XPUSHs(sv_2mortal(newSViv(status.MPI_TAG)));
    XPUSHs(sv_2mortal(newSViv(status.MPI_ERROR)));
( run in 0.561 second using v1.01-cache-2.11-cpan-5511b514fd6 )