Link conv-cpm as C++
[charm.git] / src / ck-perf / trace-summary.C
blob8343a98117d644223052038603da8cc56e943428
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
6 #include "charm++.h"
7 #include "trace-summary.h"
8 #include "trace-summaryBOC.h"
10 #define DEBUGF(x)  // CmiPrintf x
12 #define VER   7.1
14 #define INVALIDEP     -2
15 #define TRACEON_EP     -3
17 // 1 minutes of run before it'll fill up:
18 #define DefaultBinCount      (1000*60*1) 
20 CkpvStaticDeclare(TraceSummary*, _trace);
21 static int _numEvents = 0;
22 #define NUM_DUMMY_EPS 9
23 CkpvDeclare(int, binCount);
24 CkpvDeclare(double, binSize);
25 CkpvDeclare(double, version);
28 CkpvDeclare(int, previouslySentBins);
32 /** 
33     A class that reads/writes a buffer out of different types of data.
35     This class exists because I need to get references to parts of the buffer 
36     that have already been used so that I can increment counters inside the buffer.
39 class compressedBuffer {
40 public:
41   char* buf;
42   int pos; ///<< byte position just beyond the previously read/written data
44   compressedBuffer(){
45     buf = NULL;
46     pos = 0;
47   }
49   compressedBuffer(int bytes){
50     buf = (char*)malloc(bytes);
51     pos = 0;
52   }
54   compressedBuffer(void *buffer){
55     buf = (char*)buffer;
56     pos = 0;
57   }
58     
59   void init(void *buffer){
60     buf = (char*)buffer;
61     pos = 0;
62   }
63     
64   inline void * currentPtr(){
65     return (void*)(buf+pos);
66   }
68   template <typename T>
69   T read(int offset){
70     // to resolve unaligned writes causing bus errors, need memcpy
71     T v;
72     memcpy(&v, buf+offset, sizeof(T));
73     return v;
74   }
75     
76   template <typename T>
77   void write(T v, int offset){
78     T v2 = v; // on stack
79     // to resolve unaligned writes causing bus errors, need memcpy
80     memcpy(buf+offset, &v2, sizeof(T));
81   }
82     
83   template <typename T>
84   void increment(int offset){
85     T temp;
86     temp = read<T>(offset);
87     temp ++;
88     write<T>(temp, offset);
89   }
91   template <typename T>
92   void accumulate(T v, int offset){
93     T temp;
94     temp = read<T>(offset);
95     temp += v;
96     write<T>(temp, offset);
97   }
98   
99   template <typename T>
100   int push(T v){
101     int oldpos = pos;
102     write<T>(v, pos);
103     pos += sizeof(T);
104     return oldpos;
105   }
106   
107   template <typename T>
108   T pop(){
109     T temp = read<T>(pos);
110     pos += sizeof(T);
111     return temp;
112   }
114   template <typename T>
115   T peek(){
116     T temp = read<T>(pos);
117     return temp;
118   }
120   template <typename T0, typename T>
121   T peekSecond(){
122     T temp;
123     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
124     return temp;
125   }
127   int datalength(){
128     return pos;
129   }
130      
131   void * buffer(){
132     return (void*) buf;
133   }  
135   void freeBuf(){
136     free(buf);
137   }
139   ~compressedBuffer(){
140     // don't free the buf because the user my have supplied the buffer
141   }
142     
148 // Global Readonly
149 CkGroupID traceSummaryGID;
150 bool summaryCcsStreaming;
152 int sumonly = 0;
153 int sumDetail = 0;
156   For each TraceFoo module, _createTraceFoo() must be defined.
157   This function is called in _createTraces() generated in moduleInit.C
159 void _createTracesummary(char **argv)
161   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
162   CkpvInitialize(TraceSummary*, _trace);
163   CkpvInitialize(int, previouslySentBins);
164   CkpvAccess(previouslySentBins) = 0;
165   CkpvAccess(_trace) = new  TraceSummary(argv);
166   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
167   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
171 /// function call for starting a phase in trace summary logs 
172 void CkSummary_StartPhase(int phase)
174    CkpvAccess(_trace)->startPhase(phase);
178 /// function call for adding an event mark
179 void CkSummary_MarkEvent(int eventType)
181    CkpvAccess(_trace)->addEventType(eventType);
184 static inline void writeU(FILE* fp, int u)
186   fprintf(fp, "%4d", u);
189 PhaseEntry::PhaseEntry() 
191   int _numEntries=_entryTable.size();
192   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
193   nEPs = _numEntries+10; 
194   count = new int[nEPs];
195   times = new double[nEPs];
196   maxtimes = new double[nEPs];
197   for (int i=0; i<nEPs; i++) {
198     count[i] = 0;
199     times[i] = 0.0;
200     maxtimes[i] = 0.;
201   }
204 SumLogPool::~SumLogPool() 
206     if (!sumonly) {
207       write();
208       fclose(fp);
209       if (sumDetail) fclose(sdfp);
210   }
211   // free memory for mark
212   if (markcount > 0)
213   for (int i=0; i<MAX_MARKS; i++) {
214     for (int j=0; j<events[i].length(); j++)
215       delete events[i][j];
216   }
217   delete[] pool;
218   delete[] epInfo;
219   delete[] cpuTime;
220   delete[] numExecutions;
223 void SumLogPool::addEventType(int eventType, double time)
225    if (eventType <0 || eventType >= MAX_MARKS) {
226        CkPrintf("Invalid event type %d!\n", eventType);
227        return;
228    }
229    MarkEntry *e = new MarkEntry;
230    e->time = time;
231    events[eventType].push_back(e);
232    markcount ++;
235 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
237    // TBD: Can this be moved to initMem?
238   cpuTime = NULL;
239    poolSize = CkpvAccess(binCount);
240    if (poolSize % 2) poolSize++;        // make sure it is even
241    pool = new BinEntry[poolSize];
242    _MEMCHECK(pool);
244    this->pgm = new char[strlen(pgm)+1];
245    strcpy(this->pgm,pgm);
246    
247 #if 0
248    // create the sts file
249    if (CkMyPe() == 0) {
250      char *fname = 
251        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
252      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
253      stsfp = fopen(fname, "w+");
254      //CmiPrintf("File: %s \n", fname);
255      if (stsfp == 0) {
256        CmiAbort("Cannot open summary sts file for writing.\n");
257       }
258      delete[] fname;
259    }
260 #endif
261    
262    // event
263    markcount = 0;
266 void SumLogPool::initMem()
268    int _numEntries=_entryTable.size();
269    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
270    epInfo = new SumEntryInfo[epInfoSize];
271    _MEMCHECK(epInfo);
273    cpuTime = NULL;
274    numExecutions = NULL;
275    if (sumDetail) {
276        cpuTime = new double[poolSize*epInfoSize];
277        _MEMCHECK(cpuTime);
278        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
279        numExecutions = new int[poolSize*epInfoSize];
280        _MEMCHECK(numExecutions);
281        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
283 //         int i, e;
284 //         for(i=0; i<poolSize; i++) {
285 //             for(e=0; e< epInfoSize; e++) {
286 //                 setCPUtime(i,e,0.0);
287 //                 setNumExecutions(i,e,0);
288 //             }
289 //         }
290    }
293 int SumLogPool::getUtilization(int interval, int ep) {
294     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
297 void SumLogPool::write(void) 
299   int i;
300   unsigned int j;
301   int _numEntries=_entryTable.size();
303   fp = NULL;
304   sdfp = NULL;
306   // create file(s)
307   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
308   if (!sumonly) {
309     char pestr[10];
310     sprintf(pestr, "%d", CkMyPe());
311     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
312     char *fname = new char[len+1];
313     
314     sprintf(fname, "%s.%s.sum", pgm, pestr);
315     do {
316       fp = fopen(fname, "w+");
317     } while (!fp && errno == EINTR);
318     if (!fp) {
319       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
320       CmiAbort("Cannot open Summary Trace File for writing...\n");
321     }
322     
323     if (sumDetail) {
324       sprintf(fname, "%s.%s.sumd", pgm, pestr);
325       do {
326         sdfp = fopen(fname, "w+");
327       } while (!sdfp && errno == EINTR);
328       if(!sdfp) {
329         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
330       }
331     }
332     delete[] fname;
333   }
335   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
336   if (CkpvAccess(version)>=3.0)
337   {
338     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
339   }
340   fprintf(fp, "\n");
342   // write bin time
343 #if 1
344   int last=pool[0].getU();
345   writeU(fp, last);
346   int count=1;
347   for(j=1; j<numBins; j++) {
348     int u = pool[j].getU();
349     if (last == u) {
350       count++;
351     }
352     else {
353       if (count > 1) fprintf(fp, "+%d", count);
354       writeU(fp, u);
355       last = u;
356       count = 1;
357     }
358   }
359   if (count > 1) fprintf(fp, "+%d", count);
360 #else
361   for(j=0; j<numEntries; j++) 
362       pool[j].write(fp);
363 #endif
364   fprintf(fp, "\n");
366   // write entry execution time
367   fprintf(fp, "EPExeTime: ");
368   for (i=0; i<_numEntries; i++)
369     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
370   fprintf(fp, "\n");
371   // write entry function call times
372   fprintf(fp, "EPCallTime: ");
373   for (i=0; i<_numEntries; i++)
374     fprintf(fp, "%d ", epInfo[i].epCount);
375   fprintf(fp, "\n");
376   // write max entry function execute times
377   fprintf(fp, "MaxEPTime: ");
378   for (i=0; i<_numEntries; i++)
379     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
380   fprintf(fp, "\n");
381 #if 0
382   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
383     for (j=0; j<_numEntries; j++) 
384       fprintf(fp, "%d ", epInfo[j].hist[i]);
385     fprintf(fp, "\n");
386   }
387 #endif
388   // write marks
389   if (CkpvAccess(version)>=2.0) 
390   {
391     fprintf(fp, "NumMarks: %d ", markcount);
392     for (i=0; i<MAX_MARKS; i++) {
393       for(int j=0; j<events[i].length(); j++)
394         fprintf(fp, "%d %f ", i, events[i][j]->time);
395     }
396     fprintf(fp, "\n");
397   }
398   // write phases
399   if (CkpvAccess(version)>=3.0)
400   {
401     phaseTab.write(fp);
402   }
403   // write idle time
404   if (CkpvAccess(version)>=7.1) {
405     fprintf(fp, "IdlePercent: ");
406     int last=pool[0].getUIdle();
407     writeU(fp, last);
408     int count=1;
409     for(j=1; j<numBins; j++) {
410       int u = pool[j].getUIdle();
411       if (last == u) {
412         count++;
413       }
414       else {
415         if (count > 1) fprintf(fp, "+%d", count);
416         writeU(fp, u);
417         last = u;
418         count = 1;
419       }
420     }
421     if (count > 1) fprintf(fp, "+%d", count);
422     fprintf(fp, "\n");
423   }
425   //CkPrintf("writing to detail file:%d    %d \n", getNumEntries(), numBins);
426   // write summary details
427   if (sumDetail) {
428         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
429                 CkpvAccess(version), CkMyPe(), CkNumPes(),
430                 numBins, _numEntries, CkpvAccess(binSize));
432         // Write out cpuTime in microseconds
433         // Run length encoding (RLE) along EP axis
434         fprintf(sdfp, "ExeTimePerEPperInterval ");
435         unsigned int e, i;
436         long last= (long) (getCPUtime(0,0)*1.0e6);
437         int count=0;
438         fprintf(sdfp, "%ld", last);
439         for(e=0; e<_numEntries; e++) {
440             for(i=0; i<numBins; i++) {
442                 long u= (long) (getCPUtime(i,e)*1.0e6);
443                 if (last == u) {
444                     count++;
445                 } else {
447                     if (count > 1) fprintf(sdfp, "+%d", count);
448                     fprintf(sdfp, " %ld", u);
449                     last = u;
450                     count = 1;
451                 }
452             }
453         }
454         if (count > 1) fprintf(sdfp, "+%d", count);
455         fprintf(sdfp, "\n");
456         // Write out numExecutions
457         // Run length encoding (RLE) along EP axis
458         fprintf(sdfp, "EPCallTimePerInterval ");
459         last= getNumExecutions(0,0);
460         count=0;
461         fprintf(sdfp, "%ld", last);
462         for(e=0; e<_numEntries; e++) {
463             for(i=0; i<numBins; i++) {
465                 long u= getNumExecutions(i, e);
466                 if (last == u) {
467                     count++;
468                 } else {
470                     if (count > 1) fprintf(sdfp, "+%d", count);
471                     fprintf(sdfp, " %ld", u);
472                     last = u;
473                     count = 1;
474                 }
475             }
476         }
477         if (count > 1) fprintf(sdfp, "+%d", count);
478         fprintf(sdfp, "\n");
479   }
482 void SumLogPool::writeSts(void)
484     // open sts file
485   char *fname = 
486        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
487   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
488   stsfp = fopen(fname, "w+");
489   //CmiPrintf("File: %s \n", fname);
490   if (stsfp == 0) {
491        CmiAbort("Cannot open summary sts file for writing.\n");
492   }
493   delete[] fname;
495   traceWriteSTS(stsfp,_numEvents);
496   for(int i=0;i<_numEvents;i++)
497     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
498   fprintf(stsfp, "END\n");
500   fclose(stsfp);
503 // Called once per interval
504 void SumLogPool::add(double time, double idleTime, int pe) 
506   new (&pool[numBins++]) BinEntry(time, idleTime);
507   if (poolSize==numBins) {
508     shrink();
509   }
512 // Called once per run of an EP
513 // adds 'time' to EP's time, increments epCount
514 void SumLogPool::setEp(int epidx, double time) 
516   if (epidx >= epInfoSize) {
517         CmiAbort("Invalid entry point!!\n");
518   }
519   //CmiPrintf("set EP: %d %e \n", epidx, time);
520   epInfo[epidx].setTime(time);
521   // set phase table counter
522   phaseTab.setEp(epidx, time);
525 // Called once from endExecute, endPack, etc. this function updates
526 // the sumDetail intervals.
527 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
529         if (epIdx >= epInfoSize) {
530             CmiAbort("Too many entry points!!\n");
531         }
533         double binSz = CkpvAccess(binSize);
534         int startingBinIdx, endingBinIdx;
535         startingBinIdx = (int)(startTime/binSz);
536         endingBinIdx = (int)(endTime/binSz);
537         // shrink if needed
538         while (endingBinIdx >= poolSize) {
539           shrink();
540           CmiAssert(CkpvAccess(binSize) > binSz);
541           binSz = CkpvAccess(binSize);
542           startingBinIdx = (int)(startTime/binSz);
543           endingBinIdx = (int)(endTime/binSz);
544         }
546         if (startingBinIdx == endingBinIdx) {
547             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
548         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
549             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
550             while(++startingBinIdx < endingBinIdx)
551                 addToCPUtime(startingBinIdx, epIdx, binSz);
552             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
553         } else {
554           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
555                    startTime, endTime);
556             CmiAbort("Error: end time of EP is less than start time\n");
557         }
559         incNumExecutions(startingBinIdx, epIdx);
562 // Shrinks pool[], cpuTime[], and numExecutions[]
563 void SumLogPool::shrink(void)
565 //  double t = CmiWallTimer();
567   // We ensured earlier that poolSize is even; therefore now numBins
568   // == poolSize == even.
569   int entries = numBins/2;
570   for (int i=0; i<entries; i++)
571   {
572      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
573      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
574      if (sumDetail)
575      for (int e=0; e < epInfoSize; e++) {
576          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
577          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
578      }
579   }
580   // zero out the remaining intervals
581   if (sumDetail) {
582     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
583     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
584   }
585   numBins = entries;
586   CkpvAccess(binSize) *= 2;
588 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
591 void SumLogPool::shrink(double _maxBinSize)
593     while(CkpvAccess(binSize) < _maxBinSize)
594     {
595         shrink();
596     };
598 int  BinEntry::getU() 
600   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
603 int BinEntry::getUIdle() {
604   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
607 void BinEntry::write(FILE* fp)
609   writeU(fp, getU());
612 TraceSummary::TraceSummary(char **argv):msgNum(0),binStart(0.0),idleStart(0.0),
613                                         binTime(0.0),binIdle(0.0)
615   if (CkpvAccess(traceOnPe) == 0) return;
617     // use absolute time
618   if (CmiTimerAbsolute()) binStart = CmiInitTime();
620   CkpvInitialize(int, binCount);
621   CkpvInitialize(double, binSize);
622   CkpvInitialize(double, version);
623   CkpvAccess(binSize) = BIN_SIZE;
624   CkpvAccess(version) = VER;
625   CkpvAccess(binCount) = DefaultBinCount;
626   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
627     if (CkMyPe() == 0) 
628       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
629   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
630         "CPU usage log time resolution");
631   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
632         "Write this .sum file version");
634   epThreshold = 0.001; 
635   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
636         "Execution time histogram lower bound");
637   epInterval = 0.001; 
638   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
639         "Execution time histogram bin size");
641   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
642   // +sumonly overrides +sumDetail
643   if (!sumonly)
644       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
646   _logPool = new SumLogPool(CkpvAccess(traceRoot));
647   // assume invalid entry point on start
648   execEp=INVALIDEP;
649   inIdle = 0;
650   inExec = 0;
651   depth = 0;
654 void TraceSummary::traceClearEps(void)
656   _logPool->clearEps();
659 void TraceSummary::traceWriteSts(void)
661   if(CkMyPe()==0)
662       _logPool->writeSts();
665 void TraceSummary::traceClose(void)
667     if(CkMyPe()==0)
668         _logPool->writeSts();
669     CkpvAccess(_trace)->endComputation();
671     delete _logPool;
672     CkpvAccess(_traces)->removeTrace(this);
675 void TraceSummary::beginExecute(CmiObjId *tid)
677   beginExecute(-1,-1,_threadEP,-1);
680 void TraceSummary::beginExecute(envelope *e, void *obj)
682   // no message means thread execution
683   if (e==NULL) {
684     beginExecute(-1,-1,_threadEP,-1);
685   }
686   else {
687     beginExecute(-1,-1,e->getEpIdx(),-1);
688   }  
691 void TraceSummary::beginExecute(char *msg)
693 #if CMK_SMP_TRACE_COMMTHREAD
694     //This function is called from comm thread in SMP mode
695     envelope *e = (envelope *)msg;
696     int num = _entryTable.size();
697     int ep = e->getEpIdx();
698     if(ep<0 || ep>=num) return;
699     if(_entryTable[ep]->traceEnabled)
700         beginExecute(-1,-1,e->getEpIdx(),-1);
701 #endif
704 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
706   if (execEp == TRACEON_EP) {
707     endExecute();
708   }
709   CmiAssert(inIdle == 0);
710   if (inExec == 0) {
711     CmiAssert(depth == 0);
712     inExec = 1;
713   }
714   depth ++;
715   // printf("BEGIN exec: %d %d %d\n", inIdle, inExec, depth);
717   if (depth > 1) return;          //  nested
720   if (execEp != INVALIDEP) {
721     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
722     return;
723   }
725   
726   execEp=ep;
727   double t = TraceTimer();
728   //CmiPrintf("start: %f \n", start);
729   
730   start = t;
731   double ts = binStart;
732   // fill gaps
733   while ((ts = ts + CkpvAccess(binSize)) < t) {
734     /* Keep as a template for error checking. The current form of this check
735        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
736        I used it). Perhaps an improved form could be used in case vastly
737        incompatible EP vs idle times are reported (binSize*2?).
739        This check will have to be duplicated before each call to add()
741     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
742              binTime + binIdle, CkpvAccess(binSize));
743     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
744     */
745      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
746      binTime=0.0;                 // fill all other bins with 0 up till start
747      binIdle = 0.0;
748      binStart = ts;
749   }
752 void TraceSummary::endExecute()
754   CmiAssert(inIdle == 0 && inExec == 1);
755   depth --;
756   if (depth == 0) inExec = 0;
757   CmiAssert(depth >= 0);
758   // printf("END exec: %d %d %d\n", inIdle, inExec, depth);
760   if (depth != 0) return;
762   double t = TraceTimer();
763   double ts = start;
764   double nts = binStart;
767   if (execEp == TRACEON_EP) {
768     // if trace just got turned on, then one expects to see this
769     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
770     return;
771   }
774   if (execEp == INVALIDEP) {
775     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
776     return;
777   }
779   if (execEp >= 0)
780   {
781     _logPool->setEp(execEp, t-ts);
782   }
784   while ((nts = nts + CkpvAccess(binSize)) < t)
785   {
786     // fill the bins with time for this entry method
787      binTime += nts-ts;
788      binStart  = nts;
789      // This calls shrink() if needed
790      _logPool->add(binTime, binIdle, CkMyPe()); 
791      binTime = 0.0;
792      binIdle = 0.0;
793      ts = nts;
794   }
795   binTime += t - ts;
797   if (sumDetail && execEp >= 0 )
798       _logPool->updateSummaryDetail(execEp, start, t);
800   execEp = INVALIDEP;
803 void TraceSummary::endExecute(char *msg){
804 #if CMK_SMP_TRACE_COMMTHREAD
805     //This function is called from comm thread in SMP mode
806     envelope *e = (envelope *)msg;
807     int num = _entryTable.size();
808     int ep = e->getEpIdx();
809     if(ep<0 || ep>=num) return;
810     if(_entryTable[ep]->traceEnabled){
811         endExecute();
812     }
813 #endif    
816 void TraceSummary::beginIdle(double currT)
818   if (execEp == TRACEON_EP) {
819     endExecute();
820   }
822   CmiAssert(inIdle == 0 && inExec == 0);
823   inIdle = 1;
824   //printf("BEGIN idle: %d %d %d\n", inIdle, inExec, depth);
826   double t = TraceTimer(currT);
827   
828   // mark the time of this idle period. Only the next endIdle should see
829   // this value
830   idleStart = t; 
831   double ts = binStart;
832   // fill gaps
833   while ((ts = ts + CkpvAccess(binSize)) < t) {
834     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
835     binTime=0.0;                 // fill all other bins with 0 up till start
836     binIdle = 0.0;
837     binStart = ts;
838   }
841 void TraceSummary::endIdle(double currT)
843   CmiAssert(inIdle == 1 && inExec == 0);
844   inIdle = 0;
845   // printf("END idle: %d %d %d\n", inIdle, inExec, depth);
847   double t = TraceTimer(currT);
848   double t_idleStart = idleStart;
849   double t_binStart = binStart;
851   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
852   {
853     // fill the bins with time for idle
854     binIdle += t_binStart - t_idleStart;
855     binStart = t_binStart;
856     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
857     binTime = 0.0;
858     binIdle = 0.0;
859     t_idleStart = t_binStart;
860   }
861   binIdle += t - t_idleStart;
864 void TraceSummary::traceBegin(void)
866     // fake as a start of an event, assuming traceBegin is called inside an
867     // entry function.
868   beginExecute(-1, -1, TRACEON_EP, -1, -1);
871 void TraceSummary::traceEnd(void)
873   endExecute();
876 void TraceSummary::beginPack(void)
878     packstart = CmiWallTimer();
881 void TraceSummary::endPack(void)
883     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
884     if (sumDetail)
885         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
888 void TraceSummary::beginUnpack(void)
890     unpackstart = CmiWallTimer();
893 void TraceSummary::endUnpack(void)
895     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
896     if (sumDetail)
897         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
900 void TraceSummary::beginComputation(void)
902   // initialze arrays because now the number of entries is known.
903   _logPool->initMem();
906 void TraceSummary::endComputation(void)
908   static int done = 0;
909   if (done) return;
910   done = 1;
911   if (msgNum==0) {
912 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
913      _logPool->add(binTime, binIdle, CkMyPe());
914      binTime = 0.0;
915      binIdle = 0.0;
916      msgNum ++;
918      binStart  += CkpvAccess(binSize);
919      double t = TraceTimer();
920      double ts = binStart;
921      while (ts < t)
922      {
923        _logPool->add(binTime, binIdle, CkMyPe());
924        binTime=0.0;
925        binIdle = 0.0;
926        ts += CkpvAccess(binSize);
927      }
929   }
932 void TraceSummary::addEventType(int eventType)
934   _logPool->addEventType(eventType, TraceTimer());
937 void TraceSummary::startPhase(int phase)
939    _logPool->startPhase(phase);
942 void TraceSummary::traceEnableCCS() {
943   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
944   sumProxy.initCCS();
948 void TraceSummary::fillData(double *buffer, double reqStartTime, 
949                             double reqBinSize, int reqNumBins) {
950   // buffer has to be pre-allocated by the requester and must be an array of
951   // size reqNumBins.
952   //
953   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
954   //              need a number of these assumptions dropped:
955   //              1) reqBinSize == binSize (unrealistic)
956   //              2) bins boundary aligned (ok even under normal circumstances)
957   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
958   //              4) bins are always available (true unless flush)
959   //              5) bins always starts from 0 (unrealistic)
961   // works only because of 1)
962   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
963   //           "starting" at all!
964   int binOffset = (int)(reqStartTime/reqBinSize); 
965   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
966     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
967     buffer[i-binOffset] = pool()->getTime(i);
968   }
971 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
972    
973     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
974     //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
975     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
976     CkCallback cb(CkReductionTarget(TraceSummaryBOC, maxBinSize), sumProxy[0]);
977     contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
980 // collect the max bin size
981 void TraceSummaryBOC::maxBinSize(double _maxBinSize)
983     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
984     sumProxy.shrink(_maxBinSize);
987 void TraceSummaryBOC::shrink(double _mBin){
988     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
989     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
990     _maxBinSize = _mBin;
991     if(CkpvAccess(binSize) < _maxBinSize)
992     {
993         CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
994     }
995     double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();  
996     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
997     CkCallback cb(CkReductionTarget(TraceSummaryBOC, sumData), sumProxy[0]);
998     contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
1001 void TraceSummaryBOC::sumData(double *sumData, int totalsize) {
1002     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
1003     UInt    numBins = totalsize/epNums;  
1004     int     numEntries = epNums - NUM_DUMMY_EPS - 1; 
1005     char    *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1006     sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1007     FILE *sumfp = fopen(fname, "w+");
1008     delete [] fname;
1009     fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1010                 CkpvAccess(version), CkNumPes(),
1011                 numBins, numEntries, _maxBinSize);
1012     for(int i=0; i<numBins; i++){
1013         for(int j=0; j<numEntries; j++)
1014         {
1015             fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1016         }
1017     }
1018    fclose(sumfp);
1019    //CkPrintf("done with analysis\n");
1020    CkContinueExit();
1023 /// for TraceSummaryBOC
1025 void TraceSummaryBOC::initCCS() {
1026   if(firstTime){
1027     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1028     // initializing CCS-based parameters on all processors
1029     lastRequestedIndexBlock = 0;
1030     indicesPerBlock = 1000;
1031     collectionGranularity = 0.001; // time in seconds
1032     nBufferedBins = 0;
1033     
1034     // initialize buffer, register CCS handler and start the collection
1035     // pulse only on pe 0.
1036     if (CkMyPe() == 0) { 
1037       ccsBufferedData = new CkVec<double>();
1038     
1039       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1040       CkPrintf("Trace Summary now listening in for CCS Client\n");
1041       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
1042                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1043       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
1044                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
1046       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1047       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1048                              (void *)this);
1049       summaryCcsStreaming = true;
1050     }
1051     firstTime = false;
1052   }
1055 /** Return summary information as double precision values for each sample period. 
1056     The actual data collection is in double precision values. 
1058     The units on the returned values are total execution time across all PEs.
1060 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1061   double *sendBuffer;
1063   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1065   CkPrintf("Responding ...\n");
1066   int datalength = 0;
1067   // if we have no data to send, send an acknowledgement code of -13.37
1068   // instead.
1069   if (ccsBufferedData->length() == 0) {
1070     sendBuffer = new double[1];
1071     sendBuffer[0] = -13.37;
1072     datalength = sizeof(double);
1073     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1074     delete [] sendBuffer;
1075   } else {
1076     sendBuffer = ccsBufferedData->getVec();
1077     datalength = ccsBufferedData->length()*sizeof(double);
1078     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1079     ccsBufferedData->free();
1080   }
1081   CkPrintf("Response Sent. Proceeding with computation.\n");
1082   delete m;
1086 /** Return summary information as unsigned char values for each sample period. 
1087     The actual data collection is in double precision values.
1089     This returns the utilization in a range from 0 to 200.
1091 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1092   unsigned char *sendBuffer;
1094   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1096   CkPrintf("Responding ...\n");
1097   int datalength = 0;
1099   if (ccsBufferedData->length() == 0) {
1100     sendBuffer = new unsigned char[1];
1101     sendBuffer[0] = 255;
1102     datalength = sizeof(unsigned char);
1103     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1104     delete [] sendBuffer;
1105   } else {
1106     double * doubleData = ccsBufferedData->getVec();
1107     int numData = ccsBufferedData->length();
1108     
1109     // pack data into unsigned char array
1110     sendBuffer = new unsigned char[numData];
1111     
1112     for(int i=0;i<numData;i++){
1113       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1114     }    
1116     datalength = sizeof(unsigned char) * numData;
1117     
1118     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1119     ccsBufferedData->free();
1120     delete [] sendBuffer;
1121   }
1122   CkPrintf("Response Sent. Proceeding with computation.\n");
1123   delete m;
1128 void startCollectData(void *data, double currT) {
1129   CkAssert(CkMyPe() == 0);
1130   // CkPrintf("startCollectData()\n");
1131   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1132   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1133   double collectionGranularity = sumObj->collectionGranularity;
1134   int indicesPerBlock = sumObj->indicesPerBlock;
1135   
1136   double startTime = lastRequestedIndexBlock*
1137     collectionGranularity * indicesPerBlock;
1138   int numIndicesToGet = (int)floor((currT - startTime)/
1139                                    collectionGranularity);
1140   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1141   // **TODO** consider limiting the total number of blocks each collection
1142   //   request will pick up. This is to limit the amount of perturbation
1143   //   if it proves to be a problem.
1144   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1146    sumProxy.collectSummaryData(startTime, 
1147                        collectionGranularity,
1148                        numBlocksToGet*indicesPerBlock);
1149   // assume success
1150   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1153 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1154                                   int numBins) {
1155   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1157   double *contribution = new double[numBins];
1158   for (int i=0; i<numBins; i++) {
1159     contribution[i] = 0.0;
1160   }
1161   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1163   /*
1164   for (int i=0; i<numBins; i++) {
1165     CkPrintf("[%d] %f\n", i, contribution[i]);
1166   }
1167   */
1169   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1170   CkCallback cb(CkReductionTarget(TraceSummaryBOC, summaryDataCollected), sumProxy[0]);
1171   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double,
1172              cb);
1173   delete [] contribution;
1176 void TraceSummaryBOC::summaryDataCollected(double *recvData, int numBins) {
1177   CkAssert(CkMyPe() == 0);
1178   // **CWL** No memory management for the ccs buffer for now.
1180   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1182   // if there's an easier way to append a data block to a CkVec, I'll take it
1183   for (int i=0; i<numBins; i++) {
1184     ccsBufferedData->insertAtEnd(recvData[i]);
1185   }
1191 void TraceSummaryBOC::startSumOnly()
1193   CmiAssert(CkMyPe() == 0);
1195   CProxy_TraceSummaryBOC p(traceSummaryGID);
1196   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1197   p.askSummary(size);
1200 void TraceSummaryBOC::askSummary(int size)
1202   if (CkpvAccess(_trace) == NULL) return;
1204   int traced = CkpvAccess(_trace)->traceOnPE();
1206   BinEntry *reductionBuffer = new BinEntry[size+1];
1207   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1208   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1209   if (traced) {
1210     CkpvAccess(_trace)->endComputation();
1211     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1212     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1213     if (n>size) n=size;
1214     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1215   }
1217   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1218   CkCallback cb(CkReductionTarget(TraceSummaryBOC, sendSummaryBOC), 0, sumProxy);
1219   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1220              CkReduction::sum_double, cb);
1221   delete [] reductionBuffer;
1224 void TraceSummaryBOC::sendSummaryBOC(double *results, int n)
1226   if (CkpvAccess(_trace) == NULL) return;
1228   CkAssert(CkMyPe() == 0);
1230   nBins = n-1;
1231   bins = (BinEntry *)results;
1232   nTracedPEs = (int)bins[n-1].time();
1233   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1235   write();
1237   CkContinueExit();
1240 void TraceSummaryBOC::write(void) 
1242   unsigned int j;
1244   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1245   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1246   FILE *sumfp = fopen(fname, "w+");
1247   //CmiPrintf("File: %s \n", fname);
1248   if(sumfp == 0)
1249       CmiAbort("Cannot open summary sts file for writing.\n");
1250   delete[] fname;
1252   int _numEntries=_entryTable.size();
1253   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1254   fprintf(sumfp, "\n");
1256   // write bin time
1257 #if 0
1258   int last=pool[0].getU();
1259   writeU(fp, last);
1260   int count=1;
1261   for(j=1; j<numEntries; j++) {
1262     int u = pool[j].getU();
1263     if (last == u) {
1264       count++;
1265     }
1266     else {
1267       if (count > 1) fprintf(fp, "+%d", count);
1268       writeU(fp, u);
1269       last = u;
1270       count = 1;
1271     }
1272   }
1273   if (count > 1) fprintf(fp, "+%d", count);
1274 #else
1275   for(j=0; j<nBins; j++) {
1276     bins[j].time() /= nTracedPEs;
1277     bins[j].write(sumfp);
1278   }
1279 #endif
1280   fprintf(sumfp, "\n");
1281   fclose(sumfp);
1285 static void CombineSummary()
1287 #if CMK_TRACE_ENABLED
1288   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1290   if ((sumonly || sumDetail) && traceSummaryGID.isZero()) {
1291     CkContinueExit();
1292     return;
1293   }
1295   if (sumonly) {
1296     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1297       // pe 0 start the sumonly process
1298     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1299     sumProxy[0].startSumOnly();
1300   }else if(sumDetail)
1301   {
1302       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1303       sumProxy.traceSummaryParallelShutdown(-1);
1304   }
1305   else {
1306     CkContinueExit();
1307   }
1308 #else
1309   CkContinueExit();
1310 #endif
1313 void initTraceSummaryBOC()
1315 #ifdef __BIGSIM__
1316   if(BgNodeRank()==0) {
1317 #else
1318   if (CkMyRank() == 0) {
1319 #endif
1320     registerExitFn(CombineSummary);
1321   }
1324 #include "TraceSummary.def.h"
1326 /*@}*/