Fix memory leaks in trace projections and summary
[charm.git] / src / ck-core / ck.C
blob0f681b84002b82738fab40574bc72a9b142c40b1
1 /**
2 \addtogroup Ck
4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups.  There is no migration. All
6 the bindings are written to the C language, although most
7 clients, including the rest of Charm++, are actually C++.
8 */
9 #include "ck.h"
10 #include "trace.h"
11 #include "queueing.h"
13 #include "pathHistory.h"
15 #if CMK_LBDB_ON
16 #include "LBDatabase.h"
17 #endif // CMK_LBDB_ON
19 #ifndef CMK_CHARE_USE_PTR
20 #include <map>
21 CkpvDeclare(CkVec<void *>, chare_objs);
22 CkpvDeclare(CkVec<int>, chare_types);
23 CkpvDeclare(CkVec<VidBlock *>, vidblocks);
25 typedef std::map<int, CkChareID>  Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap);      // remote VidBlock to notify upon deletion
27 CkpvDeclare(int, currentChareIdx);
28 #endif
31 #define CK_MSG_SKIP_OR_IMM    (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
33 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
35 int CMessage_CkMessage::__idx=-1;
36 int CMessage_CkArgMsg::__idx=0;
37 int CkIndex_Chare::__idx;
38 int CkIndex_Group::__idx;
39 int CkIndex_ArrayBase::__idx=-1;
41 extern int _defaultObjectQ;
43 void _initChareTables()
45 #ifndef CMK_CHARE_USE_PTR
46           /* chare and vidblock table */
47   CkpvInitialize(CkVec<void *>, chare_objs);
48   CkpvInitialize(CkVec<int>, chare_types);
49   CkpvInitialize(CkVec<VidBlock *>, vidblocks);
50   CkpvInitialize(Vidblockmap, vmap);
51   CkpvInitialize(int, currentChareIdx);
52   CkpvAccess(currentChareIdx) = -1;
53 #endif
56 //Charm++ virtual functions: declaring these here results in a smaller executable
57 Chare::Chare(void) {
58   thishandle.onPE=CkMyPe();
59   thishandle.objPtr=this;
60 #if CMK_ERROR_CHECKING
61   magic = CHARE_MAGIC;
62 #endif
63 #ifndef CMK_CHARE_USE_PTR
64      // for plain chare, objPtr is actually the index to chare obj table
65   if (CkpvAccess(currentChareIdx) >= 0) {
66     thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
67   }
68   chareIdx = CkpvAccess(currentChareIdx);
69 #endif
70 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
71   mlogData = new ChareMlogData();
72   mlogData->objID.type = TypeChare;
73   mlogData->objID.data.chare.id = thishandle;
74 #endif
75 #if CMK_OBJECT_QUEUE_AVAILABLE
76   if (_defaultObjectQ)  CkEnableObjQ();
77 #endif
80 Chare::Chare(CkMigrateMessage* m) {
81   thishandle.onPE=CkMyPe();
82   thishandle.objPtr=this;
83 #if CMK_ERROR_CHECKING
84   magic = 0;
85 #endif
87 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
88         mlogData = NULL;
89 #endif
91 #if CMK_OBJECT_QUEUE_AVAILABLE
92   if (_defaultObjectQ)  CkEnableObjQ();
93 #endif
96 void Chare::CkEnableObjQ()
98 #if CMK_OBJECT_QUEUE_AVAILABLE
99   objQ.create();
100 #endif
103 Chare::~Chare() {
104 #ifndef CMK_CHARE_USE_PTR
106   if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this) 
108   if (chareIdx != -1)
109   {
110     CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
111     CkpvAccess(chare_objs)[chareIdx] = NULL;
112     Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
113     if (iter != CkpvAccess(vmap).end()) {
114       register CkChareID *pCid = (CkChareID *)
115         _allocMsg(DeleteVidMsg, sizeof(CkChareID));
116       int srcPe = iter->second.onPE;
117       *pCid = iter->second;
118       register envelope *ret = UsrToEnv(pCid);
119       ret->setVidPtr(iter->second.objPtr);
120       ret->setSrcPe(CkMyPe());
121       CmiSetHandler(ret, _charmHandlerIdx);
122       CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
123       CpvAccess(_qd)->create();
124       CkpvAccess(vmap).erase(iter);
125     }
126   }
127 #endif
130 void Chare::pup(PUP::er &p)
132   p(thishandle.onPE);
133   thishandle.objPtr=(void *)this;
134 #ifndef CMK_CHARE_USE_PTR
135   p(chareIdx);
136   if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
137 #endif
138 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
139         if(p.isUnpacking()){
140                 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
141                 mlogData = new ChareMlogData();
142         }
143         mlogData->pup(p);
144 #endif
145 #if CMK_ERROR_CHECKING
146   p(magic);
147 #endif
150 int Chare::ckGetChareType() const {
151   return -3;
153 char *Chare::ckDebugChareName(void) {
154   char buf[100];
155   sprintf(buf,"Chare on pe %d at %p",CkMyPe(),this);
156   return strdup(buf);
158 int Chare::ckDebugChareID(char *str, int limit) {
159   // pure chares for now do not have a valid ID
160   str[0] = 0;
161   return 1;
163 void Chare::ckDebugPup(PUP::er &p) {
164   pup(p);
167 /// This method is called before starting a [threaded] entry method.
168 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
169   CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
170   traceAddThreadListeners(th, UsrToEnv(msg));
173 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
174   p.comment("Bytes");
175   int ts=UsrToEnv(msg)->getTotalsize();
176   int msgLen=ts-sizeof(envelope);
177   if (msgLen>0)
178     p((char*)msg,msgLen);
181 IrrGroup::IrrGroup(void) {
182   thisgroup = CkpvAccess(_currentGroup);
183 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
184         mlogData->objID.type = TypeGroup;
185         mlogData->objID.data.group.id = thisgroup;
186         mlogData->objID.data.group.onPE = CkMyPe();
187 #endif
190 IrrGroup::~IrrGroup() {
191   // remove the object pointer
192   if (CkpvAccess(_destroyingNodeGroup)) {
193     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
194     CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
195     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
196     CkpvAccess(_destroyingNodeGroup) = false;
197   } else {
198     CmiImmediateLock(CkpvAccess(_groupTableImmLock));
199     CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
200     CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
201   }
204 void IrrGroup::pup(PUP::er &p)
206   Chare::pup(p);
207   p|thisgroup;
210 int IrrGroup::ckGetChareType() const {
211   return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
214 int IrrGroup::ckDebugChareID(char *str, int limit) {
215   if (limit<5) return -1;
216   str[0] = 1;
217   *((int*)&str[1]) = thisgroup.idx;
218   return 5;
221 char *IrrGroup::ckDebugChareName() {
222   return strdup(_chareTable[ckGetChareType()]->name);
225 void IrrGroup::ckJustMigrated(void)
229 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
230   /* FIXME: **CW** not entirely sure what we should do here yet */
233 void Group::CkAddThreadListeners(CthThread th, void *msg) {
234   Chare::CkAddThreadListeners(th, msg);
235   CthSetThreadID(th, thisgroup.idx, 0, 0);
238 void Group::pup(PUP::er &p)
240   CkReductionMgr::pup(p);
241   reductionInfo.pup(p);
244 /**** Delegation Manager Group */
245 CkDelegateMgr::~CkDelegateMgr() { }
247 //Default delegator implementation: do not delegate-- send directly
248 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
249   { CkSendMsg(ep,m,c); }
250 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
251   { CkSendMsgBranch(ep,m,onPE,g); }
252 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
253   { CkBroadcastMsgBranch(ep,m,g); }
254 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
255   { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
256 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
257   { CkSendMsgNodeBranch(ep,m,onNode,g); }
258 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
259   { CkBroadcastMsgNodeBranch(ep,m,g); }
260 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
261   { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->npes,s->pelist); }
262 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
264         CProxyElement_ArrayBase ap(a,idx);
265         ap.ckInsert((CkArrayMessage *)m,ep,onPE);
267 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
269         CProxyElement_ArrayBase ap(a,idx);
270         ap.ckSend((CkArrayMessage *)m,ep);
272 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
274         CProxy_ArrayBase ap(a);
275         ap.ckBroadcast((CkArrayMessage *)m,ep);
278 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
280         CmiAbort("ArraySectionSend is not implemented!\n");
282         CProxyElement_ArrayBase ap(a,idx);
283         ap.ckSend((CkArrayMessage *)m,ep);
287 /*** Proxy <-> delegator communication */
288 CkDelegateData::~CkDelegateData() {}
290 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
291   return pd; // default implementation ignores pup call
294 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
295    this tricky manual reference counting business... */
297 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
298         if (dPtr) dPtr->ref();
299         ckUndelegate();
300         delegatedMgr = dTo;
301         delegatedPtr = dPtr;
302         delegatedGroupId = delegatedMgr->CkGetGroupID();
303         isNodeGroup = delegatedMgr->isNodeGroup();
305 void CProxy::ckUndelegate(void) {
306         delegatedMgr=NULL;
307         delegatedGroupId.setZero();
308         if (delegatedPtr) delegatedPtr->unref();
309         delegatedPtr=NULL;
312 /// Copy constructor
313 CProxy::CProxy(const CProxy &src)
314   :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId), 
315    isNodeGroup(src.isNodeGroup) {
316     delegatedPtr = NULL;
317     if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
318         delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
319     }
322 /// Assignment operator
323 CProxy& CProxy::operator=(const CProxy &src) {
324         CkDelegateData *oldPtr=delegatedPtr;
325         ckUndelegate();
326         delegatedMgr=src.delegatedMgr;
327         delegatedGroupId = src.delegatedGroupId; 
328         isNodeGroup = src.isNodeGroup;
330         if(delegatedMgr != NULL && src.delegatedPtr != NULL)
331             delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
332         else
333             delegatedPtr = NULL;
335         // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
336         if (oldPtr) oldPtr->unref();
337         return *this;
340 void CProxy::pup(PUP::er &p) {
341   if (!p.isUnpacking()) {
342     if (ckDelegatedTo() != NULL) {
343       delegatedGroupId = delegatedMgr->CkGetGroupID();
344       isNodeGroup = delegatedMgr->isNodeGroup();
345     }
346   }
347   p|delegatedGroupId;
348   if (!delegatedGroupId.isZero()) {
349     p|isNodeGroup;
350     if (p.isUnpacking()) {
351       delegatedMgr = ckDelegatedTo(); 
352     }
354     int migCtor = 0, cIdx; 
355     if (!p.isUnpacking()) {
356       if (isNodeGroup) {
357         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
358         cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx(); 
359         migCtor = _chareTable[cIdx]->migCtor; 
360         CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
361       }
362       else  {
363         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
364         cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
365         migCtor = _chareTable[cIdx]->migCtor; 
366         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
367       }         
368     }
370     p|migCtor;
372     // if delegated manager has not been created, construct a dummy
373     // object on which to call DelegatePointerPup
374     if (delegatedMgr == NULL) {
376       // create a dummy object for calling DelegatePointerPup
377       int objId = _entryTable[migCtor]->chareIdx; 
378       int objSize = _chareTable[objId]->size; 
379       void *obj = malloc(objSize); 
380       _entryTable[migCtor]->call(NULL, obj); 
381       delegatedPtr = static_cast<CkDelegateMgr *> (obj)
382         ->DelegatePointerPup(p, delegatedPtr);           
383       free(obj);
385     }
386     else {
388       // delegated manager has been created, so we can use it
389       delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
391     }
393     if (p.isUnpacking() && delegatedPtr) {
394       delegatedPtr->ref();
395     }
396   }
399 /**** Array sections */
400 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
401 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems): _nElems(nElems) { \
402   _cookie.get_aid() = aid;      \
403   _cookie.get_pe() = CkMyPe();  \
404   _elems = new CkArrayIndex[nElems];    \
405   for (int i=0; i<nElems; i++) _elems[i] = elems[i];    \
406   pelist = NULL;        \
407   npes  = 0;    \
410 CKSECTIONID_CONSTRUCTOR_DEF(1D)
411 CKSECTIONID_CONSTRUCTOR_DEF(2D)
412 CKSECTIONID_CONSTRUCTOR_DEF(3D)
413 CKSECTIONID_CONSTRUCTOR_DEF(4D)
414 CKSECTIONID_CONSTRUCTOR_DEF(5D)
415 CKSECTIONID_CONSTRUCTOR_DEF(6D)
416 CKSECTIONID_CONSTRUCTOR_DEF(Max)
418 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes): _nElems(0), _elems(NULL), npes(_npes) {
419   pelist = new int[npes];
420   for (int i=0; i<npes; i++) pelist[i] = _pelist[i];
421   _cookie.get_aid() = gid;
424 CkSectionID::CkSectionID(const CkSectionID &sid) {
425   int i;
426   _cookie = sid._cookie;
427   _nElems = sid._nElems;
428   if (_nElems > 0) {
429     _elems = new CkArrayIndex[_nElems];
430     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
431   } else _elems = NULL;
432   npes = sid.npes;
433   if (npes > 0) {
434     pelist = new int[npes];
435     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
436   } else pelist = NULL;
439 void CkSectionID::operator=(const CkSectionID &sid) {
440   int i;
441   _cookie = sid._cookie;
442   _nElems = sid._nElems;
443   if (_nElems > 0) {
444     _elems = new CkArrayIndex[_nElems];
445     for (i=0; i<_nElems; i++) _elems[i] = sid._elems[i];
446   } else _elems = NULL;
447   npes = sid.npes;
448   if (npes > 0) {
449     pelist = new int[npes];
450     for (i=0; i<npes; ++i) pelist[i] = sid.pelist[i];
451   } else pelist = NULL;
454 void CkSectionID::pup(PUP::er &p) {
455     p | _cookie;
456     p(_nElems);
457     if (_nElems > 0) {
458       if (p.isUnpacking()) _elems = new CkArrayIndex[_nElems];
459       for (int i=0; i< _nElems; i++) p | _elems[i];
460       npes = 0;
461       pelist = NULL;
462     } else {
463       // If _nElems is zero, than this section describes processors instead of array elements
464       _elems = NULL;
465       p(npes);
466       if (p.isUnpacking()) pelist = new int[npes];
467       p(pelist, npes);
468     }
471 /**** Tiny random API routines */
473 #if CMK_CUDA
474 void CUDACallbackManager(void *fn) {
475   if (fn != NULL) {
476     CkCallback *cb = (CkCallback*) fn;
477     cb->send();
478   }
481 #endif
483 void QdCreate(int n) {
484   CpvAccess(_qd)->create(n);
487 void QdProcess(int n) {
488   CpvAccess(_qd)->process(n);
491 extern "C"
492 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
494   UsrToEnv(msg)->setRef(ref);
497 extern "C"
498 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
500   return UsrToEnv(msg)->getRef();
503 extern "C"
504 int CkGetSrcPe(void *msg)
506   return UsrToEnv(msg)->getSrcPe();
509 extern "C"
510 int CkGetSrcNode(void *msg)
512   return CmiNodeOf(CkGetSrcPe(msg));
515 extern "C"
516 void *CkLocalBranch(CkGroupID gID) {
517   return _localBranch(gID);
520 static
521 void *_ckLocalNodeBranch(CkGroupID groupID) {
522   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
523   void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
524   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
525   return retval;
528 extern "C"
529 void *CkLocalNodeBranch(CkGroupID groupID)
531   void *retval;
532   // we are called in a constructor
533   if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
534     return CkpvAccess(_currentNodeGroupObj);
535   while (NULL== (retval=_ckLocalNodeBranch(groupID)))
536   { // Nodegroup hasn't finished being created yet-- schedule...
537     CsdScheduler(0);
538   }
539   return retval;
542 extern "C"
543 void *CkLocalChare(const CkChareID *pCid)
545         int pe=pCid->onPE;
546         if (pe<0) { //A virtual chare ID
547                 if (pe!=(-(CkMyPe()+1)))
548                         return NULL;//VID block not on this PE
549 #ifdef CMK_CHARE_USE_PTR
550                 VidBlock *v=(VidBlock *)pCid->objPtr;
551 #else
552                 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
553 #endif
554                 return v->getLocalChareObj();
555         }
556         else
557         { //An ordinary chare ID
558                 if (pe!=CkMyPe())
559                         return NULL;//Chare not on this PE
560 #ifdef CMK_CHARE_USE_PTR
561                 return pCid->objPtr;
562 #else
563                 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
564 #endif
565         }
568 CkpvDeclare(char**,Ck_argv);
570 extern "C" char **CkGetArgv(void) {
571         return CkpvAccess(Ck_argv);
573 extern "C" int CkGetArgc(void) {
574         return CmiGetArgc(CkpvAccess(Ck_argv));
577 /******************** Basic support *****************/
578 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
580 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
581         CpvAccess(_currentObj) = (Chare *)obj;
582 //      printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
583 #endif
584   //BIGSIM_OOC DEBUGGING
585   //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
586   //fflush(stdout);
587 #if CMK_CHARMDEBUG
588   CpdBeforeEp(epIdx, obj, msg);
589 #endif    
590   _entryTable[epIdx]->call(msg, obj);
591 #if CMK_CHARMDEBUG
592   CpdAfterEp(epIdx);
593 #endif
594   if (_entryTable[epIdx]->noKeep)
595   { /* Method doesn't keep/delete the message, so we have to: */
596     _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
597   }
599 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
601   //BIGSIM_OOC DEBUGGING
602   //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
603   //fflush(stdout);
605   void *deliverMsg;
606 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
607         CpvAccess(_currentObj) = (Chare *)obj;
608 #endif
609   if (_entryTable[epIdx]->noKeep)
610   { /* Deliver a read-only copy of the message */
611     deliverMsg=(void *)msg;
612   } else
613   { /* Method needs a copy of the message to keep/delete */
614     void *oldMsg=(void *)msg;
615     deliverMsg=CkCopyMsg(&oldMsg);
616 #if CMK_ERROR_CHECKING
617     if (oldMsg!=msg)
618       CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
619 #endif
620   }
621 #if CMK_CHARMDEBUG
622   CpdBeforeEp(epIdx, obj, (void*)msg);
623 #endif
624   _entryTable[epIdx]->call(deliverMsg, obj);
625 #if CMK_CHARMDEBUG
626   CpdAfterEp(epIdx);
627 #endif
630 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
632   register void *msg = EnvToUsr(env);
633   _SET_USED(env, 0);
634   CkDeliverMessageFree(epIdx,msg,obj);
637 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
640 #if CMK_TRACE_ENABLED 
641   if (_entryTable[epIdx]->traceEnabled) {
642     _TRACE_BEGIN_EXECUTE(env, obj);
643     if(_entryTable[epIdx]->appWork)
644         _TRACE_BEGIN_APPWORK();
645     _invokeEntryNoTrace(epIdx,env,obj);
646     if(_entryTable[epIdx]->appWork)
647         _TRACE_END_APPWORK();
648     _TRACE_END_EXECUTE();
649   }
650   else
651 #endif
652     _invokeEntryNoTrace(epIdx,env,obj);
656 /********************* Creation ********************/
658 extern "C"
659 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
661   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
662   envelope *env = UsrToEnv(msg);
663   _CHECK_USED(env);
664   if(pCid == 0) {
665     env->setMsgtype(NewChareMsg);
666   } else {
667     pCid->onPE = (-(CkMyPe()+1));
668     //  pCid->magic = _GETIDX(cIdx);
669     pCid->objPtr = (void *) new VidBlock();
670     _MEMCHECK(pCid->objPtr);
671     env->setMsgtype(NewVChareMsg);
672     env->setVidPtr(pCid->objPtr);
673 #ifndef CMK_CHARE_USE_PTR
674     CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
675     int idx = CkpvAccess(vidblocks).size()-1;
676     pCid->objPtr = (void *)(CmiIntPtr)idx;
677     env->setVidPtr((void *)(CmiIntPtr)idx);
678 #endif
679   }
680   env->setEpIdx(eIdx);
681   env->setByPe(CkMyPe());
682   env->setSrcPe(CkMyPe());
683   CmiSetHandler(env, _charmHandlerIdx);
684   _TRACE_CREATION_1(env);
685   CpvAccess(_qd)->create();
686   _STATS_RECORD_CREATE_CHARE_1();
687   _SET_USED(env, 1);
688   if(destPE == CK_PE_ANY)
689     env->setForAnyPE(1);
690   else
691     env->setForAnyPE(0);
692   _CldEnqueue(destPE, env, _infoIdx);
693   _TRACE_CREATION_DONE(1);
696 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
698   register int gIdx = _entryTable[epIdx]->chareIdx;
699   register void *obj = malloc(_chareTable[gIdx]->size);
700   _MEMCHECK(obj);
701   setMemoryTypeChare(obj);
702   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
703   CkpvAccess(_groupTable)->find(groupID).setObj(obj);
704   CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
705   CkpvAccess(_groupIDTable)->push_back(groupID);
706   PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
707   if(ptrq) {
708     void *pending;
709     while((pending=ptrq->deq())!=0) {
710 #if CMK_BIGSIM_CHARM
711       //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
712       //handler to converse-level BigSim handler.
713       _CldEnqueue(CkMyPe(), pending, _infoIdx);
714 #else
715       CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
716 #endif
717     }
718     CkpvAccess(_groupTable)->find(groupID).clearPending();
719   }
720   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
722   CkpvAccess(_currentGroup) = groupID;
723   CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
725 #ifndef CMK_CHARE_USE_PTR
726   int callingChareIdx = CkpvAccess(currentChareIdx);
727   CkpvAccess(currentChareIdx) = -1;
728 #endif
730   _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
732 #ifndef CMK_CHARE_USE_PTR
733   CkpvAccess(currentChareIdx) = callingChareIdx;
734 #endif
736   _STATS_RECORD_PROCESS_GROUP_1();
739 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
741   register int gIdx = _entryTable[epIdx]->chareIdx;
742   int objSize=_chareTable[gIdx]->size;
743   register void *obj = malloc(objSize);
744   _MEMCHECK(obj);
745   setMemoryTypeChare(obj);
746   CkpvAccess(_currentGroup) = groupID;
748 // Now that the NodeGroup is created, add it to the table.
749 //  NodeGroups can be accessed by multiple processors, so
750 //  this is in the opposite order from groups - invoking the constructor
751 //  before registering it.
752 // User may call CkLocalNodeBranch() inside the nodegroup constructor
753 //  store nodegroup into _currentNodeGroupObj
754   CkpvAccess(_currentNodeGroupObj) = obj;
756 #ifndef CMK_CHARE_USE_PTR
757   int callingChareIdx = CkpvAccess(currentChareIdx);
758   CkpvAccess(currentChareIdx) = -1;
759 #endif
761   _invokeEntryNoTrace(epIdx,env,obj);
763 #ifndef CMK_CHARE_USE_PTR
764   CkpvAccess(currentChareIdx) = callingChareIdx;
765 #endif
767   CkpvAccess(_currentNodeGroupObj) = NULL;
768   _STATS_RECORD_PROCESS_NODE_GROUP_1();
770   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
771   CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
772   CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
773   CksvAccess(_nodeGroupIDTable).push_back(groupID);
775   PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
776   if(ptrq) {
777     void *pending;
778     while((pending=ptrq->deq())!=0) {
779       _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
780     }
781     CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
782   }
783   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
786 void _createGroup(CkGroupID groupID, envelope *env)
788   _CHECK_USED(env);
789   _SET_USED(env, 1);
790   register int epIdx = env->getEpIdx();
791   int gIdx = _entryTable[epIdx]->chareIdx;
792   CkNodeGroupID rednMgr;
793 #if !GROUP_LEVEL_REDUCTION
794   if(_chareTable[gIdx]->isIrr == 0){
795                 CProxy_CkArrayReductionMgr rednMgrProxy = CProxy_CkArrayReductionMgr::ckNew(0, groupID);
796                 rednMgr = rednMgrProxy;
797 //              rednMgrProxy.setAttachedGroup(groupID);
798   }
799   else
800 #endif
801   {
802         rednMgr.setZero();
803   }
804   env->setGroupNum(groupID);
805   env->setSrcPe(CkMyPe());
806   env->setRednMgr(rednMgr);
807   env->setGroupEpoch(CkpvAccess(_charmEpoch));
809   if(CkNumPes()>1) {
810     CkPackMessage(&env);
811     CmiSetHandler(env, _bocHandlerIdx);
812     _numInitMsgs++;
813     CmiSyncBroadcast(env->getTotalsize(), (char *)env);
814     CpvAccess(_qd)->create(CkNumPes()-1);
815     CkUnpackMessage(&env);
816   }
817   _STATS_RECORD_CREATE_GROUP_1();
818   CkCreateLocalGroup(groupID, epIdx, env);
821 void _createNodeGroup(CkGroupID groupID, envelope *env)
823   _CHECK_USED(env);
824   _SET_USED(env, 1);
825   register int epIdx = env->getEpIdx();
826   env->setGroupNum(groupID);
827   env->setSrcPe(CkMyPe());
828   env->setGroupEpoch(CkpvAccess(_charmEpoch));
829   if(CkNumNodes()>1) {
830     CkPackMessage(&env);
831     CmiSetHandler(env, _bocHandlerIdx);
832     _numInitMsgs++;
833     if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
834     CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
835     CpvAccess(_qd)->create(CkNumNodes()-1);
836     CkUnpackMessage(&env);
837   }
838   _STATS_RECORD_CREATE_NODE_GROUP_1();
839   CkCreateLocalNodeGroup(groupID, epIdx, env);
842 // new _groupCreate
844 static CkGroupID _groupCreate(envelope *env)
846   register CkGroupID groupNum;
848   // check CkMyPe(). if it is 0 then idx is _numGroups++
849   // if not, then something else...
850   if(CkMyPe() == 0)
851      groupNum.idx = CkpvAccess(_numGroups)++;
852   else
853      groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
854   _createGroup(groupNum, env);
855   return groupNum;
858 // new _nodeGroupCreate
859 static CkGroupID _nodeGroupCreate(envelope *env)
861   register CkGroupID groupNum;
862   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));                // change for proc 0 and other processors
863   if(CkMyNode() == 0)                           // should this be CkMyPe() or CkMyNode()?
864           groupNum.idx = CksvAccess(_numNodeGroups)++;
865    else
866           groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
867   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
868   _createNodeGroup(groupNum, env);
869   return groupNum;
872 /**** generate the group idx when group is creator pe is not pe0
873  **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
874  **** remaining bits contain the group creator processor number and
875  **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
877 int _getGroupIdx(int numNodes,int myNode,int numGroups)
879         int idx;
880         int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
881         int n = 32 - (x+1);                                     // number of bits remaining for the index
882         idx = (myNode<<n) + numGroups;                          // add number of processors, shift by the no. of bits needed,
883                                                                 // then add the next available index
884         // of course this won't work when int is 8 bytes long on T3E
885         //idx |= 0x80000000;                                      // set the most significant bit to 1
886         idx = - idx;
887                                                                 // if int is not 32 bits, wouldn't this be wrong?
888         return idx;
891 extern "C"
892 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
894   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
895   register envelope *env = UsrToEnv(msg);
896   env->setMsgtype(BocInitMsg);
897   env->setEpIdx(eIdx);
898   env->setSrcPe(CkMyPe());
899   _TRACE_CREATION_N(env, CkNumPes());
900   CkGroupID gid = _groupCreate(env);
901   _TRACE_CREATION_DONE(1);
902   return gid;
905 extern "C"
906 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
908   CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
909   register envelope *env = UsrToEnv(msg);
910   env->setMsgtype(NodeBocInitMsg);
911   env->setEpIdx(eIdx);
912   env->setSrcPe(CkMyPe());
913   _TRACE_CREATION_N(env, CkNumNodes());
914   CkGroupID gid = _nodeGroupCreate(env);
915   _TRACE_CREATION_DONE(1);
916   return gid;
919 static inline void *_allocNewChare(envelope *env, int &idx)
921   int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
922   void *tmp=malloc(_chareTable[chareIdx]->size);
923   _MEMCHECK(tmp);
924 #ifndef CMK_CHARE_USE_PTR
925   CkpvAccess(chare_objs).push_back(tmp);
926   CkpvAccess(chare_types).push_back(chareIdx);
927   idx = CkpvAccess(chare_objs).size()-1;
928 #endif
929   setMemoryTypeChare(tmp);
930   return tmp;
933 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
935   int idx;
936   register void *obj = _allocNewChare(env, idx);
937 #ifndef CMK_CHARE_USE_PTR
938   //((Chare *)obj)->chareIdx = idx;
939   CkpvAccess(currentChareIdx) = idx;
940 #endif
941   _invokeEntry(env->getEpIdx(),env,obj);
944 void CkCreateLocalChare(int epIdx, envelope *env)
946   env->setEpIdx(epIdx);
947   _processNewChareMsg(NULL, env);
950 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
952   int idx;
953   register void *obj = _allocNewChare(env, idx);
954   register CkChareID *pCid = (CkChareID *)
955       _allocMsg(FillVidMsg, sizeof(CkChareID));
956   pCid->onPE = CkMyPe();
957 #ifndef CMK_CHARE_USE_PTR
958   pCid->objPtr = (void*)(CmiIntPtr)idx;
959 #else
960   pCid->objPtr = obj;
961 #endif
962   // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
963   register envelope *ret = UsrToEnv(pCid);
964   ret->setVidPtr(env->getVidPtr());
965   register int srcPe = env->getByPe();
966   ret->setSrcPe(CkMyPe());
967   CmiSetHandler(ret, _charmHandlerIdx);
968   CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
969 #ifndef CMK_CHARE_USE_PTR
970   // register the remote vidblock for deletion when chare is deleted
971   CkChareID vid;
972   vid.onPE = srcPe;
973   vid.objPtr = env->getVidPtr();
974   CkpvAccess(vmap)[idx] = vid;    
975 #endif
976   CpvAccess(_qd)->create();
977 #ifndef CMK_CHARE_USE_PTR
978   //((Chare *)obj)->chareIdx = idx;
979   CkpvAccess(currentChareIdx) = idx;
980 #endif
981   _invokeEntry(env->getEpIdx(),env,obj);
984 /************** Receive: Chares *************/
986 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
988   register int epIdx = env->getEpIdx();
989   register int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
990   register void *obj;
991   if (mainIdx != -1)  {           // mainchare
992     CmiAssert(CkMyPe()==0);
993     obj = _mainTable[mainIdx]->getObj();
994   }
995   else {
996 #ifndef CMK_CHARE_USE_PTR
997     if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
998       obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
999     else
1000       obj = env->getObjPtr();
1001 #else
1002     obj = env->getObjPtr();
1003 #endif
1004   }
1005   _invokeEntry(epIdx,env,obj);
1008 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
1010   register int epIdx = env->getEpIdx();
1011   register void *obj = env->getObjPtr();
1012   _invokeEntry(epIdx,env,obj);
1015 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1017 #ifndef CMK_CHARE_USE_PTR
1018   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1019 #else
1020   register VidBlock *vptr = (VidBlock *) env->getVidPtr();
1021   _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1022 #endif
1023   register CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1024   _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
1025   if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
1026   CmiFree(env);
1029 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1031 #ifndef CMK_CHARE_USE_PTR
1032   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1033 #else
1034   VidBlock *vptr = (VidBlock *) env->getVidPtr();
1035   _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
1036 #endif
1037   _SET_USED(env, 1);
1038   vptr->send(env);
1041 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1043 #ifndef CMK_CHARE_USE_PTR
1044   register VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1045   delete vptr;
1046   CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1047 #endif
1048   CmiFree(env);
1051 /************** Receive: Groups ****************/
1054  Return a pointer to the local BOC of "groupID".
1055  The message "env" passed in has some known dependency on this groupID
1056  (either it is to be delivered to this BOC, or it depends on this BOC being there).
1057  Therefore, if the return value is NULL, this function buffers the message so that
1058  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
1059  The message passed in must have its handlers correctly set so that it can be
1060  scheduled again.
1062 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1065         CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1066         IrrGroup *obj = ck->localBranch(groupID);
1067         if (obj==NULL) { /* groupmember not yet created: stash message */
1068                 ck->getGroupTable()->find(groupID).enqMsg(env);
1069         }
1070         else { /* will be able to process message */
1071                 ck->process();
1072         }
1073         CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1074         return obj;
1077 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1079   return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
1082 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1084 #if CMK_LBDB_ON
1085   // if there is a running obj being measured, stop it temporarily
1086   LDObjHandle objHandle;
1087   int objstopped = 0;
1088   LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1089   if (the_lbdb->RunningObject(&objHandle)) {
1090     objstopped = 1;
1091     the_lbdb->ObjectStop(objHandle);
1092   }
1093 #endif
1094   _invokeEntry(epIdx,env,obj);
1095 #if CMK_LBDB_ON
1096   if (objstopped) the_lbdb->ObjectStart(objHandle);
1097 #endif
1098   _STATS_RECORD_PROCESS_BRANCH_1();
1101 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1103   register CkGroupID groupID =  env->getGroupNum();
1104   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1105   if(obj) {
1106     _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1107   }
1110 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1112   env->setMsgtype(ForChareMsg);
1113   env->setObjPtr(obj);
1114   _processForChareMsg(ck,env);
1115   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1118 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1120   env->setEpIdx(epIdx);
1121   _deliverForNodeBocMsg(ck,env, obj);
1124 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1126   register CkGroupID groupID = env->getGroupNum();
1127   register void *obj;
1129   CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1130   obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1131   if(!obj) { // groupmember not yet created
1132 #if CMK_IMMEDIATE_MSG
1133     if (CmiIsImmediate(env)) {
1134       //CmiDelayImmediate();        /* buffer immediate message */
1135       CmiResetImmediate(env);        // note: this may not be SIG IO safe !
1136     }
1137 #endif
1138     CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1139     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1140     return;
1141   }
1142   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1143   ck->process();
1144   env->setMsgtype(ForChareMsg);
1145   env->setObjPtr(obj);
1146   _processForChareMsg(ck,env);
1147   _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1150 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1152   register CkGroupID groupID = env->getGroupNum();
1153   register int epIdx = env->getEpIdx();
1154   if (!env->getGroupDep().isZero()) {      // dependence
1155     CkGroupID dep = env->getGroupDep();
1156     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
1157     if (obj == NULL) return;
1158   }
1159   else
1160     ck->process();
1161   CkCreateLocalGroup(groupID, epIdx, env);
1164 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1166   register CkGroupID groupID = env->getGroupNum();
1167   register int epIdx = env->getEpIdx();
1168   CkCreateLocalNodeGroup(groupID, epIdx, env);
1171 /************** Receive: Arrays *************/
1172 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1173   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1174   if (mgr) {
1175     _SET_USED(env, 0);
1176     mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
1177   }
1180 //BIGSIM_OOC DEBUGGING
1181 #define TELLMSGTYPE(x) //x
1184  * This is the main converse-level handler used by all of Charm++.
1186  * \addtogroup CriticalPathFramework
1187  */
1188 void _processHandler(void *converseMsg,CkCoreState *ck)
1190   register envelope *env = (envelope *) converseMsg;
1192   MESSAGE_PHASE_CHECK(env);
1194 //#if CMK_RECORD_REPLAY
1195   if (ck->watcher!=NULL) {
1196     if (!ck->watcher->processMessage(&env,ck)) return;
1197   }
1198 //#endif
1199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1200         Chare *obj=NULL;
1201         CkObjID sender;
1202         MCount SN;
1203         MlogEntry *entry=NULL;
1204         if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1205            || env->getMsgtype() == ForArrayEltMsg
1206            || env->getMsgtype() == ForChareMsg) {
1207                 sender = env->sender;
1208                 SN = env->SN;
1209                 int result = preProcessReceivedMessage(env,&obj,&entry);
1210                 if(result == 0){
1211                         return;
1212                 }
1213         }
1214 #endif
1215 #if USE_CRITICAL_PATH_HEADER_ARRAY
1216   CK_CRITICALPATH_START(env)
1217 #endif
1219   switch(env->getMsgtype()) {
1220 // Group support
1221     case BocInitMsg :
1222       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1223       // QD processing moved inside _processBocInitMsg because it is conditional
1224       //ck->process(); 
1225       if(env->isPacked()) CkUnpackMessage(&env);
1226       _processBocInitMsg(ck,env);
1227       break;
1228     case NodeBocInitMsg :
1229       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1230       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1231       _processNodeBocInitMsg(ck,env);
1232       break;
1233     case ForBocMsg :
1234       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1235       // QD processing moved inside _processForBocMsg because it is conditional
1236       if(env->isPacked()) CkUnpackMessage(&env);
1237       _processForBocMsg(ck,env);
1238       // stats record moved inside _processForBocMsg because it is conditional
1239       break;
1240     case ForNodeBocMsg :
1241       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1242       // QD processing moved to _processForNodeBocMsg because it is conditional
1243       if(env->isPacked()) CkUnpackMessage(&env);
1244       _processForNodeBocMsg(ck,env);
1245       // stats record moved to _processForNodeBocMsg because it is conditional
1246       break;
1248 // Array support
1249     case ForArrayEltMsg:
1250       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1251       if(env->isPacked()) CkUnpackMessage(&env);
1252       _processArrayEltMsg(ck,env);
1253       break;
1255 // Chare support
1256     case NewChareMsg :
1257       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1258       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1259       _processNewChareMsg(ck,env);
1260       _STATS_RECORD_PROCESS_CHARE_1();
1261       break;
1262     case NewVChareMsg :
1263       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1264       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1265       _processNewVChareMsg(ck,env);
1266       _STATS_RECORD_PROCESS_CHARE_1();
1267       break;
1268     case ForChareMsg :
1269       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1270       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
1271       _processForPlainChareMsg(ck,env);
1272       _STATS_RECORD_PROCESS_MSG_1();
1273       break;
1274     case ForVidMsg   :
1275       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1276       ck->process();
1277       _processForVidMsg(ck,env);
1278       break;
1279     case FillVidMsg  :
1280       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1281       ck->process();
1282       _processFillVidMsg(ck,env);
1283       break;
1284     case DeleteVidMsg  :
1285       TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1286       ck->process();
1287       _processDeleteVidMsg(ck,env);
1288       break;
1290     default:
1291       CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1292   }
1293 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1294         if(obj != NULL){
1295                 postProcessReceivedMessage(obj,sender,SN,entry);
1296         }
1297 #endif
1300 #if USE_CRITICAL_PATH_HEADER_ARRAY
1301   CK_CRITICALPATH_END()
1302 #endif
1307 /******************** Message Send **********************/
1309 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1310              int *queueing, int *priobits, unsigned int **prioptr)
1312   register envelope *env = (envelope *)converseMsg;
1313   *pfn = (CldPackFn)CkPackMessage;
1314   *len = env->getTotalsize();
1315   *queueing = env->getQueueing();
1316   *priobits = env->getPriobits();
1317   *prioptr = (unsigned int *) env->getPrioPtr();
1320 void CkPackMessage(envelope **pEnv)
1322   register envelope *env = *pEnv;
1323   if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1324     register void *msg = EnvToUsr(env);
1325     _TRACE_BEGIN_PACK();
1326     msg = _msgTable[env->getMsgIdx()]->pack(msg);
1327     _TRACE_END_PACK();
1328     env=UsrToEnv(msg);
1329     env->setPacked(1);
1330     *pEnv = env;
1331   }
1334 void CkUnpackMessage(envelope **pEnv)
1336   register envelope *env = *pEnv;
1337   register int msgIdx = env->getMsgIdx();
1338   if(env->isPacked()) {
1339     register void *msg = EnvToUsr(env);
1340     _TRACE_BEGIN_UNPACK();
1341     msg = _msgTable[msgIdx]->unpack(msg);
1342     _TRACE_END_UNPACK();
1343     env=UsrToEnv(msg);
1344     env->setPacked(0);
1345     *pEnv = env;
1346   }
1349 //There's no reason for most messages to go through the Cld--
1350 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1351 // Thus these accellerated versions of the Cld calls.
1352 #if CMK_OBJECT_QUEUE_AVAILABLE
1353 static int index_objectQHandler;
1354 #endif
1355 int index_tokenHandler;
1356 int index_skipCldHandler;
1358 void _skipCldHandler(void *converseMsg)
1360   register envelope *env = (envelope *)(converseMsg);
1361   CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1362 #if CMK_GRID_QUEUE_AVAILABLE
1363   if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1364     CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1365                        env, env->getQueueing (), env->getPriobits (),
1366                        (unsigned int *) env->getPrioPtr ());
1367   } else {
1368     CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1369                        env, env->getQueueing (), env->getPriobits (),
1370                        (unsigned int *) env->getPrioPtr ());
1371   }
1372 #else
1373   CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1374         env, env->getQueueing(),env->getPriobits(),
1375         (unsigned int *)env->getPrioPtr());
1376 #endif
1380 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1381 // Made non-static to be used by ckmessagelogging
1382 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1384 #if CMK_CHARMDEBUG
1385   if (!ConverseDeliver(pe)) {
1386     CmiFree(env);
1387     return;
1388   }
1389 #endif
1390   if(pe == CkMyPe() ){
1391     if(!CmiNodeAlive(CkMyPe())){
1392         printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1393 //      return;
1394     }
1395   }
1396   if (pe == CkMyPe() && !CmiImmIsRunning()) {
1397 #if CMK_OBJECT_QUEUE_AVAILABLE
1398     Chare *obj = CkFindObjectPtr(env);
1399     if (obj && obj->CkGetObjQueue().queue()) {
1400       _enqObjQueue(obj, env);
1401     }
1402     else
1403 #endif
1404     CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1405         env, env->getQueueing(),env->getPriobits(),
1406         (unsigned int *)env->getPrioPtr());
1407 #if CMK_PERSISTENT_COMM
1408     CmiPersistentOneSend();
1409 #endif
1410   } else {
1411     if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1412       CkPackMessage(&env);
1413     int len=env->getTotalsize();
1414     CmiSetXHandler(env,CmiGetHandler(env));
1415 #if CMK_OBJECT_QUEUE_AVAILABLE
1416     CmiSetHandler(env,index_objectQHandler);
1417 #else
1418     CmiSetHandler(env,index_skipCldHandler);
1419 #endif
1420     CmiSetInfo(env,infoFn);
1421     if (pe==CLD_BROADCAST) {
1422 #if CMK_MESSAGE_LOGGING
1423         if(env->flags & CK_FREE_MSG_MLOG)
1424                 CmiSyncBroadcastAndFree(len, (char *)env); 
1425         else
1426                 CmiSyncBroadcast(len, (char *)env);
1427 #else
1428                         CmiSyncBroadcastAndFree(len, (char *)env); 
1429 #endif
1432     else if (pe==CLD_BROADCAST_ALL) { 
1433 #if CMK_MESSAGE_LOGGING
1434         if(env->flags & CK_FREE_MSG_MLOG)
1435                 CmiSyncBroadcastAllAndFree(len, (char *)env);
1436         else
1437                 CmiSyncBroadcastAll(len, (char *)env);
1438 #else
1439                         CmiSyncBroadcastAllAndFree(len, (char *)env);
1440 #endif
1443     else{
1444 #if CMK_MESSAGE_LOGGING
1445         if(env->flags & CK_FREE_MSG_MLOG)
1446                 CmiSyncSendAndFree(pe, len, (char *)env);
1447         else
1448                 CmiSyncSend(pe, len, (char *)env);
1449 #else
1450                         CmiSyncSendAndFree(pe, len, (char *)env);
1451 #endif
1453                 }
1454   }
1457 #if CMK_BIGSIM_CHARM
1458 #   define  _skipCldEnqueue   _CldEnqueue
1459 #endif
1461 // by pass Charm++ priority queue, send as Converse message
1462 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1464 #if CMK_CHARMDEBUG
1465   if (!ConverseDeliver(-1)) {
1466     CmiFree(env);
1467     return;
1468   }
1469 #endif
1470   CkPackMessage(&env);
1471   int len=env->getTotalsize();
1472   CmiSyncListSendAndFree(npes, pes, len, (char *)env);
1475 static void _noCldEnqueue(int pe, envelope *env)
1478   if (pe == CkMyPe()) {
1479     CmiHandleMessage(env);
1480   } else
1482 #if CMK_CHARMDEBUG
1483   if (!ConverseDeliver(pe)) {
1484     CmiFree(env);
1485     return;
1486   }
1487 #endif
1488   CkPackMessage(&env);
1489   int len=env->getTotalsize();
1490   if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1491   else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1492   else CmiSyncSendAndFree(pe, len, (char *)env);
1495 //static void _noCldNodeEnqueue(int node, envelope *env)
1496 //Made non-static to be used by ckmessagelogging
1497 void _noCldNodeEnqueue(int node, envelope *env)
1500   if (node == CkMyNode()) {
1501     CmiHandleMessage(env);
1502   } else {
1504 #if CMK_CHARMDEBUG
1505   if (!ConverseDeliver(node)) {
1506     CmiFree(env);
1507     return;
1508   }
1509 #endif
1510   CkPackMessage(&env);
1511   int len=env->getTotalsize();
1512   if (node==CLD_BROADCAST) { 
1513 #if CMK_MESSAGE_LOGGING
1514         if(env->flags & CK_FREE_MSG_MLOG)
1515                 CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1516         else
1517                 CmiSyncNodeBroadcast(len, (char *)env);
1518 #else
1519         CmiSyncNodeBroadcastAndFree(len, (char *)env); 
1520 #endif
1522   else if (node==CLD_BROADCAST_ALL) { 
1523 #if CMK_MESSAGE_LOGGING
1524         if(env->flags & CK_FREE_MSG_MLOG)
1525                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1526         else
1527                 CmiSyncNodeBroadcastAll(len, (char *)env);
1528 #else
1529                 CmiSyncNodeBroadcastAllAndFree(len, (char *)env); 
1530 #endif
1533   else {
1534 #if CMK_MESSAGE_LOGGING
1535         if(env->flags & CK_FREE_MSG_MLOG)
1536                 CmiSyncNodeSendAndFree(node, len, (char *)env);
1537         else
1538                 CmiSyncNodeSend(node, len, (char *)env);
1539 #else
1540         CmiSyncNodeSendAndFree(node, len, (char *)env);
1541 #endif
1542   }
1545 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1547   register envelope *env = UsrToEnv(msg);
1548   _CHECK_USED(env);
1549   _SET_USED(env, 1);
1550 #if CMK_REPLAYSYSTEM
1551   setEventID(env);
1552 #endif
1553   env->setMsgtype(ForChareMsg);
1554   env->setEpIdx(eIdx);
1555   env->setSrcPe(CkMyPe());
1557 #if USE_CRITICAL_PATH_HEADER_ARRAY
1558   CK_CRITICALPATH_SEND(env)
1559   //CK_AUTOMATE_PRIORITY(env)
1560 #endif
1561 #if CMK_CHARMDEBUG
1562   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1563 #endif
1564 #if CMK_OBJECT_QUEUE_AVAILABLE
1565   CmiSetHandler(env, index_objectQHandler);
1566 #else
1567   CmiSetHandler(env, _charmHandlerIdx);
1568 #endif
1569   if (pCid->onPE < 0) { //Virtual chare ID (VID)
1570     register int pe = -(pCid->onPE+1);
1571     if(pe==CkMyPe()) {
1572 #ifndef CMK_CHARE_USE_PTR
1573       VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1574 #else
1575       VidBlock *vblk = (VidBlock *) pCid->objPtr;
1576 #endif
1577       void *objPtr;
1578       if (NULL!=(objPtr=vblk->getLocalChare()))
1579       { //A ready local chare
1580         env->setObjPtr(objPtr);
1581         return pe;
1582       }
1583       else { //The vidblock is not ready-- forget it
1584         vblk->send(env);
1585         return -1;
1586       }
1587     } else { //Valid vidblock for another PE:
1588       env->setMsgtype(ForVidMsg);
1589       env->setVidPtr(pCid->objPtr);
1590       return pe;
1591     }
1592   }
1593   else {
1594     env->setObjPtr(pCid->objPtr);
1595     return pCid->onPE;
1596   }
1599 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1601   int destPE = _prepareMsg(eIdx, msg, pCid);
1602   if (destPE != -1) {
1603     register envelope *env = UsrToEnv(msg);
1604     //criticalPath_send(env);
1605 #if USE_CRITICAL_PATH_HEADER_ARRAY
1606     CK_CRITICALPATH_SEND(env)
1607     //CK_AUTOMATE_PRIORITY(env)
1608 #endif
1609     CmiBecomeImmediate(env);
1610   }
1611   return destPE;
1614 extern "C"
1615 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1617   if (opts & CK_MSG_INLINE) {
1618     CkSendMsgInline(entryIdx, msg, pCid, opts);
1619     return;
1620   }
1621 #if CMK_ERROR_CHECKING
1622   if (opts & CK_MSG_IMMEDIATE) {
1623     CmiAbort("Immediate message is not allowed in Chare!");
1624   }
1625 #endif
1626   register envelope *env = UsrToEnv(msg);
1627   int destPE=_prepareMsg(entryIdx,msg,pCid);
1628   // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1629   // VidBlock was not yet filled). The problem is that the creation was never
1630   // traced later when the VidBlock was filled. One solution is to trace the
1631   // creation here, the other to trace it in VidBlock->msgDeliver().
1632 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1633   if (destPE!=-1) {
1634     CpvAccess(_qd)->create();
1635   }
1636         sendChareMsg(env,destPE,_infoIdx,pCid);
1637 #else
1638   _TRACE_CREATION_1(env);
1639   if (destPE!=-1) {
1640     CpvAccess(_qd)->create();
1641     if (opts & CK_MSG_SKIP_OR_IMM)
1642       _noCldEnqueue(destPE, env);
1643     else
1644       _CldEnqueue(destPE, env, _infoIdx);
1645   }
1646   _TRACE_CREATION_DONE(1);
1647 #endif
1650 extern "C"
1651 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1653   if (pCid->onPE==CkMyPe())
1654   {
1655     if(!CmiNodeAlive(CkMyPe())){
1656         return;
1657     }
1658 #if CMK_CHARMDEBUG
1659     //Just in case we need to breakpoint or use the envelope in some way
1660     _prepareMsg(entryIndex,msg,pCid);
1661 #endif
1662                 //Just directly call the chare (skip QD handling & scheduler)
1663     register envelope *env = UsrToEnv(msg);
1664     if (env->isPacked()) CkUnpackMessage(&env);
1665     _STATS_RECORD_PROCESS_MSG_1();
1666     _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1667   }
1668   else {
1669     //No way to inline a cross-processor message:
1670     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
1671   }
1674 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1676   register envelope *env = UsrToEnv(msg);
1677   /*#if CMK_ERROR_CHECKING
1678   CkNodeGroupID nodeRedMgr;
1679 #endif
1680   */
1681   _CHECK_USED(env);
1682   _SET_USED(env, 1);
1683 #if CMK_REPLAYSYSTEM
1684   setEventID(env);
1685 #endif
1686   env->setMsgtype(type);
1687   env->setEpIdx(eIdx);
1688   env->setGroupNum(gID);
1689   env->setSrcPe(CkMyPe());
1690   /*
1691 #if CMK_ERROR_CHECKING
1692   nodeRedMgr.setZero();
1693   env->setRednMgr(nodeRedMgr);
1694 #endif
1696   //criticalPath_send(env);
1697 #if USE_CRITICAL_PATH_HEADER_ARRAY
1698   CK_CRITICALPATH_SEND(env)
1699   //CK_AUTOMATE_PRIORITY(env)
1700 #endif
1701 #if CMK_CHARMDEBUG
1702   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1703 #endif
1704   CmiSetHandler(env, _charmHandlerIdx);
1705   return env;
1708 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1710   envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1711 #if USE_CRITICAL_PATH_HEADER_ARRAY
1712   CK_CRITICALPATH_SEND(env)
1713   //CK_AUTOMATE_PRIORITY(env)
1714 #endif
1715   CmiBecomeImmediate(env);
1716   return env;
1719 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1720                   int pe=CLD_BROADCAST_ALL, int opts = 0)
1722   int numPes;
1723   register envelope *env;
1724     if (opts & CK_MSG_IMMEDIATE) {
1725         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1726     }else
1727     {
1728         env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1729     }
1731 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1732         sendGroupMsg(env,pe,_infoIdx);
1733 #else
1734   _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1735   _TRACE_CREATION_N(env, numPes);
1736   if (opts & CK_MSG_SKIP_OR_IMM)
1737     _noCldEnqueue(pe, env);
1738   else
1739     _skipCldEnqueue(pe, env, _infoIdx);
1740   _TRACE_CREATION_DONE(1);
1741 #endif
1744 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1745                            int npes, int *pes)
1747   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1748   _TRACE_CREATION_MULTICAST(env, npes, pes);
1749   _CldEnqueueMulti(npes, pes, env, _infoIdx);
1750   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1753 extern "C"
1754 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1756 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1757   if (destPE==CkMyPe())
1758   {
1759     CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1760     return;
1761   }
1762   //Can't inline-- send the usual way
1763   register envelope *env = UsrToEnv(msg);
1764   int numPes;
1765   _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1766   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1767   _TRACE_CREATION_N(env, numPes);
1768   _noCldEnqueue(destPE, env);
1769   _STATS_RECORD_SEND_BRANCH_1();
1770   CkpvAccess(_coreState)->create();
1771   _TRACE_CREATION_DONE(1);
1772 #else
1773   // no support for immediate message, send inline
1774   CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1775 #endif
1778 extern "C"
1779 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1781   if (destPE==CkMyPe())
1782   {
1783     if(!CmiNodeAlive(CkMyPe())){
1784         return;
1785     }
1786     IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1787     if (obj!=NULL)
1788     { //Just directly call the group:
1789 #if CMK_ERROR_CHECKING
1790       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1791 #else
1792       envelope *env=UsrToEnv(msg);
1793 #endif
1794       _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1795       return;
1796     }
1797   }
1798   //Can't inline-- send the usual way, clear CK_MSG_INLINE
1799   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
1802 extern "C"
1803 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1805   if (opts & CK_MSG_INLINE) {
1806     CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1807     return;
1808   }
1809   if (opts & CK_MSG_IMMEDIATE) {
1810     CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1811     return;
1812   }
1813   _sendMsgBranch(eIdx, msg, gID, pe, opts);
1814   _STATS_RECORD_SEND_BRANCH_1();
1815   CkpvAccess(_coreState)->create();
1818 extern "C"
1819 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1821 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1822   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1823   _TRACE_CREATION_MULTICAST(env, npes, pes);
1824   _noCldEnqueueMulti(npes, pes, env);
1825   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1826 #else
1827   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1828   CpvAccess(_qd)->create(-npes);
1829 #endif
1830   _STATS_RECORD_SEND_BRANCH_N(npes);
1831   CpvAccess(_qd)->create(npes);
1834 extern "C"
1835 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1837   if (opts & CK_MSG_IMMEDIATE) {
1838     CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1839     return;
1840   }
1841     // normal mesg
1842   _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1843   _STATS_RECORD_SEND_BRANCH_N(npes);
1844   CpvAccess(_qd)->create(npes);
1847 extern "C"
1848 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1850   int npes;
1851   int *pes;
1852   if (opts & CK_MSG_IMMEDIATE) {
1853     CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1854     return;
1855   }
1856     // normal mesg
1857   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1858   CmiLookupGroup(grp, &npes, &pes);
1859   _TRACE_CREATION_MULTICAST(env, npes, pes);
1860   _CldEnqueueGroup(grp, env, _infoIdx);
1861   _TRACE_CREATION_DONE(1);      // since it only creates one creation event.
1862   _STATS_RECORD_SEND_BRANCH_N(npes);
1863   CpvAccess(_qd)->create(npes);
1866 extern "C"
1867 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1869   _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1870   _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1871   CpvAccess(_qd)->create(CkNumPes());
1874 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1875                 int node=CLD_BROADCAST_ALL, int opts=0)
1877     int numPes;
1878     register envelope *env;
1879     if (opts & CK_MSG_IMMEDIATE) {
1880         env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1881     }else
1882     {
1883         env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1884     }
1885 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1886     sendNodeGroupMsg(env,node,_infoIdx);
1887 #else
1888   numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1889   _TRACE_CREATION_N(env, numPes);
1890   if (opts & CK_MSG_SKIP_OR_IMM) {
1891     _noCldNodeEnqueue(node, env);
1892   }
1893   else
1894     _CldNodeEnqueue(node, env, _infoIdx);
1895   _TRACE_CREATION_DONE(1);
1896 #endif
1899 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1900                            int npes, int *nodes)
1902   register envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1903   _TRACE_CREATION_N(env, npes);
1904   for (int i=0; i<npes; i++) {
1905     _CldNodeEnqueue(nodes[i], env, _infoIdx);
1906   }
1907   _TRACE_CREATION_DONE(1);  // since it only creates one creation event.
1910 extern "C"
1911 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1913 #if CMK_IMMEDIATE_MSG
1914   if (node==CkMyNode())
1915   {
1916     CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1917     return;
1918   }
1919   //Can't inline-- send the usual way
1920   register envelope *env = UsrToEnv(msg);
1921   int numPes;
1922   _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1923   env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1924   _TRACE_CREATION_N(env, numPes);
1925   _noCldNodeEnqueue(node, env);
1926   _STATS_RECORD_SEND_BRANCH_1();
1927   CkpvAccess(_coreState)->create();
1928   _TRACE_CREATION_DONE(1);
1929 #else
1930   // no support for immediate message, send inline
1931   CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1932 #endif
1935 extern "C"
1936 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1938   if (node==CkMyNode())
1939   {
1940     CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1941     void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1942     CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1943     if (obj!=NULL)
1944     { //Just directly call the group:
1945 #if CMK_ERROR_CHECKING
1946       envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1947 #else
1948       envelope *env=UsrToEnv(msg);
1949 #endif
1950       _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1951       return;
1952     }
1953   }
1954   //Can't inline-- send the usual way
1955   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
1958 extern "C"
1959 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1961   if (opts & CK_MSG_INLINE) {
1962     CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
1963     return;
1964   }
1965   if (opts & CK_MSG_IMMEDIATE) {
1966     CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
1967     return;
1968   }
1969   _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
1970   _STATS_RECORD_SEND_NODE_BRANCH_1();
1971   CkpvAccess(_coreState)->create();
1974 extern "C"
1975 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
1977 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1978   register envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1979   _noCldEnqueueMulti(npes, nodes, env);
1980 #else
1981   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1982   CpvAccess(_qd)->create(-npes);
1983 #endif
1984   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1985   CpvAccess(_qd)->create(npes);
1988 extern "C"
1989 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
1991   if (opts & CK_MSG_IMMEDIATE) {
1992     CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
1993     return;
1994   }
1995     // normal mesg
1996   _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
1997   _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
1998   CpvAccess(_qd)->create(npes);
2001 extern "C"
2002 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2004   _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2005   _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2006   CpvAccess(_qd)->create(CkNumNodes());
2009 //Needed by delegation manager:
2010 extern "C"
2011 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2012 { return _prepareMsg(eIdx,msg,pCid); }
2013 extern "C"
2014 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2015 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
2016 extern "C"
2017 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2018 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
2020 void _ckModuleInit(void) {
2021         index_skipCldHandler = CkRegisterHandler(_skipCldHandler);
2022 #if CMK_OBJECT_QUEUE_AVAILABLE
2023         index_objectQHandler = CkRegisterHandler(_ObjectQHandler);
2024 #endif
2025         index_tokenHandler = CkRegisterHandler(_TokenHandler);
2026         CkpvInitialize(TokenPool*, _tokenPool);
2027         CkpvAccess(_tokenPool) = new TokenPool;
2031 /************** Send: Arrays *************/
2033 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2035   _CHECK_USED(env);
2036   _SET_USED(env, 1);
2037   env->setMsgtype(type);
2038 #if CMK_CHARMDEBUG
2039   setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2040 #endif
2041   CmiSetHandler(env, _charmHandlerIdx);
2042   CpvAccess(_qd)->create();
2045 extern "C"
2046 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2047   register envelope *env = UsrToEnv(msg);
2048   _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2049 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2050         sendArrayMsg(env,pe,_infoIdx);
2051 #else
2052   if (opts & CK_MSG_IMMEDIATE)
2053     CmiBecomeImmediate(env);
2054   if (opts & CK_MSG_SKIP_OR_IMM)
2055     _noCldEnqueue(pe, env);
2056   else
2057     _skipCldEnqueue(pe, env, _infoIdx);
2058 #endif
2061 class ElementDestroyer : public CkLocIterator {
2062 private:
2063         CkLocMgr *locMgr;
2064 public:
2065         ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2066         void addLocation(CkLocation &loc) {
2067           loc.destroyAll();
2068         }
2071 void CkDeleteChares() {
2072   int i;
2073   int numGroups = CkpvAccess(_groupIDTable)->size();
2075   // delete all plain chares
2076 #ifndef CMK_CHARE_USE_PTR
2077   for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2078         Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2079         delete obj;
2080         CkpvAccess(chare_objs)[i] = NULL;
2081   }
2082   for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2083         VidBlock *obj = CkpvAccess(vidblocks)[i];
2084         delete obj;
2085         CkpvAccess(vidblocks)[i] = NULL;
2086   }
2087 #endif
2089   // delete all array elements
2090   for(i=0;i<numGroups;i++) {
2091     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2092     if(obj && obj->isLocMgr())  {
2093       CkLocMgr *mgr = (CkLocMgr*)obj;
2094       ElementDestroyer destroyer(mgr);
2095       mgr->iterate(destroyer);
2096     }
2097   }
2099   // delete all groups
2100   CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2101   for(i=0;i<numGroups;i++) {
2102     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2103     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2104     if (obj) delete obj;
2105   }
2106   CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2108   // delete all node groups
2109   if (CkMyRank() == 0) {
2110     int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2111     for(i=0;i<numNodeGroups;i++) {
2112       CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2113       IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2114       if (obj) delete obj;
2115     }
2116   }
2119 #if CMK_BIGSIM_CHARM
2120 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2121                                    int pb,unsigned int *prio);
2122 #endif
2124 //------------------- Message Watcher (record/replay) ----------------
2126 #include "crc32.h"
2128 CkpvDeclare(int, envelopeEventID);
2129 int _recplay_crc = 0;
2130 int _recplay_checksum = 0;
2131 int _recplay_logsize = 1024*1024;
2133 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2134 #define REPLAYDEBUG(args) /* empty */
2136 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2138 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2139 #include "BaseLB.h" /* For LBMigrateMsg message */
2141 #if CMK_REPLAYSYSTEM
2142 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2144     char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
2145     strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
2146     sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
2147     FILE *f=fopen(fName,permissions);
2148     REPLAYDEBUG("openReplayfile "<<fName);
2149     if (f==NULL) {
2150         CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2151             CkMyPe(),fName,permissions);
2152         CkAbort("openReplayFile> Could not open replay file");
2153     }
2154     return f;
2157 class CkMessageRecorder : public CkMessageWatcher {
2158   char *buffer;
2159   unsigned int curpos;
2160   bool firstOpen;
2161 public:
2162   CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true) { f=f_; buffer=new char[_recplay_logsize]; }
2163   ~CkMessageRecorder() {
2164     flushLog(0);
2165     fprintf(f,"-1 -1 -1 ");
2166     fclose(f);
2167     delete[] buffer;
2168 #if 0
2169     FILE *stsfp = fopen("sts", "w");
2170     void traceWriteSTS(FILE *stsfp,int nUserEvents);
2171     traceWriteSTS(stsfp, 0);
2172     fclose(stsfp);
2173 #endif
2174     CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
2175   }
2177 private:
2178   void flushLog(int verbose=1) {
2179     if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2180     fprintf(f, "%s", buffer);
2181     curpos=0;
2182   }
2183   virtual bool process(envelope **envptr,CkCoreState *ck) {
2184     if ((*envptr)->getEvent()) {
2185       bool wasPacked = (*envptr)->isPacked();
2186       if (!wasPacked) CkPackMessage(envptr);
2187       envelope *env = *envptr;
2188       unsigned int crc1=0, crc2=0;
2189       if (_recplay_crc) {
2190         //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2191         crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2192         crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2193       } else if (_recplay_checksum) {
2194         crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2195         crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2196       }
2197       curpos+=sprintf(&buffer[curpos],"%d %d %d %hhd %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2198       if (curpos > _recplay_logsize-128) flushLog();
2199       if (!wasPacked) CkUnpackMessage(envptr);
2200     }
2201     return true;
2202   }
2203   virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2204     curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2205     if (curpos > _recplay_logsize-128) flushLog();
2206     return true;
2207   }
2208   
2209   virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2210     FILE *f;
2211     if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2212     else f = openReplayFile("ckreplay_",".lb","a");
2213     firstOpen = false;
2214     if (f != NULL) {
2215       PUP::toDisk p(f);
2216       p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
2217       (*msg)->pup(p);
2218       fclose(f);
2219     }
2220     return true;
2221   }
2224 class CkMessageDetailRecorder : public CkMessageWatcher {
2225 public:
2226   CkMessageDetailRecorder(FILE *f_) {
2227     f=f_;
2228     /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2229      * The value of 'x' is the pointer size.
2230      */
2231     CmiUInt2 little = sizeof(void*);
2232     fwrite(&little, 2, 1, f);
2233   }
2234   ~CkMessageDetailRecorder() {fclose(f);}
2235 private:
2236   virtual bool process(envelope **envptr, CkCoreState *ck) {
2237     bool wasPacked = (*envptr)->isPacked();
2238     if (!wasPacked) CkPackMessage(envptr);
2239     envelope *env = *envptr;
2240     CmiUInt4 size = env->getTotalsize();
2241     fwrite(&size, 4, 1, f);
2242     fwrite(env, env->getTotalsize(), 1, f);
2243     if (!wasPacked) CkUnpackMessage(envptr);
2244     return true;
2245   }
2248 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2249 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
2251 class CkMessageReplay : public CkMessageWatcher {
2252   int counter;
2253         int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2254         int nextEP;
2255         unsigned int crc1, crc2;
2256         FILE *lbFile;
2257         /// Read the next message we need from the file:
2258         void getNext(void) {
2259           if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2260           if (nextSize > 0) {
2261             // We are reading a regular message
2262             if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2263               CkAbort("CkMessageReplay> Syntax error reading replay file");
2264             }
2265             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2266           } else if (nextSize == -2) {
2267             // We are reading a special message (right now only thread awaken)
2268             // Nothing to do since we have already read all info
2269             REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2270           } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2271             CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2272             CkAbort("CkMessageReplay> Unrecognized input");
2273           }
2274             /*
2275                 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2276                         CkAbort("CkMessageReplay> Syntax error reading replay file");
2277                         nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2278                 }
2279                 */
2280                 counter++;
2281         }
2282         /// If this is the next message we need, advance and return true.
2283         bool isNext(envelope *env) {
2284                 if (nextPE!=env->getSrcPe()) return false;
2285                 if (nextEvent!=env->getEvent()) return false;
2286                 if (nextSize<0) return false; // not waiting for a regular message
2287 #if 1
2288                 if (nextEP != env->getEpIdx()) {
2289                         CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2290                         return false;
2291                 }
2292 #endif
2293 #if ! CMK_BIGSIM_CHARM
2294                 if (nextSize!=env->getTotalsize())
2295                 {
2296                         CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2297                         return false;
2298                 }
2299                 if (_recplay_crc || _recplay_checksum) {
2300                   bool wasPacked = env->isPacked();
2301                   if (!wasPacked) CkPackMessage(&env);
2302                   if (_recplay_crc) {
2303                     //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2304                     unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2305                     unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2306                     if (crcnew1 != crc1) {
2307                       CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2308                     }
2309                     if (crcnew2 != crc2) {
2310                       CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2311                     }
2312                   } else if (_recplay_checksum) {
2313             unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2314             unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2315             if (crcnew1 != crc1) {
2316               CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2317             }
2318             if (crcnew2 != crc2) {
2319               CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2320             }               
2321                   }
2322                   if (!wasPacked) CkUnpackMessage(&env);
2323                 }
2324 #endif
2325                 return true;
2326         }
2327         bool isNext(CthThreadToken *token) {
2328           if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2329           return false;
2330         }
2332         /// This is a (short) list of messages we aren't yet ready for:
2333         CkQ<envelope *> delayedMessages;
2334         /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2335         CkQ<CthThreadToken *> delayedTokens;
2337         /// Try to flush out any delayed messages
2338         void flush(void) {
2339           if (nextSize>0) {
2340                 int len=delayedMessages.length();
2341                 for (int i=0;i<len;i++) {
2342                         envelope *env=delayedMessages.deq();
2343                         if (isNext(env)) { /* this is the next message: process it */
2344                                 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2345                                 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2346                                 return;
2347                         }
2348                         else /* Not ready yet-- put it back in the
2349                                 queue */
2350                           {
2351                                 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2352                                 delayedMessages.enq(env);
2353                           }
2354                 }
2355           } else if (nextSize==-2) {
2356             int len=delayedTokens.length();
2357             for (int i=0;i<len;++i) {
2358               CthThreadToken *token=delayedTokens.deq();
2359               if (isNext(token)) {
2360             REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2361 #if ! CMK_BIGSIM_CHARM
2362                 CsdEnqueueLifo((void*)token);
2363 #else
2364                 CthEnqueueBigSimThread(token,0,0,NULL);
2365 #endif
2366                 return;
2367               } else {
2368             REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2369                 delayedTokens.enq(token);
2370               }
2371             }
2372           }
2373         }
2375 public:
2376         CkMessageReplay(FILE *f_) : lbFile(NULL) {
2377           counter=0;
2378           f=f_;
2379           getNext();
2380           REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2381           if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
2382         }
2383         ~CkMessageReplay() {fclose(f);}
2385 private:
2386         virtual bool process(envelope **envptr,CkCoreState *ck) {
2387           bool wasPacked = (*envptr)->isPacked();
2388           if (!wasPacked) CkPackMessage(envptr);
2389           envelope *env = *envptr;
2390           //CkAssert(*(int*)env == 0x34567890);
2391           REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2392                 if (env->getEvent() == 0) return true;
2393                 if (isNext(env)) { /* This is the message we were expecting */
2394                         REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2395                         getNext(); /* Advance over this message */
2396                         flush(); /* try to process queued-up stuff */
2397                         if (!wasPacked) CkUnpackMessage(envptr);
2398                         return true;
2399                 }
2400 #if CMK_SMP
2401                 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2402                          // try next rank, we can't just buffer the msg and left
2403                          // we need to keep unprocessed msg on the fly
2404                         int nextpe = CkMyPe()+1;
2405                         if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2406                         nextpe = CkNodeFirst(CkMyNode());
2407                         CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2408                         return false;
2409                 }
2410 #endif
2411                 else /*!isNext(env) */ {
2412                         REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2413                                 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2414                         delayedMessages.enq(env);
2415                         flush();
2416                         return false;
2417                 }
2418         }
2419         virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2420       REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2421           if (isNext(token)) {
2422         REPLAYDEBUG("Executing token: "<<token->serialNo)
2423             getNext();
2424             flush();
2425             return true;
2426           } else {
2427         REPLAYDEBUG("Queueing token: "<<token->serialNo
2428             <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2429             delayedTokens.enq(token);
2430             return false;
2431           }
2432         }
2434         virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2435           if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2436           if (lbFile != NULL) {
2437             int num_moves = 0;
2438         PUP::fromDisk p(lbFile);
2439             p | num_moves;
2440             if (num_moves != (*msg)->n_moves) {
2441               delete *msg;
2442               *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
2443             }
2444             (*msg)->pup(p);
2445           }
2446           return true;
2447         }
2450 class CkMessageDetailReplay : public CkMessageWatcher {
2451   void *getNext() {
2452     CmiUInt4 size; size_t nread;
2453     if ((nread=fread(&size, 4, 1, f)) < 1) {
2454       if (feof(f)) return NULL;
2455       CkPrintf("Broken record file (metadata) got %d\n",nread);
2456       CkAbort("");
2457     }
2458     void *env = CmiAlloc(size);
2459     long tell = ftell(f);
2460     if ((nread=fread(env, size, 1, f)) < 1) {
2461       CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2462       CkAbort("");
2463     }
2464     //*(int*)env = 0x34567890; // set first integer as magic
2465     return env;
2466   }
2467 public:
2468   double starttime;
2469   CkMessageDetailReplay(FILE *f_) {
2470     f=f_;
2471     starttime=CkWallTimer();
2472     /* This must match what CkMessageDetailRecorder did */
2473     CmiUInt2 little;
2474     fread(&little, 2, 1, f);
2475     if (little != sizeof(void*)) {
2476       CkAbort("Replaying on a different architecture from which recording was done!");
2477     }
2479     CsdEnqueue(getNext());
2481     CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
2482   }
2483   virtual bool process(envelope **env,CkCoreState *ck) {
2484     void *msg = getNext();
2485     if (msg != NULL) CsdEnqueue(msg);
2486     return true;
2487   }
2490 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2491 #if ! CMK_BIGSIM_CHARM
2492   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2493 #endif
2494   CkMessageReplay *replay = (CkMessageReplay*)rep;
2495   //CmiStartQD(CkMessageReplayQuiescence, replay);
2498 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2499   CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2500   CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
2501   ConverseExit();
2503 #endif
2505 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2506   CkCoreState *ck = CkpvAccess(_coreState);
2507   if (ck->watcher!=NULL) {
2508     return ck->watcher->processThread(token,ck);
2509   }
2510   return true;
2513 CpvCExtern(int, CthResumeNormalThreadIdx);
2514 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2516   CthThread t = token->thread;
2518   if(t == NULL){
2519     free(token);
2520     return;
2521   }
2522 #if CMK_TRACE_ENABLED
2523 #if ! CMK_TRACE_IN_CHARM
2524   if(CpvAccess(traceOn))
2525     CthTraceResume(t);
2526 /*    if(CpvAccess(_traceCoreOn)) 
2527             resumeTraceCore();*/
2528 #endif
2529 #endif
2530   
2531   /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2532   if (CpdExecuteThreadResume(token)) {
2533     CthResume(t);
2534   }
2537 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2538   CkCoreState *ck = CkpvAccess(_coreState);
2539   if (ck->watcher!=NULL) {
2540     ck->watcher->processLBMessage(msg, ck);
2541   }
2544 #if CMK_BIGSIM_CHARM
2545 CpvExtern(int      , CthResumeBigSimThreadIdx);
2546 #endif
2548 #include "ckliststring.h"
2549 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2550     CmiArgGroup("Charm++","Record/Replay");
2551     bool forceReplay = false;
2552     char *procs = NULL;
2553     _replaySystem = 0;
2554     if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2555       if(CmiMyRank() == 0) _recplay_crc = 1;
2556     }
2557     if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
2558       if(CmiMyRank() == 0) _recplay_checksum = 1;
2559     }
2560     int tmplogsize;
2561     if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
2562       {
2563         if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
2564       }
2565     REPLAYDEBUG("CkMessageWatcherInit ");
2566     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
2567 #if CMK_REPLAYSYSTEM
2568         CkListString list(procs);
2569         if (list.includes(CkMyPe())) {
2570           CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
2571           CpdSetInitializeMemory(1);
2572           ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
2573         }
2574 #else
2575         CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
2576 #endif
2577     }
2578     if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
2579 #if CMK_REPLAYSYSTEM
2580       if (CkMyPe() == 0) {
2581         CmiPrintf("Charm++> record mode.\n");
2582         if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2583           CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
2584           _recplay_crc = _recplay_checksum = 0;
2585         }
2586       }
2587       CpdSetInitializeMemory(1);
2588       CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2589       ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
2590 #else
2591       CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
2592 #endif
2593     }
2594         if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
2595 #if CMK_REPLAYSYSTEM
2596             forceReplay = true;
2597             CpdSetInitializeMemory(1);
2598             // Set the parameters of the processor
2599 #if CMK_SHARED_VARS_UNAVAILABLE
2600             _Cmi_mype = atoi(procs);
2601             while (procs[0]!='/') procs++;
2602             procs++;
2603             _Cmi_numpes = atoi(procs);
2604 #else
2605             CkAbort("+replay-detail available only for non-SMP build");
2606 #endif
2607             _replaySystem = 1;
2608             ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
2609 #else
2610           CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
2611 #endif
2612         }
2613         if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
2614 #if CMK_REPLAYSYSTEM
2615           if (CkMyPe() == 0)  {
2616             CmiPrintf("Charm++> replay mode.\n");
2617             if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
2618               CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
2619               _recplay_crc = _recplay_checksum = 0;
2620             }
2621           }
2622           CpdSetInitializeMemory(1);
2623 #if ! CMK_BIGSIM_CHARM
2624           CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2625 #else
2626           CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
2627 #endif
2628           ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
2629 #else
2630           CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
2631 #endif
2632         }
2633         if (_recplay_crc && _recplay_checksum) {
2634           CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
2635         }
2638 extern "C"
2639 int CkMessageToEpIdx(void *msg) {
2640         envelope *env=UsrToEnv(msg);
2641         int ep=env->getEpIdx();
2642         if (ep==CkIndex_CkArray::recvBroadcast(0))
2643                 return env->getsetArrayBcastEp();
2644         else
2645                 return ep;
2648 extern "C"
2649 int getCharmEnvelopeSize() {
2650   return sizeof(envelope);
2653 /// Best-effort guess at whether @arg msg points at a charm envelope
2654 extern "C"
2655 int isCharmEnvelope(void *msg) {
2656     envelope *e = (envelope *)msg;
2657     if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
2658     if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
2659     if (e->getTotalsize() < sizeof(envelope)) return 0;
2660     if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
2661 #if CMK_SMP
2662     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
2663 #else
2664     if (e->getSrcPe()<0 || e->getSrcPe()>=CkNumPes()) return 0;
2665 #endif
2666     if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
2667     return 1;
2670 #include "CkMarshall.def.h"