1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
6 CpvExtern(Chare
*,_currentObj
);
7 CpvExtern(int, _numImmigrantRecObjs
);
// States of a ticket sent as a reply to a request.
// FORWARDED_TICKET is 0x8000: a single bit (bit 15) is set.
#define FORWARDED_TICKET 0x8000
// Restart / crash status values.
// NOTE(review): no team-size definition appears in this group, although an
// older heading here described one. The two values below are presumably
// status codes that distinguish a restarted processor from a crashed one;
// confirm at their use sites.
#define MLOG_RESTARTED 0
#define MLOG_CRASHED 1

// Number of bytes in one megabyte: 2^20 = 1048576.
#define MEGABYTE 1048576
// Character buffer (100 chars) into which the formatted string representing
// an object id is printed. It is one global array, so each print presumably
// overwrites the previous contents -- copy it out if it must be kept (confirm
// at callers).
extern char objString[100];
// Initial size of the _bufferedDets array of buffered determinants.
#define INITIAL_BUFFERED_DETERMINANTS 1024
// Chooses the kind of checkpoint used: synchronized or not.
// NOTE(review): only the value 1 is visible here; how other values would be
// interpreted is not shown in this header.
#define SYNCHRONIZED_CHECKPOINT 1
29 * @brief Struct to store the determinant of a particular message.
30 * The determinant remembers all the necessary information for a
31 * message to be replayed in the same order as in the execution prior
39 // SSN: sender sequence number
41 // TN: ticket number (RSN: receiver sequence number)
46 * @brief Typedef for the hashtable type that maps object IDs to determinants.
48 typedef CkHashtableT
<CkHashtableAdaptorT
<CkObjID
>, CkVec
<Determinant
> *> CkDeterminantHashtableT
;
51 * @brief Struct for the header of the removeDeterminants handler
54 char header
[CmiMsgHeaderSizeBytes
];
57 } RemoveDeterminantsHeader
;
60 * @brief Struct for the header of the storeDeterminants handler
63 char header
[CmiMsgHeaderSizeBytes
];
68 } StoreDeterminantsHeader
;
71 * @brief Structure for a ticket assigned to a particular message.
// Forward declaration; RestoredLocalMap is defined further down this header.
class RestoredLocalMap;

// Number of Ticket slots in SNToTicket's built-in initial array. When a
// sequence number falls beyond the current size, SNToTicket switches to a
// larger heap-allocated array.
#define INITSIZE_SNTOTICKET 100
94 * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
95 * for a particular object.
99 Ticket initial
[INITSIZE_SNTOTICKET
];
106 currentSize
= INITSIZE_SNTOTICKET
;
107 ticketVec
= &initial
[0];
108 memset(ticketVec
,0,sizeof(Ticket
)*currentSize
);
113 * Gets the finishSN value.
115 inline MCount
getFinishSN(){
119 * Gets the startSN value.
121 inline MCount
getStartSN(){
124 //assume indices start from 1.. true for MCounts
125 inline Ticket
&put(MCount SN
){
126 if(SN
> finishSN
) finishSN
= SN
;
130 int index
= SN
-startSN
;
131 if(index
>= currentSize
){
132 int oldSize
= currentSize
;
133 Ticket
*old
= ticketVec
;
135 currentSize
= index
*2;
136 ticketVec
= new Ticket
[currentSize
];
137 memcpy(ticketVec
,old
,sizeof(Ticket
)*oldSize
);
138 if(old
!= &initial
[0]){
142 return ticketVec
[index
];
145 inline Ticket
get(MCount SN
){
146 int index
= SN
-startSN
;
147 CmiAssert(index
>= 0);
148 if(index
>= currentSize
){
152 return ticketVec
[index
];
156 inline void pup(PUP::er
&p
){
160 if(currentSize
> INITSIZE_SNTOTICKET
){
161 ticketVec
= new Ticket
[currentSize
];
164 for(int i
=0;i
<currentSize
;i
++){
172 * This file includes the definition of the class for storing the metadata
173 * associated with the message logging protocol.
178 * @brief This class stores all the message logging related data for a chare.
184 // Counts how many tickets have been handed out.
186 // Stores the highest ticket that has been processed.
189 //TODO: pup receivedTNs
190 CkVec
<MCount
> *receivedTNs
; //used to store TNs received by senders during a restart
194 // variable that keeps a count of the processors that have replied to a request to resend messages.
195 int resendReplyRecvd
;
196 // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time
198 // 0 -> normal state .. 1 -> recovery of a team member
199 int teamRecoveryFlag
;
200 //TML: teamTable, stores the SN to TN mapping for messages intra team
201 CkHashtableT
<CkHashtableAdaptorT
<CkObjID
>, SNToTicket
*> teamTable
;
205 int immigrantRecFlag
;
206 int immigrantSourcePE
;
210 // SNTable, stores the number of messages sent (sequence numbers) to other objects.
211 CkHashtableT
<CkHashtableAdaptorT
<CkObjID
>,MCount
> snTable
;
212 // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
213 CkHashtableT
<CkHashtableAdaptorT
<CkObjID
>,SNToTicket
*> ticketTable
;
214 // Log of messages sent.
215 CkQ
<MlogEntry
*> mlog
;
218 inline MCount
newTN();
222 * Default constructor.
224 ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
236 immigrantRecFlag
= 0;
238 inline MCount
nextSN(const CkObjID
&recver
);
239 inline Ticket
next_ticket(CkObjID
&sender
,MCount SN
);
240 inline void verifyTicket(CkObjID
&sender
,MCount SN
, MCount TN
);
241 inline Ticket
getTicket(CkObjID
&sender
, MCount SN
);
242 void addLogEntry(MlogEntry
*entry
);
243 virtual void pup(PUP::er
&p
);
244 CkQ
<MlogEntry
*> *getMlog(){ return &mlog
;};
245 MCount
searchRestoredLocalQ(CkObjID
&sender
,CkObjID
&recver
,MCount SN
);
249 * @brief Entry in a message log. It also includes the index of the buffered
250 * determinants array and the number of appended determinants.
251 * @note: this message appended numBufDets counting downwards from indexBufDets.
252 * In other words, if indexBufDets == 5 and numBufDets = 3, it means that
253 * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
263 MlogEntry(envelope
*_env
,int _destPE
,int __infoIdx
){
266 _infoIdx
= __infoIdx
;
278 virtual void pup(PUP::er
&p
);
284 class StoredCheckpoint
{
297 * @brief Class for storing metadata of local messages.
298 * It maps sequence numbers to ticket numbers.
299 * It is used after a restart to maintain the same ticket numbers.
301 class RestoredLocalMap
{
303 MCount minSN
,maxSN
,count
;
309 RestoredLocalMap(int i
){
314 virtual void pup(PUP::er
&p
);
319 char header
[CmiMsgHeaderSizeBytes
];
327 CpvExtern(CkQ
<TicketRequest
*> *,_delayedTicketRequests
);
328 CpvExtern(CkQ
<MlogEntry
*> *,_delayedLocalTicketRequests
);
331 TicketRequest request
;
336 CpvExtern(char**,_bufferedTicketRequests
);
337 extern int _maxBufferedTicketRequests
; //Number of ticket requests to be buffered
342 char header
[CmiMsgHeaderSizeBytes
];
344 } BufferedLocalLogHeader
;
346 typedef BufferedLocalLogHeader BufferedTicketRequestHeader
;
349 char header
[CmiMsgHeaderSizeBytes
];
355 char header
[CmiMsgHeaderSizeBytes
];
357 } DistributeObjectMsg
;
361 char header[CmiMsgHeaderSizeBytes];
366 typedef CheckPointDataMsg CheckPointAck
;
375 * Struct to request a particular action during restart.
378 char header
[CmiMsgHeaderSizeBytes
];
382 typedef RestartRequest CkPingMsg
;
383 typedef RestartRequest CheckpointRequest
;
386 char header
[CmiMsgHeaderSizeBytes
];
388 double restartWallTime
;
390 int numMigratedAwayElements
;
391 int numMigratedInElements
;
392 int migratedElementSize
;
393 int numLocalMessages
;
395 } RestartProcessorData
;
398 char header
[CmiMsgHeaderSizeBytes
];
403 typedef ResendRequest RemoveLogRequest
;
406 char header
[CmiMsgHeaderSizeBytes
];
411 // Structure to forward determinants in parallel restart
413 char header
[CmiMsgHeaderSizeBytes
];
421 TProcessedLog
*listObjects
;
422 CkVec
<MCount
> *ticketVecs
;
433 char header
[CmiMsgHeaderSizeBytes
];
434 MigrationRecord migRecord
;
439 char header
[CmiMsgHeaderSizeBytes
];
441 } MigrationNoticeAck
;
444 MigrationRecord migRecord
;
448 } RetainedMigratedObject
;
451 char header
[CmiMsgHeaderSizeBytes
];
452 MigrationRecord migRecord
;
458 char header
[CmiMsgHeaderSizeBytes
];
461 } CheckpointBarrierMsg
;
464 //message used to inform a locmgr of an object's current location
466 char header
[CmiMsgHeaderSizeBytes
];
471 } CurrentLocationMsg
;
474 char header
[CmiMsgHeaderSizeBytes
];
481 #define MLOG_OBJECT 1
485 char header
[CmiMsgHeaderSizeBytes
];
486 int flag
;// specific object(1) or count(2)
488 int count
;// if just count
496 //function pointer passed to the forAllCharesDo method.
497 //It takes a void *data and a ChareMlogData pointer
498 //It gets called for each chare
499 typedef void (*MlogFn
)(void *,ChareMlogData
*);
// Initialization entry point of the message-logging module (when and how
// often it is called is not shown here -- confirm at the caller).
void _messageLoggingInit();
503 //Methods for sending ticket requests
504 void sendGroupMsg(envelope
*env
,int destPE
,int _infoIdx
);
505 void sendArrayMsg(envelope
*env
,int destPE
,int _infoIdx
);
506 void sendChareMsg(envelope
*env
,int destPE
,int _infoIdx
, const CkChareID
*pCid
);
507 void sendNodeGroupMsg(envelope
*env
,int destNode
,int _infoIdx
);
508 void sendCommonMsg(CkObjID
&recver
,envelope
*env
,int destPE
,int _infoIdx
);
509 void sendMsg(CkObjID
&sender
,CkObjID
&recver
,int destPE
,MlogEntry
*entry
,MCount SN
,MCount TN
,int resend
);
510 void sendLocalMsg(envelope
*env
, int _infoIdx
);
513 void _ticketRequestHandler(TicketRequest
*);
514 void _ticketHandler(TicketReply
*);
515 void _pingHandler(CkPingMsg
*msg
);
516 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader
*recvdHeader
,int freeHeader
=1);
517 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader
*recvdHeader
);
518 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader
*recvdHeader
);
519 void _bufferedTicketHandler(BufferedTicketRequestHeader
*recvdHeader
);
520 void _storeDeterminantsHandler(char *buffer
);
521 void _removeDeterminantsHandler(char *buffer
);
524 //methods for sending messages
525 extern void _skipCldEnqueue(int pe
,envelope
*env
, int infoFn
);
526 extern void _noCldNodeEnqueue(int node
, envelope
*env
);
527 void generalCldEnqueue(int destPE
,envelope
*env
,int _infoIdx
);
528 void retryTicketRequest(void *_ticketRequest
,double curWallTime
);
530 //methods to process received messages with respect to mlog
531 int preProcessReceivedMessage(envelope
*env
,Chare
**objPointer
,MlogEntry
**localLogEntry
);
532 void postProcessReceivedMessage(Chare
*obj
,CkObjID
&sender
,MCount SN
,MlogEntry
*entry
);
536 CpvExtern(StoredCheckpoint
*,_storedCheckpointData
);
538 //methods for checkpointing
539 void checkpointAlarm(void *_dummy
,double curWallTime
);
540 void startMlogCheckpoint(void *_dummy
,double curWallTime
);
541 void pupArrayElementsSkip(PUP::er
&p
, bool create
, MigrationRecord
*listToSkip
,int listSize
=0);
543 //handler functions for checkpoint
544 void _checkpointRequestHandler(CheckpointRequest
*request
);
545 void _storeCheckpointHandler(char *msg
);
546 void _checkpointAckHandler(CheckPointAck
*ackMsg
);
547 void _removeProcessedLogHandler(char *requestMsg
);
548 void garbageCollectMlog();
// Handler indices for the checkpoint handlers (defined elsewhere).
extern int _checkpointRequestHandlerIdx;
extern int _storeCheckpointHandlerIdx;
extern int _checkpointAckHandlerIdx;
extern int _removeProcessedLogHandlerIdx;
559 //methods for restart
560 void CkMlogRestart(const char * dummy
, CkArgMsg
* dummyMsg
);
561 void CkMlogRestartDouble(void *,double);
562 void processReceivedTN(Chare
*obj
,int vecsize
,MCount
*listTNs
);
563 void processReceivedDet(Chare
*obj
,int vecsize
, Determinant
*listDets
);
564 void initializeRestart(void *data
,ChareMlogData
*mlogData
);
565 void distributeRestartedObjects();
566 void sendDummyMigration(int restartPE
,CkGroupID lbID
,CkGroupID locMgrID
,CkArrayIndexMax
&idx
,int locationPE
);
// TML: triggers the restart by a local call (takes no arguments).
void CkMlogRestartLocal();
571 //handler functions for restart
572 void _getCheckpointHandler(RestartRequest
*restartMsg
);
573 void _recvCheckpointHandler(char *_restartData
);
574 void _resendMessagesHandler(char *msg
);
575 void _sendDetsHandler(char *msg
);
576 void _sendDetsReplyHandler(char *msg
);
577 void _receivedTNDataHandler(ReceivedTNData
*msg
);
578 void _receivedDetDataHandler(ReceivedDetData
*msg
);
579 void _distributedLocationHandler(char *receivedMsg
);
580 void _sendBackLocationHandler(char *receivedMsg
);
581 void _updateHomeRequestHandler(RestartRequest
*updateRequest
);
582 void _updateHomeAckHandler(RestartRequest
*updateHomeAck
);
583 void _verifyAckRequestHandler(VerifyAckMsg
*verifyRequest
);
584 void _verifyAckHandler(VerifyAckMsg
*verifyReply
);
585 void _dummyMigrationHandler(DummyMigrationMsg
*msg
);
587 //TML: new functions for group-based message logging
588 void _restartHandler(RestartRequest
*restartMsg
);
589 void _getRestartCheckpointHandler(RestartRequest
*restartMsg
);
590 void _recvRestartCheckpointHandler(char *_restartData
);
// Handler indices for the restart handlers (defined elsewhere).
extern int _getCheckpointHandlerIdx;
extern int _recvCheckpointHandlerIdx;
extern int _resendMessagesHandlerIdx;
extern int _sendDetsHandlerIdx;
extern int _sendDetsReplyHandlerIdx;
extern int _receivedTNDataHandlerIdx;
extern int _receivedDetDataHandlerIdx;
extern int _distributedLocationHandlerIdx;
extern int _updateHomeRequestHandlerIdx;
extern int _updateHomeAckHandlerIdx;
extern int _verifyAckRequestHandlerIdx;
extern int _verifyAckHandlerIdx;
extern int _dummyMigrationHandlerIdx;
609 //methods for load balancing
610 void startLoadBalancingMlog(void (*fnPtr
)(void *),void *_centralLb
);
611 void finishedCheckpointLoadBalancing();
612 void sendMlogLocation(int targetPE
,envelope
*env
);
613 void resumeFromSyncRestart(void *data
,ChareMlogData
*mlogData
);
614 void restoreParallelRecovery(void (*fnPtr
)(void *),void *_centralLb
);
616 //handlers for Load Balancing
617 void _receiveMlogLocationHandler(void *buf
);
618 void _receiveMigrationNoticeHandler(MigrationNotice
*msg
);
619 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck
*msg
);
620 void _getGlobalStepHandler(LBStepMsg
*msg
);
621 void _recvGlobalStepHandler(LBStepMsg
*msg
);
622 void _checkpointBarrierHandler(CheckpointBarrierMsg
*msg
);
623 void _checkpointBarrierAckHandler(CheckpointBarrierMsg
*msg
);
// Globals used for load balancing (defined elsewhere).
extern int onGoingLoadBalancing;
extern void *centralLb;
extern void (*resumeLbFnPtr)(void *);

// Handler indices for the load-balancing handlers.
extern int _receiveMlogLocationHandlerIdx;
extern int _receiveMigrationNoticeHandlerIdx;
extern int _receiveMigrationNoticeAckHandlerIdx;
extern int _getGlobalStepHandlerIdx;
extern int _recvGlobalStepHandlerIdx;
extern int _checkpointBarrierHandlerIdx;
extern int _checkpointBarrierAckHandlerIdx;
637 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
638 extern CkVec
<MigrationRecord
> migratedNoticeList
;
639 extern CkVec
<RetainedMigratedObject
*> retainedObjectList
;
641 int getCheckPointPE();
642 int getReverseCheckPointPE();
643 inline int isSameDet(Determinant
*first
, Determinant
*second
);
644 void forAllCharesDo(MlogFn fnPointer
,void *data
);
645 envelope
*copyEnvelope(envelope
*env
);
646 extern void _initDone(void);
// TML: required by group restart.
extern void _resetNodeBocInitVec(void);
651 //methods for updating location
652 void informLocationHome(CkGroupID mgrID
,CkArrayIndexMax idx
,int homePE
,int currentPE
);
654 //handlers for updating locations
655 void _receiveLocationHandler(CurrentLocationMsg
*data
);
// Handler index for location updates (defined elsewhere).
extern int _receiveLocationHandlerIdx;
// C-linkage function defined outside this module. It takes a handler-index
// range [lowerHandler, higherHandler]; whether the upper bound is inclusive
// is not shown here -- confirm at the definition.
extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler, int higherHandler);