7 #include "trace-summary.h"
8 #include "trace-summaryBOC.h"
10 #define DEBUGF(x) // CmiPrintf x
17 // 1 minutes of run before it'll fill up:
18 #define DefaultBinCount (1000*60*1)
20 CkpvStaticDeclare(TraceSummary*, _trace);
21 static int _numEvents = 0;
22 #define NUM_DUMMY_EPS 9
23 CkpvDeclare(int, binCount);
24 CkpvDeclare(double, binSize);
25 CkpvDeclare(double, version);
28 CkpvDeclare(int, previouslySentBins);
33 A class that reads/writes a buffer out of different types of data.
35 This class exists because I need to get references to parts of the buffer
36 that have already been used so that I can increment counters inside the buffer.
39 class compressedBuffer {
42 int pos; ///<< byte position just beyond the previously read/written data
49 compressedBuffer(int bytes){
50 buf = (char*)malloc(bytes);
54 compressedBuffer(void *buffer){
59 void init(void *buffer){
64 inline void * currentPtr(){
65 return (void*)(buf+pos);
70 // to resolve unaligned writes causing bus errors, need memcpy
72 memcpy(&v, buf+offset, sizeof(T));
77 void write(T v, int offset){
79 // to resolve unaligned writes causing bus errors, need memcpy
80 memcpy(buf+offset, &v2, sizeof(T));
84 void increment(int offset){
86 temp = read<T>(offset);
88 write<T>(temp, offset);
92 void accumulate(T v, int offset){
94 temp = read<T>(offset);
96 write<T>(temp, offset);
107 template <typename T>
109 T temp = read<T>(pos);
114 template <typename T>
116 T temp = read<T>(pos);
120 template <typename T0, typename T>
123 memcpy(&temp, buf+pos+sizeof(T0), sizeof(T));
140 // don't free the buf because the user my have supplied the buffer
149 CkGroupID traceSummaryGID;
150 bool summaryCcsStreaming;
156 For each TraceFoo module, _createTraceFoo() must be defined.
157 This function is called in _createTraces() generated in moduleInit.C
159 void _createTracesummary(char **argv)
161 DEBUGF(("%d createTraceSummary\n", CkMyPe()));
162 CkpvInitialize(TraceSummary*, _trace);
163 CkpvInitialize(int, previouslySentBins);
164 CkpvAccess(previouslySentBins) = 0;
165 CkpvAccess(_trace) = new TraceSummary(argv);
166 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
167 if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Summary enabled.\n");
171 /// function call for starting a phase in trace summary logs
172 void CkSummary_StartPhase(int phase)
174 CkpvAccess(_trace)->startPhase(phase);
178 /// function call for adding an event mark
179 void CkSummary_MarkEvent(int eventType)
181 CkpvAccess(_trace)->addEventType(eventType);
184 static inline void writeU(FILE* fp, int u)
186 fprintf(fp, "%4d", u);
189 PhaseEntry::PhaseEntry()
191 int _numEntries=_entryTable.size();
192 // FIXME: Hopes there won't be more than 10 more EP's registered from now on...
193 nEPs = _numEntries+10;
194 count = new int[nEPs];
195 times = new double[nEPs];
196 maxtimes = new double[nEPs];
197 for (int i=0; i<nEPs; i++) {
204 SumLogPool::~SumLogPool()
209 if (sumDetail) fclose(sdfp);
211 // free memory for mark
213 for (int i=0; i<MAX_MARKS; i++) {
214 for (int j=0; j<events[i].length(); j++)
220 delete[] numExecutions;
223 void SumLogPool::addEventType(int eventType, double time)
225 if (eventType <0 || eventType >= MAX_MARKS) {
226 CkPrintf("Invalid event type %d!\n", eventType);
229 MarkEntry *e = new MarkEntry;
231 events[eventType].push_back(e);
235 SumLogPool::SumLogPool(char *pgm) : numBins(0), phaseTab(MAX_PHASES)
237 // TBD: Can this be moved to initMem?
239 poolSize = CkpvAccess(binCount);
240 if (poolSize % 2) poolSize++; // make sure it is even
241 pool = new BinEntry[poolSize];
244 this->pgm = new char[strlen(pgm)+1];
245 strcpy(this->pgm,pgm);
248 // create the sts file
251 new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
252 sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
253 stsfp = fopen(fname, "w+");
254 //CmiPrintf("File: %s \n", fname);
256 CmiAbort("Cannot open summary sts file for writing.\n");
266 void SumLogPool::initMem()
268 int _numEntries=_entryTable.size();
269 epInfoSize = _numEntries + NUM_DUMMY_EPS + 1; // keep a spare EP
270 epInfo = new SumEntryInfo[epInfoSize];
274 numExecutions = NULL;
276 cpuTime = new double[poolSize*epInfoSize];
278 memset(cpuTime, 0, poolSize*epInfoSize*sizeof(double));
279 numExecutions = new int[poolSize*epInfoSize];
280 _MEMCHECK(numExecutions);
281 memset(numExecutions, 0, poolSize*epInfoSize*sizeof(int));
284 // for(i=0; i<poolSize; i++) {
285 // for(e=0; e< epInfoSize; e++) {
286 // setCPUtime(i,e,0.0);
287 // setNumExecutions(i,e,0);
293 int SumLogPool::getUtilization(int interval, int ep) {
294 return (int)(getCPUtime(interval, ep) * 100.0 / CkpvAccess(binSize));
297 void SumLogPool::write(void)
301 int _numEntries=_entryTable.size();
307 // CmiPrintf("TRACE: %s:%d\n", fname, errno);
310 sprintf(pestr, "%d", CkMyPe());
311 int len = strlen(pgm) + strlen(".sumd.") + strlen(pestr) + 1;
312 char *fname = new char[len+1];
314 sprintf(fname, "%s.%s.sum", pgm, pestr);
316 fp = fopen(fname, "w+");
317 } while (!fp && errno == EINTR);
319 CkPrintf("[%d] Attempting to open [%s]\n",CkMyPe(),fname);
320 CmiAbort("Cannot open Summary Trace File for writing...\n");
324 sprintf(fname, "%s.%s.sumd", pgm, pestr);
326 sdfp = fopen(fname, "w+");
327 } while (!sdfp && errno == EINTR);
329 CmiAbort("Cannot open Detailed Summary Trace File for writing...\n");
335 fprintf(fp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e", CkpvAccess(version), CkMyPe(), CkNumPes(), numBins, _numEntries, CkpvAccess(binSize));
336 if (CkpvAccess(version)>=3.0)
338 fprintf(fp, " phases:%d", phaseTab.numPhasesCalled());
344 int last=pool[0].getU();
347 for(j=1; j<numBins; j++) {
348 int u = pool[j].getU();
353 if (count > 1) fprintf(fp, "+%d", count);
359 if (count > 1) fprintf(fp, "+%d", count);
361 for(j=0; j<numEntries; j++)
366 // write entry execution time
367 fprintf(fp, "EPExeTime: ");
368 for (i=0; i<_numEntries; i++)
369 fprintf(fp, "%ld ", (long)(epInfo[i].epTime*1.0e6));
371 // write entry function call times
372 fprintf(fp, "EPCallTime: ");
373 for (i=0; i<_numEntries; i++)
374 fprintf(fp, "%d ", epInfo[i].epCount);
376 // write max entry function execute times
377 fprintf(fp, "MaxEPTime: ");
378 for (i=0; i<_numEntries; i++)
379 fprintf(fp, "%ld ", (long)(epInfo[i].epMaxTime*1.0e6));
382 for (i=0; i<SumEntryInfo::HIST_SIZE; i++) {
383 for (j=0; j<_numEntries; j++)
384 fprintf(fp, "%d ", epInfo[j].hist[i]);
389 if (CkpvAccess(version)>=2.0)
391 fprintf(fp, "NumMarks: %d ", markcount);
392 for (i=0; i<MAX_MARKS; i++) {
393 for(int j=0; j<events[i].length(); j++)
394 fprintf(fp, "%d %f ", i, events[i][j]->time);
399 if (CkpvAccess(version)>=3.0)
404 if (CkpvAccess(version)>=7.1) {
405 fprintf(fp, "IdlePercent: ");
406 int last=pool[0].getUIdle();
409 for(j=1; j<numBins; j++) {
410 int u = pool[j].getUIdle();
415 if (count > 1) fprintf(fp, "+%d", count);
421 if (count > 1) fprintf(fp, "+%d", count);
425 //CkPrintf("writing to detail file:%d %d \n", getNumEntries(), numBins);
426 // write summary details
428 fprintf(sdfp, "ver:%3.1f cpu:%d/%d numIntervals:%d numEPs:%d intervalSize:%e\n",
429 CkpvAccess(version), CkMyPe(), CkNumPes(),
430 numBins, _numEntries, CkpvAccess(binSize));
432 // Write out cpuTime in microseconds
433 // Run length encoding (RLE) along EP axis
434 fprintf(sdfp, "ExeTimePerEPperInterval ");
436 long last= (long) (getCPUtime(0,0)*1.0e6);
438 fprintf(sdfp, "%ld", last);
439 for(e=0; e<_numEntries; e++) {
440 for(i=0; i<numBins; i++) {
442 long u= (long) (getCPUtime(i,e)*1.0e6);
447 if (count > 1) fprintf(sdfp, "+%d", count);
448 fprintf(sdfp, " %ld", u);
454 if (count > 1) fprintf(sdfp, "+%d", count);
456 // Write out numExecutions
457 // Run length encoding (RLE) along EP axis
458 fprintf(sdfp, "EPCallTimePerInterval ");
459 last= getNumExecutions(0,0);
461 fprintf(sdfp, "%ld", last);
462 for(e=0; e<_numEntries; e++) {
463 for(i=0; i<numBins; i++) {
465 long u= getNumExecutions(i, e);
470 if (count > 1) fprintf(sdfp, "+%d", count);
471 fprintf(sdfp, " %ld", u);
477 if (count > 1) fprintf(sdfp, "+%d", count);
482 void SumLogPool::writeSts(void)
486 new char[strlen(CkpvAccess(traceRoot))+strlen(".sum.sts")+1];
487 sprintf(fname, "%s.sum.sts", CkpvAccess(traceRoot));
488 stsfp = fopen(fname, "w+");
489 //CmiPrintf("File: %s \n", fname);
491 CmiAbort("Cannot open summary sts file for writing.\n");
495 traceWriteSTS(stsfp,_numEvents);
496 for(int i=0;i<_numEvents;i++)
497 fprintf(stsfp, "EVENT %d Event%d\n", i, i);
498 fprintf(stsfp, "END\n");
503 // Called once per interval
504 void SumLogPool::add(double time, double idleTime, int pe)
506 new (&pool[numBins++]) BinEntry(time, idleTime);
507 if (poolSize==numBins) {
512 // Called once per run of an EP
513 // adds 'time' to EP's time, increments epCount
514 void SumLogPool::setEp(int epidx, double time)
516 if (epidx >= epInfoSize) {
517 CmiAbort("Invalid entry point!!\n");
519 //CmiPrintf("set EP: %d %e \n", epidx, time);
520 epInfo[epidx].setTime(time);
521 // set phase table counter
522 phaseTab.setEp(epidx, time);
525 // Called once from endExecute, endPack, etc. this function updates
526 // the sumDetail intervals.
527 void SumLogPool::updateSummaryDetail(int epIdx, double startTime, double endTime)
529 if (epIdx >= epInfoSize) {
530 CmiAbort("Too many entry points!!\n");
533 double binSz = CkpvAccess(binSize);
534 int startingBinIdx, endingBinIdx;
535 startingBinIdx = (int)(startTime/binSz);
536 endingBinIdx = (int)(endTime/binSz);
538 while (endingBinIdx >= poolSize) {
540 CmiAssert(CkpvAccess(binSize) > binSz);
541 binSz = CkpvAccess(binSize);
542 startingBinIdx = (int)(startTime/binSz);
543 endingBinIdx = (int)(endTime/binSz);
546 if (startingBinIdx == endingBinIdx) {
547 addToCPUtime(startingBinIdx, epIdx, endTime - startTime);
548 } else if (startingBinIdx < endingBinIdx) { // EP spans intervals
549 addToCPUtime(startingBinIdx, epIdx, (startingBinIdx+1)*binSz - startTime);
550 while(++startingBinIdx < endingBinIdx)
551 addToCPUtime(startingBinIdx, epIdx, binSz);
552 addToCPUtime(endingBinIdx, epIdx, endTime - endingBinIdx*binSz);
554 CkPrintf("[%d] EP:%d Start:%lf End:%lf\n",CkMyPe(),epIdx,
556 CmiAbort("Error: end time of EP is less than start time\n");
559 incNumExecutions(startingBinIdx, epIdx);
562 // Shrinks pool[], cpuTime[], and numExecutions[]
563 void SumLogPool::shrink(void)
565 // double t = CmiWallTimer();
567 // We ensured earlier that poolSize is even; therefore now numBins
568 // == poolSize == even.
569 int entries = numBins/2;
570 for (int i=0; i<entries; i++)
572 pool[i].time() = pool[i*2].time() + pool[i*2+1].time();
573 pool[i].getIdleTime() = pool[i*2].getIdleTime() + pool[i*2+1].getIdleTime();
575 for (int e=0; e < epInfoSize; e++) {
576 setCPUtime(i, e, getCPUtime(i*2, e) + getCPUtime(i*2+1, e));
577 setNumExecutions(i, e, getNumExecutions(i*2, e) + getNumExecutions(i*2+1, e));
580 // zero out the remaining intervals
582 memset(&cpuTime[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(double));
583 memset(&numExecutions[entries*epInfoSize], 0, (numBins-entries)*epInfoSize*sizeof(int));
586 CkpvAccess(binSize) *= 2;
588 //CkPrintf("Shrinked binsize: %f entries:%d takes %fs!!!!\n", CkpvAccess(binSize), numEntries, CmiWallTimer()-t);
591 void SumLogPool::shrink(double _maxBinSize)
593 while(CkpvAccess(binSize) < _maxBinSize)
600 return (int)(_time * 100.0 / CkpvAccess(binSize));
603 int BinEntry::getUIdle() {
604 return (int)(_idleTime * 100.0 / CkpvAccess(binSize));
607 void BinEntry::write(FILE* fp)
612 TraceSummary::TraceSummary(char **argv):msgNum(0),binStart(0.0),idleStart(0.0),
613 binTime(0.0),binIdle(0.0)
615 if (CkpvAccess(traceOnPe) == 0) return;
618 if (CmiTimerAbsolute()) binStart = CmiInitTime();
620 CkpvInitialize(int, binCount);
621 CkpvInitialize(double, binSize);
622 CkpvInitialize(double, version);
623 CkpvAccess(binSize) = BIN_SIZE;
624 CkpvAccess(version) = VER;
625 CkpvAccess(binCount) = DefaultBinCount;
626 if (CmiGetArgIntDesc(argv,"+bincount",&CkpvAccess(binCount), "Total number of summary bins"))
628 CmiPrintf("Trace: bincount: %d\n", CkpvAccess(binCount));
629 CmiGetArgDoubleDesc(argv,"+binsize",&CkpvAccess(binSize),
630 "CPU usage log time resolution");
631 CmiGetArgDoubleDesc(argv,"+version",&CkpvAccess(version),
632 "Write this .sum file version");
635 CmiGetArgDoubleDesc(argv,"+epThreshold",&epThreshold,
636 "Execution time histogram lower bound");
638 CmiGetArgDoubleDesc(argv,"+epInterval",&epInterval,
639 "Execution time histogram bin size");
641 sumonly = CmiGetArgFlagDesc(argv, "+sumonly", "merge histogram bins on processor 0");
642 // +sumonly overrides +sumDetail
644 sumDetail = CmiGetArgFlagDesc(argv, "+sumDetail", "more detailed summary info");
646 _logPool = new SumLogPool(CkpvAccess(traceRoot));
647 // assume invalid entry point on start
654 void TraceSummary::traceClearEps(void)
656 _logPool->clearEps();
659 void TraceSummary::traceWriteSts(void)
662 _logPool->writeSts();
665 void TraceSummary::traceClose(void)
668 _logPool->writeSts();
669 CkpvAccess(_trace)->endComputation();
672 CkpvAccess(_traces)->removeTrace(this);
675 void TraceSummary::beginExecute(CmiObjId *tid)
677 beginExecute(-1,-1,_threadEP,-1);
680 void TraceSummary::beginExecute(envelope *e, void *obj)
682 // no message means thread execution
684 beginExecute(-1,-1,_threadEP,-1);
687 beginExecute(-1,-1,e->getEpIdx(),-1);
691 void TraceSummary::beginExecute(char *msg)
693 #if CMK_SMP_TRACE_COMMTHREAD
694 //This function is called from comm thread in SMP mode
695 envelope *e = (envelope *)msg;
696 int num = _entryTable.size();
697 int ep = e->getEpIdx();
698 if(ep<0 || ep>=num) return;
699 if(_entryTable[ep]->traceEnabled)
700 beginExecute(-1,-1,e->getEpIdx(),-1);
704 void TraceSummary::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
706 if (execEp == TRACEON_EP) {
709 CmiAssert(inIdle == 0);
711 CmiAssert(depth == 0);
715 // printf("BEGIN exec: %d %d %d\n", inIdle, inExec, depth);
717 if (depth > 1) return; // nested
720 if (execEp != INVALIDEP) {
721 TRACE_WARN("Warning: TraceSummary two consecutive BEGIN_PROCESSING!\n");
727 double t = TraceTimer();
728 //CmiPrintf("start: %f \n", start);
731 double ts = binStart;
733 while ((ts = ts + CkpvAccess(binSize)) < t) {
734 /* Keep as a template for error checking. The current form of this check
735 is vulnerable to round-off errors (eg. 0.001 vs 0.001 the first time
736 I used it). Perhaps an improved form could be used in case vastly
737 incompatible EP vs idle times are reported (binSize*2?).
739 This check will have to be duplicated before each call to add()
741 CkPrintf("[%d] %f vs %f\n", CkMyPe(),
742 binTime + binIdle, CkpvAccess(binSize));
743 CkAssert(binTime + binIdle <= CkpvAccess(binSize));
745 _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
746 binTime=0.0; // fill all other bins with 0 up till start
752 void TraceSummary::endExecute()
754 CmiAssert(inIdle == 0 && inExec == 1);
756 if (depth == 0) inExec = 0;
757 CmiAssert(depth >= 0);
758 // printf("END exec: %d %d %d\n", inIdle, inExec, depth);
760 if (depth != 0) return;
762 double t = TraceTimer();
764 double nts = binStart;
767 if (execEp == TRACEON_EP) {
768 // if trace just got turned on, then one expects to see this
769 // END_PROCESSING event without seeing a preceeding BEGIN_PROCESSING
774 if (execEp == INVALIDEP) {
775 TRACE_WARN("Warning: TraceSummary END_PROCESSING without BEGIN_PROCESSING!\n");
781 _logPool->setEp(execEp, t-ts);
784 while ((nts = nts + CkpvAccess(binSize)) < t)
786 // fill the bins with time for this entry method
789 // This calls shrink() if needed
790 _logPool->add(binTime, binIdle, CkMyPe());
797 if (sumDetail && execEp >= 0 )
798 _logPool->updateSummaryDetail(execEp, start, t);
803 void TraceSummary::endExecute(char *msg){
804 #if CMK_SMP_TRACE_COMMTHREAD
805 //This function is called from comm thread in SMP mode
806 envelope *e = (envelope *)msg;
807 int num = _entryTable.size();
808 int ep = e->getEpIdx();
809 if(ep<0 || ep>=num) return;
810 if(_entryTable[ep]->traceEnabled){
816 void TraceSummary::beginIdle(double currT)
818 if (execEp == TRACEON_EP) {
822 CmiAssert(inIdle == 0 && inExec == 0);
824 //printf("BEGIN idle: %d %d %d\n", inIdle, inExec, depth);
826 double t = TraceTimer(currT);
828 // mark the time of this idle period. Only the next endIdle should see
831 double ts = binStart;
833 while ((ts = ts + CkpvAccess(binSize)) < t) {
834 _logPool->add(binTime, binIdle, CkMyPe()); // add leftovers of last bin
835 binTime=0.0; // fill all other bins with 0 up till start
841 void TraceSummary::endIdle(double currT)
843 CmiAssert(inIdle == 1 && inExec == 0);
845 // printf("END idle: %d %d %d\n", inIdle, inExec, depth);
847 double t = TraceTimer(currT);
848 double t_idleStart = idleStart;
849 double t_binStart = binStart;
851 while ((t_binStart = t_binStart + CkpvAccess(binSize)) < t)
853 // fill the bins with time for idle
854 binIdle += t_binStart - t_idleStart;
855 binStart = t_binStart;
856 _logPool->add(binTime, binIdle, CkMyPe()); // This calls shrink() if needed
859 t_idleStart = t_binStart;
861 binIdle += t - t_idleStart;
864 void TraceSummary::traceBegin(void)
866 // fake as a start of an event, assuming traceBegin is called inside an
868 beginExecute(-1, -1, TRACEON_EP, -1, -1);
871 void TraceSummary::traceEnd(void)
876 void TraceSummary::beginPack(void)
878 packstart = CmiWallTimer();
881 void TraceSummary::endPack(void)
883 _logPool->setEp(_packEP, CmiWallTimer() - packstart);
885 _logPool->updateSummaryDetail(_packEP, TraceTimer(packstart), TraceTimer(CmiWallTimer()));
888 void TraceSummary::beginUnpack(void)
890 unpackstart = CmiWallTimer();
893 void TraceSummary::endUnpack(void)
895 _logPool->setEp(_unpackEP, CmiWallTimer()-unpackstart);
897 _logPool->updateSummaryDetail(_unpackEP, TraceTimer(unpackstart), TraceTimer(CmiWallTimer()));
900 void TraceSummary::beginComputation(void)
902 // initialze arrays because now the number of entries is known.
906 void TraceSummary::endComputation(void)
912 //CmiPrintf("Add at last: %d pe:%d time:%f msg:%d\n", index, CkMyPe(), bin, msgNum);
913 _logPool->add(binTime, binIdle, CkMyPe());
918 binStart += CkpvAccess(binSize);
919 double t = TraceTimer();
920 double ts = binStart;
923 _logPool->add(binTime, binIdle, CkMyPe());
926 ts += CkpvAccess(binSize);
932 void TraceSummary::addEventType(int eventType)
934 _logPool->addEventType(eventType, TraceTimer());
937 void TraceSummary::startPhase(int phase)
939 _logPool->startPhase(phase);
942 void TraceSummary::traceEnableCCS() {
943 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
948 void TraceSummary::fillData(double *buffer, double reqStartTime,
949 double reqBinSize, int reqNumBins) {
950 // buffer has to be pre-allocated by the requester and must be an array of
953 // Assumptions: **CWL** FOR DEMO ONLY - a production-capable version will
954 // need a number of these assumptions dropped:
955 // 1) reqBinSize == binSize (unrealistic)
956 // 2) bins boundary aligned (ok even under normal circumstances)
957 // 3) bins are "factor"-aligned (where reqBinSize != binSize)
958 // 4) bins are always available (true unless flush)
959 // 5) bins always starts from 0 (unrealistic)
961 // works only because of 1)
962 // **CWL** - FRACKING STUPID NAME "binStart" has nothing to do with
963 // "starting" at all!
964 int binOffset = (int)(reqStartTime/reqBinSize);
965 for (int i=binOffset; i<binOffset + reqNumBins; i++) {
966 // CkPrintf("[%d] %f\n", i, pool()->getTime(i));
967 buffer[i-binOffset] = pool()->getTime(i);
971 void TraceSummaryBOC::traceSummaryParallelShutdown(int pe) {
973 UInt numBins = CkpvAccess(_trace)->pool()->getNumEntries();
974 //CkPrintf("trace shut down pe=%d bincount=%d\n", CkMyPe(), numBins);
975 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
976 CkCallback cb(CkReductionTarget(TraceSummaryBOC, maxBinSize), sumProxy[0]);
977 contribute(sizeof(double), &(CkpvAccess(binSize)), CkReduction::max_double, cb);
980 // collect the max bin size
981 void TraceSummaryBOC::maxBinSize(double _maxBinSize)
983 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
984 sumProxy.shrink(_maxBinSize);
987 void TraceSummaryBOC::shrink(double _mBin){
988 UInt numBins = CkpvAccess(_trace)->pool()->getNumEntries();
989 UInt epNums = CkpvAccess(_trace)->pool()->getEpInfoSize();
991 if(CkpvAccess(binSize) < _maxBinSize)
993 CkpvAccess(_trace)->pool()->shrink(_maxBinSize);
995 double *sumData = CkpvAccess(_trace)->pool()->getCpuTime();
996 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
997 CkCallback cb(CkReductionTarget(TraceSummaryBOC, sumData), sumProxy[0]);
998 contribute(sizeof(double) * numBins * epNums, CkpvAccess(_trace)->pool()->getCpuTime(), CkReduction::sum_double, cb);
1001 void TraceSummaryBOC::sumData(double *sumData, int totalsize) {
1002 UInt epNums = CkpvAccess(_trace)->pool()->getEpInfoSize();
1003 UInt numBins = totalsize/epNums;
1004 int numEntries = epNums - NUM_DUMMY_EPS - 1;
1005 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sumall")+1];
1006 sprintf(fname, "%s.sumall", CkpvAccess(traceRoot));
1007 FILE *sumfp = fopen(fname, "w+");
1009 fprintf(sumfp, "ver:%3.1f cpu:%d numIntervals:%d numEPs:%d intervalSize:%e\n",
1010 CkpvAccess(version), CkNumPes(),
1011 numBins, numEntries, _maxBinSize);
1012 for(int i=0; i<numBins; i++){
1013 for(int j=0; j<numEntries; j++)
1015 fprintf(sumfp, "%ld ", (long)(sumData[i*epNums+j]*1.0e6));
1019 //CkPrintf("done with analysis\n");
1023 /// for TraceSummaryBOC
1025 void TraceSummaryBOC::initCCS() {
1027 CkPrintf("[%d] initCCS() called for first time\n", CkMyPe());
1028 // initializing CCS-based parameters on all processors
1029 lastRequestedIndexBlock = 0;
1030 indicesPerBlock = 1000;
1031 collectionGranularity = 0.001; // time in seconds
1034 // initialize buffer, register CCS handler and start the collection
1035 // pulse only on pe 0.
1036 if (CkMyPe() == 0) {
1037 ccsBufferedData = new CkVec<double>();
1039 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1040 CkPrintf("Trace Summary now listening in for CCS Client\n");
1041 CcsRegisterHandler("CkPerfSummaryCcsClientCB",
1042 CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryDouble(NULL), sumProxy[0]));
1043 CcsRegisterHandler("CkPerfSummaryCcsClientCB uchar",
1044 CkCallback(CkIndex_TraceSummaryBOC::ccsRequestSummaryUnsignedChar(NULL), sumProxy[0]));
1046 CkPrintf("[%d] Setting up periodic startCollectData callback\n", CkMyPe());
1047 CcdCallOnConditionKeep(CcdPERIODIC_1second, startCollectData,
1049 summaryCcsStreaming = true;
1055 /** Return summary information as double precision values for each sample period.
1056 The actual data collection is in double precision values.
1058 The units on the returned values are total execution time across all PEs.
1060 void TraceSummaryBOC::ccsRequestSummaryDouble(CkCcsRequestMsg *m) {
1063 CkPrintf("[%d] Request from Client detected.\n", CkMyPe());
1065 CkPrintf("Responding ...\n");
1067 // if we have no data to send, send an acknowledgement code of -13.37
1069 if (ccsBufferedData->length() == 0) {
1070 sendBuffer = new double[1];
1071 sendBuffer[0] = -13.37;
1072 datalength = sizeof(double);
1073 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1074 delete [] sendBuffer;
1076 sendBuffer = ccsBufferedData->getVec();
1077 datalength = ccsBufferedData->length()*sizeof(double);
1078 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1079 ccsBufferedData->free();
1081 CkPrintf("Response Sent. Proceeding with computation.\n");
1086 /** Return summary information as unsigned char values for each sample period.
1087 The actual data collection is in double precision values.
1089 This returns the utilization in a range from 0 to 200.
1091 void TraceSummaryBOC::ccsRequestSummaryUnsignedChar(CkCcsRequestMsg *m) {
1092 unsigned char *sendBuffer;
1094 CkPrintf("[%d] Request from Client detected. \n", CkMyPe());
1096 CkPrintf("Responding ...\n");
1099 if (ccsBufferedData->length() == 0) {
1100 sendBuffer = new unsigned char[1];
1101 sendBuffer[0] = 255;
1102 datalength = sizeof(unsigned char);
1103 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1104 delete [] sendBuffer;
1106 double * doubleData = ccsBufferedData->getVec();
1107 int numData = ccsBufferedData->length();
1109 // pack data into unsigned char array
1110 sendBuffer = new unsigned char[numData];
1112 for(int i=0;i<numData;i++){
1113 sendBuffer[i] = 1000.0 * doubleData[i] / (double)CkNumPes() * 200.0; // max = 200 is the same as 100% utilization
1116 datalength = sizeof(unsigned char) * numData;
1118 CcsSendDelayedReply(m->reply, datalength, (void *)sendBuffer);
1119 ccsBufferedData->free();
1120 delete [] sendBuffer;
1122 CkPrintf("Response Sent. Proceeding with computation.\n");
1128 void startCollectData(void *data, double currT) {
1129 CkAssert(CkMyPe() == 0);
1130 // CkPrintf("startCollectData()\n");
1131 TraceSummaryBOC *sumObj = (TraceSummaryBOC *)data;
1132 int lastRequestedIndexBlock = sumObj->lastRequestedIndexBlock;
1133 double collectionGranularity = sumObj->collectionGranularity;
1134 int indicesPerBlock = sumObj->indicesPerBlock;
1136 double startTime = lastRequestedIndexBlock*
1137 collectionGranularity * indicesPerBlock;
1138 int numIndicesToGet = (int)floor((currT - startTime)/
1139 collectionGranularity);
1140 int numBlocksToGet = numIndicesToGet/indicesPerBlock;
1141 // **TODO** consider limiting the total number of blocks each collection
1142 // request will pick up. This is to limit the amount of perturbation
1143 // if it proves to be a problem.
1144 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1146 sumProxy.collectSummaryData(startTime,
1147 collectionGranularity,
1148 numBlocksToGet*indicesPerBlock);
1150 sumObj->lastRequestedIndexBlock += numBlocksToGet;
1153 void TraceSummaryBOC::collectSummaryData(double startTime, double binSize,
1155 // CkPrintf("[%d] asked to contribute performance data\n", CkMyPe());
1157 double *contribution = new double[numBins];
1158 for (int i=0; i<numBins; i++) {
1159 contribution[i] = 0.0;
1161 CkpvAccess(_trace)->fillData(contribution, startTime, binSize, numBins);
1164 for (int i=0; i<numBins; i++) {
1165 CkPrintf("[%d] %f\n", i, contribution[i]);
1169 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1170 CkCallback cb(CkReductionTarget(TraceSummaryBOC, summaryDataCollected), sumProxy[0]);
1171 contribute(sizeof(double)*numBins, contribution, CkReduction::sum_double,
1173 delete [] contribution;
1176 void TraceSummaryBOC::summaryDataCollected(double *recvData, int numBins) {
1177 CkAssert(CkMyPe() == 0);
1178 // **CWL** No memory management for the ccs buffer for now.
1180 // CkPrintf("[%d] Reduction completed and received\n", CkMyPe());
1182 // if there's an easier way to append a data block to a CkVec, I'll take it
1183 for (int i=0; i<numBins; i++) {
1184 ccsBufferedData->insertAtEnd(recvData[i]);
1191 void TraceSummaryBOC::startSumOnly()
1193 CmiAssert(CkMyPe() == 0);
1195 CProxy_TraceSummaryBOC p(traceSummaryGID);
1196 int size = CkpvAccess(_trace)->pool()->getNumEntries();
1200 void TraceSummaryBOC::askSummary(int size)
1202 if (CkpvAccess(_trace) == NULL) return;
1204 int traced = CkpvAccess(_trace)->traceOnPE();
1206 BinEntry *reductionBuffer = new BinEntry[size+1];
1207 reductionBuffer[size].time() = traced; // last element is the traced pe count
1208 reductionBuffer[size].getIdleTime() = 0; // last element is the traced pe count
1210 CkpvAccess(_trace)->endComputation();
1211 int n = CkpvAccess(_trace)->pool()->getNumEntries();
1212 BinEntry *localBins = CkpvAccess(_trace)->pool()->bins();
1214 for (int i=0; i<n; i++) reductionBuffer[i] = localBins[i];
1217 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1218 CkCallback cb(CkReductionTarget(TraceSummaryBOC, sendSummaryBOC), 0, sumProxy);
1219 contribute(sizeof(BinEntry)*(size+1), reductionBuffer,
1220 CkReduction::sum_double, cb);
1221 delete [] reductionBuffer;
1224 void TraceSummaryBOC::sendSummaryBOC(double *results, int n)
1226 if (CkpvAccess(_trace) == NULL) return;
1228 CkAssert(CkMyPe() == 0);
1231 bins = (BinEntry *)results;
1232 nTracedPEs = (int)bins[n-1].time();
1233 // CmiPrintf("traced: %d entry:%d\n", nTracedPEs, nBins);
1240 void TraceSummaryBOC::write(void)
1244 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".sum")+1];
1245 sprintf(fname, "%s.sum", CkpvAccess(traceRoot));
1246 FILE *sumfp = fopen(fname, "w+");
1247 //CmiPrintf("File: %s \n", fname);
1249 CmiAbort("Cannot open summary sts file for writing.\n");
1252 int _numEntries=_entryTable.size();
1253 fprintf(sumfp, "ver:%3.1f %d/%d count:%d ep:%d interval:%e numTracedPE:%d", CkpvAccess(version), CkMyPe(), CkNumPes(), nBins, _numEntries, CkpvAccess(binSize), nTracedPEs);
1254 fprintf(sumfp, "\n");
1258 int last=pool[0].getU();
1261 for(j=1; j<numEntries; j++) {
1262 int u = pool[j].getU();
1267 if (count > 1) fprintf(fp, "+%d", count);
1273 if (count > 1) fprintf(fp, "+%d", count);
1275 for(j=0; j<nBins; j++) {
1276 bins[j].time() /= nTracedPEs;
1277 bins[j].write(sumfp);
1280 fprintf(sumfp, "\n");
1285 static void CombineSummary()
1287 #if CMK_TRACE_ENABLED
1288 CmiPrintf("[%d] CombineSummary called!\n", CkMyPe());
1290 if ((sumonly || sumDetail) && traceSummaryGID.isZero()) {
1296 CmiPrintf("[%d] Sum Only start!\n", CkMyPe());
1297 // pe 0 start the sumonly process
1298 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1299 sumProxy[0].startSumOnly();
1302 CProxy_TraceSummaryBOC sumProxy(traceSummaryGID);
1303 sumProxy.traceSummaryParallelShutdown(-1);
1313 void initTraceSummaryBOC()
1316 if(BgNodeRank()==0) {
1318 if (CkMyRank() == 0) {
1320 registerExitFn(CombineSummary);
1324 #include "TraceSummary.def.h"