LRTS Comm Thread Tracing in message recieve
[charm.git] / src / ck-core / ckcausalmlog.h
blobfcfb174f7419c9e7cfc77cbaccdb3ba6d041c454
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
4 #include "ckobjid.h"
6 CpvExtern(Chare *,_currentObj);
7 CpvExtern(int, _numImmigrantRecObjs);
// States of a ticket sent as a reply to a request.
#define NEW_TICKET 1
#define OLD_TICKET 2
#define FORWARDED_TICKET 0x8000

// TML: global variable for the size of the team
// NOTE(review): the comment above does not describe the macros that follow;
// confirm whether the declaration it referred to was removed.
#define MLOG_RESTARTED 0
#define MLOG_CRASHED 1
// 2^20 bytes.
#define MEGABYTE 1048576

// Array on which we print the formatted string representing an object id.
extern char objString[100];

// Initial size of _bufferedDets.
#define INITIAL_BUFFERED_DETERMINANTS 1024

// Constant that defines the type of checkpoint used (synchronized or not).
#define SYNCHRONIZED_CHECKPOINT 1
28 /**
29 * @brief Struct to store the determinant of a particular message.
30 * The determinant remembers all the necessary information for a
31 * message to be replayed in the same order as in the execution prior
32 * the failure.
34 typedef struct {
35 // sender ID
36 CkObjID sender;
37 // receiver ID
38 CkObjID receiver;
39 // SSN: sender sequence number
40 MCount SN;
41 // TN: ticket number (RSN: receiver sequence number)
42 MCount TN;
43 } Determinant;
45 /**
46 * @brief Typedef for the hashtable type that maps object IDs to determinants.
48 typedef CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> CkDeterminantHashtableT;
50 /**
51 * @brief Struct for the header of the removeDeterminants handler
53 typedef struct {
54 char header[CmiMsgHeaderSizeBytes];
55 int phase;
56 int index;
57 } RemoveDeterminantsHeader;
59 /**
60 * @brief Struct for the header of the storeDeterminants handler
62 typedef struct {
63 char header[CmiMsgHeaderSizeBytes];
64 int number;
65 int index;
66 int phase;
67 int PE;
68 } StoreDeterminantsHeader;
70 /**
71 * @brief Structure for a ticket assigned to a particular message.
73 class Ticket {
74 public:
75 MCount TN;
76 int state;
77 Ticket(){
78 TN = 0;
79 state = 0;
81 Ticket(int x){
82 TN = x;
83 state = 0;
86 PUPbytes(Ticket)
// Forward declarations; both classes are defined further down in this header.
class MlogEntry;
class RestoredLocalMap;
91 #define INITSIZE_SNTOTICKET 100
93 /**
94 * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
95 * for a particular object.
97 class SNToTicket{
98 private:
99 Ticket initial[INITSIZE_SNTOTICKET];
100 Ticket *ticketVec;
101 MCount startSN;
102 int currentSize;
103 MCount finishSN;
104 public:
105 SNToTicket(){
106 currentSize = INITSIZE_SNTOTICKET;
107 ticketVec = &initial[0];
108 memset(ticketVec,0,sizeof(Ticket)*currentSize);
109 startSN = 0;
110 finishSN = 0;
113 * Gets the finishSN value.
115 inline MCount getFinishSN(){
116 return finishSN;
119 * Gets the startSN value.
121 inline MCount getStartSN(){
122 return startSN;
124 //assume indices start from 1.. true for MCounts
125 inline Ticket &put(MCount SN){
126 if(SN > finishSN) finishSN = SN;
127 if(startSN == 0){
128 startSN = SN;
130 int index = SN-startSN;
131 if(index >= currentSize){
132 int oldSize = currentSize;
133 Ticket *old = ticketVec;
135 currentSize = index*2;
136 ticketVec = new Ticket[currentSize];
137 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
138 if(old != &initial[0]){
139 delete [] old;
142 return ticketVec[index];
145 inline Ticket get(MCount SN){
146 int index = SN-startSN;
147 CmiAssert(index >= 0);
148 if(index >= currentSize){
149 Ticket tn;
150 return tn;
151 }else{
152 return ticketVec[index];
156 inline void pup(PUP::er &p){
157 p | startSN;
158 p | currentSize;
159 if(p.isUnpacking()){
160 if(currentSize > INITSIZE_SNTOTICKET){
161 ticketVec = new Ticket[currentSize];
164 for(int i=0;i<currentSize;i++){
165 p | ticketVec[i];
/**
 * This file includes the definition of the class for storing the metadata
 * associated with the message logging protocol.
 */
178 * @brief This class stores all the message logging related data for a chare.
180 class ChareMlogData{
181 public:
182 // Object unique ID.
183 CkObjID objID;
184 // Counts how many tickets have been handed out.
185 MCount tCount;
186 // Stores the highest ticket that has been processed.
187 MCount tProcessed;
189 //TODO: pup receivedTNs
190 CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
191 MCount *ticketHoles;
192 int numberHoles;
193 int currentHoles;
194 // variable that keeps a count of the processors that have replied to a requests to resend messages.
195 int resendReplyRecvd;
196 // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time
197 int restartFlag;
198 // 0 -> normal state .. 1 -> recovery of a team member
199 int teamRecoveryFlag;
200 //TML: teamTable, stores the SN to TN mapping for messages intra team
201 CkHashtableT<CkHashtableAdaptorT<CkObjID>, SNToTicket *> teamTable;
203 int toResumeOrNot;
204 int resumeCount;
205 int immigrantRecFlag;
206 int immigrantSourcePE;
208 private:
210 // SNTable, stores the number of messages sent (sequence numbers) to other objects.
211 CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
212 // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
213 CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
214 // Log of messages sent.
215 CkQ<MlogEntry *> mlog;
218 inline MCount newTN();
220 public:
222 * Default constructor.
224 ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
225 tCount = 0;
226 tProcessed = 0;
227 numberHoles = 0;
228 ticketHoles = NULL;
229 currentHoles = 0;
230 restartFlag=0;
231 teamRecoveryFlag=0;
232 receivedTNs = NULL;
233 resendReplyRecvd=0;
234 toResumeOrNot=0;
235 resumeCount=0;
236 immigrantRecFlag = 0;
238 inline MCount nextSN(const CkObjID &recver);
239 inline Ticket next_ticket(CkObjID &sender,MCount SN);
240 inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
241 inline Ticket getTicket(CkObjID &sender, MCount SN);
242 void addLogEntry(MlogEntry *entry);
243 virtual void pup(PUP::er &p);
244 CkQ<MlogEntry *> *getMlog(){ return &mlog;};
245 MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
249 * @brief Entry in a message log. It also includes the index of the buffered
250 * determinants array and the number of appended determinants.
251 * @note: this message appended numBufDets counting downwards from indexBufDets.
252 * In other words, if indexBufDets == 5 and numBufDets = 3, it means that
253 * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
255 class MlogEntry{
256 public:
257 envelope *env;
258 int destPE;
259 int _infoIdx;
260 int indexBufDets;
261 int numBufDets;
263 MlogEntry(envelope *_env,int _destPE,int __infoIdx){
264 env = _env;
265 destPE = _destPE;
266 _infoIdx = __infoIdx;
268 MlogEntry(){
269 env = 0;
270 destPE = -1;
271 _infoIdx = 0;
273 ~MlogEntry(){
274 if(env){
275 CmiFree(env);
278 virtual void pup(PUP::er &p);
/**
 * @brief Holder for a checkpoint buffer, its size and a PE number.
 *
 * A default-constructed object holds no buffer (buf == NULL, bufSize == 0)
 * and PE == -1.
 */
class StoredCheckpoint{
public:
	char *buf;
	int bufSize;
	int PE;

	StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
297 * @brief Class for storing metadata of local messages.
298 * It maps sequence numbers to ticket numbers.
299 * It is used after a restart to maintain the same ticket numbers.
301 class RestoredLocalMap {
302 public:
303 MCount minSN,maxSN,count;
304 MCount *TNArray;
305 RestoredLocalMap(){
306 minSN=maxSN=count=0;
307 TNArray=NULL;
309 RestoredLocalMap(int i){
310 minSN=maxSN=count=0;
311 TNArray=NULL;
314 virtual void pup(PUP::er &p);
318 typedef struct {
319 char header[CmiMsgHeaderSizeBytes];
320 CkObjID sender;
321 CkObjID recver;
322 MlogEntry *logEntry;
323 MCount SN;
324 MCount TN;
325 int senderPE;
326 } TicketRequest;
327 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
328 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
330 typedef struct{
331 TicketRequest request;
332 Ticket ticket;
333 int recverPE;
334 } TicketReply;
336 CpvExtern(char**,_bufferedTicketRequests);
337 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
341 typedef struct {
342 char header[CmiMsgHeaderSizeBytes];
343 int numberLogs;
344 } BufferedLocalLogHeader;
346 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
348 typedef struct{
349 char header[CmiMsgHeaderSizeBytes];
350 int PE;
351 int dataSize;
352 } CheckPointDataMsg;
354 typedef struct{
355 char header[CmiMsgHeaderSizeBytes];
356 int PE;
357 } DistributeObjectMsg;
360 /*typedef struct{
361 char header[CmiMsgHeaderSizeBytes];
362 int PE;
363 int dataSize;
364 } CheckPointAck;*/
366 typedef CheckPointDataMsg CheckPointAck;
368 typedef struct{
369 CkObjID recver;
370 MCount tProcessed;
371 } TProcessedLog;
375 * Struct to request a particular action during restart.
377 typedef struct{
378 char header[CmiMsgHeaderSizeBytes];
379 int PE;
380 } RestartRequest;
382 typedef RestartRequest CkPingMsg;
383 typedef RestartRequest CheckpointRequest;
385 typedef struct{
386 char header[CmiMsgHeaderSizeBytes];
387 int PE;
388 double restartWallTime;
389 int checkPointSize;
390 int numMigratedAwayElements;
391 int numMigratedInElements;
392 int migratedElementSize;
393 int numLocalMessages;
394 CkGroupID lbGroupID;
395 } RestartProcessorData;
397 typedef struct{
398 char header[CmiMsgHeaderSizeBytes];
399 int PE;
400 int numberObjects;
401 } ResendRequest;
403 typedef ResendRequest RemoveLogRequest;
405 typedef struct {
406 char header[CmiMsgHeaderSizeBytes];
407 CkObjID recver;
408 int numTNs;
409 } ReceivedTNData;
411 // Structure to forward determinants in parallel restart
412 typedef struct {
413 char header[CmiMsgHeaderSizeBytes];
414 CkObjID recver;
415 int numDets;
416 } ReceivedDetData;
418 typedef struct{
419 int PE;
420 int numberObjects;
421 TProcessedLog *listObjects;
422 CkVec<MCount> *ticketVecs;
423 } ResendData;
425 typedef struct {
426 CkGroupID gID;
427 CkArrayIndexMax idx;
428 int fromPE,toPE;
429 char ackFrom,ackTo;
430 } MigrationRecord;
432 typedef struct {
433 char header[CmiMsgHeaderSizeBytes];
434 MigrationRecord migRecord;
435 void *record;
436 } MigrationNotice;
438 typedef struct {
439 char header[CmiMsgHeaderSizeBytes];
440 void *record;
441 } MigrationNoticeAck;
443 typedef struct {
444 MigrationRecord migRecord;
445 void *msg;
446 int size;
447 char acked;
448 } RetainedMigratedObject;
450 typedef struct {
451 char header[CmiMsgHeaderSizeBytes];
452 MigrationRecord migRecord;
453 int index;
454 int fromPE;
455 } VerifyAckMsg;
457 typedef struct {
458 char header[CmiMsgHeaderSizeBytes];
459 int checkpointCount;
460 int fromPE;
461 } CheckpointBarrierMsg;
464 //message used to inform a locmgr of an object's current location
465 typedef struct {
466 char header[CmiMsgHeaderSizeBytes];
467 CkGroupID mgrID;
468 CkArrayIndexMax idx;
469 int locationPE;
470 int fromPE;
471 } CurrentLocationMsg;
473 typedef struct {
474 char header[CmiMsgHeaderSizeBytes];
475 CkGroupID lbID;
476 int fromPE;
477 int step;
478 } LBStepMsg;
481 #define MLOG_OBJECT 1
482 #define MLOG_COUNT 2
484 typedef struct {
485 char header[CmiMsgHeaderSizeBytes];
486 int flag;// specific object(1) or count(2)
487 CkGroupID lbID;
488 int count;// if just count
489 /**if object **/
490 CkGroupID mgrID;
491 CkArrayIndexMax idx;
492 int locationPE;
493 } DummyMigrationMsg;
496 //function pointer passed to the forAllCharesDo method.
497 //It takes a void *data and a ChareMlogData pointer
498 //It gets called for each chare
499 typedef void (*MlogFn)(void *,ChareMlogData *);
501 void _messageLoggingInit();
503 //Methods for sending ticket requests
504 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
505 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
506 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
507 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
508 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
509 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
510 void sendLocalMsg(envelope *env, int _infoIdx);
512 //handler functions
513 void _ticketRequestHandler(TicketRequest *);
514 void _ticketHandler(TicketReply *);
515 void _pingHandler(CkPingMsg *msg);
516 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
517 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
518 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
519 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
520 void _storeDeterminantsHandler(char *buffer);
521 void _removeDeterminantsHandler(char *buffer);
524 //methods for sending messages
525 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
526 extern void _noCldNodeEnqueue(int node, envelope *env);
527 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
528 void retryTicketRequest(void *_ticketRequest,double curWallTime);
530 //methods to process received messages with respect to mlog
531 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
532 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
535 //Checkpoint
536 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
538 //methods for checkpointing
539 void checkpointAlarm(void *_dummy,double curWallTime);
540 void startMlogCheckpoint(void *_dummy,double curWallTime);
541 void pupArrayElementsSkip(PUP::er &p, bool create, MigrationRecord *listToSkip,int listSize=0);
543 //handler functions for checkpoint
544 void _checkpointRequestHandler(CheckpointRequest *request);
545 void _storeCheckpointHandler(char *msg);
546 void _checkpointAckHandler(CheckPointAck *ackMsg);
547 void _removeProcessedLogHandler(char *requestMsg);
548 void garbageCollectMlog();
// Handler indices for checkpoint.
extern int _checkpointRequestHandlerIdx;
extern int _storeCheckpointHandlerIdx;
extern int _checkpointAckHandlerIdx;
extern int _removeProcessedLogHandlerIdx;
556 //Restart
559 //methods for restart
560 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
561 void CkMlogRestartDouble(void *,double);
562 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
563 void processReceivedDet(Chare *obj,int vecsize, Determinant *listDets);
564 void initializeRestart(void *data,ChareMlogData *mlogData);
565 void distributeRestartedObjects();
566 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
568 //TML: function for locally calling the restart
569 void CkMlogRestartLocal();
571 //handler functions for restart
572 void _getCheckpointHandler(RestartRequest *restartMsg);
573 void _recvCheckpointHandler(char *_restartData);
574 void _resendMessagesHandler(char *msg);
575 void _sendDetsHandler(char *msg);
576 void _sendDetsReplyHandler(char *msg);
577 void _receivedTNDataHandler(ReceivedTNData *msg);
578 void _receivedDetDataHandler(ReceivedDetData *msg);
579 void _distributedLocationHandler(char *receivedMsg);
580 void _sendBackLocationHandler(char *receivedMsg);
581 void _updateHomeRequestHandler(RestartRequest *updateRequest);
582 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
583 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
584 void _verifyAckHandler(VerifyAckMsg *verifyReply);
585 void _dummyMigrationHandler(DummyMigrationMsg *msg);
587 //TML: new functions for group-based message logging
588 void _restartHandler(RestartRequest *restartMsg);
589 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
590 void _recvRestartCheckpointHandler(char *_restartData);
// Handler indices for restart.
extern int _getCheckpointHandlerIdx;
extern int _recvCheckpointHandlerIdx;
extern int _resendMessagesHandlerIdx;
extern int _sendDetsHandlerIdx;
extern int _sendDetsReplyHandlerIdx;
extern int _receivedTNDataHandlerIdx;
extern int _receivedDetDataHandlerIdx;
extern int _distributedLocationHandlerIdx;
extern int _updateHomeRequestHandlerIdx;
extern int _updateHomeAckHandlerIdx;
extern int _verifyAckRequestHandlerIdx;
extern int _verifyAckHandlerIdx;
extern int _dummyMigrationHandlerIdx;
607 /// Load Balancing
609 //methods for load balancing
610 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
611 void finishedCheckpointLoadBalancing();
612 void sendMlogLocation(int targetPE,envelope *env);
613 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
614 void restoreParallelRecovery(void (*fnPtr)(void *),void *_centralLb);
616 //handlers for Load Balancing
617 void _receiveMlogLocationHandler(void *buf);
618 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
619 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
620 void _getGlobalStepHandler(LBStepMsg *msg);
621 void _recvGlobalStepHandler(LBStepMsg *msg);
622 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
623 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
625 //globals used for loadBalancing
626 extern int onGoingLoadBalancing;
627 extern void *centralLb;
628 extern void (*resumeLbFnPtr)(void *) ;
629 extern int _receiveMlogLocationHandlerIdx;
630 extern int _receiveMigrationNoticeHandlerIdx;
631 extern int _receiveMigrationNoticeAckHandlerIdx;
632 extern int _getGlobalStepHandlerIdx;
633 extern int _recvGlobalStepHandlerIdx;
634 extern int _checkpointBarrierHandlerIdx;
635 extern int _checkpointBarrierAckHandlerIdx;
637 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
638 extern CkVec<MigrationRecord> migratedNoticeList;
639 extern CkVec<RetainedMigratedObject *> retainedObjectList;
641 int getCheckPointPE();
642 int getReverseCheckPointPE();
643 inline int isSameDet(Determinant *first, Determinant *second);
644 void forAllCharesDo(MlogFn fnPointer,void *data);
645 envelope *copyEnvelope(envelope *env);
646 extern void _initDone(void);
648 //TML: needed for group restart
649 extern void _resetNodeBocInitVec(void);
651 //methods for updating location
652 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
654 //handlers for updating locations
655 void _receiveLocationHandler(CurrentLocationMsg *data);
657 //globals for updating locations
658 extern int _receiveLocationHandlerIdx;
661 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
663 #endif