interop: handle the special case for MPI layer where main thread is a comm
[charm.git] / examples / charm++ / mpi-coexist / libs / kNeighbor / kNeighbor.C
blobe2872943a6bb2ca4ba44b56dcdd3e98fad01155f
1 #include "kNeighbor.decl.h"
2 #include <stdio.h>
3 #include <stdlib.h>
4 //header from Charm to enable Interoperation
5 #include "mpi-interoperate.h"
7 #define STRIDEK         1
8 #define CALCPERSTEP     10
10 #define DEBUG           0
13 /* readonly */ CProxy_Main mainProxy;
14 /* readonly */ int num_chares;
15 /* readonly */ int gMsgSize;
16 /* readonly */ int gLBFreq;
18 int cmpFunc(const void *a, const void *b) {
19   if(*(double *)a < *(double *)b) return -1;
20   if(*(double *)a > *(double *)b) return 1;
21   return 0;
24 class toNeighborMsg: public CMessage_toNeighborMsg {
25   public:
26     int *data;
27     int size;
28     int fromX;
29     int nID;
31   public:
32     toNeighborMsg() {};
33     toNeighborMsg(int s): size(s) {  
34     }
36     void setMsgSrc(int X, int id) {
37       fromX = X;
38       nID = id;
39     }
43 class Main: public CBase_Main {
44   public:
45     CProxy_Block array;
47     int numSteps;
48     int currentStep;
49     int currentMsgSize;
51     int numElemsRcvd;
52     double totalTime;
53     double maxTime;
54     double minTime;
55     double *timeRec;
57     double gStarttime;
59   public:
60     Main(CkArgMsg *m) {
61       mainProxy = thisProxy;
62     }
64                 void beginWork(int numsteps, int msgsize, int lbfreq)
65                 {
66         numSteps = numsteps;;
67         currentMsgSize = msgsize;
69                         gLBFreq = lbfreq;
71         CkPrintf("\nStarting kNeighbor ...\n");
72         gMsgSize = currentMsgSize;
73         currentStep = -1;
75         timeRec = new double[numSteps];
76                         array = CProxy_Block::ckNew(num_chares);
77         CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
78         array.ckSetReductionClient(cb);
80         beginIteration();
81                 }
83     void beginIteration() {
84       currentStep++;
85       if (currentStep == numSteps) {
86         CkPrintf("kNeighbor program finished!\n\n");
87         //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
88         //array.ckSetReductionClient(cb);
89         terminate(NULL);
90         return;
91       }
93       numElemsRcvd = 0;
94       totalTime = 0.0;
95       maxTime = 0.0;
96       minTime = 3600.0;
98       //currentMsgSize = msgSize;
99       if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
100         array.pauseForLB();
101         return;
102       }
104       gStarttime = CmiWallTimer();
105       for (int i=0; i<num_chares; i++)
106         array[i].commWithNeighbors();
107     }
109     void resumeIter() {
110 #if DEBUG
111       CkPrintf("Resume iteration at step %d\n", currentStep);
112 #endif
113       gStarttime = CmiWallTimer();
114       for (int i=0; i<num_chares; i++)
115         array[i].commWithNeighbors();
116     }
118     void terminate(CkReductionMsg *msg) {
119       delete msg;
120       double total = 0.0;
122       for (int i=0; i<numSteps; i++)
123         timeRec[i] = timeRec[i]*1e6;
125       qsort(timeRec, numSteps, sizeof(double), cmpFunc);
126       printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
128       int samples = 100;
129       if(numSteps<=samples) samples = numSteps-1;
130       for (int i=0; i<samples; i++)
131         total += timeRec[i];
132       total /= samples;
134       CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
135       CkExit();
136     }
138     void nextStep(CkReductionMsg  *msg) {
139       maxTime = *((double *)msg->getData());
140       delete msg;
141       double wholeStepTime = CmiWallTimer() - gStarttime;
142       timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
143       if(currentStep % 10 == 0)
144         CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
145       beginIteration();
146     }
151 //no wrap around for sending messages to neighbors
152 class Block: public CBase_Block {
153   public:
154     int numNeighbors;
155     int numNborsRcvd;
156     int *neighbors;
157     double *recvTimes;
158     double startTime;
160     int random;
161     int curIterMsgSize;
162     int internalStepCnt;
163     int sum;
165     toNeighborMsg **iterMsg;
167   public:
168     Block() {
169       //srand(thisIndex.x+thisIndex.y);
170       usesAtSync = true;
172       numNeighbors = 2*STRIDEK;
173       neighbors = new int[numNeighbors];
174       recvTimes = new double[numNeighbors];
175       int nidx=0;
176       //setting left neighbors
177       for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
178         int tmpnei = i;
179         while (tmpnei<0) tmpnei += num_chares;
180         neighbors[nidx] = tmpnei;
181       }
182       //setting right neighbors
183       for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
184         int tmpnei = i;
185         while (tmpnei>=num_chares) tmpnei -= num_chares;
186         neighbors[nidx] = tmpnei;
187       }
189       for (int i=0; i<numNeighbors; i++)
190         recvTimes[i] = 0.0;
192       iterMsg = new toNeighborMsg *[numNeighbors];
193       for (int i=0; i<numNeighbors; i++)
194         iterMsg[i] = NULL;
196 #if DEBUG
197       CkPrintf("Neighbors of %d: ", thisIndex);
198       for (int i=0; i<numNeighbors; i++)
199         CkPrintf("%d ", neighbors[i]);
200       CkPrintf("\n");
201 #endif
203       random = thisIndex*31+73;
204     }
206     ~Block() {
207       delete [] neighbors;
208       delete [] recvTimes;
209       delete [] iterMsg;
210     }
212     void pup(PUP::er &p){
213       CBase_Block::pup(p);
214       p(numNeighbors);
215       p(numNborsRcvd);
217       if(p.isUnpacking()) {
218         neighbors = new int[numNeighbors];
219         recvTimes = new double[numNeighbors];
220       }
221       PUParray(p, neighbors, numNeighbors);
222       PUParray(p, recvTimes, numNeighbors);
223       p(startTime);
224       p(random);
225       p(curIterMsgSize);
226       p(internalStepCnt);
227       p(sum);
228       if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
229       for(int i=0; i<numNeighbors; i++){
230         CkPupMessage(p, (void **)&iterMsg[i]);
231       }
232     }
234     Block(CkMigrateMessage *m) {}
236     void pauseForLB(){
237 #if DEBUG
238       CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
239 #endif
240       AtSync();
241     }
243     void ResumeFromSync(){ //Called by load-balancing framework
244       CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
245       contribute(0, NULL, CkReduction::sum_int, cb);
246     }
248     void startInternalIteration() {
249 #if DEBUG
250       CkPrintf("[%d]: Start internal iteration \n", thisIndex);
251 #endif
253       numNborsRcvd = 0;
254       /* 1: pick a work size and do some computation */
255       int N = (thisIndex * thisIndex / num_chares) * 100;
256       for (int i=0; i<N; i++)
257         for (int j=0; j<N; j++) {
258           sum += (thisIndex * i + j);
259         }
261       /* 2. send msg to K neighbors */
262       int msgSize = curIterMsgSize;
264       // Send msgs to neighbors
265       for (int i=0; i<numNeighbors; i++) {
266         //double memtimer = CmiWallTimer();
268         toNeighborMsg *msg = iterMsg[i];
270 #if DEBUG
271         CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
272 #endif
273         msg->setMsgSrc(thisIndex, i);
274         //double entrytimer = CmiWallTimer();
275         thisProxy(neighbors[i]).recvMsgs(msg);
276         //double entrylasttimer = CmiWallTimer();
277         //if(thisIndex==0){
278         //      CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
279         //}
280       }
281     }
283     void commWithNeighbors() {
284       internalStepCnt = 0;
285       curIterMsgSize = gMsgSize;
286       //currently the work size is only changed every big steps (which
287       //are initiated by the main proxy
288       random++;
290       if(iterMsg[0]==NULL) { //indicating the messages have not been created
291         for(int i=0; i<numNeighbors; i++)
292           iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
293       }
295       startTime = CmiWallTimer();
296       startInternalIteration();
297     }
299     void recvReplies(toNeighborMsg *m) {
300       int fromNID = m->nID;
302 #if DEBUG
303       CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
304 #endif
306       iterMsg[fromNID] = m;
307       //recvTimes[fromNID] += (CmiWallTimer() - startTime);
309       //get one step time and send it back to mainProxy
310       numNborsRcvd++;
311       if (numNborsRcvd == numNeighbors) {
312         internalStepCnt++;
313         if (internalStepCnt==CALCPERSTEP) {
314           double iterCommTime = CmiWallTimer() - startTime;
315           contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
316           /*if(thisIndex==0){
317             for(int i=0; i<numNeighbors; i++){
318             CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
319             }
320             }*/
321         } else {
322           startInternalIteration();
323         }
324       }
325     }
327     void recvMsgs(toNeighborMsg *m) {
328 #if DEBUG
329       CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
330 #endif
332       thisProxy(m->fromX).recvReplies(m);
333     }
335     inline int MAX(int a, int b) {
336       return (a>b)?a:b;
337     }
338     inline int MIN(int a, int b) {
339       return (a<b)?a:b;
340     }
343 //C++ function invoked from MPI, marks the begining of Charm++
344 void kNeighbor(int numchares, int numsteps, int msgsize, int lbfreq)
346   num_chares = num_chares;
347   gMsgSize = msgsize;
348   if(num_chares < CkNumPes()) {
349     num_chares = CkNumPes();
350   }
351   if(CkMyPe() == 0) {
352     mainProxy.beginWork(numsteps,msgsize,lbfreq);
353   }
354   StartCharmScheduler();
356 #include "kNeighbor.def.h"