Fix memory leaks in trace projections and summary
[charm.git] / src / ck-core / ckmessagelogging.h
blob16212ea8a0af5794b16c814e70ffb35cdbe9525f
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
4 #include "ckobjid.h"
6 CpvExtern(Chare *,_currentObj);
7 CpvExtern(int, _numImmigrantRecObjs);
// Debug output switches: DEBUGGING(x) expands to nothing (its body is
// commented out), while DEBUGGING_NOW(x) always expands to x.
#define DEBUGGING(x) // x
#define DEBUGGING_NOW(x) x

// States of a ticket sent as a reply to a request.
#define NEW_TICKET 1
#define OLD_TICKET 2
#define FORWARDED_TICKET 0x8000

// Restart/crash status identifiers (presumably describing a processor's
// state during recovery).
#define MLOG_RESTARTED 0
#define MLOG_CRASHED 1

// Number of bytes in one megabyte (2^20).
#define MEGABYTE 1048576

// Initial size of _bufferedDets.
#define INITIAL_BUFFERED_DETERMINANTS 1024

// Selects the type of checkpoint used (synchronized or not).
#define SYNCHRONIZED_CHECKPOINT 1

// Buffer into which the formatted string representing an object id is printed.
extern char objString[100];
// Number of slots an RSSN buffer starts with.
#define RSSN_INITIAL_SIZE 16

// Forward declarations; MlogEntry is referenced by ChareMlogData before its
// definition later in this header.
class RestoredLocalMap;
class MlogEntry;
37 /**
38 * @brief Class that stores all received-sender-sequence-numbers (rssn) from another object.
40 class RSSN{
41 private:
42 MCount *data;
43 int currentSize, start, end;
44 public:
46 // Constructor
47 RSSN(){
48 currentSize = RSSN_INITIAL_SIZE;
49 start = 0;
50 end = 0;
51 data = new MCount[RSSN_INITIAL_SIZE];
52 memset(data,0,sizeof(MCount)*currentSize);
55 ~RSSN()
57 if(data != NULL)
59 delete []data;
60 data = NULL;
64 // Checks if a particular SSN is already in the data; if not, stores it
65 // return value: 0 (sucess, value stored), 1 (value already there)
66 int checkAndStore(MCount ssn){
67 int index, oldCS, num, i;
68 MCount *old;
70 // checking if ssn can be inserted, most common case
71 if((start == end && ssn == (data[start] + 1)) || data[start] == 0){
72 data[start] = ssn;
73 return 0;
76 // checking if ssn was already received
77 if(ssn <= data[start]){
78 DEBUGGING(CkPrintf("[%d] Repeated ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
79 return 1;
82 // checking if data needs to be extended
83 if(ssn-data[start] >= currentSize){
84 DEBUGGING(CkPrintf("[%d] Extending Data %d %d %d\n",CkMyPe(),ssn,data[start],currentSize));
86 // HACK for migration
87 data[0] = ssn;
88 start = end = 0;
89 return 0; //HACK
91 old = data;
92 oldCS = currentSize;
93 currentSize *= 2;
94 data = new MCount[currentSize];
95 memset(data,0,sizeof(MCount)*currentSize);
96 for(i=start, num=0; i!=end; i=(i+1)%oldCS,num++){
97 data[num] = old[i];
99 start = 0;
100 end = num-1;
101 delete[] old;
104 DEBUGGING(CkPrintf("[%d] Ahead ssn=%d start=%d\n",CkMyPe(),ssn,data[start]));
106 // adding ssn into data
107 num = end - start;
108 if(num < 0) num += currentSize;
109 num++;
110 index = (start+ssn-data[start])%currentSize;
111 data[index] = ssn;
112 if((ssn-data[start]) >= num) end = index;
114 // compressing ssn
115 index = start + 1;
116 while(data[index]){
117 data[start] = 0;
118 start = index;
119 index = (index + 1)%currentSize;
120 if(index == end) break;
122 return 0;
125 // PUP method
126 inline void pup(PUP::er &p){
127 p | start;
128 p | end;
129 p | currentSize;
130 if(p.isUnpacking()){
131 if(currentSize > RSSN_INITIAL_SIZE){
132 delete[] data;
133 data = new MCount[currentSize];
136 for(int i=0;i<currentSize;i++){
137 p | data[i];
/**
 * This file includes the definition of the class for storing the metadata
 * associated with the message logging protocol.
 */
151 * @brief This class stores all the message logging related data for a chare.
153 class ChareMlogData{
154 public:
155 // Object unique ID.
156 CkObjID objID;
157 // variable that keeps a count of the processors that have replied to a requests to resend messages.
158 int resendReplyRecvd;
159 // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time
160 int restartFlag;
161 // 0 -> normal state .. 1 -> recovery of a team member
162 int teamRecoveryFlag;
163 int toResumeOrNot;
164 int resumeCount;
165 int immigrantRecFlag;
166 int immigrantSourcePE;
168 private:
169 // ssnTable, stores the number of messages sent (sequence numbers) to other objects.
170 CkHashtableT<CkHashtableAdaptorT<CkObjID>, MCount> ssnTable;
171 // receivedSsnTable, stores the list of ssn received from other objects.
172 CkHashtableT<CkHashtableAdaptorT<CkObjID>, RSSN *> receivedSsnTable;
173 // Log of messages sent.
174 CkQ<MlogEntry *> mlog;
176 public:
178 * Default constructor.
180 ChareMlogData():ssnTable(100,0.4),receivedSsnTable(100,0.4){
181 restartFlag=0;
182 teamRecoveryFlag=0;
183 resendReplyRecvd=0;
184 toResumeOrNot=0;
185 resumeCount=0;
186 immigrantRecFlag = 0;
188 inline MCount nextSN(const CkObjID &recver);
189 int checkAndStoreSsn(const CkObjID &sender, MCount ssn);
190 void addLogEntry(MlogEntry *entry);
191 virtual void pup(PUP::er &p);
192 CkQ<MlogEntry *> *getMlog(){ return &mlog;};
196 * @brief Entry in a message log. It also includes the index of the buffered
197 * determinants array and the number of appended determinants.
198 * @note: this message appended numBufDets counting downwards from indexBufDets.
199 * In other words, if indexBufDets == 5 and numBufDets = 3, it means that
200 * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
202 class MlogEntry{
203 public:
204 envelope *env;
205 int destPE;
206 int _infoIdx;
208 MlogEntry(envelope *_env,int _destPE,int __infoIdx){
209 env = _env;
210 destPE = _destPE;
211 _infoIdx = __infoIdx;
213 MlogEntry(){
214 env = 0;
215 destPE = -1;
216 _infoIdx = 0;
218 ~MlogEntry(){
219 if(env){
220 CmiFree(env);
223 virtual void pup(PUP::er &p);
/**
 * @brief Holder for one checkpoint buffer.
 *
 * A default-constructed object is empty: no buffer, size 0 and PE -1.
 */
class StoredCheckpoint{
public:
	char *buf;      // checkpoint bytes; NULL while empty
	int bufSize;    // presumably the length of buf in bytes
	int PE;         // processor this checkpoint is associated with; -1 when unset
	StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
241 typedef struct{
242 char header[CmiMsgHeaderSizeBytes];
243 int PE;
244 int dataSize;
245 } CheckPointDataMsg;
247 typedef struct{
248 char header[CmiMsgHeaderSizeBytes];
249 int PE;
250 } DistributeObjectMsg;
253 /*typedef struct{
254 char header[CmiMsgHeaderSizeBytes];
255 int PE;
256 int dataSize;
257 } CheckPointAck;*/
259 typedef CheckPointDataMsg CheckPointAck;
263 * Struct to request a particular action during restart.
265 typedef struct{
266 char header[CmiMsgHeaderSizeBytes];
267 int PE;
268 } RestartRequest;
270 typedef RestartRequest CkPingMsg;
271 typedef RestartRequest CheckpointRequest;
273 typedef struct{
274 char header[CmiMsgHeaderSizeBytes];
275 int PE;
276 double restartWallTime;
277 int checkPointSize;
278 int numMigratedAwayElements;
279 int numMigratedInElements;
280 int migratedElementSize;
281 int numLocalMessages;
282 CkGroupID lbGroupID;
283 } RestartProcessorData;
285 typedef struct{
286 char header[CmiMsgHeaderSizeBytes];
287 int PE;
288 int numberObjects;
289 } ResendRequest;
291 typedef ResendRequest RemoveLogRequest;
293 typedef struct {
294 char header[CmiMsgHeaderSizeBytes];
295 CkObjID recver;
296 int numTNs;
297 } ReceivedTNData;
299 // Structure to forward determinants in parallel restart
300 typedef struct {
301 char header[CmiMsgHeaderSizeBytes];
302 CkObjID recver;
303 int numDets;
304 } ReceivedDetData;
306 typedef struct{
307 int PE;
308 int numberObjects;
309 CkObjID *listObjects;
310 } ResendData;
312 typedef struct {
313 CkGroupID gID;
314 CkArrayIndexMax idx;
315 int fromPE,toPE;
316 char ackFrom,ackTo;
317 } MigrationRecord;
319 typedef struct {
320 char header[CmiMsgHeaderSizeBytes];
321 MigrationRecord migRecord;
322 void *record;
323 } MigrationNotice;
325 typedef struct {
326 char header[CmiMsgHeaderSizeBytes];
327 void *record;
328 } MigrationNoticeAck;
330 typedef struct {
331 MigrationRecord migRecord;
332 void *msg;
333 int size;
334 char acked;
335 } RetainedMigratedObject;
337 typedef struct {
338 char header[CmiMsgHeaderSizeBytes];
339 MigrationRecord migRecord;
340 int index;
341 int fromPE;
342 } VerifyAckMsg;
344 typedef struct {
345 char header[CmiMsgHeaderSizeBytes];
346 int checkpointCount;
347 int fromPE;
348 } CheckpointBarrierMsg;
351 //message used to inform a locmgr of an object's current location
352 typedef struct {
353 char header[CmiMsgHeaderSizeBytes];
354 CkGroupID mgrID;
355 CkArrayIndexMax idx;
356 int locationPE;
357 int fromPE;
358 } CurrentLocationMsg;
360 typedef struct {
361 char header[CmiMsgHeaderSizeBytes];
362 CkGroupID lbID;
363 int fromPE;
364 int step;
365 } LBStepMsg;
368 #define MLOG_OBJECT 1
369 #define MLOG_COUNT 2
371 typedef struct {
372 char header[CmiMsgHeaderSizeBytes];
373 int flag;// specific object(1) or count(2)
374 CkGroupID lbID;
375 int count;// if just count
376 /**if object **/
377 CkGroupID mgrID;
378 CkArrayIndexMax idx;
379 int locationPE;
380 } DummyMigrationMsg;
383 //function pointer passed to the forAllCharesDo method.
384 //It takes a void *data and a ChareMlogData pointer
385 //It gets called for each chare
386 typedef void (*MlogFn)(void *,ChareMlogData *);
388 void _messageLoggingInit();
390 //Methods for sending ticket requests
391 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
392 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
393 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
394 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
395 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
396 void sendRemoteMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend);
397 void sendLocalMsg(envelope *env, int _infoIdx);
399 //handler functions
400 void _pingHandler(CkPingMsg *msg);
402 //methods for sending messages
403 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
404 extern void _noCldNodeEnqueue(int node, envelope *env);
405 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
407 //methods to process received messages with respect to mlog
408 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
409 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
412 //Checkpoint
413 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
415 //methods for checkpointing
416 void CkStartMlogCheckpoint(CkCallback &cb);
417 void checkpointAlarm(void *_dummy,double curWallTime);
418 void startMlogCheckpoint(void *_dummy,double curWallTime);
419 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listSize=0);
421 //handler functions for checkpoint
422 void _checkpointRequestHandler(CheckpointRequest *request);
423 void _storeCheckpointHandler(char *msg);
424 void _checkpointAckHandler(CheckPointAck *ackMsg);
425 void _removeProcessedLogHandler(char *requestMsg);
426 void garbageCollectMlog();
427 void _startCheckpointHandler(CheckpointBarrierMsg *msg);
428 void _endCheckpointHandler(char *msg);
430 //handler idxs for checkpoint
431 extern int _checkpointRequestHandlerIdx;
432 extern int _storeCheckpointHandlerIdx;
433 extern int _checkpointAckHandlerIdx;
434 extern int _removeProcessedLogHandlerIdx;
436 //Restart
439 //methods for restart
440 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
441 void CkMlogRestartDouble(void *,double);
442 void initializeRestart(void *data,ChareMlogData *mlogData);
443 void distributeRestartedObjects();
444 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
446 //TML: function for locally calling the restart
447 void CkMlogRestartLocal();
449 //handler functions for restart
450 void _getCheckpointHandler(RestartRequest *restartMsg);
451 void _recvCheckpointHandler(char *_restartData);
452 void _resendMessagesHandler(char *msg);
453 void _sendDetsHandler(char *msg);
454 void _sendDetsReplyHandler(char *msg);
455 void _receivedTNDataHandler(ReceivedTNData *msg);
456 void _receivedDetDataHandler(ReceivedDetData *msg);
457 void _distributedLocationHandler(char *receivedMsg);
458 void _sendBackLocationHandler(char *receivedMsg);
459 void _updateHomeRequestHandler(RestartRequest *updateRequest);
460 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
461 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
462 void _verifyAckHandler(VerifyAckMsg *verifyReply);
463 void _dummyMigrationHandler(DummyMigrationMsg *msg);
465 //TML: new functions for group-based message logging
466 void _restartHandler(RestartRequest *restartMsg);
467 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
468 void _recvRestartCheckpointHandler(char *_restartData);
470 //handler idxs for restart
471 extern int _getCheckpointHandlerIdx;
472 extern int _recvCheckpointHandlerIdx;
473 extern int _resendMessagesHandlerIdx;
474 extern int _sendDetsHandlerIdx;
475 extern int _sendDetsReplyHandlerIdx;
476 extern int _receivedTNDataHandlerIdx;
477 extern int _receivedDetDataHandlerIdx;
478 extern int _distributedLocationHandlerIdx;
479 extern int _updateHomeRequestHandlerIdx;
480 extern int _updateHomeAckHandlerIdx;
481 extern int _verifyAckRequestHandlerIdx;
482 extern int _verifyAckHandlerIdx;
483 extern int _dummyMigrationHandlerIdx;
485 /// Load Balancing
487 //methods for load balancing
488 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
489 void finishedCheckpointLoadBalancing();
490 void sendMlogLocation(int targetPE,envelope *env);
491 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
492 void restoreParallelRecovery(void (*fnPtr)(void *),void *_centralLb);
494 //handlers for Load Balancing
495 void _receiveMlogLocationHandler(void *buf);
496 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
497 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
498 void _getGlobalStepHandler(LBStepMsg *msg);
499 void _recvGlobalStepHandler(LBStepMsg *msg);
500 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
501 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
503 //globals used for loadBalancing
504 extern int onGoingLoadBalancing;
505 extern void *centralLb;
506 extern void (*resumeLbFnPtr)(void *) ;
507 extern int _receiveMlogLocationHandlerIdx;
508 extern int _receiveMigrationNoticeHandlerIdx;
509 extern int _receiveMigrationNoticeAckHandlerIdx;
510 extern int _getGlobalStepHandlerIdx;
511 extern int _recvGlobalStepHandlerIdx;
512 extern int _checkpointBarrierHandlerIdx;
513 extern int _checkpointBarrierAckHandlerIdx;
515 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
516 extern CkVec<MigrationRecord> migratedNoticeList;
517 extern CkVec<RetainedMigratedObject *> retainedObjectList;
519 int getCheckPointPE();
520 void forAllCharesDo(MlogFn fnPointer,void *data);
521 envelope *copyEnvelope(envelope *env);
522 extern void _initDone(void);
524 //TML: needed for group restart
525 extern void _resetNodeBocInitVec(void);
527 //methods for updating location
528 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
530 //handlers for updating locations
531 void _receiveLocationHandler(CurrentLocationMsg *data);
533 //globals for updating locations
534 extern int _receiveLocationHandlerIdx;
537 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
539 #endif