1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
6 CpvExtern(Chare
*,_currentObj
);
7 CpvExtern(int, _numImmigrantRecObjs
);
//states of a ticket sent as a reply to a request
#define FORWARDED_TICKET 0x8000

//TML: global variable for the size of the team
// NOTE(review): no variable declaration follows this comment in the visible
// excerpt; the constants below appear unrelated to it.
#define MLOG_RESTARTED 0
#define MLOG_CRASHED 1

// Number of bytes in one megabyte (2^20).
#define MEGABYTE 1048576
//array on which we print the formatted string representing an object id
// (declaration only: a 100-char buffer whose definition is not in this header)
extern char objString[100];
// defines the initial size of _bufferedDets
#define INITIAL_BUFFERED_DETERMINANTS 1024

// constant to define the type of checkpoint used (synchronized or not)
#define SYNCHRONIZED_CHECKPOINT 1

// Debug switches: DEBUGGING(x) discards its argument (the trailing "// x" is
// a comment, so the macro body is empty), while DEBUGGING_NOW(x) expands to
// x unchanged.
#define DEBUGGING(x) // x
#define DEBUGGING_NOW(x) x
// Forward declaration only; the full definition is not visible here.
class RestoredLocalMap;

// Initial number of MCount slots allocated for an RSSN's data array.
#define RSSN_INITIAL_SIZE 16
38 * @brief Class that stores all received-sender-sequence-numbers (rssn) from another object.
43 int currentSize
, start
, end
;
48 currentSize
= RSSN_INITIAL_SIZE
;
51 data
= new MCount
[RSSN_INITIAL_SIZE
];
52 memset(data
,0,sizeof(MCount
)*currentSize
);
64 // Checks if a particular SSN is already in the data; if not, stores it
65 // return value: 0 (success, value stored), 1 (value already there)
66 int checkAndStore(MCount ssn
){
67 int index
, oldCS
, num
, i
;
70 // checking if ssn can be inserted, most common case
71 if((start
== end
&& ssn
== (data
[start
] + 1)) || data
[start
] == 0){
76 // checking if ssn was already received
77 if(ssn
<= data
[start
]){
78 DEBUGGING(CkPrintf("[%d] Repeated ssn=%d start=%d\n",CkMyPe(),ssn
,data
[start
]));
82 // checking if data needs to be extended
83 if(ssn
-data
[start
] >= currentSize
){
84 DEBUGGING(CkPrintf("[%d] Extending Data %d %d %d\n",CkMyPe(),ssn
,data
[start
],currentSize
));
94 data
= new MCount
[currentSize
];
95 memset(data
,0,sizeof(MCount
)*currentSize
);
96 for(i
=start
, num
=0; i
!=end
; i
=(i
+1)%oldCS
,num
++){
104 DEBUGGING(CkPrintf("[%d] Ahead ssn=%d start=%d\n",CkMyPe(),ssn
,data
[start
]));
106 // adding ssn into data
108 if(num
< 0) num
+= currentSize
;
110 index
= (start
+ssn
-data
[start
])%currentSize
;
112 if((ssn
-data
[start
]) >= num
) end
= index
;
119 index
= (index
+ 1)%currentSize
;
120 if(index
== end
) break;
126 inline void pup(PUP::er
&p
){
131 if(currentSize
> RSSN_INITIAL_SIZE
){
133 data
= new MCount
[currentSize
];
136 for(int i
=0;i
<currentSize
;i
++){
145 * This file includes the definition of the class for storing the meta data
146 * associated with the message logging protocol.
151 * @brief This class stores all the message logging related data for a chare.
157 // variable that keeps a count of the processors that have replied to a request to resend messages.
158 int resendReplyRecvd
;
159 // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time
161 // 0 -> normal state .. 1 -> recovery of a team member
162 int teamRecoveryFlag
;
165 int immigrantRecFlag
;
166 int immigrantSourcePE
;
169 // ssnTable, stores the number of messages sent (sequence numbers) to other objects.
170 CkHashtableT
<CkHashtableAdaptorT
<CkObjID
>, MCount
> ssnTable
;
171 // receivedSsnTable, stores the list of ssn received from other objects.
172 CkHashtableT
<CkHashtableAdaptorT
<CkObjID
>, RSSN
*> receivedSsnTable
;
173 // Log of messages sent.
174 CkQ
<MlogEntry
*> mlog
;
178 * Default constructor.
180 ChareMlogData():ssnTable(100,0.4),receivedSsnTable(100,0.4){
186 immigrantRecFlag
= 0;
188 inline MCount
nextSN(const CkObjID
&recver
);
189 int checkAndStoreSsn(const CkObjID
&sender
, MCount ssn
);
190 void addLogEntry(MlogEntry
*entry
);
191 virtual void pup(PUP::er
&p
);
192 CkQ
<MlogEntry
*> *getMlog(){ return &mlog
;};
196 * @brief Entry in a message log. It also includes the index of the buffered
197 * determinants array and the number of appended determinants.
198 * @note: this message appended numBufDets counting downwards from indexBufDets.
199 * In other words, if indexBufDets == 5 and numBufDets = 3, it means that
200 * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
208 MlogEntry(envelope
*_env
,int _destPE
,int __infoIdx
){
211 _infoIdx
= __infoIdx
;
223 virtual void pup(PUP::er
&p
);
229 class StoredCheckpoint
{
242 char header
[CmiMsgHeaderSizeBytes
];
248 char header
[CmiMsgHeaderSizeBytes
];
250 } DistributeObjectMsg
;
254 char header[CmiMsgHeaderSizeBytes];
259 typedef CheckPointDataMsg CheckPointAck
;
263 * Struct to request a particular action during restart.
266 char header
[CmiMsgHeaderSizeBytes
];
270 typedef RestartRequest CkPingMsg
;
271 typedef RestartRequest CheckpointRequest
;
274 char header
[CmiMsgHeaderSizeBytes
];
276 double restartWallTime
;
278 int numMigratedAwayElements
;
279 int numMigratedInElements
;
280 int migratedElementSize
;
281 int numLocalMessages
;
283 } RestartProcessorData
;
286 char header
[CmiMsgHeaderSizeBytes
];
291 typedef ResendRequest RemoveLogRequest
;
294 char header
[CmiMsgHeaderSizeBytes
];
299 // Structure to forward determinants in parallel restart
301 char header
[CmiMsgHeaderSizeBytes
];
309 CkObjID
*listObjects
;
320 char header
[CmiMsgHeaderSizeBytes
];
321 MigrationRecord migRecord
;
326 char header
[CmiMsgHeaderSizeBytes
];
328 } MigrationNoticeAck
;
331 MigrationRecord migRecord
;
335 } RetainedMigratedObject
;
338 char header
[CmiMsgHeaderSizeBytes
];
339 MigrationRecord migRecord
;
345 char header
[CmiMsgHeaderSizeBytes
];
348 } CheckpointBarrierMsg
;
351 //message used to inform a locmgr of an object's current location
353 char header
[CmiMsgHeaderSizeBytes
];
358 } CurrentLocationMsg
;
361 char header
[CmiMsgHeaderSizeBytes
];
// NOTE(review): presumably the `flag` value meaning "specific object" in the
// message struct that follows — confirm against its users.
#define MLOG_OBJECT 1
372 char header
[CmiMsgHeaderSizeBytes
];
373 int flag
;// specific object(1) or count(2)
375 int count
;// if just count
383 //function pointer passed to the forAllCharesDo method.
384 //It takes a void *data and a ChareMlogData pointer
385 //It gets called for each chare
386 typedef void (*MlogFn
)(void *,ChareMlogData
*);
// Initialization entry point for message logging; takes no arguments and
// returns nothing. The definition is not in this header.
void _messageLoggingInit();
390 //Methods for sending ticket requests
391 void sendGroupMsg(envelope
*env
,int destPE
,int _infoIdx
);
392 void sendArrayMsg(envelope
*env
,int destPE
,int _infoIdx
);
393 void sendChareMsg(envelope
*env
,int destPE
,int _infoIdx
, const CkChareID
*pCid
);
394 void sendNodeGroupMsg(envelope
*env
,int destNode
,int _infoIdx
);
395 void sendCommonMsg(CkObjID
&recver
,envelope
*env
,int destPE
,int _infoIdx
);
396 void sendRemoteMsg(CkObjID
&sender
,CkObjID
&recver
,int destPE
,MlogEntry
*entry
,MCount SN
,int resend
);
397 void sendLocalMsg(envelope
*env
, int _infoIdx
);
400 void _pingHandler(CkPingMsg
*msg
);
402 //methods for sending messages
403 extern void _skipCldEnqueue(int pe
,envelope
*env
, int infoFn
);
404 extern void _noCldNodeEnqueue(int node
, envelope
*env
);
405 void generalCldEnqueue(int destPE
,envelope
*env
,int _infoIdx
);
407 //methods to process received messages with respect to mlog
408 int preProcessReceivedMessage(envelope
*env
,Chare
**objPointer
,MlogEntry
**localLogEntry
);
409 void postProcessReceivedMessage(Chare
*obj
,CkObjID
&sender
,MCount SN
,MlogEntry
*entry
);
413 CpvExtern(StoredCheckpoint
*,_storedCheckpointData
);
415 //methods for checkpointing
416 void CkStartMlogCheckpoint(CkCallback
&cb
);
417 void checkpointAlarm(void *_dummy
,double curWallTime
);
418 void startMlogCheckpoint(void *_dummy
,double curWallTime
);
419 void pupArrayElementsSkip(PUP::er
&p
, bool create
, MigrationRecord
*listToSkip
,int listSize
=0);
421 //handler functions for checkpoint
422 void _checkpointRequestHandler(CheckpointRequest
*request
);
423 void _storeCheckpointHandler(char *msg
);
424 void _checkpointAckHandler(CheckPointAck
*ackMsg
);
425 void _removeProcessedLogHandler(char *requestMsg
);
426 void garbageCollectMlog();
427 void _startCheckpointHandler(CheckpointBarrierMsg
*msg
);
428 void _endCheckpointHandler(char *msg
);
//handler idxs for checkpoint
// One int per checkpoint handler of the same base name; defined elsewhere.
// NOTE(review): presumably the indices obtained when the handlers are
// registered — confirm.
extern int _checkpointRequestHandlerIdx;
extern int _storeCheckpointHandlerIdx;
extern int _checkpointAckHandlerIdx;
extern int _removeProcessedLogHandlerIdx;
439 //methods for restart
440 void CkMlogRestart(const char * dummy
, CkArgMsg
* dummyMsg
);
441 void CkMlogRestartDouble(void *,double);
442 void initializeRestart(void *data
,ChareMlogData
*mlogData
);
443 void distributeRestartedObjects();
444 void sendDummyMigration(int restartPE
,CkGroupID lbID
,CkGroupID locMgrID
,CkArrayIndexMax
&idx
,int locationPE
);
446 //TML: function for locally calling the restart
447 void CkMlogRestartLocal();
449 //handler functions for restart
450 void _getCheckpointHandler(RestartRequest
*restartMsg
);
451 void _recvCheckpointHandler(char *_restartData
);
452 void _resendMessagesHandler(char *msg
);
453 void _sendDetsHandler(char *msg
);
454 void _sendDetsReplyHandler(char *msg
);
455 void _receivedTNDataHandler(ReceivedTNData
*msg
);
456 void _receivedDetDataHandler(ReceivedDetData
*msg
);
457 void _distributedLocationHandler(char *receivedMsg
);
458 void _sendBackLocationHandler(char *receivedMsg
);
459 void _updateHomeRequestHandler(RestartRequest
*updateRequest
);
460 void _updateHomeAckHandler(RestartRequest
*updateHomeAck
);
461 void _verifyAckRequestHandler(VerifyAckMsg
*verifyRequest
);
462 void _verifyAckHandler(VerifyAckMsg
*verifyReply
);
463 void _dummyMigrationHandler(DummyMigrationMsg
*msg
);
465 //TML: new functions for group-based message logging
466 void _restartHandler(RestartRequest
*restartMsg
);
467 void _getRestartCheckpointHandler(RestartRequest
*restartMsg
);
468 void _recvRestartCheckpointHandler(char *_restartData
);
//handler idxs for restart
// One int per restart handler of the same base name; defined elsewhere.
// NOTE(review): presumably the indices obtained when the handlers are
// registered — confirm.
extern int _getCheckpointHandlerIdx;
extern int _recvCheckpointHandlerIdx;
extern int _resendMessagesHandlerIdx;
extern int _sendDetsHandlerIdx;
extern int _sendDetsReplyHandlerIdx;
extern int _receivedTNDataHandlerIdx;
extern int _receivedDetDataHandlerIdx;
extern int _distributedLocationHandlerIdx;
extern int _updateHomeRequestHandlerIdx;
extern int _updateHomeAckHandlerIdx;
extern int _verifyAckRequestHandlerIdx;
extern int _verifyAckHandlerIdx;
extern int _dummyMigrationHandlerIdx;
487 //methods for load balancing
488 void startLoadBalancingMlog(void (*fnPtr
)(void *),void *_centralLb
);
489 void finishedCheckpointLoadBalancing();
490 void sendMlogLocation(int targetPE
,envelope
*env
);
491 void resumeFromSyncRestart(void *data
,ChareMlogData
*mlogData
);
492 void restoreParallelRecovery(void (*fnPtr
)(void *),void *_centralLb
);
494 //handlers for Load Balancing
495 void _receiveMlogLocationHandler(void *buf
);
496 void _receiveMigrationNoticeHandler(MigrationNotice
*msg
);
497 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck
*msg
);
498 void _getGlobalStepHandler(LBStepMsg
*msg
);
499 void _recvGlobalStepHandler(LBStepMsg
*msg
);
500 void _checkpointBarrierHandler(CheckpointBarrierMsg
*msg
);
501 void _checkpointBarrierAckHandler(CheckpointBarrierMsg
*msg
);
//globals used for loadBalancing
extern int onGoingLoadBalancing;
// Opaque pointer and void(*)(void *) callback; they have the same types as
// the (_centralLb, fnPtr) parameters of startLoadBalancingMlog.
extern void *centralLb;
extern void (*resumeLbFnPtr)(void *);
// One int per load-balancing handler of the same base name; defined elsewhere.
extern int _receiveMlogLocationHandlerIdx;
extern int _receiveMigrationNoticeHandlerIdx;
extern int _receiveMigrationNoticeAckHandlerIdx;
extern int _getGlobalStepHandlerIdx;
extern int _recvGlobalStepHandlerIdx;
extern int _checkpointBarrierHandlerIdx;
extern int _checkpointBarrierAckHandlerIdx;
515 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
516 extern CkVec
<MigrationRecord
> migratedNoticeList
;
517 extern CkVec
<RetainedMigratedObject
*> retainedObjectList
;
519 int getCheckPointPE();
520 void forAllCharesDo(MlogFn fnPointer
,void *data
);
521 envelope
*copyEnvelope(envelope
*env
);
522 extern void _initDone(void);
524 //TML: needed for group restart
525 extern void _resetNodeBocInitVec(void);
527 //methods for updating location
528 void informLocationHome(CkGroupID mgrID
,CkArrayIndexMax idx
,int homePE
,int currentPE
);
530 //handlers for updating locations
531 void _receiveLocationHandler(CurrentLocationMsg
*data
);
533 //globals for updating locations
534 extern int _receiveLocationHandlerIdx
;
// C-linkage declaration; takes a lower and a higher handler value.
// NOTE(review): presumably the bounds of a handler-index range, inclusivity
// unknown — confirm against the definition.
extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler, int higherHandler);