Fix memory leaks in trace projections and summary
[charm.git] / src / ck-perf / trace-summary.C
blob4fb168c529fea86bb16911b6751360f27563ce66
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
6 #include "charm++.h"
7 #include "trace-summary.h"
8 #include "trace-summaryBOC.h"
10 #define DEBUGF(x)  // CmiPrintf x
12 #define VER   7.1
14 #define INVALIDEP     -2
15 #define TRACEON_EP     -3
17 // 1 minutes of run before it'll fill up:
18 #define DefaultBinCount      (1000*60*1) 
20 CkpvStaticDeclare(TraceSummary*, _trace);
21 static int _numEvents = 0;
22 #define NUM_DUMMY_EPS 9
23 CkpvDeclare(int, binCount);
24 CkpvDeclare(double, binSize);
25 CkpvDeclare(double, version);
28 CkpvDeclare(int, previouslySentBins);
32 /** 
33     A class that reads/writes a buffer out of different types of data.
35     This class exists because I need to get references to parts of the buffer 
36     that have already been used so that I can increment counters inside the buffer.
39 class compressedBuffer {
40 public:
41   char* buf;
42   int pos; ///<< byte position just beyond the previously read/written data
44   compressedBuffer(){
45     buf = NULL;
46     pos = 0;
47   }
49   compressedBuffer(int bytes){
50     buf = (char*)malloc(bytes);
51     pos = 0;
52   }
54   compressedBuffer(void *buffer){
55     buf = (char*)buffer;
56     pos = 0;
57   }
58     
59   void init(void *buffer){
60     buf = (char*)buffer;
61     pos = 0;
62   }
63     
64   inline void * currentPtr(){
65     return (void*)(buf+pos);
66   }
68   template <typename T>
69   T read(int offset){
70     // to resolve unaligned writes causing bus errors, need memcpy
71     T v;
72     memcpy(&v, buf+offset, sizeof(T));
73     return v;
74   }
75     
76   template <typename T>
77   void write(T v, int offset){
78     T v2 = v; // on stack
79     // to resolve unaligned writes causing bus errors, need memcpy
80     memcpy(buf+offset, &v2, sizeof(T));
81   }
82     
83   template <typename T>
84   void increment(int offset){
85     T temp;
86     temp = read<T>(offset);
87     temp ++;
88     write<T>(temp, offset);
89   }
91   template <typename T>
92   void accumulate(T v, int offset){
93     T temp;
94     temp = read<T>(offset);
95     temp += v;
96     write<T>(temp, offset);
97   }
98   
99   template <typename T>
100   int push(T v){
101     int oldpos = pos;
102     write<T>(v, pos);
103     pos += sizeof(T);
104     return oldpos;
105   }
106   
107   template <typename T>
108   T pop(){
109     T temp = read<T>(pos);
110     pos += sizeof(T);
111     return temp;
112   }
114   template <typename T>
115   T peek(){
116     T temp = read<T>(pos);
117     return temp;
118   }
120   template <typename T0, typename T>
121   T peekSecond(){
122     T temp;
123     memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
124     return temp;
125   }
127   int datalength(){
128     return pos;
129   }
130      
131   void * buffer(){
132     return (void*) buf;
133   }  
135   void freeBuf(){
136     free(buf);
137   }
139   ~compressedBuffer(){
140     // don't free the buf because the user my have supplied the buffer
141   }
142     
148 // Global Readonly
149 CkGroupID traceSummaryGID;
150 bool summaryCcsStreaming;
152 int sumonly = 0;
153 int sumDetail = 0;
156   For each TraceFoo module, _createTraceFoo() must be defined.
157   This function is called in _createTraces() generated in moduleInit.C
159 void _createTracesummary(char **argv)
161   DEBUGF(("%d createTraceSummary\n", CkMyPe()));
162   CkpvInitialize(TraceSummary*, _trace);
163   CkpvInitialize(int, previouslySentBins);
164   CkpvAccess(previouslySentBins) = 0;
165   CkpvAccess(_trace) = new  TraceSummary(argv);
166   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
167   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
171 /// function call for starting a phase in trace summary logs 
172 extern "C" 
173 void CkSummary_StartPhase(int phase)
175    CkpvAccess(_trace)->startPhase(phase);
179 /// function call for adding an event mark
180 extern "C" 
181 void CkSummary_MarkEvent(int eventType)
183    CkpvAccess(_trace)->addEventType(eventType);
186 static inline void writeU(FILE* fp, int u)
188   fprintf(fp, "%4d", u);
191 PhaseEntry::PhaseEntry() 
193   int _numEntries=_entryTable.size();
194   // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
195   nEPs = _numEntries+10; 
196   count = new int[nEPs];
197   times = new double[nEPs];
198   maxtimes = new double[nEPs];
199   for (int i=0; i<nEPs; i++) {
200     count[i] = 0;
201     times[i] = 0.0;
202     maxtimes[i] = 0.;
203   }
206 SumLogPool::~SumLogPool() 
208     if (!sumonly) {
209       write();
210       fclose(fp);
211       if (sumDetail) fclose(sdfp);
212   }
213   // free memory for mark
214   if (markcount > 0)
215   for (int i=0; i<MAX_MARKS; i++) {
216     for (int j=0; j<events[i].length(); j++)
217       delete events[i][j];
218   }
219   delete[] pool;
220   delete[] epInfo;
221   delete[] cpuTime;
222   delete[] numExecutions;
225 void SumLogPool::addEventType(int eventType, double time)
227    if (eventType <0 || eventType >= MAX_MARKS) {
228        CkPrintf("Invalid event type %d!\n", eventType);
229        return;
230    }
231    MarkEntry *e = new MarkEntry;
232    e->time = time;
233    events[eventType].push_back(e);
234    markcount ++;
237 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES) 
239    // TBD: Can this be moved to initMem?
240   cpuTime = NULL;
241    poolSize = CkpvAccess(binCount);
242    if (poolSize % 2) poolSize++;        // make sure it is even
243    pool = new BinEntry[poolSize];
244    _MEMCHECK(pool);
246    this->pgm = new char[strlen(pgm)+1];
247    strcpy(this->pgm,pgm);
248    
249 #if 0
250    // create the sts file
251    if (CkMyPe() == 0) {
252      char *fname = 
253        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
254      sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
255      stsfp = fopen(fname, "w+");
256      //CmiPrintf("File: %s \n", fname);
257      if (stsfp == 0) {
258        CmiAbort("Cannot open summary sts file for writing.\n");
259       }
260      delete[] fname;
261    }
262 #endif
263    
264    // event
265    markcount = 0;
268 void SumLogPool::initMem()
270    int _numEntries=_entryTable.size();
271    epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
272    epInfo = new SumEntryInfo[epInfoSize];
273    _MEMCHECK(epInfo);
275    cpuTime = NULL;
276    numExecutions = NULL;
277    if (sumDetail) {
278        cpuTime = new double[poolSize*epInfoSize];
279        _MEMCHECK(cpuTime);
280        memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
281        numExecutions = new int[poolSize*epInfoSize];
282        _MEMCHECK(numExecutions);
283        memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
285 //         int i, e;
286 //         for(i=0; i<poolSize; i++) {
287 //             for(e=0; e< epInfoSize; e++) {
288 //                 setCPUtime(i,e,0.0);
289 //                 setNumExecutions(i,e,0);
290 //             }
291 //         }
292    }
295 int SumLogPool::getUtilization(int interval, int ep) {
296     return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize)); 
299 void SumLogPool::write(void) 
301   int i;
302   unsigned int j;
303   int _numEntries=_entryTable.size();
305   fp = NULL;
306   sdfp = NULL;
308   // create file(s)
309   // CmiPrintf("TRACE: %s:%d\n", fname, errno);
310   if (!sumonly) {
311     char pestr[10];
312     sprintf(pestr, "%d", CkMyPe());
313     int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
314     char *fname = new char[len+1];
315     
316     sprintf(fname, "%s.%s.sum", pgm, pestr);
317     do {
318       fp = fopen(fname, "w+");
319     } while (!fp && errno == EINTR);
320     if (!fp) {
321       CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
322       CmiAbort("Cannot open Summary Trace File for writing...\n");
323     }
324     
325     if (sumDetail) {
326       sprintf(fname, "%s.%s.sumd", pgm, pestr);
327       do {
328         sdfp = fopen(fname, "w+");
329       } while (!sdfp && errno == EINTR);
330       if(!sdfp) {
331         CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
332       }
333     }
334     delete[] fname;
335   }
337   fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
338   if (CkpvAccess(version)>=3.0)
339   {
340     fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
341   }
342   fprintf(fp, "\n");
344   // write bin time
345 #if 1
346   int last=pool[0].getU();
347   writeU(fp, last);
348   int count=1;
349   for(j=1; j<numBins; j++) {
350     int u = pool[j].getU();
351     if (last == u) {
352       count++;
353     }
354     else {
355       if (count > 1) fprintf(fp, "+%d", count);
356       writeU(fp, u);
357       last = u;
358       count = 1;
359     }
360   }
361   if (count > 1) fprintf(fp, "+%d", count);
362 #else
363   for(j=0; j<numEntries; j++) 
364       pool[j].write(fp);
365 #endif
366   fprintf(fp, "\n");
368   // write entry execution time
369   fprintf(fp, "EPExeTime: ");
370   for (i=0; i<_numEntries; i++)
371     fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
372   fprintf(fp, "\n");
373   // write entry function call times
374   fprintf(fp, "EPCallTime: ");
375   for (i=0; i<_numEntries; i++)
376     fprintf(fp, "%d ", epInfo[i].epCount);
377   fprintf(fp, "\n");
378   // write max entry function execute times
379   fprintf(fp, "MaxEPTime: ");
380   for (i=0; i<_numEntries; i++)
381     fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
382   fprintf(fp, "\n");
383 #if 0
384   for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
385     for (j=0; j<_numEntries; j++) 
386       fprintf(fp, "%d ", epInfo[j].hist[i]);
387     fprintf(fp, "\n");
388   }
389 #endif
390   // write marks
391   if (CkpvAccess(version)>=2.0) 
392   {
393     fprintf(fp, "NumMarks: %d ", markcount);
394     for (i=0; i<MAX_MARKS; i++) {
395       for(int j=0; j<events[i].length(); j++)
396         fprintf(fp, "%d %f ", i, events[i][j]->time);
397     }
398     fprintf(fp, "\n");
399   }
400   // write phases
401   if (CkpvAccess(version)>=3.0)
402   {
403     phaseTab.write(fp);
404   }
405   // write idle time
406   if (CkpvAccess(version)>=7.1) {
407     fprintf(fp, "IdlePercent: ");
408     int last=pool[0].getUIdle();
409     writeU(fp, last);
410     int count=1;
411     for(j=1; j<numBins; j++) {
412       int u = pool[j].getUIdle();
413       if (last == u) {
414         count++;
415       }
416       else {
417         if (count > 1) fprintf(fp, "+%d", count);
418         writeU(fp, u);
419         last = u;
420         count = 1;
421       }
422     }
423     if (count > 1) fprintf(fp, "+%d", count);
424     fprintf(fp, "\n");
425   }
427   //CkPrintf("writing to detail file:%d    %d \n", getNumEntries(), numBins);
428   // write summary details
429   if (sumDetail) {
430         fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
431                 CkpvAccess(version), CkMyPe(), CkNumPes(),
432                 numBins, _numEntries, CkpvAccess(binSize));
434         // Write out cpuTime in microseconds
435         // Run length encoding (RLE) along EP axis
436         fprintf(sdfp, "ExeTimePerEPperInterval ");
437         unsigned int e, i;
438         long last= (long) (getCPUtime(0,0)*1.0e6);
439         int count=0;
440         fprintf(sdfp, "%ld", last);
441         for(e=0; e<_numEntries; e++) {
442             for(i=0; i<numBins; i++) {
444                 long u= (long) (getCPUtime(i,e)*1.0e6);
445                 if (last == u) {
446                     count++;
447                 } else {
449                     if (count > 1) fprintf(sdfp, "+%d", count);
450                     fprintf(sdfp, " %ld", u);
451                     last = u;
452                     count = 1;
453                 }
454             }
455         }
456         if (count > 1) fprintf(sdfp, "+%d", count);
457         fprintf(sdfp, "\n");
458         // Write out numExecutions
459         // Run length encoding (RLE) along EP axis
460         fprintf(sdfp, "EPCallTimePerInterval ");
461         last= getNumExecutions(0,0);
462         count=0;
463         fprintf(sdfp, "%ld", last);
464         for(e=0; e<_numEntries; e++) {
465             for(i=0; i<numBins; i++) {
467                 long u= getNumExecutions(i, e);
468                 if (last == u) {
469                     count++;
470                 } else {
472                     if (count > 1) fprintf(sdfp, "+%d", count);
473                     fprintf(sdfp, " %ld", u);
474                     last = u;
475                     count = 1;
476                 }
477             }
478         }
479         if (count > 1) fprintf(sdfp, "+%d", count);
480         fprintf(sdfp, "\n");
481   }
484 void SumLogPool::writeSts(void)
486     // open sts file
487   char *fname = 
488        new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
489   sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
490   stsfp = fopen(fname, "w+");
491   //CmiPrintf("File: %s \n", fname);
492   if (stsfp == 0) {
493        CmiAbort("Cannot open summary sts file for writing.\n");
494   }
495   delete[] fname;
497   traceWriteSTS(stsfp,_numEvents);
498   for(int i=0;i<_numEvents;i++)
499     fprintf(stsfp, "EVENT %d Event%d\n", i, i);
500   fprintf(stsfp, "END\n");
502   fclose(stsfp);
505 // Called once per interval
506 void SumLogPool::add(double time, double idleTime, int pe) 
508   new (&pool[numBins++]) BinEntry(time, idleTime);
509   if (poolSize==numBins) {
510     shrink();
511   }
514 // Called once per run of an EP
515 // adds 'time' to EP's time, increments epCount
516 void SumLogPool::setEp(int epidx, double time) 
518   if (epidx >= epInfoSize) {
519         CmiAbort("Invalid entry point!!\n");
520   }
521   //CmiPrintf("set EP: %d %e \n", epidx, time);
522   epInfo[epidx].setTime(time);
523   // set phase table counter
524   phaseTab.setEp(epidx, time);
527 // Called once from endExecute, endPack, etc. this function updates
528 // the sumDetail intervals.
529 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
531         if (epIdx >= epInfoSize) {
532             CmiAbort("Too many entry points!!\n");
533         }
535         double binSz = CkpvAccess(binSize);
536         int startingBinIdx, endingBinIdx;
537         startingBinIdx = (int)(startTime/binSz);
538         endingBinIdx = (int)(endTime/binSz);
539         // shrink if needed
540         while (endingBinIdx >= poolSize) {
541           shrink();
542           CmiAssert(CkpvAccess(binSize) > binSz);
543           binSz = CkpvAccess(binSize);
544           startingBinIdx = (int)(startTime/binSz);
545           endingBinIdx = (int)(endTime/binSz);
546         }
548         if (startingBinIdx == endingBinIdx) {
549             addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
550         } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
551             addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
552             while(++startingBinIdx < endingBinIdx)
553                 addToCPUtime(startingBinIdx, epIdx, binSz);
554             addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
555         } else {
556           CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
557                    startTime, endTime);
558             CmiAbort("Error: end time of EP is less than start time\n");
559         }
561         incNumExecutions(startingBinIdx, epIdx);
564 // Shrinks pool[], cpuTime[], and numExecutions[]
565 void SumLogPool::shrink(void)
567 //  double t = CmiWallTimer();
569   // We ensured earlier that poolSize is even; therefore now numBins
570   // == poolSize == even.
571   int entries = numBins/2;
572   for (int i=0; i<entries; i++)
573   {
574      pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
575      pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
576      if (sumDetail)
577      for (int e=0; e < epInfoSize; e++) {
578          setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
579          setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
580      }
581   }
582   // zero out the remaining intervals
583   if (sumDetail) {
584     memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
585     memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
586   }
587   numBins = entries;
588   CkpvAccess(binSize) *= 2;
590 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
593 void SumLogPool::shrink(double _maxBinSize)
595     while(CkpvAccess(binSize) < _maxBinSize)
596     {
597         shrink();
598     };
600 int  BinEntry::getU() 
602   return (int)(_time * 100.0 / CkpvAccess(binSize)); 
605 int BinEntry::getUIdle() {
606   return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
609 void BinEntry::write(FILE* fp)
611   writeU(fp, getU());
614 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
615                                         binTime(0.0),binIdle(0.0),msgNum(0)
617   if (CkpvAccess(traceOnPe) == 0) return;
619     // use absolute time
620   if (CmiTimerAbsolute()) binStart = CmiInitTime();
622   CkpvInitialize(int, binCount);
623   CkpvInitialize(double, binSize);
624   CkpvInitialize(double, version);
625   CkpvAccess(binSize) = BIN_SIZE;
626   CkpvAccess(version) = VER;
627   CkpvAccess(binCount) = DefaultBinCount;
628   if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
629     if (CkMyPe() == 0) 
630       CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
631   CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
632         "CPU usage log time resolution");
633   CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
634         "Write this .sum file version");
636   epThreshold = 0.001; 
637   CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
638         "Execution time histogram lower bound");
639   epInterval = 0.001; 
640   CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
641         "Execution time histogram bin size");
643   sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
644   // +sumonly overrides +sumDetail
645   if (!sumonly)
646       sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
648   _logPool = new SumLogPool(CkpvAccess(traceRoot));
649   // assume invalid entry point on start
650   execEp=INVALIDEP;
651   inIdle = 0;
652   inExec = 0;
653   depth = 0;
656 void TraceSummary::traceClearEps(void)
658   _logPool->clearEps();
661 void TraceSummary::traceWriteSts(void)
663   if(CkMyPe()==0)
664       _logPool->writeSts();
667 void TraceSummary::traceClose(void)
669     if(CkMyPe()==0)
670         _logPool->writeSts();
671     CkpvAccess(_trace)->endComputation();
673     delete _logPool;
674     CkpvAccess(_traces)->removeTrace(this);
677 void TraceSummary::beginExecute(CmiObjId *tid)
679   beginExecute(-1,-1,_threadEP,-1);
682 void TraceSummary::beginExecute(envelope *e, void *obj)
684   // no message means thread execution
685   if (e==NULL) {
686     beginExecute(-1,-1,_threadEP,-1);
687   }
688   else {
689     beginExecute(-1,-1,e->getEpIdx(),-1);
690   }  
693 void TraceSummary::beginExecute(char *msg)
695 #if CMK_SMP_TRACE_COMMTHREAD
696     //This function is called from comm thread in SMP mode
697     envelope *e = (envelope *)msg;
698     int num = _entryTable.size();
699     int ep = e->getEpIdx();
700     if(ep<0 || ep>=num) return;
701     if(_entryTable[ep]->traceEnabled)
702         beginExecute(-1,-1,e->getEpIdx(),-1);
703 #endif
706 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
708   if (execEp == TRACEON_EP) {
709     endExecute();
710   }
711   CmiAssert(inIdle == 0);
712   if (inExec == 0) {
713     CmiAssert(depth == 0);
714     inExec = 1;
715   }
716   depth ++;
717   // printf("BEGIN exec: %d %d %d\n", inIdle, inExec, depth);
719   if (depth > 1) return;          //  nested
722   if (execEp != INVALIDEP) {
723     TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
724     return;
725   }
727   
728   execEp=ep;
729   double t = TraceTimer();
730   //CmiPrintf("start: %f \n", start);
731   
732   start = t;
733   double ts = binStart;
734   // fill gaps
735   while ((ts = ts + CkpvAccess(binSize)) < t) {
736     /* Keep as a template for error checking. The current form of this check
737        is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
738        I used it). Perhaps an improved form could be used in case vastly
739        incompatible EP vs idle times are reported (binSize*2?).
741        This check will have to be duplicated before each call to add()
743     CkPrintf("[%d] %f vs %f\n", CkMyPe(),
744              binTime + binIdle, CkpvAccess(binSize));
745     CkAssert(binTime + binIdle <= CkpvAccess(binSize));
746     */
747      _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
748      binTime=0.0;                 // fill all other bins with 0 up till start
749      binIdle = 0.0;
750      binStart = ts;
751   }
754 void TraceSummary::endExecute()
756   CmiAssert(inIdle == 0 && inExec == 1);
757   depth --;
758   if (depth == 0) inExec = 0;
759   CmiAssert(depth >= 0);
760   // printf("END exec: %d %d %d\n", inIdle, inExec, depth);
762   if (depth != 0) return;
764   double t = TraceTimer();
765   double ts = start;
766   double nts = binStart;
769   if (execEp == TRACEON_EP) {
770     // if trace just got turned on, then one expects to see this
771     // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
772     return;
773   }
776   if (execEp == INVALIDEP) {
777     TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
778     return;
779   }
781   if (execEp >= 0)
782   {
783     _logPool->setEp(execEp, t-ts);
784   }
786   while ((nts = nts + CkpvAccess(binSize)) < t)
787   {
788     // fill the bins with time for this entry method
789      binTime += nts-ts;
790      binStart  = nts;
791      // This calls shrink() if needed
792      _logPool->add(binTime, binIdle, CkMyPe()); 
793      binTime = 0.0;
794      binIdle = 0.0;
795      ts = nts;
796   }
797   binTime += t - ts;
799   if (sumDetail && execEp >= 0 )
800       _logPool->updateSummaryDetail(execEp, start, t);
802   execEp = INVALIDEP;
805 void TraceSummary::endExecute(char *msg){
806 #if CMK_SMP_TRACE_COMMTHREAD
807     //This function is called from comm thread in SMP mode
808     envelope *e = (envelope *)msg;
809     int num = _entryTable.size();
810     int ep = e->getEpIdx();
811     if(ep<0 || ep>=num) return;
812     if(_entryTable[ep]->traceEnabled){
813         endExecute();
814     }
815 #endif    
818 void TraceSummary::beginIdle(double currT)
820   if (execEp == TRACEON_EP) {
821     endExecute();
822   }
824   CmiAssert(inIdle == 0 && inExec == 0);
825   inIdle = 1;
826   //printf("BEGIN idle: %d %d %d\n", inIdle, inExec, depth);
828   double t = TraceTimer(currT);
829   
830   // mark the time of this idle period. Only the next endIdle should see
831   // this value
832   idleStart = t; 
833   double ts = binStart;
834   // fill gaps
835   while ((ts = ts + CkpvAccess(binSize)) < t) {
836     _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
837     binTime=0.0;                 // fill all other bins with 0 up till start
838     binIdle = 0.0;
839     binStart = ts;
840   }
843 void TraceSummary::endIdle(double currT)
845   CmiAssert(inIdle == 1 && inExec == 0);
846   inIdle = 0;
847   // printf("END idle: %d %d %d\n", inIdle, inExec, depth);
849   double t = TraceTimer(currT);
850   double t_idleStart = idleStart;
851   double t_binStart = binStart;
853   while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
854   {
855     // fill the bins with time for idle
856     binIdle += t_binStart - t_idleStart;
857     binStart = t_binStart;
858     _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
859     binTime = 0.0;
860     binIdle = 0.0;
861     t_idleStart = t_binStart;
862   }
863   binIdle += t - t_idleStart;
866 void TraceSummary::traceBegin(void)
868     // fake as a start of an event, assuming traceBegin is called inside an
869     // entry function.
870   beginExecute(-1, -1, TRACEON_EP, -1, -1);
873 void TraceSummary::traceEnd(void)
875   endExecute();
878 void TraceSummary::beginPack(void)
880     packstart = CmiWallTimer();
883 void TraceSummary::endPack(void)
885     _logPool->setEp(_packEP, CmiWallTimer() - packstart);
886     if (sumDetail)
887         _logPool->updateSummaryDetail(_packEP,  TraceTimer(packstart), TraceTimer(CmiWallTimer()));
890 void TraceSummary::beginUnpack(void)
892     unpackstart = CmiWallTimer();
895 void TraceSummary::endUnpack(void)
897     _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
898     if (sumDetail)
899         _logPool->updateSummaryDetail(_unpackEP,  TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
902 void TraceSummary::beginComputation(void)
904   // initialze arrays because now the number of entries is known.
905   _logPool->initMem();
908 void TraceSummary::endComputation(void)
910   static int done = 0;
911   if (done) return;
912   done = 1;
913   if (msgNum==0) {
914 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
915      _logPool->add(binTime, binIdle, CkMyPe());
916      binTime = 0.0;
917      binIdle = 0.0;
918      msgNum ++;
920      binStart  += CkpvAccess(binSize);
921      double t = TraceTimer();
922      double ts = binStart;
923      while (ts < t)
924      {
925        _logPool->add(binTime, binIdle, CkMyPe());
926        binTime=0.0;
927        binIdle = 0.0;
928        ts += CkpvAccess(binSize);
929      }
931   }
934 void TraceSummary::addEventType(int eventType)
936   _logPool->addEventType(eventType, TraceTimer());
939 void TraceSummary::startPhase(int phase)
941    _logPool->startPhase(phase);
944 void TraceSummary::traceEnableCCS() {
945   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
946   sumProxy.initCCS();
950 void TraceSummary::fillData(double *buffer, double reqStartTime, 
951                             double reqBinSize, int reqNumBins) {
952   // buffer has to be pre-allocated by the requester and must be an array of
953   // size reqNumBins.
954   //
955   // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
956   //              need a number of these assumptions dropped:
957   //              1) reqBinSize == binSize (unrealistic)
958   //              2) bins boundary aligned (ok even under normal circumstances)
959   //              3) bins are "factor"-aligned (where reqBinSize != binSize)
960   //              4) bins are always available (true unless flush)
961   //              5) bins always starts from 0 (unrealistic)
963   // works only because of 1)
964   // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with 
965   //           "starting" at all!
966   int binOffset = (int)(reqStartTime/reqBinSize); 
967   for (int i=binOffset; i<binOffset + reqNumBins; i++) {
968     // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
969     buffer[i-binOffset] = pool()->getTime(i);
970   }
973 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
974    
975     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
976     //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
977     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
978     CkCallback cb(CkIndex_TraceSummaryBOC::maxBinSize(NULL), sumProxy[0]);
979     contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
982 // collect the max bin size
983 void TraceSummaryBOC::maxBinSize(CkReductionMsg *msg)
985     double _maxBinSize = *((double *)msg->getData());
986     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
987     sumProxy.shrink(_maxBinSize);
990 void TraceSummaryBOC::shrink(double _mBin){
991     UInt    numBins = CkpvAccess(_trace)->pool()->getNumEntries();  
992     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
993     _maxBinSize = _mBin;
994     if(CkpvAccess(binSize) < _maxBinSize)
995     {
996         CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
997     }
998     double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();  
999     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1000     CkCallback cb(CkIndex_TraceSummaryBOC::sumData(NULL), sumProxy[0]);
1001     contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
1004 void TraceSummaryBOC::sumData(CkReductionMsg *msg) {
1005     double *sumData = (double *)msg->getData();
1006     int     totalsize = msg->getSize()/sizeof(double);
1007     UInt    epNums  = CkpvAccess(_trace)->pool()->getEpInfoSize();
1008     UInt    numBins = totalsize/epNums;  
1009     int     numEntries = epNums - NUM_DUMMY_EPS - 1; 
1010     char    *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1011     sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1012     FILE *sumfp = fopen(fname, "w+");
1013     delete [] fname;
1014     fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1015                 CkpvAccess(version), CkNumPes(),
1016                 numBins, numEntries, _maxBinSize);
1017     for(int i=0; i<numBins; i++){
1018         for(int j=0; j<numEntries; j++)
1019         {
1020             fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1021         }
1022     }
1023    fclose(sumfp);
1024    //CkPrintf("done with analysis\n");
1025    CkExit();
1028 /// for TraceSummaryBOC
1030 void TraceSummaryBOC::initCCS() {
1031   if(firstTime){
1032     CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1033     // initializing CCS-based parameters on all processors
1034     lastRequestedIndexBlock = 0;
1035     indicesPerBlock = 1000;
1036     collectionGranularity = 0.001; // time in seconds
1037     nBufferedBins = 0;
1038     
1039     // initialize buffer, register CCS handler and start the collection
1040     // pulse only on pe 0.
1041     if (CkMyPe() == 0) { 
1042       ccsBufferedData = new CkVec<double>();
1043     
1044       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1045       CkPrintf("Trace Summary now listening in for CCS Client\n");
1046       CcsRegisterHandler("CkPerfSummaryCcsClientCB", 
1047                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1048       CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar", 
1049                          CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0])); 
1051       CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1052       CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1053                              (void *)this);
1054       summaryCcsStreaming = true;
1055     }
1056     firstTime = false;
1057   }
1060 /** Return summary information as double precision values for each sample period. 
1061     The actual data collection is in double precision values. 
1063     The units on the returned values are total execution time across all PEs.
1065 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1066   double *sendBuffer;
1068   CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1070   CkPrintf("Responding ...\n");
1071   int datalength = 0;
1072   // if we have no data to send, send an acknowledgement code of -13.37
1073   // instead.
1074   if (ccsBufferedData->length() == 0) {
1075     sendBuffer = new double[1];
1076     sendBuffer[0] = -13.37;
1077     datalength = sizeof(double);
1078     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1079     delete [] sendBuffer;
1080   } else {
1081     sendBuffer = ccsBufferedData->getVec();
1082     datalength = ccsBufferedData->length()*sizeof(double);
1083     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1084     ccsBufferedData->free();
1085   }
1086   CkPrintf("Response Sent. Proceeding with computation.\n");
1087   delete m;
1091 /** Return summary information as unsigned char values for each sample period. 
1092     The actual data collection is in double precision values.
1094     This returns the utilization in a range from 0 to 200.
1096 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1097   unsigned char *sendBuffer;
1099   CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1101   CkPrintf("Responding ...\n");
1102   int datalength = 0;
1104   if (ccsBufferedData->length() == 0) {
1105     sendBuffer = new unsigned char[1];
1106     sendBuffer[0] = 255;
1107     datalength = sizeof(unsigned char);
1108     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1109     delete [] sendBuffer;
1110   } else {
1111     double * doubleData = ccsBufferedData->getVec();
1112     int numData = ccsBufferedData->length();
1113     
1114     // pack data into unsigned char array
1115     sendBuffer = new unsigned char[numData];
1116     
1117     for(int i=0;i<numData;i++){
1118       sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1119     }    
1121     datalength = sizeof(unsigned char) * numData;
1122     
1123     CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1124     ccsBufferedData->free();
1125     delete [] sendBuffer;
1126   }
1127   CkPrintf("Response Sent. Proceeding with computation.\n");
1128   delete m;
1133 void startCollectData(void *data, double currT) {
1134   CkAssert(CkMyPe() == 0);
1135   // CkPrintf("startCollectData()\n");
1136   TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1137   int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1138   double collectionGranularity = sumObj->collectionGranularity;
1139   int indicesPerBlock = sumObj->indicesPerBlock;
1140   
1141   double startTime = lastRequestedIndexBlock*
1142     collectionGranularity * indicesPerBlock;
1143   int numIndicesToGet = (int)floor((currT - startTime)/
1144                                    collectionGranularity);
1145   int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1146   // **TODO** consider limiting the total number of blocks each collection
1147   //   request will pick up. This is to limit the amount of perturbation
1148   //   if it proves to be a problem.
1149   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1151    sumProxy.collectSummaryData(startTime, 
1152                        collectionGranularity,
1153                        numBlocksToGet*indicesPerBlock);
1154   // assume success
1155   sumObj->lastRequestedIndexBlock += numBlocksToGet; 
1158 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1159                                   int numBins) {
1160   // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1162   double *contribution = new double[numBins];
1163   for (int i=0; i<numBins; i++) {
1164     contribution[i] = 0.0;
1165   }
1166   CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1168   /*
1169   for (int i=0; i<numBins; i++) {
1170     CkPrintf("[%d] %f\n", i, contribution[i]);
1171   }
1172   */
1174   CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1175   CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1176   contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double, 
1177              cb);
1178   delete [] contribution;
1181 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1182   CkAssert(CkMyPe() == 0);
1183   // **CWL** No memory management for the ccs buffer for now.
1185   // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1186   double *recvData = (double *)msg->getData();
1187   int numBins = msg->getSize()/sizeof(double);
1189   // if there's an easier way to append a data block to a CkVec, I'll take it
1190   for (int i=0; i<numBins; i++) {
1191     ccsBufferedData->insertAtEnd(recvData[i]);
1192   }
1193   delete msg;
1199 void TraceSummaryBOC::startSumOnly()
1201   CmiAssert(CkMyPe() == 0);
1203   CProxy_TraceSummaryBOC p(traceSummaryGID);
1204   int size = CkpvAccess(_trace)->pool()->getNumEntries();
1205   p.askSummary(size);
1208 void TraceSummaryBOC::askSummary(int size)
1210   if (CkpvAccess(_trace) == NULL) return;
1212   int traced = CkpvAccess(_trace)->traceOnPE();
1214   BinEntry *reductionBuffer = new BinEntry[size+1];
1215   reductionBuffer[size].time() = traced;  // last element is the traced pe count
1216   reductionBuffer[size].getIdleTime() = 0;  // last element is the traced pe count
1217   if (traced) {
1218     CkpvAccess(_trace)->endComputation();
1219     int n = CkpvAccess(_trace)->pool()->getNumEntries();
1220     BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1221     if (n>size) n=size;
1222     for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1223   }
1225   contribute(sizeof(BinEntry)*(size+1), reductionBuffer, 
1226              CkReduction::sum_double);
1227   delete [] reductionBuffer;
1230 //extern "C" void _CkExit();
1232 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1234   if (CkpvAccess(_trace) == NULL) return;
1236   CkAssert(CkMyPe() == 0);
1238   int n = msg->getSize()/sizeof(BinEntry);
1239   nBins = n-1;
1240   bins = (BinEntry *)msg->getData();
1241   nTracedPEs = (int)bins[n-1].time();
1242   // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1244   write();
1246   delete msg;
1248   CkExit();
1251 void TraceSummaryBOC::write(void) 
1253   unsigned int j;
1255   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1256   sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1257   FILE *sumfp = fopen(fname, "w+");
1258   //CmiPrintf("File: %s \n", fname);
1259   if(sumfp == 0)
1260       CmiAbort("Cannot open summary sts file for writing.\n");
1261   delete[] fname;
1263   int _numEntries=_entryTable.size();
1264   fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1265   fprintf(sumfp, "\n");
1267   // write bin time
1268 #if 0
1269   int last=pool[0].getU();
1270   writeU(fp, last);
1271   int count=1;
1272   for(j=1; j<numEntries; j++) {
1273     int u = pool[j].getU();
1274     if (last == u) {
1275       count++;
1276     }
1277     else {
1278       if (count > 1) fprintf(fp, "+%d", count);
1279       writeU(fp, u);
1280       last = u;
1281       count = 1;
1282     }
1283   }
1284   if (count > 1) fprintf(fp, "+%d", count);
1285 #else
1286   for(j=0; j<nBins; j++) {
1287     bins[j].time() /= nTracedPEs;
1288     bins[j].write(sumfp);
1289   }
1290 #endif
1291   fprintf(sumfp, "\n");
1292   fclose(sumfp);
1296 extern "C" void CombineSummary()
1298 #if CMK_TRACE_ENABLED
1299   CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1300   if (sumonly) {
1301     CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1302       // pe 0 start the sumonly process
1303     CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1304     sumProxy[0].startSumOnly();
1305   }else if(sumDetail)
1306   {
1307       CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1308       sumProxy.traceSummaryParallelShutdown(-1);
1309   }
1310   else {
1311     _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL, NULL);
1312     CkExit();
1313   }
1314 #else
1315   CkExit();
1316 #endif
1319 void initTraceSummaryBOC()
1321 #ifdef __BIGSIM__
1322   if(BgNodeRank()==0) {
1323 #else
1324   if (CkMyRank() == 0) {
1325 #endif
1326     registerExitFn(CombineSummary);
1327   }
1335 #include "TraceSummary.def.h"
1338 /*@}*/