Link conv-cpm as C++
[charm.git] / src / ck-perf / trace-projections.C
blob92ce55bbc0ff913e714d20e474b7f7cc227cf048
1 /**
2  * \addtogroup CkPerf
3 */
4 /*@{*/
6 #include <string.h>
8 #include "charm++.h"
9 #include "trace-projections.h"
10 #include "trace-projectionsBOC.h"
11 #include "TopoManager.h"
13 #if DEBUG_PROJ
14 #define DEBUGF(...) CkPrintf(__VA_ARGS__)
15 #else
16 #define DEBUGF(...)
17 #endif
18 #define DEBUGN(...)  // easy way to selectively disable DEBUGs
20 #define DefaultLogBufSize      1000000
21 #define  DEBUG_KMEANS 0
22 // **CW** Simple delta encoding implementation
23 // delta encoding is on by default. It may be turned off later in
24 // the runtime.
26 bool checknested=false;         // check illegal nested begin/end execute
28 #ifdef PROJ_ANALYSIS
29 // BOC operations readonlys
30 CkGroupID traceProjectionsGID;
31 CkGroupID kMeansGID;
33 // New reduction type for Outlier Analysis purposes. This is allowed to be
34 // a global variable according to the Charm++ manual.
35 CkReductionMsg *outlierReduction(int nMsgs,
36                                  CkReductionMsg **msgs);
37 CkReductionMsg *minMaxReduction(int nMsgs,
38                                 CkReductionMsg **msgs);
39 CkReduction::reducerType outlierReductionType;
40 CkReduction::reducerType minMaxReductionType;
41 #endif // PROJ_ANALYSIS
43 CkpvStaticDeclare(TraceProjections*, _trace);
44 CtvExtern(int,curThreadEvent);
46 CkpvDeclare(CmiInt8, CtrLogBufSize);
48 typedef CkVec<char *>  usrEventVec;
49 CkpvStaticDeclare(usrEventVec, usrEventlist);
50 class UsrEvent {
51 public:
52   int e;
53   char *str;
54   UsrEvent(int _e, char* _s): e(_e),str(_s) {}
56 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrEvents);
57 /*User Stat Vector Mirroring usrEvents. Holds all the stat names.
58   Reuses UsrEvent Class since all that is needed is an int and a string to store name*/
59 CkpvStaticDeclare(CkVec<UsrEvent *>*, usrStats);
60 // When tracing is disabled, these are defined as empty static inlines
61 // in the header, to minimize overhead
62 #if CMK_TRACE_ENABLED
63 /// Disable the outputting of the trace logs
64 void disableTraceLogOutput()
65
66   CkpvAccess(_trace)->setWriteData(false);
69 /// Enable the outputting of the trace logs
70 void enableTraceLogOutput()
72   CkpvAccess(_trace)->setWriteData(true);
74 #endif
76 #if ! CMK_TRACE_ENABLED
77 static int warned=0;
78 #define OPTIMIZED_VERSION       \
79         if (!warned) { warned=1;        \
80         CmiPrintf("\n\n!!!! Warning: traceUserEvent not available in optimized version!!!!\n\n\n"); }
81 #else
82 #define OPTIMIZED_VERSION /*empty*/
83 #endif // CMK_TRACE_ENABLED
86 On T3E, we need to have file number control by open/close files only when needed.
88 #if CMK_TRACE_LOGFILE_NUM_CONTROL
89   #define OPEN_LOG openLog("a");
90   #define CLOSE_LOG closeLog();
91 #else
92   #define OPEN_LOG
93   #define CLOSE_LOG
94 #endif //CMK_TRACE_LOGFILE_NUM_CONTROL
96 /**
97   For each TraceFoo module, _createTraceFoo() must be defined.
98   This function is called in _createTraces() generated in moduleInit.C
100 void _createTraceprojections(char **argv)
102   DEBUGF("%d createTraceProjections\n", CkMyPe());
103   CkpvInitialize(CkVec<char *>, usrEventlist);
104   CkpvInitialize(CkVec<UsrEvent *>*, usrEvents);
105   CkpvInitialize(CkVec<UsrEvent *>*, usrStats);
106   CkpvAccess(usrEvents) = new CkVec<UsrEvent *>();
107   CkpvAccess(usrStats) = new CkVec<UsrEvent *>();
108 #if CMK_BIGSIM_CHARM
109   // CthRegister does not call the constructor
110 //  CkpvAccess(usrEvents) = CkVec<UsrEvent *>();
111 #endif //CMK_BIGSIM_CHARM
112   CkpvInitialize(TraceProjections*, _trace);
113   CkpvAccess(_trace) = new  TraceProjections(argv);
114   CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
115   if (CkMyPe()==0) CkPrintf("Charm++: Tracemode Projections enabled.\n");
118 /* ****** CW TEMPORARY LOCATION ***** Support for thread listeners */
120 struct TraceThreadListener {
121   struct CthThreadListener base;
122   int event;
123   int msgType;
124   int ep;
125   int srcPe;
126   int ml;
127   CmiObjId idx;
130 static void traceThreadListener_suspend(struct CthThreadListener *l)
132   TraceThreadListener *a=(TraceThreadListener *)l;
133   /* here, we activate the appropriate trace codes for the appropriate
134      registered modules */
135   traceSuspend();
138 static void traceThreadListener_resume(struct CthThreadListener *l)
140   TraceThreadListener *a=(TraceThreadListener *)l;
141   /* here, we activate the appropriate trace codes for the appropriate
142      registered modules */
143   _TRACE_BEGIN_EXECUTE_DETAILED(a->event,a->msgType,a->ep,a->srcPe,a->ml,
144                                 CthGetThreadID(a->base.thread), NULL);
145   a->event=-1;
146   a->srcPe=CkMyPe(); /* potential lie to migrated threads */
147   a->ml=0;
150 static void traceThreadListener_free(struct CthThreadListener *l)
152   TraceThreadListener *a=(TraceThreadListener *)l;
153   delete a;
156 void TraceProjections::traceAddThreadListeners(CthThread tid, envelope *e)
158 #if CMK_TRACE_ENABLED
159   /* strip essential information from the envelope */
160   TraceThreadListener *a= new TraceThreadListener;
161   
162   a->base.suspend=traceThreadListener_suspend;
163   a->base.resume=traceThreadListener_resume;
164   a->base.free=traceThreadListener_free;
165   a->event=e->getEvent();
166   a->msgType=e->getMsgtype();
167   a->ep=e->getEpIdx();
168   a->srcPe=e->getSrcPe();
169   a->ml=e->getTotalsize();
171   CthAddListener(tid, (CthThreadListener *)a);
172 #endif
175 void LogPool::openLog(const char *mode)
177 #if CMK_USE_ZLIB
178   if(compressed) {
179     do {
180       zfp = gzopen(fname, mode);
181     } while (!zfp && (errno == EINTR || errno == EMFILE));
182     if(!zfp) CmiAbort("Cannot open Projections Compressed Non Delta Trace File for writing...\n");
183   } else {
184     do {
185       fp = fopen(fname, mode);
186     } while (!fp && (errno == EINTR || errno == EMFILE));
187     if (!fp) {
188       CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
189       CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
190     }
191   }
192 #else
193   do {
194     fp = fopen(fname, mode);
195   } while (!fp && (errno == EINTR || errno == EMFILE));
196   if (!fp) {
197     CkPrintf("[%d] Attempting to open file [%s]\n",CkMyPe(),fname);
198     CmiAbort("Cannot open Projections Non Delta Trace File for writing...\n");
199   }
200 #endif
203 void LogPool::closeLog(void)
205 #if CMK_USE_ZLIB
206   if(compressed) {
207     gzclose(zfp);
208     return;
209   }
210 #endif
211 #if !defined(_WIN32)
212   fsync(fileno(fp)); 
213 #endif
214   fclose(fp);
217 LogPool::LogPool(char *pgm) {
218   pool = new LogEntry[CkpvAccess(CtrLogBufSize)];
219   // defaults to writing data (no outlier changes)
220   writeSummaryFiles = false;
221   writeData = true;
222   numEntries = 0;
223   lastCreationEvent = -1;
224   // **CW** for simple delta encoding
225   prevTime = 0.0;
226   timeErr = 0.0;
227   globalStartTime = 0.0;
228   globalEndTime = 0.0;
229   headerWritten = false;
230   numPhases = 0;
231   hasFlushed = false;
233   keepPhase = NULL;
235   fileCreated = false;
236   poolSize = CkpvAccess(CtrLogBufSize);
237   pgmname = new char[strlen(pgm)+1];
238   strcpy(pgmname, pgm);
240   //statistic init
241   statisLastProcessTimer = 0;
242   statisLastIdleTimer = 0;
243   statisLastPackTimer = 0;
244   statisLastUnpackTimer = 0;
245   statisTotalExecutionTime  = 0;
246   statisTotalIdleTime = 0;
247   statisTotalPackTime = 0;
248   statisTotalUnpackTime = 0;
249   statisTotalCreationMsgs = 0;
250   statisTotalCreationBytes = 0;
251   statisTotalMCastMsgs = 0;
252   statisTotalMCastBytes = 0;
253   statisTotalEnqueueMsgs = 0;
254   statisTotalDequeueMsgs = 0;
255   statisTotalRecvMsgs = 0;
256   statisTotalRecvBytes = 0;
257   statisTotalMemAlloc = 0;
258   statisTotalMemFree = 0;
262 void LogPool::createFile(const char *fix)
264   if (fileCreated) {
265     return;
266   }
267   
268   if(CmiNumPartitions() > 1) {
269     CmiMkdir(CkpvAccess(partitionRoot));
270   }
272   char* filenameLastPart = strrchr(pgmname, PATHSEP) + 1; // Last occurrence of path separator
273   char *pathPlusFilePrefix = new char[1024];
275   if(nSubdirs > 0){
276     int sd = CkMyPe() % nSubdirs;
277     char *subdir = new char[1024];
278     sprintf(subdir, "%s.projdir.%d", pgmname, sd);
279     CmiMkdir(subdir);
280     sprintf(pathPlusFilePrefix, "%s%c%s%s", subdir, PATHSEP, filenameLastPart, fix);
281     delete[] subdir;
282   } else {
283     sprintf(pathPlusFilePrefix, "%s%s", pgmname, fix);
284   }
286   char pestr[10];
287   sprintf(pestr, "%d", CkMyPe());
288 #if CMK_USE_ZLIB
289   int len;
290   if(compressed)
291     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+strlen(".gz")+3;
292   else
293     len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
294 #else
295   int len = strlen(pathPlusFilePrefix)+strlen(".logold")+strlen(pestr)+3;
296 #endif
298   fname = new char[len];
299 #if CMK_USE_ZLIB
300   if(compressed) {
301     sprintf(fname, "%s.%s.log.gz", pathPlusFilePrefix,pestr);
302   }
303   else {
304     sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
305   }
306 #else
307   sprintf(fname, "%s.%s.log", pathPlusFilePrefix, pestr);
308 #endif
309   fileCreated = true;
310   delete[] pathPlusFilePrefix;
311   openLog("w");
312   CLOSE_LOG 
315 void LogPool::createSts(const char *fix)
317   CkAssert(CkMyPe() == 0);
318   if(CmiNumPartitions() > 1) {
319     CmiMkdir(CkpvAccess(partitionRoot));
320   }
322   // create the sts file
323   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(fix)+strlen(".sts")+2];
324   sprintf(fname, "%s%s.sts", CkpvAccess(traceRoot), fix);
325   do
326     {
327       stsfp = fopen(fname, "w");
328     } while (!stsfp && (errno == EINTR || errno == EMFILE));
329   if(stsfp==0){
330     CmiPrintf("Cannot open projections sts file for writing due to %s\n", strerror(errno));
331     CmiAbort("Error!!\n");
332   }
333   delete[] fname;
336 void LogPool::createRC()
338   // create the projections rc file.
339   fname = 
340     new char[strlen(CkpvAccess(traceRoot))+strlen(".projrc")+1];
341   sprintf(fname, "%s.projrc", CkpvAccess(traceRoot));
342   do {
343     rcfp = fopen(fname, "w");
344   } while (!rcfp && (errno == EINTR || errno == EMFILE));
345   if (rcfp==0) {
346     CmiAbort("Cannot open projections configuration file for writing.\n");
347   }
348   delete[] fname;
351 LogPool::~LogPool() 
353   if (writeData) {
354       if(writeSummaryFiles)
355           writeStatis();
356     writeLog();
357 #if !CMK_TRACE_LOGFILE_NUM_CONTROL
358     closeLog();
359 #endif
360   }
362 #if CMK_BIGSIM_CHARM
363   extern int correctTimeLog;
364   if (correctTimeLog) {
365     createFile("-bg");
366     if (CkMyPe() == 0) {
367       createSts("-bg");
368     }
369     writeHeader();
370     if (CkMyPe() == 0) writeSts(NULL);
371     postProcessLog();
372   }
373 #endif
375   delete[] pool;
376   delete [] fname;
379 void LogPool::writeHeader()
381   if (headerWritten) return;
382   headerWritten = true;
383   if(!binary) {
384 #if CMK_USE_ZLIB
385     if(compressed) {
386       gzprintf(zfp, "PROJECTIONS-RECORD %d\n", numEntries);
387     } 
388     else /* else clause is below... */
389 #endif
390     /*... may hang over from else above */ {
391       fprintf(fp, "PROJECTIONS-RECORD %d\n", numEntries);
392     }
393   }
394   else { // binary
395     fwrite(&numEntries,sizeof(numEntries),1,fp);
396   }
399 void LogPool::writeLog(void)
401   createFile();
402   OPEN_LOG
403   writeHeader();
404   write(0);
405   CLOSE_LOG
408 void LogPool::write(int writedelta) 
410   // **CW** Simple delta encoding implementation
411   // prevTime has to be maintained as an object variable because
412   // LogPool::write may be called several times depending on the
413   // +logsize value.
414   PUP::er *p = NULL;
415   if (binary) {
416     p = new PUP::toDisk(writedelta?deltafp:fp);
417   }
418 #if CMK_USE_ZLIB
419   else if (compressed) {
420     p = new toProjectionsGZFile(writedelta?deltazfp:zfp);
421   }
422 #endif
423   else {
424     p = new toProjectionsFile(writedelta?deltafp:fp);
425   }
426   CmiAssert(p);
427   int curPhase = 0;
428   // **FIXME** - Should probably consider a more sophisticated bounds-based
429   //   approach for selective writing instead of making multiple if-checks
430   //   for every single event.
431   for(UInt i=0; i<numEntries; i++) {
432     if (!writedelta) {
433       if (keepPhase == NULL) {
434         // default case, when no phase selection is required.
435         pool[i].pup(*p);
436       } else {
437         // **FIXME** Might be a good idea to create a "filler" event block for
438         //   all the events taken out by phase filtering.
439         if (pool[i].type == END_PHASE) {
440           // always write phase markers
441           pool[i].pup(*p);
442           curPhase++;
443         } else if (pool[i].type == BEGIN_COMPUTATION ||
444                    pool[i].type == END_COMPUTATION) {
445           // always write BEGIN and END COMPUTATION markers
446           pool[i].pup(*p);
447         } else if (keepPhase[curPhase]) {
448           pool[i].pup(*p);
449         }
450       }
451     }
452     else {      // delta
453       // **FIXME** Implement phase-selective writing for delta logs
454       //   eventually
455       double time = pool[i].time;
456       if (pool[i].type != BEGIN_COMPUTATION && pool[i].type != END_COMPUTATION)
457       {
458         double timeDiff = (time-prevTime)*1.0e6;
459         UInt intTimeDiff = (UInt)timeDiff;
460         timeErr += timeDiff - intTimeDiff; /* timeErr is never >= 2.0 */
461         if (timeErr > 1.0) {
462           timeErr -= 1.0;
463           intTimeDiff++;
464         }
465         pool[i].time = intTimeDiff/1.0e6;
466       }
467       pool[i].pup(*p);
468       pool[i].time = time;      // restore time value
469       prevTime = time;
470     }
471   }
472   delete p;
473   delete [] keepPhase;
476 void LogPool::writeSts(void)
478   // for whining compilers
479   int i;
480   // generate an automatic unique ID for each log
481   fprintf(stsfp, "PROJECTIONS_ID %s\n", "");
482   fprintf(stsfp, "VERSION %s\n", PROJECTION_VERSION);
483   fprintf(stsfp, "TOTAL_PHASES %d\n", numPhases);
484 #if CMK_HAS_COUNTER_PAPI
485   fprintf(stsfp, "TOTAL_PAPI_EVENTS %d\n", NUMPAPIEVENTS);
486   // for now, use i, next time use papiEvents[i].
487   // **CW** papi event names is a hack.
488   char eventName[PAPI_MAX_STR_LEN];
489   for (i=0;i<NUMPAPIEVENTS;i++) {
490     PAPI_event_code_to_name(papiEvents[i], eventName);
491     fprintf(stsfp, "PAPI_EVENT %d %s\n", i, eventName);
492   }
493 #endif
494   // Adds common elements to sts file such as num stats, num chares etc.
495   traceWriteSTS(stsfp,CkpvAccess(usrEvents)->length());
496   fprintf(stsfp, "TOTAL_STATS %d\n", (int) CkpvAccess(usrStats)->length());
497   for(i=0;i<CkpvAccess(usrEvents)->length();i++){
498     fprintf(stsfp, "EVENT %d %s\n", (*CkpvAccess(usrEvents))[i]->e, (*CkpvAccess(usrEvents))[i]->str);
499   }     
500   //Mirrors Event printing. Prints every stat name and number
501   for(i=0;i<CkpvAccess(usrStats)->length();i++){
502     fprintf(stsfp, "STAT %d %s\n", (*CkpvAccess(usrStats))[i]->e, (*CkpvAccess(usrStats))[i]->str);
503   }
506 void LogPool::writeSts(TraceProjections *traceProj){
507   writeSts();
508   fprintf(stsfp, "END\n");
509   fclose(stsfp);
512 void LogPool::writeRC(void)
514     //CkPrintf("write RC is being executed\n");
515 #ifdef PROJ_ANALYSIS  
516     CkAssert(CkMyPe() == 0);
517     fprintf(rcfp,"RC_GLOBAL_START_TIME %lld\n",
518             (CMK_PUP_LONG_LONG)(1.0e6*globalStartTime));
519     fprintf(rcfp,"RC_GLOBAL_END_TIME   %lld\n",
520             (CMK_PUP_LONG_LONG)(1.0e6*globalEndTime));
521     /* //Yanhua comment it because isOutlierAutomatic is not a variable in trace
522     if (CkpvAccess(_trace)->isOutlierAutomatic()) {
523       fprintf(rcfp,"RC_OUTLIER_FILTERED true\n");
524     } else {
525       fprintf(rcfp,"RC_OUTLIER_FILTERED false\n");
526     }
527     */
528 #endif //PROJ_ANALYSIS
529   fclose(rcfp);
532 void LogPool::writeStatis(void)
534     
535    // create the statis file
536   char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".statis")+10];
537   sprintf(fname, "%s.%d.statis", CkpvAccess(traceRoot), CkMyPe());
538   do
539   {
540       statisfp = fopen(fname, "w");
541   } while (!statisfp && (errno == EINTR || errno == EMFILE));
542   if(statisfp==0){
543       CmiPrintf("Cannot open projections statistic file for writing due to %s\n", strerror(errno));
544       CmiAbort("Error!!\n");
545   }
546   delete[] fname;
547   double totaltime = endComputationTime - beginComputationTime;
548   fprintf(statisfp, "time(sec) percentage\n");
549   fprintf(statisfp, "Time:    \t%f\n", totaltime);
550   fprintf(statisfp, "Idle :\t%f\t %.1f\n", statisTotalIdleTime, statisTotalIdleTime/totaltime * 100); 
551   fprintf(statisfp, "Overhead:    \t%f\t %.1f\n", totaltime - statisTotalIdleTime - statisTotalExecutionTime , (totaltime - statisTotalIdleTime - statisTotalExecutionTime)/totaltime * 100); 
552   fprintf(statisfp, "Exeuction:\t%f\t %.1f\n", statisTotalExecutionTime, statisTotalExecutionTime/totaltime*100); 
553   fprintf(statisfp, "Pack:     \t%f\t %.2f\n", statisTotalPackTime, statisTotalPackTime/totaltime*100); 
554   fprintf(statisfp, "Unpack:   \t%f\t %.2f\n", statisTotalUnpackTime, statisTotalUnpackTime/totaltime*100); 
555   fprintf(statisfp, "Creation Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalCreationMsgs, statisTotalCreationBytes, (statisTotalCreationMsgs>0)?statisTotalCreationBytes/statisTotalCreationMsgs:statisTotalCreationMsgs); 
556   fprintf(statisfp, "Multicast Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalMCastMsgs, statisTotalMCastBytes, (statisTotalMCastMsgs>0)?statisTotalMCastBytes/statisTotalMCastMsgs:statisTotalMCastMsgs); 
557   fprintf(statisfp, "Received Msgs Numbers, Bytes, Avg:   \t%lld\t %lld\t %lld \n", statisTotalRecvMsgs, statisTotalRecvBytes, (statisTotalRecvMsgs>0)?statisTotalRecvBytes/statisTotalRecvMsgs:statisTotalRecvMsgs); 
558   fclose(statisfp);
561 #if CMK_BIGSIM_CHARM
562 static void updateProjLog(void *data, double t, double recvT, void *ptr)
564   LogEntry *log = (LogEntry *)data;
565   FILE *fp = *(FILE **)ptr;
566   log->time = t;
567   log->recvTime = recvT<0.0?0:recvT;
568 //  log->write(fp);
569   toProjectionsFile p(fp);
570   log->pup(p);
572 #endif
574 // flush log entries to disk
575 void LogPool::flushLogBuffer()
577   if (numEntries) {
578     double writeTime = TraceTimer();
579     writeLog();
580     hasFlushed = true;
581     numEntries = 0;
582     lastCreationEvent = -1;
583     new (&pool[numEntries++]) LogEntry(writeTime, BEGIN_INTERRUPT);
584     new (&pool[numEntries++]) LogEntry(TraceTimer(), END_INTERRUPT);
585     //CkPrintf("Warning: Projections log flushed to disk on PE %d.\n", CkMyPe());
586     if (!traceProjectionsGID.isZero()) {    // report flushing events to PE 0
587       CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
588       bocProxy[0].flush_warning(CkMyPe());
589     }
590   }
593 void LogPool::add(UChar type, UShort mIdx, UShort eIdx,
594                   double time, int event, int pe, int ml, CmiObjId *id, 
595                   double recvT, double cpuT, int numPe, double statVal)
597     switch(type)
598     {
599         case BEGIN_COMPUTATION:
600             beginComputationTime = time;
601             break;
602         case END_COMPUTATION:
603             endComputationTime = time;
604             break;
605         case CREATION:
606             statisTotalCreationMsgs++;
607             statisTotalCreationBytes += ml;
608             break;
609         case CREATION_MULTICAST:
610             statisTotalMCastMsgs++;
611             statisTotalMCastBytes += ml;
612             break;
613         case ENQUEUE:
614             statisTotalEnqueueMsgs++;
615             break;
616         case DEQUEUE:
617             statisTotalDequeueMsgs++;
618             break;
619         case MESSAGE_RECV:
620             statisTotalRecvMsgs++;
621             statisTotalRecvBytes += ml;
622             break;
623         case MEMORY_MALLOC:
624             statisTotalMemAlloc++;
625             break;
626         case MEMORY_FREE:
627             statisTotalMemFree++;
628             break;
629         case BEGIN_PROCESSING:
630             statisLastProcessTimer = time;
631             break;
632         case BEGIN_UNPACK:
633             statisLastUnpackTimer = time;
634             break;
635         case BEGIN_PACK:
636             statisLastPackTimer = time;
637             break;
638         case BEGIN_IDLE:
639             statisLastIdleTimer = time;
640             break;
641         case END_PROCESSING:
642             statisTotalExecutionTime += (time - statisLastProcessTimer);
643             break;
644         case END_UNPACK:
645             statisTotalUnpackTime += (time - statisLastUnpackTimer);
646             break;
647         case END_PACK:
648             statisTotalPackTime += (time - statisLastPackTimer);
649             break;
650         case END_IDLE:
651             statisTotalIdleTime += (time - statisLastIdleTimer);
652             break;
653         default:
654             break;
655     }
657   if (type == CREATION ||
658       type == CREATION_MULTICAST ||
659       type == CREATION_BCAST) {
660     lastCreationEvent = numEntries;
661   }
662   new (&pool[numEntries++])
663     LogEntry(time, type, mIdx, eIdx, event, pe, ml, id, recvT, cpuT, numPe, statVal);
664   if ((type == END_PHASE) || (type == END_COMPUTATION)) {
665     numPhases++;
666   }
667   if(poolSize==numEntries) {
668     flushLogBuffer();
669 #if CMK_BIGSIM_CHARM
670     extern int correctTimeLog;
671     if (correctTimeLog) CmiAbort("I/O interrupt!\n");
672 #endif
673   }
674 #if CMK_BIGSIM_CHARM
675   switch (type) {
676     case BEGIN_PROCESSING:
677       pool[numEntries-1].recvTime = BgGetRecvTime();
678     case END_PROCESSING:
679     case BEGIN_COMPUTATION:
680     case END_COMPUTATION:
681     case CREATION:
682     case BEGIN_PACK:
683     case END_PACK:
684     case BEGIN_UNPACK:
685     case END_UNPACK:
686     case USER_EVENT_PAIR:
687       bgAddProjEvent(&pool[numEntries-1], numEntries-1, time, updateProjLog, &fp, BG_EVENT_PROJ);
688   }
689 #endif
692 void LogPool::add(UChar type,double time,UShort funcID,int lineNum,char *fileName){
693 #ifndef CMK_BIGSIM_CHARM
694   if (type == CREATION ||
695       type == CREATION_MULTICAST ||
696       type == CREATION_BCAST) {
697     lastCreationEvent = numEntries;
698   }
699   new (&pool[numEntries++])
700         LogEntry(time,type,funcID,lineNum,fileName);
701   if(poolSize == numEntries){
702     flushLogBuffer();
703   }
704 #endif  
707 void LogPool::addUserBracketEventNestedID(unsigned char type, double time,
708                                           UShort mIdx, int event, int nestedID) {
709   new (&pool[numEntries++])
710   LogEntry(time, type, mIdx, 0, event, CkMyPe(), 0, 0, 0, 0, 0, 0, nestedID);
711   if(poolSize == numEntries){
712     flushLogBuffer();
713   }
716   
717 void LogPool::addMemoryUsage(unsigned char type,double time,double memUsage){
718 #ifndef CMK_BIGSIM_CHARM
719   if (type == CREATION ||
720       type == CREATION_MULTICAST ||
721       type == CREATION_BCAST) {
722     lastCreationEvent = numEntries;
723   }
724   new (&pool[numEntries++])
725         LogEntry(type,time,memUsage);
726   if(poolSize == numEntries){
727     flushLogBuffer();
728   }
729 #endif  
730         
731 }  
733 void LogPool::addUserSupplied(int data){
734         // add an event
735         add(USER_SUPPLIED, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
737         // set the user supplied value for the previously created event 
738         pool[numEntries-1].setUserSuppliedData(data);
742 void LogPool::addUserSuppliedNote(char *note){
743         // add an event
744         add(USER_SUPPLIED_NOTE, 0, 0, TraceTimer(), -1, -1, 0, 0, 0, 0, 0 );
746         // set the user supplied note for the previously created event 
747         pool[numEntries-1].setUserSuppliedNote(note);
750 void LogPool::addUserSuppliedBracketedNote(char *note, int eventID, double bt, double et){
751   //CkPrintf("LogPool::addUserSuppliedBracketedNote eventID=%d\n", eventID);
752 #ifndef CMK_BIGSIM_CHARM
753 #if MPI_TRACE_MACHINE_HACK
754   //This part of code is used  to combine the contiguous
755   //MPI_Test and MPI_Iprobe events to reduce the number of
756   //entries
757 #define MPI_TEST_EVENT_ID 60
758 #define MPI_IPROBE_EVENT_ID 70 
759   int lastEvent = pool[numEntries-1].event;
760   if((eventID==MPI_TEST_EVENT_ID || eventID==MPI_IPROBE_EVENT_ID) && (eventID==lastEvent)){
761     //just replace the endtime of last event
762     //CkPrintf("addUserSuppliedBracketNote: for event %d\n", lastEvent);
763     pool[numEntries].endTime = et;
764   }else{
765     new (&pool[numEntries++])
766       LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
767   }
768 #else
769   new (&pool[numEntries++])
770     LogEntry(bt, et, USER_SUPPLIED_BRACKETED_NOTE, note, eventID);
771 #endif
772   if(poolSize == numEntries){
773     flushLogBuffer();
774   }
775 #endif  
779 /* **CW** Not sure if this is the right thing to do. Feels more like
780    a hack than a solution to Sameer's request to add the destination
781    processor information to multicasts and broadcasts.
783    In the unlikely event this method is used for Broadcasts as well,
784    pelist == NULL will be used to indicate a global broadcast with 
785    num PEs.
787 void LogPool::addCreationMulticast(UShort mIdx, UShort eIdx, double time,
788                                    int event, int pe, int ml, CmiObjId *id,
789                                    double recvT, int numPe, int *pelist)
791   lastCreationEvent = numEntries;
792   new (&pool[numEntries++])
793     LogEntry(time, mIdx, eIdx, event, pe, ml, id, recvT, numPe, pelist);
794   if(poolSize==numEntries) {
795     flushLogBuffer();
796   }
799 void LogPool::postProcessLog()
801 #if CMK_BIGSIM_CHARM
802   bgUpdateProj(1);   // event type
803 #endif
806 void LogPool::modLastEntryTimestamp(double ts)
808   pool[numEntries-1].time = ts;
809   //pool[numEntries-1].cputime = ts;
812 // /** Constructor for a multicast log entry */
813 // 
814 //  THIS WAS MOVED TO trace-projections.h with the other constructors
815 // 
816 // LogEntry::LogEntry(double tm, unsigned short m, unsigned short e, int ev, int p,
817 //           int ml, CmiObjId *d, double rt, int numPe, int *pelist) 
818 // {
819 //     type = CREATION_MULTICAST; mIdx = m; eIdx = e; event = ev; pe = p; time = tm; msglen = ml;
820 //     if (d) id = *d; else {id.id[0]=id.id[1]=id.id[2]=id.id[3]=-1; };
821 //     recvTime = rt; 
822 //     numpes = numPe;
823 //     userSuppliedNote = NULL;
824 //     if (pelist != NULL) {
825 //      pes = new int[numPe];
826 //      for (int i=0; i<numPe; i++) {
827 //        pes[i] = pelist[i];
828 //      }
829 //     } else {
830 //      pes= NULL;
831 //     }
832 // }
834 //void LogEntry::addPapi(LONG_LONG_PAPI *papiVals)
836 //#if CMK_HAS_COUNTER_PAPI
837 //   memcpy(papiValues, papiVals, sizeof(LONG_LONG_PAPI)*NUMPAPIEVENTS);
838 //#endif
843 void LogEntry::pup(PUP::er &p)
845   int i;
846   CMK_TYPEDEF_UINT8 itime, iEndTime, irecvtime, icputime;
847   char ret = '\n';
849   p|type;
850   if (p.isPacking()) itime = (CMK_TYPEDEF_UINT8)(1.0e6*time);
851   if (p.isPacking()) iEndTime = (CMK_TYPEDEF_UINT8)(1.0e6*endTime);
853   switch (type) {
854     case USER_EVENT:
855     case USER_EVENT_PAIR:
856     case BEGIN_USER_EVENT_PAIR:
857     case END_USER_EVENT_PAIR:
858       p|mIdx; p|itime; p|event; p|pe; p|nestedID;
859       break;
860     case BEGIN_IDLE:
861     case END_IDLE:
862     case BEGIN_PACK:
863     case END_PACK:
864     case BEGIN_UNPACK:
865     case END_UNPACK:
866       p|itime; p|pe; 
867       break;
868     case BEGIN_PROCESSING:
869       if (p.isPacking()) {
870         irecvtime = (CMK_TYPEDEF_UINT8)(recvTime==-1?-1:1.0e6*recvTime);
871         icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
872       }
873       p|mIdx; p|eIdx; p|itime; p|event; p|pe; 
874       p|msglen; p|irecvtime;
875       { // This brace is so that ndims can be declared inside a switch
876         const int ndims = _chareTable[_entryTable[eIdx]->chareIdx]->ndims;
877         // Should only be true if the chare is part of an array, otherwise ndims should be -1
878         if (ndims >= 1) {
879           if (ndims >= 4) {
880             short int* idShorts = (short int*)&(id.id);
881             for (int i = 0; i < ndims; i++)
882               p | idShorts[i];
883           }
884           else {
885             for (int i = 0; i < ndims; i++)
886               p | id.id[i];
887           }
888         }
889         else {
890           p|id.id[0]; p|id.id[1]; p|id.id[2]; p|id.id[3];
891         }
892       }
893       p|icputime;
894 #if CMK_HAS_COUNTER_PAPI
895       //p|numPapiEvents;
896       for (i=0; i<NUMPAPIEVENTS; i++) {
897         // not yet!!!
898         //      p|papiIDs[i]; 
899         p|papiValues[i];
900         
901       }
902 #else
903       //p|numPapiEvents;     // non papi version has value 0
904 #endif
905       if (p.isUnpacking()) {
906         recvTime = irecvtime/1.0e6;
907         cputime = icputime/1.0e6;
908       }
909       break;
910     case END_PROCESSING:
911       if (p.isPacking()) icputime = (CMK_TYPEDEF_UINT8)(1.0e6*cputime);
912       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen; p|icputime;
913 #if CMK_HAS_COUNTER_PAPI
914       //p|numPapiEvents;
915       for (i=0; i<NUMPAPIEVENTS; i++) {
916         // not yet!!!
917         //      p|papiIDs[i];
918         p|papiValues[i];
919       }
920 #else
921       //p|numPapiEvents;  // non papi version has value 0
922 #endif
923       if (p.isUnpacking()) cputime = icputime/1.0e6;
924       break;
925     case USER_SUPPLIED:
926           p|userSuppliedData;
927           p|itime;
928         break;
929     case USER_SUPPLIED_NOTE:
930           p|itime;
931           int length;
932           length=0;
933           if (p.isPacking()) length = strlen(userSuppliedNote);
934           p | length;
935           char space;
936           space = ' ';
937           p | space;
938           if (p.isUnpacking()) {
939             userSuppliedNote = new char[length+1];
940             userSuppliedNote[length] = '\0';
941           }
942           PUParray(p,userSuppliedNote, length);
943           break;
944     case USER_SUPPLIED_BRACKETED_NOTE:
945       //CkPrintf("Writting out a USER_SUPPLIED_BRACKETED_NOTE\n");
946           p|itime;
947           p|iEndTime;
948           p|event;
949           int length2;
950           length2=0;
951           if (p.isPacking()) length2 = strlen(userSuppliedNote);
952           p | length2;
953           char space2;
954           space2 = ' ';
955           p | space2;
956           if (p.isUnpacking()) {
957             userSuppliedNote = new char[length2+1];
958             userSuppliedNote[length2] = '\0';
959           }
960           PUParray(p,userSuppliedNote, length2);
961           break;
962     case MEMORY_USAGE_CURRENT:
963       p | memUsage;
964       p | itime;
965         break;
966     case USER_STAT:
967       p | itime;
968       p | cputime;  //This is user specified time
969       p | stat;
970       p | pe;
971       p | mIdx;
972       break;
973     case CREATION:
974       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
975       p|mIdx; p|eIdx; p|itime;
976       p|event; p|pe; p|msglen; p|irecvtime;
977       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
978       break;
979     case CREATION_BCAST:
980       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
981       p|mIdx; p|eIdx; p|itime;
982       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
983       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
984       break;
985     case CREATION_MULTICAST:
986       if (p.isPacking()) irecvtime = (CMK_TYPEDEF_UINT8)(1.0e6*recvTime);
987       p|mIdx; p|eIdx; p|itime;
988       p|event; p|pe; p|msglen; p|irecvtime; p|numpes;
989       if (p.isUnpacking()) pes = numpes?new int[numpes]:NULL;
990       for (i=0; i<numpes; i++) p|pes[i];
991       if (p.isUnpacking()) recvTime = irecvtime/1.0e6;
992       break;
993     case MESSAGE_RECV:
994       p|mIdx; p|eIdx; p|itime; p|event; p|pe; p|msglen;
995       break;
997     case ENQUEUE:
998     case DEQUEUE:
999       p|mIdx; p|itime; p|event; p|pe;
1000       break;
1002     case BEGIN_INTERRUPT:
1003     case END_INTERRUPT:
1004       p|itime; p|event; p|pe;
1005       break;
1007       // **CW** absolute timestamps are used here to support a quick
1008       // way of determining the total time of a run in projections
1009       // visualization.
1010     case BEGIN_COMPUTATION:
1011     case END_COMPUTATION:
1012     case BEGIN_TRACE:
1013     case END_TRACE:
1014       p|itime;
1015       break;
1016     case END_PHASE:
1017       p|eIdx; // FIXME: actually the phase ID
1018       p|itime;
1019       break;
1020     default:
1021       CmiError("***Internal Error*** Wierd Event %d.\n", type);
1022       break;
1023   }
1024   if (p.isUnpacking()) time = itime/1.0e6;
1025   p|ret;
1028 TraceProjections::TraceProjections(char **argv): 
1029   _logPool(NULL), curevent(0), inEntry(false), computationStarted(false),
1030         traceNestedEvents(false), converseExit(false),
1031         currentPhaseID(0), lastPhaseEvent(NULL), endTime(0.0)
1033   //  CkPrintf("Trace projections dummy constructor called on %d\n",CkMyPe());
1034   if (CkpvAccess(traceOnPe) == 0) return;
1036   CkpvInitialize(CmiInt8, CtrLogBufSize);
1037   CkpvAccess(CtrLogBufSize) = DefaultLogBufSize;
1038   if (CmiGetArgLongDesc(argv,"+logsize",&CkpvAccess(CtrLogBufSize), 
1039                        "Log entries to buffer per I/O")) {
1040     if (CkMyPe() == 0) {
1041       CmiPrintf("Trace: logsize: %ld\n", CkpvAccess(CtrLogBufSize));
1042     }
1043   }
1044   checknested = 
1045     CmiGetArgFlagDesc(argv,"+checknested",
1046                       "check projections nest begin end execute events");
1047   traceNestedEvents = 
1048     CmiGetArgFlagDesc(argv,"+tracenested",
1049               "trace projections nest begin/end execute events");
1050   int binary = 
1051     CmiGetArgFlagDesc(argv,"+binary-trace",
1052                       "Write log files in binary format");
1054   int nSubdirs = 0;
1055   CmiGetArgIntDesc(argv,"+trace-subdirs", &nSubdirs, "Number of subdirectories into which traces will be written");
1058 #if CMK_USE_ZLIB
1059   int compressed = true;
1060   CmiGetArgFlagDesc(argv,"+gz-trace","Write log files pre-compressed with gzip");
1061   int disableCompressed = CmiGetArgFlagDesc(argv,"+no-gz-trace","Disable writing log files pre-compressed with gzip");
1062   compressed = compressed && !disableCompressed;
1063 #else
1064   // consume the flag so there's no confusing
1065   CmiGetArgFlagDesc(argv,"+gz-trace",
1066                     "Write log files pre-compressed with gzip");
1067   if(CkMyPe() == 0) CkPrintf("Warning> gz-trace is not supported on this machine!\n");
1068 #endif
1070   int writeSummaryFiles = CmiGetArgFlagDesc(argv,"+write-analysis-file","Enable writing summary files "); 
1071   
1072   // **CW** default to non delta log encoding. The user may choose to do
1073   // create both logs (for debugging) or just the old log timestamping
1074   // (for compatibility).
1075   // Generating just the non delta log takes precedence over generating
1076   // both logs (if both arguments appear on the command line).
1078   // switch to OLD log format until everything works // Gengbin
1080   _logPool = new LogPool(CkpvAccess(traceRoot));
1081   _logPool->setNumSubdirs(nSubdirs);
1082   _logPool->setBinary(binary);
1083   _logPool->setWriteSummaryFiles(writeSummaryFiles);
1084 #if CMK_USE_ZLIB
1085   _logPool->setCompressed(compressed);
1086 #endif
1087   if (CkMyPe() == 0) {
1088     _logPool->createSts();
1089     _logPool->createRC();
1090   }
1091   funcCount=1;
1093 #if CMK_HAS_COUNTER_PAPI
1094   initPAPI();
1095 #endif
1098 int TraceProjections::traceRegisterUserEvent(const char* evt, int e)
1100   OPTIMIZED_VERSION
1101   CkAssert(e==-1 || e>=0);
1102   CkAssert(evt != NULL);
1103   int event;
1104   int biggest = -1;
1105   for (int i=0; i<CkpvAccess(usrEvents)->length(); i++) {
1106     int cur = (*CkpvAccess(usrEvents))[i]->e;
1107     if (cur == e) {
1108       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1109       if (strcmp((*CkpvAccess(usrEvents))[i]->str, evt) == 0) 
1110         return e;
1111       else
1112         CmiAbort("UserEvent double registered!");
1113     }
1114     if (cur > biggest) biggest = cur;
1115   }
1116   // if no user events have so far been registered. biggest will be -1
1117   // and hence newly assigned event numbers will begin from 0.
1118   if (e==-1) event = biggest+1;  // automatically assign new event number
1119   else event = e;
1120   CkpvAccess(usrEvents)->push_back(new UsrEvent(event,(char *)evt));
1121   return event;
1123 // Registers User Stat by adding its name and number to the usrStat vector.
1125 int TraceProjections::traceRegisterUserStat(const char* evt, int e)
1127   OPTIMIZED_VERSION
1128   CkAssert(e==-1 || e>=0);
1129   CkAssert(evt != NULL);
1130   int event;
1131   int biggest = -1;
1132   for (int i=0; i<CkpvAccess(usrStats)->length(); i++) {
1133     int cur = (*CkpvAccess(usrStats))[i]->e;
1134     if (cur == e) {
1135       //CmiPrintf("%s %s\n", (*CkpvAccess(usrEvents))[i]->str, evt);
1136       if (strcmp((*CkpvAccess(usrStats))[i]->str, evt) == 0)
1137         return e;
1138       else
1139         CmiAbort("UserStat double registered!");
1140     }
1141     if (cur > biggest) biggest = cur;
1142   }
1143   // if no user events have so far been registered. biggest will be -1
1144   // and hence newly assigned event numbers will begin from 0.
1145   if (e==-1) event = biggest+1;  // automatically assign new event number
1146   else event = e;
1147   CkpvAccess(usrStats)->push_back(new UsrEvent(event,(char *)evt));
1148   return event;
1151 void TraceProjections::traceClearEps(void)
1153   // In trace-summary, this zeros out the EP bins, to eliminate noise
1154   // from startup.  Here, this isn't useful, since we can do that in
1155   // post-processing
1158 void TraceProjections::traceWriteSts(void)
1160   if(CkMyPe()==0)
1161     _logPool->writeSts(this);
1164 /** 
1165  * **IMPT NOTES**:
1167  * This is called when Converse closes during ConverseCommonExit().
1168  * **FIXME**(?) - is this also exposed as a tracing-framework API call?
1170  * Some programs bypass CkExit() (like NAMD, which eventually calls
1171  * ConverseExit()), modules like traces will have to pretend to shutdown
1172  * as if CkExit() was called but at the same time avoid making
1173  * subsequent CkExit() calls (which is usually required for allowing
1174  * other modules to shutdown).
1176  * Note that we can only get here if CkExit() was not called, since the
1177  * trace module will un-register itself from TraceArray if it did.
1179  */
1180 void TraceProjections::traceClose(void)
1182 #ifdef PROJ_ANALYSIS
1183   // CkPrintf("CkExit was not called on shutdown on [%d]\n", CkMyPe());
1185   if (_logPool == NULL) {
1186     return;
1187   }
1188   if(CkMyPe()==0){
1189     _logPool->writeSts(this);
1190     _logPool->writeRC();
1191   }
1192   CkpvAccess(_trace)->endComputation();
1193   delete _logPool;              // will write
1194   _logPool = NULL;
1195   // remove myself from traceArray so that no tracing will be called.
1196   CkpvAccess(_traces)->removeTrace(this);
1197 #else
1198   // we've already deleted the logpool, so multiple calls to traceClose
1199   // are tolerated.
1200   if (_logPool == NULL) {
1201     return;
1202   }
1203   if(CkMyPe()==0){
1204     _logPool->writeSts(this);
1205   }
1206   CkpvAccess(_trace)->endComputation();
1207   delete _logPool;              // will write
1208   _logPool = NULL;
1209   // remove myself from traceArray so that no tracing will be called.
1210   CkpvAccess(_traces)->removeTrace(this);
1212   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1213   bocProxy.ckLocalBranch()->print_warning();
1214 #endif
1218  *  **IMPT NOTES**:
1220  *  This is meant to be called internally by the tracing framework.
1222  */
1223 void TraceProjections::closeTrace() {
1224   //  CkPrintf("Close Trace called on [%d]\n", CkMyPe());
1225   if (CkMyPe() == 0 && _logPool!= NULL) {
1226     // CkPrintf("Pe 0 will now write sts and projrc files\n");
1227     _logPool->writeSts(this);
1228     _logPool->writeRC();
1229     // CkPrintf("Pe 0 has now written sts and projrc files\n");
1231     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1232     bocProxy.ckLocalBranch()->print_warning();
1233   }
1234   delete _logPool;       // will write logs to file
1238 #if CMK_SMP_TRACE_COMMTHREAD
1239 void TraceProjections::traceBeginOnCommThread()
1241   if (!computationStarted) return;
1242   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1245 void TraceProjections::traceEndOnCommThread()
1247   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CmiNumPes()+CmiMyNode());
1249 #endif
1251 void TraceProjections::traceBegin(void)
1253   if (!computationStarted) return;
1254   _logPool->add(BEGIN_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1257 void TraceProjections::traceEnd(void)
1259   _logPool->add(END_TRACE, 0, 0, TraceTimer(), curevent++, CkMyPe());
1262 void TraceProjections::userEvent(int e)
1264   if (!computationStarted) return;
1265   _logPool->add(USER_EVENT, e, 0, TraceTimer(),curevent++,CkMyPe());
1268 void TraceProjections::userBracketEvent(int e, double bt, double et, int nestedID=0)
1270   if (!computationStarted) return;
1271   // two events record Begin/End of event e.
1272   _logPool->addUserBracketEventNestedID(USER_EVENT_PAIR, TraceTimer(bt),
1273                                         e, curevent, nestedID);
1274   _logPool->addUserBracketEventNestedID(USER_EVENT_PAIR, TraceTimer(et),
1275                                         e, curevent++, nestedID);
1278 void TraceProjections::beginUserBracketEvent(int e, int nestedID=0)
1280   if (!computationStarted) return;
1281   _logPool->addUserBracketEventNestedID(BEGIN_USER_EVENT_PAIR, TraceTimer(),
1282                                         e, curevent++, nestedID);
1285 void TraceProjections::endUserBracketEvent(int e, int nestedID=0)
1287   if (!computationStarted) return;
1288   _logPool->addUserBracketEventNestedID(END_USER_EVENT_PAIR, TraceTimer(),
1289                                         e, curevent++, nestedID);
1292 void TraceProjections::userSuppliedData(int d)
1294   if (!computationStarted) return;
1295   _logPool->addUserSupplied(d);
1298 void TraceProjections::userSuppliedNote(char *note)
1300   if (!computationStarted) return;
1301   _logPool->addUserSuppliedNote(note);
1305 void TraceProjections::userSuppliedBracketedNote(char *note, int eventID, double bt, double et)
1307   if (!computationStarted) return;
1308   _logPool->addUserSuppliedBracketedNote(note,  eventID,  bt, et);
1311 void TraceProjections::memoryUsage(double m)
1313   if (!computationStarted) return;
1314   _logPool->addMemoryUsage(MEMORY_USAGE_CURRENT, TraceTimer(), m );
1315   
1317 //Updates User stat value. Makes appropriate Call to LogPool updateStat function
1318 void TraceProjections::updateStatPair(int e, double stat, double time)
1320   if (!computationStarted) return;
1321   _logPool->add(USER_STAT, e, 0, TraceTimer(), curevent, CkMyPe(), 0, 0, 0.0, time, 0, stat);
1324 //When user time is not given, -1 is stored instead.
1325 void TraceProjections::updateStat(int e, double stat)
1327   if (!computationStarted) return;
1328   _logPool->add(USER_STAT, e, 0, TraceTimer(), curevent, CkMyPe(), 0, 0, 0.0, -1, 0, stat);
1331 void TraceProjections::creation(envelope *e, int ep, int num)
1333 #if CMK_TRACE_ENABLED
1334   double curTime = TraceTimer();
1335   if (e == 0) {
1336     CtvAccess(curThreadEvent) = curevent;
1337     _logPool->add(CREATION, ForChareMsg, ep, curTime,
1338                   curevent++, CkMyPe(), 0, NULL, 0, 0.0);
1339   } else {
1340     int type=e->getMsgtype();
1341     e->setEvent(curevent);
1342     CpvAccess(curPeEvent) = curevent;
1343     if (num > 1) {
1344       _logPool->add(CREATION_BCAST, type, ep, curTime,
1345                     curevent++, CkMyPe(), e->getTotalsize(), 
1346                     NULL, 0, 0.0, num);
1347     } else {
1348       _logPool->add(CREATION, type, ep, curTime,
1349                     curevent++, CkMyPe(), e->getTotalsize(), 
1350                     NULL, 0, 0.0);
1351     }
1352   }
1353 #endif
1356 //This function is only called from a comm thread in SMP mode. 
1357 void TraceProjections::creation(char *msg)
1359 #if CMK_SMP_TRACE_COMMTHREAD
1360         // msg must be a charm message
1361     envelope *e = (envelope *)msg;
1362     int ep = e->getEpIdx();
1363     if(_entryTable[ep]->traceEnabled) {
1364         creation(e, ep, 1);
1365         e->setSrcPe(CkMyPe());              // pretend I am the sender
1366     }
1367 #endif
1370 void TraceProjections::traceCommSetMsgID(char *msg)
1372 #if CMK_SMP_TRACE_COMMTHREAD
1373     // msg must be a charm message
1374     envelope *e = (envelope *)msg;
1375     int ep = e->getEpIdx();
1376     if(_entryTable[ep]->traceEnabled) {
1377         e->setSrcPe(CkMyPe());              // pretend I am the sender
1378         e->setEvent(curevent);
1379     }
1380 #endif
1383 void TraceProjections::traceGetMsgID(char *msg, int *pe, int *event)
1385 #if CMK_TRACE_ENABLED
1386     // msg must be a charm message
1387     *pe = *event = -1;
1388     envelope *e = (envelope *)msg;
1389     int ep = e->getEpIdx();
1390     if(_entryTable[ep]->traceEnabled) {
1391         *pe = e->getSrcPe();
1392         *event = e->getEvent();
1393     }
1394 #endif
1397 void TraceProjections::traceSetMsgID(char *msg, int pe, int event)
1399 #if CMK_TRACE_ENABLED
1400        // msg must be a charm message
1401     envelope *e = (envelope *)msg;
1402     int ep = e->getEpIdx();
1403     if(ep<=0 || ep>=_entryTable.size()) return;
1404     if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return;
1405     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return;
1406     if(_entryTable[ep]->traceEnabled) {
1407         e->setSrcPe(pe);
1408         e->setEvent(event);
1409     }
1410 #endif
1413 /* **CW** Non-disruptive attempt to add destination PE knowledge to
1414    Communication Library-specific Multicasts via new event 
1415    CREATION_MULTICAST.
1418 void TraceProjections::creationMulticast(envelope *e, int ep, int num,
1419                                          int *pelist)
1421 #if CMK_TRACE_ENABLED
1422   double curTime = TraceTimer();
1423   if (e==0) {
1424     CtvAccess(curThreadEvent)=curevent;
1425     _logPool->addCreationMulticast(ForChareMsg, ep, curTime, curevent++,
1426                                    CkMyPe(), 0, 0, 0.0, num, pelist);
1427   } else {
1428     int type=e->getMsgtype();
1429     e->setEvent(curevent);
1430     _logPool->addCreationMulticast(type, ep, curTime, curevent++, CkMyPe(),
1431                                    e->getTotalsize(), 0, 0.0, num, pelist);
1432   }
1433 #endif
1436 void TraceProjections::creationDone(int num)
1438   // modified the creation done time of the last num log entries
1439   // FIXME: totally a hack
1440   double curTime = TraceTimer();
1441   int idx = _logPool->lastCreationEvent;
1442   while (idx >=0 && num >0 ) {
1443     LogEntry &log = _logPool->pool[idx];
1444     if ((log.type == CREATION) ||
1445         (log.type == CREATION_BCAST) ||
1446         (log.type == CREATION_MULTICAST)) {
1447       log.recvTime = curTime - log.time;
1448       num --;
1449     }
1450     idx--;
1451   }
1454 void TraceProjections::beginExecute(CmiObjId *tid)
1456 #if CMK_HAS_COUNTER_PAPI
1457   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1458     CmiAbort("PAPI failed to read at begin execute!\n");
1459   }
1460 #endif
1461   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1462   execEvent = CtvAccess(curThreadEvent);
1463   execEp = (-1);
1464   _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1465                 execEvent,CkMyPe(), 0, tid);
1466 #if CMK_HAS_COUNTER_PAPI
1467   _logPool->addPapi(CkpvAccess(papiValues));
1468 #endif
1469   inEntry = true;
1472 void TraceProjections::beginExecute(envelope *e, void *obj)
1474 #if CMK_TRACE_ENABLED
1475   if(e==0) {
1476 #if CMK_HAS_COUNTER_PAPI
1477     if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1478       CmiAbort("PAPI failed to read at begin execute!\n");
1479     }
1480 #endif
1481     if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1482     execEvent = CtvAccess(curThreadEvent);
1483     execEp = (-1);
1484     _logPool->add(BEGIN_PROCESSING,ForChareMsg,_threadEP, TraceTimer(),
1485                   execEvent,CkMyPe(), 0, NULL, 0.0, TraceCpuTimer());
1486 #if CMK_HAS_COUNTER_PAPI
1487     _logPool->addPapi(CkpvAccess(papiValues));
1488 #endif
1489     inEntry = true;
1490   } else {
1491     beginExecute(e->getEvent(),e->getMsgtype(),e->getEpIdx(),
1492                  e->getSrcPe(),e->getTotalsize());
1493   }
1494 #endif
1497 void TraceProjections::beginExecute(char *msg){
1498 #if CMK_SMP_TRACE_COMMTHREAD
1499         //This function is called from comm thread in SMP mode
1500     envelope *e = (envelope *)msg;
1501     int ep = e->getEpIdx();
1502     if(_entryTable[ep]->traceEnabled)
1503                 beginExecute(e);
1504 #endif
1507 void TraceProjections::beginExecute(int event, int msgType, int ep, int srcPe,
1508                                     int mlen, CmiObjId *idx, void *obj )
1510   if (traceNestedEvents) {
1511     if (!nestedEvents.empty()) {
1512       endExecuteLocal();
1513     }
1514     nestedEvents.emplace(event, msgType, ep, srcPe, mlen, idx);
1515   }
1516   beginExecuteLocal(event, msgType, ep, srcPe, mlen, idx);
1519 void TraceProjections::changeLastEntryTimestamp(double ts)
1521   _logPool->modLastEntryTimestamp(ts);
1524 void TraceProjections::beginExecuteLocal(int event, int msgType, int ep, int srcPe,
1525                                     int mlen, CmiObjId *idx)
1527 #if CMK_HAS_COUNTER_PAPI
1528   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1529     CmiAbort("PAPI failed to read at begin execute!\n");
1530   }
1531 #endif
1532   if (checknested && inEntry) CmiAbort("Nested Begin Execute!\n");
1533   execEvent=event;
1534   execEp=ep;
1535   execPe=srcPe;
1536   _logPool->add(BEGIN_PROCESSING,msgType,ep, TraceTimer(),event,
1537                 srcPe, mlen, idx, 0.0, TraceCpuTimer());
1538 #if CMK_HAS_COUNTER_PAPI
1539   _logPool->addPapi(CkpvAccess(papiValues));
1540 #endif
1541   inEntry = true;
1544 void TraceProjections::endExecute(void)
1546   if (traceNestedEvents && !nestedEvents.empty()) nestedEvents.pop();
1547   endExecuteLocal();
1548   if (traceNestedEvents) {
1549     if (!nestedEvents.empty()) {
1550       NestedEvent &ne = nestedEvents.top();
1551       beginExecuteLocal(ne.event, ne.msgType, ne.ep, ne.srcPe, ne.ml, ne.idx);
1552     }
1553   }
1556 void TraceProjections::endExecute(char *msg)
1558 #if CMK_SMP_TRACE_COMMTHREAD
1559         //This function is called from comm thread in SMP mode
1560     envelope *e = (envelope *)msg;
1561     int ep = e->getEpIdx();
1562     if(_entryTable[ep]->traceEnabled)
1563                 endExecute();
1564 #endif  
1567 void TraceProjections::endExecuteLocal(void)
1569 #if CMK_HAS_COUNTER_PAPI
1570   if (PAPI_read(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1571     CmiAbort("PAPI failed to read at end execute!\n");
1572   }
1573 #endif
1574   if (checknested && !inEntry) CmiAbort("Nested EndExecute!\n");
1575   double cputime = TraceCpuTimer();
1576   double now = TraceTimer();
1577   if(execEp == (-1)) {
1578     _logPool->add(END_PROCESSING, 0, _threadEP, now,
1579                   execEvent, CkMyPe(), 0, NULL, 0.0, cputime);
1580   } else {
1581     _logPool->add(END_PROCESSING, 0, execEp, now,
1582                   execEvent, execPe, 0, NULL, 0.0, cputime);
1583   }
1584 #if CMK_HAS_COUNTER_PAPI
1585   _logPool->addPapi(CkpvAccess(papiValues));
1586 #endif
1587   inEntry = false;
1590 void TraceProjections::messageRecv(char *env, int pe)
1592 #if 0
1593   envelope *e = (envelope *)env;
1594   int msgType = e->getMsgtype();
1595   int ep = e->getEpIdx();
1596 #if 0
1597   if (msgType==NewChareMsg || msgType==NewVChareMsg
1598           || msgType==ForChareMsg || msgType==ForVidMsg
1599           || msgType==BocInitMsg || msgType==NodeBocInitMsg
1600           || msgType==ForBocMsg || msgType==ForNodeBocMsg)
1601     ep = e->getEpIdx();
1602   else
1603     ep = _threadEP;
1604 #endif
1605   _logPool->add(MESSAGE_RECV, msgType, ep, TraceTimer(),
1606                 curevent++, e->getSrcPe(), e->getTotalsize());
1607 #endif
1610 void TraceProjections::beginIdle(double curWallTime)
1612     _logPool->add(BEGIN_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1615 void TraceProjections::endIdle(double curWallTime)
1617     _logPool->add(END_IDLE, 0, 0, TraceTimer(curWallTime), 0, CkMyPe());
1620 void TraceProjections::beginPack(void)
1622     _logPool->add(BEGIN_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1625 void TraceProjections::endPack(void)
1627     _logPool->add(END_PACK, 0, 0, TraceTimer(), 0, CkMyPe());
1630 void TraceProjections::beginUnpack(void)
1632     _logPool->add(BEGIN_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1635 void TraceProjections::endUnpack(void)
1637     _logPool->add(END_UNPACK, 0, 0, TraceTimer(), 0, CkMyPe());
1640 void TraceProjections::enqueue(envelope *) {}
1642 void TraceProjections::dequeue(envelope *) {}
1644 void TraceProjections::beginComputation(void)
1646   computationStarted = true;
1647   // Executes the callback function provided by the machine
1648   // layer. This is the proper method to register user events in a
1649   // machine layer because projections is a charm++ module.
1650   if (CkpvAccess(traceOnPe) != 0) {
1651     void (*ptr)() = registerMachineUserEvents();
1652     if (ptr != NULL) {
1653       ptr();
1654     }
1655   }
1656 //  CkpvAccess(traceInitTime) = TRACE_TIMER();
1657 //  CkpvAccess(traceInitCpuTime) = TRACE_CPUTIMER();
1658   _logPool->add(BEGIN_COMPUTATION, 0, 0, TraceTimer(), -1, -1);
1659 #if CMK_HAS_COUNTER_PAPI
1660   // we start the counters here
1661   if(CkpvAccess(papiStarted) == 0)
1662   {
1663       if (PAPI_start(CkpvAccess(papiEventSet)) != PAPI_OK) {
1664           CmiAbort("PAPI failed to start designated counters!\n");
1665       }
1666       CkpvAccess(papiStarted) = 1;
1667   }
1668 #endif
1671 void TraceProjections::endComputation(void)
1673 #if CMK_HAS_COUNTER_PAPI
1674   // we stop the counters here. A silent failure is alright since we
1675   // are already at the end of the program.
1676   if(CkpvAccess(papiStopped) == 0) {
1677       if (PAPI_stop(CkpvAccess(papiEventSet), CkpvAccess(papiValues)) != PAPI_OK) {
1678           CkPrintf("Warning: PAPI failed to stop correctly!\n");
1679       }
1680       CkpvAccess(papiStopped) = 1;
1681   }
1682   // NOTE: We should not do a complete close of PAPI until after the
1683   // sts writer is done.
1684 #endif
1685   endTime = TraceTimer();
1686   _logPool->add(END_COMPUTATION, 0, 0, endTime, -1, -1);
1687   /*
1688   CkPrintf("End Computation [%d] records time as %lf\n", CkMyPe(), 
1689            endTime*1e06);
1690   */
1693 int TraceProjections::idxRegistered(int idx)
1695     int idxVecLen = idxVec.size();
1696     for(int i=0; i<idxVecLen; i++)
1697     {
1698         if(idx == idxVec[i])
1699             return 1;
1700     }
1701     return 0;
1704 // specialized PUP:ers for handling trace projections logs
1705 void toProjectionsFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
1707   for (int i=0;i<n;i++) 
1708     switch(t) {
1709     case Tchar: CheckAndFPrintF(f,"%c",((char *)p)[i]); break;
1710     case Tuchar:
1711     case Tbyte: CheckAndFPrintF(f,"%d",((unsigned char *)p)[i]); break;
1712     case Tshort: CheckAndFPrintF(f," %d",((short *)p)[i]); break;
1713     case Tushort: CheckAndFPrintF(f," %u",((unsigned short *)p)[i]); break;
1714     case Tint: CheckAndFPrintF(f," %d",((int *)p)[i]); break;
1715     case Tuint: CheckAndFPrintF(f," %u",((unsigned int *)p)[i]); break;
1716     case Tlong: CheckAndFPrintF(f," %ld",((long *)p)[i]); break;
1717     case Tulong: CheckAndFPrintF(f," %lu",((unsigned long *)p)[i]); break;
1718     case Tfloat: CheckAndFPrintF(f," %.7g",((float *)p)[i]); break;
1719     case Tdouble: CheckAndFPrintF(f," %.15g",((double *)p)[i]); break;
1720 #ifdef CMK_PUP_LONG_LONG
1721     case Tlonglong: CheckAndFPrintF(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1722     case Tulonglong: CheckAndFPrintF(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1723 #endif
1724     default: CmiAbort("Unrecognized pup type code!");
1725     };
1728 void fromProjectionsFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
1730   for (int i=0;i<n;i++) 
1731     switch(t) {
1732     case Tchar: { 
1733       char c = fgetc(f);
1734       if (c==EOF)
1735         parseError("Could not match character");
1736       else
1737         ((char *)p)[i] = c;
1738       break;
1739     }
1740     case Tuchar:
1741     case Tbyte: ((unsigned char *)p)[i]=(unsigned char)readInt("%d"); break;
1742     case Tshort:((short *)p)[i]=(short)readInt(); break;
1743     case Tushort: ((unsigned short *)p)[i]=(unsigned short)readUint(); break;
1744     case Tint:  ((int *)p)[i]=readInt(); break;
1745     case Tuint: ((unsigned int *)p)[i]=readUint(); break;
1746     case Tlong: ((long *)p)[i]=readInt(); break;
1747     case Tulong:((unsigned long *)p)[i]=readUint(); break;
1748     case Tfloat: ((float *)p)[i]=(float)readDouble(); break;
1749     case Tdouble:((double *)p)[i]=readDouble(); break;
1750 #ifdef CMK_PUP_LONG_LONG
1751     case Tlonglong: ((CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1752     case Tulonglong: ((unsigned CMK_PUP_LONG_LONG *)p)[i]=readLongInt(); break;
1753 #endif
1754     default: CmiAbort("Unrecognized pup type code!");
1755     };
1758 #if CMK_USE_ZLIB
1759 void toProjectionsGZFile::bytes(void *p,size_t n,size_t itemSize,dataType t)
1761   for (int i=0;i<n;i++) 
1762     switch(t) {
1763     case Tchar: gzprintf(f,"%c",((char *)p)[i]); break;
1764     case Tuchar:
1765     case Tbyte: gzprintf(f,"%d",((unsigned char *)p)[i]); break;
1766     case Tshort: gzprintf(f," %d",((short *)p)[i]); break;
1767     case Tushort: gzprintf(f," %u",((unsigned short *)p)[i]); break;
1768     case Tint: gzprintf(f," %d",((int *)p)[i]); break;
1769     case Tuint: gzprintf(f," %u",((unsigned int *)p)[i]); break;
1770     case Tlong: gzprintf(f," %ld",((long *)p)[i]); break;
1771     case Tulong: gzprintf(f," %lu",((unsigned long *)p)[i]); break;
1772     case Tfloat: gzprintf(f," %.7g",((float *)p)[i]); break;
1773     case Tdouble: gzprintf(f," %.15g",((double *)p)[i]); break;
1774 #ifdef CMK_PUP_LONG_LONG
1775     case Tlonglong: gzprintf(f," %lld",((CMK_PUP_LONG_LONG *)p)[i]); break;
1776     case Tulonglong: gzprintf(f," %llu",((unsigned CMK_PUP_LONG_LONG *)p)[i]); break;
1777 #endif
1778     default: CmiAbort("Unrecognized pup type code!");
1779     };
1781 #endif
1783 void TraceProjections::endPhase() {
1784   double currentPhaseTime = TraceTimer();
1785   if (lastPhaseEvent != NULL) {
1786   } else {
1787     if (_logPool->pool != NULL) {
1788       // assumed to be BEGIN_COMPUTATION
1789     } else {
1790       CkPrintf("[%d] Warning: End Phase encountered in an empty log. Inserting BEGIN_COMPUTATION event\n", CkMyPe());
1791       _logPool->add(BEGIN_COMPUTATION, 0, 0, currentPhaseTime, -1, -1);
1792     }
1793   }
1795   /* Insert endPhase event here. */
1796   /* FIXME: Format should be TYPE, PHASE#, TimeStamp, [StartTime] */
1797   /*        We currently "borrow" from the standard add() method. */
1798   /*        It should really be its own add() method.             */
1799   /* NOTE: assignment to lastPhaseEvent is "pre-emptive".         */
1800   lastPhaseEvent = &(_logPool->pool[_logPool->numEntries]);
1801   _logPool->add(END_PHASE, 0, currentPhaseID, currentPhaseTime, -1, CkMyPe());
1802   currentPhaseID++;
1805 #ifdef PROJ_ANALYSIS
1806 // ***** FROM HERE, ALL BOC-BASED FUNCTIONALITY IS DEFINED *******
1809 // ***@@@@ REGISTRATION FUNCTIONS/METHODS @@@@***
1811 void registerOutlierReduction() {
1812   outlierReductionType =
1813     CkReduction::addReducer(outlierReduction, false, "outlierReduction");
1814   minMaxReductionType =
1815     CkReduction::addReducer(minMaxReduction, false, "minMaxReduction");
1819  * **IMPT NOTES**:
1821  * This is the C++ code that is registered to be activated at module
1822  * shutdown. This is called exactly once on processor 0. Module shutdown
1823  * is initiated as a result of a CkExit() call by the application code
1824  * 
1825  * The exit function must ultimately call CkContinueExit() again to
1826  * so that other module exit functions may proceed after this module is
1827  * done.
1829  */
1830 static void TraceProjectionsExitHandler()
1832 #if CMK_TRACE_ENABLED
1833 #if DEBUG_KMEANS
1834   CkPrintf("[%d] TraceProjectionsExitHandler called!\n", CkMyPe());
1835 #endif
1836   if (!traceProjectionsGID.isZero()) {
1837     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1838     bocProxy.traceProjectionsParallelShutdown(CkMyPe());
1839   } else {
1840     CkContinueExit();
1841   }
1842 #else
1843   CkContinueExit();
1844 #endif
1847 // This is called once on each processor but the idiom of use appears
1848 // to be to only have processor 0 register the function.
1850 // See initnode in trace-projections.ci
1851 void initTraceProjectionsBOC()
1853   // CkPrintf("[%d] Trace Projections initialization called!\n", CkMyPe());
1854 #ifdef __BIGSIM__
1855   if (BgNodeRank() == 0) {
1856 #else
1857     if (CkMyRank() == 0) {
1858 #endif
1859       registerExitFn(TraceProjectionsExitHandler);
1860     }
1861 #if 0
1862   } // this is so indentation does not get messed up
1863 #endif
1866 // mainchare for trace-projections BOC-operations. 
1867 // Instantiated at processor 0 and ONLY resides on processor 0 for the 
1868 // rest of its life.
1870 // Responsible for:
1871 //   1. Handling commandline arguments
1872 //   2. Creating any objects required for proper BOC operations.
1874 TraceProjectionsInit::TraceProjectionsInit(CkArgMsg *msg) {
1875   /** Options for Outlier Analysis */
1876   // defaults. Things will change with support for interactive analysis.
1877   bool findStartTime = (CmiTimerAbsolute()==1);
1878   traceProjectionsGID = CProxy_TraceProjectionsBOC::ckNew(findOutliers, findStartTime);
1879   if (findOutliers) {
1880     kMeansGID = CProxy_KMeansBOC::ckNew(outlierAutomatic,
1881                                         numKSeeds,
1882                                         peNumKeep,
1883                                         entryThreshold,
1884                                         outlierUsePhases);
1885   }
1887   delete msg;
1890 // Called on every processor.
1891 void TraceProjectionsBOC::traceProjectionsParallelShutdown(int pe) {
1892 #if DEBUG_KMEANS
1893   CmiPrintf("[%d] traceProjectionsParallelShutdown called from . \n", CkMyPe(), pe);
1894 #endif
1895   endPe = pe;                // the pe that starts CkExit()
1896   if (CkMyPe() == 0) {
1897     analysisStartTime = CmiWallTimer();
1898   }
1899   if (CkpvAccess(_trace)->_logPool != NULL ){
1900       CkpvAccess(_trace)->endComputation();
1901       // no more tracing for projections on this processor after this point. 
1902       // Note that clear must be called after remove, or bad things will happen.
1903       CkpvAccess(_traces)->removeTrace(CkpvAccess(_trace));
1904       CkpvAccess(_traces)->clearTrace();
1906       // From this point, we start multiple chains of reductions and broadcasts to
1907       // perform final online analysis activities.
1908       //
1909       // Start all parallel operations at once. 
1910       //   These MUST NOT modify base performance data in LogPool. If they must,
1911       //   then the parallel operations must be phased (and this code to be
1912       //   restructured as necessary)
1913       //
1914   }
1915   CProxy_KMeansBOC kMeansProxy(kMeansGID);
1916   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1917   if (findOutliers) {
1918     parModulesRemaining++;
1919     kMeansProxy[CkMyPe()].startKMeansAnalysis();
1920   }
1921   parModulesRemaining++;
1922   if (findStartTime) 
1923   bocProxy[CkMyPe()].startTimeAnalysis();
1924   else
1925   bocProxy[CkMyPe()].startEndTimeAnalysis();
1928 // handle flush log warnings
1929 void TraceProjectionsBOC::flush_warning(int pe) 
1931     CmiAssert(CkMyPe() == 0);
1932     std::set<int>::iterator it;
1933     it = list.find(pe);
1934     if (it == list.end())    list.insert(pe);
1935     flush_count++;
1938 void TraceProjectionsBOC::print_warning() 
1940     CmiAssert(CkMyPe() == 0);
1941     if (flush_count == 0) return;
1942     std::set<int>::iterator it;
1943     CkPrintf("*************************************************************\n");
1944     CkPrintf("Warning: Projections log flushed to disk %d times on %d cores:", flush_count, list.size());
1945     for (it=list.begin(); it!=list.end(); it++)
1946       CkPrintf(" %d", *it);
1947     CkPrintf(".\n");
1948     CkPrintf("Warning: The performance data is likely invalid, unless the flushes have been explicitly synchronized by your program.\n");
1949     CkPrintf("Warning: This may be fixed by specifying a larger +logsize (current value %d).\n", CkpvAccess(CtrLogBufSize));
1950     CkPrintf("*************************************************************\n");
1953 // Called on each processor
1954 void KMeansBOC::startKMeansAnalysis() {
1955   // Initialize all necessary structures
1956   LogPool *pool = CkpvAccess(_trace)->_logPool;
1958  if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::startKMeansAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
1960   CkCallback cb(CkReductionTarget(KMeansBOC, flushCheck), 0, thisProxy);
1961   contribute(sizeof(bool), &(pool->hasFlushed), CkReduction::logical_or_bool, cb);
1964 // Called on processor 0
1965 void KMeansBOC::flushCheck(bool someFlush) {
1967   // if(CkMyPe()==0) CkPrintf("[%d] KMeansBOC::flushCheck time=\t%g\n", CkMyPe(), CkWallTimer() );
1968   
1969   if (!someFlush) {
1970     // Data intact proceed with KMeans analysis
1971     CProxy_KMeansBOC kMeansProxy(kMeansGID);
1972     kMeansProxy.flushCheckDone();
1973   } else {
1974     // Some processor had flushed it data at some point, abandon KMeans
1975     CkPrintf("Warning: Some processor has flushed its data. No KMeans will be conducted\n");
1976     // terminate KMeans
1977     CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
1978     bocProxy[0].kMeansDoneFlushed();
1979   }
1982 // Called on each processor
1983 void KMeansBOC::flushCheckDone() {
1984   // **FIXME** - more flexible metric collection scheme may be necessary
1985   //   in the future for production use.
1986   LogPool *pool = CkpvAccess(_trace)->_logPool;
1988   // if(CkMyPe()==0)     CkPrintf("[%d] KMeansBOC::flushCheckDone time=\t%g\n", CkMyPe(), CkWallTimer() );
1990   numEntryMethods = _entryTable.size();
1991   numMetrics = numEntryMethods + 2; // EPtime + idle and overhead
1993   // maintained across phases
1994   markedBegin = false;
1995   markedIdle = false;
1996   beginBlockTime = 0.0;
1997   beginIdleBlockTime = 0.0;
1998   lastBeginEPIdx = -1; // none
2000   lastPhaseIdx = 0;
2001   currentExecTimes = NULL;
2002   currentPhase = 0;
2003   selected = false;
2005   pool->initializePhases();
2007   // incoming K Seeds and the per-phase filter
2008   incKSeeds = new double[numK*numMetrics];
2009   keepMetric = new bool[numMetrics];
2011   //  Something wrong when call thisProxy[CkMyPe()].getNextPhaseMetrics() !??!
2012   //  CProxy_KMeansBOC kMeansProxy(kMeansGID);
2013   //  kMeansProxy[CkMyPe()].getNextPhaseMetrics();
2014   thisProxy[CkMyPe()].getNextPhaseMetrics();
2017 // Called on each processor.
2018 void KMeansBOC::getNextPhaseMetrics() {
2019   // Assumes the presence of the complete logs on this processor.
2020   // Assumes first event is always BEGIN_COMPUTATION
2021   // Assumes each processor sees the same number of phases.
2022   //
2023   // In this code, we collect performance data for this processor.
2024   // All times are in seconds.
2026   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::getNextPhaseMetrics time=\t%g\n", CkMyPe(), CkWallTimer() );  
2028   if (usePhases) {
2029     DEBUGF("[%d] Using Phases\n", CkMyPe());
2030   } else {
2031     DEBUGF("[%d] NOT using Phases\n", CkMyPe());
2032   }
2033   
2034   if (currentExecTimes != NULL) {
2035     delete [] currentExecTimes;
2036   }
2037   currentExecTimes = new double[numMetrics];
2038   for (int i=0; i<numMetrics; i++) {
2039     currentExecTimes[i] = 0.0;
2040   }
2042   int numEventMethods = _entryTable.size();
2043   LogPool *pool = CkpvAccess(_trace)->_logPool;
2044   
2045   CkAssert(pool->numEntries > lastPhaseIdx);
2046   double totalPhaseTime = 0.0;
2047   double totalActiveTime = 0.0; // entry method + idle
2049   for (int i=lastPhaseIdx; i<pool->numEntries; i++) {
2050     if (pool->pool[i].type == BEGIN_PROCESSING) {
2051       // check pairing
2052       if (!markedBegin) {
2053         markedBegin = true;
2054       }
2055       beginBlockTime = pool->pool[i].time;
2056       lastBeginEPIdx = pool->pool[i].eIdx;
2057     } else if (pool->pool[i].type == END_PROCESSING) {
2058       // check pairing
2059       // if End without a begin, just ignore
2060       //   this event. If a phase-boundary is crossed, the Begin
2061       //   event would be maintained in beginBlockTime, so it is 
2062       //   not a problem.
2063       if (markedBegin) {
2064         markedBegin = false;
2065         if (pool->pool[i].event < 0)
2066         {
2067           // ignore dummy events. **FIXME** as they have no eIdx?
2068           continue;
2069         }
2070         currentExecTimes[pool->pool[i].eIdx] += 
2071           pool->pool[i].time - beginBlockTime;
2072         totalActiveTime += pool->pool[i].time - beginBlockTime;
2073         lastBeginEPIdx = -1;
2074       }
2075     } else if (pool->pool[i].type == BEGIN_IDLE) {
2076       // check pairing
2077       if (!markedIdle) {
2078         markedIdle = true;
2079       }
2080       beginIdleBlockTime = pool->pool[i].time;
2081     } else if (pool->pool[i].type == END_IDLE) {
2082       // check pairing
2083       if (markedIdle) {
2084         markedIdle = false;
2085         currentExecTimes[numEventMethods] += 
2086           pool->pool[i].time - beginIdleBlockTime;
2087         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2088       }
2089     } else if (pool->pool[i].type == END_PHASE) {
2090       // ignored when not using phases
2091       if (usePhases) {
2092         // when we've not visited this node before
2093         if (i != lastPhaseIdx) { 
2094           totalPhaseTime = 
2095             pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2096           // it is important that proper accounting of time take place here.
2097           // Note that END_PHASE events inevitably occur in the context of
2098           //   some entry method by the way the tracing API is designed.
2099           if (markedBegin) {
2100             CkAssert(lastBeginEPIdx >= 0);
2101             currentExecTimes[lastBeginEPIdx] += 
2102               pool->pool[i].time - beginBlockTime;
2103             totalActiveTime += pool->pool[i].time - beginBlockTime;
2104             // this is so the remainder contributes to the next phase
2105             beginBlockTime = pool->pool[i].time;
2106           }
2107           // The following is unlikely, but stranger things have happened.
2108           if (markedIdle) {
2109             currentExecTimes[numEventMethods] +=
2110               pool->pool[i].time - beginIdleBlockTime;
2111             totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2112             // this is so the remainder contributes to the next phase
2113             beginIdleBlockTime = pool->pool[i].time;
2114           }
2115           if (totalActiveTime <= totalPhaseTime) {
2116             currentExecTimes[numEventMethods+1] = 
2117               totalPhaseTime - totalActiveTime;
2118           } else {
2119             currentExecTimes[numEventMethods+1] = 0.0;
2120             CkPrintf("[%d] Warning: Overhead found to be negative for Phase %d!\n",
2121                      CkMyPe(), currentPhase);
2122           }
2123           collectKMeansData();
2124           // end the loop (and method) and defer the work till the next call
2125           lastPhaseIdx = i;
2126           break; 
2127         }
2128       }
2129     } else if (pool->pool[i].type == END_COMPUTATION) {
2130       if (markedBegin) {
2131         CkAssert(lastBeginEPIdx >= 0);
2132         currentExecTimes[lastBeginEPIdx] += 
2133           pool->pool[i].time - beginBlockTime;
2134         totalActiveTime += pool->pool[i].time - beginBlockTime;
2135       }
2136       if (markedIdle) {
2137         currentExecTimes[numEventMethods] +=
2138           pool->pool[i].time - beginIdleBlockTime;
2139         totalActiveTime += pool->pool[i].time - beginIdleBlockTime;
2140       }
2141       totalPhaseTime = 
2142         pool->pool[i].time - pool->pool[lastPhaseIdx].time;
2143       if (totalActiveTime <= totalPhaseTime) {
2144         currentExecTimes[numEventMethods+1] = totalPhaseTime - totalActiveTime;
2145       } else {
2146         currentExecTimes[numEventMethods+1] = 0.0;
2147         CkPrintf("[%d] Warning: Overhead found to be negative!\n",
2148                  CkMyPe());
2149       }
2150       collectKMeansData();
2151     }
2152   }
2156  *  Through a reduction, collectKMeansData aggregates each processors' data
2157  *  in order for global properties to be determined:
2158  *  
2159  *  1. min & max to determine normalization factors.
2160  *  2. sum to determine global EP averages for possible metric reduction
2161  *       through thresholding.
2162  *  3. sum of squares to compute stddev which may be useful in the future.
2164  *  collectKMeansData will also keep the processor's data for the current
2165  *    phase so that it may be normalized and worked on subsequently.
2167  **/
2168 void KMeansBOC::collectKMeansData() {
2169   int minOffset = numMetrics;
2170   int maxOffset = 2*numMetrics;
2171   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2173   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectKMeansData time=\tg\n", CkMyPe(), CkWallTimer() );
2175   double *reductionMsg = new double[numMetrics*4];
2177   for (int i=0; i<numMetrics; i++) {
2178     reductionMsg[i] = currentExecTimes[i];
2179     // duplicate the event times for max and min sections of the reduction
2180     reductionMsg[minOffset + i] = currentExecTimes[i];
2181     reductionMsg[maxOffset + i] = currentExecTimes[i];
2182     // compute squares
2183     reductionMsg[sosOffset + i] = currentExecTimes[i]*currentExecTimes[i];
2184   }
2186   CkCallback cb(CkReductionTarget(KMeansBOC, globalMetricRefinement),
2187                 0, thisProxy);
2188   contribute((numMetrics*4)*sizeof(double), reductionMsg, 
2189              outlierReductionType, cb);  
2190   delete [] reductionMsg;
2193 // The purpose is mainly to initialize the k seeds and generate
2194 //   normalization parameters for each of the metrics. The k seeds
2195 //   and normalization parameters are broadcast to all processors.
2197 // Called on processor 0
2198 void KMeansBOC::globalMetricRefinement(CkReductionMsg *msg) {
2199   CkAssert(CkMyPe() == 0);
2200   
2201   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::globalMetricRefinement time=\t%g\n", CkMyPe(), CkWallTimer() );
2203   int sumOffset = 0;
2204   int minOffset = numMetrics;
2205   int maxOffset = 2*numMetrics;
2206   int sosOffset = 3*numMetrics; // sos = Sum Of Squares
2208   // calculate statistics & boundaries for the k seeds for clustering
2209   KMeansStatsMessage *outmsg = 
2210     new (numMetrics, numK*numMetrics, numMetrics*4) KMeansStatsMessage;
2211   outmsg->numMetrics = numMetrics;
2212   outmsg->numKPos = numK*numMetrics;
2213   outmsg->numStats = numMetrics*4;
2215   // Sum | Min | Max | Sum of Squares
2216   double *totalExecTimes = (double*)msg->getData();
2217   double totalTime = 0.0;
2219   for (int i=0; i<numMetrics; i++) {
2220     DEBUGN("%lf\n", totalExecTimes[i]);
2221     totalTime += totalExecTimes[i];
2223     // calculate event mean over all processors
2224     outmsg->stats[sumOffset + i] = totalExecTimes[sumOffset + i]/CkNumPes();
2226     // get the ranges and offsets of each metric. With this, we get
2227     //   normalization factors that can be sent back to each processor to
2228     //   be used as necessary. We reuse max for range. Min remains the offset.
2229     outmsg->stats[minOffset + i] = totalExecTimes[minOffset + i];
2230     outmsg->stats[maxOffset + i] = totalExecTimes[maxOffset + i] -
2231       totalExecTimes[minOffset + i];
2232     
2233     // calculate stddev (using biased variance)
2234     outmsg->stats[sosOffset + i] = 
2235       sqrt((totalExecTimes[sosOffset + i] - 
2236             2*(outmsg->stats[i])*totalExecTimes[i] +
2237             (outmsg->stats[i])*(outmsg->stats[i])*CkNumPes())/
2238            CkNumPes());
2239   }
2241   for (int i=0; i<numMetrics; i++) {
2242     // 1) if the proportion of the max value of the entry method relative to
2243     //   the average time taken over all entry methods across all processors
2244     //   is greater than the stipulated percentage threshold ...; AND
2245     // 2) if the range of values are non-zero.
2246     //
2247     // The current assumption is totalTime > 0 (what program has zero total
2248     //   time from all work?)
2249     keepMetric[i] = ((totalExecTimes[maxOffset + i]/(totalTime/CkNumPes()) >=
2250                      entryThreshold) &&
2251       (totalExecTimes[maxOffset + i] > totalExecTimes[minOffset + i]));
2252     if (keepMetric[i]) {
2253       DEBUGF("[%d] Keep EP %d | Max = %lf | Avg Tot = %lf\n", CkMyPe(), i,
2254              totalExecTimes[maxOffset + i], totalTime/CkNumPes());
2255     } else {
2256       DEBUGN("[%d] DO NOT Keep EP %d\n", CkMyPe(), i);
2257     }
2258     outmsg->filter[i] = keepMetric[i];
2259   }
2261   delete msg;
2263   // initialize k seeds for this phase
2264   kSeeds = new double[numK*numMetrics];
2266   numKReported = 0;
2267   kNumMembers = new int[numK];
2269   // Randomly select k processors' metric vectors for the k seeds
2270   //  srand((unsigned)(CmiWallTimer()*1.0e06));
2271   srand(11337); // for debugging purposes
2272   for (int k=0; k<numK; k++) {
2273     DEBUGF("Seed %d | ", k);
2274     for (int m=0; m<numMetrics; m++) {
2275       double factor = totalExecTimes[maxOffset + m] - 
2276         totalExecTimes[minOffset + m];
2277       // "uniform" distribution, scaled according to the normalization
2278       //   factors
2279       //      kSeeds[numMetrics*k + m] = ((1.0*(k+1))/numK)*factor;
2280       // Random distribution.
2281       kSeeds[numMetrics*k + m] =
2282         ((rand()*1.0)/RAND_MAX)*factor;
2283       if (keepMetric[m]) {
2284         DEBUGF("[%d|%lf] ", m, kSeeds[numMetrics*k + m]);
2285       }
2286       outmsg->kSeedsPos[numMetrics*k + m] = kSeeds[numMetrics*k + m];
2287     }
2288     DEBUGF("\n");
2289     kNumMembers[k] = 0;
2290   }
2292   // broadcast statistical values to all processors for cluster discovery
2293   thisProxy.findInitialClusters(outmsg);
2298 // Called on each processor.
2299 void KMeansBOC::findInitialClusters(KMeansStatsMessage *msg) {
2301  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findInitialClusters time=\t%g\n", CkMyPe(), CkWallTimer() );
2303   phaseIter = 0;
2305   // Get info from stats message
2306   CkAssert(numMetrics == msg->numMetrics);
2307   for (int i=0; i<numMetrics; i++) {
2308     keepMetric[i] = msg->filter[i];
2309   }
2311   // Normalize data on local processor.
2312   // **CWL** See my thesis for detailed discussion of normalization of
2313   //    performance data.
2314   // **NOTE** This might change if we want to send data based on the filter
2315   //   instead of all the data.
2316   CkAssert(numMetrics*4 == msg->numStats);
2317   for (int i=0; i<numMetrics; i++) {
2318     currentExecTimes[i] -= msg->stats[numMetrics + i];  // take offset
2319     // **CWL** We do not normalize the range. Entry methods that exhibit
2320     //   large absolute timing variations should be allowed to contribute
2321     //   more to the Euclidean distance measure!
2322     // currentExecTimes[i] /= msg->stats[2*numMetrics + i];
2323   }
2325   // **NOTE** This might change if we want to send data based on the filter
2326   //   instead of all the data.
2327   CkAssert(numK*numMetrics == msg->numKPos);
2328   for (int i=0; i<msg->numKPos; i++) {
2329     incKSeeds[i] = msg->kSeedsPos[i];
2330   }
2332   // Decide which KSeed this processor belongs to.
2333   minDistance = calculateDistance(0);
2334   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2335            currentPhase, phaseIter, minDistance);
2336   minK = 0;
2337   for (int i=1; i<numK; i++) {
2338     double distance = calculateDistance(i);
2339     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2340              currentPhase, phaseIter, i, distance);
2341     if (distance < minDistance) {
2342       minDistance = distance;
2343       minK = i;
2344     }
2345   }
2347   // Set up a reduction with the modification vector to the root (0).
2348   //
2349   // The modification vector sends a negative value for each metric
2350   //   for the K this processor no longer belongs to and a positive
2351   //   value to the K the processor now belongs. In addition, a -1.0
2352   //   is sent to the K it is leaving and a +1.0 to the K it is 
2353   //   joining.
2354   //
2355   // The processor must still contribute a "zero returns" even if
2356   //   nothing changes. This will be the basis for determine
2357   //   convergence at the root.
2358   //
2359   // The addtional +1 is meant for the count-change that must be
2360   //   maintained for the special cases at the root when some K
2361   //   may be deprived of all processor points or go from 0 to a
2362   //   positive number of processors (see later comments).
2363   double *modVector = new double[numK*(numMetrics+1)];
2364   for (int i=0; i<numK; i++) {
2365     for (int j=0; j<numMetrics+1; j++) {
2366       modVector[i*(numMetrics+1) + j] = 0.0;
2367     }
2368   }
2369   for (int i=0; i<numMetrics; i++) {
2370     // for this initialization, only positive values need be sent.
2371     modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2372   }
2373   modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2375   CkCallback cb(CkReductionTarget(KMeansBOC, updateKSeeds),
2376                0, thisProxy);
2377   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2378              CkReduction::sum_double, cb);  
2379   delete [] modVector;
2382 double KMeansBOC::calculateDistance(int k) {
2383   double ret = 0.0;
2384   for (int i=0; i<numMetrics; i++) {
2385     if (keepMetric[i]) {
2386       DEBUGN("[%d] Phase %d Iter %d Metric %d Exec %lf Seed %lf \n", 
2387              CkMyPe(), currentPhase, phaseIter, i,
2388                currentExecTimes[i], incKSeeds[k*numMetrics + i]);
2389       ret += pow(currentExecTimes[i] - incKSeeds[k*numMetrics + i], 2.0);
2390     }
2391   }
2392   return sqrt(ret);
2395 void KMeansBOC::updateKSeeds(double *modVector, int n) {
2396   CkAssert(CkMyPe() == 0);
2398   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateKSeeds time=\t%g\n", CkMyPe(), CkWallTimer() );
2400   // sanity check
2401   CkAssert(numK*(numMetrics+1) == n);
2403   // A quick convergence test.
2404   bool hasChanges = false;
2405   for (int i=0; i<numK; i++) {
2406     hasChanges = hasChanges || 
2407       (modVector[i*(numMetrics+1) + numMetrics] != 0.0);
2408   }
2409   if (!hasChanges) {
2410     findRepresentatives();
2411   } else {
2412     int overallChange = 0;
2413     for (int i=0; i<numK; i++) {
2414       int change = (int)modVector[i*(numMetrics+1) + numMetrics];
2415       if (change != 0) {
2416         overallChange += change;
2417         // modify the k seeds based on the modification vectors coming in
2418         //
2419         // If a seed initially has no members, its contents do not matter and
2420         //   is simply set to the average of the incoming vector.
2421         // If the change causes a seed to lose all its members, do nothing.
2422         //   Its last-known location is kept to allow it to re-capture
2423         //   membership at the next iteration rather than apply the last
2424         //   changes (which snaps the point unnaturally to 0,0).
2425         // Otherwise, apply the appropriate vector changes.
2426         CkAssert((kNumMembers[i] + change >= 0) &&
2427                  (kNumMembers[i] + change <= CkNumPes()));
2428         if (kNumMembers[i] == 0) {
2429           CkAssert(change > 0);
2430           for (int j=0; j<numMetrics; j++) {
2431             kSeeds[i*numMetrics + j] = modVector[i*(numMetrics+1) + j]/change;
2432           }
2433         } else if (kNumMembers[i] + change == 0) {
2434           // do nothing.
2435         } else {
2436           for (int j=0; j<numMetrics; j++) {
2437             kSeeds[i*numMetrics + j] *= kNumMembers[i];
2438             kSeeds[i*numMetrics + j] += modVector[i*(numMetrics+1) + j];
2439             kSeeds[i*numMetrics + j] /= kNumMembers[i] + change;
2440           }
2441         }
2442         kNumMembers[i] += change;
2443       }
2444       DEBUGN("[%d] Phase %d Iter %d K = %d Membership Count = %d\n",
2445              CkMyPe(), currentPhase, phaseIter, i, kNumMembers[i]);
2446     }
2448     // broadcast the new seed locations.
2449     KSeedsMessage *outmsg = new (numK*numMetrics) KSeedsMessage;
2450     outmsg->numKPos = numK*numMetrics;
2451     for (int i=0; i<numK*numMetrics; i++) {
2452       outmsg->kSeedsPos[i] = kSeeds[i];
2453     }
2455     thisProxy.updateSeedMembership(outmsg);
2456   }
2459 // Called on all processors
2460 void KMeansBOC::updateSeedMembership(KSeedsMessage *msg) {
2462   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::updateSeedMembership time=\t%g\n", CkMyPe(), CkWallTimer() );
2464   phaseIter++;
2466   // **NOTE** This might change if we want to send data based on the filter
2467   //   instead of all the data.
2468   CkAssert(numK*numMetrics == msg->numKPos);
2469   for (int i=0; i<msg->numKPos; i++) {
2470     incKSeeds[i] = msg->kSeedsPos[i];
2471   }
2473   // Decide which KSeed this processor belongs to.
2474   lastMinK = minK;
2475   minDistance = calculateDistance(0);
2476   DEBUGN("[%d] Phase %d Iter %d | Distance from 0 = %lf \n", CkMyPe(), 
2477          currentPhase, phaseIter, minDistance);
2479   minK = 0;
2480   for (int i=1; i<numK; i++) {
2481     double distance = calculateDistance(i);
2482     DEBUGN("[%d] Phase %d Iter %d | Distance from %d = %lf \n", CkMyPe(), 
2483            currentPhase, phaseIter, i, distance);
2484     if (distance < minDistance) {
2485       minDistance = distance;
2486       minK = i;
2487     }
2488   }
2490   double *modVector = new double[numK*(numMetrics+1)];
2491   for (int i=0; i<numK; i++) {
2492     for (int j=0; j<numMetrics+1; j++) {
2493       modVector[i*(numMetrics+1) + j] = 0.0;
2494     }
2495   }
2497   if (minK != lastMinK) {
2498     for (int i=0; i<numMetrics; i++) {
2499       modVector[minK*(numMetrics+1) + i] = currentExecTimes[i];
2500       modVector[lastMinK*(numMetrics+1) + i] = -currentExecTimes[i];
2501     }
2502     modVector[minK*(numMetrics+1)+numMetrics] = 1.0;
2503     modVector[lastMinK*(numMetrics+1)+numMetrics] = -1.0;
2504   }
2506   CkCallback cb(CkReductionTarget(KMeansBOC, updateKSeeds),
2507                0, thisProxy);
2508   contribute(numK*(numMetrics+1)*sizeof(double), modVector, 
2509              CkReduction::sum_double, cb);  
2510   delete [] modVector;
2513 void KMeansBOC::findRepresentatives() {
2515   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findRepresentatives time=\t%g\n", CkMyPe(), CkWallTimer() );
2517   int numNonEmptyClusters = 0;
2518   for (int i=0; i<numK; i++) {
2519     if (kNumMembers[i] > 0) {
2520       numNonEmptyClusters++;
2521     }
2522   }
2524   int numRepresentatives = peNumKeep;
2525   // **FIXME**
2526   // This is fairly arbitrary. Next time, choose the centers of the top
2527   //   largest clusters.
2528   if (numRepresentatives < numNonEmptyClusters) {
2529     numRepresentatives = numNonEmptyClusters;
2530   }
2532   int slotsRemaining = numRepresentatives;
2534   DEBUGF("Slots = %d | Non-empty = %d \n", slotsRemaining, 
2535          numNonEmptyClusters);
2537   // determine how many exemplars to select per cluster. Currently
2538   //   hardcoded to 1. Future challenge is to decide on other numbers
2539   //   or proportionality.
2540   //
2541   int exemplarsPerCluster = 1;
2542   slotsRemaining -= exemplarsPerCluster*numNonEmptyClusters;
2544   int numCandidateOutliers = CkNumPes() - 
2545     exemplarsPerCluster*numNonEmptyClusters;
2547   double *remainders = new double[numK];
2548   int *assigned = new int[numK];
2549   exemplarChoicesLeft = new int[numK];
2550   outlierChoicesLeft = new int[numK];
2552   for (int i=0; i<numK; i++) {
2553     assigned[i] = 0;
2554     remainders[i] = 
2555       (kNumMembers[i] - exemplarsPerCluster*numNonEmptyClusters) *
2556       slotsRemaining / numCandidateOutliers;
2557     if (remainders[i] >= 0.0) {
2558       assigned[i] = (int)floor(remainders[i]);
2559       remainders[i] -= assigned[i];
2560     } else {
2561       remainders[i] = 0.0;
2562     }
2563   }
2565   for (int i=0; i<numK; i++) {
2566     slotsRemaining -= assigned[i];
2567   }
2568   CkAssert(slotsRemaining >= 0);
2570   // find clusters to assign the loose slots to, in order of
2571   // remainder proportion
2572   while (slotsRemaining > 0) {
2573     double max = 0.0;
2574     int winner = 0;
2575     for (int i=0; i<numK; i++) {
2576       if (remainders[i] > max) {
2577         max = remainders[i];
2578         winner = i;
2579       }
2580     }
2581     assigned[winner]++;
2582     remainders[winner] = 0.0;
2583     slotsRemaining--;
2584   }
2586   // set up how many reduction cycles of min/max we need to conduct to
2587   // select the representatives.
2588   numSelectionIter = exemplarsPerCluster;
2589   for (int i=0; i<numK; i++) {
2590     if (assigned[i] > numSelectionIter) {
2591       numSelectionIter = assigned[i];
2592     }
2593   }
2594   DEBUGF("Selection Iterations = %d\n", numSelectionIter);
2596   for (int i=0; i<numK; i++) {
2597     if (kNumMembers[i] > 0) {
2598       exemplarChoicesLeft[i] = exemplarsPerCluster;
2599       outlierChoicesLeft[i] = assigned[i];
2600     } else {
2601       exemplarChoicesLeft[i] = 0;
2602       outlierChoicesLeft[i] = 0;
2603     }
2604     DEBUGF("%d | Exemplar = %d | Outlier = %d\n", i, exemplarChoicesLeft[i],
2605            outlierChoicesLeft[i]);
2606   }
2608   delete [] assigned;
2609   delete [] remainders;
2611   // send out first broadcast
2612   KSelectionMessage *outmsg = NULL;
2613   if (numSelectionIter > 0) {
2614     outmsg = new (numK, numK, numK) KSelectionMessage;
2615     outmsg->numKMinIDs = numK;
2616     outmsg->numKMaxIDs = numK;
2617     for (int i=0; i<numK; i++) {
2618       outmsg->minIDs[i] = -1;
2619       outmsg->maxIDs[i] = -1;
2620     }
2621     thisProxy.collectDistances(outmsg);
2622   } else {
2623     CkPrintf("Warning: No selection iteration from the start!\n");
2624     // invoke phase completion on all processors
2625     thisProxy.phaseDone();
2626   }
2630  *  lastMin = array of minimum champions of the last tournament
2631  *  lastMax = array of maximum champions of the last tournament
2632  *  lastMaxVal = array of last encountered maximum values, allows previous
2633  *                 minimum winners to eliminate themselves from the next
2634  *                 minimum race.
2636  *  Called on all processors.
2637  */
2638 void KMeansBOC::collectDistances(KSelectionMessage *msg) {
2640   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::collectDistances time=\t%g\n", CkMyPe(), CkWallTimer() );
2642   DEBUGF("[%d] %d | min = %d max = %d\n", CkMyPe(),
2643          lastMinK, msg->minIDs[lastMinK], msg->maxIDs[lastMinK]);
2644   if ((CkMyPe() == msg->minIDs[lastMinK]) || 
2645       (CkMyPe() == msg->maxIDs[lastMinK])) {
2646     CkAssert(!selected);
2647     selected = true;
2648   }
2650   // build outgoing reduction structure
2651   //   format = minVal | ID | maxVal | ID
2652   double *minMaxAndIDs = NULL;
2654   minMaxAndIDs = new double[numK*4];
2655   // initialize to the appropriate out-of-band values (for error checks)
2656   for (int i=0; i<numK; i++) {
2657     minMaxAndIDs[i*4] = -1.0; // out-of-band min value
2658     minMaxAndIDs[i*4+1] = -1.0; // out of band ID
2659     minMaxAndIDs[i*4+2] = -1.0; // out-of-band max value
2660     minMaxAndIDs[i*4+3] = -1.0; // out of band ID
2661   }
2662   // If I have not won before, I put myself back into the competition
2663   if (!selected) {
2664     DEBUGF("[%d] My Contribution = %lf\n", CkMyPe(), minDistance);
2665     minMaxAndIDs[lastMinK*4] = minDistance;
2666     minMaxAndIDs[lastMinK*4+1] = CkMyPe();
2667     minMaxAndIDs[lastMinK*4+2] = minDistance;
2668     minMaxAndIDs[lastMinK*4+3] = CkMyPe();
2669   }
2670   delete msg;
2672   CkCallback cb(CkReductionTarget(KMeansBOC, findNextMinMax),
2673                0, thisProxy);
2674   contribute(numK*4*sizeof(double), minMaxAndIDs, 
2675              minMaxReductionType, cb);  
2678 void KMeansBOC::findNextMinMax(CkReductionMsg *msg) {
2679   // incoming format:
2680   //   minVal | minID | maxVal | maxID
2682   // if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::findNextMinMax time=\t%g\n", CkMyPe(), CkWallTimer() );
2684   if (numSelectionIter > 0) {
2685     double *incInfo = (double*)msg->getData();
2686     
2687     KSelectionMessage *outmsg = new (numK, numK) KSelectionMessage;
2688     outmsg->numKMinIDs = numK;
2689     outmsg->numKMaxIDs = numK;
2690     
2691     for (int i=0; i<numK; i++) {
2692       DEBUGF("%d | %lf %d %lf %d \n", i, 
2693              incInfo[i*4], (int)incInfo[i*4+1], 
2694              incInfo[i*4+2], (int)incInfo[i*4+3]);
2695     }
2697     for (int i=0; i<numK; i++) {
2698       if (exemplarChoicesLeft[i] > 0) {
2699         outmsg->minIDs[i] = (int)incInfo[i*4+1];
2700         exemplarChoicesLeft[i]--;
2701       } else {
2702         outmsg->minIDs[i] = -1;
2703       }
2704       if (outlierChoicesLeft[i] > 0) {
2705         outmsg->maxIDs[i] = (int)incInfo[i*4+3];
2706         outlierChoicesLeft[i]--;
2707       } else {
2708         outmsg->maxIDs[i] = -1;
2709       }
2710     }
2711     thisProxy.collectDistances(outmsg);
2712     numSelectionIter--;
2713   } else {
2714     // invoke phase completion on all processors
2715     thisProxy.phaseDone();
2716   }
2720  *  Completion of the K-Means clustering and data selection of one phase
2721  *    of the computation.
2723  *  Called on every processor.
2724  */
2725 void KMeansBOC::phaseDone() {
2727   //  if(CkMyPe()==0)    CkPrintf("[%d] KMeansBOC::phaseDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2729   LogPool *pool = CkpvAccess(_trace)->_logPool;
2730   CProxy_TraceProjectionsBOC bocProxy(traceProjectionsGID);
2732   // now decide on what to do with the decision.
2733   if (!selected) {
2734     if (usePhases) {
2735       pool->keepPhase[currentPhase] = false;
2736     } else {
2737       // if not using phases, we're working on the whole log
2738       pool->setAllPhases(false);
2739     }
2740   }
2742   // **FIXME** (?) - All processors have to agree on this, or the reduction
2743   //   will not be correct! The question is "is this enforcible?"
2744   if ((currentPhase == (pool->numPhases-1)) || !usePhases) {
2745     // We're done
2746     contribute(CkCallback(CkReductionTarget(TraceProjectionsBOC, kMeansDone),
2747               0, bocProxy));
2748   } else {
2749     // reset all phase-based k-means data and decisions
2751     // **FIXME**!!!!!    
2752     
2753     // invoke the next K-Means computation phase.
2754     currentPhase++;
2755     thisProxy[CkMyPe()].getNextPhaseMetrics();
2756   }
2759 void TraceProjectionsBOC::startTimeAnalysis()
2761   double startTime = 0.0;
2762   if (CkpvAccess(_trace)->_logPool->numEntries>0)
2763      startTime = CkpvAccess(_trace)->_logPool->pool[0].time;
2764   CkCallback cb(CkReductionTarget(TraceProjectionsBOC, startTimeDone), thisProxy);
2765   contribute(sizeof(double), &startTime, CkReduction::min_double, cb);
2768 void TraceProjectionsBOC::startTimeDone(double result)
2770   // CkPrintf("[%d] TraceProjectionsBOC::startTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2772   if (CkpvAccess(_trace) != NULL) {
2773     CkpvAccess(_trace)->_logPool->globalStartTime = result;
2774     CkpvAccess(_trace)->_logPool->setNewStartTime();
2775     //if (CkMyPe() == 0) CkPrintf("Start time determined to be %lf us\n", (CkpvAccess(_trace)->_logPool->globalStartTime)*1e06);
2776   }
2777   thisProxy[CkMyPe()].startEndTimeAnalysis();
2780 void TraceProjectionsBOC::startEndTimeAnalysis()
2782  //CkPrintf("[%d] TraceProjectionsBOC::startEndTimeAnalysis time=\t%g\n", CkMyPe(), CkWallTimer() );
2784   endTime = CkpvAccess(_trace)->endTime;
2785   // CkPrintf("[%d] End time is %lf us\n", CkMyPe(), endTime*1e06);
2787   CkCallback cb(CkReductionTarget(TraceProjectionsBOC, endTimeDone),
2788           0, thisProxy);
2789   contribute(sizeof(double), &endTime, CkReduction::max_double, cb);  
2792 void TraceProjectionsBOC::endTimeDone(double result)
2794  //if(CkMyPe()==0)    CkPrintf("[%d] TraceProjectionsBOC::endTimeDone time=\t%g parModulesRemaining:%d\n", CkMyPe(), CkWallTimer(), parModulesRemaining);
2796   CkAssert(CkMyPe() == 0);
2797   parModulesRemaining--;
2798   if (CkpvAccess(_trace) != NULL && CkpvAccess(_trace)->_logPool != NULL) {
2799     CkpvAccess(_trace)->_logPool->globalEndTime = result - CkpvAccess(_trace)->_logPool->globalStartTime;
2800     // CkPrintf("End time determined to be %lf us\n",
2801     //       (CkpvAccess(_trace)->_logPool->globalEndTime)*1e06);
2802   }
2803   if (parModulesRemaining == 0) {
2804     thisProxy[CkMyPe()].finalize();
2805   }
2808 void TraceProjectionsBOC::kMeansDone() {
2810  if(CkMyPe()==0)  CkPrintf("[%d] TraceProjectionsBOC::kMeansDone time=\t%g\n", CkMyPe(), CkWallTimer() );
2812   CkAssert(CkMyPe() == 0);
2813   parModulesRemaining--;
2814   CkPrintf("K-Means Analysis Time = %lf seconds\n",
2815            CmiWallTimer()-analysisStartTime);
2816   if (parModulesRemaining == 0) {
2817     thisProxy[CkMyPe()].finalize();
2818   }
2823  *  This version is called (on processor 0) only if flushCheck fails.
2825  */
2826 void TraceProjectionsBOC::kMeansDoneFlushed() {
2827   CkAssert(CkMyPe() == 0);
2828   parModulesRemaining--;
2829   CkPrintf("K-Means Analysis Aborted because of flush. Time taken = %lf seconds\n",
2830            CmiWallTimer()-analysisStartTime);
2831   if (parModulesRemaining == 0) {
2832     thisProxy[CkMyPe()].finalize();
2833   }
2836 void TraceProjectionsBOC::finalize()
2838   CkAssert(CkMyPe() == 0);
2839   //CkPrintf("Total Analysis Time = %lf seconds\n", 
2840   //       CmiWallTimer()-analysisStartTime);
2841   thisProxy.closingTraces();
2844 // Called on every processor
2845 void TraceProjectionsBOC::closingTraces() {
2846   CkpvAccess(_trace)->closeTrace();
2848     // subtle:  reduction needs to go to the PE which started CkExit()
2849   int pe = 0;
2850   if (endPe != -1) pe = endPe;
2851   contribute(CkCallback(CkReductionTarget(TraceProjectionsBOC, closeParallelShutdown), pe, thisProxy));
2854 // The sole purpose of this reduction is to decide whether or not
2855 //   Projections as a module needs to call CkContinueExit() to get other
2856 //   modules to shutdown.
2857 void TraceProjectionsBOC::closeParallelShutdown(void) {
2858   CkAssert(endPe == -1 && CkMyPe() ==0 || CkMyPe() == endPe);
2859   if (!CkpvAccess(_trace)->converseExit) {
2860     CkContinueExit();
2861   }
2864  *  Registration and definition of the Outlier Reduction callback.
2865  *  Format: Sum | Min | Max | Sum of Squares
2866  */
2867 CkReductionMsg *outlierReduction(int nMsgs,
2868                                  CkReductionMsg **msgs) {
2869   int numBytes = 0;
2870   int numMetrics = 0;
2871   double *ret = NULL;
2873   if (nMsgs == 1) {
2874     // nothing to do, just pass it on
2875     return CkReductionMsg::buildNew(msgs[0]->getSize(),msgs[0]->getData());
2876   }
2878   if (nMsgs > 1) {
2879     numBytes = msgs[0]->getSize();
2880     // sanity checks
2881     if (numBytes%sizeof(double) != 0) {
2882       CkAbort("Outlier Reduction Size incompatible with doubles!\n");
2883     }
2884     if ((numBytes/sizeof(double))%4 != 0) {
2885       CkAbort("Outlier Reduction Size Array not divisible by 4!\n");
2886     }
2887     numMetrics = (numBytes/sizeof(double))/4;
2888     ret = new double[numMetrics*4];
2890     // copy the first message data into the return structure first
2891     for (int i=0; i<numMetrics*4; i++) {
2892       ret[i] = ((double *)msgs[0]->getData())[i];
2893     }
2895     // Sum | Min | Max | Sum of Squares
2896     for (int msgIdx=1; msgIdx<nMsgs; msgIdx++) {
2897       for (int i=0; i<numMetrics; i++) {
2898         // Sum
2899         ret[i] += ((double *)msgs[msgIdx]->getData())[i];
2900         // Min
2901         ret[numMetrics + i] =
2902           (ret[numMetrics + i] < 
2903            ((double *)msgs[msgIdx]->getData())[numMetrics + i]) 
2904           ? ret[numMetrics + i] : 
2905           ((double *)msgs[msgIdx]->getData())[numMetrics + i];
2906         // Max
2907         ret[2*numMetrics + i] = 
2908           (ret[2*numMetrics + i] >
2909            ((double *)msgs[msgIdx]->getData())[2*numMetrics + i])
2910           ? ret[2*numMetrics + i] :
2911           ((double *)msgs[msgIdx]->getData())[2*numMetrics + i];
2912         // Sum of Squares (squaring already done at leaf)
2913         ret[3*numMetrics + i] +=
2914           ((double *)msgs[msgIdx]->getData())[3*numMetrics + i];
2915       }
2916     }
2917   }
2918   
2919   /* apparently, we do not delete the incoming messages */
2920   return CkReductionMsg::buildNew(numBytes,ret);
2924  * The only reason we have a user-defined reduction is to support
2925  *   identification of the "winning" processors as well as to handle
2926  *   both the min and the max of each "tournament". A simple min/max
2927  *   discovery cannot handle ties.
2928  */
2929 CkReductionMsg *minMaxReduction(int nMsgs,
2930                                 CkReductionMsg **msgs) {
2931   CkAssert(nMsgs > 0);
2933   int numBytes = msgs[0]->getSize();
2934   CkAssert(numBytes%sizeof(double) == 0);
2935   int numK = (numBytes/sizeof(double))/4;
2937   double *ret = new double[numK*4];
2938   // fill with out-of-band values
2939   for (int i=0; i<numK; i++) {
2940     ret[i*4] = -1.0;
2941     ret[i*4+1] = -1.0;
2942     ret[i*4+2] = -1.0;
2943     ret[i*4+3] = -1.0;
2944   }
2946   // incoming format K * (minVal | minIdx | maxVal | maxIdx)
2947   for (int i=0; i<nMsgs; i++) {
2948     double *temp = (double *)msgs[i]->getData();
2949     for (int j=0; j<numK; j++) {
2950       // no previous valid min
2951       if (ret[j*4+1] < 0) {
2952         // fill it in only if the incoming min is valid
2953         if (temp[j*4+1] >= 0) {
2954           ret[j*4] = temp[j*4];      // fill min value
2955           ret[j*4+1] = temp[j*4+1];  // fill ID
2956         }
2957       } else {
2958         // find Min, only if incoming min is valid
2959         if (temp[j*4+1] >= 0) {
2960           if (temp[j*4] < ret[j*4]) {
2961             ret[j*4] = temp[j*4];      // replace min value
2962             ret[j*4+1] = temp[j*4+1];  // replace ID
2963           }
2964         }
2965       }
2966       // no previous valid max
2967       if (ret[j*4+3] < 0) {
2968         // fill only if the incoming max is valid
2969         if (temp[j*4+3] >= 0) {
2970           ret[j*4+2] = temp[j*4+2];  // fill max value
2971           ret[j*4+3] = temp[j*4+3];  // fill ID
2972         }
2973       } else {
2974         // find Max, only if incoming max is valid
2975         if (temp[j*4+3] >= 0) {
2976           if (temp[j*4+2] > ret[j*4+2]) {
2977             ret[j*4+2] = temp[j*4+2];  // replace max value
2978             ret[j*4+3] = temp[j*4+3];  // replace ID
2979           }
2980         }
2981       }
2982     }
2983   }
2984   CkReductionMsg *redmsg = CkReductionMsg::buildNew(numBytes, ret);
2985   delete [] ret;
2986   return redmsg;
2989 #include "TraceProjections.def.h"
2990 #endif //PROJ_ANALYSIS
2992 /*@}*/