6 #include "trace-utilization.h"
// Proxy to the TraceUtilizationBOC group. collectUtilizationData() uses it to
// invoke collectSumDetailData().
9 /* readonly */ CProxy_TraceUtilizationBOC traceUtilizationGroupProxy;
12 /// A reduction type for merging compressed sum detail data
// Assigned in _createTraceutilization() from CkReduction::addReducer() and
// passed to contribute() in TraceUtilizationBOC::collectSumDetailData().
13 CkReduction::reducerType sumDetailCompressedReducer;
/// Callback that triggers collection of sum-detail utilization data by calling
/// collectSumDetailData() through traceUtilizationGroupProxy.
/// Both parameters are ignored (they are unnamed in the signature).
16 // This function has unused arguments to match the type of
17 // CcdVoidFn, which CcdCallOnConditionKeep takes
18 void collectUtilizationData(void *, double) {
19 traceUtilizationGroupProxy.collectSumDetailData();
// Ckpv variable holding this module's TraceUtilization instance. It is
// initialized and assigned in _createTraceutilization() and read in
// TraceUtilizationBOC::collectSumDetailData().
23 CkpvStaticDeclare(TraceUtilization*, _trace);
26 For each TraceFoo module, _createTraceFoo() must be defined.
27 This function is called in _createTraces() generated in moduleInit.C
/// Creation hook for the utilization trace module.
///
/// Registers sumDetailCompressedReduction as a custom reducer (the returned
/// type is stored in sumDetailCompressedReducer), then allocates a
/// TraceUtilization object, stores it in the Ckpv variable _trace and adds it
/// to CkpvAccess(_traces).
///
/// @param argv  Not referenced by any statement visible in this function.
29 void _createTraceutilization(char **argv)
31 // CkPrintf("[%d] _createTraceutilization\n", CkMyPe());
33 // Register the reducer
// NOTE(review): presumably the compressed record format depends on a 2-byte
// short -- confirm which typedef this assertion is protecting.
34 CkAssert(sizeof(short) == 2);
35 sumDetailCompressedReducer=CkReduction::addReducer(sumDetailCompressedReduction, false, "sumDetailCompressedReduction");
37 CkpvInitialize(TraceUtilization*, _trace);
38 CkpvAccess(_trace) = new TraceUtilization();
39 CkpvAccess(_traces)->addTrace(CkpvAccess(_trace));
/// Begin-execute overload taking an object id.
/// Forwards to the (event, msgType, ep, srcPe, ...) overload with
/// ep = _threadEP and -1 for event, msgType and srcPe. `tid` is not used by
/// the forwarding call.
45 void TraceUtilization::beginExecute(CmiObjId *tid)
47 beginExecute(-1,-1,_threadEP,-1);
/// Begin-execute overload taking a message envelope.
/// Two forwarding calls are visible: one that uses _threadEP as the entry
/// point (the "no message" case described below) and one that uses the entry
/// point index read from the envelope via e->getEpIdx(). `obj` is not used by
/// either call.
// NOTE(review): the condition selecting between the two calls is not visible
// in this listing -- presumably a null check on `e`; confirm.
50 void TraceUtilization::beginExecute(envelope *e, void *obj)
52 // no message means thread execution
54 beginExecute(-1,-1,_threadEP,-1);
57 beginExecute(-1,-1,e->getEpIdx(),-1);
/// Main begin-execute overload; the other beginExecute overloads forward here.
/// Visible behaviour: when execEp is not INVALIDEP on entry, a warning about
/// two consecutive BEGIN_PROCESSING events is issued through TRACE_WARN.
// NOTE(review): the remainder of the body (presumably recording `ep` and the
// start time used later by endExecute) is not visible in this listing.
61 void TraceUtilization::beginExecute(int event,int msgType,int ep,int srcPe, int mlen, CmiObjId *idx, void *obj)
63 if (execEp != INVALIDEP) {
64 TRACE_WARN("Warning: TraceUtilization two consecutive BEGIN_PROCESSING!\n");
/// Ends the current execution interval.
/// The visible statements special-case execEp == TRACEON_EP, read the current
/// time from TraceTimer(), and call updateCpuTime(execEp, start, endTime) to
/// account for the interval.
// NOTE(review): what the TRACEON_EP branch does, and whether execEp is reset
// afterwards, is not visible in this listing.
73 void TraceUtilization::endExecute(void)
76 if (execEp == TRACEON_EP) {
77 // if trace just got turned on, then one expects to see this
78 // END_PROCESSING event without seeing a preceding BEGIN_PROCESSING
83 double endTime = TraceTimer();
85 updateCpuTime(execEp, start, endTime);
/// Placeholder: the only visible action is printing a FIXME message that
/// includes `eventType`.
93 void TraceUtilization::addEventType(int eventType)
95 CkPrintf("FIXME handle TraceUtilization::addEventType(%d)\n", eventType);
103 Send back to the client compressed sum-detail style measurements about the
104 utilization for each active PE combined across all PEs.
106 The data format sent by this handler is a bunch of records(one for each bin) of the following format:
107 #samples (EP,utilization)*
109 One example record for two EPS that executed during the sample period.
110 EP 3 used 150/200 of the time while EP 122 executed for 20/200 of the time.
111 All of these would be packed as bytes into the message:
/// CCS handler that replies with compressed sum-detail data.
///
/// Three reply paths are visible:
///  - reply with the buffer from fakeCompressedMessage(), then free it;
///  - if storedSumDetailResults is empty, reply with emptyCompressedBuffer();
///  - take the front reduction message from storedSumDetailResults, remove it
///    from the container, and reply with its payload (getData()/getSize()).
///
/// @param m  CCS request; m->reply is handed to CcsSendDelayedReply.
// NOTE(review): how the three paths are selected (else branches / conditional
// compilation) is not visible in this listing.
// NOTE(review): storedSumDetailResults.size() is printed with %d -- confirm
// the argument type matches the conversion.
// NOTE(review): no release of `b`, of `msg`, or of `m` is visible here --
// confirm they are freed in the lines not shown.
115 void TraceUtilizationBOC::ccsRequestSumDetailCompressed(CkCcsRequestMsg *m) {
116 CkPrintf("CCS request for compressed sum detail. (found %d stored in deque)\n", storedSumDetailResults.size() );
117 // CkAssert(sumDetail);
122 compressedBuffer fakeMessage = fakeCompressedMessage();
123 CcsSendDelayedReply(m->reply, fakeMessage.datalength(), fakeMessage.buffer() );
124 fakeMessage.freeBuf();
128 if (storedSumDetailResults.size() == 0) {
129 compressedBuffer b = emptyCompressedBuffer();
130 CcsSendDelayedReply(m->reply, b.datalength(), b.buffer());
133 CkReductionMsg * msg = storedSumDetailResults.front();
134 storedSumDetailResults.pop_front();
137 void *sendBuffer = (void *)msg->getData();
138 datalength = msg->getSize();
139 CcsSendDelayedReply(m->reply, datalength, sendBuffer);
147 // CkPrintf("CCS response of %d bytes sent.\n", datalength);
/// Contributes this PE's newest compressed sum-detail bins to a reduction.
///
/// Reads the TraceUtilization object from the Ckpv variable _trace. The
/// availability check compares the bins not yet sent
/// (cpuTimeEntriesAvailable() - cpuTimeEntriesSentSoFar()) with BIN_PER_SEC.
/// BIN_PER_SEC bins are compressed with compressNRecentSumDetail() and
/// contributed using sumDetailCompressedReducer. The reduction result is
/// delivered to sumDetailDataCollected on thisProxy[0].
// NOTE(review): the assignment from fakeCompressedMessage() below presumably
// belongs to a debug-only path whose guard is not visible here -- confirm.
// NOTE(review): no freeBuf() on `b` is visible after contribute(); confirm
// the buffer is released in the lines not shown.
153 void TraceUtilizationBOC::collectSumDetailData() {
154 TraceUtilization* t = CkpvAccess(_trace);
156 // If we don't have enough data, just return and wait for the next invocation
157 if (t->cpuTimeEntriesAvailable() - t->cpuTimeEntriesSentSoFar() < BIN_PER_SEC)
160 compressedBuffer b = t->compressNRecentSumDetail(BIN_PER_SEC);
162 // CkPrintf("[%d] contributing buffer created by compressNRecentSumDetail avg util=%lg\n", CkMyPe(), averageUtilizationInBuffer(b));
163 // printCompressedBuf(b);
168 b = fakeCompressedMessage();
171 // CkPrintf("[%d] contributing %d bytes worth of SumDetail data\n", CkMyPe(), b.datalength());
173 // CProxy_TraceUtilizationBOC sumProxy(traceSummaryGID);
174 CkCallback cb(CkIndex_TraceUtilizationBOC::sumDetailDataCollected(NULL), thisProxy[0]);
175 contribute(b.datalength(), b.buffer(), sumDetailCompressedReducer, cb);
/// Reduction target for the sum-detail reduction started in
/// collectSumDetailData(); asserts that it runs on PE 0.
///
/// Wraps the message payload in a compressedBuffer to print its average
/// utilization, then appends `msg` itself to storedSumDetailResults, where
/// ccsRequestSumDetailCompressed() later consumes it from the front.
///
/// @param msg  Reduction result. It is stored, not deleted, by the visible
///             statements.
181 void TraceUtilizationBOC::sumDetailDataCollected(CkReductionMsg *msg) {
182 CkAssert(CkMyPe() == 0);
184 compressedBuffer b(msg->getData());
185 CkPrintf("putting CCS reply in queue (average utilization= %lg)\n", averageUtilizationInBuffer(b));
186 //if(isCompressedBufferSane(b)){
187 storedSumDetailResults.push_back(msg);
190 // CkPrintf("[%d] Reduction of SumDetail completed. Result stored in storedSumDetailResults deque(size now=%d)\n", CkMyPe(), storedSumDetailResults.size() );
/// Writes the "<traceRoot>.util.sts" file.
///
/// The file name is built from CkpvAccess(traceRoot) plus ".util.sts" in a
/// buffer sized for both strings and the terminator. The file is opened with
/// mode "w+", its contents are produced by traceWriteSTS(stsfp,0), and a
/// trailing "END" line is appended. CmiAbort is called with the message below
/// on the failure path.
// NOTE(review): the failure check on `stsfp`, the fclose(), and the delete[]
// of `fname` are not visible in this listing -- confirm they exist.
197 void TraceUtilization::writeSts(void) {
199 char *fname = new char[strlen(CkpvAccess(traceRoot))+strlen(".util.sts")+1];
200 sprintf(fname, "%s.util.sts", CkpvAccess(traceRoot));
201 FILE* stsfp = fopen(fname, "w+");
203 CmiAbort("Cannot open summary sts file for writing.\n");
207 traceWriteSTS(stsfp,0);
208 fprintf(stsfp, "END\n");
215 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
219 /// Compress a buffer by merging all entries in a bin that are less than the threshold into a single "other" category
///
/// The header (numBins, numProcs) is copied from `src` to a new buffer created
/// with a capacity argument of 100000. For each bin, an entry whose stored
/// utilization v satisfies v < threshold * 250.0 is dropped and v / 250.0 is
/// added to a per-bin "other" total; every other entry is kept. If the total
/// is positive, one extra entry with EP other_EP is appended, with the total
/// clamped to 1.0 and rescaled by 250.0.
///
/// @param src        Compressed buffer to filter; passed by value.
/// @param threshold  Cut-off compared after multiplying by 250.0, i.e. it is
///                   expressed as a fraction of the 0..250 stored scale.
// NOTE(review): the push of `ep` for kept entries, the rewind of `src`, and
// the return statement are not visible in this listing.
220 compressedBuffer moveTinyEntriesToOther(compressedBuffer src, double threshold){
221 // CkPrintf("[%d] moveTinyEntriesToOther\n", CkMyPe());
223 // reset the src buffer to the beginning
226 compressedBuffer dest(100000);
228 int numBins = src.pop<numBins_T>();
229 int numProcs = src.pop<numProcs_T>();
231 dest.push<numBins_T>(numBins);
232 dest.push<numProcs_T>(numProcs);
235 for(int i=0;i<numBins;i++){
236 double utilizationInOther = 0.0;
238 entriesInBin_T numEntriesInSrcBin = src.pop<entriesInBin_T>();
// Write a zero entry count now and remember its offset; the count is bumped
// with increment<>() each time an entry is emitted for this bin.
239 int numEntriesInDestBinOffset = dest.push<entriesInBin_T>(0);
241 CkAssert(numEntriesInSrcBin < 200);
243 for(int j=0;j<numEntriesInSrcBin;j++){
244 ep_T ep = src.pop<ep_T>();
245 double v = src.pop<utilization_T>();
247 if(v < threshold * 250.0){
248 // do not copy bin into destination
249 utilizationInOther += v / 250.0;
251 // copy bin into destination
252 dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
254 dest.push<utilization_T>(v);
259 // if other category has stuff in it, add it to the destination buffer
260 if(utilizationInOther > 0.0){
261 dest.increment<entriesInBin_T>(numEntriesInDestBinOffset);
262 dest.push<ep_T>(other_EP);
263 if(utilizationInOther > 1.0)
264 utilizationInOther = 1.0;
265 dest.push<utilization_T>(utilizationInOther*250.0);
276 /// A reducer for merging compressed sum detail data
///
/// Each incoming message payload is wrapped in a compressedBuffer and its
/// header is read: the bin count (asserted equal across messages) and the
/// number of processors the message represents. The output buffer is sized
/// from the summed message sizes plus 100, given a header with the common bin
/// count and the total processor count, and filled by calling
/// mergeCompressedBin() once per bin. The merged buffer is then filtered with
/// moveTinyEntriesToOther(dest, 0.10) and copied into a new CkReductionMsg.
///
/// @param nMsg  Number of messages in `msgs`.
/// @param msgs  Messages to merge; only getData()/getSize() are used here.
// NOTE(review): the declarations of `numBins` and `totalsize`, the condition
// choosing between the two numBins reads, the release of `dest`/`dest2`, and
// the return of `m` are not visible in this listing.
277 CkReductionMsg *sumDetailCompressedReduction(int nMsg,CkReductionMsg **msgs){
278 // CkPrintf("[%d] sumDetailCompressedReduction(nMsgs=%d)\n", CkMyPe(), nMsg);
280 compressedBuffer *incomingMsgs = new compressedBuffer[nMsg];
281 int *numProcsRepresentedInMessage = new int[nMsg];
285 int totalProcsAcrossAllMessages = 0;
287 for (int i=0;i<nMsg;i++) {
288 incomingMsgs[i].init(msgs[i]->getData());
290 // CkPrintf("[%d] Incoming reduction message %d has average utilization %lg\n", CkMyPe(), i, averageUtilizationInBuffer(incomingMsgs[i]));
291 // CkPrintf("Is buffer %d sane? %s\n", i, isCompressedBufferSane(incomingMsgs[i]) ? "yes": "no" );
294 totalsize += msgs[i]->getSize();
295 // CkPrintf("BEGIN MERGE MESSAGE=========================================================\n");
296 // printCompressedBuf(incomingMsgs[i]);
298 // Read first value from message.
299 // Make sure all messages have the same number of bins
301 numBins = incomingMsgs[i].pop<numBins_T>();
// NOTE(review): this pop<>() has a side effect inside CkAssert; if CkAssert
// can be compiled out, the header field would not be consumed -- confirm.
303 CkAssert( numBins == incomingMsgs[i].pop<numBins_T>() );
305 // Read second value from message.
306 numProcsRepresentedInMessage[i] = incomingMsgs[i].pop<numProcs_T>();
307 totalProcsAcrossAllMessages += numProcsRepresentedInMessage[i];
308 // CkPrintf("Number of procs in message[%d] is %d\n", i, (int)numProcsRepresentedInMessage[i]);
311 compressedBuffer dest(totalsize + 100);
313 // build a compressed representation of each merged bin
314 dest.push<numBins_T>(numBins);
315 dest.push<numProcs_T>(totalProcsAcrossAllMessages);
317 for(int i=0; i<numBins; i++){
318 mergeCompressedBin(incomingMsgs, nMsg, numProcsRepresentedInMessage, totalProcsAcrossAllMessages, dest);
321 // CkPrintf("END MERGE RESULT=========================================================\n");
322 // printCompressedBuf(dest);
325 //CkPrintf("[%d] Merged buffer has average utilization %lg \n", CkMyPe(), averageUtilizationInBuffer(dest));
327 //CkPrintf("Is resulting merged buffer sane? %s\n", isCompressedBufferSane(dest) ? "yes": "no" );
329 compressedBuffer dest2 = moveTinyEntriesToOther(dest, 0.10);
331 // CkPrintf("Is resulting merged Filtered buffer sane? %s\n", isCompressedBufferSane(dest2) ? "yes": "no" );
333 // CkPrintf("[%d] Outgoing reduction (filtered) message has average utilization %lf \n", CkMyPe(), averageUtilizationInBuffer(dest2));
336 CkReductionMsg *m = CkReductionMsg::buildNew(dest2.datalength(),dest2.buffer());
338 delete[] incomingMsgs;
339 delete[] numProcsRepresentedInMessage;
349 /// Create fake sum detail data in the compressed format (for debugging)
///
/// Builds a buffer created with a capacity argument of 10000: a header
/// (numBins, numProcs) followed, for every bin, by an entry count of
/// numRecords and numRecords (ep, utilization) pairs with ep = j*10+2 and a
/// constant utilization of 120.00. The result is asserted sane with
/// isCompressedBufferSane().
// NOTE(review): the definitions of numBins, numProcs and numRecords and the
// return statement are not visible in this listing.
350 compressedBuffer fakeCompressedMessage(){
351 CkPrintf("[%d] fakeCompressedMessage\n", CkMyPe());
353 compressedBuffer fakeBuf(10000);
358 // build a compressed representation of each merged bin
359 fakeBuf.push<numBins_T>(numBins);
360 fakeBuf.push<numProcs_T>(numProcs);
361 for(int i=0; i<numBins; i++){
363 fakeBuf.push<entriesInBin_T>(numRecords);
364 for(int j=0;j<numRecords;j++){
365 fakeBuf.push<ep_T>(j*10+2);
366 fakeBuf.push<utilization_T>(120.00);
370 //CkPrintf("Fake Compressed Message:=========================================================\n");
371 // printCompressedBuf(fakeBuf);
373 CkAssert(isCompressedBufferSane(fakeBuf));
379 /// Create an empty message
///
/// The buffer is created with a size argument of sizeof(numBins_T) and holds
/// a single field: a bin count of 0.
// NOTE(review): unlike the other builders in this file, no numProcs field is
// pushed, while readers such as printCompressedBuf() pop numProcs_T right
// after numBins_T -- confirm this buffer is never handed to those readers.
380 compressedBuffer emptyCompressedBuffer(){
381 compressedBuffer result(sizeof(numBins_T));
382 result.push<numBins_T>(0);
389 /** print out the compressed buffer starting from its beginning*/
/// Prints the header (bin count, number of PEs averaged over) and then, for
/// each bin with a non-zero entry count, the bin index, its length, and every
/// (ep, utilization) pair it contains.
// NOTE(review): ep and v are printed with %d and %f -- confirm these match
// the underlying types of ep_T and utilization_T.
390 void printCompressedBuf(compressedBuffer b){
391 // b should be passed in by value, and hence we can modify it
393 int numEntries = b.pop<numBins_T>();
394 CkPrintf("Buffer contains %d records\n", numEntries);
395 int numProcs = b.pop<numProcs_T>();
396 CkPrintf("Buffer represents an average over %d PEs\n", numProcs);
398 for(int i=0;i<numEntries;i++){
399 entriesInBin_T recordLength = b.pop<entriesInBin_T>();
400 if(recordLength > 0){
401 CkPrintf(" Record %d is of length %d : ", i, recordLength);
403 for(int j=0;j<recordLength;j++){
404 ep_T ep = b.pop<ep_T>();
405 utilization_T v = b.pop<utilization_T>();
406 CkPrintf("(%d,%f) ", ep, v);
/// Walks a compressed buffer and emits warnings through ckout for values that
/// look implausible.
///
/// Visible checks: a warning on the bin count, a warning when a bin's entry
/// count exceeds 200, and a warning for an entry whose ep is outside 0..800
/// (unless it equals other_EP) or whose utilization is below 0.0 or above
/// 251.0.
// NOTE(review): the condition guarding the numBins warning and the return
// statements are not visible in this listing, so which checks make the result
// false cannot be stated from here.
416 bool isCompressedBufferSane(compressedBuffer b){
417 // b should be passed in by value, and hence we can modify it
419 numBins_T numBins = b.pop<numBins_T>();
420 numProcs_T numProcs = b.pop<numProcs_T>();
423 ckout << "WARNING: numBins=" << numBins << endl;
427 for(int i=0;i<numBins;i++){
428 entriesInBin_T recordLength = b.pop<entriesInBin_T>();
429 if(recordLength > 200){
430 ckout << "WARNING: recordLength=" << recordLength << endl;
434 if(recordLength > 0){
436 for(int j=0;j<recordLength;j++){
437 ep_T ep = b.pop<ep_T>();
438 utilization_T v = b.pop<utilization_T>();
439 // CkPrintf("(%d,%f) ", ep, v);
440 if(((ep>800 || ep <0 ) && ep != other_EP) || v < 0.0 || v > 251.0){
441 ckout << "WARNING: ep=" << ep << " v=" << v << endl;
/// Returns the sum of every stored utilization value in the buffer, divided
/// by the number of bins and then by 2.5.
///
/// The 2.5 divisor is the inverse of the 2.5 scale factor applied in
/// TraceUtilization::compressNRecentSumDetail() when values are stored.
// NOTE(review): a buffer whose bin count is 0 makes this divide by zero --
// confirm callers never pass such a buffer.
454 double averageUtilizationInBuffer(compressedBuffer b){
455 // b should be passed in by value, and hence we can modify it
457 numBins_T numBins = b.pop<numBins_T>();
458 numProcs_T numProcs = b.pop<numProcs_T>();
460 // CkPrintf("[%d] averageUtilizationInBuffer numProcs=%d (grep reduction message)\n", CkMyPe(), numProcs);
462 double totalUtilization = 0.0;
464 for(int i=0;i<numBins;i++) {
465 entriesInBin_T entriesInBin = b.pop<entriesInBin_T>();
466 for(int j=0;j<entriesInBin;j++){
// The ep is popped only to advance past it; its value is not used.
467 ep_T ep = b.pop<ep_T>();
468 totalUtilization += b.pop<utilization_T>();
472 return totalUtilization / numBins / 2.5;
/// Asserts (via CkAssert) that isCompressedBufferSane() accepts `b`.
477 void sanityCheckCompressedBuf(compressedBuffer b){
478 CkAssert(isCompressedBufferSane(b));
/// Accumulates cpuTime over bins startBin..endBin (both inclusive) and over
/// all getEpInfoSize() entries of each bin. Bin indices are wrapped with
/// i % NUM_BINS, and each bin occupies epInfoSize consecutive cpuTime slots.
// NOTE(review): the declaration of the accumulator `a` and the return
// statement are not visible in this listing.
483 double TraceUtilization::sumUtilization(int startBin, int endBin){
484 int epInfoSize = getEpInfoSize();
488 for(int i=startBin; i<=endBin; i++){
489 for(int j=0; j<epInfoSize; j++){
490 a += cpuTime[(i%NUM_BINS)*epInfoSize+j];
497 /// Create a compressed buffer of the n most recent sum detail samples
///
/// Bins are taken starting at cpuTimeEntriesSentSoFar(), and the sent counter
/// is advanced by the number of bins taken. The buffer starts with a header
/// (bin count, processor count of 1). Each bin record starts with an entry
/// count written as 0 and incremented once per emitted entry. An entry is
/// emitted for every EP whose getUtilization() value times 2.5 is positive;
/// the scaled value is clamped to 250.0 before being stored.
///
/// @param desiredBinsToSend  Number of bins to include in the buffer.
// NOTE(review): the push of the EP id for each entry, the condition around
// the diagnostic prints, and the return statement are not visible here.
498 compressedBuffer TraceUtilization::compressNRecentSumDetail(int desiredBinsToSend){
499 // CkPrintf("compressNRecentSumDetail(desiredBinsToSend=%d)\n", desiredBinsToSend);
501 int startBin = cpuTimeEntriesSentSoFar();
502 int numEntries = getEpInfoSize();
// With these two definitions binsToSend always equals desiredBinsToSend.
504 int endBin = startBin + desiredBinsToSend - 1;
505 int binsToSend = endBin - startBin + 1;
506 CkAssert(binsToSend >= desiredBinsToSend );
507 incrementNumCpuTimeEntriesSent(binsToSend);
// Diagnostic scan over the bin array in chunks of 1000 bins.
511 bool nonePrinted = true;
512 for(int i=0;i<(NUM_BINS-1000);i+=1000){
513 double expectedU = sumUtilization(i, i+999);
515 CkPrintf("[%d of %d] compressNRecentSumDetail All bins: start=%05d end=%05d values in array sum to %lg\n", CkMyPe(), CkNumPes(), i, i+999, expectedU);
521 CkPrintf("[%d of %d] compressNRecentSumDetail All bins are 0\n", CkMyPe(), CkNumPes() );
// NOTE(review): presumably 8 bytes is an upper bound on one entry's encoded
// size -- confirm against the *_T typedefs.
526 int bufferSize = 8*(2+numEntries) * (2+binsToSend)+100;
527 compressedBuffer b(bufferSize);
529 b.push<numBins_T>(binsToSend);
530 b.push<numProcs_T>(1); // number of processors along reduction subtree. I am just one processor.
533 for(int i=0; i<binsToSend; i++) {
534 // Create a record for bin i
535 // CkPrintf("Adding record for bin %d\n", i);
536 int numEntriesInRecordOffset = b.push<entriesInBin_T>(0); // The number of entries in this record
538 for(int e=0; e<numEntries; e++) {
539 double scaledUtilization = getUtilization(i+startBin,e) * 2.5; // use range of 0 to 250 for the utilization, so it can fit in an unsigned char
540 if(scaledUtilization > 0.0) {
541 //CkPrintf("scaledUtilization=%lg !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n", scaledUtilization);
542 if(scaledUtilization > 250.0)
543 scaledUtilization = 250.0;
546 b.push<utilization_T>(scaledUtilization);
547 // myu += scaledUtilization;
548 b.increment<entriesInBin_T>(numEntriesInRecordOffset);
553 // CkPrintf("[%d] compressNRecentSumDetail resulting buffer: averageUtilizationInBuffer()=%lg myu=%lg\n", CkMyPe(), averageUtilizationInBuffer(b), myu);
563 /** Merge the compressed entries from the first bin in each of the srcBuf buffers.
/// Merges the next bin record of every source buffer into one record appended
/// to destBuf.
///
/// An entry count is first written to destBuf as 0. Each source's entry count
/// for the bin is read, and the counts are summed into `count`. For each
/// merged entry, the EPs at the read position of the sources are peeked to
/// pick minEp, the destination count is incremented, and sources positioned
/// on that EP are consumed with their utilization weighted by
/// numProcsRepresentedInMessage[i]. The weighted sum divided by
/// totalProcsAcrossAllMessages is written as (minEp, utilization).
///
/// @param srcBufferArray                Source buffers; read positions advance.
/// @param numSrcBuf                     Number of source buffers.
/// @param numProcsRepresentedInMessage  Per-source weight (processor count).
/// @param totalProcsAcrossAllMessages   Divisor for the weighted sum.
/// @param destBuf                       Buffer receiving the merged record.
// NOTE(review): the outer loop (presumably "while entries remain"), the
// minEp selection and ep == minEp comparisons, and the declarations of
// `count`, `minEp` and `v` are not visible in this listing.
566 void mergeCompressedBin(compressedBuffer *srcBufferArray, int numSrcBuf, int *numProcsRepresentedInMessage, int totalProcsAcrossAllMessages, compressedBuffer &destBuf){
567 // put a counter at the beginning of destBuf
568 int numEntriesInDestRecordOffset = destBuf.push<entriesInBin_T>(0);
570 // CkPrintf("BEGIN MERGE------------------------------------------------------------------\n");
572 // Read off the number of bins in each buffer
573 int *remainingEntriesToRead = new int[numSrcBuf];
574 for(int i=0;i<numSrcBuf;i++){
575 remainingEntriesToRead[i] = srcBufferArray[i].pop<entriesInBin_T>();
579 // Count remaining entries to process
580 for(int i=0;i<numSrcBuf;i++){
581 count += remainingEntriesToRead[i];
585 // find first EP from all buffers (these are sorted by EP already)
587 for(int i=0;i<numSrcBuf;i++){
588 if(remainingEntriesToRead[i]>0){
589 int ep = srcBufferArray[i].peek<ep_T>();
596 // CkPrintf("[%d] mergeCompressedBin minEp found was %d totalProcsAcrossAllMessages=%d\n", CkMyPe(), minEp, (int)totalProcsAcrossAllMessages);
598 destBuf.increment<entriesInBin_T>(numEntriesInDestRecordOffset);
600 // Merge contributions from all buffers that list the EP
602 for(int i=0;i<numSrcBuf;i++){
603 if(remainingEntriesToRead[i]>0){
604 int ep = srcBufferArray[i].peek<ep_T>();
606 srcBufferArray[i].pop<ep_T>(); // pop ep
607 double util = srcBufferArray[i].pop<utilization_T>();
608 v += util * numProcsRepresentedInMessage[i];
609 remainingEntriesToRead[i]--;
615 // create a new entry in the output for this EP.
616 destBuf.push<ep_T>(minEp);
617 destBuf.push<utilization_T>(v / (double)totalProcsAcrossAllMessages);
622 delete [] remainingEntriesToRead;
623 // CkPrintf("[%d] End of mergeCompressedBin:\n", CkMyPe() );
624 // CkPrintf("END MERGE ------------------------------------------------------------------\n");
629 #include "TraceUtilization.def.h"