Bug #664 tests/charm++/communication_overhead: Assert that received message size...
[charm.git] / tests / charm++ / communication_overhead / overhead_test.C
blobaaca6fab58954aa42936ec3a21bb8a76315cd511
1 #include "overhead_test.decl.h"
2 #include <vector>
4 // This benchmark measures communication overhead and bandwidth for
5 //  Charm++ groups and arrays, in similar fashion to the converse
6 //  benchmark: tests/converse/machinetest/multiping.C
8 // Index 0 sends kFactor messages to Index 1 and gets an ack for them.
9 //  The benchmark measures bandwidth for this burst of messages.
10 int kFactor = 64;
11 int minMsgSize = 16;
12 int maxMsgSize = 262144;
13 int nCycles = 100; // Number of iterations for each message size
15 CProxy_TestDriver testDriverProxy;
17 void idleStartHandler(void *timingGroupObj, double start);
18 void idleEndHandler(void *timingGroupObj, double cur);
19 void idleStartHandlerArray(void *timingGroupObj, double start);
20 void idleEndHandlerArray(void *timingGroupObj, double cur);
22 class SimpleMessage: public CMessage_SimpleMessage {
23 public:
24   char *data;
25   size_t size = -1;
28 class TestDriver: public CBase_TestDriver {
29 private:
30   CProxy_CommunicationArray communicationArrayProxy;
31   CProxy_CommunicationGroup communicationGroupProxy;
32   bool doneGroupTest;
33   bool timeAllocation;
35 public:
36   TestDriver(CkArgMsg *args) {
37     if (args->argc > 1) {
38       kFactor = atoi(args->argv[1]);
39       minMsgSize = atoi(args->argv[2]);
40       maxMsgSize = atoi(args->argv[3]);
41       nCycles = atoi(args->argv[4]);
42     }
44     communicationGroupProxy = CProxy_CommunicationGroup::ckNew();
45     communicationArrayProxy = CProxy_CommunicationArray::ckNew(CkNumPes());
47     doneGroupTest = false;
48     timeAllocation = false;
49     runTest();
50   }
52   void runTest() {
53     if (!doneGroupTest) {
54       CkPrintf("\nCharm++ group communication with allocation timing %s\n\n",
55                timeAllocation ? "enabled" : "disabled");
56       CkPrintf("%3s %20s %20s %20s %20s\n", "PE", "MSG SIZE", "PER MSG TIME(us)",
57                "BW(MB/s)", "OVERHEAD(us)");
58       communicationGroupProxy.startOperation(timeAllocation);
59     }
60     else {
61       CkPrintf("\nCharm++ 1D array communication with allocation timing %s\n\n",
62                timeAllocation ? "enabled" : "disabled");
63       CkPrintf("%3s %20s %20s %20s %20s\n", "PE", "MSG SIZE", "PER MSG TIME(us)",
64                "BW(MB/s)", "OVERHEAD(us)");
65       communicationArrayProxy.startOperation(timeAllocation);
66     }
68   }
70   void testDone() {
71     if (doneGroupTest && timeAllocation == true) {
72       CkExit();
73     }
74     else {
75       if (timeAllocation == true) {
76         doneGroupTest = true;
77       }
78       timeAllocation = !timeAllocation;
79       runTest();
80     }
81   }
85 class CommunicationGroup: public CBase_CommunicationGroup {
86 private:
87   int cycleNum;
88   int msgSize;
89   int neighbor;
90   int nReceived;
91   double startTime;
92   double totalTime;
94   std::vector<SimpleMessage *> sentMessages;
95   std::vector<SimpleMessage *> receivedMessages;
96   bool timeAllocation;
97 public:
99   int beginHandlerId;
100   int endHandlerId;
102   double startIdleTime;
103   double iterationIdleTime;
104   double totalIdleTime;
106   CommunicationGroup() {
107     nReceived = 0;
108     beginHandlerId = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
109                                          idleStartHandler, (void *) this);
110     endHandlerId = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
111                                          idleEndHandler, (void *) this);
112     msgSize = minMsgSize;
113     cycleNum = 0;
114     neighbor = (CkMyPe() + CkNumPes() / 2) % CkNumPes();
115     totalTime = 0.0;
116     totalIdleTime = 0.0;
117   }
119   void startOperation(bool includeAlloc) {
120     timeAllocation = includeAlloc;
121     iterationIdleTime = 0.0;
122     if (CkMyPe() < CkNumPes() / 2) {
123       if (timeAllocation) {
124         startTime = CkWallTimer();
125         for (int i = 0; i < kFactor; i++) {
126           SimpleMessage *msg = new (msgSize) SimpleMessage();
127           msg->size = msgSize;
128           thisProxy[neighbor].receiveMessage(msg);
129         }
130       }
131       else {
132         for (int i = 0; i < kFactor; i++) {
133           SimpleMessage *msg = new (msgSize) SimpleMessage();
134           msg->size = msgSize;
135           sentMessages.push_back(msg);
136         }
137         startTime = CkWallTimer();
138         for (int i = 0; i < kFactor; i++) {
139           thisProxy[neighbor].receiveMessage(sentMessages[i]);
140         }
141       }
142     }
143     else {
144       startTime = CkWallTimer();
145       receivedMessages.reserve(kFactor);
146     }
147   }
149   void receiveMessage(SimpleMessage *msg) {
150       if (msg->size != msgSize) {
151         CkAbort("Out of order message in receiveMessage");
152       }
153     if (timeAllocation) {
154       delete msg;
155       if (++nReceived == kFactor) {
156         nReceived = 0;
157         operationFinished(NULL);
158         msg = new (msgSize) SimpleMessage();
159         msg->size = msgSize;
160         thisProxy[neighbor].operationFinished(msg);
161       }
162     }
163     else {
164       if (receivedMessages.size() == kFactor - 1) {
165         thisProxy[neighbor].operationFinished(msg);
166         operationFinished(NULL);
167       }
168       else {
169         receivedMessages.push_back(msg);
170       }
171     }
172   }
174   void operationFinished(SimpleMessage *msg) {
175     if (msg) {
176       if (msg->size != msgSize) {
177         CkPrintf("Expected message of size %d, got message of size %ld\n", msgSize, msg->size);
178         CkAbort("Out of order message");
179       }
180     }
181     double endTime = CkWallTimer();
182     totalTime += endTime - startTime;
183     totalIdleTime += iterationIdleTime;
184     cycleNum++;
185     for (int i = 0; i < receivedMessages.size(); i++) {
186       delete receivedMessages[i];
187     }
188     sentMessages.clear();
189     receivedMessages.clear();
191     if (cycleNum == nCycles) {
192       double numIterations =
193         msg == NULL ? nCycles * kFactor : nCycles * (kFactor + 1);
194       double cycleTime = 1e6 * totalTime / numIterations;
195       double idleTimePerCycle = 1e6 * totalIdleTime / numIterations;
196       double computeTime = cycleTime - idleTimePerCycle;
197       double bandwidth = msgSize * 1e6 / cycleTime / 1024.0 / 1024.0;
198       delete msg;
199       CkPrintf("[%d] %20d %20.3lf %20.3lf %20.3lf\n",
200                CmiMyPe(), msgSize, cycleTime, bandwidth, computeTime);
201       totalIdleTime = 0.0;
202       totalTime = 0.0;
203       msgSize *= 2;
204       cycleNum = 0;
205     }
207     if (msgSize <= maxMsgSize) {
208       startOperation(timeAllocation);
209     }
210     else {
211       if (timeAllocation == true) {
212         CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, beginHandlerId);
213         CcdCancelCallOnConditionKeep(CcdPROCESSOR_END_IDLE, endHandlerId);
214       }
215       msgSize = minMsgSize;
216       cycleNum = 0;
217       neighbor = (CkMyPe() + CkNumPes() / 2) % CkNumPes();
218       totalTime = 0.0;
219       totalIdleTime = 0.0;
220       contribute(CkCallback(CkReductionTarget(TestDriver, testDone),
221                             testDriverProxy));
222     }
223   }
227 // TO DO - remove code duplication - code is almost the same as group version
228 class CommunicationArray: public CBase_CommunicationArray {
229 private:
230   int cycleNum;
231   int msgSize;
232   int neighbor;
233   int nReceived;
234   double startTime;
235   double totalTime;
237   std::vector<SimpleMessage *> sentMessages;
238   std::vector<SimpleMessage *> receivedMessages;
239   bool timeAllocation;
240 public:
242   int beginHandlerId;
243   int endHandlerId;
245   double startIdleTime;
246   double iterationIdleTime;
247   double totalIdleTime;
249   CommunicationArray() {
250     nReceived = 0;
251     beginHandlerId = CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
252                                          idleStartHandlerArray, (void *) this);
253     endHandlerId = CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
254                                          idleEndHandlerArray, (void *) this);
255     msgSize = minMsgSize;
256     cycleNum = 0;
257     neighbor = (CkMyPe() + CkNumPes() / 2) % CkNumPes();
258     totalTime = 0.0;
259     totalIdleTime = 0.0;
260   }
262   CommunicationArray(CkMigrateMessage *msg) {}
264   void startOperation(bool includeAlloc) {
265     timeAllocation = includeAlloc;
266     iterationIdleTime = 0.0;
267     if (CkMyPe() < CkNumPes() / 2) {
268       if (timeAllocation) {
269         startTime = CkWallTimer();
270         for (int i = 0; i < kFactor; i++) {
271           SimpleMessage *msg = new (msgSize) SimpleMessage();
272           thisProxy[neighbor].receiveMessage(msg);
273         }
274       }
275       else {
276         for (int i = 0; i < kFactor; i++) {
277           SimpleMessage *msg = new (msgSize) SimpleMessage();
278           sentMessages.push_back(msg);
279         }
280         startTime = CkWallTimer();
281         for (int i = 0; i < kFactor; i++) {
282           thisProxy[neighbor].receiveMessage(sentMessages[i]);
283         }
284       }
285     }
286     else {
287       startTime = CkWallTimer();
288       receivedMessages.reserve(kFactor);
289     }
290   }
292   void receiveMessage(SimpleMessage *msg) {
293     if (timeAllocation) {
294       delete msg;
295       if (++nReceived == kFactor) {
296         nReceived = 0;
297         operationFinished(NULL);
298         msg = new (msgSize) SimpleMessage();
299         thisProxy[neighbor].operationFinished(msg);
300       }
301     }
302     else {
303       if (receivedMessages.size() == kFactor - 1) {
304         thisProxy[neighbor].operationFinished(msg);
305         operationFinished(NULL);
306       }
307       else {
308         receivedMessages.push_back(msg);
309       }
310     }
311   }
313   void operationFinished(SimpleMessage *msg) {
314     double endTime = CkWallTimer();
315     totalTime += endTime - startTime;
316     totalIdleTime += iterationIdleTime;
317     cycleNum++;
318     for (int i = 0; i < receivedMessages.size(); i++) {
319       delete receivedMessages[i];
320     }
321     sentMessages.clear();
322     receivedMessages.clear();
324     if (cycleNum == nCycles) {
325       double numIterations =
326         msg == NULL ? nCycles * kFactor : nCycles * (kFactor + 1);
327       delete msg;
328       double cycleTime = 1e6 * totalTime / numIterations;
329       double idleTimePerCycle = 1e6 * totalIdleTime / numIterations;
330       double computeTime = cycleTime - idleTimePerCycle;
331       double bandwidth = msgSize * 1e6 / cycleTime / 1024.0 / 1024.0;
332       CkPrintf("[%d] %20d %20.3lf %20.3lf %20.3lf\n",
333                CmiMyPe(), msgSize, cycleTime, bandwidth, computeTime);
334       totalIdleTime = 0.0;
335       totalTime = 0.0;
336       msgSize *= 2;
337       cycleNum = 0;
338     }
340     if (msgSize <= maxMsgSize) {
341       startOperation(timeAllocation);
342     }
343     else {
344       if (timeAllocation == true) {
345         CcdCancelCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, beginHandlerId);
346         CcdCancelCallOnConditionKeep(CcdPROCESSOR_END_IDLE, endHandlerId);
347       }
348       msgSize = minMsgSize;
349       cycleNum = 0;
350       neighbor = (CkMyPe() + CkNumPes() / 2) % CkNumPes();
351       totalTime = 0.0;
352       totalIdleTime = 0.0;
353       contribute(CkCallback(CkReductionTarget(TestDriver, testDone),
354                             testDriverProxy));
355     }
356   }
360 void idleStartHandler(void *timingGroupObj, double start) {
361   CommunicationGroup *localInstance = (CommunicationGroup *) timingGroupObj;
362   localInstance->startIdleTime = start;
365 void idleEndHandler(void *timingGroupObj, double cur) {
366   CommunicationGroup *localInstance = (CommunicationGroup *) timingGroupObj;
367   if(localInstance->startIdleTime > 0) {
368     localInstance->iterationIdleTime += cur - localInstance->startIdleTime;
369     localInstance->startIdleTime = -1;
370   }
373 void idleStartHandlerArray(void *timingGroupObj, double start) {
374   CommunicationArray *localInstance = (CommunicationArray *) timingGroupObj;
375   localInstance->startIdleTime = start;
378 void idleEndHandlerArray(void *timingGroupObj, double cur) {
379   CommunicationArray *localInstance = (CommunicationArray *) timingGroupObj;
380   if(localInstance->startIdleTime > 0) {
381     localInstance->iterationIdleTime += cur - localInstance->startIdleTime;
382     localInstance->startIdleTime = -1;
383   }
388 #include "overhead_test.def.h"