9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
// Debug print helpers. In the definition visible here DEBUGF forwards to
// CkPrintf; DEBUGN always expands to nothing, so renaming a DEBUGF call to
// DEBUGN silences that one statement.
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
18 #define DEBUGN(...) // easy way to selectively disable DEBUGs
// Default number of LogEntry slots buffered per PE; the TraceProjections
// constructor uses it unless +logsize is given on the command line.
20 #define DefaultLogBufSize 1000000
21 #define DEBUG_KMEANS 0
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
// NOTE(review): the sentence above is cut off in this copy, and the comments
// in the TraceProjections constructor describe non-delta logs as the
// default -- confirm which statement is current.
26 bool checknested=false; // check illegal nested begin/end execute
29 // BOC operations readonlys
// Group id of the TraceProjectionsBOC; flushLogBuffer() only reports to PE 0
// through it when it is non-zero.
30 CkGroupID traceProjectionsGID;
33 // New reduction type for Outlier Analysis purposes. This is allowed to be
34 // a global variable according to the Charm++ manual.
35 CkReductionMsg *outlierReduction(int nMsgs,
36 CkReductionMsg **msgs);
37 CkReductionMsg *minMaxReduction(int nMsgs,
38 CkReductionMsg **msgs);
39 CkReduction::reducerType outlierReductionType;
40 CkReduction::reducerType minMaxReductionType;
41 #endif // PROJ_ANALYSIS
// Per-PE pointer to this PE's projections tracer (set in _createTraceprojections).
43 CkpvStaticDeclare(TraceProjections*, _trace);
44 CtvExtern(int,curThreadEvent);
// Per-PE capacity of the LogPool buffer, in entries.
46 CkpvDeclare(CmiInt8, CtrLogBufSize);
48 typedef CkVec<char *> usrEventVec;
49 CkpvStaticDeclare(usrEventVec, usrEventlist);
// NOTE(review): UsrEvent keeps the caller's string pointer (str(_s)) rather
// than a copy, so registered names must outlive the registry.
54 UsrEvent(int _e, char* _s): e(_e),str(_s) {}
// Per-PE registry of user events added by traceRegisterUserEvent.
56 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
57 /*User Stat Vector Mirroring usrEvents. Holds all the stat names.
58 Reuses UsrEvent Class since all that is needed is an int and a string to store name*/
59 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrStats);
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
63 /// Disable the outputting of the trace logs
// Calls setWriteData(false) on this PE's TraceProjections instance; assumes
// _createTraceprojections has already set CkpvAccess(_trace) (no null check).
64 void disableTraceLogOutput()
66 CkpvAccess(_trace)->setWriteData(false);
69 /// Enable the outputting of the trace logs
// Calls setWriteData(true) on this PE's TraceProjections instance; same
// precondition as disableTraceLogOutput.
70 void enableTraceLogOutput()
72 CkpvAccess(_trace)->setWriteData(true);
76 #if ! CMK_TRACE_ENABLED
// OPTIMIZED_VERSION: in builds without tracing, prints a one-time warning
// guarded by a `warned` flag that must be in scope at the expansion site
// (presumably a static in each user-facing entry point -- not visible here).
// With tracing enabled it expands to nothing.
78 #define OPTIMIZED_VERSION \
79 if (!warned) { warned=1; \
80 CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
82 #define OPTIMIZED_VERSION /*empty*/
83 #endif // CMK_TRACE_ENABLED
86 On T3E, we need to have file number control by open/close files only when needed.
// With file-count control, OPEN_LOG reopens the log in append mode and
// CLOSE_LOG closes it again; the alternative definitions are not visible here.
88 #if CMK_TRACE_LOGFILE_NUM_CONTROL
89 #define OPEN_LOG openLog("a");
90 #define CLOSE_LOG closeLog();
94 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
97 For each TraceFoo module, _createTraceFoo() must be defined.
98 This function is called in _createTraces() generated in moduleInit.C
/// Per-PE setup for Projections tracing: initializes the per-PE user-event
/// and user-stat registries (heap-allocated CkVecs), creates this PE's
/// TraceProjections instance from `argv` (the constructor parses the
/// +logsize / +binary-trace / +gz-trace ... options), and registers it with
/// the trace array. PE 0 also prints a banner.
100 void _createTraceprojections(char **argv)
102 DEBUGF("%d createTraceProjections\n", CkMyPe());
103 CkpvInitialize(CkVec<char *>, usrEventlist);
104 CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
105 CkpvInitialize(CkVec<UsrEvent *>*, usrStats);
106 CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
107 CkpvAccess(usrStats) = new CkVec<UsrEvent *>();
109 // CthRegister does not call the constructor
110 // CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
111 #endif //CMK_BIGSIM_CHARM
112 CkpvInitialize(TraceProjections*, _trace);
113 CkpvAccess(_trace) = new TraceProjections(argv);
114 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
115 if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
118 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
/// Listener record attached to a traced thread. `base` is declared first so
/// the Cth callbacks can cast a CthThreadListener* back to this type. The
/// other fields (event, msgType, ep, srcPe, ml -- assigned in
/// traceAddThreadListeners) are declared on lines not shown in this copy.
120 struct TraceThreadListener {
121 struct CthThreadListener base;
/// Cth suspend callback installed by traceAddThreadListeners.
130 static void traceThreadListener_suspend(struct CthThreadListener *l)
132 TraceThreadListener *a=(TraceThreadListener *)l;
133 /* here, we activate the appropriate trace codes for the appropriate
134 registered modules */
/// Cth resume callback: opens a detailed execute event using the envelope
/// data captured in traceAddThreadListeners and the resumed thread's id.
138 static void traceThreadListener_resume(struct CthThreadListener *l)
140 TraceThreadListener *a=(TraceThreadListener *)l;
141 /* here, we activate the appropriate trace codes for the appropriate
142 registered modules */
143 _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
144 CthGetThreadID(a->base.thread), NULL);
// Later resumes report this PE as the source, since (presumably) the
// thread may have migrated away from the original sender.
146 a->srcPe=CkMyPe(); /* potential lie to migrated threads */
/// Cth free callback installed by traceAddThreadListeners (presumably
/// releases the listener; the rest of the body is not shown).
150 static void traceThreadListener_free(struct CthThreadListener *l)
152 TraceThreadListener *a=(TraceThreadListener *)l;
/// Attaches a heap-allocated TraceThreadListener to thread `tid`. The
/// listener has suspend/resume/free callbacks and copies of the envelope's
/// event id, message type, source PE and total size. The body is compiled
/// only when CMK_TRACE_ENABLED.
156 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
158 #if CMK_TRACE_ENABLED
159 /* strip essential information from the envelope */
160 TraceThreadListener *a= new TraceThreadListener;
162 a->base.suspend=traceThreadListener_suspend;
163 a->base.resume=traceThreadListener_resume;
164 a->base.free=traceThreadListener_free;
165 a->event=e->getEvent();
166 a->msgType=e->getMsgtype();
168 a->srcPe=e->getSrcPe();
169 a->ml=e->getTotalsize();
171 CthAddListener(tid, (CthThreadListener *)a);
/// Opens this PE's log file `fname` with the given fopen/gzopen `mode`.
/// Each open is retried while it fails with EINTR or EMFILE, and the run
/// aborts if the file still cannot be opened. Which variant (gzip or plain)
/// runs is decided on lines not shown in this copy.
/// NOTE(review): the EMFILE retry has no limit or back-off, so it can spin
/// if file descriptors never become available.
175 void LogPool::openLog(const char *mode)
// gzip-compressed log.
180 zfp = gzopen(fname, mode);
181 } while (!zfp && (errno == EINTR || errno == EMFILE));
182 if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
// Plain log (first of two visible, presumably build-dependent, variants).
185 fp = fopen(fname, mode);
186 } while (!fp && (errno == EINTR || errno == EMFILE));
188 CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
189 CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
194 fp = fopen(fname, mode);
195 } while (!fp && (errno == EINTR || errno == EMFILE));
197 CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
198 CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
/// Closes the file(s) opened by openLog (body not shown in this copy).
203 void LogPool::closeLog(void)
/// Builds an empty log pool. Allocates CtrLogBufSize LogEntry slots
/// (poolSize), copies `pgm` into pgmname (createFile uses it as the
/// log-file prefix), and zeroes the counters and timers that writeStatis
/// reports.
217 LogPool::LogPool(char *pgm) {
218 pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
219 // defaults to writing data (no outlier changes)
220 writeSummaryFiles = false;
223 lastCreationEvent = -1;
224 // **CW** for simple delta encoding
227 globalStartTime = 0.0;
229 headerWritten = false;
236 poolSize = CkpvAccess(CtrLogBufSize);
237 pgmname = new char[strlen(pgm)+1];
238 strcpy(pgmname, pgm);
// Running totals and "last begin" timestamps accumulated in LogPool::add.
241 statisLastProcessTimer = 0;
242 statisLastIdleTimer = 0;
243 statisLastPackTimer = 0;
244 statisLastUnpackTimer = 0;
245 statisTotalExecutionTime = 0;
246 statisTotalIdleTime = 0;
247 statisTotalPackTime = 0;
248 statisTotalUnpackTime = 0;
249 statisTotalCreationMsgs = 0;
250 statisTotalCreationBytes = 0;
251 statisTotalMCastMsgs = 0;
252 statisTotalMCastBytes = 0;
253 statisTotalEnqueueMsgs = 0;
254 statisTotalDequeueMsgs = 0;
255 statisTotalRecvMsgs = 0;
256 statisTotalRecvBytes = 0;
257 statisTotalMemAlloc = 0;
258 statisTotalMemFree = 0;
/// Builds this PE's log file name into `fname`, of the form
///   <prefix>.<pe>.log   (or .log.gz for the compressed variant)
/// <prefix> is pgmname followed by `fix`. When subdirectories are in use,
/// it is "<pgmname>.projdir.<pe % nSubdirs>/<basename(pgmname)><fix>".
/// The branch conditions are on lines not shown in this copy.
262 void LogPool::createFile(const char *fix)
268 if(CmiNumPartitions() > 1) {
269 CmiMkdir(CkpvAccess(partitionRoot));
// strrchr returns NULL when pgmname has no path separator (e.g. a bare
// program name); use the whole name then instead of computing NULL + 1.
272 char* filenameLastPart = strrchr(pgmname, PATHSEP) ? strrchr(pgmname, PATHSEP) + 1 : pgmname; // Last occurrence of path separator
273 char *pathPlusFilePrefix = new char[1024];
276 int sd = CkMyPe() % nSubdirs;
277 char *subdir = new char[1024];
// The 1024-byte buffers are bounded with snprintf so a long program path
// truncates instead of overflowing the heap.
278 snprintf(subdir, 1024, "%s.projdir.%d", pgmname, sd);
280 snprintf(pathPlusFilePrefix, 1024, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
283 snprintf(pathPlusFilePrefix, 1024, "%s%s", pgmname, fix);
287 sprintf(pestr, "%d", CkMyPe());
291 len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
293 len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
295 int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
298 fname = new char[len];
301 sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
304 sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
307 sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
310 delete[] pathPlusFilePrefix;
/// PE 0 only (asserted): creates "<traceRoot><fix>.sts" for writing into
/// stsfp. With multiple partitions it first creates the partition root
/// directory. Opens are retried on EINTR/EMFILE; failure aborts the run.
315 void LogPool::createSts(const char *fix)
317 CkAssert(CkMyPe() == 0);
318 if(CmiNumPartitions() > 1) {
319 CmiMkdir(CkpvAccess(partitionRoot));
322 // create the sts file
323 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
324 sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
327 stsfp = fopen(fname, "w");
328 } while (!stsfp && (errno == EINTR || errno == EMFILE));
330 CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
331 CmiAbort("Error!!\n");
/// Creates "<traceRoot>.projrc" for writing into rcfp. Opens are retried on
/// EINTR/EMFILE; failure aborts the run.
336 void LogPool::createRC()
338 // create the projections rc file.
340 new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
341 sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
343 rcfp = fopen(fname, "w");
344 } while (!rcfp && (errno == EINTR || errno == EMFILE));
346 CmiAbort("Cannot open projections configuration file for writing.\n");
// NOTE(review): the enclosing function's header is not visible in this copy;
// presumably this is LogPool's destructor finalizing output. When
// writeSummaryFiles is set, a summary step follows on a line not shown.
354 if(writeSummaryFiles)
357 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
363 extern int correctTimeLog;
364 if (correctTimeLog) {
// PE 0 calls the writeSts(TraceProjections*) overload with a null tracer.
370 if (CkMyPe() == 0) writeSts(NULL);
/// Writes the log header at most once per pool (guarded by headerWritten).
/// Text logs get "PROJECTIONS-RECORD <numEntries>" via gzprintf or fprintf.
/// The fwrite branch (presumably binary mode) writes the raw numEntries
/// value. Branch conditions are on lines not shown in this copy.
379 void LogPool::writeHeader()
381 if (headerWritten) return;
382 headerWritten = true;
386 gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
388 else /* else clause is below... */
390 /*... may hang over from else above */ {
391 fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
395 fwrite(&numEntries,sizeof(numEntries),1,fp);
/// Writes the buffered entries to the log file(s) (body not shown here).
399 void LogPool::writeLog(void)
/// Serializes the buffered entries [0, numEntries) through a PUP writer:
/// PUP::toDisk (presumably binary mode), toProjectionsGZFile (compressed)
/// or toProjectionsFile (plain text). `writedelta` selects the delta-log
/// file handles. In the delta path, each entry's time is temporarily
/// replaced by its microsecond difference from the previous entry while it
/// is written, then restored.
408 void LogPool::write(int writedelta)
410 // **CW** Simple delta encoding implementation
411 // prevTime has to be maintained as an object variable because
412 // LogPool::write may be called several times depending on the
416 p = new PUP::toDisk(writedelta?deltafp:fp);
419 else if (compressed) {
420 p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
424 p = new toProjectionsFile(writedelta?deltafp:fp);
428 // **FIXME** - Should probably consider a more sophisticated bounds-based
429 // approach for selective writing instead of making multiple if-checks
430 // for every single event.
431 for(UInt i=0; i<numEntries; i++) {
// Phase filtering:
// - keepPhase == NULL: every entry is written.
// - Otherwise END_PHASE markers and BEGIN/END_COMPUTATION are always kept,
//   and other entries only when keepPhase[curPhase] is set.
433 if (keepPhase == NULL) {
434 // default case, when no phase selection is required.
437 // **FIXME** Might be a good idea to create a "filler" event block for
438 // all the events taken out by phase filtering.
439 if (pool[i].type == END_PHASE) {
440 // always write phase markers
443 } else if (pool[i].type == BEGIN_COMPUTATION ||
444 pool[i].type == END_COMPUTATION) {
445 // always write BEGIN and END COMPUTATION markers
447 } else if (keepPhase[curPhase]) {
453 // **FIXME** Implement phase-selective writing for delta logs
// Delta path: BEGIN/END_COMPUTATION keep absolute times. Other entries
// store whole microseconds; the fraction dropped by the UInt cast is
// accumulated in timeErr.
455 double time = pool[i].time;
456 if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
458 double timeDiff = (time-prevTime)*1.0e6;
459 UInt intTimeDiff = (UInt)timeDiff;
460 timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
465 pool[i].time = intTimeDiff/1.0e6;
468 pool[i].time = time; // restore time value
/// Writes the common part of the .sts file, in this order:
/// - PROJECTIONS_ID, VERSION and TOTAL_PHASES lines;
/// - PAPI event names when CMK_HAS_COUNTER_PAPI;
/// - the shared entries written by traceWriteSTS;
/// - TOTAL_STATS;
/// - one EVENT line per registered user event and one STAT line per
///   registered user stat.
476 void LogPool::writeSts(void)
478 // for whining compilers
480 // generate an automatic unique ID for each log
// NOTE(review): despite the comment above, the ID written is an empty string.
481 fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
482 fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
483 fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
484 #if CMK_HAS_COUNTER_PAPI
485 fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
486 // for now, use i, next time use papiEvents[i].
487 // **CW** papi event names is a hack.
488 char eventName[PAPI_MAX_STR_LEN];
489 for (i=0;i<NUMPAPIEVENTS;i++) {
490 PAPI_event_code_to_name(papiEvents[i], eventName);
491 fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
494 // Adds common elements to sts file such as num stats, num chares etc.
495 traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
496 fprintf(stsfp, "TOTAL_STATS %d\n", (int) CkpvAccess(usrStats)->length());
497 for(i=0;i<CkpvAccess(usrEvents)->length();i++){
498 fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
500 //Mirrors Event printing. Prints every stat name and number
501 for(i=0;i<CkpvAccess(usrStats)->length();i++){
502 fprintf(stsfp, "STAT %d %s\n", (*CkpvAccess(usrStats))[i]->e, (*CkpvAccess(usrStats))[i]->str);
/// Tracer-aware overload; visibly writes the terminating "END" line (other
/// statements of this overload are on lines not shown in this copy).
506 void LogPool::writeSts(TraceProjections *traceProj){
508 fprintf(stsfp, "END\n");
/// PE 0 only (asserted): writes the global start and end times to the
/// .projrc file as integer microseconds (RC_GLOBAL_START_TIME /
/// RC_GLOBAL_END_TIME). The outlier-filter line is commented out below.
512 void LogPool::writeRC(void)
514 //CkPrintf("write RC is being executed\n");
516 CkAssert(CkMyPe() == 0);
517 fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
518 (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
519 fprintf(rcfp,"RC_GLOBAL_END_TIME %lld\n",
520 (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
521 /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
522 if (CkpvAccess(_trace)->isOutlierAutomatic()) {
523 fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
525 fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
528 #endif //PROJ_ANALYSIS
/// Writes this PE's "<traceRoot>.<pe>.statis" summary. It reports:
/// - total time (endComputationTime - beginComputationTime);
/// - idle, overhead (total minus idle minus execution), execution, pack and
///   unpack times, each with its percentage of the total;
/// - message count, bytes and average size for creations, multicasts and
///   receives.
/// Opening the file is retried on EINTR/EMFILE; failure aborts the run.
/// NOTE(review): the percentages are not guarded against totaltime == 0.
/// The "Exeuction" label typo is output text, so it is left unchanged here.
/// The %lld specifiers assume the statisTotal* counters are long long;
/// confirm against the header.
532 void LogPool::writeStatis(void)
535 // create the statis file
// +10 covers the '.' separator, up to 8 PE digits and the terminating NUL.
536 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".statis")+10];
537 sprintf(fname, "%s.%d.statis", CkpvAccess(traceRoot), CkMyPe());
540 statisfp = fopen(fname, "w");
541 } while (!statisfp && (errno == EINTR || errno == EMFILE));
543 CmiPrintf("Cannot open projections statistic file for writing due to %s\n", strerror(errno));
544 CmiAbort("Error!!\n");
547 double totaltime = endComputationTime - beginComputationTime;
548 fprintf(statisfp, "time(sec) percentage\n");
549 fprintf(statisfp, "Time: \t%f\n", totaltime);
550 fprintf(statisfp, "Idle :\t%f\t %.1f\n", statisTotalIdleTime, statisTotalIdleTime/totaltime * 100);
551 fprintf(statisfp, "Overhead: \t%f\t %.1f\n", totaltime - statisTotalIdleTime - statisTotalExecutionTime , (totaltime - statisTotalIdleTime - statisTotalExecutionTime)/totaltime * 100);
552 fprintf(statisfp, "Exeuction:\t%f\t %.1f\n", statisTotalExecutionTime, statisTotalExecutionTime/totaltime*100);
553 fprintf(statisfp, "Pack: \t%f\t %.2f\n", statisTotalPackTime, statisTotalPackTime/totaltime*100);
554 fprintf(statisfp, "Unpack: \t%f\t %.2f\n", statisTotalUnpackTime, statisTotalUnpackTime/totaltime*100);
// Average message size is reported as 0 when no messages were counted.
555 fprintf(statisfp, "Creation Msgs Numbers, Bytes, Avg: \t%lld\t %lld\t %lld \n", statisTotalCreationMsgs, statisTotalCreationBytes, (statisTotalCreationMsgs>0)?statisTotalCreationBytes/statisTotalCreationMsgs:statisTotalCreationMsgs);
556 fprintf(statisfp, "Multicast Msgs Numbers, Bytes, Avg: \t%lld\t %lld\t %lld \n", statisTotalMCastMsgs, statisTotalMCastBytes, (statisTotalMCastMsgs>0)?statisTotalMCastBytes/statisTotalMCastMsgs:statisTotalMCastMsgs);
557 fprintf(statisfp, "Received Msgs Numbers, Bytes, Avg: \t%lld\t %lld\t %lld \n", statisTotalRecvMsgs, statisTotalRecvBytes, (statisTotalRecvMsgs>0)?statisTotalRecvBytes/statisTotalRecvMsgs:statisTotalRecvMsgs);
/// Callback passed to bgAddProjEvent in LogPool::add. `data` is the buffered
/// LogEntry and `ptr` points to the log's FILE*. It sets the entry's
/// recvTime to `recvT`, clamped to 0 when negative, and builds a
/// toProjectionsFile writer on that file (presumably to re-serialize the
/// entry; the remaining lines are not shown).
562 static void updateProjLog(void *data, double t, double recvT, void *ptr)
564 LogEntry *log = (LogEntry *)data;
565 FILE *fp = *(FILE **)ptr;
567 log->recvTime = recvT<0.0?0:recvT;
569 toProjectionsFile p(fp);
574 // flush log entries to disk
/// Records when the flush starts. After the write and reset (on lines not
/// shown), appends a BEGIN_INTERRUPT/END_INTERRUPT pair spanning the flush,
/// so the I/O time appears in the log. If the TraceProjectionsBOC group
/// exists, PE 0 is notified through flush_warning.
575 void LogPool::flushLogBuffer()
578 double writeTime = TraceTimer();
// Presumably the earlier entries are no longer buffered after the flush, so
// the saved creation-event index is invalidated.
582 lastCreationEvent = -1;
583 new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
584 new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
585 //CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
586 if (!traceProjectionsGID.isZero()) { // report flushing events to PE 0
587 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
588 bocProxy[0].flush_warning(CkMyPe());
/// Appends one event to the buffer.
/// - First updates the .statis accumulators according to `type`: message
///   counts and bytes, and begin timestamps / accumulated durations for
///   processing, unpack, pack and idle intervals.
/// - Records the index of the latest creation event.
/// - Constructs the LogEntry in place and, when the pool becomes full,
///   handles the flush (on lines not shown).
/// - The BigSim-specific code below registers selected event types through
///   bgAddProjEvent.
593 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
594 double time, int event, int pe, int ml, CmiObjId *id,
595 double recvT, double cpuT, int numPe, double statVal)
// Statistics for the .statis summary file.
599 case BEGIN_COMPUTATION:
600 beginComputationTime = time;
602 case END_COMPUTATION:
603 endComputationTime = time;
606 statisTotalCreationMsgs++;
607 statisTotalCreationBytes += ml;
609 case CREATION_MULTICAST:
610 statisTotalMCastMsgs++;
611 statisTotalMCastBytes += ml;
614 statisTotalEnqueueMsgs++;
617 statisTotalDequeueMsgs++;
620 statisTotalRecvMsgs++;
621 statisTotalRecvBytes += ml;
624 statisTotalMemAlloc++;
627 statisTotalMemFree++;
629 case BEGIN_PROCESSING:
630 statisLastProcessTimer = time;
633 statisLastUnpackTimer = time;
636 statisLastPackTimer = time;
639 statisLastIdleTimer = time;
642 statisTotalExecutionTime += (time - statisLastProcessTimer);
645 statisTotalUnpackTime += (time - statisLastUnpackTimer);
648 statisTotalPackTime += (time - statisLastPackTimer);
651 statisTotalIdleTime += (time - statisLastIdleTimer);
// Remember where the most recent creation event is stored.
657 if (type == CREATION ||
658 type == CREATION_MULTICAST ||
659 type == CREATION_BCAST) {
660 lastCreationEvent = numEntries;
662 new (&pool[numEntries++])
663 LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe, statVal);
664 if ((type == END_PHASE) || (type == END_COMPUTATION)) {
// Buffer full. When correctTimeLog is set, reaching this point aborts with
// "I/O interrupt!".
667 if(poolSize==numEntries) {
670 extern int correctTimeLog;
671 if (correctTimeLog) CmiAbort("I/O interrupt!\n");
676 case BEGIN_PROCESSING:
677 pool[numEntries-1].recvTime = BgGetRecvTime();
679 case BEGIN_COMPUTATION:
680 case END_COMPUTATION:
686 case USER_EVENT_PAIR:
687 bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
/// Appends an entry carrying a function id, source line number and file
/// name, built with LogEntry(time, type, funcID, lineNum, fileName). It
/// tracks lastCreationEvent for creation types and checks for a full pool,
/// like the main add().
692 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
693 #ifndef CMK_BIGSIM_CHARM
694 if (type == CREATION ||
695 type == CREATION_MULTICAST ||
696 type == CREATION_BCAST) {
697 lastCreationEvent = numEntries;
699 new (&pool[numEntries++])
700 LogEntry(time,type,funcID,lineNum,fileName);
701 if(poolSize == numEntries){
/// Appends a user-bracket entry of `type` for user event `mIdx` on this PE.
/// The entry is tagged with event id `event` and `nestedID`; all other
/// LogEntry fields are zero. It then checks for a full pool.
707 void LogPool::addUserBracketEventNestedID(unsigned char type, double time,
708 UShort mIdx, int event, int nestedID) {
709 new (&pool[numEntries++])
710 LogEntry(time, type, mIdx, 0, event, CkMyPe(), 0, 0, 0, 0, 0, 0, nestedID);
711 if(poolSize == numEntries){
/// Appends a memory-usage entry of `type` (value `memUsage`) at `time`.
/// NOTE(review): the creation-type check below mirrors the other add()
/// variants. The only visible caller passes MEMORY_USAGE_CURRENT, so the
/// check does not fire there.
717 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
718 #ifndef CMK_BIGSIM_CHARM
719 if (type == CREATION ||
720 type == CREATION_MULTICAST ||
721 type == CREATION_BCAST) {
722 lastCreationEvent = numEntries;
724 new (&pool[numEntries++])
725 LogEntry(type,time,memUsage);
726 if(poolSize == numEntries){
/// Logs a USER_SUPPLIED entry, then attaches `data` to the last buffered
/// entry.
/// NOTE(review): if add() filled the pool and triggered a flush,
/// pool[numEntries-1] may be the END_INTERRUPT that flushLogBuffer appends
/// rather than this entry. Confirm whether that matters.
733 void LogPool::addUserSupplied(int data){
735 add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
737 // set the user supplied value for the previously created event
738 pool[numEntries-1].setUserSuppliedData(data);
/// Logs a USER_SUPPLIED_NOTE entry, then attaches `note` to the last
/// buffered entry (same flush caveat as addUserSupplied).
742 void LogPool::addUserSuppliedNote(char *note){
744 add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
746 // set the user supplied note for the previously created event
747 pool[numEntries-1].setUserSuppliedNote(note);
/// Appends a USER_SUPPLIED_BRACKETED_NOTE entry covering [bt, et] for user
/// event `eventID` with text `note`. Under MPI_TRACE_MACHINE_HACK,
/// consecutive MPI_Test / MPI_Iprobe notes are merged: the previous entry's
/// end time is extended instead of adding a new entry.
750 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
751 //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
752 #ifndef CMK_BIGSIM_CHARM
753 #if MPI_TRACE_MACHINE_HACK
754 //This part of code is used to combine the contiguous
755 //MPI_Test and MPI_Iprobe events to reduce the number of
757 #define MPI_TEST_EVENT_ID 60
758 #define MPI_IPROBE_EVENT_ID 70
// With an empty pool there is no previous entry to merge with; -1 never
// matches the MPI event ids, so a new entry is added.
759 int lastEvent = numEntries > 0 ? pool[numEntries-1].event : -1;
760 if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
761 //just replace the endtime of last event
762 //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
// Extend the last written entry (numEntries-1); pool[numEntries] is the
// next, still-unused slot.
763 pool[numEntries-1].endTime = et;
765 new (&pool[numEntries++])
766 LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
769 new (&pool[numEntries++])
770 LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
772 if(poolSize == numEntries){
779 /* **CW** Not sure if this is the right thing to do. Feels more like
780 a hack than a solution to Sameer's request to add the destination
781 processor information to multicasts and broadcasts.
783 In the unlikely event this method is used for Broadcasts as well,
784 pelist == NULL will be used to indicate a global broadcast with
/// Appends a multicast creation entry with its destination PE list (numPe
/// entries of `pelist`), records it as the latest creation event, and
/// checks for a full pool.
787 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
788 int event, int pe, int ml, CmiObjId *id,
789 double recvT, int numPe, int *pelist)
791 lastCreationEvent = numEntries;
792 new (&pool[numEntries++])
793 LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
794 if(poolSize==numEntries) {
/// Post-processing hook; calls bgUpdateProj(1) (presumably BigSim-only).
799 void LogPool::postProcessLog()
802 bgUpdateProj(1); // event type
/// Overwrites the timestamp of the most recently buffered entry with `ts`.
/// NOTE(review): assumes at least one entry is buffered (numEntries > 0).
806 void LogPool::modLastEntryTimestamp(double ts)
808 pool[numEntries-1].time = ts;
809 //pool[numEntries-1].cputime = ts;
812 // /** Constructor for a multicast log entry */
814 // THIS WAS MOVED TO trace-projections.h with the other constructors
816 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
817 // int ml, CmiObjId *d, double rt, int numPe, int *pelist)
819 // type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
820 // if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
823 // userSuppliedNote = NULL;
824 // if (pelist != NULL) {
825 // pes = new int[numPe];
826 // for (int i=0; i<numPe; i++) {
827 // pes[i] = pelist[i];
834 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
836 //#if CMK_HAS_COUNTER_PAPI
837 // memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
/// Packs or unpacks one log entry with PUP. Times are carried as integer
/// microseconds (CMK_TYPEDEF_UINT8) and converted back to seconds on unpack.
/// The fields written depend on the entry `type`.
843 void LogEntry::pup(PUP::er &p)
846 CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
850 if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
851 if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
855 case USER_EVENT_PAIR:
856 case BEGIN_USER_EVENT_PAIR:
857 case END_USER_EVENT_PAIR:
858 p|mIdx; p|itime; p|event; p|pe; p|nestedID;
868 case BEGIN_PROCESSING:
// recvTime == -1 marks "no receive time". Converting the double -1.0 to an
// unsigned integer type is undefined behavior, so the all-ones sentinel is
// built from the integer -1 instead (a well-defined modular conversion).
870 irecvtime = (recvTime==-1) ? (CMK_TYPEDEF_UINT8)-1 : (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
871 icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
873 p|mIdx; p|eIdx; p|itime; p|event; p|pe;
874 p|msglen; p|irecvtime;
875 { // This brace is so that ndims can be declared inside a switch
876 const int ndims = _chareTable[_entryTable[eIdx]->chareIdx]->ndims;
877 // Should only be true if the chare is part of an array, otherwise ndims should be -1
880 short int* idShorts = (short int*)&(id.id);
881 for (int i = 0; i < ndims; i++)
885 for (int i = 0; i < ndims; i++)
890 p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
894 #if CMK_HAS_COUNTER_PAPI
896 for (i=0; i<NUMPAPIEVENTS; i++) {
903 //p|numPapiEvents; // non papi version has value 0
905 if (p.isUnpacking()) {
906 recvTime = irecvtime/1.0e6;
907 cputime = icputime/1.0e6;
911 if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
912 p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
913 #if CMK_HAS_COUNTER_PAPI
915 for (i=0; i<NUMPAPIEVENTS; i++) {
921 //p|numPapiEvents; // non papi version has value 0
923 if (p.isUnpacking()) cputime = icputime/1.0e6;
929 case USER_SUPPLIED_NOTE:
933 if (p.isPacking()) length = strlen(userSuppliedNote);
938 if (p.isUnpacking()) {
939 userSuppliedNote = new char[length+1];
940 userSuppliedNote[length] = '\0';
942 PUParray(p,userSuppliedNote, length);
944 case USER_SUPPLIED_BRACKETED_NOTE:
945 //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
951 if (p.isPacking()) length2 = strlen(userSuppliedNote);
956 if (p.isUnpacking()) {
957 userSuppliedNote = new char[length2+1];
958 userSuppliedNote[length2] = '\0';
960 PUParray(p,userSuppliedNote, length2);
962 case MEMORY_USAGE_CURRENT:
968 p | cputime; //This is user specified time
974 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
975 p|mIdx; p|eIdx; p|itime;
976 p|event; p|pe; p|msglen; p|irecvtime;
977 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
980 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
981 p|mIdx; p|eIdx; p|itime;
982 p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
983 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
985 case CREATION_MULTICAST:
986 if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
987 p|mIdx; p|eIdx; p|itime;
988 p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
989 if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
990 for (i=0; i<numpes; i++) p|pes[i];
991 if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
994 p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
999 p|mIdx; p|itime; p|event; p|pe;
1002 case BEGIN_INTERRUPT:
1004 p|itime; p|event; p|pe;
1007 // **CW** absolute timestamps are used here to support a quick
1008 // way of determining the total time of a run in projections
1010 case BEGIN_COMPUTATION:
1011 case END_COMPUTATION:
1017 p|eIdx; // FIXME: actually the phase ID
1021 CmiError("***Internal Error*** Wierd Event %d.\n", type);
1024 if (p.isUnpacking()) time = itime/1.0e6;
/// Builds this PE's projections tracer. If tracing is off for this PE
/// (traceOnPe == 0), it returns with no log pool. Otherwise it parses the
/// tracing command-line options (+logsize, +checknested, +tracenested,
/// +binary-trace, +trace-subdirs, +gz-trace / +no-gz-trace,
/// +write-analysis-file), creates the LogPool rooted at traceRoot, and on
/// PE 0 creates the .sts and .projrc files.
1028 TraceProjections::TraceProjections(char **argv):
1029 _logPool(NULL), curevent(0), inEntry(false), computationStarted(false),
1030 traceNestedEvents(false), converseExit(false),
1031 currentPhaseID(0), lastPhaseEvent(NULL), endTime(0.0)
1033 // CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
1034 if (CkpvAccess(traceOnPe) == 0) return;
1036 CkpvInitialize(CmiInt8, CtrLogBufSize);
1037 CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1038 if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize),
1039 "Log entries to buffer per I/O")) {
1040 if (CkMyPe() == 0) {
// CmiInt8 is a 64-bit type that need not be `long`; print it portably.
1041 CmiPrintf("Trace: logsize: %lld\n", (long long)CkpvAccess(CtrLogBufSize));
1045 CmiGetArgFlagDesc(argv,"+checknested",
1046 "check projections nest begin end execute events");
1048 CmiGetArgFlagDesc(argv,"+tracenested",
1049 "trace projections nest begin/end execute events");
1051 CmiGetArgFlagDesc(argv,"+binary-trace",
1052 "Write log files in binary format");
1055 CmiGetArgIntDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
// Compression defaults to on; +no-gz-trace turns it off (+gz-trace is
// consumed but does not change the default).
1059 int compressed = true;
1060 CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1061 int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1062 compressed = compressed && !disableCompressed;
1064 // consume the flag so there's no confusing
1065 CmiGetArgFlagDesc(argv,"+gz-trace",
1066 "Write log files pre-compressed with gzip");
1067 if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1070 int writeSummaryFiles = CmiGetArgFlagDesc(argv,"+write-analysis-file","Enable writing summary files ");
1072 // **CW** default to non delta log encoding. The user may choose to
1073 // create both logs (for debugging) or just the old log timestamping
1074 // (for compatibility).
1075 // Generating just the non delta log takes precedence over generating
1076 // both logs (if both arguments appear on the command line).
1078 // switch to OLD log format until everything works // Gengbin
1080 _logPool = new LogPool(CkpvAccess(traceRoot));
1081 _logPool->setNumSubdirs(nSubdirs);
1082 _logPool->setBinary(binary);
1083 _logPool->setWriteSummaryFiles(writeSummaryFiles);
1085 _logPool->setCompressed(compressed);
1087 if (CkMyPe() == 0) {
1088 _logPool->createSts();
1089 _logPool->createRC();
1093 #if CMK_HAS_COUNTER_PAPI
/// Registers user event name `evt`. `e` is an explicit event number (>= 0),
/// or -1 to auto-assign one more than the largest number registered so far.
/// Duplicate registrations are checked against existing names/numbers
/// (parts of that logic are on lines not shown).
/// NOTE(review): only the pointer `evt` is stored (no copy), so the string
/// must stay valid for the rest of the run.
1098 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1101 CkAssert(e==-1 || e>=0);
1102 CkAssert(evt != NULL);
1105 for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1106 int cur = (*CkpvAccess(usrEvents))[i]->e;
1108 //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1109 if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0)
1112 CmiAbort("UserEvent double registered!");
1114 if (cur > biggest) biggest = cur;
1116 // if no user events have so far been registered. biggest will be -1
1117 // and hence newly assigned event numbers will begin from 0.
1118 if (e==-1) event = biggest+1; // automatically assign new event number
1120 CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1123 // Registers User Stat by adding its name and number to the usrStat vector.
// Same numbering and duplicate rules as traceRegisterUserEvent, applied to
// the separate usrStats registry; the name pointer is likewise not copied.
1125 int TraceProjections::traceRegisterUserStat(const char* evt, int e)
1128 CkAssert(e==-1 || e>=0);
1129 CkAssert(evt != NULL);
1132 for (int i=0; i<CkpvAccess(usrStats)->length(); i++) {
1133 int cur = (*CkpvAccess(usrStats))[i]->e;
1135 //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1136 if (strcmp((*CkpvAccess(usrStats))[i]->str, evt) == 0)
1139 CmiAbort("UserStat double registered!");
1141 if (cur > biggest) biggest = cur;
1143 // if no user events have so far been registered. biggest will be -1
1144 // and hence newly assigned event numbers will begin from 0.
1145 if (e==-1) event = biggest+1; // automatically assign new event number
1147 CkpvAccess(usrStats)->push_back(new UsrEvent(event,(char *)evt));
/// Tracing-framework hook; the visible comment explains that clearing EP
/// bins is not needed for Projections (the explanation is truncated in this
/// copy).
1151 void TraceProjections::traceClearEps(void)
1153 // In trace-summary, this zeros out the EP bins, to eliminate noise
1154 // from startup. Here, this isn't useful, since we can do that in
/// Forwards to LogPool::writeSts(this); any guarding conditions are on lines
/// not shown.
1158 void TraceProjections::traceWriteSts(void)
1161 _logPool->writeSts(this);
1167 * This is called when Converse closes during ConverseCommonExit().
1168 * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1170 * Some programs bypass CkExit() (like NAMD, which eventually calls
1171 * ConverseExit()), modules like traces will have to pretend to shutdown
1172 * as if CkExit() was called but at the same time avoid making
1173 * subsequent CkExit() calls (which is usually required for allowing
1174 * other modules to shutdown).
1176 * Note that we can only get here if CkExit() was not called, since the
1177 * trace module will un-register itself from TraceArray if it did.
// Steps, subject to conditions on lines not shown:
// - write the sts file (and, in the PROJ_ANALYSIS variant, the rc file);
// - end the computation;
// - delete the log pool ("will write", per the comment on the delete);
// - remove this tracer from the trace array.
// The non-PROJ_ANALYSIS variant also prints the BOC's warnings.
// Whether _logPool is reset to NULL after the delete is not visible here.
1180 void TraceProjections::traceClose(void)
1182 #ifdef PROJ_ANALYSIS
1183 // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1185 if (_logPool == NULL) {
1189 _logPool->writeSts(this);
1190 _logPool->writeRC();
1192 CkpvAccess(_trace)->endComputation();
1193 delete _logPool; // will write
1195 // remove myself from traceArray so that no tracing will be called.
1196 CkpvAccess(_traces)->removeTrace(this);
1198 // we've already deleted the logpool, so multiple calls to traceClose
1200 if (_logPool == NULL) {
1204 _logPool->writeSts(this);
1206 CkpvAccess(_trace)->endComputation();
1207 delete _logPool; // will write
1209 // remove myself from traceArray so that no tracing will be called.
1210 CkpvAccess(_traces)->removeTrace(this);
1212 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1213 bocProxy.ckLocalBranch()->print_warning();
1220 * This is meant to be called internally by the tracing framework.
// Framework-driven close. PE 0 (when a log pool exists) writes the sts and
// projrc files. The local TraceProjectionsBOC branch prints its warnings and
// the log pool is deleted, which writes the logs. The nesting of those last
// steps is on lines not shown.
1223 void TraceProjections::closeTrace() {
1224 // CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1225 if (CkMyPe() == 0 && _logPool!= NULL) {
1226 // CkPrintf("Pe 0 will now write sts and projrc files\n");
1227 _logPool->writeSts(this);
1228 _logPool->writeRC();
1229 // CkPrintf("Pe 0 has now written sts and projrc files\n");
1231 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1232 bocProxy.ckLocalBranch()->print_warning();
1234 delete _logPool; // will write logs to file
1238 #if CMK_SMP_TRACE_COMMTHREAD
/// Logs BEGIN_TRACE for the communication thread (once computation has
/// started). The comm thread's PE id is CmiNumPes()+CmiMyNode(), i.e.
/// numbered after all worker PEs.
1239 void TraceProjections::traceBeginOnCommThread()
1241 if (!computationStarted) return;
1242 _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
/// Logs END_TRACE for the communication thread. Unlike the begin call, this
/// has no computationStarted check.
1245 void TraceProjections::traceEndOnCommThread()
1247 _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
/// Logs BEGIN_TRACE on this PE once computation has started.
1251 void TraceProjections::traceBegin(void)
1253 if (!computationStarted) return;
1254 _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
/// Logs END_TRACE on this PE (no computationStarted check).
1257 void TraceProjections::traceEnd(void)
1259 _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1262 void TraceProjections::userEvent(int e)
1264 if (!computationStarted) return;
1265 _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1268 void TraceProjections::userBracketEvent(int e, double bt, double et, int nestedID=0)
1270 if (!computationStarted) return;
1271 // two events record Begin/End of event e.
1272 _logPool->addUserBracketEventNestedID(USER_EVENT_PAIR, TraceTimer(bt),
1273 e, curevent, nestedID);
1274 _logPool->addUserBracketEventNestedID(USER_EVENT_PAIR, TraceTimer(et),
1275 e, curevent++, nestedID);
1278 void TraceProjections::beginUserBracketEvent(int e, int nestedID=0)
1280 if (!computationStarted) return;
1281 _logPool->addUserBracketEventNestedID(BEGIN_USER_EVENT_PAIR, TraceTimer(),
1282 e, curevent++, nestedID);
1285 void TraceProjections::endUserBracketEvent(int e, int nestedID=0)
1287 if (!computationStarted) return;
1288 _logPool->addUserBracketEventNestedID(END_USER_EVENT_PAIR, TraceTimer(),
1289 e, curevent++, nestedID);
1292 void TraceProjections::userSuppliedData(int d)
1294 if (!computationStarted) return;
1295 _logPool->addUserSupplied(d);
1298 void TraceProjections::userSuppliedNote(char *note)
1300 if (!computationStarted) return;
1301 _logPool->addUserSuppliedNote(note);
// Attaches a user note bracketed by times [bt, et] to event eventID.
// NOTE(review): bt/et are forwarded raw here, whereas userBracketEvent passes
// its times through TraceTimer(); confirm addUserSuppliedBracketedNote does
// the conversion itself.
1305 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1307 if (!computationStarted) return;
1308 _logPool->addUserSuppliedBracketedNote(note, eventID, bt, et);
// Records memory-usage sample m as a MEMORY_USAGE_CURRENT entry at the
// current trace time.
1311 void TraceProjections::memoryUsage(double m)
1313 if (!computationStarted) return;
1314 _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1317 //Updates User stat value e with the caller-supplied time (via a USER_STAT log entry)
// Logs a USER_STAT entry for stat e carrying both the trace time and the
// caller-supplied user time. curevent is read but not incremented.
1318 void TraceProjections::updateStatPair(int e, double stat, double time)
1320 if (!computationStarted) return;
1321 _logPool->add(USER_STAT, e, 0, TraceTimer(), curevent, CkMyPe(), 0, 0, 0.0, time, 0, stat);
1324 //When user time is not given, -1 is stored instead.
// Logs a USER_STAT entry for stat e with no user time (-1 is passed in
// that slot). curevent is read but not incremented.
1325 void TraceProjections::updateStat(int e, double stat)
1327 if (!computationStarted) return;
1328 _logPool->add(USER_STAT, e, 0, TraceTimer(), curevent, CkMyPe(), 0, 0, 0.0, -1, 0, stat);
// Logs creation (send) of a message to entry point ep.
// Visible paths:
//  - one stores curevent in the thread-private curThreadEvent and logs a
//    CREATION with ForChareMsg and no envelope fields;
//  - the others stamp the envelope with the event number (and curPeEvent)
//    and log either CREATION_BCAST or CREATION using the message type/size.
// NOTE(review): the conditions selecting these paths (presumably e==NULL and
// num>1) are not visible in this listing.
1331 void TraceProjections::creation(envelope *e, int ep, int num)
1333 #if CMK_TRACE_ENABLED
1334 double curTime = TraceTimer();
1336 CtvAccess(curThreadEvent) = curevent;
1337 _logPool->add(CREATION, ForChareMsg, ep, curTime,
1338 curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1340 int type=e->getMsgtype();
// the receiver reads this event number back from the envelope
1341 e->setEvent(curevent);
1342 CpvAccess(curPeEvent) = curevent;
1344 _logPool->add(CREATION_BCAST, type, ep, curTime,
1345 curevent++, CkMyPe(), e->getTotalsize(),
1348 _logPool->add(CREATION, type, ep, curTime,
1349 curevent++, CkMyPe(), e->getTotalsize(),
1356 //This function is only called from a comm thread in SMP mode.
// Creation hook for a raw charm message handled by the comm thread: when the
// target entry point is traced, the envelope's source PE is overwritten
// with this PE.
// NOTE(review): ep indexes _entryTable without the range check that
// traceSetMsgID performs.
1357 void TraceProjections::creation(char *msg)
1359 #if CMK_SMP_TRACE_COMMTHREAD
1360 // msg must be a charm message
1361 envelope *e = (envelope *)msg;
1362 int ep = e->getEpIdx();
1363 if(_entryTable[ep]->traceEnabled) {
1365 e->setSrcPe(CkMyPe()); // pretend I am the sender
// Comm-thread (SMP) variant: for traced entry points, stamps the envelope
// with this PE as sender and with the current event number (not
// incremented here).
1370 void TraceProjections::traceCommSetMsgID(char *msg)
1372 #if CMK_SMP_TRACE_COMMTHREAD
1373 // msg must be a charm message
1374 envelope *e = (envelope *)msg;
1375 int ep = e->getEpIdx();
1376 if(_entryTable[ep]->traceEnabled) {
1377 e->setSrcPe(CkMyPe()); // pretend I am the sender
1378 e->setEvent(curevent);
// Reads the (source PE, event number) stamp from a charm message into *pe
// and *event when its entry point is traced.
// NOTE(review): for untraced entry points the visible code leaves *pe and
// *event unwritten; callers should pre-initialize them.
1383 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1385 #if CMK_TRACE_ENABLED
1386 // msg must be a charm message
1388 envelope *e = (envelope *)msg;
1389 int ep = e->getEpIdx();
1390 if(_entryTable[ep]->traceEnabled) {
1391 *pe = e->getSrcPe();
1392 *event = e->getEvent();
// Writes a (source PE, event number) stamp into a charm message. It first
// rejects anything that does not look like a valid charm envelope: ep out
// of _entryTable range, source PE beyond CkNumPes()+CkNumNodes(), or
// message type outside (0, LAST_CK_ENVELOPE_TYPE).
1397 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1399 #if CMK_TRACE_ENABLED
1400 // msg must be a charm message
1401 envelope *e = (envelope *)msg;
1402 int ep = e->getEpIdx();
1403 if(ep<=0 || ep>=_entryTable.size()) return;
1404 if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1405 if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1406 if(_entryTable[ep]->traceEnabled) {
// Multicast creation hook: like creation(), but records the num destination
// PEs in pelist through LogPool::addCreationMulticast. One visible path
// logs a ForChareMsg record and saves curevent in curThreadEvent; the other
// stamps the envelope with the event number and logs the message type and
// size.
1413 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1414 Communication Library-specific Multicasts via new event
1418 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1421 #if CMK_TRACE_ENABLED
1422 double curTime = TraceTimer();
1424 CtvAccess(curThreadEvent)=curevent;
1425 _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1426 CkMyPe(), 0, 0, 0.0, num, pelist);
1428 int type=e->getMsgtype();
1429 e->setEvent(curevent);
1430 _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1431 e->getTotalsize(), 0, 0.0, num, pelist);
// Back-fills recvTime (time elapsed since creation) on creation-type log
// entries (CREATION / CREATION_BCAST / CREATION_MULTICAST), starting from
// _logPool->lastCreationEvent, for up to num entries.
// NOTE(review): the loop's idx/num updates are not visible in this listing.
1436 void TraceProjections::creationDone(int num)
1438 // modifies the creation-done time of the last num creation log entries
1439 // FIXME: totally a hack
1440 double curTime = TraceTimer();
1441 int idx = _logPool->lastCreationEvent;
1442 while (idx >=0 && num >0 ) {
1443 LogEntry &log = _logPool->pool[idx];
1444 if ((log.type == CREATION) ||
1445 (log.type == CREATION_BCAST) ||
1446 (log.type == CREATION_MULTICAST)) {
1447 log.recvTime = curTime - log.time;
// Opens a BEGIN_PROCESSING record for the thread entry point _threadEP,
// reusing the event number saved in curThreadEvent and tagging it with tid.
// Aborts on nested begins when checknested is set. With PAPI, counter
// values are read before logging and attached right after.
1454 void TraceProjections::beginExecute(CmiObjId *tid)
1456 #if CMK_HAS_COUNTER_PAPI
1457 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1458 CmiAbort("PAPI failed to read at begin execute!\n");
1461 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1462 execEvent = CtvAccess(curThreadEvent);
1464 _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1465 execEvent,CkMyPe(), 0, tid);
1466 #if CMK_HAS_COUNTER_PAPI
1467 _logPool->addPapi(CkpvAccess(papiValues));
// Begin-execute hook driven by an envelope. One visible path logs a
// thread-style BEGIN_PROCESSING for _threadEP (including CPU time). The
// other forwards the envelope's event, type, entry point, source PE and size
// to beginExecute(int, ...).
// NOTE(review): the condition choosing the path (presumably e==NULL) is not
// visible in this listing.
1472 void TraceProjections::beginExecute(envelope *e, void *obj)
1474 #if CMK_TRACE_ENABLED
1476 #if CMK_HAS_COUNTER_PAPI
1477 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1478 CmiAbort("PAPI failed to read at begin execute!\n");
1481 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1482 execEvent = CtvAccess(curThreadEvent);
1484 _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1485 execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1486 #if CMK_HAS_COUNTER_PAPI
1487 _logPool->addPapi(CkpvAccess(papiValues));
1491 beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1492 e->getSrcPe(),e->getTotalsize());
// Comm-thread (SMP) begin-execute hook for a raw charm message; acts only
// when the message's entry point is traced (the guarded statement is not
// visible in this listing).
1497 void TraceProjections::beginExecute(char *msg){
1498 #if CMK_SMP_TRACE_COMMTHREAD
1499 //This function is called from comm thread in SMP mode
1500 envelope *e = (envelope *)msg;
1501 int ep = e->getEpIdx();
1502 if(_entryTable[ep]->traceEnabled)
// Main begin-execute entry. When traceNestedEvents is enabled, the event is
// pushed onto nestedEvents so endExecute() can re-open the enclosing event
// afterwards. In all visible paths it then logs via beginExecuteLocal.
1507 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1508 int mlen, CmiObjId *idx, void *obj )
1510 if (traceNestedEvents) {
1511 if (!nestedEvents.empty()) {
1514 nestedEvents.emplace(event, msgType, ep, srcPe, mlen, idx);
1516 beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
// Overwrites the timestamp of the most recent log entry with ts.
1519 void TraceProjections::changeLastEntryTimestamp(double ts)
1521 _logPool->modLastEntryTimestamp(ts);
// Writes the actual BEGIN_PROCESSING record (wall and CPU time) for the given
// event, message type, entry point, sender PE, length and object id.
// Aborts on nested begins when checknested is set. With PAPI, counter values
// are read first and attached after the record.
1524 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1525 int mlen, CmiObjId *idx)
1527 #if CMK_HAS_COUNTER_PAPI
1528 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1529 CmiAbort("PAPI failed to read at begin execute!\n");
1532 if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1536 _logPool->add(BEGIN_PROCESSING,msgType,ep, TraceTimer(),event,
1537 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1538 #if CMK_HAS_COUNTER_PAPI
1539 _logPool->addPapi(CkpvAccess(papiValues));
// Main end-execute entry. With nested tracing, pops the finished event and,
// if an enclosing event remains on nestedEvents, re-opens it via
// beginExecuteLocal so its remaining time is attributed to it.
1544 void TraceProjections::endExecute(void)
1546 if (traceNestedEvents && !nestedEvents.empty()) nestedEvents.pop();
1548 if (traceNestedEvents) {
1549 if (!nestedEvents.empty()) {
1550 NestedEvent &ne = nestedEvents.top();
1551 beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
// Comm-thread (SMP) end-execute hook for a raw charm message; acts only when
// the message's entry point is traced (the guarded statement is not visible
// in this listing).
1556 void TraceProjections::endExecute(char *msg)
1558 #if CMK_SMP_TRACE_COMMTHREAD
1559 //This function is called from comm thread in SMP mode
1560 envelope *e = (envelope *)msg;
1561 int ep = e->getEpIdx();
1562 if(_entryTable[ep]->traceEnabled)
// Writes the END_PROCESSING record (wall and CPU time) for the currently
// open execution. execEp == -1 selects the thread entry point _threadEP on
// this PE; otherwise the saved execEp/execPe are used. Aborts on an
// unmatched end when checknested is set.
1567 void TraceProjections::endExecuteLocal(void)
1569 #if CMK_HAS_COUNTER_PAPI
1570 if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1571 CmiAbort("PAPI failed to read at end execute!\n");
1574 if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1575 double cputime = TraceCpuTimer();
1576 double now = TraceTimer();
1577 if(execEp == (-1)) {
1578 _logPool->add(END_PROCESSING, 0, _threadEP, now,
1579 execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1581 _logPool->add(END_PROCESSING, 0, execEp, now,
1582 execEvent, execPe, 0, NULL, 0.0, cputime);
1584 #if CMK_HAS_COUNTER_PAPI
1585 _logPool->addPapi(CkpvAccess(papiValues));
// Logs MESSAGE_RECV for a received envelope, using its type, entry point,
// source PE and total size. The visible condition restricts the special
// handling to chare/BOC/nodegroup message types; what that branch does is
// not visible in this listing. The pe argument is not used in the visible
// lines.
1590 void TraceProjections::messageRecv(char *env, int pe)
1593 envelope *e = (envelope *)env;
1594 int msgType = e->getMsgtype();
1595 int ep = e->getEpIdx();
1597 if (msgType==NewChareMsg || msgType==NewVChareMsg
1598 || msgType==ForChareMsg || msgType==ForVidMsg
1599 || msgType==BocInitMsg || msgType==NodeBocInitMsg
1600 || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1605 _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1606 curevent++, e->getSrcPe(), e->getTotalsize());
// Logs BEGIN_IDLE at the trace time derived from the scheduler's wall time.
1610 void TraceProjections::beginIdle(double curWallTime)
1612 _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
// Logs END_IDLE at the trace time derived from the scheduler's wall time.
1615 void TraceProjections::endIdle(double curWallTime)
1617 _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
// Logs BEGIN_PACK (start of message packing) at the current trace time.
1620 void TraceProjections::beginPack(void)
1622 _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
// Logs END_PACK at the current trace time.
1625 void TraceProjections::endPack(void)
1627 _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
// Logs BEGIN_UNPACK (start of message unpacking) at the current trace time.
1630 void TraceProjections::beginUnpack(void)
1632 _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
// Logs END_UNPACK at the current trace time.
1635 void TraceProjections::endUnpack(void)
1637 _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
// Enqueue events are intentionally not traced by projections.
1640 void TraceProjections::enqueue(envelope *) {}
// Dequeue events are intentionally not traced by projections.
1642 void TraceProjections::dequeue(envelope *) {}
// Marks the start of traced computation. It sets computationStarted (which
// every user-event hook checks), fetches the machine layer's user-event
// registration callback when tracing is on for this PE, logs
// BEGIN_COMPUTATION, and with PAPI starts the counters once (guarded by
// papiStarted).
1644 void TraceProjections::beginComputation(void)
1646 computationStarted = true;
1647 // Executes the callback function provided by the machine
1648 // layer. This is the proper method to register user events in a
1649 // machine layer because projections is a charm++ module.
1650 if (CkpvAccess(traceOnPe) != 0) {
1651 void (*ptr)() = registerMachineUserEvents();
1656 // CkpvAccess(traceInitTime) = TRACE_TIMER();
1657 // CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1658 _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1659 #if CMK_HAS_COUNTER_PAPI
1660 // we start the counters here
1661 if(CkpvAccess(papiStarted) == 0)
1663 if (PAPI_start(CkpvAccess(papiEventSet)) != PAPI_OK) {
1664 CmiAbort("PAPI failed to start designated counters!\n");
1666 CkpvAccess(papiStarted) = 1;
// Marks the end of traced computation. With PAPI, stops the counters once
// (guarded by papiStopped; failure only warns). It then stores the end time
// in endTime and logs END_COMPUTATION.
1671 void TraceProjections::endComputation(void)
1673 #if CMK_HAS_COUNTER_PAPI
1674 // we stop the counters here. A silent failure is alright since we
1675 // are already at the end of the program.
1676 if(CkpvAccess(papiStopped) == 0) {
1677 if (PAPI_stop(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1678 CkPrintf("Warning: PAPI failed to stop correctly!\n");
1680 CkpvAccess(papiStopped) = 1;
1682 // NOTE: We should not do a complete close of PAPI until after the
1683 // sts writer is done.
1685 endTime = TraceTimer();
1686 _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1688 CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(),
// Linear search for idx in idxVec. The return statements are not visible in
// this listing; presumably it returns non-zero when idx is found.
1693 int TraceProjections::idxRegistered(int idx)
1695 int idxVecLen = idxVec.size();
1696 for(int i=0; i<idxVecLen; i++)
1698 if(idx == idxVec[i])
1704 // specialized PUP:ers for handling trace projections logs
// Writes n items of PUP type t from p to the text log file f, one
// printf-style conversion per item; unknown type codes abort.
// The loop index is size_t so it matches n: no signed/unsigned comparison
// and no int overflow for very large n.
1705 void toProjectionsFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
1707 for (size_t i=0;i<n;i++)
1709 case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1711 case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1712 case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1713 case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1714 case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1715 case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1716 case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1717 case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1718 case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1719 case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1720 #ifdef CMK_PUP_LONG_LONG
1721 case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1722 case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1724 default: CmiAbort("Unrecognized pup type code!");
// Reads n items of PUP type t from the text log into p (the inverse of
// toProjectionsFile::bytes). Parsing helpers readInt/readUint/readDouble/
// readLongInt are defined elsewhere.
// NOTE(review): Tlong is read with readInt() and Tulonglong with the signed
// readLongInt(); values outside those ranges may be truncated. Also consider
// a size_t loop index to match n.
1728 void fromProjectionsFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
1730 for (int i=0;i<n;i++)
1735 parseError("Could not match character");
1741 case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1742 case Tshort:((short *)p)[i]=(short)readInt(); break;
1743 case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1744 case Tint: ((int *)p)[i]=readInt(); break;
1745 case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1746 case Tlong: ((long *)p)[i]=readInt(); break;
1747 case Tulong:((unsigned long *)p)[i]=readUint(); break;
1748 case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1749 case Tdouble:((double *)p)[i]=readDouble(); break;
1750 #ifdef CMK_PUP_LONG_LONG
1751 case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1752 case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1754 default: CmiAbort("Unrecognized pup type code!");
// gzip variant of toProjectionsFile::bytes: writes n items of PUP type t
// from p to the compressed log f via gzprintf; unknown type codes abort.
// The loop index is size_t so it matches n: no signed/unsigned comparison
// and no int overflow for very large n.
1759 void toProjectionsGZFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
1761 for (size_t i=0;i<n;i++)
1763 case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1765 case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1766 case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1767 case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1768 case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1769 case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1770 case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1771 case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1772 case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1773 case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1774 #ifdef CMK_PUP_LONG_LONG
1775 case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1776 case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1778 default: CmiAbort("Unrecognized pup type code!");
// Closes the current analysis phase by logging END_PHASE (currentPhaseID in
// the entry-point slot). If the pool is empty, it first inserts a
// BEGIN_COMPUTATION with a warning. lastPhaseEvent is pointed at the slot
// the END_PHASE record is about to occupy.
// NOTE(review): that pointer is taken before add(); if add() can flush or
// reallocate the pool, lastPhaseEvent may dangle. Confirm against LogPool.
1783 void TraceProjections::endPhase() {
1784 double currentPhaseTime = TraceTimer();
1785 if (lastPhaseEvent != NULL) {
1787 if (_logPool->pool != NULL) {
1788 // assumed to be BEGIN_COMPUTATION
1790 CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1791 _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1795 /* Insert endPhase event here. */
1796 /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1797 /* We currently "borrow" from the standard add() method. */
1798 /* It should really be its own add() method. */
1799 /* NOTE: assignment to lastPhaseEvent is "pre-emptive". */
1800 lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1801 _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1805 #ifdef PROJ_ANALYSIS
1806 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1809 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
// Registers the two custom reducers used by outlier analysis and stores
// their handles in the global reducer-type variables.
1811 void registerOutlierReduction() {
1812 outlierReductionType =
1813 CkReduction::addReducer(outlierReduction, false, "outlierReduction");
1814 minMaxReductionType =
1815 CkReduction::addReducer(minMaxReduction, false, "minMaxReduction");
1821 * This is the C++ code that is registered to be activated at module
1822 * shutdown. This is called exactly once on processor 0. Module shutdown
1823 * is initiated as a result of a CkExit() call by the application code
1825 * The exit function must ultimately call CkContinueExit() again to
1826 * so that other module exit functions may proceed after this module is
// If the projections BOC group exists, broadcasts
// traceProjectionsParallelShutdown(CkMyPe()) to all its members.
// NOTE(review): the CkContinueExit() call described above is not visible in
// this listing.
1830 static void TraceProjectionsExitHandler()
1832 #if CMK_TRACE_ENABLED
1834 CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1836 if (!traceProjectionsGID.isZero()) {
1837 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1838 bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1847 // This is called once on each processor but the idiom of use appears
1848 // to be to only have processor 0 register the function.
1850 // See initnode in trace-projections.ci
// Registers TraceProjectionsExitHandler as a module exit function.
// NOTE(review): the visible guards are BgNodeRank()==0 and CkMyRank()==0
// (rank 0 of each node), not CkMyPe()==0 as the comment above suggests;
// confirm which PEs actually register.
1851 void initTraceProjectionsBOC()
1853 // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1855 if (BgNodeRank() == 0) {
1857 if (CkMyRank() == 0) {
1859 registerExitFn(TraceProjectionsExitHandler);
1862 } // this is so indentation does not get messed up
1866 // mainchare for trace-projections BOC-operations.
1867 // Instantiated at processor 0 and ONLY resides on processor 0 for the
1868 // rest of its life.
1871 // 1. Handling commandline arguments
1872 // 2. Creating any objects required for proper BOC operations.
// Creates the TraceProjectionsBOC group (stored in traceProjectionsGID).
// Start-time discovery is enabled when CmiTimerAbsolute() == 1. The
// constructor then creates the KMeansBOC group (kMeansGID).
1874 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1875 /** Options for Outlier Analysis */
1876 // defaults. Things will change with support for interactive analysis.
1877 bool findStartTime = (CmiTimerAbsolute()==1);
1878 traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1880 kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1890 // Called on every processor.
// Parallel shutdown entry broadcast from TraceProjectionsExitHandler.
// pe is the PE that initiated CkExit(). This function ends computation on
// this PE, detaches projections from the trace list, and then launches the
// independent post-mortem analyses (k-means, start-time, end-time).
1891 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
// The format previously had no conversion for pe ("called from . "), so the
// caller PE was passed but never printed.
1893 CmiPrintf("[%d] traceProjectionsParallelShutdown called from %d.\n", CkMyPe(), pe);
1895 endPe = pe; // the pe that starts CkExit()
1896 if (CkMyPe() == 0) {
1897 analysisStartTime = CmiWallTimer();
1899 if (CkpvAccess(_trace)->_logPool != NULL ){
1900 CkpvAccess(_trace)->endComputation();
1901 // no more tracing for projections on this processor after this point.
1902 // Note that clear must be called after remove, or bad things will happen.
1903 CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1904 CkpvAccess(_traces)->clearTrace();
1906 // From this point, we start multiple chains of reductions and broadcasts to
1907 // perform final online analysis activities.
1909 // Start all parallel operations at once.
1910 // These MUST NOT modify base performance data in LogPool. If they must,
1911 // then the parallel operations must be phased (and this code to be
1912 // restructured as necessary)
1915 CProxy_KMeansBOC kMeansProxy(kMeansGID);
1916 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
// each started module bumps parModulesRemaining
1918 parModulesRemaining++;
1919 kMeansProxy[CkMyPe()].startKMeansAnalysis();
1921 parModulesRemaining++;
1923 bocProxy[CkMyPe()].startTimeAnalysis();
1925 bocProxy[CkMyPe()].startEndTimeAnalysis();
1928 // handle flush log warnings
// PE 0 only: records that PE pe flushed its log by inserting pe into list
// when it is not already there.
// NOTE(review): the assignment of `it` (presumably list.find(pe)) and any
// flush_count update are not visible in this listing; if `it` were really
// left uninitialized, the comparison below would be undefined behaviour.
1929 void TraceProjectionsBOC::flush_warning(int pe)
1931 CmiAssert(CkMyPe() == 0);
1932 std::set<int>::iterator it;
1934 if (it == list.end()) list.insert(pe);
// PE 0 only: prints a summary of how many log flushes occurred and on which
// PEs, warning that the performance data may be unreliable. Nothing is
// printed when flush_count is 0.
1938 void TraceProjectionsBOC::print_warning()
1940 CmiAssert(CkMyPe() == 0);
1941 if (flush_count == 0) return;
1942 std::set<int>::iterator it;
1943 CkPrintf("*************************************************************\n");
// list.size() is a size_t; cast it so it matches %d (passing size_t to %d is
// undefined on LP64 targets).
1944 CkPrintf("Warning: Projections log flushed to disk %d times on %d cores:", flush_count, (int)list.size());
1945 for (it=list.begin(); it!=list.end(); it++)
1946 CkPrintf(" %d", *it);
1948 CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program.\n");
// CtrLogBufSize is declared CmiInt8; print it with a 64-bit conversion.
1949 CkPrintf("Warning: This may be fixed by specifying a larger +logsize (current value %lld).\n", (long long)CkpvAccess(CtrLogBufSize));
1950 CkPrintf("*************************************************************\n");
1953 // Called on each processor
// Starts k-means outlier analysis: every PE contributes its pool->hasFlushed
// flag to a logical-OR reduction delivered to flushCheck on PE 0.
1954 void KMeansBOC::startKMeansAnalysis() {
1955 // Initialize all necessary structures
1956 LogPool *pool = CkpvAccess(_trace)->_logPool;
1958 if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1960 CkCallback cb(CkReductionTarget(KMeansBOC, flushCheck), 0, thisProxy);
1961 contribute(sizeof(bool), &(pool->hasFlushed), CkReduction::logical_or_bool, cb);
1964 // Called on processor 0
// Reduction target for the OR of all PEs' hasFlushed flags. If no PE
// flushed, broadcasts flushCheckDone to continue the analysis. Otherwise it
// warns and reports kMeansDoneFlushed to TraceProjectionsBOC on PE 0.
1965 void KMeansBOC::flushCheck(bool someFlush) {
1967 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1970 // Data intact proceed with KMeans analysis
1971 CProxy_KMeansBOC kMeansProxy(kMeansGID);
1972 kMeansProxy.flushCheckDone();
1974 // Some processor had flushed its data at some point, abandon KMeans
1975 CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1977 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1978 bocProxy[0].kMeansDoneFlushed();
1982 // Called on each processor
// Sets up per-PE k-means state. There is one metric per entry method plus
// two extra (idle and overhead). The function resets the cross-phase
// bookkeeping, initializes phases in the log pool, allocates the incoming
// seed buffer and metric filter, and then triggers getNextPhaseMetrics on
// this PE.
1983 void KMeansBOC::flushCheckDone() {
1984 // **FIXME** - more flexible metric collection scheme may be necessary
1985 // in the future for production use.
1986 LogPool *pool = CkpvAccess(_trace)->_logPool;
1988 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1990 numEntryMethods = _entryTable.size();
1991 numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1993 // maintained across phases
1994 markedBegin = false;
1996 beginBlockTime = 0.0;
1997 beginIdleBlockTime = 0.0;
1998 lastBeginEPIdx = -1; // none
2001 currentExecTimes = NULL;
2005 pool->initializePhases();
2007 // incoming K Seeds and the per-phase filter
2008 incKSeeds = new double[numK*numMetrics];
2009 keepMetric = new bool[numMetrics];
// NOTE(review): the comment below predates the working call that follows it
// and may be stale.
2011 // Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2012 // CProxy_KMeansBOC kMeansProxy(kMeansGID);
2013 // kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2014 thisProxy[CkMyPe()].getNextPhaseMetrics();
2017 // Called on each processor.
// Scans the log from lastPhaseIdx and accumulates per-metric times into a
// freshly zeroed currentExecTimes:
//   [ep]               time in each entry method (BEGIN/END_PROCESSING)
//   [numEventMethods]  idle time (BEGIN/END_IDLE)
//   [numEventMethods+1] overhead = phase time - active time, clamped at 0
// At an END_PHASE (phases enabled) or END_COMPUTATION, the open EP/idle
// interval is split at that timestamp and collectKMeansData() is called.
// numEventMethods equals _entryTable.size(), the same value flushCheckDone
// stores in numEntryMethods.
2018 void KMeansBOC::getNextPhaseMetrics() {
2019 // Assumes the presence of the complete logs on this processor.
2020 // Assumes first event is always BEGIN_COMPUTATION
2021 // Assumes each processor sees the same number of phases.
2023 // In this code, we collect performance data for this processor.
2024 // All times are in seconds.
2026 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );
2029 DEBUGF("[%d] Using Phases\n", CkMyPe());
2031 DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2034 if (currentExecTimes != NULL) {
2035 delete [] currentExecTimes;
2037 currentExecTimes = new double[numMetrics];
2038 for (int i=0; i<numMetrics; i++) {
2039 currentExecTimes[i] = 0.0;
2042 int numEventMethods = _entryTable.size();
2043 LogPool *pool = CkpvAccess(_trace)->_logPool;
2045 CkAssert(pool->numEntries > lastPhaseIdx);
2046 double totalPhaseTime = 0.0;
2047 double totalActiveTime = 0.0; // entry method + idle
2049 for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2050 if (pool->pool[i].type == BEGIN_PROCESSING) {
2055 beginBlockTime = pool->pool[i].time;
2056 lastBeginEPIdx = pool->pool[i].eIdx;
2057 } else if (pool->pool[i].type == END_PROCESSING) {
2059 // if End without a begin, just ignore
2060 // this event. If a phase-boundary is crossed, the Begin
2061 // event would be maintained in beginBlockTime, so it is
2064 markedBegin = false;
2065 if (pool->pool[i].event < 0)
2067 // ignore dummy events. **FIXME** as they have no eIdx?
2070 currentExecTimes[pool->pool[i].eIdx] +=
2071 pool->pool[i].time - beginBlockTime;
2072 totalActiveTime += pool->pool[i].time - beginBlockTime;
2073 lastBeginEPIdx = -1;
2075 } else if (pool->pool[i].type == BEGIN_IDLE) {
2080 beginIdleBlockTime = pool->pool[i].time;
2081 } else if (pool->pool[i].type == END_IDLE) {
2085 currentExecTimes[numEventMethods] +=
2086 pool->pool[i].time - beginIdleBlockTime;
2087 totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2089 } else if (pool->pool[i].type == END_PHASE) {
2090 // ignored when not using phases
2092 // when we've not visited this node before
2093 if (i != lastPhaseIdx) {
2095 pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2096 // it is important that proper accounting of time take place here.
2097 // Note that END_PHASE events inevitably occur in the context of
2098 // some entry method by the way the tracing API is designed.
2100 CkAssert(lastBeginEPIdx >= 0);
2101 currentExecTimes[lastBeginEPIdx] +=
2102 pool->pool[i].time - beginBlockTime;
2103 totalActiveTime += pool->pool[i].time - beginBlockTime;
2104 // this is so the remainder contributes to the next phase
2105 beginBlockTime = pool->pool[i].time;
2107 // The following is unlikely, but stranger things have happened.
2109 currentExecTimes[numEventMethods] +=
2110 pool->pool[i].time - beginIdleBlockTime;
2111 totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2112 // this is so the remainder contributes to the next phase
2113 beginIdleBlockTime = pool->pool[i].time;
2115 if (totalActiveTime <= totalPhaseTime) {
2116 currentExecTimes[numEventMethods+1] =
2117 totalPhaseTime - totalActiveTime;
2119 currentExecTimes[numEventMethods+1] = 0.0;
2120 CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2121 CkMyPe(), currentPhase);
2123 collectKMeansData();
2124 // end the loop (and method) and defer the work till the next call
2129 } else if (pool->pool[i].type == END_COMPUTATION) {
2131 CkAssert(lastBeginEPIdx >= 0);
2132 currentExecTimes[lastBeginEPIdx] +=
2133 pool->pool[i].time - beginBlockTime;
2134 totalActiveTime += pool->pool[i].time - beginBlockTime;
2137 currentExecTimes[numEventMethods] +=
2138 pool->pool[i].time - beginIdleBlockTime;
2139 totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2142 pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2143 if (totalActiveTime <= totalPhaseTime) {
2144 currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2146 currentExecTimes[numEventMethods+1] = 0.0;
2147 CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2150 collectKMeansData();
2156 * Through a reduction, collectKMeansData aggregates each processors' data
2157 * in order for global properties to be determined:
2159 * 1. min & max to determine normalization factors.
2160 * 2. sum to determine global EP averages for possible metric reduction
2161 * through thresholding.
2162 * 3. sum of squares to compute stddev which may be useful in the future.
2164 * collectKMeansData will also keep the processor's data for the current
2165 * phase so that it may be normalized and worked on subsequently.
// Contribution layout (4*numMetrics doubles, combined by outlierReduction):
//   [0, m)   sum section  [m, 2m) min section
//   [2m, 3m) max section  [3m, 4m) sum of squares
// The first three sections start as the same per-PE values; the squares
// section holds each value squared.
2168 void KMeansBOC::collectKMeansData() {
2169 int minOffset = numMetrics;
2170 int maxOffset = 2*numMetrics;
2171 int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2173 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::collectKMeansData time=\t%g\n", CkMyPe(), CkWallTimer() );
2175 double *reductionMsg = new double[numMetrics*4];
2177 for (int i=0; i<numMetrics; i++) {
2178 reductionMsg[i] = currentExecTimes[i];
2179 // duplicate the event times for max and min sections of the reduction
2180 reductionMsg[minOffset + i] = currentExecTimes[i];
2181 reductionMsg[maxOffset + i] = currentExecTimes[i];
2183 reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2186 CkCallback cb(CkReductionTarget(KMeansBOC, globalMetricRefinement),
// contribute() copies the buffer, so it can be freed right after
2188 contribute((numMetrics*4)*sizeof(double), reductionMsg,
2189 outlierReductionType, cb);
2190 delete [] reductionMsg;
2193 // The purpose is mainly to initialize the k seeds and generate
2194 // normalization parameters for each of the metrics. The k seeds
2195 // and normalization parameters are broadcast to all processors.
2197 // Called on processor 0
// Per metric, outmsg->stats receives: mean (sum / CkNumPes()), min (used as
// offset), range (max - min, stored in the max slot), and biased stddev
// computed from the sum, sum of squares and mean. A metric is kept when its
// max is large relative to the average total time per PE and its range is
// non-zero.
// NOTE(review): the stddev expression reads the mean as outmsg->stats[i],
// which assumes sumOffset == 0 (its definition is not visible here). The
// keep test divides by totalTime, so totalTime == 0 would divide by zero.
2198 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2199 CkAssert(CkMyPe() == 0);
2201 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2204 int minOffset = numMetrics;
2205 int maxOffset = 2*numMetrics;
2206 int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2208 // calculate statistics & boundaries for the k seeds for clustering
2209 KMeansStatsMessage *outmsg =
2210 new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2211 outmsg->numMetrics = numMetrics;
2212 outmsg->numKPos = numK*numMetrics;
2213 outmsg->numStats = numMetrics*4;
2215 // Sum | Min | Max | Sum of Squares
2216 double *totalExecTimes = (double*)msg->getData();
2217 double totalTime = 0.0;
2219 for (int i=0; i<numMetrics; i++) {
2220 DEBUGN("%lf\n", totalExecTimes[i]);
2221 totalTime += totalExecTimes[i];
2223 // calculate event mean over all processors
2224 outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2226 // get the ranges and offsets of each metric. With this, we get
2227 // normalization factors that can be sent back to each processor to
2228 // be used as necessary. We reuse max for range. Min remains the offset.
2229 outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2230 outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2231 totalExecTimes[minOffset + i];
2233 // calculate stddev (using biased variance)
2234 outmsg->stats[sosOffset + i] =
2235 sqrt((totalExecTimes[sosOffset + i] -
2236 2*(outmsg->stats[i])*totalExecTimes[i] +
2237 (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2241 for (int i=0; i<numMetrics; i++) {
2242 // 1) if the proportion of the max value of the entry method relative to
2243 // the average time taken over all entry methods across all processors
2244 // is greater than the stipulated percentage threshold ...; AND
2245 // 2) if the range of values are non-zero.
2247 // The current assumption is totalTime > 0 (what program has zero total
2248 // time from all work?)
2249 keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2251 (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2252 if (keepMetric[i]) {
2253 DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2254 totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2256 DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2258 outmsg->filter[i] = keepMetric[i];
2263 // initialize k seeds for this phase
2264 kSeeds = new double[numK*numMetrics];
2267 kNumMembers = new int[numK];
2269 // Generate k random seeds: each coordinate is uniform in [0, range] of that metric
2270 // srand((unsigned)(CmiWallTimer()*1.0e06));
2271 srand(11337); // for debugging purposes (fixed seed => reproducible seeds)
2272 for (int k=0; k<numK; k++) {
2273 DEBUGF("Seed %d | ", k);
2274 for (int m=0; m<numMetrics; m++) {
2275 double factor = totalExecTimes[maxOffset + m] -
2276 totalExecTimes[minOffset + m];
2277 // "uniform" distribution, scaled according to the normalization
2279 // kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2280 // Random distribution.
2281 kSeeds[numMetrics*k + m] =
2282 ((rand()*1.0)/RAND_MAX)*factor;
2283 if (keepMetric[m]) {
2284 DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2286 outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2292 // broadcast statistical values to all processors for cluster discovery
2293 thisProxy.findInitialClusters(outmsg);
2298 // Called on each processor.
// Applies the broadcast filter and min offsets to this PE's metrics, copies
// in the k seeds, and picks the nearest seed (minK/minDistance). It then
// contributes a numK*(numMetrics+1) sum_double vector to updateKSeeds:
// this PE's metrics plus a count of 1.0 in the chosen seed's row, zeros
// elsewhere.
2299 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2301 if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2305 // Get info from stats message
2306 CkAssert(numMetrics == msg->numMetrics);
2307 for (int i=0; i<numMetrics; i++) {
2308 keepMetric[i] = msg->filter[i];
2311 // Normalize data on local processor.
2312 // **CWL** See my thesis for detailed discussion of normalization of
2313 // performance data.
2314 // **NOTE** This might change if we want to send data based on the filter
2315 // instead of all the data.
2316 CkAssert(numMetrics*4 == msg->numStats);
2317 for (int i=0; i<numMetrics; i++) {
2318 currentExecTimes[i] -= msg->stats[numMetrics + i]; // take offset
2319 // **CWL** We do not normalize the range. Entry methods that exhibit
2320 // large absolute timing variations should be allowed to contribute
2321 // more to the Euclidean distance measure!
2322 // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2325 // **NOTE** This might change if we want to send data based on the filter
2326 // instead of all the data.
2327 CkAssert(numK*numMetrics == msg->numKPos);
2328 for (int i=0; i<msg->numKPos; i++) {
2329 incKSeeds[i] = msg->kSeedsPos[i];
2332 // Decide which KSeed this processor belongs to.
2333 minDistance = calculateDistance(0);
2334 DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(),
2335 currentPhase, phaseIter, minDistance);
2337 for (int i=1; i<numK; i++) {
2338 double distance = calculateDistance(i);
2339 DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(),
2340 currentPhase, phaseIter, i, distance);
2341 if (distance < minDistance) {
2342 minDistance = distance;
2347 // Set up a reduction with the modification vector to the root (0).
2349 // The modification vector sends a negative value for each metric
2350 // for the K this processor no longer belongs to and a positive
2351 // value to the K the processor now belongs. In addition, a -1.0
2352 // is sent to the K it is leaving and a +1.0 to the K it is
2355 // The processor must still contribute a "zero returns" even if
2356 // nothing changes. This will be the basis for determining
2357 // convergence at the root.
2359 // The additional +1 is meant for the count-change that must be
2360 // maintained for the special cases at the root when some K
2361 // may be deprived of all processor points or go from 0 to a
2362 // positive number of processors (see later comments).
2363 double *modVector = new double[numK*(numMetrics+1)];
2364 for (int i=0; i<numK; i++) {
2365 for (int j=0; j<numMetrics+1; j++) {
2366 modVector[i*(numMetrics+1) + j] = 0.0;
2369 for (int i=0; i<numMetrics; i++) {
2370 // for this initialization, only positive values need be sent.
2371 modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2373 modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2375 CkCallback cb(CkReductionTarget(KMeansBOC, updateKSeeds),
2377 contribute(numK*(numMetrics+1)*sizeof(double), modVector,
2378 CkReduction::sum_double, cb);
2379 delete [] modVector;
// KMeansBOC::calculateDistance - distance from this PE's point to seed k.
//
// For every metric i with keepMetric[i] set, this adds
// (currentExecTimes[i] - incKSeeds[k*numMetrics + i])^2 to ret. incKSeeds
// is therefore read as numK rows of numMetrics values each.
// NOTE(review): the declaration/initialization of ret and the return
// statement are not visible in this listing. Confirm whether the squared
// sum or its square root is returned. The visible callers only compare
// distances, so the ordering is the same either way.
// NOTE(review): pow(d, 2.0) could be written as d*d, which is cheaper.
2382 double KMeansBOC::calculateDistance(int k) {
2384 for (int i=0; i<numMetrics; i++) {
2385 if (keepMetric[i]) {
2386 DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n",
2387 CkMyPe(), currentPhase, phaseIter, i,
2388 currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2389 ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
// KMeansBOC::updateKSeeds - root (PE 0) step of one K-Means iteration.
//
// This is the reduction target of the sum_double contributions made by
// findInitialClusters and updateSeedMembership. modVector holds numK rows
// of (numMetrics + 1) doubles. Each row has the summed per-metric changes
// for seed i, followed by the summed membership-count change for seed i.
//
// hasChanges records whether any seed's count changed. findRepresentatives()
// is the converged path. The branch that selects it is on a line not
// visible in this listing.
//
// For each seed i, following the in-code comments:
//   - a seed that had no members is set to (modVector row / change),
//     i.e. the average of the incoming vector;
//   - a seed that loses all members keeps its last position;
//   - otherwise, kSeeds = (kSeeds * kNumMembers + delta) /
//     (kNumMembers + change).
// kNumMembers[i] is then updated, and the new seeds are broadcast in a
// KSeedsMessage to updateSeedMembership on every PE.
// NOTE(review): CkAssert(change > 0) in the "no members" branch would fire
// for a seed that stays empty (change == 0), unless a guard on a line not
// shown skips that case. Confirm.
// NOTE(review): overallChange is accumulated but is not used in the
// visible lines.
2395 void KMeansBOC::updateKSeeds(double *modVector, int n) {
2396 CkAssert(CkMyPe() == 0);
2398 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2401 CkAssert(numK*(numMetrics+1) == n);
2403 // A quick convergence test.
2404 bool hasChanges = false;
2405 for (int i=0; i<numK; i++) {
2406 hasChanges = hasChanges ||
2407 (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2410 findRepresentatives();
2412 int overallChange = 0;
2413 for (int i=0; i<numK; i++) {
2414 int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2416 overallChange += change;
2417 // modify the k seeds based on the modification vectors coming in
2419 // If a seed initially has no members, its contents do not matter and
2420 // is simply set to the average of the incoming vector.
2421 // If the change causes a seed to lose all its members, do nothing.
2422 // Its last-known location is kept to allow it to re-capture
2423 // membership at the next iteration rather than apply the last
2424 // changes (which snaps the point unnaturally to 0,0).
2425 // Otherwise, apply the appropriate vector changes.
2426 CkAssert((kNumMembers[i] + change >= 0) &&
2427 (kNumMembers[i] + change <= CkNumPes()));
2428 if (kNumMembers[i] == 0) {
2429 CkAssert(change > 0);
2430 for (int j=0; j<numMetrics; j++) {
2431 kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2433 } else if (kNumMembers[i] + change == 0) {
2436 for (int j=0; j<numMetrics; j++) {
2437 kSeeds[i*numMetrics + j] *= kNumMembers[i];
2438 kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2439 kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2442 kNumMembers[i] += change;
2444 DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2445 CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2448 // broadcast the new seed locations.
2449 KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2450 outmsg->numKPos = numK*numMetrics;
2451 for (int i=0; i<numK*numMetrics; i++) {
2452 outmsg->kSeedsPos[i] = kSeeds[i];
2455 thisProxy.updateSeedMembership(outmsg);
// KMeansBOC::updateSeedMembership - per-PE step of each later K-Means
// iteration.
//
// Copies the broadcast seed positions into incKSeeds and recomputes the
// nearest seed with calculateDistance(). It then contributes a
// numK*(numMetrics+1) sum_double modification vector to updateKSeeds.
// If the nearest seed (minK) differs from lastMinK:
//   - minK's row gets +currentExecTimes and a +1.0 count;
//   - lastMinK's row gets -currentExecTimes and a -1.0 count.
// Otherwise the vector is all zeros. That zero contribution is still
// needed, because the root uses all-zero count changes to detect
// convergence.
// NOTE(review): the lines that initialize minK, update it inside the
// distance loop, and afterwards record lastMinK = minK are not visible in
// this listing. Confirm them against the full source.
2459 // Called on all processors
2460 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2462 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2466 // **NOTE** This might change if we want to send data based on the filter
2467 // instead of all the data.
2468 CkAssert(numK*numMetrics == msg->numKPos);
2469 for (int i=0; i<msg->numKPos; i++) {
2470 incKSeeds[i] = msg->kSeedsPos[i];
2473 // Decide which KSeed this processor belongs to.
2475 minDistance = calculateDistance(0);
2476 DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(),
2477 currentPhase, phaseIter, minDistance);
2480 for (int i=1; i<numK; i++) {
2481 double distance = calculateDistance(i);
2482 DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(),
2483 currentPhase, phaseIter, i, distance);
2484 if (distance < minDistance) {
2485 minDistance = distance;
2490 double *modVector = new double[numK*(numMetrics+1)];
2491 for (int i=0; i<numK; i++) {
2492 for (int j=0; j<numMetrics+1; j++) {
2493 modVector[i*(numMetrics+1) + j] = 0.0;
2497 if (minK != lastMinK) {
2498 for (int i=0; i<numMetrics; i++) {
2499 modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2500 modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2502 modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2503 modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2506 CkCallback cb(CkReductionTarget(KMeansBOC, updateKSeeds),
2508 contribute(numK*(numMetrics+1)*sizeof(double), modVector,
2509 CkReduction::sum_double, cb);
2510 delete [] modVector;
// KMeansBOC::findRepresentatives - decide how many PEs to keep per cluster
// once K-Means has converged. The visible caller is updateKSeeds, which
// asserts it runs on PE 0.
//
// The representative count is peNumKeep, raised to the number of non-empty
// clusters if that is larger. Each non-empty cluster gets
// exemplarsPerCluster (= 1) exemplar. The remaining slots are shared out
// as outliers using the per-cluster share
// (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
//   slotsRemaining / numCandidateOutliers.
// Leftover slots go to the clusters with the largest fractional remainders.
//
// The function then:
//   - fills exemplarChoicesLeft / outlierChoicesLeft per cluster;
//   - sets numSelectionIter to the largest per-cluster pick count;
//   - broadcasts an initial KSelectionMessage (all IDs -1) to
//     collectDistances.
// NOTE(review): each cluster's share subtracts
// exemplarsPerCluster*numNonEmptyClusters (every cluster's exemplars)
// rather than exemplarsPerCluster (this cluster's exemplar). With more
// than one non-empty cluster the shares sum to less than slotsRemaining.
// Confirm which was intended.
// NOTE(review): numCandidateOutliers is 0 when every PE is an exemplar
// (CkNumPes() == numNonEmptyClusters), which makes the share a division by
// zero. Also, if kNumMembers is an integer array and no double cast
// appears on the omitted line before the share expression, this is integer
// division and every remainder is 0.
// NOTE(review): assigned is allocated with new[], but only remainders is
// visibly deleted. exemplarChoicesLeft / outlierChoicesLeft are
// reallocated on every call with no visible delete[] of the previous
// arrays.
// NOTE(review): numSelectionIter starts at exemplarsPerCluster (1) and only
// grows, so the "No selection iteration" branch is unreachable as written.
2513 void KMeansBOC::findRepresentatives() {
2515 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2517 int numNonEmptyClusters = 0;
2518 for (int i=0; i<numK; i++) {
2519 if (kNumMembers[i] > 0) {
2520 numNonEmptyClusters++;
2524 int numRepresentatives = peNumKeep;
2526 // This is fairly arbitrary. Next time, choose the centers of the top
2527 // largest clusters.
2528 if (numRepresentatives < numNonEmptyClusters) {
2529 numRepresentatives = numNonEmptyClusters;
2532 int slotsRemaining = numRepresentatives;
2534 DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining,
2535 numNonEmptyClusters);
2537 // determine how many exemplars to select per cluster. Currently
2538 // hardcoded to 1. Future challenge is to decide on other numbers
2539 // or proportionality.
2541 int exemplarsPerCluster = 1;
2542 slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2544 int numCandidateOutliers = CkNumPes() -
2545 exemplarsPerCluster*numNonEmptyClusters;
2547 double *remainders = new double[numK];
2548 int *assigned = new int[numK];
2549 exemplarChoicesLeft = new int[numK];
2550 outlierChoicesLeft = new int[numK];
// Proportional share of outlier slots per cluster. The integer part goes
// to assigned[i] and the fractional part stays in remainders[i].
2552 for (int i=0; i<numK; i++) {
2555 (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2556 slotsRemaining / numCandidateOutliers;
2557 if (remainders[i] >= 0.0) {
2558 assigned[i] = (int)floor(remainders[i]);
2559 remainders[i] -= assigned[i];
2561 remainders[i] = 0.0;
2565 for (int i=0; i<numK; i++) {
2566 slotsRemaining -= assigned[i];
2568 CkAssert(slotsRemaining >= 0);
2570 // find clusters to assign the loose slots to, in order of
2571 // remainder proportion
2572 while (slotsRemaining > 0) {
2575 for (int i=0; i<numK; i++) {
2576 if (remainders[i] > max) {
2577 max = remainders[i];
2582 remainders[winner] = 0.0;
2586 // set up how many reduction cycles of min/max we need to conduct to
2587 // select the representatives.
2588 numSelectionIter = exemplarsPerCluster;
2589 for (int i=0; i<numK; i++) {
2590 if (assigned[i] > numSelectionIter) {
2591 numSelectionIter = assigned[i];
2594 DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2596 for (int i=0; i<numK; i++) {
2597 if (kNumMembers[i] > 0) {
2598 exemplarChoicesLeft[i] = exemplarsPerCluster;
2599 outlierChoicesLeft[i] = assigned[i];
2601 exemplarChoicesLeft[i] = 0;
2602 outlierChoicesLeft[i] = 0;
2604 DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2605 outlierChoicesLeft[i]);
2609 delete [] remainders;
2611 // send out first broadcast
2612 KSelectionMessage *outmsg = NULL;
2613 if (numSelectionIter > 0) {
2614 outmsg = new (numK, numK, numK) KSelectionMessage;
2615 outmsg->numKMinIDs = numK;
2616 outmsg->numKMaxIDs = numK;
2617 for (int i=0; i<numK; i++) {
2618 outmsg->minIDs[i] = -1;
2619 outmsg->maxIDs[i] = -1;
2621 thisProxy.collectDistances(outmsg);
2623 CkPrintf("Warning: No selection iteration from the start!\n");
2624 // invoke phase completion on all processors
2625 thisProxy.phaseDone();
2630 * lastMin = array of minimum champions of the last tournament
2631 * lastMax = array of maximum champions of the last tournament
2632 * lastMaxVal = array of last encountered maximum values, allows previous
2633 * minimum winners to eliminate themselves from the next
2636 * Called on all processors.
// KMeansBOC::collectDistances - one selection "tournament" round on a PE.
//
// msg->minIDs[k] / msg->maxIDs[k] name the PEs that won the previous round
// for cluster k, or -1 for none. The first broadcast from
// findRepresentatives uses -1 everywhere, so no PE matches in round one.
// If this PE won either slot for its own cluster (lastMinK), it asserts
// that it had not been selected before.
//
// It then contributes a numK*4 array laid out per cluster as
// (minVal | minID | maxVal | maxID), initialized to the out-of-band value
// -1.0. Its own cluster's row is set to
// (minDistance, CkMyPe(), minDistance, CkMyPe()); per the in-code comment,
// this happens only while it has not won before. The contribution uses
// minMaxReductionType and is delivered to findNextMinMax.
// NOTE(review): the doc comment above lists lastMin/lastMax/lastMaxVal,
// which do not match the single msg parameter.
// NOTE(review): the following are on lines not shown in this listing;
// confirm they exist:
//   - the assignment that marks this PE as selected;
//   - the guard around the row fill;
//   - delete[] of minMaxAndIDs and deletion of msg.
2638 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2640 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2642 DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2643 lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2644 if ((CkMyPe() == msg->minIDs[lastMinK]) ||
2645 (CkMyPe() == msg->maxIDs[lastMinK])) {
2646 CkAssert(!selected);
2650 // build outgoing reduction structure
2651 // format = minVal | ID | maxVal | ID
2652 double *minMaxAndIDs = NULL;
2654 minMaxAndIDs = new double[numK*4];
2655 // initialize to the appropriate out-of-band values (for error checks)
2656 for (int i=0; i<numK; i++) {
2657 minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2658 minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2659 minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2660 minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2662 // If I have not won before, I put myself back into the competition
2664 DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2665 minMaxAndIDs[lastMinK*4] = minDistance;
2666 minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2667 minMaxAndIDs[lastMinK*4+2] = minDistance;
2668 minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2672 CkCallback cb(CkReductionTarget(KMeansBOC, findNextMinMax),
2674 contribute(numK*4*sizeof(double), minMaxAndIDs,
2675 minMaxReductionType, cb);
// KMeansBOC::findNextMinMax - reduction target for one selection round.
//
// msg carries numK rows of (minVal | minID | maxVal | maxID), combined by
// minMaxReduction. While numSelectionIter > 0, a new KSelectionMessage is
// broadcast to collectDistances. For each cluster i it names:
//   - the minimum-distance PE as the next exemplar, while
//     exemplarChoicesLeft[i] > 0;
//   - the maximum-distance PE as the next outlier, while
//     outlierChoicesLeft[i] > 0.
// -1 means "no pick". Otherwise phaseDone is broadcast to all PEs.
// NOTE(review): numSelectionIter is tested here, but its decrement is not
// visible in this listing. Confirm it happens, or the rounds never end.
// NOTE(review): this allocates new (numK, numK) KSelectionMessage, while
// findRepresentatives uses new (numK, numK, numK). Confirm how many varsize
// arrays KSelectionMessage declares.
2678 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2680 // minVal | minID | maxVal | maxID
2682 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2684 if (numSelectionIter > 0) {
2685 double *incInfo = (double*)msg->getData();
2687 KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2688 outmsg->numKMinIDs = numK;
2689 outmsg->numKMaxIDs = numK;
2691 for (int i=0; i<numK; i++) {
2692 DEBUGF("%d | %lf %d %lf %d \n", i,
2693 incInfo[i*4], (int)incInfo[i*4+1],
2694 incInfo[i*4+2], (int)incInfo[i*4+3]);
// Pick this round's winners. The IDs travel as doubles inside the
// reduction payload, hence the (int) casts.
2697 for (int i=0; i<numK; i++) {
2698 if (exemplarChoicesLeft[i] > 0) {
2699 outmsg->minIDs[i] = (int)incInfo[i*4+1];
2700 exemplarChoicesLeft[i]--;
2702 outmsg->minIDs[i] = -1;
2704 if (outlierChoicesLeft[i] > 0) {
2705 outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2706 outlierChoicesLeft[i]--;
2708 outmsg->maxIDs[i] = -1;
2711 thisProxy.collectDistances(outmsg);
2714 // invoke phase completion on all processors
2715 thisProxy.phaseDone();
2720 * Completion of the K-Means clustering and data selection of one phase
2721 * of the computation.
2723 * Called on every processor.
// KMeansBOC::phaseDone - apply this PE's keep/drop decision for the phase.
//
// Depending on conditions on lines not shown in this listing (presumably
// whether this PE was selected, and whether phases are in use), it either:
//   - marks the current phase as not kept
//     (pool->keepPhase[currentPhase] = false), or
//   - marks the whole log as not kept (pool->setAllPhases(false)).
// If this was the last phase, or phases are not in use, the PE contributes
// to a reduction delivered to TraceProjectionsBOC::kMeansDone. Otherwise
// it asks itself for the next phase's metrics (getNextPhaseMetrics). The
// per-phase state reset mentioned below is on lines not shown.
2725 void KMeansBOC::phaseDone() {
2727 // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2729 LogPool *pool = CkpvAccess(_trace)->_logPool;
2730 CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2732 // now decide on what to do with the decision.
2735 pool->keepPhase[currentPhase] = false;
2737 // if not using phases, we're working on the whole log
2738 pool->setAllPhases(false);
2742 // **FIXME** (?) - All processors have to agree on this, or the reduction
2743 // will not be correct! The question is "is this enforcible?"
2744 if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2746 contribute(CkCallback(CkReductionTarget(TraceProjectionsBOC, kMeansDone),
2749 // reset all phase-based k-means data and decisions
2753 // invoke the next K-Means computation phase.
2755 thisProxy[CkMyPe()].getNextPhaseMetrics();
// TraceProjectionsBOC::startTimeAnalysis - agree on a global start time.
//
// Each PE contributes the timestamp of its first log entry, or 0.0 when
// its log is empty. The reduction is CkReduction::min_double, and the
// result is delivered to startTimeDone through thisProxy.
// NOTE(review): a PE with an empty log contributes 0.0, which can become
// the minimum and pin the global start time to 0.0. Confirm this is
// intended.
2759 void TraceProjectionsBOC::startTimeAnalysis()
2761 double startTime = 0.0;
2762 if (CkpvAccess(_trace)->_logPool->numEntries>0)
2763 startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2764 CkCallback cb(CkReductionTarget(TraceProjectionsBOC, startTimeDone), thisProxy);
2765 contribute(sizeof(double), &startTime, CkReduction::min_double, cb);
// TraceProjectionsBOC::startTimeDone - receives the global minimum start
// time computed by startTimeAnalysis's reduction. If this PE has a trace
// object, it stores result as the log pool's globalStartTime and calls
// setNewStartTime(). It then asks this PE to start the end-time analysis.
// NOTE(review): whether the startEndTimeAnalysis() call sits inside the
// NULL check depends on a closing brace not shown in this listing.
2768 void TraceProjectionsBOC::startTimeDone(double result)
2770 // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2772 if (CkpvAccess(_trace) != NULL) {
2773 CkpvAccess(_trace)->_logPool->globalStartTime = result;
2774 CkpvAccess(_trace)->_logPool->setNewStartTime();
2775 //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2777 thisProxy[CkMyPe()].startEndTimeAnalysis();
// TraceProjectionsBOC::startEndTimeAnalysis - each PE copies the trace's
// endTime into this BOC's endTime. It then contributes that value to a
// CkReduction::max_double reduction delivered to endTimeDone.
// NOTE(review): the callback's destination argument is on a line not
// shown here. endTimeDone asserts that it runs on PE 0.
2780 void TraceProjectionsBOC::startEndTimeAnalysis()
2782 //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2784 endTime = CkpvAccess(_trace)->endTime;
2785 // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2787 CkCallback cb(CkReductionTarget(TraceProjectionsBOC, endTimeDone),
2789 contribute(sizeof(double), &endTime, CkReduction::max_double, cb);
// TraceProjectionsBOC::endTimeDone - PE 0 only (asserted). Receives the
// global maximum end time and counts one parallel analysis module as
// finished (parModulesRemaining--). It stores the end time in the log pool
// as globalEndTime, relative to globalStartTime. When no modules remain,
// it invokes finalize() on this PE.
2792 void TraceProjectionsBOC::endTimeDone(double result)
2794 //if(CkMyPe()==0) CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2796 CkAssert(CkMyPe() == 0);
2797 parModulesRemaining--;
2798 if (CkpvAccess(_trace) != NULL && CkpvAccess(_trace)->_logPool != NULL) {
2799 CkpvAccess(_trace)->_logPool->globalEndTime = result - CkpvAccess(_trace)->_logPool->globalStartTime;
2800 // CkPrintf("End time determined to be %lf us\n",
2801 // (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2803 if (parModulesRemaining == 0) {
2804 thisProxy[CkMyPe()].finalize();
// TraceProjectionsBOC::kMeansDone - PE 0 only (asserted). This is the
// reduction target of KMeansBOC::phaseDone after the last phase. It counts
// the K-Means module as finished and prints the wall-clock time elapsed
// since analysisStartTime. When no analysis modules remain, it calls
// finalize().
2808 void TraceProjectionsBOC::kMeansDone() {
2810 if(CkMyPe()==0) CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2812 CkAssert(CkMyPe() == 0);
2813 parModulesRemaining--;
2814 CkPrintf("K-Means Analysis Time = %lf seconds\n",
2815 CmiWallTimer()-analysisStartTime);
2816 if (parModulesRemaining == 0) {
2817 thisProxy[CkMyPe()].finalize();
2823 * This version is called (on processor 0) only if flushCheck fails.
// TraceProjectionsBOC::kMeansDoneFlushed - PE 0 only (asserted). It does
// the same bookkeeping as kMeansDone, but reports that the K-Means
// analysis was aborted because of a flush, along with the elapsed
// wall-clock time.
2826 void TraceProjectionsBOC::kMeansDoneFlushed() {
2827 CkAssert(CkMyPe() == 0);
2828 parModulesRemaining--;
2829 CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2830 CmiWallTimer()-analysisStartTime);
2831 if (parModulesRemaining == 0) {
2832 thisProxy[CkMyPe()].finalize();
// TraceProjectionsBOC::finalize - PE 0 only (asserted). It is reached when
// parModulesRemaining drops to 0 and broadcasts closingTraces to every PE.
2836 void TraceProjectionsBOC::finalize()
2838 CkAssert(CkMyPe() == 0);
2839 //CkPrintf("Total Analysis Time = %lf seconds\n",
2840 // CmiWallTimer()-analysisStartTime);
2841 thisProxy.closingTraces();
2844 // Called on every processor
// TraceProjectionsBOC::closingTraces - closes this PE's trace, then joins
// an empty reduction delivered to closeParallelShutdown on PE `pe`.
// `pe` is endPe when that is set (!= -1); otherwise it keeps the default
// assigned on a line not shown here. closeParallelShutdown's assertion
// expects that default to be PE 0.
2845 void TraceProjectionsBOC::closingTraces() {
2846 CkpvAccess(_trace)->closeTrace();
2848 // subtle: reduction needs to go to the PE which started CkExit()
2850 if (endPe != -1) pe = endPe;
2851 contribute(CkCallback(CkReductionTarget(TraceProjectionsBOC, closeParallelShutdown), pe, thisProxy));
2854 // The sole purpose of this reduction is to decide whether or not
2855 // Projections as a module needs to call CkContinueExit() to get other
2856 // modules to shut down.
// It runs on PE 0 when endPe == -1, and on endPe otherwise (asserted
// below).
// NOTE(review): the assertion mixes && and || without parentheses. It
// parses as (endPe == -1 && CkMyPe() == 0) || CkMyPe() == endPe, which
// matches the intent, but compilers warn about it (-Wparentheses).
2857 void TraceProjectionsBOC::closeParallelShutdown(void) {
2858 CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2859 if (!CkpvAccess(_trace)->converseExit) {
2864 * Registration and definition of the Outlier Reduction callback.
2865 * Format: Sum | Min | Max | Sum of Squares
// outlierReduction - custom reducer that combines per-metric statistics.
//
// Each message holds 4*numMetrics doubles, laid out as four consecutive
// sections of numMetrics values: Sum | Min | Max | Sum of Squares. The
// result starts as a copy of msgs[0] and folds in the remaining messages:
//   - sums and sums of squares are added;
//   - minima and maxima are taken element-wise.
// It aborts if the payload is not a whole number of doubles, or if it
// cannot be split into four sections. A shortcut passes a single message
// through unchanged; the condition guarding it is on a line not shown.
// NOTE(review): ret is allocated with new[] and handed to
// CkReductionMsg::buildNew, and no delete[] ret is visible. If buildNew
// copies its source buffer, this leaks ret on every reduction step.
// Confirm.
// NOTE(review): only msgs[0]'s size is checked. Later messages are assumed
// to have the same layout.
2867 CkReductionMsg *outlierReduction(int nMsgs,
2868 CkReductionMsg **msgs) {
2874 // nothing to do, just pass it on
2875 return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2879 numBytes = msgs[0]->getSize();
2881 if (numBytes%sizeof(double) != 0) {
2882 CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2884 if ((numBytes/sizeof(double))%4 != 0) {
2885 CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2887 numMetrics = (numBytes/sizeof(double))/4;
2888 ret = new double[numMetrics*4];
2890 // copy the first message data into the return structure first
2891 for (int i=0; i<numMetrics*4; i++) {
2892 ret[i] = ((double *)msgs[0]->getData())[i];
2895 // Sum | Min | Max | Sum of Squares
2896 for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2897 for (int i=0; i<numMetrics; i++) {
2899 ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2901 ret[numMetrics + i] =
2902 (ret[numMetrics + i] <
2903 ((double *)msgs[msgIdx]->getData())[numMetrics + i])
2904 ? ret[numMetrics + i] :
2905 ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2907 ret[2*numMetrics + i] =
2908 (ret[2*numMetrics + i] >
2909 ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2910 ? ret[2*numMetrics + i] :
2911 ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2912 // Sum of Squares (squaring already done at leaf)
2913 ret[3*numMetrics + i] +=
2914 ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2919 /* apparently, we do not delete the incoming messages */
2920 return CkReductionMsg::buildNew(numBytes,ret);
2924 * The only reason we have a user-defined reduction is to support
2925 * identification of the "winning" processors as well as to handle
2926 * both the min and the max of each "tournament". A simple min/max
2927 * discovery cannot handle ties.
// minMaxReduction - custom reducer for the selection tournaments.
//
// Each message holds numK rows of (minVal | minID | maxVal | maxID). Within
// a row, the min half or max half is a valid candidate only when its ID is
// >= 0. Negative IDs, such as the -1.0 placeholders from collectDistances,
// mean "no candidate".
// For each cluster, the result keeps the valid candidate with the smallest
// minVal and, independently, the one with the largest maxVal. The
// comparisons are strict, so on a tie the candidate from the earlier
// message in msgs[] wins.
// NOTE(review): the out-of-band initialization of ret, its release after
// buildNew, and the return statement are on lines not shown in this
// listing. Confirm that ret is deleted.
2929 CkReductionMsg *minMaxReduction(int nMsgs,
2930 CkReductionMsg **msgs) {
2931 CkAssert(nMsgs > 0);
2933 int numBytes = msgs[0]->getSize();
2934 CkAssert(numBytes%sizeof(double) == 0);
2935 int numK = (numBytes/sizeof(double))/4;
2937 double *ret = new double[numK*4];
2938 // fill with out-of-band values
2939 for (int i=0; i<numK; i++) {
2946 // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2947 for (int i=0; i<nMsgs; i++) {
2948 double *temp = (double *)msgs[i]->getData();
2949 for (int j=0; j<numK; j++) {
2950 // no previous valid min
2951 if (ret[j*4+1] < 0) {
2952 // fill it in only if the incoming min is valid
2953 if (temp[j*4+1] >= 0) {
2954 ret[j*4] = temp[j*4]; // fill min value
2955 ret[j*4+1] = temp[j*4+1]; // fill ID
2958 // find Min, only if incoming min is valid
2959 if (temp[j*4+1] >= 0) {
2960 if (temp[j*4] < ret[j*4]) {
2961 ret[j*4] = temp[j*4]; // replace min value
2962 ret[j*4+1] = temp[j*4+1]; // replace ID
2966 // no previous valid max
2967 if (ret[j*4+3] < 0) {
2968 // fill only if the incoming max is valid
2969 if (temp[j*4+3] >= 0) {
2970 ret[j*4+2] = temp[j*4+2]; // fill max value
2971 ret[j*4+3] = temp[j*4+3]; // fill ID
2974 // find Max, only if incoming max is valid
2975 if (temp[j*4+3] >= 0) {
2976 if (temp[j*4+2] > ret[j*4+2]) {
2977 ret[j*4+2] = temp[j*4+2]; // replace max value
2978 ret[j*4+3] = temp[j*4+3]; // replace ID
2984 CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2989 #include "TraceProjections.def.h"
2990 #endif //PROJ_ANALYSIS