1 #include "kNeighbor.decl.h"
4 //header from Charm to enable Interoperation
5 #include "mpi-interoperate.h"
// Proxy to the Main chare; assigned from thisProxy inside Main and used by
// Block::ResumeFromSync and kNeighbor() to reach Main.
13 /* readonly */ CProxy_Main mainProxy;
// Number of Block elements created by Main::beginWork; set in kNeighbor().
14 /* readonly */ int num_chares;
// Message size published by Main::beginWork and read by
// Block::commWithNeighbors.
15 /* readonly */ int gMsgSize;
// Step interval tested in Main::beginIteration (currentStep % gLBFreq).
// Presumably the load-balancing frequency; its assignment is not visible
// in this view -- TODO confirm.
16 /* readonly */ int gLBFreq;
/**
 * qsort-compatible three-way comparator for doubles.
 *
 * @param a  pointer to the first double
 * @param b  pointer to the second double
 * @return   -1 if *a < *b, 1 if *a > *b, 0 otherwise
 */
int cmpFunc(const void *a, const void *b) {
  // Keep the pointee const: the C-style casts used before silently dropped it.
  const double lhs = *static_cast<const double *>(a);
  const double rhs = *static_cast<const double *>(b);
  if (lhs < rhs) return -1;
  if (lhs > rhs) return 1;
  // Neither ordering held: report "equal" so every path returns a value.
  return 0;
}
// Message type exchanged between Block elements: sent through
// Block::recvMsgs and handed back through Block::recvReplies.
// NOTE(review): the member declarations are not visible in this view.
24 class toNeighborMsg: public CMessage_toNeighborMsg {
// Constructor: stores s in the `size` member.
33 toNeighborMsg(int s): size(s) {
// Called by the sender as setMsgSrc(thisIndex, i). Receivers read m->fromX
// and m->nID, so presumably those fields are set here -- body not visible,
// TODO confirm.
36 void setMsgSrc(int X, int id) {
// Main chare: creates the Block array, starts every step by invoking
// commWithNeighbors() on each element, and records per-step timings in
// nextStep(), which is registered as the array's reduction client.
// NOTE(review): member declarations, the constructor header and several
// short statements are not visible in this view.
43 class Main: public CBase_Main {
// Publish this chare's proxy through the readonly global.
61 mainProxy = thisProxy;
// beginWork: called from kNeighbor() with the run parameters. Records the
// message size, allocates the timing record, creates the Block array and
// registers nextStep() as its reduction client.
// NOTE(review): how numsteps and lbfreq are stored is not visible here.
64 void beginWork(int numsteps, int msgsize, int lbfreq)
67 currentMsgSize = msgsize;
71 CkPrintf("\nStarting kNeighbor ...\n");
// Mirror the message size into the global read by Block::commWithNeighbors.
72 gMsgSize = currentMsgSize;
// One timing slot per step; written in nextStep().
75 timeRec = new double[numSteps];
// Create num_chares Block elements.
76 array = CProxy_Block::ckNew(num_chares);
// nextStep() becomes the reduction client of the Block array.
77 CkCallback *cb = new CkCallback(CkIndex_Main::nextStep(NULL), thisProxy);
78 array.ckSetReductionClient(cb);
// beginIteration: starts the next step, or prints the completion message
// once currentStep equals numSteps.
// NOTE(review): the statements that advance currentStep and the ends of the
// branches below are not visible in this view.
83 void beginIteration() {
85 if (currentStep == numSteps) {
86 CkPrintf("kNeighbor program finished!\n\n");
87 //CkCallback *cb = new CkCallback(CkIndex_Main::terminate(NULL), thisProxy);
88 //array.ckSetReductionClient(cb);
98 //currentMsgSize = msgSize;
// True on every gLBFreq-th step except step 0. The branch body is not
// visible; presumably it triggers load balancing -- TODO confirm.
// NOTE(review): gLBFreq == 0 makes this a modulo by zero once
// currentStep != 0.
99 if(currentStep!=0 && (currentStep % gLBFreq == 0)) {
// Timestamp the start of the step, then invoke commWithNeighbors() on
// every element.
104 gStarttime = CmiWallTimer();
105 for (int i=0; i<num_chares; i++)
106 array[i].commWithNeighbors();
// NOTE(review): the header of the method containing the next statements is
// not visible. Block::ResumeFromSync targets CkIndex_Main::resumeIter(), so
// this is presumably resumeIter() -- TODO confirm.
111 CkPrintf("Resume iteration at step %d\n", currentStep);
// Same start sequence as above: timestamp, then start every element.
113 gStarttime = CmiWallTimer();
114 for (int i=0; i<num_chares; i++)
115 array[i].commWithNeighbors();
// terminate: prints statistics computed from the samples in timeRec.
118 void terminate(CkReductionMsg *msg) {
// Scale every sample by 1e6 (the summary line below labels its value "us").
122 for (int i=0; i<numSteps; i++)
123 timeRec[i] = timeRec[i]*1e6;
// Sort ascending so indices 0, numSteps/2 and numSteps-1 are the lowest,
// median and highest samples.
125 qsort(timeRec, numSteps, sizeof(double), cmpFunc);
126 printf("Time stats: lowest: %f, median: %f, highest: %f\n", timeRec[0], timeRec[numSteps/2], timeRec[numSteps-1]);
// Reduce the sample count when there are not more steps than samples.
129 if(numSteps<=samples) samples = numSteps-1;
// NOTE(review): the loop body that accumulates `total` is not visible here.
130 for (int i=0; i<samples; i++)
134 CkPrintf("Average time for each %d-Neighbor iteration with msg size %d is %f (us)\n", STRIDEK, currentMsgSize, total);
// nextStep: reduction client registered in beginWork; records the timing of
// the step that just finished.
138 void nextStep(CkReductionMsg *msg) {
// Reduced double carried by the message; Block::recvReplies contributes its
// elapsed time with CkReduction::max_double.
139 maxTime = *((double *)msg->getData());
// Wall-clock time since the step was started, as measured by Main.
141 double wholeStepTime = CmiWallTimer() - gStarttime;
// Store the step time divided by CALCPERSTEP.
142 timeRec[currentStep] = wholeStepTime/CALCPERSTEP;
// Report progress on every 10th step.
143 if(currentStep % 10 == 0)
144 CkPrintf("Step %d with msg size %d finished: max=%f, total=%f\n", currentStep, currentMsgSize, maxTime/CALCPERSTEP, wholeStepTime/CALCPERSTEP);
151 //no wrap around for sending messages to neighbors
// Block: one element of the array created by Main::beginWork. Each element
// keeps 2*STRIDEK neighbor indices and one message slot per neighbor, sends a
// message to every neighbor in each internal iteration, and gets the same
// message handed back through recvReplies().
// NOTE(review): member declarations, the constructor header and several
// short statements are not visible in this view.
152 class Block: public CBase_Block {
// One message pointer per neighbor; slot 0 is compared against NULL in
// commWithNeighbors() to decide whether the messages still need creating.
165 toNeighborMsg **iterMsg;
169 //srand(thisIndex.x+thisIndex.y);
// STRIDEK neighbors on each side of this element.
172 numNeighbors = 2*STRIDEK;
173 neighbors = new int[numNeighbors];
174 recvTimes = new double[numNeighbors];
176 //setting left neighbors
177 for (int i=thisIndex-STRIDEK; i<thisIndex; i++, nidx++) {
// Bring a negative index back into range by adding num_chares.
179 while (tmpnei<0) tmpnei += num_chares;
180 neighbors[nidx] = tmpnei;
182 //setting right neighbors
183 for (int i=thisIndex+1; i<=thisIndex+STRIDEK; i++, nidx++) {
// Bring an index at or past num_chares back into range by subtracting it.
185 while (tmpnei>=num_chares) tmpnei -= num_chares;
186 neighbors[nidx] = tmpnei;
// NOTE(review): the body of this loop is not visible in this view.
189 for (int i=0; i<numNeighbors; i++)
192 iterMsg = new toNeighborMsg *[numNeighbors];
// NOTE(review): loop body not visible. commWithNeighbors() tests
// iterMsg[0]==NULL, so presumably the slots are set to NULL here -- TODO
// confirm.
193 for (int i=0; i<numNeighbors; i++)
197 CkPrintf("Neighbors of %d: ", thisIndex);
198 for (int i=0; i<numNeighbors; i++)
199 CkPrintf("%d ", neighbors[i]);
// Per-element value derived from the index; how it is used is not visible
// in this view.
203 random = thisIndex*31+73;
// pup: packs or unpacks this element's state through p.
212 void pup(PUP::er &p){
// When unpacking, allocate the arrays first so PUParray has storage to fill.
217 if(p.isUnpacking()) {
218 neighbors = new int[numNeighbors];
219 recvTimes = new double[numNeighbors];
221 PUParray(p, neighbors, numNeighbors);
222 PUParray(p, recvTimes, numNeighbors);
// Same for the message slots: allocate the pointer array when unpacking,
// then pup every message.
228 if(p.isUnpacking()) iterMsg = new toNeighborMsg *[numNeighbors];
229 for(int i=0; i<numNeighbors; i++){
230 CkPupMessage(p, (void **)&iterMsg[i]);
// Constructor taking a CkMigrateMessage; the body is empty.
234 Block(CkMigrateMessage *m) {}
// NOTE(review): the header of the method containing this print is not
// visible in this view.
238 CkPrintf("Element %d pause for LB on PE %d\n", thisIndex, CkMyPe());
// Contributes a zero-byte sum_int reduction whose callback is
// Main::resumeIter on mainProxy.
243 void ResumeFromSync(){ //Called by load-balancing framework
244 CkCallback cb(CkIndex_Main::resumeIter(), mainProxy);
245 contribute(0, NULL, CkReduction::sum_int, cb);
// startInternalIteration: does a block of local computation, then sends one
// message to each neighbor.
248 void startInternalIteration() {
250 CkPrintf("[%d]: Start internal iteration \n", thisIndex);
254 /* 1: pick a work size and do some computation */
// The work size depends on the square of the element index (integer
// division by num_chares), so different elements do different amounts of work.
255 int N = (thisIndex * thisIndex / num_chares) * 100;
256 for (int i=0; i<N; i++)
257 for (int j=0; j<N; j++) {
258 sum += (thisIndex * i + j);
261 /* 2. send msg to K neighbors */
262 int msgSize = curIterMsgSize;
264 // Send msgs to neighbors
265 for (int i=0; i<numNeighbors; i++) {
266 //double memtimer = CmiWallTimer();
// Take the message stored in slot i of iterMsg.
268 toNeighborMsg *msg = iterMsg[i];
271 CkPrintf("[%d]: send msg to neighbor[%d]=%d\n", thisIndex, i, neighbors[i]);
// Pass this element's index and the slot number i to the message.
273 msg->setMsgSrc(thisIndex, i);
274 //double entrytimer = CmiWallTimer();
275 thisProxy(neighbors[i]).recvMsgs(msg);
276 //double entrylasttimer = CmiWallTimer();
278 // CkPrintf("At current step %d to neighbor %d, msg creation time: %f, entrymethod fire time: %f\n", internalStepCnt, neighbors[i], entrytimer-memtimer, entrylasttimer-entrytimer);
// commWithNeighbors: invoked by Main on every element to begin a step.
283 void commWithNeighbors() {
// Pick up the message size published by Main::beginWork.
285 curIterMsgSize = gMsgSize;
286 //currently the work size is only changed at every big step (which
287 //is initiated by the main proxy)
// While slot 0 is still NULL, allocate one message per neighbor, passing
// curIterMsgSize/4 both to operator new and to the constructor.
290 if(iterMsg[0]==NULL) { //indicating the messages have not been created
291 for(int i=0; i<numNeighbors; i++)
292 iterMsg[i] = new(curIterMsgSize/4, 0) toNeighborMsg(curIterMsgSize/4);
// Start time read back in recvReplies() to compute the elapsed time.
295 startTime = CmiWallTimer();
296 startInternalIteration();
// recvReplies: handles a message handed back by a neighbor (see recvMsgs).
299 void recvReplies(toNeighborMsg *m) {
// Slot index carried by the message.
300 int fromNID = m->nID;
303 CkPrintf("[%d]: receive ack from neighbor[%d]=%d\n", thisIndex, fromNID, neighbors[fromNID]);
// Put the returned message back into its slot so it can be sent again.
306 iterMsg[fromNID] = m;
307 //recvTimes[fromNID] += (CmiWallTimer() - startTime);
309 //get one step time and send it back to mainProxy
// Every neighbor has answered.
// NOTE(review): the statements that update numNborsRcvd and
// internalStepCnt are not visible in this view.
311 if (numNborsRcvd == numNeighbors) {
// After CALCPERSTEP internal iterations, contribute the elapsed time to a
// max_double reduction.
313 if (internalStepCnt==CALCPERSTEP) {
314 double iterCommTime = CmiWallTimer() - startTime;
315 contribute(sizeof(double), &iterCommTime, CkReduction::max_double);
317 for(int i=0; i<numNeighbors; i++){
318 CkPrintf("RTT time from neighbor %d (actual elem id %d): %lf\n", i, neighbors[i], recvTimes[i]);
// Presumably the else branch: begin the next internal iteration -- the
// branch structure is not visible, TODO confirm.
322 startInternalIteration();
// recvMsgs: receives a neighbor's message and sends that same message back
// to the element named in m->fromX.
327 void recvMsgs(toNeighborMsg *m) {
329 CkPrintf("[%d]: recv msg from %d as its %dth neighbor\n", thisIndex, m->fromX, m->nID);
332 thisProxy(m->fromX).recvReplies(m);
// Integer helper taking two ints. The body is not visible in this view;
// presumably it returns the larger of a and b -- TODO confirm.
335 inline int MAX(int a, int b) {
// Integer helper taking two ints. The body is not visible in this view;
// presumably it returns the smaller of a and b -- TODO confirm.
338 inline int MIN(int a, int b) {
343 //C++ function invoked from MPI, marks the beginning of Charm++
// Parameters:
//   numchares - requested number of chares; raised to CkNumPes() when smaller
//   numsteps, msgsize, lbfreq - forwarded unchanged to Main::beginWork
344 void kNeighbor(int numchares, int numsteps, int msgsize, int lbfreq)
// Fix: this used to read `num_chares = num_chares;`, a self-assignment that
// ignored the caller's numchares argument entirely.
346 num_chares = numchares;
// Use at least as many chares as there are PEs.
348 if(num_chares < CkNumPes()) {
349 num_chares = CkNumPes();
// Hand the run parameters to the Main chare, then start the Charm++ scheduler.
352 mainProxy.beginWork(numsteps,msgsize,lbfreq);
354 StartCharmScheduler();
356 #include "kNeighbor.def.h"