7 #include "trace-summary.h"
8 #include "trace-summaryBOC.h"
10 #define DEBUGF(x) // CmiPrintf x
17 // 1 minutes of run before it'll fill up:
18 #define DefaultBinCount (1000*60*1)
20 CkpvStaticDeclare(TraceSummary*, _trace);
21 static int _numEvents = 0;
22 #define NUM_DUMMY_EPS 9
23 CkpvDeclare(int, binCount);
24 CkpvDeclare(double, binSize);
25 CkpvDeclare(double, version);
28 CkpvDeclare(int, previouslySentBins);
33 A class that reads/writes a buffer out of different types of data.
35 This class exists because I need to get references to parts of the buffer
36 that have already been used so that I can increment counters inside the buffer.
39 class compressedBuffer {
42 int pos; ///<< byte position just beyond the previously read/written data
49 compressedBuffer(int bytes){
50 buf = (char*)malloc(bytes);
54 compressedBuffer(void *buffer){
59 void init(void *buffer){
64 inline void * currentPtr(){
65 return (void*)(buf+pos);
70 // to resolve unaligned writes causing bus errors, need memcpy
72 memcpy(&v, buf+offset, sizeof(T));
77 void write(T v, int offset){
79 // to resolve unaligned writes causing bus errors, need memcpy
80 memcpy(buf+offset, &v2, sizeof(T));
84 void increment(int offset){
86 temp = read<T>(offset);
88 write<T>(temp, offset);
92 void accumulate(T v, int offset){
94 temp = read<T>(offset);
96 write<T>(temp, offset);
107 template <typename T>
109 T temp = read<T>(pos);
114 template <typename T>
116 T temp = read<T>(pos);
120 template <typename T0, typename T>
123 memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
140 // don't free the buf because the user my have supplied the buffer
149 CkGroupID traceSummaryGID;
150 bool summaryCcsStreaming;
156 For each TraceFoo module, _createTraceFoo() must be defined.
157 This function is called in _createTraces() generated in moduleInit.C
159 void _createTracesummary(char **argv)
161 DEBUGF(("%d createTraceSummary\n", CkMyPe()));
162 CkpvInitialize(TraceSummary*, _trace);
163 CkpvInitialize(int, previouslySentBins);
164 CkpvAccess(previouslySentBins) = 0;
165 CkpvAccess(_trace) = new TraceSummary(argv);
166 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
167 if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
171 /// function call for starting a phase in trace summary logs
173 void CkSummary_StartPhase(int phase)
175 CkpvAccess(_trace)->startPhase(phase);
179 /// function call for adding an event mark
181 void CkSummary_MarkEvent(int eventType)
183 CkpvAccess(_trace)->addEventType(eventType);
186 static inline void writeU(FILE* fp, int u)
188 fprintf(fp, "%4d", u);
191 PhaseEntry::PhaseEntry()
193 int _numEntries=_entryTable.size();
194 // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
195 nEPs = _numEntries+10;
196 count = new int[nEPs];
197 times = new double[nEPs];
198 maxtimes = new double[nEPs];
199 for (int i=0; i<nEPs; i++) {
206 SumLogPool::~SumLogPool()
211 if (sumDetail) fclose(sdfp);
213 // free memory for mark
215 for (int i=0; i<MAX_MARKS; i++) {
216 for (int j=0; j<events[i].length(); j++)
222 delete[] numExecutions;
225 void SumLogPool::addEventType(int eventType, double time)
227 if (eventType <0 || eventType >= MAX_MARKS) {
228 CkPrintf("Invalid event type %d!\n", eventType);
231 MarkEntry *e = new MarkEntry;
233 events[eventType].push_back(e);
237 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES)
239 // TBD: Can this be moved to initMem?
241 poolSize = CkpvAccess(binCount);
242 if (poolSize % 2) poolSize++; // make sure it is even
243 pool = new BinEntry[poolSize];
246 this->pgm = new char[strlen(pgm)+1];
247 strcpy(this->pgm,pgm);
250 // create the sts file
253 new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
254 sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
255 stsfp = fopen(fname, "w+");
256 //CmiPrintf("File: %s \n", fname);
258 CmiAbort("Cannot open summary sts file for writing.\n");
268 void SumLogPool::initMem()
270 int _numEntries=_entryTable.size();
271 epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
272 epInfo = new SumEntryInfo[epInfoSize];
276 numExecutions = NULL;
278 cpuTime = new double[poolSize*epInfoSize];
280 memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
281 numExecutions = new int[poolSize*epInfoSize];
282 _MEMCHECK(numExecutions);
283 memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
286 // for(i=0; i<poolSize; i++) {
287 // for(e=0; e< epInfoSize; e++) {
288 // setCPUtime(i,e,0.0);
289 // setNumExecutions(i,e,0);
295 int SumLogPool::getUtilization(int interval, int ep) {
296 return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize));
299 void SumLogPool::write(void)
303 int _numEntries=_entryTable.size();
309 // CmiPrintf("TRACE: %s:%d\n", fname, errno);
312 sprintf(pestr, "%d", CkMyPe());
313 int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
314 char *fname = new char[len+1];
316 sprintf(fname, "%s.%s.sum", pgm, pestr);
318 fp = fopen(fname, "w+");
319 } while (!fp && errno == EINTR);
321 CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
322 CmiAbort("Cannot open Summary Trace File for writing...\n");
326 sprintf(fname, "%s.%s.sumd", pgm, pestr);
328 sdfp = fopen(fname, "w+");
329 } while (!sdfp && errno == EINTR);
331 CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
337 fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
338 if (CkpvAccess(version)>=3.0)
340 fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
346 int last=pool[0].getU();
349 for(j=1; j<numBins; j++) {
350 int u = pool[j].getU();
355 if (count > 1) fprintf(fp, "+%d", count);
361 if (count > 1) fprintf(fp, "+%d", count);
363 for(j=0; j<numEntries; j++)
368 // write entry execution time
369 fprintf(fp, "EPExeTime: ");
370 for (i=0; i<_numEntries; i++)
371 fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
373 // write entry function call times
374 fprintf(fp, "EPCallTime: ");
375 for (i=0; i<_numEntries; i++)
376 fprintf(fp, "%d ", epInfo[i].epCount);
378 // write max entry function execute times
379 fprintf(fp, "MaxEPTime: ");
380 for (i=0; i<_numEntries; i++)
381 fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
384 for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
385 for (j=0; j<_numEntries; j++)
386 fprintf(fp, "%d ", epInfo[j].hist[i]);
391 if (CkpvAccess(version)>=2.0)
393 fprintf(fp, "NumMarks: %d ", markcount);
394 for (i=0; i<MAX_MARKS; i++) {
395 for(int j=0; j<events[i].length(); j++)
396 fprintf(fp, "%d %f ", i, events[i][j]->time);
401 if (CkpvAccess(version)>=3.0)
406 if (CkpvAccess(version)>=7.1) {
407 fprintf(fp, "IdlePercent: ");
408 int last=pool[0].getUIdle();
411 for(j=1; j<numBins; j++) {
412 int u = pool[j].getUIdle();
417 if (count > 1) fprintf(fp, "+%d", count);
423 if (count > 1) fprintf(fp, "+%d", count);
427 //CkPrintf("writing to detail file:%d %d \n", getNumEntries(), numBins);
428 // write summary details
430 fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
431 CkpvAccess(version), CkMyPe(), CkNumPes(),
432 numBins, _numEntries, CkpvAccess(binSize));
434 // Write out cpuTime in microseconds
435 // Run length encoding (RLE) along EP axis
436 fprintf(sdfp, "ExeTimePerEPperInterval ");
438 long last= (long) (getCPUtime(0,0)*1.0e6);
440 fprintf(sdfp, "%ld", last);
441 for(e=0; e<_numEntries; e++) {
442 for(i=0; i<numBins; i++) {
444 long u= (long) (getCPUtime(i,e)*1.0e6);
449 if (count > 1) fprintf(sdfp, "+%d", count);
450 fprintf(sdfp, " %ld", u);
456 if (count > 1) fprintf(sdfp, "+%d", count);
458 // Write out numExecutions
459 // Run length encoding (RLE) along EP axis
460 fprintf(sdfp, "EPCallTimePerInterval ");
461 last= getNumExecutions(0,0);
463 fprintf(sdfp, "%ld", last);
464 for(e=0; e<_numEntries; e++) {
465 for(i=0; i<numBins; i++) {
467 long u= getNumExecutions(i, e);
472 if (count > 1) fprintf(sdfp, "+%d", count);
473 fprintf(sdfp, " %ld", u);
479 if (count > 1) fprintf(sdfp, "+%d", count);
484 void SumLogPool::writeSts(void)
488 new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
489 sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
490 stsfp = fopen(fname, "w+");
491 //CmiPrintf("File: %s \n", fname);
493 CmiAbort("Cannot open summary sts file for writing.\n");
497 traceWriteSTS(stsfp,_numEvents);
498 for(int i=0;i<_numEvents;i++)
499 fprintf(stsfp, "EVENT %d Event%d\n", i, i);
500 fprintf(stsfp, "END\n");
505 // Called once per interval
506 void SumLogPool::add(double time, double idleTime, int pe)
508 new (&pool[numBins++]) BinEntry(time, idleTime);
509 if (poolSize==numBins) {
514 // Called once per run of an EP
515 // adds 'time' to EP's time, increments epCount
516 void SumLogPool::setEp(int epidx, double time)
518 if (epidx >= epInfoSize) {
519 CmiAbort("Invalid entry point!!\n");
521 //CmiPrintf("set EP: %d %e \n", epidx, time);
522 epInfo[epidx].setTime(time);
523 // set phase table counter
524 phaseTab.setEp(epidx, time);
527 // Called once from endExecute, endPack, etc. this function updates
528 // the sumDetail intervals.
529 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
531 if (epIdx >= epInfoSize) {
532 CmiAbort("Too many entry points!!\n");
535 double binSz = CkpvAccess(binSize);
536 int startingBinIdx, endingBinIdx;
537 startingBinIdx = (int)(startTime/binSz);
538 endingBinIdx = (int)(endTime/binSz);
540 while (endingBinIdx >= poolSize) {
542 CmiAssert(CkpvAccess(binSize) > binSz);
543 binSz = CkpvAccess(binSize);
544 startingBinIdx = (int)(startTime/binSz);
545 endingBinIdx = (int)(endTime/binSz);
548 if (startingBinIdx == endingBinIdx) {
549 addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
550 } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
551 addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
552 while(++startingBinIdx < endingBinIdx)
553 addToCPUtime(startingBinIdx, epIdx, binSz);
554 addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
556 CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
558 CmiAbort("Error: end time of EP is less than start time\n");
561 incNumExecutions(startingBinIdx, epIdx);
564 // Shrinks pool[], cpuTime[], and numExecutions[]
565 void SumLogPool::shrink(void)
567 // double t = CmiWallTimer();
569 // We ensured earlier that poolSize is even; therefore now numBins
570 // == poolSize == even.
571 int entries = numBins/2;
572 for (int i=0; i<entries; i++)
574 pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
575 pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
577 for (int e=0; e < epInfoSize; e++) {
578 setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
579 setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
582 // zero out the remaining intervals
584 memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
585 memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
588 CkpvAccess(binSize) *= 2;
590 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
593 void SumLogPool::shrink(double _maxBinSize)
595 while(CkpvAccess(binSize) < _maxBinSize)
602 return (int)(_time * 100.0 / CkpvAccess(binSize));
605 int BinEntry::getUIdle() {
606 return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
609 void BinEntry::write(FILE* fp)
614 TraceSummary::TraceSummary(char **argv):binStart(0.0),idleStart(0.0),
615 binTime(0.0),binIdle(0.0),msgNum(0)
617 if (CkpvAccess(traceOnPe) == 0) return;
620 if (CmiTimerAbsolute()) binStart = CmiInitTime();
622 CkpvInitialize(int, binCount);
623 CkpvInitialize(double, binSize);
624 CkpvInitialize(double, version);
625 CkpvAccess(binSize) = BIN_SIZE;
626 CkpvAccess(version) = VER;
627 CkpvAccess(binCount) = DefaultBinCount;
628 if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
630 CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
631 CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
632 "CPU usage log time resolution");
633 CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
634 "Write this .sum file version");
637 CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
638 "Execution time histogram lower bound");
640 CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
641 "Execution time histogram bin size");
643 sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
644 // +sumonly overrides +sumDetail
646 sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
648 _logPool = new SumLogPool(CkpvAccess(traceRoot));
649 // assume invalid entry point on start
656 void TraceSummary::traceClearEps(void)
658 _logPool->clearEps();
661 void TraceSummary::traceWriteSts(void)
664 _logPool->writeSts();
667 void TraceSummary::traceClose(void)
670 _logPool->writeSts();
671 CkpvAccess(_trace)->endComputation();
674 CkpvAccess(_traces)->removeTrace(this);
677 void TraceSummary::beginExecute(CmiObjId *tid)
679 beginExecute(-1,-1,_threadEP,-1);
682 void TraceSummary::beginExecute(envelope *e, void *obj)
684 // no message means thread execution
686 beginExecute(-1,-1,_threadEP,-1);
689 beginExecute(-1,-1,e->getEpIdx(),-1);
693 void TraceSummary::beginExecute(char *msg)
695 #if CMK_SMP_TRACE_COMMTHREAD
696 //This function is called from comm thread in SMP mode
697 envelope *e = (envelope *)msg;
698 int num = _entryTable.size();
699 int ep = e->getEpIdx();
700 if(ep<0 || ep>=num) return;
701 if(_entryTable[ep]->traceEnabled)
702 beginExecute(-1,-1,e->getEpIdx(),-1);
706 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
708 if (execEp == TRACEON_EP) {
711 CmiAssert(inIdle == 0);
713 CmiAssert(depth == 0);
717 // printf("BEGIN exec: %d %d %d\n", inIdle, inExec, depth);
719 if (depth > 1) return; // nested
722 if (execEp != INVALIDEP) {
723 TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
729 double t = TraceTimer();
730 //CmiPrintf("start: %f \n", start);
733 double ts = binStart;
735 while ((ts = ts + CkpvAccess(binSize)) < t) {
736 /* Keep as a template for error checking. The current form of this check
737 is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
738 I used it). Perhaps an improved form could be used in case vastly
739 incompatible EP vs idle times are reported (binSize*2?).
741 This check will have to be duplicated before each call to add()
743 CkPrintf("[%d] %f vs %f\n", CkMyPe(),
744 binTime + binIdle, CkpvAccess(binSize));
745 CkAssert(binTime + binIdle <= CkpvAccess(binSize));
747 _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
748 binTime=0.0; // fill all other bins with 0 up till start
754 void TraceSummary::endExecute()
756 CmiAssert(inIdle == 0 && inExec == 1);
758 if (depth == 0) inExec = 0;
759 CmiAssert(depth >= 0);
760 // printf("END exec: %d %d %d\n", inIdle, inExec, depth);
762 if (depth != 0) return;
764 double t = TraceTimer();
766 double nts = binStart;
769 if (execEp == TRACEON_EP) {
770 // if trace just got turned on, then one expects to see this
771 // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
776 if (execEp == INVALIDEP) {
777 TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
783 _logPool->setEp(execEp, t-ts);
786 while ((nts = nts + CkpvAccess(binSize)) < t)
788 // fill the bins with time for this entry method
791 // This calls shrink() if needed
792 _logPool->add(binTime, binIdle, CkMyPe());
799 if (sumDetail && execEp >= 0 )
800 _logPool->updateSummaryDetail(execEp, start, t);
805 void TraceSummary::endExecute(char *msg){
806 #if CMK_SMP_TRACE_COMMTHREAD
807 //This function is called from comm thread in SMP mode
808 envelope *e = (envelope *)msg;
809 int num = _entryTable.size();
810 int ep = e->getEpIdx();
811 if(ep<0 || ep>=num) return;
812 if(_entryTable[ep]->traceEnabled){
818 void TraceSummary::beginIdle(double currT)
820 if (execEp == TRACEON_EP) {
824 CmiAssert(inIdle == 0 && inExec == 0);
826 //printf("BEGIN idle: %d %d %d\n", inIdle, inExec, depth);
828 double t = TraceTimer(currT);
830 // mark the time of this idle period. Only the next endIdle should see
833 double ts = binStart;
835 while ((ts = ts + CkpvAccess(binSize)) < t) {
836 _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
837 binTime=0.0; // fill all other bins with 0 up till start
843 void TraceSummary::endIdle(double currT)
845 CmiAssert(inIdle == 1 && inExec == 0);
847 // printf("END idle: %d %d %d\n", inIdle, inExec, depth);
849 double t = TraceTimer(currT);
850 double t_idleStart = idleStart;
851 double t_binStart = binStart;
853 while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
855 // fill the bins with time for idle
856 binIdle += t_binStart - t_idleStart;
857 binStart = t_binStart;
858 _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
861 t_idleStart = t_binStart;
863 binIdle += t - t_idleStart;
866 void TraceSummary::traceBegin(void)
868 // fake as a start of an event, assuming traceBegin is called inside an
870 beginExecute(-1, -1, TRACEON_EP, -1, -1);
873 void TraceSummary::traceEnd(void)
878 void TraceSummary::beginPack(void)
880 packstart = CmiWallTimer();
883 void TraceSummary::endPack(void)
885 _logPool->setEp(_packEP, CmiWallTimer() - packstart);
887 _logPool->updateSummaryDetail(_packEP, TraceTimer(packstart), TraceTimer(CmiWallTimer()));
890 void TraceSummary::beginUnpack(void)
892 unpackstart = CmiWallTimer();
895 void TraceSummary::endUnpack(void)
897 _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
899 _logPool->updateSummaryDetail(_unpackEP, TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
902 void TraceSummary::beginComputation(void)
904 // initialze arrays because now the number of entries is known.
908 void TraceSummary::endComputation(void)
914 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
915 _logPool->add(binTime, binIdle, CkMyPe());
920 binStart += CkpvAccess(binSize);
921 double t = TraceTimer();
922 double ts = binStart;
925 _logPool->add(binTime, binIdle, CkMyPe());
928 ts += CkpvAccess(binSize);
934 void TraceSummary::addEventType(int eventType)
936 _logPool->addEventType(eventType, TraceTimer());
939 void TraceSummary::startPhase(int phase)
941 _logPool->startPhase(phase);
944 void TraceSummary::traceEnableCCS() {
945 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
950 void TraceSummary::fillData(double *buffer, double reqStartTime,
951 double reqBinSize, int reqNumBins) {
952 // buffer has to be pre-allocated by the requester and must be an array of
955 // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
956 // need a number of these assumptions dropped:
957 // 1) reqBinSize == binSize (unrealistic)
958 // 2) bins boundary aligned (ok even under normal circumstances)
959 // 3) bins are "factor"-aligned (where reqBinSize != binSize)
960 // 4) bins are always available (true unless flush)
961 // 5) bins always starts from 0 (unrealistic)
963 // works only because of 1)
964 // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with
965 // "starting" at all!
966 int binOffset = (int)(reqStartTime/reqBinSize);
967 for (int i=binOffset; i<binOffset + reqNumBins; i++) {
968 // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
969 buffer[i-binOffset] = pool()->getTime(i);
973 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
975 UInt numBins = CkpvAccess(_trace)->pool()->getNumEntries();
976 //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
977 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
978 CkCallback cb(CkIndex_TraceSummaryBOC::maxBinSize(NULL), sumProxy[0]);
979 contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
982 // collect the max bin size
983 void TraceSummaryBOC::maxBinSize(CkReductionMsg *msg)
985 double _maxBinSize = *((double *)msg->getData());
986 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
987 sumProxy.shrink(_maxBinSize);
990 void TraceSummaryBOC::shrink(double _mBin){
991 UInt numBins = CkpvAccess(_trace)->pool()->getNumEntries();
992 UInt epNums = CkpvAccess(_trace)->pool()->getEpInfoSize();
994 if(CkpvAccess(binSize) < _maxBinSize)
996 CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
998 double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();
999 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1000 CkCallback cb(CkIndex_TraceSummaryBOC::sumData(NULL), sumProxy[0]);
1001 contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
1004 void TraceSummaryBOC::sumData(CkReductionMsg *msg) {
1005 double *sumData = (double *)msg->getData();
1006 int totalsize = msg->getSize()/sizeof(double);
1007 UInt epNums = CkpvAccess(_trace)->pool()->getEpInfoSize();
1008 UInt numBins = totalsize/epNums;
1009 int numEntries = epNums - NUM_DUMMY_EPS - 1;
1010 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1011 sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1012 FILE *sumfp = fopen(fname, "w+");
1014 fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1015 CkpvAccess(version), CkNumPes(),
1016 numBins, numEntries, _maxBinSize);
1017 for(int i=0; i<numBins; i++){
1018 for(int j=0; j<numEntries; j++)
1020 fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1024 //CkPrintf("done with analysis\n");
1028 /// for TraceSummaryBOC
1030 void TraceSummaryBOC::initCCS() {
1032 CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1033 // initializing CCS-based parameters on all processors
1034 lastRequestedIndexBlock = 0;
1035 indicesPerBlock = 1000;
1036 collectionGranularity = 0.001; // time in seconds
1039 // initialize buffer, register CCS handler and start the collection
1040 // pulse only on pe 0.
1041 if (CkMyPe() == 0) {
1042 ccsBufferedData = new CkVec<double>();
1044 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1045 CkPrintf("Trace Summary now listening in for CCS Client\n");
1046 CcsRegisterHandler("CkPerfSummaryCcsClientCB",
1047 CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1048 CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar",
1049 CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0]));
1051 CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1052 CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1054 summaryCcsStreaming = true;
1060 /** Return summary information as double precision values for each sample period.
1061 The actual data collection is in double precision values.
1063 The units on the returned values are total execution time across all PEs.
1065 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1068 CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1070 CkPrintf("Responding ...\n");
1072 // if we have no data to send, send an acknowledgement code of -13.37
1074 if (ccsBufferedData->length() == 0) {
1075 sendBuffer = new double[1];
1076 sendBuffer[0] = -13.37;
1077 datalength = sizeof(double);
1078 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1079 delete [] sendBuffer;
1081 sendBuffer = ccsBufferedData->getVec();
1082 datalength = ccsBufferedData->length()*sizeof(double);
1083 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1084 ccsBufferedData->free();
1086 CkPrintf("Response Sent. Proceeding with computation.\n");
1091 /** Return summary information as unsigned char values for each sample period.
1092 The actual data collection is in double precision values.
1094 This returns the utilization in a range from 0 to 200.
1096 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1097 unsigned char *sendBuffer;
1099 CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1101 CkPrintf("Responding ...\n");
1104 if (ccsBufferedData->length() == 0) {
1105 sendBuffer = new unsigned char[1];
1106 sendBuffer[0] = 255;
1107 datalength = sizeof(unsigned char);
1108 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1109 delete [] sendBuffer;
1111 double * doubleData = ccsBufferedData->getVec();
1112 int numData = ccsBufferedData->length();
1114 // pack data into unsigned char array
1115 sendBuffer = new unsigned char[numData];
1117 for(int i=0;i<numData;i++){
1118 sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1121 datalength = sizeof(unsigned char) * numData;
1123 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1124 ccsBufferedData->free();
1125 delete [] sendBuffer;
1127 CkPrintf("Response Sent. Proceeding with computation.\n");
1133 void startCollectData(void *data, double currT) {
1134 CkAssert(CkMyPe() == 0);
1135 // CkPrintf("startCollectData()\n");
1136 TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1137 int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1138 double collectionGranularity = sumObj->collectionGranularity;
1139 int indicesPerBlock = sumObj->indicesPerBlock;
1141 double startTime = lastRequestedIndexBlock*
1142 collectionGranularity * indicesPerBlock;
1143 int numIndicesToGet = (int)floor((currT - startTime)/
1144 collectionGranularity);
1145 int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1146 // **TODO** consider limiting the total number of blocks each collection
1147 // request will pick up. This is to limit the amount of perturbation
1148 // if it proves to be a problem.
1149 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1151 sumProxy.collectSummaryData(startTime,
1152 collectionGranularity,
1153 numBlocksToGet*indicesPerBlock);
1155 sumObj->lastRequestedIndexBlock += numBlocksToGet;
1158 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1160 // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1162 double *contribution = new double[numBins];
1163 for (int i=0; i<numBins; i++) {
1164 contribution[i] = 0.0;
1166 CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1169 for (int i=0; i<numBins; i++) {
1170 CkPrintf("[%d] %f\n", i, contribution[i]);
1174 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1175 CkCallback cb(CkIndex_TraceSummaryBOC::summaryDataCollected(NULL), sumProxy[0]);
1176 contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double,
1178 delete [] contribution;
1181 void TraceSummaryBOC::summaryDataCollected(CkReductionMsg *msg) {
1182 CkAssert(CkMyPe() == 0);
1183 // **CWL** No memory management for the ccs buffer for now.
1185 // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1186 double *recvData = (double *)msg->getData();
1187 int numBins = msg->getSize()/sizeof(double);
1189 // if there's an easier way to append a data block to a CkVec, I'll take it
1190 for (int i=0; i<numBins; i++) {
1191 ccsBufferedData->insertAtEnd(recvData[i]);
1199 void TraceSummaryBOC::startSumOnly()
1201 CmiAssert(CkMyPe() == 0);
1203 CProxy_TraceSummaryBOC p(traceSummaryGID);
1204 int size = CkpvAccess(_trace)->pool()->getNumEntries();
1208 void TraceSummaryBOC::askSummary(int size)
1210 if (CkpvAccess(_trace) == NULL) return;
1212 int traced = CkpvAccess(_trace)->traceOnPE();
1214 BinEntry *reductionBuffer = new BinEntry[size+1];
1215 reductionBuffer[size].time() = traced; // last element is the traced pe count
1216 reductionBuffer[size].getIdleTime() = 0; // last element is the traced pe count
1218 CkpvAccess(_trace)->endComputation();
1219 int n = CkpvAccess(_trace)->pool()->getNumEntries();
1220 BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1222 for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1225 contribute(sizeof(BinEntry)*(size+1), reductionBuffer,
1226 CkReduction::sum_double);
1227 delete [] reductionBuffer;
1230 //extern "C" void _CkExit();
1232 void TraceSummaryBOC::sendSummaryBOC(CkReductionMsg *msg)
1234 if (CkpvAccess(_trace) == NULL) return;
1236 CkAssert(CkMyPe() == 0);
1238 int n = msg->getSize()/sizeof(BinEntry);
1240 bins = (BinEntry *)msg->getData();
1241 nTracedPEs = (int)bins[n-1].time();
1242 // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1251 void TraceSummaryBOC::write(void)
1255 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1256 sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1257 FILE *sumfp = fopen(fname, "w+");
1258 //CmiPrintf("File: %s \n", fname);
1260 CmiAbort("Cannot open summary sts file for writing.\n");
1263 int _numEntries=_entryTable.size();
1264 fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1265 fprintf(sumfp, "\n");
1269 int last=pool[0].getU();
1272 for(j=1; j<numEntries; j++) {
1273 int u = pool[j].getU();
1278 if (count > 1) fprintf(fp, "+%d", count);
1284 if (count > 1) fprintf(fp, "+%d", count);
1286 for(j=0; j<nBins; j++) {
1287 bins[j].time() /= nTracedPEs;
1288 bins[j].write(sumfp);
1291 fprintf(sumfp, "\n");
1296 extern "C" void CombineSummary()
1298 #if CMK_TRACE_ENABLED
1299 CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1301 CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1302 // pe 0 start the sumonly process
1303 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1304 sumProxy[0].startSumOnly();
1307 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1308 sumProxy.traceSummaryParallelShutdown(-1);
1311 _TRACE_BEGIN_EXECUTE_DETAILED(-1, -1, _threadEP,CkMyPe(), 0, NULL, NULL);
1319 void initTraceSummaryBOC()
1322 if(BgNodeRank()==0) {
1324 if (CkMyRank() == 0) {
1326 registerExitFn(CombineSummary);
1335 #include "TraceSummary.def.h"