Link conv-cpm as C++
[charm.git] / src / ck-perf / trace-utilization.C
blobf3ff431a0afd11991c87dd566a24db7269485ce9
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
6 #include "trace-utilization.h"
9 /* readonly */ CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
12 /// A reduction type for merging compressed sum detail data
13 CkReduction::reducerType sumDetailCompressedReducer;
16 // This function has unused arguments to match the type of
17 // CcdVoidFn, which CcdCallOnConditionKeep takes
18 void collectUtilizationData(void *, double) {
19   traceUtilizationGroupProxy.collectSumDetailData();
23 CkpvStaticDeclare(TraceUtilization*, _trace);
25 /**
26   For each TraceFoo module, _createTraceFoo() must be defined.
27   This function is called in _createTraces() generated in moduleInit.C
29 void _createTraceutilization(char **argv)
31   //  CkPrintf("[%d] _createTraceutilization\n", CkMyPe());
33   // Register the reducer
34   CkAssert(sizeof(short) == 2);
35   sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction, false, "sumDetailCompressedReduction");
37   CkpvInitialize(TraceUtilization*, _trace);
38   CkpvAccess(_trace) = new TraceUtilization();
39   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
45 void TraceUtilization::beginExecute(CmiObjId *tid)
47   beginExecute(-1,-1,_threadEP,-1);
50 void TraceUtilization::beginExecute(envelope *e, void *obj)
52   // no message means thread execution
53   if (e==NULL) {
54     beginExecute(-1,-1,_threadEP,-1);
55   }
56   else {
57     beginExecute(-1,-1,e->getEpIdx(),-1);
58   }  
61 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
63   if (execEp != INVALIDEP) {
64     TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
65     return;
66   }
67   
68   execEp=ep;
69   start = TraceTimer();
73 void TraceUtilization::endExecute(void)
76   if (execEp == TRACEON_EP) {
77     // if trace just got turned on, then one expects to see this
78     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
79     execEp = INVALIDEP;
80     return;
81   }
83   double endTime = TraceTimer();
85   updateCpuTime(execEp, start, endTime);
88   execEp = INVALIDEP;
93 void TraceUtilization::addEventType(int eventType)
95   CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
103 Send back to the client compressed sum-detail style measurements about the 
104 utilization for each active PE combined across all PEs.
106 The data format sent by this handler is a bunch of records(one for each bin) of the following format:
107    #samples (EP,utilization)* 
109 One example record for two EPS that executed during the sample period. 
110 EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time. 
111 All of these would be packed as bytes into the message:
112 2 3 150 122 20
114  */
115 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
116   CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n",  storedSumDetailResults.size() );
117   //  CkAssert(sumDetail);
118   int datalength;
120 #if 0
122   compressedBuffer fakeMessage = fakeCompressedMessage();
123   CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
124   fakeMessage.freeBuf();
126 #else
128   if (storedSumDetailResults.size()  == 0) {
129     compressedBuffer b = emptyCompressedBuffer();
130     CcsSendDelayedReply(m->reply, b.datalength(), b.buffer()); 
131     b.freeBuf();
132   } else {
133     CkReductionMsg * msg = storedSumDetailResults.front();
134     storedSumDetailResults.pop_front();
136     
137     void *sendBuffer = (void *)msg->getData();
138     datalength = msg->getSize();
139     CcsSendDelayedReply(m->reply, datalength, sendBuffer);
140     
141     delete msg;
142   }
143     
144   
145 #endif
147   //  CkPrintf("CCS response of %d bytes sent.\n", datalength);
148   delete m;
153 void TraceUtilizationBOC::collectSumDetailData() {
154   TraceUtilization* t = CkpvAccess(_trace);
156   // If we don't have enough data, just return and wait for the next invocation
157   if (t->cpuTimeEntriesAvailable() - t->cpuTimeEntriesSentSoFar() < BIN_PER_SEC)
158     return;
160   compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
162   //  CkPrintf("[%d] contributing buffer created by compressNRecentSumDetail avg util=%lg\n", CkMyPe(), averageUtilizationInBuffer(b));
163   //  printCompressedBuf(b);
164   // fflush(stdout);
165   
166   
167 #if 0
168   b = fakeCompressedMessage();
169 #endif
170   
171   //  CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), b.datalength());
172   
173   //  CProxy_TraceUtilizationBOC sumProxy(traceSummaryGID);
174   CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
175   contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
176   
177   b.freeBuf();
181 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
182   CkAssert(CkMyPe() == 0);
184   compressedBuffer b(msg->getData());
185   CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
186   //if(isCompressedBufferSane(b)){
187     storedSumDetailResults.push_back(msg); 
188     //}
190     // CkPrintf("[%d] Reduction of SumDetail completed. Result stored in storedSumDetailResults deque(size now=%d)\n", CkMyPe(), storedSumDetailResults.size() );
191     //  fflush(stdout);
197 void TraceUtilization::writeSts(void) {
198   // open sts file
199   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
200   sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
201   FILE* stsfp = fopen(fname, "w+");
202   if (stsfp == 0) {
203        CmiAbort("Cannot open summary sts file for writing.\n");
204   }
205   delete[] fname;
207   traceWriteSTS(stsfp,0);
208   fprintf(stsfp, "END\n");
210   fclose(stsfp);
215 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
218   
219 /// Compress a buffer by merging all entries in a bin that are less than the threshold into a single "other" category
220   compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
221     //    CkPrintf("[%d] moveTinyEntriesToOther\n", CkMyPe());
222     
223     // reset the src buffer to the beginning
224     src.pos = 0;
226     compressedBuffer dest(100000); 
227     
228     int numBins = src.pop<numBins_T>();
229     int numProcs = src.pop<numProcs_T>();
230     
231     dest.push<numBins_T>(numBins);
232     dest.push<numProcs_T>(numProcs);
233     
234     
235     for(int i=0;i<numBins;i++){
236       double utilizationInOther = 0.0;
237       
238       entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
239       int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
240       
241       CkAssert(numEntriesInSrcBin < 200);
243       for(int j=0;j<numEntriesInSrcBin;j++){
244         ep_T ep = src.pop<ep_T>();
245         double v = src.pop<utilization_T>();
246         
247         if(v < threshold * 250.0){
248           // do not copy bin into destination
249           utilizationInOther += v / 250.0;
250         } else {
251           // copy bin into destination
252           dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
253           dest.push<ep_T>(ep);
254           dest.push<utilization_T>(v);
255         }
257       }
258       
259       // if other category has stuff in it, add it to the destination buffer
260       if(utilizationInOther > 0.0){
261         dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
262         dest.push<ep_T>(other_EP);
263         if(utilizationInOther > 1.0)
264           utilizationInOther = 1.0;
265         dest.push<utilization_T>(utilizationInOther*250.0);
266       }
267       
268     }
269    
270     return dest;
271   }
272   
273     
276 /// A reducer for merging compressed sum detail data
277 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
278   // CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
279   
280   compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
281   int *numProcsRepresentedInMessage = new int[nMsg];
282   
283   int numBins = 0;
284   int totalsize = 0;
285   int totalProcsAcrossAllMessages = 0;
286   
287   for (int i=0;i<nMsg;i++) {
288     incomingMsgs[i].init(msgs[i]->getData());
289     
290     //  CkPrintf("[%d] Incoming reduction message %d has average utilization %lg\n", CkMyPe(),  i, averageUtilizationInBuffer(incomingMsgs[i])); 
291     //   CkPrintf("Is buffer %d sane? %s\n", i, isCompressedBufferSane(incomingMsgs[i]) ? "yes": "no" );
294     totalsize += msgs[i]->getSize();
295     //  CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
296     //   printCompressedBuf(incomingMsgs[i]);
297     
298     // Read first value from message. 
299     // Make sure all messages have the same number of bins
300     if(i==0)
301       numBins = incomingMsgs[i].pop<numBins_T>();
302     else 
303       CkAssert( numBins ==  incomingMsgs[i].pop<numBins_T>() );
304     
305     // Read second value from message. 
306     numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
307     totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
308     //    CkPrintf("Number of procs in message[%d] is %d\n", i,  (int)numProcsRepresentedInMessage[i]);
309   }
310   
311   compressedBuffer dest(totalsize + 100); 
312   
313   // build a compressed representation of each merged bin
314   dest.push<numBins_T>(numBins);
315   dest.push<numProcs_T>(totalProcsAcrossAllMessages);
316   
317   for(int i=0; i<numBins; i++){
318     mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
319   }
320   
321   // CkPrintf("END MERGE RESULT=========================================================\n");
322   // printCompressedBuf(dest);
325   //CkPrintf("[%d] Merged buffer has average utilization %lg \n", CkMyPe(), averageUtilizationInBuffer(dest));
327   //CkPrintf("Is resulting merged buffer sane? %s\n", isCompressedBufferSane(dest) ? "yes": "no" );  
328   
329   compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
330   
331   //  CkPrintf("Is resulting merged Filtered buffer sane? %s\n", isCompressedBufferSane(dest2) ? "yes": "no" ); 
333   //  CkPrintf("[%d] Outgoing reduction (filtered) message has average utilization %lf \n", CkMyPe(), averageUtilizationInBuffer(dest2));
335   
336   CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());   
337   dest.freeBuf();
338   delete[] incomingMsgs;
339   delete[] numProcsRepresentedInMessage;
340   return m;
349 /// Create fake sum detail data in the compressed format (for debugging)
350  compressedBuffer fakeCompressedMessage(){
351    CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
352    
353    compressedBuffer fakeBuf(10000);
354    
355    int numBins = 55;
356    int numProcs = 1000;
358    // build a compressed representation of each merged bin
359    fakeBuf.push<numBins_T>(numBins);
360    fakeBuf.push<numProcs_T>(numProcs);
361    for(int i=0; i<numBins; i++){
362      int numRecords = 3;
363      fakeBuf.push<entriesInBin_T>(numRecords);
364      for(int j=0;j<numRecords;j++){
365        fakeBuf.push<ep_T>(j*10+2);
366        fakeBuf.push<utilization_T>(120.00);
367      }  
368    }
369    
370    //CkPrintf("Fake Compressed Message:=========================================================\n");
371    //   printCompressedBuf(fakeBuf);
373    CkAssert(isCompressedBufferSane(fakeBuf));
375    return fakeBuf;
379  /// Create an empty message
380  compressedBuffer emptyCompressedBuffer(){
381    compressedBuffer result(sizeof(numBins_T));
382    result.push<numBins_T>(0);
383    return result;
389 /** print out the compressed buffer starting from its begining*/
390 void printCompressedBuf(compressedBuffer b){
391   // b should be passed in by value, and hence we can modify it
392   b.pos = 0;
393   int numEntries = b.pop<numBins_T>();
394   CkPrintf("Buffer contains %d records\n", numEntries);
395   int numProcs = b.pop<numProcs_T>();
396   CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
398   for(int i=0;i<numEntries;i++){
399     entriesInBin_T recordLength = b.pop<entriesInBin_T>();
400     if(recordLength > 0){
401       CkPrintf("    Record %d is of length %d : ", i, recordLength);
402       
403       for(int j=0;j<recordLength;j++){
404         ep_T ep = b.pop<ep_T>();
405         utilization_T v = b.pop<utilization_T>();
406         CkPrintf("(%d,%f) ", ep, v);
407       }
408     
409       CkPrintf("\n");
410     }
411   }
416  bool isCompressedBufferSane(compressedBuffer b){
417    // b should be passed in by value, and hence we can modify it  
418    b.pos = 0;  
419    numBins_T numBins = b.pop<numBins_T>();  
420    numProcs_T numProcs = b.pop<numProcs_T>();  
421    
422    if(numBins > 2000){
423      ckout << "WARNING: numBins=" << numBins << endl;
424      return false;
425    }
426    
427    for(int i=0;i<numBins;i++){  
428      entriesInBin_T recordLength = b.pop<entriesInBin_T>();  
429      if(recordLength > 200){
430        ckout << "WARNING: recordLength=" << recordLength << endl;
431        return false;
432      }
433      
434      if(recordLength > 0){  
435        
436        for(int j=0;j<recordLength;j++){  
437          ep_T ep = b.pop<ep_T>();  
438          utilization_T v = b.pop<utilization_T>();  
439          //      CkPrintf("(%d,%f) ", ep, v);  
440          if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
441            ckout << "WARNING: ep=" << ep << " v=" << v << endl;
442            return false;
443          }
444        }  
445        
446      }  
447    }  
448    
449    return true;
454  double averageUtilizationInBuffer(compressedBuffer b){
455    // b should be passed in by value, and hence we can modify it  
456    b.pos = 0;  
457    numBins_T numBins = b.pop<numBins_T>();  
458    numProcs_T numProcs = b.pop<numProcs_T>();  
459    
460    //   CkPrintf("[%d] averageUtilizationInBuffer numProcs=%d   (grep reduction message)\n", CkMyPe(), numProcs);
461    
462    double totalUtilization = 0.0;
463    
464    for(int i=0;i<numBins;i++) {  
465      entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();     
466      for(int j=0;j<entriesInBin;j++){  
467        ep_T ep = b.pop<ep_T>();  
468        totalUtilization +=  b.pop<utilization_T>();  
469      }
470    }
471    
472    return totalUtilization / numBins / 2.5;
477 void sanityCheckCompressedBuf(compressedBuffer b){  
478    CkAssert(isCompressedBufferSane(b)); 
479  }  
483 double TraceUtilization::sumUtilization(int startBin, int endBin){
484    int epInfoSize = getEpInfoSize();
485    
486    double a = 0.0;
488    for(int i=startBin; i<=endBin; i++){
489      for(int j=0; j<epInfoSize; j++){
490        a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
491      }
492    }
493    return a;
497  /// Create a compressed buffer of the n most recent sum detail samples
498  compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
499    //   CkPrintf("compressNRecentSumDetail(desiredBinsToSend=%d)\n", desiredBinsToSend);
501    int startBin =  cpuTimeEntriesSentSoFar();
502    int numEntries = getEpInfoSize();
504    int endBin = startBin + desiredBinsToSend - 1;
505    int binsToSend = endBin - startBin + 1;
506    CkAssert(binsToSend >= desiredBinsToSend );
507    incrementNumCpuTimeEntriesSent(binsToSend);
510 #if 0
511    bool nonePrinted = true;
512    for(int i=0;i<(NUM_BINS-1000);i+=1000){
513      double expectedU = sumUtilization(i, i+999);
514      if(expectedU > 0.0){
515           CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(),  i, i+999, expectedU);
516        nonePrinted = false;
517      }
518    }
519    
520    if(nonePrinted)
521      CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
523    fflush(stdout);
524 #endif
526    int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
527    compressedBuffer b(bufferSize);
529    b.push<numBins_T>(binsToSend);
530    b.push<numProcs_T>(1); // number of processors along reduction subtree. I am just one processor.
531    //   double myu = 0.0;
532    
533    for(int i=0; i<binsToSend; i++) {
534      // Create a record for bin i
535      //  CkPrintf("Adding record for bin %d\n", i);
536      int numEntriesInRecordOffset = b.push<entriesInBin_T>(0); // The number of entries in this record
537      
538      for(int e=0; e<numEntries; e++) {
539        double scaledUtilization = getUtilization(i+startBin,e) * 2.5; // use range of 0 to 250 for the utilization, so it can fit in an unsigned char
540        if(scaledUtilization > 0.0) {
541          //CkPrintf("scaledUtilization=%lg !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n", scaledUtilization);
542          if(scaledUtilization > 250.0)
543            scaledUtilization = 250.0;
544          
545          b.push<ep_T>(e);
546          b.push<utilization_T>(scaledUtilization);
547          //      myu += scaledUtilization;
548          b.increment<entriesInBin_T>(numEntriesInRecordOffset);
549        }
550      }
551    }
552    
553    // CkPrintf("[%d] compressNRecentSumDetail resulting buffer: averageUtilizationInBuffer()=%lg myu=%lg\n", CkMyPe(), averageUtilizationInBuffer(b), myu);
554    // fflush(stdout);
555    
556    return b;
563 /** Merge the compressed entries from the first bin in each of the srcBuf buffers.
564      
566  void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
567   // put a counter at the beginning of destBuf
568   int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
569   
570   //  CkPrintf("BEGIN MERGE------------------------------------------------------------------\n");
571   
572   // Read off the number of bins in each buffer
573   int *remainingEntriesToRead = new int[numSrcBuf];
574   for(int i=0;i<numSrcBuf;i++){
575     remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
576   }
578   int count = 0;
579   // Count remaining entries to process
580   for(int i=0;i<numSrcBuf;i++){
581     count += remainingEntriesToRead[i];
582   }
583   
584   while (count>0) {
585     // find first EP from all buffers (these are sorted by EP already)
586     int minEp = 10000;
587     for(int i=0;i<numSrcBuf;i++){
588       if(remainingEntriesToRead[i]>0){
589         int ep = srcBufferArray[i].peek<ep_T>();
590         if(ep < minEp){
591           minEp = ep;
592         }
593       }
594     }
595     
596     //   CkPrintf("[%d] mergeCompressedBin minEp found was %d   totalProcsAcrossAllMessages=%d\n", CkMyPe(), minEp, (int)totalProcsAcrossAllMessages);
597     
598     destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
600     // Merge contributions from all buffers that list the EP
601     double v = 0.0;
602     for(int i=0;i<numSrcBuf;i++){
603       if(remainingEntriesToRead[i]>0){
604         int ep = srcBufferArray[i].peek<ep_T>(); 
605         if(ep == minEp){
606           srcBufferArray[i].pop<ep_T>();  // pop ep
607           double util = srcBufferArray[i].pop<utilization_T>();
608           v += util * numProcsRepresentedInMessage[i];
609           remainingEntriesToRead[i]--;
610           count --;
611         }
612       }
613     }
615     // create a new entry in the output for this EP.
616     destBuf.push<ep_T>(minEp);
617     destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
619   }
622   delete [] remainingEntriesToRead;
623   // CkPrintf("[%d] End of mergeCompressedBin:\n", CkMyPe() );
624   // CkPrintf("END MERGE ------------------------------------------------------------------\n");
629 #include "TraceUtilization.def.h"
632 /*@}*/