AMPI: add support for freeing non-predefined MPI_Op's
[charm.git] / src / libs / ck-libs / ampi / ampiimpl.h
bloba9acea39fcd35f59eb453ca61e94085a96f6b32d
1 #ifndef _AMPIIMPL_H
2 #define _AMPIIMPL_H
4 #include <string.h> /* for strlen */
6 #include "ampi.h"
7 #include "ddt.h"
8 #include "charm++.h"
10 using std::vector;
12 //Uncomment for debug print statements
13 #define AMPI_DEBUG(...) //CkPrintf(__VA_ARGS__)
15 #if AMPIMSGLOG
16 #include "ckliststring.h"
17 static CkListString msgLogRanks;
18 static int msgLogWrite;
19 static int msgLogRead;
20 static char *msgLogFilename;
22 #if CMK_PROJECTIONS_USE_ZLIB && 0
23 #include <zlib.h>
24 namespace PUP{
25 class zdisk : public er {
26 protected:
27 gzFile F;//Disk file to read from/write to
28 zdisk(unsigned int type,gzFile f):er(type),F(f) {}
29 zdisk(const zdisk &p); //You don't want to copy
30 void operator=(const zdisk &p); // You don't want to copy
32 //For seeking (pack/unpack in different orders)
33 virtual void impl_startSeek(seekBlock &s); /*Begin a seeking block*/
34 virtual int impl_tell(seekBlock &s); /*Give the current offset*/
35 virtual void impl_seek(seekBlock &s,int off); /*Seek to the given offset*/
38 //For packing to a disk file
39 class tozDisk : public zdisk {
40 protected:
41 //Generic bottleneck: pack n items of size itemSize from p.
42 virtual void bytes(void *p,int n,size_t itemSize,dataType t);
43 public:
44 //Write data to the given file pointer
45 // (must be opened for binary write)
46 // You must close the file yourself when done.
47 tozDisk(gzFile f):zdisk(IS_PACKING,f) {}
50 //For unpacking from a disk file
51 class fromzDisk : public zdisk {
52 protected:
53 //Generic bottleneck: unpack n items of size itemSize from p.
54 virtual void bytes(void *p,int n,size_t itemSize,dataType t);
55 public:
56 //Write data to the given file pointer
57 // (must be opened for binary read)
58 // You must close the file yourself when done.
59 fromzDisk(gzFile f):zdisk(IS_UNPACKING,f) {}
61 }; // namespace PUP
62 #endif
63 #endif // AMPIMSGLOG
65 /* AMPI sends messages inline to PE-local destination VPs if: BigSim is not being used and
66 * if tracing is not being used (see bug #1640 for more details on the latter). */
67 #ifndef AMPI_LOCAL_IMPL
68 #define AMPI_LOCAL_IMPL ( !CMK_BIGSIM_CHARM && !CMK_TRACE_ENABLED )
69 #endif
71 /* AMPI uses RDMA sends if BigSim is not being used and the underlying comm
72 * layer supports it (except for GNI, which has experimental RDMA support). */
73 #ifndef AMPI_RDMA_IMPL
74 #define AMPI_RDMA_IMPL ( !CMK_BIGSIM_CHARM && CMK_ONESIDED_IMPL && !CMK_CONVERSE_UGNI )
75 #endif
77 /* contiguous messages larger than or equal to this threshold are sent via RDMA */
78 #ifndef AMPI_RDMA_THRESHOLD_DEFAULT
79 #if CMK_USE_IBVERBS || CMK_CONVERSE_UGNI
80 #define AMPI_RDMA_THRESHOLD_DEFAULT 65536
81 #else
82 #define AMPI_RDMA_THRESHOLD_DEFAULT 32768
83 #endif
84 #endif
86 /* contiguous messages larger than or equal to this threshold that are being sent
87 * within a process are sent via RDMA. */
88 #ifndef AMPI_SMP_RDMA_THRESHOLD_DEFAULT
89 #define AMPI_SMP_RDMA_THRESHOLD_DEFAULT 16384
90 #endif
92 extern int AMPI_RDMA_THRESHOLD;
93 extern int AMPI_SMP_RDMA_THRESHOLD;
95 #if CMK_CONVERSE_LAPI || CMK_BIGSIM_CHARM
96 #define AMPI_ALLTOALL_LONG_MSG 4194304
97 #else
98 #define AMPI_ALLTOALL_LONG_MSG 32768
99 #endif
101 typedef void (*MPI_MigrateFn)(void);
104 * AMPI Message Matching (Amm) Interface:
105 * messages are matched on 2 ints: [tag, src]
107 #define AMM_TAG 0
108 #define AMM_SRC 1
109 #define AMM_NTAGS 2
111 typedef struct AmmTableStruct* AmmTable;
112 typedef struct AmmEntryStruct* AmmEntry;
113 typedef void (*AmmPupMessageFn)(pup_er p, void **msg);
115 AmmTable AmmNew();
116 void AmmFree(AmmTable t);
117 void AmmFreeAll(AmmTable t);
118 void AmmPut(AmmTable t, const int tags[AMM_NTAGS], void* msg);
119 static bool AmmMatch(const int tags1[AMM_NTAGS], const int tags2[AMM_NTAGS]);
120 void* AmmGet(AmmTable t, const int tags[AMM_NTAGS], int* rtags);
121 void* AmmProbe(AmmTable t, const int tags[AMM_NTAGS], int* rtags);
122 int AmmEntries(AmmTable t);
123 AmmTable AmmPup(pup_er p, AmmTable t, AmmPupMessageFn msgpup);
125 PUPfunctionpointer(MPI_User_function*)
128 * OpStruct's are used to lookup an MPI_User_function* and check its commutativity.
129 * They are also used to create AmpiOpHeader's, which are transmitted in reductions
130 * that are user-defined or else lack an equivalent Charm++ reducer type.
132 class OpStruct {
133 public:
134 MPI_User_function* func;
135 bool isCommutative;
136 private:
137 bool isValid;
139 public:
140 OpStruct(void) {}
141 OpStruct(MPI_User_function* f) : func(f), isCommutative(true), isValid(true) {}
142 OpStruct(MPI_User_function* f, bool c) : func(f), isCommutative(c), isValid(true) {}
143 void init(MPI_User_function* f, bool c) {
144 func = f;
145 isCommutative = c;
146 isValid = true;
148 bool isFree(void) const { return !isValid; }
149 void free(void) { isValid = false; }
150 void pup(PUP::er &p) {
151 p|func; p|isCommutative; p|isValid;
155 class AmpiOpHeader {
156 public:
157 MPI_User_function* func;
158 MPI_Datatype dtype;
159 int len;
160 int szdata;
161 AmpiOpHeader(MPI_User_function* f,MPI_Datatype d,int l,int szd):
162 func(f),dtype(d),len(l),szdata(szd) { }
165 //------------------- added by YAN for one-sided communication -----------
166 /* the index is unique within a communicator */
167 class WinStruct{
168 public:
169 MPI_Comm comm;
170 int index;
172 private:
173 bool areRecvsPosted;
174 bool inEpoch;
175 vector<int> exposureRankList;
176 vector<int> accessRankList;
177 vector<MPI_Request> requestList;
179 public:
180 WinStruct(void) : comm(MPI_COMM_NULL), index(-1), areRecvsPosted(false), inEpoch(false) {
181 exposureRankList.clear(); accessRankList.clear(); requestList.clear();
183 WinStruct(MPI_Comm comm_, int index_) : comm(comm_), index(index_), areRecvsPosted(false), inEpoch(false) {
184 exposureRankList.clear(); accessRankList.clear(); requestList.clear();
186 void pup(PUP::er &p) {
187 p|comm; p|index; p|areRecvsPosted; p|inEpoch; p|exposureRankList; p|accessRankList; p|requestList;
189 void clearEpochAccess() {
190 accessRankList.clear(); inEpoch = false;
192 void clearEpochExposure() {
193 exposureRankList.clear(); areRecvsPosted = false; requestList.clear(); inEpoch=false;
195 vector<int>& getExposureRankList() {return exposureRankList;}
196 vector<int>& getAccessRankList() {return accessRankList;}
197 void setExposureRankList(vector<int> &tmpExposureRankList) {exposureRankList = tmpExposureRankList;}
198 void setAccessRankList(vector<int> &tmpAccessRankList) {accessRankList = tmpAccessRankList;}
199 vector<int>& getRequestList() {return requestList;}
200 bool AreRecvsPosted() const {return areRecvsPosted;}
201 void setAreRecvsPosted(bool setR) {areRecvsPosted = setR;}
202 bool isInEpoch() const {return inEpoch;}
203 void setInEpoch(bool arg) {inEpoch = arg;}
206 class lockQueueEntry {
207 public:
208 int requestRank;
209 int lock_type;
210 lockQueueEntry (int _requestRank, int _lock_type)
211 : requestRank(_requestRank), lock_type(_lock_type) {}
212 lockQueueEntry () {}
215 typedef CkQ<lockQueueEntry *> LockQueue;
217 class ampiParent;
219 class win_obj {
220 public:
221 char winName[MPI_MAX_OBJECT_NAME];
222 int winNameLen;
223 bool initflag;
225 void *baseAddr;
226 MPI_Aint winSize;
227 int disp_unit;
228 MPI_Comm comm;
230 int owner; // Rank of owner of the lock, -1 if not locked
231 LockQueue lockQueue; // queue of waiting processors for the lock
232 // top of queue is the one holding the lock
233 // queue is empty if lock is not applied
235 void setName(const char *src);
236 void getName(char *src,int *len);
238 public:
239 void pup(PUP::er &p);
241 win_obj();
242 win_obj(const char *name, void *base, MPI_Aint size, int disp_unit, MPI_Comm comm);
243 ~win_obj();
245 int create(const char *name, void *base, MPI_Aint size, int disp_unit,
246 MPI_Comm comm);
247 int free();
249 int put(void *orgaddr, int orgcnt, int orgunit,
250 MPI_Aint targdisp, int targcnt, int targunit);
252 int get(void *orgaddr, int orgcnt, int orgunit,
253 MPI_Aint targdisp, int targcnt, int targunit);
254 int accumulate(void *orgaddr, int count, MPI_Aint targdisp, MPI_Datatype targtype,
255 MPI_Op op, ampiParent* pptr);
257 int iget(int orgcnt, MPI_Datatype orgtype,
258 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype);
259 int igetWait(MPI_Request *req, MPI_Status *status);
260 int igetFree(MPI_Request *req, MPI_Status *status);
262 int fence();
264 int lock(int requestRank, int lock_type);
265 int unlock(int requestRank);
267 int wait();
268 int post();
269 int start();
270 int complete();
272 void lockTopQueue();
273 void enqueue(int requestRank, int lock_type);
274 void dequeue();
275 bool emptyQueue();
277 //-----------------------End of code by YAN ----------------------
279 class KeyvalPair{
280 protected:
281 int klen, vlen;
282 const char* key;
283 const char* val;
284 public:
285 KeyvalPair(void){ }
286 KeyvalPair(const char* k, const char* v);
287 ~KeyvalPair(void);
288 void pup(PUP::er& p){
289 p|klen;
290 p|vlen;
291 if(p.isUnpacking()){
292 if(klen>0)
293 key = new char[klen+1];
294 if(vlen>0)
295 val = new char[vlen+1];
297 if(klen>0)
298 p((char*)key, klen+1);
299 if(vlen>0)
300 p((char*)val, vlen+1);
302 friend class InfoStruct;
305 class InfoStruct{
306 CkPupPtrVec<KeyvalPair> nodes;
307 bool valid;
308 public:
309 InfoStruct(void):valid(true) { }
310 void setvalid(bool valid_){ valid = valid_; }
311 bool getvalid(void) const { return valid; }
312 int set(const char* k, const char* v);
313 int dup(InfoStruct& src);
314 int get(const char* k, int vl, char*& v, int *flag) const;
315 int deletek(const char* k);
316 int get_valuelen(const char* k, int* vl, int *flag) const;
317 int get_nkeys(int *nkeys) const;
318 int get_nthkey(int n,char* k) const;
319 void myfree(void);
320 void pup(PUP::er& p);
323 class CProxy_ampi;
324 class CProxyElement_ampi;
326 //Describes an AMPI communicator
327 class ampiCommStruct {
328 MPI_Comm comm; //Communicator
329 CkArrayID ampiID; //ID of corresponding ampi array
330 int size; //Number of processes in communicator
331 bool isWorld; //true if ranks are 0..size-1?
332 bool isInter; // false: intra-communicator; true: inter-communicator
333 vector<int> indices; //indices[r] gives the array index for rank r
334 vector<int> remoteIndices; // remote group for inter-communicator
336 // cartesian virtual topology parameters
337 int ndims;
338 vector<int> dims;
339 vector<int> periods;
341 // graph virtual topology parameters
342 int nvertices;
343 vector<int> index;
344 vector<int> edges;
346 // For virtual topology neighbors
347 vector<int> nbors;
349 // For communicator attributes (MPI_*_get_attr): indexed by keyval
350 vector<void *> keyvals;
352 // For communicator names
353 char commName[MPI_MAX_OBJECT_NAME];
354 int commNameLen;
356 // Lazily fill world communicator indices
357 void makeWorldIndices(void) const {
358 vector<int> *ind=const_cast<vector<int> *>(&indices);
359 for (int i=0;i<size;i++) ind->push_back(i);
361 public:
362 ampiCommStruct(int ignored=0) {size=-1;isWorld=false;isInter=false;commNameLen=0;}
363 ampiCommStruct(MPI_Comm comm_,const CkArrayID &id_,int size_)
364 :comm(comm_), ampiID(id_),size(size_), isWorld(true), isInter(false), commNameLen(0) {}
365 ampiCommStruct(MPI_Comm comm_,const CkArrayID &id_,
366 int size_,const vector<int> &indices_)
367 :comm(comm_), ampiID(id_),size(size_),isWorld(false),
368 isInter(false), indices(indices_), commNameLen(0) {}
369 ampiCommStruct(MPI_Comm comm_,const CkArrayID &id_,
370 int size_,const vector<int> &indices_,
371 const vector<int> &remoteIndices_)
372 :comm(comm_),ampiID(id_),size(size_),isWorld(false),isInter(true),
373 indices(indices_),remoteIndices(remoteIndices_),commNameLen(0) {}
374 void setArrayID(const CkArrayID &nID) {ampiID=nID;}
376 MPI_Comm getComm(void) const {return comm;}
377 inline const vector<int> &getIndices(void) const {
378 if (isWorld && indices.size()!=size) makeWorldIndices();
379 return indices;
381 const vector<int> &getRemoteIndices(void) const {return remoteIndices;}
382 vector<void *> &getKeyvals(void) {return keyvals;}
384 void setName(const char *src) {
385 CkDDT_SetName(commName, src, &commNameLen);
388 void getName(char *name, int *len) const {
389 *len = commNameLen;
390 memcpy(name, commName, *len+1);
393 //Get the proxy for the entire array
394 CProxy_ampi getProxy(void) const;
396 //Get the array index for rank r in this communicator
397 int getIndexForRank(int r) const {
398 #if CMK_ERROR_CHECKING
399 if (r>=size) CkAbort("AMPI> You passed in an out-of-bounds process rank!");
400 #endif
401 if (isWorld) return r;
402 else return indices[r];
404 int getIndexForRemoteRank(int r) const {
405 #if CMK_ERROR_CHECKING
406 if (r>=remoteIndices.size()) CkAbort("AMPI> You passed in an out-of-bounds process rank!");
407 #endif
408 if (isWorld) return r;
409 else return remoteIndices[r];
411 //Get the rank for this array index (Warning: linear time)
412 int getRankForIndex(int i) const {
413 if (isWorld) return i;
414 else {
415 for (int r=0;r<indices.size();r++)
416 if (indices[r]==i) return r;
417 return -1; /*That index isn't in this communicator*/
421 int getSize(void) const {return size;}
423 inline bool isinter(void) const { return isInter; }
424 inline const vector<int> &getdims() const {return dims;}
425 inline const vector<int> &getperiods() const {return periods;}
426 inline int getndims() const {return ndims;}
428 inline void setdims(const vector<int> &dims_) { dims = dims_; }
429 inline void setperiods(const vector<int> &periods_) { periods = periods_; }
430 inline void setndims(int ndims_) {ndims = ndims_; }
432 /* Similar hack for graph vt */
433 inline int getnvertices() const {return nvertices;}
434 inline const vector<int> &getindex() const {return index;}
435 inline const vector<int> &getedges() const {return edges;}
437 inline void setnvertices(int nvertices_) {nvertices = nvertices_; }
438 inline void setindex(const vector<int> &index_) { index = index_; }
439 inline void setedges(const vector<int> &edges_) { edges = edges_; }
441 inline const vector<int> &getnbors() const {return nbors;}
442 inline void setnbors(const vector<int> &nbors_) { nbors = nbors_; }
444 void pup(PUP::er &p) {
445 p|comm;
446 p|ampiID;
447 p|size;
448 p|isWorld;
449 p|isInter;
450 p|indices;
451 p|remoteIndices;
452 p|ndims;
453 p|dims;
454 p|periods;
455 p|nvertices;
456 p|index;
457 p|edges;
458 p|nbors;
459 p|commNameLen;
460 p(commName,MPI_MAX_OBJECT_NAME);
463 PUPmarshall(ampiCommStruct)
465 class mpi_comm_worlds{
466 ampiCommStruct comms[MPI_MAX_COMM_WORLDS];
467 public:
468 ampiCommStruct &operator[](int i) {return comms[i];}
469 void pup(PUP::er &p) {
470 for (int i=0;i<MPI_MAX_COMM_WORLDS;i++)
471 comms[i].pup(p);
475 typedef vector<int> groupStruct;
476 // groupStructure operations
477 inline void outputOp(groupStruct vec){
478 if(vec.size()>50){
479 CkPrintf("vector too large to output!\n");
480 return;
482 CkPrintf("output vector: size=%d {",vec.size());
483 for(int i=0;i<vec.size();i++)
484 CkPrintf(" %d ",vec[i]);
485 CkPrintf("}\n");
488 inline int getPosOp(int idx, groupStruct vec){
489 for (int r=0;r<vec.size();r++)
490 if (vec[r]==idx) return r;
491 return MPI_UNDEFINED;
494 inline groupStruct unionOp(groupStruct vec1, groupStruct vec2){
495 groupStruct newvec(vec1);
496 for(int i=0;i<vec2.size();i++){
497 if(getPosOp(vec2[i],vec1)==MPI_UNDEFINED)
498 newvec.push_back(vec2[i]);
500 return newvec;
503 inline groupStruct intersectOp(groupStruct vec1, groupStruct vec2){
504 groupStruct newvec;
505 for(int i=0;i<vec1.size();i++){
506 if(getPosOp(vec1[i],vec2)!=MPI_UNDEFINED)
507 newvec.push_back(vec1[i]);
509 return newvec;
512 inline groupStruct diffOp(groupStruct vec1, groupStruct vec2){
513 groupStruct newvec;
514 for(int i=0;i<vec1.size();i++){
515 if(getPosOp(vec1[i],vec2)==MPI_UNDEFINED)
516 newvec.push_back(vec1[i]);
518 return newvec;
521 inline int* translateRanksOp(int n,groupStruct vec1,const int* ranks1,groupStruct vec2, int *ret){
522 for(int i=0;i<n;i++){
523 ret[i] = (ranks1[i] == MPI_PROC_NULL) ? MPI_PROC_NULL : getPosOp(vec1[ranks1[i]],vec2);
525 return ret;
528 inline int compareVecOp(groupStruct vec1,groupStruct vec2){
529 int i,pos,ret = MPI_IDENT;
530 if(vec1.size() != vec2.size()) return MPI_UNEQUAL;
531 for(i=0;i<vec1.size();i++){
532 pos = getPosOp(vec1[i],vec2);
533 if(pos == MPI_UNDEFINED) return MPI_UNEQUAL;
534 if(pos != i) ret = MPI_SIMILAR;
536 return ret;
539 inline groupStruct inclOp(int n,const int* ranks,groupStruct vec){
540 groupStruct retvec;
541 for(int i=0;i<n;i++){
542 retvec.push_back(vec[ranks[i]]);
544 return retvec;
547 inline groupStruct exclOp(int n,const int* ranks,groupStruct vec){
548 groupStruct retvec;
549 int add=1;
550 for(int j=0;j<vec.size();j++){
551 for(int i=0;i<n;i++)
552 if(j==ranks[i]){ add=0; break; }
553 if(add==1) retvec.push_back(vec[j]);
554 else add=1;
556 return retvec;
559 inline groupStruct rangeInclOp(int n, int ranges[][3], groupStruct vec, int *flag){
560 groupStruct retvec;
561 int first,last,stride;
562 for(int i=0;i<n;i++){
563 first = ranges[i][0];
564 last = ranges[i][1];
565 stride = ranges[i][2];
566 if(stride!=0){
567 for(int j=0;j<=(last-first)/stride;j++)
568 retvec.push_back(vec[first+stride*j]);
569 }else{
570 *flag = MPI_ERR_ARG;
571 return retvec;
574 *flag = MPI_SUCCESS;
575 return retvec;
578 inline groupStruct rangeExclOp(int n, int ranges[][3], groupStruct vec, int *flag){
579 groupStruct retvec;
580 vector<int> ranksvec;
581 int first,last,stride;
582 int *ranks,cnt;
583 int i,j;
584 for(i=0;i<n;i++){
585 first = ranges[i][0];
586 last = ranges[i][1];
587 stride = ranges[i][2];
588 if(stride!=0){
589 for(j=0;j<=(last-first)/stride;j++)
590 ranksvec.push_back(first+stride*j);
591 }else{
592 *flag = MPI_ERR_ARG;
593 return retvec;
596 cnt=ranksvec.size();
597 ranks=new int[cnt];
598 for(i=0;i<cnt;i++)
599 ranks[i]=ranksvec[i];
600 *flag = MPI_SUCCESS;
601 return exclOp(cnt,ranks,vec);
604 #include "tcharm.h"
605 #include "tcharmc.h"
607 #include "ampi.decl.h"
608 #include "charm-api.h"
609 #include <sys/stat.h> // for mkdir
611 extern int _mpi_nworlds;
613 //MPI_ANY_TAG is defined in ampi.h to MPI_TAG_UB_VALUE+1
614 #define MPI_ATA_SEQ_TAG MPI_TAG_UB_VALUE+2
615 #define MPI_BCAST_TAG MPI_TAG_UB_VALUE+3
616 #define MPI_REDN_TAG MPI_TAG_UB_VALUE+4
617 #define MPI_SCATTER_TAG MPI_TAG_UB_VALUE+5
618 #define MPI_SCAN_TAG MPI_TAG_UB_VALUE+6
619 #define MPI_EXSCAN_TAG MPI_TAG_UB_VALUE+7
620 #define MPI_ATA_TAG MPI_TAG_UB_VALUE+8
621 #define MPI_NBOR_TAG MPI_TAG_UB_VALUE+9
622 #define MPI_RMA_TAG MPI_TAG_UB_VALUE+10
623 #define MPI_EPOCH_START_TAG MPI_TAG_UB_VALUE+11
624 #define MPI_EPOCH_END_TAG MPI_TAG_UB_VALUE+12
626 #define AMPI_COLL_SOURCE 0
627 #define AMPI_COLL_COMM MPI_COMM_WORLD
629 #define MPI_I_REQ 1
630 #define MPI_IATA_REQ 2
631 #define MPI_SEND_REQ 3
632 #define MPI_SSEND_REQ 4
633 #define MPI_REDN_REQ 5
634 #define MPI_GATHER_REQ 6
635 #define MPI_GATHERV_REQ 7
636 #define MPI_GPU_REQ 8
// Lifecycle state of an AMPI request.
enum AmpiReqSts : char {
  AMPI_REQ_PENDING   = 0, // posted, not yet satisfied
  AMPI_REQ_BLOCKED   = 1, // a caller is blocked waiting on it
  AMPI_REQ_COMPLETED = 2  // finished
};

// Distinguishes non-blocking (Isend-style) sends from blocking ones.
enum AmpiSendType : bool {
  BLOCKING_SEND = false,
  I_SEND = true
};
649 #define MyAlign8(x) (((x)+7)&(~7))
652 Represents an MPI request that has been initiated
653 using Isend, Irecv, Ialltoall, Send_init, etc.
655 class AmpiRequest {
656 public:
657 void *buf;
658 int count;
659 MPI_Datatype type;
660 int tag; // the order must match MPI_Status
661 int src;
662 MPI_Comm comm;
663 bool statusIreq;
664 bool blocked; // this req is currently blocked on
666 #if CMK_BIGSIM_CHARM
667 public:
668 void *event; // the event point that corresponds to this message
669 int eventPe; // the PE that the event is located on
670 #endif
671 protected:
672 bool isvalid;
673 public:
674 AmpiRequest(){ statusIreq=false; blocked=false; }
675 /// Close this request (used by free and cancel)
676 virtual ~AmpiRequest(){ }
678 /// Activate this persistent request.
679 /// Only meaningful for persistent Ireq, SendReq, and SsendReq requests.
680 virtual void start(MPI_Request reqIdx){ }
682 /// Return true if this request is finished (progress):
683 virtual bool test(MPI_Status *sts=MPI_STATUS_IGNORE) =0;
685 /// Block until this request is finished,
686 /// returning a valid MPI error code.
687 virtual int wait(MPI_Status *sts) =0;
689 /// Mark this request for cancellation.
690 /// Supported only for IReq requests
691 virtual void cancel() =0;
693 /// Mark this request persistent.
694 /// Supported only for IReq, SendReq, and SsendReq requests
695 virtual void setPersistent(bool p){ }
697 /// Is this request persistent?
698 /// Supported only for IReq, SendReq, and SsendReq requests
699 virtual bool isPersistent(void) const { return false; }
701 /// Receive an AmpiMsg
702 virtual void receive(ampi *ptr, AmpiMsg *msg) = 0;
704 /// Receive a CkReductionMsg
705 virtual void receive(ampi *ptr, CkReductionMsg *msg) = 0;
707 /// Receive an Rdma message
708 virtual void receiveRdma(ampi *ptr, char *sbuf, int slength, int ssendReq,
709 int srcRank, MPI_Comm scomm) {
710 CkAbort("AMPI> RDMA receive attempted on an incompatible type of request!");
713 virtual void setBlocked(bool b) { blocked = b; }
714 virtual bool isBlocked(void) const { return blocked; }
716 /// Frees up the request: invalidate it
717 virtual void free(void){ isvalid=false; }
718 inline bool isValid(void) const { return isvalid; }
720 /// Returns the type of request:
721 /// MPI_I_REQ, MPI_IATA_REQ, MPI_SEND_REQ, MPI_SSEND_REQ,
722 /// MPI_REDN_REQ, MPI_GATHER_REQ, MPI_GATHERV_REQ, MPI_GPU_REQ
723 virtual int getType(void) const =0;
725 /// Return the actual number of bytes that were received.
726 virtual int getNumReceivedBytes(CkDDT *ddt) const {
727 // by default, return number of bytes requested
728 return count * ddt->getSize(type);
731 virtual void pup(PUP::er &p) {
732 p((char *)&buf,sizeof(void *)); //supposed to work only with isomalloc
733 p(count);
734 p(type);
735 p(src);
736 p(tag);
737 p(comm);
738 p(isvalid);
739 p(statusIreq);
740 p(blocked);
741 #if CMK_BIGSIM_CHARM
742 //needed for bigsim out-of-core emulation
743 //as the "log" is not moved from memory, this pointer is safe
744 //to be reused
745 p((char *)&event, sizeof(void *));
746 p(eventPe);
747 #endif
750 virtual void print();
753 class IReq : public AmpiRequest {
754 public:
755 int length; // recv'ed length in bytes
756 bool cancelled; // track if request is cancelled
757 bool persistent; // Is this a persistent recv request?
758 IReq(void *buf_, int count_, MPI_Datatype type_, int src_, int tag_, MPI_Comm comm_,
759 AmpiReqSts sts_=AMPI_REQ_PENDING)
761 buf=buf_; count=count_; type=type_; src=src_; tag=tag_;
762 comm=comm_; isvalid=true; length=0; cancelled=false; persistent=false;
763 if (sts_ == AMPI_REQ_BLOCKED) blocked=true;
764 else if (sts_ == AMPI_REQ_COMPLETED) statusIreq=true;
766 IReq(){}
767 ~IReq(){}
768 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
769 int wait(MPI_Status *sts);
770 void cancel() {
771 if (!statusIreq) {
772 cancelled = true;
775 inline int getType(void) const { return MPI_I_REQ; }
776 inline void setPersistent(bool p) { persistent = p; }
777 inline bool isPersistent(void) const { return persistent; }
778 void start(MPI_Request reqIdx);
779 void receive(ampi *ptr, AmpiMsg *msg);
780 void receive(ampi *ptr, CkReductionMsg *msg) {}
781 void receiveRdma(ampi *ptr, char *sbuf, int slength, int ssendReq, int srcRank, MPI_Comm scomm);
782 virtual int getNumReceivedBytes(CkDDT *ptr) const {
783 return length;
785 virtual void pup(PUP::er &p){
786 AmpiRequest::pup(p);
787 p|length;
788 p|cancelled;
789 p|persistent;
791 virtual void print();
794 class RednReq : public AmpiRequest {
795 public:
796 MPI_Op op;
797 RednReq(void *buf_, int count_, MPI_Datatype type_, MPI_Comm comm_, MPI_Op op_,
798 AmpiReqSts sts_=AMPI_REQ_PENDING)
800 buf=buf_; count=count_; type=type_; src=AMPI_COLL_SOURCE; tag=MPI_REDN_TAG;
801 comm=comm_; op=op_; isvalid=true;
802 if (sts_ == AMPI_REQ_BLOCKED) blocked=true;
803 else if (sts_ == AMPI_REQ_COMPLETED) statusIreq=true;
805 RednReq(){};
806 ~RednReq(){}
807 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
808 int wait(MPI_Status *sts);
809 void cancel() {}
810 inline int getType(void) const { return MPI_REDN_REQ; }
811 void receive(ampi *ptr, AmpiMsg *msg) {}
812 void receive(ampi *ptr, CkReductionMsg *msg);
813 virtual void pup(PUP::er &p){
814 AmpiRequest::pup(p);
815 p|op;
817 virtual void print();
820 class GatherReq : public AmpiRequest {
821 public:
822 GatherReq(void *buf_, int count_, MPI_Datatype type_, MPI_Comm comm_,
823 AmpiReqSts sts_=AMPI_REQ_PENDING)
825 buf=buf_; count=count_; type=type_; src=AMPI_COLL_SOURCE; tag=MPI_REDN_TAG;
826 comm=comm_; isvalid=true;
827 if (sts_ == AMPI_REQ_BLOCKED) blocked=true;
828 else if (sts_ == AMPI_REQ_COMPLETED) statusIreq=true;
830 GatherReq(){}
831 ~GatherReq(){}
832 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
833 int wait(MPI_Status *sts);
834 void cancel() {}
835 inline int getType(void) const { return MPI_GATHER_REQ; }
836 void receive(ampi *ptr, AmpiMsg *msg) {}
837 void receive(ampi *ptr, CkReductionMsg *msg);
838 virtual void pup(PUP::er &p){
839 AmpiRequest::pup(p);
841 virtual void print();
844 class GathervReq : public AmpiRequest {
845 public:
846 vector<int> recvCounts;
847 vector<int> displs;
848 GathervReq(void *buf_, int count_, MPI_Datatype type_, MPI_Comm comm_, const int *rc, const int *d,
849 AmpiReqSts sts_=AMPI_REQ_PENDING)
851 buf=buf_; count=count_; type=type_; src=AMPI_COLL_SOURCE; tag=MPI_REDN_TAG;
852 comm=comm_; isvalid=true;
853 recvCounts.resize(count);
854 for(int i=0; i<count; i++) recvCounts[i]=rc[i];
855 displs.resize(count);
856 for(int i=0; i<count; i++) displs[i]=d[i];
857 if (sts_ == AMPI_REQ_BLOCKED) blocked=true;
858 else if (sts_ == AMPI_REQ_COMPLETED) statusIreq=true;
860 GathervReq(){}
861 ~GathervReq(){}
862 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
863 int wait(MPI_Status *sts);
864 void cancel() {}
865 inline int getType(void) const { return MPI_GATHERV_REQ; }
866 void receive(ampi *ptr, AmpiMsg *msg) {}
867 void receive(ampi *ptr, CkReductionMsg *msg);
868 virtual void pup(PUP::er &p){
869 AmpiRequest::pup(p);
870 p|recvCounts; p|displs;
872 virtual void print();
875 class SendReq : public AmpiRequest {
876 bool persistent; // is this a persistent send request?
877 public:
878 SendReq(MPI_Comm comm_, AmpiReqSts sts_=AMPI_REQ_PENDING) {
879 comm = comm_; isvalid=true; persistent=false;
880 if (sts_ == AMPI_REQ_BLOCKED) blocked=true;
881 else if (sts_ == AMPI_REQ_COMPLETED) statusIreq=true;
883 SendReq(void* buf_, int count_, MPI_Datatype type_, int dest_, int tag_, MPI_Comm comm_) {
884 buf=buf_; count=count_; type=type_; src=dest_; tag=tag_;
885 comm=comm_; isvalid=true; blocked=false; statusIreq=false; persistent=false;
887 SendReq(){}
888 ~SendReq(){ }
889 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
890 int wait(MPI_Status *sts);
891 void cancel() {}
892 inline void setPersistent(bool p) { persistent = p; }
893 inline bool isPersistent(void) const { return persistent; }
894 void start(MPI_Request reqIdx);
895 void receive(ampi *ptr, AmpiMsg *msg) {}
896 void receive(ampi *ptr, CkReductionMsg *msg) {}
897 inline int getType(void) const { return MPI_SEND_REQ; }
898 virtual void pup(PUP::er &p){
899 AmpiRequest::pup(p);
900 p|persistent;
902 virtual void print();
905 class SsendReq : public AmpiRequest {
906 bool persistent; // is this a persistent Ssend request?
907 public:
908 SsendReq(MPI_Comm comm_) {
909 comm = comm_; isvalid=true; persistent=false;
911 SsendReq(void* buf_, int count_, MPI_Datatype type_, int dest_, int tag_, MPI_Comm comm_) {
912 buf=buf_; count=count_; type=type_; src=dest_; tag=tag_;
913 comm=comm_; isvalid=true; blocked=false; statusIreq=false; persistent=false;
915 SsendReq() {}
916 ~SsendReq(){ }
917 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
918 int wait(MPI_Status *sts);
919 void cancel() {}
920 inline void setPersistent(bool p) { persistent = p; }
921 inline bool isPersistent(void) const { return persistent; }
922 void start(MPI_Request reqIdx);
923 void receive(ampi *ptr, AmpiMsg *msg) {}
924 void receive(ampi *ptr, CkReductionMsg *msg) {}
925 inline int getType(void) const { return MPI_SSEND_REQ; }
926 virtual void pup(PUP::er &p){
927 AmpiRequest::pup(p);
928 p|persistent;
930 virtual void print();
933 class GPUReq : public AmpiRequest {
934 public:
935 GPUReq();
936 inline int getType(void) const { return MPI_GPU_REQ; }
937 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
938 int wait(MPI_Status *sts);
939 void cancel() {}
940 void receive(ampi *ptr, AmpiMsg *msg);
941 void receive(ampi *ptr, CkReductionMsg *msg) {}
942 void setComplete();
945 class IATAReq : public AmpiRequest {
946 vector<IReq> myreqs;
947 int elmcount;
948 int idx;
949 public:
950 IATAReq(int c_):elmcount(c_),idx(0){ myreqs.resize(c_); isvalid=true; }
951 IATAReq(){};
952 ~IATAReq(void) { }
953 int addReq(void *buf_, int count_, MPI_Datatype type_, int src_, int tag_, MPI_Comm comm_){
954 myreqs[idx].buf=buf_; myreqs[idx].count=count_;
955 myreqs[idx].type=type_; myreqs[idx].src=src_;
956 myreqs[idx].tag=tag_; myreqs[idx].comm=comm_;
957 return (++idx);
959 bool test(MPI_Status *sts=MPI_STATUS_IGNORE);
960 int wait(MPI_Status *sts);
961 void cancel() {}
962 void receive(ampi *ptr, AmpiMsg *msg) {}
963 void receive(ampi *ptr, CkReductionMsg *msg) {}
964 inline int getCount(void) const { return elmcount; }
965 inline int getType(void) const { return MPI_IATA_REQ; }
966 virtual void pup(PUP::er &p){
967 AmpiRequest::pup(p);
968 p(elmcount);
969 p(idx);
970 p|myreqs;
972 virtual void print();
975 /// Special CkVec<AmpiRequest*> for AMPI. Most code copied from cklist.h
976 class AmpiRequestList : private CkSTLHelper<AmpiRequest *> {
977 AmpiRequest** block; //Elements of vector
978 int blklen; //Allocated size of block
979 int len; //Number of used elements in block
980 void makeBlock(int blklen_,int len_) {
981 block=new AmpiRequest* [blklen_];
982 blklen=blklen_; len=len_;
984 void freeBlock(void) {
985 len=0; blklen=0;
986 delete[] block; block=NULL;
988 void copyFrom(const AmpiRequestList &src) {
989 makeBlock(src.blklen, src.len);
990 elementCopy(block,src.block,blklen);
992 public:
993 AmpiRequestList() {block=NULL;blklen=len=0;}
994 ~AmpiRequestList() { freeBlock(); }
995 AmpiRequestList(const AmpiRequestList &src) {copyFrom(src);}
996 AmpiRequestList(int size) { makeBlock(size,size); }
997 AmpiRequestList &operator=(const AmpiRequestList &src) {
998 freeBlock();
999 copyFrom(src);
1000 return *this;
1003 AmpiRequest* operator[](size_t n) { return block[n]; }
1005 int size(void) const {return len;}
1006 void setSize(int blklen_) {
1007 AmpiRequest **oldBlock=block;
1008 makeBlock(blklen_,len);
1009 elementCopy(block,oldBlock,len);
1010 delete[] oldBlock; //WARNING: leaks if element copy throws exception
1012 //Grow to contain at least this position:
1013 void growAtLeast(int pos) {
1014 if (pos>=blklen) setSize(pos*2+16);
1016 void insertAt(int pos, AmpiRequest* elt) {
1017 if (pos>=len) {
1018 growAtLeast(pos);
1019 len=pos+1;
1021 block[pos] = elt;
1023 void free(int pos) {
1024 if (pos<0 || pos>=len) return;
1025 block[pos]->free();
1026 delete block[pos];
1027 block[pos]=NULL;
1029 void push_back(AmpiRequest* elt) {insertAt(len,elt);}
1030 int insert(AmpiRequest* elt){
1031 for(int i=0;i<len;i++){
1032 if(block[i]==NULL){
1033 block[i] = elt;
1034 return i;
1037 push_back(elt);
1038 return len-1;
1041 inline void checkRequest(MPI_Request idx) const {
1042 if(!(idx==-1 || (idx < this->len && (block[idx])->isValid())))
1043 CkAbort("Invalid MPI_Request\n");
1046 //find an AmpiRequest by its pointer value
1047 //return -1 if not found!
1048 int findRequestIndex(AmpiRequest *req) const {
1049 for(int i=0; i<len; i++){
1050 if(block[i]==req) return i;
1052 return -1;
1055 inline void unblockReqs(MPI_Request *request, int numReqs) {
1056 for (int i=0; i<numReqs; i++) {
1057 if (request[i] != MPI_REQUEST_NULL) {
1058 block[request[i]]->setBlocked(false);
1063 void pup(PUP::er &p);
1065 void print(){
1066 for(int i=0; i<len; i++){
1067 if(block[i]==NULL) continue;
1068 CkPrintf("AmpiRequestList Element %d [%p]: \n", i+1, block[i]);
1069 block[i]->print();
1074 //A simple memory buffer
1075 class memBuf {
1076 CkVec<char> buf;
1077 public:
1078 memBuf() { }
1079 memBuf(int size) :buf(size) {}
1080 void setSize(int s) {buf.resize(s);}
1081 int getSize(void) const {return buf.size();}
1082 const void *getData(void) const {return (const void *)&buf[0];}
1083 void *getData(void) {return (void *)&buf[0];}
1086 template <class T>
1087 inline void pupIntoBuf(memBuf &b,T &t) {
1088 PUP::sizer ps;ps|t;
1089 b.setSize(ps.size());
1090 PUP::toMem pm(b.getData()); pm|t;
1093 template <class T>
1094 inline void pupFromBuf(const void *data,T &t) {
1095 PUP::fromMem p(data); p|t;
1098 class AmpiMsg : public CMessage_AmpiMsg {
1099 private:
1100 int seq; //Sequence number (for message ordering)
1101 int tag; //MPI tag
1102 int srcRank; //Communicator rank for source
1103 int length; //Number of bytes in this message
1104 public:
1105 char *data; //Payload
1106 #if CMK_BIGSIM_CHARM
1107 public:
1108 void *event;
1109 int eventPe; // the PE that the event is located
1110 #endif
1112 public:
1113 AmpiMsg(void) { data = NULL; }
1114 AmpiMsg(int _s, int t, int sRank, int l) :
1115 seq(_s), tag(t), srcRank(sRank), length(l) {}
1116 inline int getSeq(void) const { return seq; }
1117 inline int getSeqIdx(void) const { return srcRank; }
1118 inline int getSrcRank(void) const { return srcRank; }
1119 inline int getLength(void) const { return length; }
1120 inline char* getData(void) const { return data; }
1121 inline int getTag(void) const { return tag; }
1122 static AmpiMsg* pup(PUP::er &p, AmpiMsg *m)
1124 int seq, length, tag, srcRank;
1125 if(p.isPacking() || p.isSizing()) {
1126 seq = m->seq;
1127 tag = m->tag;
1128 srcRank = m->srcRank;
1129 length = m->length;
1131 p(seq); p(tag); p(srcRank); p(length);
1132 if(p.isUnpacking()) {
1133 m = new (length, 0) AmpiMsg(seq, tag, srcRank, length);
1135 p(m->data, length);
1136 if(p.isDeleting()) {
1137 delete m;
1138 m = 0;
1140 return m;
1145 Our local representation of another AMPI
1146 array element. Used to keep track of incoming
1147 and outgoing message sequence numbers, and
1148 the out-of-order message list.
1150 class AmpiOtherElement {
1151 public:
1152 /// Next incoming and outgoing message sequence number
1153 int seqIncoming, seqOutgoing;
1155 /// Number of elements in out-of-order queue. (normally 0)
1156 int nOut;
1158 AmpiOtherElement(void) {
1159 seqIncoming=0; seqOutgoing=0;
1160 nOut=0;
1163 void pup(PUP::er &p) {
1164 p|seqIncoming; p|seqOutgoing;
1165 p|nOut;
1169 #if CMK_USING_XLC
1170 #include <tr1/unordered_map>
1171 typedef std::tr1::unordered_map<int, AmpiOtherElement> AmpiElements;
1172 #else
1173 typedef std::unordered_map<int, AmpiOtherElement> AmpiElements;
1174 #endif
1176 class AmpiSeqQ : private CkNoncopyable {
1177 CkMsgQ<AmpiMsg> out; // all out of order messages
1178 AmpiElements elements; // element info: indexed by seqIdx (comm rank)
1180 public:
1181 AmpiSeqQ() {}
1182 ~AmpiSeqQ ();
1183 void pup(PUP::er &p);
1185 /// Insert this message in the table. Returns the number
1186 /// of messages now available for the element.
1187 /// If 0, the message was out-of-order and is buffered.
1188 /// If 1, this message can be immediately processed.
1189 /// If >1, this message can be immediately processed,
1190 /// and you should call "getOutOfOrder" repeatedly.
1191 inline int put(int seqIdx, AmpiMsg *msg) {
1192 AmpiOtherElement &el=elements[seqIdx];
1193 if (msg->getSeq()==el.seqIncoming) { // In order:
1194 el.seqIncoming++;
1195 return 1+el.nOut;
1197 else { // Out of order: stash message
1198 putOutOfOrder(seqIdx, msg);
1199 return 0;
1203 /// Is this message in order (return >0) or not (return 0)?
1204 /// Same as put() except we don't call putOutOfOrder() here,
1205 /// so the caller should do that separately
1206 inline int isInOrder(int srcRank, int seq) {
1207 AmpiOtherElement &el = elements[srcRank];
1208 if (seq == el.seqIncoming) { // In order:
1209 el.seqIncoming++;
1210 return 1+el.nOut;
1212 else { // Out of order: caller should stash message
1213 return 0;
1217 /// Get an out-of-order message from the table.
1218 /// (in-order messages never go into the table)
1219 AmpiMsg *getOutOfOrder(int seqIdx);
1221 /// Stash an out-of-order message
1222 void putOutOfOrder(int seqIdx, AmpiMsg *msg);
1224 /// Return the next outgoing sequence number, and increment it.
1225 inline int nextOutgoing(int destRank) {
1226 return elements[destRank].seqOutgoing++;
1229 PUPmarshall(AmpiSeqQ)
1232 inline CProxy_ampi ampiCommStruct::getProxy(void) const {return ampiID;}
1233 const ampiCommStruct &universeComm2CommStruct(MPI_Comm universeNo);
1235 /* KeyValue class for caching */
1236 class KeyvalNode {
1237 public:
1238 MPI_Copy_function *copy_fn;
1239 MPI_Delete_function *delete_fn;
1240 void *extra_state;
1241 /* value is associated with getKeyvals of communicator */
1242 KeyvalNode(void): copy_fn(NULL), delete_fn(NULL), extra_state(NULL) { }
1243 KeyvalNode(MPI_Copy_function *cf, MPI_Delete_function *df, void* es):
1244 copy_fn(cf), delete_fn(df), extra_state(es) { }
1245 // KeyvalNode is not supposed to be pup'ed
1246 void pup(PUP::er& p){ /* empty */ }
1250 An ampiParent holds all the communicators and the TCharm thread
1251 for its children, which are bound to it.
1253 class ampiParent : public CBase_ampiParent {
1254 CProxy_TCharm threads;
1255 TCharm *thread;
1256 void prepareCtv(void);
1258 MPI_Comm worldNo; //My MPI_COMM_WORLD
1259 ampi *worldPtr; //AMPI element corresponding to MPI_COMM_WORLD
1260 ampiCommStruct worldStruct;
1262 CkPupPtrVec<ampiCommStruct> splitComm; //Communicators from MPI_Comm_split
1263 CkPupPtrVec<ampiCommStruct> groupComm; //Communicators from MPI_Comm_group
1264 CkPupPtrVec<ampiCommStruct> cartComm; //Communicators from MPI_Cart_create
1265 CkPupPtrVec<ampiCommStruct> graphComm; //Communicators from MPI_Graph_create
1266 CkPupPtrVec<ampiCommStruct> interComm; //Communicators from MPI_Intercomm_create
1267 CkPupPtrVec<ampiCommStruct> intraComm; //Communicators from MPI_Intercomm_merge
1269 CkPupPtrVec<groupStruct> groups; // "Wild" groups that don't have a communicator
1270 CkPupPtrVec<WinStruct> winStructList; //List of windows for one-sided communication
1271 CkPupPtrVec<InfoStruct> infos; // list of all MPI_Infos
1272 vector<OpStruct> ops; // list of all MPI_Ops
1274 inline bool isSplit(MPI_Comm comm) const {
1275 return (comm>=MPI_COMM_FIRST_SPLIT && comm<MPI_COMM_FIRST_GROUP);
1277 const ampiCommStruct &getSplit(MPI_Comm comm) const {
1278 int idx=comm-MPI_COMM_FIRST_SPLIT;
1279 if (idx>=splitComm.size()) CkAbort("Bad split communicator used");
1280 return *splitComm[idx];
1282 void splitChildRegister(const ampiCommStruct &s);
1284 inline bool isGroup(MPI_Comm comm) const {
1285 return (comm>=MPI_COMM_FIRST_GROUP && comm<MPI_COMM_FIRST_CART);
1287 const ampiCommStruct &getGroup(MPI_Comm comm) const {
1288 int idx=comm-MPI_COMM_FIRST_GROUP;
1289 if (idx>=groupComm.size()) CkAbort("Bad group communicator used");
1290 return *groupComm[idx];
1292 void groupChildRegister(const ampiCommStruct &s);
1293 inline bool isInGroups(MPI_Group group) const {
1294 return (group>=0 && group<groups.size());
1297 void cartChildRegister(const ampiCommStruct &s);
1298 void graphChildRegister(const ampiCommStruct &s);
1299 void interChildRegister(const ampiCommStruct &s);
1301 inline bool isIntra(MPI_Comm comm) const {
1302 return (comm>=MPI_COMM_FIRST_INTRA && comm<MPI_COMM_FIRST_RESVD);
1304 const ampiCommStruct &getIntra(MPI_Comm comm) const {
1305 int idx=comm-MPI_COMM_FIRST_INTRA;
1306 if (idx>=intraComm.size()) CkAbort("Bad intra-communicator used");
1307 return *intraComm[idx];
1309 void intraChildRegister(const ampiCommStruct &s);
1311 /* MPI_*_get_attr C binding returns a *pointer* to an integer,
1312 * so there needs to be some storage somewhere to point to.
1313 * All builtin keyvals are ints, except for MPI_WIN_BASE, which
1314 * is a pointer, and MPI_WIN_SIZE, which is an MPI_Aint. */
1315 int* kv_builtin_storage;
1316 MPI_Aint* win_size_storage;
1317 void** win_base_storage;
1318 bool kv_set_builtin(int keyval, void* attribute_val);
1319 bool kv_get_builtin(int keyval);
1320 CkPupPtrVec<KeyvalNode> kvlist;
1322 bool isTmpRProxySet;
1323 CProxy_ampi tmpRProxy;
1325 MPI_MigrateFn userAboutToMigrateFn, userJustMigratedFn;
1327 public:
1328 bool ampiInitCallDone;
1329 bool resumeOnRecv, resumeOnColl;
1330 int numBlockedReqs; // number of requests currently blocked on
1332 public:
1333 ampiParent(MPI_Comm worldNo_,CProxy_TCharm threads_);
1334 ampiParent(CkMigrateMessage *msg);
1335 void ckAboutToMigrate(void);
1336 void ckJustMigrated(void);
1337 void ckJustRestored(void);
1338 void setUserAboutToMigrateFn(MPI_MigrateFn f);
1339 void setUserJustMigratedFn(MPI_MigrateFn f);
1340 ~ampiParent();
1342 //Children call this when they are first created, or just migrated
1343 TCharm *registerAmpi(ampi *ptr,ampiCommStruct s,bool forMigration);
1345 // exchange proxy info between two ampi proxies
1346 void ExchangeProxy(CProxy_ampi rproxy){
1347 if(!isTmpRProxySet){ tmpRProxy=rproxy; isTmpRProxySet=true; }
1348 else{ tmpRProxy.setRemoteProxy(rproxy); rproxy.setRemoteProxy(tmpRProxy); isTmpRProxySet=false; }
1351 //Grab the next available split/group communicator
1352 MPI_Comm getNextSplit(void) const {return MPI_COMM_FIRST_SPLIT+splitComm.size();}
1353 MPI_Comm getNextGroup(void) const {return MPI_COMM_FIRST_GROUP+groupComm.size();}
1354 MPI_Comm getNextCart(void) const {return MPI_COMM_FIRST_CART+cartComm.size();}
1355 MPI_Comm getNextGraph(void) const {return MPI_COMM_FIRST_GRAPH+graphComm.size();}
1356 MPI_Comm getNextInter(void) const {return MPI_COMM_FIRST_INTER+interComm.size();}
1357 MPI_Comm getNextIntra(void) const {return MPI_COMM_FIRST_INTRA+intraComm.size();}
1359 inline bool isCart(MPI_Comm comm) const {
1360 return (comm>=MPI_COMM_FIRST_CART && comm<MPI_COMM_FIRST_GRAPH);
1362 ampiCommStruct &getCart(MPI_Comm comm) const {
1363 int idx=comm-MPI_COMM_FIRST_CART;
1364 if (idx>=cartComm.size()) CkAbort("AMPI> Bad cartesian communicator used!\n");
1365 return *cartComm[idx];
1367 inline bool isGraph(MPI_Comm comm) const {
1368 return (comm>=MPI_COMM_FIRST_GRAPH && comm<MPI_COMM_FIRST_INTER);
1370 ampiCommStruct &getGraph(MPI_Comm comm) const {
1371 int idx=comm-MPI_COMM_FIRST_GRAPH;
1372 if (idx>=graphComm.size()) CkAbort("AMPI> Bad graph communicator used!\n");
1373 return *graphComm[idx];
1375 inline bool isInter(MPI_Comm comm) const {
1376 return (comm>=MPI_COMM_FIRST_INTER && comm<MPI_COMM_FIRST_INTRA);
1378 const ampiCommStruct &getInter(MPI_Comm comm) const {
1379 int idx=comm-MPI_COMM_FIRST_INTER;
1380 if (idx>=interComm.size()) CkAbort("AMPI> Bad inter-communicator used!\n");
1381 return *interComm[idx];
1384 void pup(PUP::er &p);
1386 void startCheckpoint(const char* dname);
1387 void Checkpoint(int len, const char* dname);
1388 void ResumeThread(void);
1389 TCharm* getTCharmThread() const {return thread;}
1390 inline ampiParent* blockOnRecv(void);
1392 #if CMK_LBDB_ON
1393 void setMigratable(bool mig) {
1394 thread->setMigratable(mig);
1396 #endif
1398 inline const ampiCommStruct &comm2CommStruct(MPI_Comm comm) const {
1399 if (comm==MPI_COMM_WORLD) return worldStruct;
1400 if (comm==worldNo) return worldStruct;
1401 if (isSplit(comm)) return getSplit(comm);
1402 if (isGroup(comm)) return getGroup(comm);
1403 if (isCart(comm)) return getCart(comm);
1404 if (isGraph(comm)) return getGraph(comm);
1405 if (isInter(comm)) return getInter(comm);
1406 if (isIntra(comm)) return getIntra(comm);
1407 return universeComm2CommStruct(comm);
1410 inline ampi *comm2ampi(MPI_Comm comm) const {
1411 if (comm==MPI_COMM_WORLD) return worldPtr;
1412 if (comm==worldNo) return worldPtr;
1413 if (isSplit(comm)) {
1414 const ampiCommStruct &st=getSplit(comm);
1415 return st.getProxy()[thisIndex].ckLocal();
1417 if (isGroup(comm)) {
1418 const ampiCommStruct &st=getGroup(comm);
1419 return st.getProxy()[thisIndex].ckLocal();
1421 if (isCart(comm)) {
1422 const ampiCommStruct &st = getCart(comm);
1423 return st.getProxy()[thisIndex].ckLocal();
1425 if (isGraph(comm)) {
1426 const ampiCommStruct &st = getGraph(comm);
1427 return st.getProxy()[thisIndex].ckLocal();
1429 if (isInter(comm)) {
1430 const ampiCommStruct &st=getInter(comm);
1431 return st.getProxy()[thisIndex].ckLocal();
1433 if (isIntra(comm)) {
1434 const ampiCommStruct &st=getIntra(comm);
1435 return st.getProxy()[thisIndex].ckLocal();
1437 if (comm>MPI_COMM_WORLD) return worldPtr; //Use MPI_WORLD ampi for cross-world messages:
1438 CkAbort("Invalid communicator used!");
1439 return NULL;
1442 inline bool hasComm(const MPI_Group group) const {
1443 MPI_Comm comm = (MPI_Comm)group;
1444 return ( comm==MPI_COMM_WORLD || comm==worldNo || isSplit(comm) || isGroup(comm) ||
1445 isCart(comm) || isGraph(comm) || isIntra(comm) );
1446 //isInter omitted because its comm number != its group number
1448 inline const groupStruct group2vec(MPI_Group group) const {
1449 if(group == MPI_GROUP_NULL || group == MPI_GROUP_EMPTY)
1450 return groupStruct();
1451 if(hasComm(group))
1452 return comm2CommStruct((MPI_Comm)group).getIndices();
1453 if(isInGroups(group))
1454 return *groups[group];
1455 CkAbort("ampiParent::group2vec: Invalid group id!");
1456 return *groups[0]; //meaningless return
1458 inline MPI_Group saveGroupStruct(groupStruct vec){
1459 if (vec.empty()) return MPI_GROUP_EMPTY;
1460 int idx = groups.size();
1461 groups.resize(idx+1);
1462 groups[idx]=new groupStruct(vec);
1463 return (MPI_Group)idx;
1465 inline int getRank(const MPI_Group group) const {
1466 groupStruct vec = group2vec(group);
1467 return getPosOp(thisIndex,vec);
1470 inline int getMyPe(void) const {
1471 return CkMyPe();
1473 inline bool hasWorld(void) const {
1474 return worldPtr!=NULL;
1477 inline void checkComm(MPI_Comm comm) const {
1478 if ((comm > MPI_COMM_FIRST_RESVD && comm != MPI_COMM_WORLD)
1479 || (isSplit(comm) && comm-MPI_COMM_FIRST_SPLIT >= splitComm.size())
1480 || (isGroup(comm) && comm-MPI_COMM_FIRST_GROUP >= groupComm.size())
1481 || (isCart(comm) && comm-MPI_COMM_FIRST_CART >= cartComm.size())
1482 || (isGraph(comm) && comm-MPI_COMM_FIRST_GRAPH >= graphComm.size())
1483 || (isInter(comm) && comm-MPI_COMM_FIRST_INTER >= interComm.size())
1484 || (isIntra(comm) && comm-MPI_COMM_FIRST_INTRA >= intraComm.size()) )
1485 CkAbort("Invalid MPI_Comm\n");
1488 /// if intra-communicator, return comm, otherwise return null group
1489 inline MPI_Group comm2group(const MPI_Comm comm) const {
1490 if(isInter(comm)) return MPI_GROUP_NULL; // we don't support inter-communicator in such functions
1491 ampiCommStruct s = comm2CommStruct(comm);
1492 if(comm!=MPI_COMM_WORLD && comm!=s.getComm()) CkAbort("Error in ampiParent::comm2group()");
1493 return (MPI_Group)(s.getComm());
1496 inline int getRemoteSize(const MPI_Comm comm) const {
1497 if(isInter(comm)) return getInter(comm).getRemoteIndices().size();
1498 else return -1;
1500 inline MPI_Group getRemoteGroup(const MPI_Comm comm) {
1501 if(isInter(comm)) return saveGroupStruct(getInter(comm).getRemoteIndices());
1502 else return MPI_GROUP_NULL;
1505 int createKeyval(MPI_Copy_function *copy_fn, MPI_Delete_function *delete_fn,
1506 int *keyval, void* extra_state);
1507 int freeKeyval(int *keyval);
1508 bool getBuiltinKeyval(int keyval, void *attribute_val);
1509 int setUserKeyval(MPI_Comm comm, int keyval, void *attribute_val);
1510 bool getUserKeyval(MPI_Comm comm, int keyval, void *attribute_val, int *flag);
1512 int setCommAttr(MPI_Comm comm, int keyval, void *attribute_val);
1513 int getCommAttr(MPI_Comm comm, int keyval, void *attribute_val, int *flag);
1514 int deleteCommAttr(MPI_Comm comm, int keyval);
1516 int setWinAttr(MPI_Win win, int keyval, void *attribute_val);
1517 int getWinAttr(MPI_Win win, int keyval, void *attribute_val, int *flag);
1518 int deleteWinAttr(MPI_Win win, int keyval);
1520 CkDDT myDDTsto;
1521 CkDDT *myDDT;
1522 AmpiRequestList ampiReqs;
1524 int addWinStruct(WinStruct *win);
1525 WinStruct *getWinStruct(MPI_Win win) const;
1526 void removeWinStruct(WinStruct *win);
1528 public:
1529 int createInfo(MPI_Info *newinfo);
1530 int dupInfo(MPI_Info info, MPI_Info *newinfo);
1531 int setInfo(MPI_Info info, const char *key, const char *value);
1532 int deleteInfo(MPI_Info info, const char *key);
1533 int getInfo(MPI_Info info, const char *key, int valuelen, char *value, int *flag) const;
1534 int getInfoValuelen(MPI_Info info, const char *key, int *valuelen, int *flag) const;
1535 int getInfoNkeys(MPI_Info info, int *nkeys) const;
1536 int getInfoNthkey(MPI_Info info, int n, char *key) const;
1537 int freeInfo(MPI_Info info);
1539 void initOps(void);
1540 inline int createOp(MPI_User_function *fn, bool isCommutative) {
1541 // Search thru non-predefined op's for any invalidated ones:
1542 for (int i=MPI_NO_OP+1; i<ops.size(); i++) {
1543 if (ops[i].isFree()) {
1544 ops[i].init(fn, isCommutative);
1545 return i;
1548 // No invalid entries, so create a new one:
1549 OpStruct newop = OpStruct(fn, isCommutative);
1550 ops.push_back(newop);
1551 return ops.size()-1;
1553 inline void freeOp(MPI_Op op) {
1554 // Don't free predefined op's:
1555 if (op > MPI_NO_OP) {
1556 // Invalidate op, then free all invalid op's from the back of the op's vector
1557 ops[op].free();
1558 while (ops.back().isFree()) {
1559 ops.pop_back();
1563 inline bool opIsPredefined(MPI_Op op) const {
1564 return (op>=MPI_INFO_NULL && op<=MPI_NO_OP);
1566 inline bool opIsCommutative(MPI_Op op) const {
1567 CkAssert(op>MPI_OP_NULL && op<ops.size());
1568 return ops[op].isCommutative;
1570 inline MPI_User_function* op2User_function(MPI_Op op) const {
1571 CkAssert(op>MPI_OP_NULL && op<ops.size());
1572 return ops[op].func;
1574 inline AmpiOpHeader op2AmpiOpHeader(MPI_Op op, MPI_Datatype type, int count) const {
1575 CkAssert(op>MPI_OP_NULL && op<ops.size());
1576 int size = myDDT->getType(type)->getSize(count);
1577 return AmpiOpHeader(ops[op].func, type, count, size);
1579 inline void applyOp(MPI_Datatype datatype, MPI_Op op, int count, const void* invec, void* inoutvec) const {
1580 // inoutvec[i] = invec[i] op inoutvec[i]
1581 MPI_User_function *func = op2User_function(op);
1582 (func)((void*)invec, inoutvec, &count, &datatype);
1585 public:
1586 #if AMPI_PRINT_MSG_SIZES
1587 // Map of AMPI routine names to message sizes and number of messages:
1588 // ["AMPI_Routine"][ [msg_size][num_msgs] ]
1589 #if CMK_USING_XLC
1590 #include <tr1/unordered_map>
1591 std::tr1::unordered_map<std::string, std::map<int, int> > msgSizes;
1592 #else
1593 std::unordered_map<std::string, std::map<int, int> > msgSizes;
1594 #endif
1595 inline bool isRankRecordingMsgSizes(void);
1596 inline void recordMsgSize(const char* func, int msgSize);
1597 void printMsgSizes(void);
1598 #endif
1600 #if AMPIMSGLOG
1601 /* message logging */
1602 int pupBytes;
1603 #if CMK_PROJECTIONS_USE_ZLIB && 0
1604 gzFile fMsgLog;
1605 PUP::tozDisk *toPUPer;
1606 PUP::fromzDisk *fromPUPer;
1607 #else
1608 FILE* fMsgLog;
1609 PUP::toDisk *toPUPer;
1610 PUP::fromDisk *fromPUPer;
1611 #endif
1612 #endif
1613 void init();
1614 void finalize();
1615 void block(void);
1616 void yield(void);
1620 An ampi manages the communication of one thread over
1621 one MPI communicator.
1623 class ampi : public CBase_ampi {
1624 friend class IReq; // for checking resumeOnRecv
1625 friend class SendReq;
1626 friend class SsendReq;
1627 friend class RednReq;
1628 friend class GatherReq;
1629 friend class GathervReq;
1630 CProxy_ampiParent parentProxy;
1631 void findParent(bool forMigration);
1632 ampiParent *parent;
1633 TCharm *thread;
1634 AmpiRequest *blockingReq;
1636 ampiCommStruct myComm;
1637 int myRank;
1638 groupStruct tmpVec; // stores temp group info
1639 CProxy_ampi remoteProxy; // valid only for intercommunicator
1641 AmpiSeqQ oorder;
1642 void inorder(AmpiMsg *msg);
1643 void inorderRdma(char* buf, int size, int seq, int tag, int srcRank,
1644 MPI_Comm comm, int ssendReq);
1646 void init(void);
1648 public: // entry methods
1650 ampi();
1651 ampi(CkArrayID parent_,const ampiCommStruct &s);
1652 ampi(CkMigrateMessage *msg);
1653 void ckJustMigrated(void);
1654 void ckJustRestored(void);
1655 ~ampi();
1657 void pup(PUP::er &p);
1659 void allInitDone();
1660 void setInitDoneFlag();
1662 void unblock(void);
1663 void generic(AmpiMsg *);
1664 void genericRdma(char* buf, int size, int seq, int tag, int srcRank,
1665 MPI_Comm destcomm, int ssendReq);
1666 void completedRdmaSend(CkDataMsg *msg);
1667 void ssend_ack(int sreq);
1668 void barrierResult(void);
1669 void ibarrierResult(void);
1670 void rednResult(CkReductionMsg *msg);
1671 void irednResult(CkReductionMsg *msg);
1673 void splitPhase1(CkReductionMsg *msg);
1674 void splitPhaseInter(CkReductionMsg *msg);
1675 void commCreatePhase1(MPI_Comm nextGroupComm);
1676 void intercommCreatePhase1(MPI_Comm nextInterComm);
1677 void intercommMergePhase1(MPI_Comm nextIntraComm);
1679 private: // Used by the above entry methods that create new MPI_Comm objects
1680 CProxy_ampi createNewChildAmpiSync();
1681 void insertNewChildAmpiElements(MPI_Comm newComm, CProxy_ampi newAmpi);
1683 public: // to be used by MPI_* functions
1685 inline const ampiCommStruct &comm2CommStruct(MPI_Comm comm) const {
1686 return parent->comm2CommStruct(comm);
1689 inline ampi* blockOnRecv(void);
1690 inline ampi* blockOnColl(void);
1691 inline ampi* blockOnRedn(AmpiRequest *req);
1692 MPI_Request postReq(AmpiRequest* newreq);
1694 inline int getSeqNo(int destRank, MPI_Comm destcomm, int tag);
1695 AmpiMsg *makeAmpiMsg(int destRank,int t,int sRank,const void *buf,int count,
1696 MPI_Datatype type,MPI_Comm destcomm, int ssendReq=0);
1698 MPI_Request send(int t, int s, const void* buf, int count, MPI_Datatype type, int rank,
1699 MPI_Comm destcomm, int ssendReq=0, AmpiSendType sendType=BLOCKING_SEND);
1700 static void sendraw(int t, int s, void* buf, int len, CkArrayID aid, int idx);
1701 inline MPI_Request sendLocalMsg(int t, int sRank, const void* buf, int size, int destRank,
1702 MPI_Comm destcomm, ampi* destPtr, int ssendReq, AmpiSendType sendType);
1703 inline MPI_Request sendRdmaMsg(int t, int sRank, const void* buf, int size, int destIdx,
1704 int destRank, MPI_Comm destcomm, CProxy_ampi arrProxy, int ssendReq);
1705 inline bool destLikelyWithinProcess(CProxy_ampi arrProxy, int destIdx) const {
1706 CkArray* localBranch = arrProxy.ckLocalBranch();
1707 int destPe = localBranch->lastKnown(CkArrayIndex1D(destIdx));
1708 return (CkNodeOf(destPe) == CkMyNode());
1710 MPI_Request delesend(int t, int s, const void* buf, int count, MPI_Datatype type, int rank,
1711 MPI_Comm destcomm, CProxy_ampi arrproxy, int ssend, AmpiSendType sendType);
1712 inline void processAmpiMsg(AmpiMsg *msg, const void* buf, MPI_Datatype type, int count);
1713 inline void processRdmaMsg(const void *sbuf, int slength, int ssendReq, int srank, void* rbuf,
1714 int rcount, MPI_Datatype rtype, MPI_Comm comm);
1715 inline void processRednMsg(CkReductionMsg *msg, const void* buf, MPI_Datatype type, int count);
1716 inline void processNoncommutativeRednMsg(CkReductionMsg *msg, void* buf, MPI_Datatype type, int count,
1717 MPI_User_function* func);
1718 inline void processGatherMsg(CkReductionMsg *msg, const void* buf, MPI_Datatype type, int recvCount);
1719 inline void processGathervMsg(CkReductionMsg *msg, const void* buf, MPI_Datatype type,
1720 int* recvCounts, int* displs);
1721 inline AmpiMsg * getMessage(int t, int s, MPI_Comm comm, int *sts) const;
1722 int recv(int t,int s,void* buf,int count,MPI_Datatype type,MPI_Comm comm,MPI_Status *sts=NULL);
1723 void irecv(void *buf, int count, MPI_Datatype type, int src,
1724 int tag, MPI_Comm comm, MPI_Request *request);
1725 void sendrecv(const void *sbuf, int scount, MPI_Datatype stype, int dest, int stag,
1726 void *rbuf, int rcount, MPI_Datatype rtype, int src, int rtag,
1727 MPI_Comm comm, MPI_Status *sts);
1728 void probe(int t,int s,MPI_Comm comm,MPI_Status *sts);
1729 int iprobe(int t,int s,MPI_Comm comm,MPI_Status *sts);
1730 void barrier(void);
1731 void ibarrier(MPI_Request *request);
1732 void bcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm comm);
1733 int intercomm_bcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm intercomm);
1734 void ibcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm comm, MPI_Request* request);
1735 int intercomm_ibcast(int root, void* buf, int count, MPI_Datatype type, MPI_Comm intercomm, MPI_Request *request);
1736 static void bcastraw(void* buf, int len, CkArrayID aid);
1737 void split(int color,int key,MPI_Comm *dest, int type);
1738 void commCreate(const groupStruct vec,MPI_Comm *newcomm);
1739 MPI_Comm cartCreate0D(void);
1740 MPI_Comm cartCreate(groupStruct vec, int ndims, const int* dims);
1741 void graphCreate(const groupStruct vec, MPI_Comm *newcomm);
1742 void intercommCreate(const groupStruct rvec, int root, MPI_Comm tcomm, MPI_Comm *ncomm);
1744 inline bool isInter(void) const { return myComm.isinter(); }
1745 void intercommMerge(int first, MPI_Comm *ncomm);
1747 inline int getWorldRank(void) const {return parent->thisIndex;}
1748 /// Return our rank in this communicator
1749 inline int getRank(void) const {return myRank;}
1750 inline int getSize(void) const {return myComm.getSize();}
1751 inline MPI_Comm getComm(void) const {return myComm.getComm();}
1752 inline void setCommName(const char *name){myComm.setName(name);}
1753 inline void getCommName(char *name, int *len) const {myComm.getName(name,len);}
1754 inline vector<int> getIndices(void) const { return myComm.getIndices(); }
1755 inline vector<int> getRemoteIndices(void) const { return myComm.getRemoteIndices(); }
1756 inline const CProxy_ampi &getProxy(void) const {return thisProxy;}
1757 inline const CProxy_ampi &getRemoteProxy(void) const {return remoteProxy;}
1758 inline void setRemoteProxy(CProxy_ampi rproxy) { remoteProxy = rproxy; thread->resume(); }
1759 inline int getIndexForRank(int r) const {return myComm.getIndexForRank(r);}
1760 inline int getIndexForRemoteRank(int r) const {return myComm.getIndexForRemoteRank(r);}
1761 void findNeighbors(MPI_Comm comm, int rank, vector<int>& neighbors) const;
1762 inline const vector<int>& getNeighbors() const { return myComm.getnbors(); }
1763 inline bool opIsCommutative(MPI_Op op) const { return parent->opIsCommutative(op); }
1764 inline MPI_User_function* op2User_function(MPI_Op op) const { return parent->op2User_function(op); }
1766 CkDDT *getDDT(void) const {return parent->myDDT;}
1767 CthThread getThread() const { return thread->getThread(); }
1768 public:
1770 * AmmTable is indexed by the tag and sender.
1771 * Since ampi objects are per-communicator, there are separate AmmTables per communicator.
1772 * FIXME: These are directly used by API routines, which is hideous.
1774 AmmTable msgs; // unexpected message queue
1775 AmmTable posted_ireqs; // posted request queue
1776 //------------------------ Added by YAN ---------------------
1777 private:
1778 CkPupPtrVec<win_obj> winObjects;
1779 public:
1780 MPI_Win createWinInstance(void *base, MPI_Aint size, int disp_unit, MPI_Info info);
1781 int deleteWinInstance(MPI_Win win);
1782 int winGetGroup(WinStruct *win, MPI_Group *group) const;
1783 int winPut(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
1784 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, WinStruct *win);
1785 int winGet(void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
1786 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, WinStruct *win);
1787 int winIget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype, int rank,
1788 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, WinStruct *win,
1789 MPI_Request *req);
1790 int winIgetWait(MPI_Request *request, MPI_Status *status);
1791 int winIgetFree(MPI_Request *request, MPI_Status *status);
1792 void winRemotePut(int orgtotalsize, char* orgaddr, int orgcnt, MPI_Datatype orgtype,
1793 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, int winIndex);
1794 char* winLocalGet(int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp, int targcnt,
1795 MPI_Datatype targtype, int winIndex);
1796 AmpiMsg* winRemoteGet(int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp,
1797 int targcnt, MPI_Datatype targtype, int winIndex);
1798 AmpiMsg* winRemoteIget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp,
1799 int targcnt, MPI_Datatype targtype, int winIndex);
1800 int winLock(int lock_type, int rank, WinStruct *win);
1801 int winUnlock(int rank, WinStruct *win);
1802 void winRemoteLock(int lock_type, int winIndex, int requestRank);
1803 void winRemoteUnlock(int winIndex, int requestRank);
1804 int winAccumulate(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
1805 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
1806 MPI_Op op, WinStruct *win);
1807 void winRemoteAccumulate(int orgtotalsize, char* orgaddr, int orgcnt, MPI_Datatype orgtype,
1808 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
1809 MPI_Op op, int winIndex);
1810 int winGetAccumulate(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, void *resaddr,
1811 int rescnt, MPI_Datatype restype, int rank, MPI_Aint targdisp,
1812 int targcnt, MPI_Datatype targtype, MPI_Op op, WinStruct *win);
1813 void winLocalGetAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
1814 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Op op,
1815 char *resaddr, int winIndex);
1816 AmpiMsg* winRemoteGetAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
1817 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Op op,
1818 int winIndex);
1819 int winCompareAndSwap(const void *orgaddr, const void *compaddr, void *resaddr, MPI_Datatype type,
1820 int rank, MPI_Aint targdisp, WinStruct *win);
1821 char* winLocalCompareAndSwap(int size, char* sorgaddr, char* compaddr, MPI_Datatype type,
1822 MPI_Aint targdisp, int winIndex);
1823 AmpiMsg* winRemoteCompareAndSwap(int size, char *sorgaddr, char *compaddr, MPI_Datatype type,
1824 MPI_Aint targdisp, int winIndex);
1825 void winSetName(WinStruct *win, const char *name);
1826 void winGetName(WinStruct *win, char *name, int *length) const;
1827 win_obj* getWinObjInstance(WinStruct *win) const;
1828 int getNewSemaId();
1830 AmpiMsg* Alltoall_RemoteIget(MPI_Aint disp, int targcnt, MPI_Datatype targtype, int tag);
1831 int intercomm_scatter(int root, const void *sendbuf, int sendcount, MPI_Datatype sendtype,
1832 void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm intercomm);
1833 int intercomm_iscatter(int root, const void *sendbuf, int sendcount, MPI_Datatype sendtype,
1834 void *recvbuf, int recvcount, MPI_Datatype recvtype,
1835 MPI_Comm intercomm, MPI_Request *request);
1836 int intercomm_scatterv(int root, const void* sendbuf, const int* sendcounts, const int* displs,
1837 MPI_Datatype sendtype, void* recvbuf, int recvcount,
1838 MPI_Datatype recvtype, MPI_Comm intercomm);
1839 int intercomm_iscatterv(int root, const void* sendbuf, const int* sendcounts, const int* displs,
1840 MPI_Datatype sendtype, void* recvbuf, int recvcount,
1841 MPI_Datatype recvtype, MPI_Comm intercomm, MPI_Request* request);
1843 private:
1844 bool AlltoallGetFlag;
1845 void *Alltoallbuff;
1846 public:
1847 void setA2AIgetFlag(void* ptr) {AlltoallGetFlag=true;Alltoallbuff=ptr;}
1848 void resetA2AIgetFlag() {AlltoallGetFlag=false;Alltoallbuff=NULL;}
1849 //------------------------ End of code by YAN ---------------------
// ---- Free helper routines used throughout the AMPI implementation ----
// Return the ampiParent object of the calling rank (definition elsewhere).
ampiParent *getAmpiParent(void);
// True iff executing on an AMPI user-level thread; used by the AMPIAPI guard.
bool isAmpiThread(void);
// Resolve a communicator handle to its ampi chare instance.
ampi *getAmpiInstance(MPI_Comm comm);
// Validity checks for user-supplied handles (error path presumably goes
// through ampiErrhandler/CkAbort -- confirm against the definitions).
void checkComm(MPI_Comm comm);
void checkRequest(MPI_Request req);
// Rewrite 'buf' when the caller passed MPI_BOTTOM -- presumably adjusts it
// using the datatype's bounds; TODO(review) confirm against the definition.
void handle_MPI_BOTTOM(void* &buf, MPI_Datatype type);
void handle_MPI_BOTTOM(void* &buf1, MPI_Datatype type1, void* &buf2, MPI_Datatype type2);
#if AMPI_ERROR_CHECKING
// Report error 'errcode' raised inside AMPI routine 'func' and return the
// resulting error code (dispatches to the active error handler; see definition).
int ampiErrhandler(const char* func, int errcode);
#else
// Error checking compiled out: pass the code straight through. Note that
// 'func' is intentionally never evaluated in this configuration.
#define ampiErrhandler(func, errcode) (errcode)
#endif
#if CMK_TRACE_ENABLED

// List of AMPI functions to trace:
// NOTE(review): entries are grouped -- standard MPI calls (mostly
// alphabetized), then RMA/Info/Win routines, then AMPI extensions, then CUDA
// helpers. The tracing layer presumably consumes this table positionally;
// confirm whether order matters before inserting new names out of sequence.
static const char *funclist[] = {"AMPI_Abort", "AMPI_Add_error_class", "AMPI_Add_error_code", "AMPI_Add_error_string",
                                 "AMPI_Address", "AMPI_Allgather", "AMPI_Allgatherv", "AMPI_Allreduce", "AMPI_Alltoall",
                                 "AMPI_Alltoallv", "AMPI_Alltoallw", "AMPI_Attr_delete", "AMPI_Attr_get",
                                 "AMPI_Attr_put", "AMPI_Barrier", "AMPI_Bcast", "AMPI_Bsend", "AMPI_Cancel",
                                 "AMPI_Cart_coords", "AMPI_Cart_create", "AMPI_Cart_get", "AMPI_Cart_map",
                                 "AMPI_Cart_rank", "AMPI_Cart_shift", "AMPI_Cart_sub", "AMPI_Cartdim_get",
                                 "AMPI_Comm_call_errhandler", "AMPI_Comm_compare", "AMPI_Comm_create",
                                 "AMPI_Comm_create_errhandler", "AMPI_Comm_create_keyval", "AMPI_Comm_delete_attr",
                                 "AMPI_Comm_dup", "AMPI_Comm_dup_with_info", "AMPI_Comm_free",
                                 "AMPI_Comm_free_errhandler", "AMPI_Comm_free_keyval", "AMPI_Comm_get_attr",
                                 "AMPI_Comm_get_errhandler", "AMPI_Comm_get_info", "AMPI_Comm_get_name",
                                 "AMPI_Comm_group", "AMPI_Comm_rank", "AMPI_Comm_remote_group", "AMPI_Comm_remote_size",
                                 "AMPI_Comm_set_attr", "AMPI_Comm_set_errhandler", "AMPI_Comm_set_info", "AMPI_Comm_set_name",
                                 "AMPI_Comm_size", "AMPI_Comm_split", "AMPI_Comm_split_type", "AMPI_Comm_test_inter",
                                 "AMPI_Dims_create", "AMPI_Errhandler_create", "AMPI_Errhandler_free", "AMPI_Errhandler_get",
                                 "AMPI_Errhandler_set", "AMPI_Error_class", "AMPI_Error_string", "AMPI_Exscan", "AMPI_Finalize",
                                 "AMPI_Finalized", "AMPI_Gather", "AMPI_Gatherv", "AMPI_Get_address", "AMPI_Get_count",
                                 "AMPI_Get_elements", "AMPI_Get_library_version", "AMPI_Get_processor_name", "AMPI_Get_version",
                                 "AMPI_Graph_create", "AMPI_Graph_get", "AMPI_Graph_map", "AMPI_Graph_neighbors",
                                 "AMPI_Graph_neighbors_count", "AMPI_Graphdims_get", "AMPI_Group_compare", "AMPI_Group_difference",
                                 "AMPI_Group_excl", "AMPI_Group_free", "AMPI_Group_incl", "AMPI_Group_intersection",
                                 "AMPI_Group_range_excl", "AMPI_Group_range_incl", "AMPI_Group_rank", "AMPI_Group_size",
                                 "AMPI_Group_translate_ranks", "AMPI_Group_union", "AMPI_Iallgather", "AMPI_Iallgatherv",
                                 "AMPI_Iallreduce", "AMPI_Ialltoall", "AMPI_Ialltoallv", "AMPI_Ialltoallw", "AMPI_Ibarrier",
                                 "AMPI_Ibcast", "AMPI_Igather", "AMPI_Igatherv", "AMPI_Ineighbor_allgather",
                                 "AMPI_Ineighbor_allgatherv", "AMPI_Ineighbor_alltoall", "AMPI_Ineighbor_alltoallv",
                                 "AMPI_Ineighbor_alltoallw", "AMPI_Init", "AMPI_Init_thread", "AMPI_Initialized",
                                 "AMPI_Intercomm_create", "AMPI_Intercomm_merge", "AMPI_Iprobe", "AMPI_Irecv", "AMPI_Ireduce",
                                 "AMPI_Is_thread_main", "AMPI_Iscatter", "AMPI_Iscatterv", "AMPI_Isend", "AMPI_Issend",
                                 "AMPI_Keyval_create", "AMPI_Keyval_free", "AMPI_Neighbor_allgather", "AMPI_Neighbor_allgatherv",
                                 "AMPI_Neighbor_alltoall", "AMPI_Neighbor_alltoallv", "AMPI_Neighbor_alltoallw",
                                 "AMPI_Op_commutative", "AMPI_Op_create", "AMPI_Op_free", "AMPI_Pack", "AMPI_Pack_size",
                                 "AMPI_Pcontrol", "AMPI_Probe", "AMPI_Query_thread", "AMPI_Recv", "AMPI_Recv_init", "AMPI_Reduce",
                                 "AMPI_Reduce_local", "AMPI_Reduce_scatter", "AMPI_Reduce_scatter_block", "AMPI_Request_free",
                                 "AMPI_Request_get_status", "AMPI_Rsend", "AMPI_Scan", "AMPI_Scatter", "AMPI_Scatterv", "AMPI_Send",
                                 "AMPI_Send_init", "AMPI_Sendrecv", "AMPI_Sendrecv_replace", "AMPI_Ssend", "AMPI_Ssend_init",
                                 "AMPI_Start", "AMPI_Startall", "AMPI_Status_set_cancelled", "AMPI_Status_set_elements", "AMPI_Test",
                                 "AMPI_Test_cancelled", "AMPI_Testall", "AMPI_Testany", "AMPI_Testsome", "AMPI_Topo_test",
                                 "AMPI_Type_commit", "AMPI_Type_contiguous", "AMPI_Type_create_hindexed",
                                 "AMPI_Type_create_hindexed_block", "AMPI_Type_create_hvector", "AMPI_Type_create_indexed_block",
                                 "AMPI_Type_create_keyval", "AMPI_Type_create_resized", "AMPI_Type_create_struct",
                                 "AMPI_Type_delete_attr", "AMPI_Type_dup", "AMPI_Type_extent", "AMPI_Type_free",
                                 "AMPI_Type_free_keyval", "AMPI_Type_get_attr", "AMPI_Type_get_contents", "AMPI_Type_get_envelope",
                                 "AMPI_Type_get_extent", "AMPI_Type_get_name", "AMPI_Type_get_true_extent", "AMPI_Type_hindexed",
                                 "AMPI_Type_hvector", "AMPI_Type_indexed", "AMPI_Type_lb", "AMPI_Type_set_attr",
                                 "AMPI_Type_set_name", "AMPI_Type_size", "AMPI_Type_struct", "AMPI_Type_ub", "AMPI_Type_vector",
                                 "AMPI_Unpack", "AMPI_Wait", "AMPI_Waitall", "AMPI_Waitany", "AMPI_Waitsome", "AMPI_Wtick", "AMPI_Wtime",
                                 "AMPI_Accumulate", "AMPI_Alloc_mem", "AMPI_Compare_and_swap", "AMPI_Fetch_and_op",
                                 "AMPI_Free_mem", "AMPI_Get", "AMPI_Get_accumulate", "AMPI_Info_create",
                                 "AMPI_Info_delete", "AMPI_Info_dup", "AMPI_Info_free", "AMPI_Info_get",
                                 "AMPI_Info_get_nkeys", "AMPI_Info_get_nthkey", "AMPI_Info_get_valuelen",
                                 "AMPI_Info_set", "AMPI_Put", "AMPI_Raccumulate", "AMPI_Rget", "AMPI_Rget_accumulate",
                                 "AMPI_Rput", "AMPI_Win_complete", "AMPI_Win_create", "AMPI_Win_create_errhandler",
                                 "AMPI_Win_create_keyval", "AMPI_Win_delete_attr", "AMPI_Win_fence", "AMPI_Win_free",
                                 "AMPI_Win_free_keyval", "AMPI_Win_get_attr", "AMPI_Win_get_errhandler",
                                 "AMPI_Win_get_group", "AMPI_Win_get_info", "AMPI_Win_get_name", "AMPI_Win_lock",
                                 "AMPI_Win_post", "AMPI_Win_set_attr", "AMPI_Win_set_errhandler", "AMPI_Win_set_info",
                                 "AMPI_Win_set_name", "AMPI_Win_start", "AMPI_Win_test", "AMPI_Win_unlock",
                                 "AMPI_Win_wait", "AMPI_Exit" /*AMPI extensions:*/, "AMPI_Migrate",
                                 "AMPI_Load_start_measure", "AMPI_Load_stop_measure",
                                 "AMPI_Load_set_value", "AMPI_Migrate_to_pe", "AMPI_Set_migratable",
                                 "AMPI_Register_pup", "AMPI_Get_pup_data", "AMPI_Register_main",
                                 "AMPI_Register_about_to_migrate", "AMPI_Register_just_migrated",
                                 "AMPI_Iget", "AMPI_Iget_wait", "AMPI_Iget_free", "AMPI_Iget_data",
                                 "AMPI_Type_is_contiguous", "AMPI_Evacuate", "AMPI_Yield", "AMPI_Suspend",
                                 "AMPI_Resume", "AMPI_Print", "AMPI_Alltoall_iget", "AMPI_Alltoall_medium",
                                 "AMPI_Alltoall_long", /*CUDA:*/ "AMPI_GPU_Iinvoke", "AMPI_GPU_Invoke", "AMPI_System"};

// not traced: AMPI_Trace_begin, AMPI_Trace_end

#endif // CMK_TRACE_ENABLED
//Use this to mark the start of AMPI interface routines that can only be called on AMPI threads:
#if CMK_ERROR_CHECKING
// Abort with a clear diagnostic if a non-AMPI thread calls into the MPI API,
// then start the TCharm API trace for 'routineName'.
#define AMPIAPI(routineName) \
if (!isAmpiThread()) { CkAbort("AMPI> cannot call MPI routines from non-AMPI threads!"); } \
TCHARM_API_TRACE(routineName, "ampi");
#else
// Error checking compiled out: only start the API trace.
#define AMPIAPI(routineName) TCHARM_API_TRACE(routineName, "ampi")
#endif
//Use this for MPI_Init and routines that can be called before AMPI threads have been initialized:
// (no thread-origin check here, since AMPI threads may not exist yet)
#define AMPIAPI_INIT(routineName) TCHARM_API_TRACE(routineName, "ampi")

#endif // _AMPIIMPL_H