4 These routines implement a basic remote-method-invocation system
5 consisting of chares and groups. There is no migration. All
6 the bindings are written for the C language, although most
7 clients, including the rest of Charm++, are actually C++.
13 #include "pathHistory.h"
16 #include "LBDatabase.h"
// Per-PE bookkeeping used when plain chares are identified by a table index
// (the configuration in which CMK_CHARE_USE_PTR is not defined).
19 #ifndef CMK_CHARE_USE_PTR
// Chare objects by index; _allocNewChare appends to this table.
21 CkpvDeclare(std::vector<void *>, chare_objs);
// Chare type index of each chare_objs entry (appended alongside it).
22 CkpvDeclare(std::vector<int>, chare_types);
// VidBlocks created by CkCreateChare; a virtual chare ID stores an index into this table.
23 CkpvDeclare(std::vector<VidBlock *>, vidblocks);
25 typedef std::map<int, CkChareID> Vidblockmap;
26 CkpvDeclare(Vidblockmap, vmap); // remote VidBlock to notify upon deletion
// Index handed to the chare constructor that runs next; -1 when none is set.
27 CkpvDeclare(int, currentChareIdx);
30 // Map of array IDs to array elements for fast message delivery
31 CkpvDeclare(ArrayObjMap, array_objs);
// Bit mask combining the expedited and the immediate message flags.
33 #define CK_MSG_SKIP_OR_IMM (CK_MSG_EXPEDITED | CK_MSG_IMMEDIATE)
35 VidBlock::VidBlock() { state = UNFILLED; msgQ = new PtrQ(); _MEMCHECK(msgQ); }
37 int CMessage_CkMessage::__idx=-1;
38 int CMessage_CkArgMsg::__idx=0;
39 int CkIndex_Chare::__idx;
40 int CkIndex_Group::__idx;
41 int CkIndex_ArrayBase::__idx=-1;
// Defined elsewhere. When nonzero (and CMK_OBJECT_QUEUE_AVAILABLE is set), the Chare
// constructors in this file call CkEnableObjQ().
43 extern int _defaultObjectQ;
45 void _initChareTables()
47 #ifndef CMK_CHARE_USE_PTR
48 /* chare and vidblock table */
49 CkpvInitialize(std::vector<void *>, chare_objs);
50 CkpvInitialize(std::vector<int>, chare_types);
51 CkpvInitialize(std::vector<VidBlock *>, vidblocks);
52 CkpvInitialize(Vidblockmap, vmap);
53 CkpvInitialize(int, currentChareIdx);
54 CkpvAccess(currentChareIdx) = -1;
57 CkpvInitialize(ArrayObjMap, array_objs);
60 //Charm++ virtual functions: declaring these here results in a smaller executable
// NOTE(review): the signature line is not visible in this listing; the statements below
// look like the body of the default Chare constructor -- confirm.
// The handle starts out as (this PE, pointer to this object).
62 thishandle.onPE=CkMyPe();
63 thishandle.objPtr=this;
64 #if CMK_ERROR_CHECKING
67 #ifndef CMK_CHARE_USE_PTR
68 // for plain chare, objPtr is actually the index to chare obj table
69 if (CkpvAccess(currentChareIdx) >= 0) {
70 thishandle.objPtr=(void*)(CmiIntPtr)CkpvAccess(currentChareIdx);
// Remember the table index (it may be negative, i.e. "not a table-tracked chare").
72 chareIdx = CkpvAccess(currentChareIdx);
74 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Message-logging builds: allocate the per-object log record and tag it as a plain chare.
75 mlogData = new ChareMlogData();
76 mlogData->objID.type = TypeChare;
77 mlogData->objID.data.chare.id = thishandle;
79 #if CMK_OBJECT_QUEUE_AVAILABLE
80 if (_defaultObjectQ) CkEnableObjQ();
// Migration constructor: sets the handle to (this PE, this pointer). The argument m is
// not used in the lines visible here.
// NOTE(review): the bodies of the two #if sections below are not fully visible in this listing.
84 Chare::Chare(CkMigrateMessage* m) {
85 thishandle.onPE=CkMyPe();
86 thishandle.objPtr=this;
87 #if CMK_ERROR_CHECKING
91 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
95 #if CMK_OBJECT_QUEUE_AVAILABLE
96 if (_defaultObjectQ) CkEnableObjQ();
// Enables the per-object queue for this chare; called from the constructors when
// _defaultObjectQ is nonzero.
// NOTE(review): the body (inside the #if below) is not visible in this listing.
100 void Chare::CkEnableObjQ()
102 #if CMK_OBJECT_QUEUE_AVAILABLE
// NOTE(review): the signature line is not visible in this listing; this appears to be the
// Chare destructor's table clean-up -- confirm.
108 #ifndef CMK_CHARE_USE_PTR
// Only touch the table if chareIdx is in range and the slot still refers to this object.
// NOTE(review): this line uses CpvAccess while every other access to chare_objs uses
// CkpvAccess, and it compares the int chareIdx with an unsigned size() -- confirm both are intended.
110 if (chareIdx >= 0 && chareIdx < CpvAccess(chare_objs).size() && CpvAccess(chare_objs)[chareIdx] == this)
114 CmiAssert(CkpvAccess(chare_objs)[chareIdx] == this);
// Clear the slot so later lookups of this index yield NULL.
115 CkpvAccess(chare_objs)[chareIdx] = NULL;
// If a VidBlock was registered for this chare, send a DeleteVidMsg to the PE recorded in the entry.
116 Vidblockmap::iterator iter = CkpvAccess(vmap).find(chareIdx);
117 if (iter != CkpvAccess(vmap).end()) {
118 CkChareID *pCid = (CkChareID *)
119 _allocMsg(DeleteVidMsg, sizeof(CkChareID));
120 int srcPe = iter->second.onPE;
121 *pCid = iter->second;
122 envelope *ret = UsrToEnv(pCid);
123 ret->setVidPtr(iter->second.objPtr);
124 ret->setSrcPe(CkMyPe());
125 CmiSetHandler(ret, _charmHandlerIdx);
126 CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
// Count the message just sent as created, for quiescence detection.
127 CpvAccess(_qd)->create();
128 CkpvAccess(vmap).erase(iter);
// Pack/unpack hook for a Chare. The visible lines re-derive thishandle.objPtr for the
// current object and, in message-logging builds, (re)allocate mlogData.
// NOTE(review): several statements of this function are not visible in this listing.
134 void Chare::pup(PUP::er &p)
137 thishandle.objPtr=(void *)this;
138 #ifndef CMK_CHARE_USE_PTR
// Table-tracked chares store their index in the handle instead of the pointer.
140 if (chareIdx != -1) thishandle.objPtr=(void*)(CmiIntPtr)chareIdx;
142 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Allocate a fresh log record unless one exists with teamRecoveryFlag set.
144 if(mlogData == NULL || !mlogData->teamRecoveryFlag)
145 mlogData = new ChareMlogData();
149 #if CMK_ERROR_CHECKING
// Returns the chare type index of this object.
// NOTE(review): the body is not visible in this listing.
154 int Chare::ckGetChareType() const {
// Builds a debugger-facing name of the form "Chare on pe <pe> at <address>".
// NOTE(review): the declaration of buf and the return statement are not visible here;
// sprintf is unbounded, so buf must be large enough for the formatted text -- confirm.
157 char *Chare::ckDebugChareName(void) {
159 sprintf(buf,"Chare on pe %d at %p",CkMyPe(),(void*)this);
// Debugger hook that would write an ID for this object into str (at most limit bytes).
// NOTE(review): the body is not visible in this listing.
162 int Chare::ckDebugChareID(char *str, int limit) {
163 // pure chares for now do not have a valid ID
// Debugger pup hook for a Chare.
// NOTE(review): the body is not visible in this listing.
167 void Chare::ckDebugPup(PUP::er &p) {
171 /// This method is called before starting a [threaded] entry method.
172 void Chare::CkAddThreadListeners(CthThread th, void *msg) {
173 CthSetThreadID(th, thishandle.onPE, (int)(((char *)thishandle.objPtr)-(char *)0), 0);
174 traceAddThreadListeners(th, UsrToEnv(msg));
// Debugger pup of a raw message: pups the user bytes that follow the envelope.
// NOTE(review): some statements of this function are not visible in this listing.
177 void CkMessage::ckDebugPup(PUP::er &p,void *msg) {
179 int ts=UsrToEnv(msg)->getTotalsize();
// User payload length = total message size minus the envelope header.
180 int msgLen=ts-sizeof(envelope);
182 p((char*)msg,msgLen);
185 IrrGroup::IrrGroup(void) {
186 thisgroup = CkpvAccess(_currentGroup);
187 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
188 mlogData->objID.type = TypeGroup;
189 mlogData->objID.data.group.id = thisgroup;
190 mlogData->objID.data.group.onPE = CkMyPe();
// Destructor: clears this object's pointer from the group table it was registered in.
// NOTE(review): the line separating the two lock/clear/unlock sequences is not visible in
// this listing; presumably the second sequence is the else-branch for ordinary groups -- confirm.
194 IrrGroup::~IrrGroup() {
195 // remove the object pointer
196 if (CkpvAccess(_destroyingNodeGroup)) {
// Node group: clear the entry in the node-level table under its immediate lock.
197 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
198 CksvAccess(_nodeGroupTable)->find(thisgroup).setObj(NULL);
199 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
// Reset the flag once it has been consumed.
200 CkpvAccess(_destroyingNodeGroup) = false;
// Per-PE group table: same clearing under the PE-level lock.
202 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
203 CkpvAccess(_groupTable)->find(thisgroup).setObj(NULL);
204 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
// Pack/unpack hook for a group member.
// NOTE(review): the body is not visible in this listing.
208 void IrrGroup::pup(PUP::er &p)
214 int IrrGroup::ckGetChareType() const {
215 return CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
// Writes a debugger ID for this group into str; returns -1 when limit is below 5.
// NOTE(review): the other statements and the success return value are not visible here.
218 int IrrGroup::ckDebugChareID(char *str, int limit) {
219 if (limit<5) return -1;
// Stores the group index as an int starting at str[1].
// NOTE(review): this is an unaligned int store, and the 5-byte limit presumably assumes a 4-byte int.
221 *((int*)&str[1]) = thisgroup.idx;
225 char *IrrGroup::ckDebugChareName() {
226 return strdup(_chareTable[ckGetChareType()]->name);
// Migration hook for a group member.
// NOTE(review): the body is not visible in this listing.
229 void IrrGroup::ckJustMigrated(void)
// Thread-listener hook for irregular groups; no statements are visible in this listing
// besides the FIXME below.
233 void IrrGroup::CkAddThreadListeners(CthThread tid, void *msg) {
234 /* FIXME: **CW** not entirely sure what we should do here yet */
237 void Group::CkAddThreadListeners(CthThread th, void *msg) {
238 Chare::CkAddThreadListeners(th, msg);
239 CthSetThreadID(th, thisgroup.idx, 0, 0);
// Pack/unpack hook for Group; delegates to the CkReductionMgr base.
// NOTE(review): the rest of the body is not visible in this listing.
242 void Group::pup(PUP::er &p)
244 CkReductionMgr::pup(p);
248 /**** Delegation Manager Group */
249 CkDelegateMgr::~CkDelegateMgr() { }
251 //Default delegator implementation: do not delegate-- send directly
252 void CkDelegateMgr::ChareSend(CkDelegateData *pd,int ep,void *m,const CkChareID *c,int onPE)
253 { CkSendMsg(ep,m,c); }
254 void CkDelegateMgr::GroupSend(CkDelegateData *pd,int ep,void *m,int onPE,CkGroupID g)
255 { CkSendMsgBranch(ep,m,onPE,g); }
256 void CkDelegateMgr::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g)
257 { CkBroadcastMsgBranch(ep,m,g); }
258 void CkDelegateMgr::GroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
259 { CkSendMsgBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
260 void CkDelegateMgr::NodeGroupSend(CkDelegateData *pd,int ep,void *m,int onNode,CkNodeGroupID g)
261 { CkSendMsgNodeBranch(ep,m,onNode,g); }
262 void CkDelegateMgr::NodeGroupBroadcast(CkDelegateData *pd,int ep,void *m,CkNodeGroupID g)
263 { CkBroadcastMsgNodeBranch(ep,m,g); }
264 void CkDelegateMgr::NodeGroupSectionSend(CkDelegateData *pd,int ep,void *m,int nsid,CkSectionID *s)
265 { CkSendMsgNodeBranchMulti(ep,m,s->_cookie.get_aid(),s->pelist.size(),s->pelist.data()); }
266 void CkDelegateMgr::ArrayCreate(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,int onPE,CkArrayID a)
268 CProxyElement_ArrayBase ap(a,idx);
269 ap.ckInsert((CkArrayMessage *)m,ep,onPE);
271 void CkDelegateMgr::ArraySend(CkDelegateData *pd,int ep,void *m,const CkArrayIndex &idx,CkArrayID a)
273 CProxyElement_ArrayBase ap(a,idx);
274 ap.ckSend((CkArrayMessage *)m,ep);
276 void CkDelegateMgr::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a)
278 CProxy_ArrayBase ap(a);
279 ap.ckBroadcast((CkArrayMessage *)m,ep);
282 void CkDelegateMgr::ArraySectionSend(CkDelegateData *pd,int ep,void *m, int nsid,CkSectionID *s, int opts)
284 CmiAbort("ArraySectionSend is not implemented!\n");
286 CProxyElement_ArrayBase ap(a,idx);
287 ap.ckSend((CkArrayMessage *)m,ep);
291 /*** Proxy <-> delegator communication */
292 CkDelegateData::~CkDelegateData() {}
294 CkDelegateData *CkDelegateMgr::DelegatePointerPup(PUP::er &p,CkDelegateData *pd) {
295 return pd; // default implementation ignores pup call
298 /** FIXME: make a "CkReferenceHandle<CkDelegateData>" class to avoid
299 this tricky manual reference counting business... */
// Start delegating this proxy's sends to dTo, with optional per-proxy data dPtr.
// NOTE(review): the statements that store dTo/dPtr into the proxy are not visible in this listing.
301 void CProxy::ckDelegate(CkDelegateMgr *dTo,CkDelegateData *dPtr) {
// Take a reference on the delegate data, if any.
302 if (dPtr) dPtr->ref();
// Cache the manager's group ID and node-group flag (the same fields pup() refreshes).
306 delegatedGroupId = delegatedMgr->CkGetGroupID();
307 isNodeGroup = delegatedMgr->isNodeGroup();
// Stop delegating: zeroes the cached group ID and drops this proxy's reference on the
// delegate data.
// NOTE(review): the statements that reset the member pointers are not visible in this listing.
309 void CProxy::ckUndelegate(void) {
311 delegatedGroupId.setZero();
312 if (delegatedPtr) delegatedPtr->unref();
// Copy constructor: copies the delegation manager, group ID and node-group flag, and asks
// the manager for a copy of the source's delegate data when both are non-NULL.
// NOTE(review): the initialisation of delegatedPtr for the other case is not visible in this listing.
317 CProxy::CProxy(const CProxy &src)
318 :delegatedMgr(src.delegatedMgr), delegatedGroupId(src.delegatedGroupId),
319 isNodeGroup(src.isNodeGroup) {
321 if(delegatedMgr != NULL && src.delegatedPtr != NULL) {
322 delegatedPtr = src.delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
326 /// Assignment operator
// Copies the delegation state from src, obtains a copy of its delegate data from the
// manager, and only then releases the previously held data.
// NOTE(review): the else-branch and the return statement are not visible in this listing.
327 CProxy& CProxy::operator=(const CProxy &src) {
// Keep the old data pointer so it can be released after the new one is in place.
328 CkDelegateData *oldPtr=delegatedPtr;
330 delegatedMgr=src.delegatedMgr;
331 delegatedGroupId = src.delegatedGroupId;
332 isNodeGroup = src.isNodeGroup;
334 if(delegatedMgr != NULL && src.delegatedPtr != NULL)
335 delegatedPtr = delegatedMgr->ckCopyDelegateData(src.delegatedPtr);
339 // subtle: do unref *after* ref, because it's possible oldPtr == delegatedPtr
340 if (oldPtr) oldPtr->unref();
// Pack/unpack the proxy's delegation state. The delegate data itself is pupped through
// the manager's DelegatePointerPup, using a temporary manager object when the real one
// does not exist yet on this PE.
// NOTE(review): several lines (the pup calls, branch headers, clean-up) are not visible in this listing.
344 void CProxy::pup(PUP::er &p) {
345 if (!p.isUnpacking()) {
// Not unpacking: refresh the cached group ID and node-group flag from the live manager.
346 if (ckDelegatedTo() != NULL) {
347 delegatedGroupId = delegatedMgr->CkGetGroupID();
348 isNodeGroup = delegatedMgr->isNodeGroup();
// Everything below only applies to a delegated proxy.
352 if (!delegatedGroupId.isZero()) {
354 if (p.isUnpacking()) {
355 delegatedMgr = ckDelegatedTo();
358 int migCtor = 0, cIdx;
// Not unpacking: look up the manager's chare type to find its migration constructor.
359 if (!p.isUnpacking()) {
// Node-group table, accessed under the node-level lock.
361 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
362 cIdx = CksvAccess(_nodeGroupTable)->find(delegatedGroupId).getcIdx();
363 migCtor = _chareTable[cIdx]->migCtor;
364 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
// Per-PE group table, accessed under the PE-level lock.
367 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
368 cIdx = CkpvAccess(_groupTable)->find(delegatedGroupId).getcIdx();
369 migCtor = _chareTable[cIdx]->migCtor;
370 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
376 // if delegated manager has not been created, construct a dummy
377 // object on which to call DelegatePointerPup
378 if (delegatedMgr == NULL) {
380 // create a dummy object for calling DelegatePointerPup
381 int objId = _entryTable[migCtor]->chareIdx;
382 size_t objSize = _chareTable[objId]->size;
// Raw storage for the temporary manager; the migration constructor is run on it with a NULL message.
383 void *obj = malloc(objSize);
384 _entryTable[migCtor]->call(NULL, obj);
385 delegatedPtr = static_cast<CkDelegateMgr *> (obj)
386 ->DelegatePointerPup(p, delegatedPtr);
392 // delegated manager has been created, so we can use it
393 delegatedPtr = delegatedMgr->DelegatePointerPup(p,delegatedPtr);
397 if (p.isUnpacking() && delegatedPtr) {
403 /**** Array sections */
404 #define CKSECTIONID_CONSTRUCTOR_DEF(index) \
405 CkSectionID::CkSectionID(const CkArrayID &aid, const CkArrayIndex##index *elems, const int nElems, int factor): bfactor(factor) { \
406 _elems.assign(elems, elems+nElems); \
407 _cookie.get_aid() = aid; \
408 _cookie.get_pe() = CkMyPe(); \
410 CkSectionID::CkSectionID(const CkArrayID &aid, const std::vector<CkArrayIndex##index> &elems, int factor): bfactor(factor) { \
411 _elems.resize(elems.size()); \
412 for (int i=0; i<_elems.size(); ++i) { \
413 _elems[i] = static_cast<CkArrayIndex>(elems[i]); \
415 _cookie.get_aid() = aid; \
416 _cookie.get_pe() = CkMyPe(); \
419 CKSECTIONID_CONSTRUCTOR_DEF(1D)
420 CKSECTIONID_CONSTRUCTOR_DEF(2D)
421 CKSECTIONID_CONSTRUCTOR_DEF(3D)
422 CKSECTIONID_CONSTRUCTOR_DEF(4D)
423 CKSECTIONID_CONSTRUCTOR_DEF(5D)
424 CKSECTIONID_CONSTRUCTOR_DEF(6D)
425 CKSECTIONID_CONSTRUCTOR_DEF(Max)
427 CkSectionID::CkSectionID(const CkGroupID &gid, const int *_pelist, const int _npes, int factor): bfactor(factor) {
428 _cookie.get_aid() = gid;
429 pelist.assign(_pelist, _pelist+_npes);
432 CkSectionID::CkSectionID(const CkGroupID &gid, const std::vector<int>& _pelist, int factor): pelist(_pelist), bfactor(factor) {
433 _cookie.get_aid() = gid;
// Copy constructor: copies the cookie and branching factor from sid.
// NOTE(review): the statements between these two are not visible in this listing.
436 CkSectionID::CkSectionID(const CkSectionID &sid) {
437 _cookie = sid._cookie;
440 bfactor = sid.bfactor;
// Assignment: copies the cookie and branching factor from sid. Returns void, so
// assignments cannot be chained.
// NOTE(review): the statements between these two are not visible in this listing.
443 void CkSectionID::operator=(const CkSectionID &sid) {
444 _cookie = sid._cookie;
447 bfactor = sid.bfactor;
// Pack/unpack hook for a section ID.
// NOTE(review): the body is not visible in this listing.
450 void CkSectionID::pup(PUP::er &p) {
457 /**** Tiny random API routines */
// Treats the opaque argument fn as a pointer to a CkCallback.
// NOTE(review): what is done with the callback is not visible in this listing.
460 void CUDACallbackManager(void *fn) {
462 CkCallback *cb = (CkCallback*) fn;
469 void QdCreate(int n) {
470 CpvAccess(_qd)->create(n);
473 void QdProcess(int n) {
474 CpvAccess(_qd)->process(n);
478 void CkSetRefNum(void *msg, CMK_REFNUM_TYPE ref)
480 UsrToEnv(msg)->setRef(ref);
484 CMK_REFNUM_TYPE CkGetRefNum(void *msg)
486 return UsrToEnv(msg)->getRef();
490 int CkGetSrcPe(void *msg)
492 return UsrToEnv(msg)->getSrcPe();
496 int CkGetSrcNode(void *msg)
498 return CmiNodeOf(CkGetSrcPe(msg));
502 void *CkLocalBranch(CkGroupID gID) {
503 return _localBranch(gID);
// Reads the object pointer registered for groupID in the node-group table, holding the
// node-level immediate lock during the lookup.
// NOTE(review): the return statement is not visible in this listing.
507 void *_ckLocalNodeBranch(CkGroupID groupID) {
508 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
509 void *retval = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
510 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
// Returns this node's member of node group groupID.
// NOTE(review): the loop body and the return statement are not visible in this listing.
515 void *CkLocalNodeBranch(CkGroupID groupID)
518 // we are called in a constructor
// The object under construction is not in the table yet, so return it directly.
519 if (CkpvAccess(_currentNodeGroupObj) && CkpvAccess(_currentGroup) == groupID)
520 return CkpvAccess(_currentNodeGroupObj);
// Repeat the table lookup until the node group has been registered.
521 while (NULL== (retval=_ckLocalNodeBranch(groupID)))
522 { // Nodegroup hasn't finished being created yet-- schedule...
// Resolves a chare ID to a local object pointer; returns NULL when the chare (or its
// VidBlock) is not on this PE.
// NOTE(review): the declaration of pe and some branch lines are not visible in this listing.
529 void *CkLocalChare(const CkChareID *pCid)
532 if (pe<0) { //A virtual chare ID
// Virtual IDs encode the owning PE as -(pe+1).
533 if (pe!=(-(CkMyPe()+1)))
534 return NULL;//VID block not on this PE
535 #ifdef CMK_CHARE_USE_PTR
536 VidBlock *v=(VidBlock *)pCid->objPtr;
// Index mode: objPtr holds an index into the vidblocks table.
538 VidBlock *v=CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
540 return v->getLocalChareObj();
543 { //An ordinary chare ID
545 return NULL;//Chare not on this PE
546 #ifdef CMK_CHARE_USE_PTR
// Index mode: objPtr holds an index into the chare_objs table.
549 return CkpvAccess(chare_objs)[(CmiIntPtr)pCid->objPtr];
// Per-PE copy of the argument vector, served by CkGetArgv() / CkGetArgc().
554 CkpvDeclare(char**,Ck_argv);
556 extern "C" char **CkGetArgv(void) {
557 return CkpvAccess(Ck_argv);
559 extern "C" int CkGetArgc(void) {
560 return CmiGetArgc(CkpvAccess(Ck_argv));
563 /******************** Basic support *****************/
// Invokes entry method epIdx on obj with msg, then deallocates msg if the entry method
// is marked noKeep.
// NOTE(review): some preprocessor and debugger-hook lines are not visible in this listing.
564 extern "C" void CkDeliverMessageFree(int epIdx,void *msg,void *obj)
566 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
567 CpvAccess(_currentObj) = (Chare *)obj;
568 // printf("[%d] CurrentObj set to %p\n",CkMyPe(),obj);
570 //BIGSIM_OOC DEBUGGING
571 //CkPrintf("CkDeliverMessageFree: name of entry fn: %s\n", _entryTable[epIdx]->name);
// Debugger hook called before the entry method runs.
574 CpdBeforeEp(epIdx, obj, msg);
576 _entryTable[epIdx]->call(msg, obj);
580 if (_entryTable[epIdx]->noKeep)
581 { /* Method doesn't keep/delete the message, so we have to: */
582 _msgTable[_entryTable[epIdx]->msgIdx]->dealloc(msg);
// Invokes entry method epIdx on obj without giving up the caller's message: noKeep
// methods receive the original pointer, all others receive a copy made with CkCopyMsg.
// NOTE(review): the declaration of deliverMsg and some branch lines are not visible in this listing.
585 extern "C" void CkDeliverMessageReadonly(int epIdx,const void *msg,void *obj)
587 //BIGSIM_OOC DEBUGGING
588 //CkPrintf("CkDeliverMessageReadonly: name of entry fn: %s\n", _entryTable[epIdx]->name);
592 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
593 CpvAccess(_currentObj) = (Chare *)obj;
595 if (_entryTable[epIdx]->noKeep)
596 { /* Deliver a read-only copy of the message */
597 deliverMsg=(void *)msg;
599 { /* Method needs a copy of the message to keep/delete */
600 void *oldMsg=(void *)msg;
601 deliverMsg=CkCopyMsg(&oldMsg);
602 #if CMK_ERROR_CHECKING
// The copy must not have relocated the caller's message.
604 CkAbort("CkDeliverMessageReadonly: message pack/unpack changed message pointer!");
608 CpdBeforeEp(epIdx, obj, (void*)msg);
610 _entryTable[epIdx]->call(deliverMsg, obj);
// Delivers the user message inside env to entry method epIdx on obj via
// CkDeliverMessageFree, without tracing.
// NOTE(review): one line of the body is not visible in this listing.
616 static inline void _invokeEntryNoTrace(int epIdx,envelope *env,void *obj)
618 void *msg = EnvToUsr(env);
620 CkDeliverMessageFree(epIdx,msg,obj);
// Invokes entry method epIdx on obj. With tracing compiled in and enabled for the entry
// method, the call is bracketed by begin/end-execute events, plus begin/end-appwork
// events when the entry is flagged appWork.
// NOTE(review): the else/#else lines that pick the untraced call are not visible in this listing.
623 static inline void _invokeEntry(int epIdx,envelope *env,void *obj)
626 #if CMK_TRACE_ENABLED
627 if (_entryTable[epIdx]->traceEnabled) {
628 _TRACE_BEGIN_EXECUTE(env, obj);
629 if(_entryTable[epIdx]->appWork)
630 _TRACE_BEGIN_APPWORK();
631 _invokeEntryNoTrace(epIdx,env,obj);
632 if(_entryTable[epIdx]->appWork)
633 _TRACE_END_APPWORK();
634 _TRACE_END_EXECUTE();
638 _invokeEntryNoTrace(epIdx,env,obj);
642 /********************* Creation ********************/
// Sends a chare-creation message for chare type cIdx with constructor entry eIdx to
// destPE, through the load-balancing enqueue (_CldEnqueue).
// When a virtual ID is produced, *pCid is filled in with this PE encoded as -(pe+1) and a
// new VidBlock (or its table index when CMK_CHARE_USE_PTR is not defined).
// NOTE(review): the condition choosing between the plain and the virtual-ID path, and the
// CK_PE_ANY branch body, are not visible in this listing.
645 void CkCreateChare(int cIdx, int eIdx, void *msg, CkChareID *pCid, int destPE)
647 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
648 envelope *env = UsrToEnv(msg);
651 env->setMsgtype(NewChareMsg);
653 pCid->onPE = (-(CkMyPe()+1));
654 // pCid->magic = _GETIDX(cIdx);
655 pCid->objPtr = (void *) new VidBlock();
656 _MEMCHECK(pCid->objPtr);
657 env->setMsgtype(NewVChareMsg);
658 env->setVidPtr(pCid->objPtr);
659 #ifndef CMK_CHARE_USE_PTR
// Index mode: register the VidBlock and hand out its table index instead of the pointer.
660 CkpvAccess(vidblocks).push_back((VidBlock*)pCid->objPtr);
661 int idx = CkpvAccess(vidblocks).size()-1;
662 pCid->objPtr = (void *)(CmiIntPtr)idx;
663 env->setVidPtr((void *)(CmiIntPtr)idx);
667 env->setByPe(CkMyPe());
668 env->setSrcPe(CkMyPe());
669 CmiSetHandler(env, _charmHandlerIdx);
670 _TRACE_CREATION_1(env);
// Count the creation message for quiescence detection.
671 CpvAccess(_qd)->create();
672 _STATS_RECORD_CREATE_CHARE_1();
674 if(destPE == CK_PE_ANY)
678 _CldEnqueue(destPE, env, _infoIdx);
679 _TRACE_CREATION_DONE(1);
// Creates this PE's member of group groupID: allocates raw storage, registers it in the
// group table, re-enqueues messages that were buffered for the group, then runs the
// constructor entry epIdx with the message in env.
// NOTE(review): some declarations and #if/#else lines are not visible in this listing.
682 void CkCreateLocalGroup(CkGroupID groupID, int epIdx, envelope *env)
684 int gIdx = _entryTable[epIdx]->chareIdx;
685 void *obj = malloc(_chareTable[gIdx]->size);
687 setMemoryTypeChare(obj);
// Register the (not yet constructed) object before invoking the constructor.
688 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
689 CkpvAccess(_groupTable)->find(groupID).setObj(obj);
690 CkpvAccess(_groupTable)->find(groupID).setcIdx(gIdx);
691 CkpvAccess(_groupIDTable)->push_back(groupID);
692 PtrQ *ptrq = CkpvAccess(_groupTable)->find(groupID).getPending();
// Drain the messages that arrived before the group existed.
695 while((pending=ptrq->deq())!=0) {
697 //In BigSim, CpvAccess(CsdSchedQueue) is not used. _CldEnqueue resets the
698 //handler to converse-level BigSim handler.
699 _CldEnqueue(CkMyPe(), pending, _infoIdx);
701 CsdEnqueueGeneral(pending, CQS_QUEUEING_FIFO, 0, 0);
704 CkpvAccess(_groupTable)->find(groupID).clearPending();
706 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
// Context read by the group's constructor (IrrGroup::IrrGroup reads _currentGroup).
708 CkpvAccess(_currentGroup) = groupID;
709 CkpvAccess(_currentGroupRednMgr) = env->getRednMgr();
711 #ifndef CMK_CHARE_USE_PTR
// Hide the pending plain-chare index while the group constructor runs, then restore it.
712 int callingChareIdx = CkpvAccess(currentChareIdx);
713 CkpvAccess(currentChareIdx) = -1;
716 _invokeEntryNoTrace(epIdx,env,obj); /* can't trace groups: would cause nested begin's */
718 #ifndef CMK_CHARE_USE_PTR
719 CkpvAccess(currentChareIdx) = callingChareIdx;
722 _STATS_RECORD_PROCESS_GROUP_1();
// Creates this node's member of node group groupID: allocates raw storage, runs the
// constructor entry epIdx, and only then registers the object in the node-group table
// and re-enqueues messages that were buffered for it.
// NOTE(review): some declarations and #endif lines are not visible in this listing.
725 void CkCreateLocalNodeGroup(CkGroupID groupID, int epIdx, envelope *env)
727 int gIdx = _entryTable[epIdx]->chareIdx;
728 size_t objSize=_chareTable[gIdx]->size;
729 void *obj = malloc(objSize);
731 setMemoryTypeChare(obj);
732 CkpvAccess(_currentGroup) = groupID;
734 // Now that the NodeGroup is created, add it to the table.
735 // NodeGroups can be accessed by multiple processors, so
736 // this is in the opposite order from groups - invoking the constructor
737 // before registering it.
738 // User may call CkLocalNodeBranch() inside the nodegroup constructor
739 // store nodegroup into _currentNodeGroupObj
740 CkpvAccess(_currentNodeGroupObj) = obj;
742 #ifndef CMK_CHARE_USE_PTR
// Hide the pending plain-chare index while the constructor runs, then restore it.
743 int callingChareIdx = CkpvAccess(currentChareIdx);
744 CkpvAccess(currentChareIdx) = -1;
747 _invokeEntryNoTrace(epIdx,env,obj);
749 #ifndef CMK_CHARE_USE_PTR
750 CkpvAccess(currentChareIdx) = callingChareIdx;
753 CkpvAccess(_currentNodeGroupObj) = NULL;
754 _STATS_RECORD_PROCESS_NODE_GROUP_1();
// Publish the constructed object in the node-level table.
756 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
757 CksvAccess(_nodeGroupTable)->find(groupID).setObj(obj);
758 CksvAccess(_nodeGroupTable)->find(groupID).setcIdx(gIdx);
759 CksvAccess(_nodeGroupIDTable).push_back(groupID);
761 PtrQ *ptrq = CksvAccess(_nodeGroupTable)->find(groupID).getPending();
// Drain the messages that arrived before the node group existed.
764 while((pending=ptrq->deq())!=0) {
765 _CldNodeEnqueue(CkMyNode(), pending, _infoIdx);
767 CksvAccess(_nodeGroupTable)->find(groupID).clearPending();
769 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
// Stamps env with the group number, source PE and current epoch, broadcasts it with the
// BOC handler, and then creates the local member from the same envelope.
// NOTE(review): some lines (including any packing step before the broadcast) are not
// visible in this listing; gIdx is not used in the visible lines.
772 void _createGroup(CkGroupID groupID, envelope *env)
776 int epIdx = env->getEpIdx();
777 int gIdx = _entryTable[epIdx]->chareIdx;
778 env->setGroupNum(groupID);
779 env->setSrcPe(CkMyPe());
780 env->setGroupEpoch(CkpvAccess(_charmEpoch));
784 CmiSetHandler(env, _bocHandlerIdx);
786 CmiSyncBroadcast(env->getTotalsize(), (char *)env);
// Count one created message per other PE for quiescence detection.
787 CpvAccess(_qd)->create(CkNumPes()-1);
788 CkUnpackMessage(&env);
790 _STATS_RECORD_CREATE_GROUP_1();
791 CkCreateLocalGroup(groupID, epIdx, env);
// Node-group counterpart of _createGroup: stamps env, node-broadcasts it with the BOC
// handler, and then creates the local node-group member.
// NOTE(review): some lines (including any packing step before the broadcast) are not
// visible in this listing.
794 void _createNodeGroup(CkGroupID groupID, envelope *env)
798 int epIdx = env->getEpIdx();
799 env->setGroupNum(groupID);
800 env->setSrcPe(CkMyPe());
801 env->setGroupEpoch(CkpvAccess(_charmEpoch));
804 CmiSetHandler(env, _bocHandlerIdx);
// During epoch 0, count the node-level init messages.
806 if (CkpvAccess(_charmEpoch)==0) CksvAccess(_numInitNodeMsgs)++;
807 CmiSyncNodeBroadcast(env->getTotalsize(), (char *)env);
// Count one created message per other node for quiescence detection.
808 CpvAccess(_qd)->create(CkNumNodes()-1);
809 CkUnpackMessage(&env);
811 _STATS_RECORD_CREATE_NODE_GROUP_1();
812 CkCreateLocalNodeGroup(groupID, epIdx, env);
// Picks a new group ID and creates the group everywhere via _createGroup.
// NOTE(review): the if/else selecting between the two idx assignments and the return
// statement are not visible in this listing.
817 static CkGroupID _groupCreate(envelope *env)
821 // check CkMyPe(). if it is 0 then idx is _numGroups++
822 // if not, then something else...
824 groupNum.idx = CkpvAccess(_numGroups)++;
// Other creators derive the index from (number of PEs, this PE, local counter).
826 groupNum.idx = _getGroupIdx(CkNumPes(),CkMyPe(),CkpvAccess(_numGroups)++);
827 _createGroup(groupNum, env);
831 // new _nodeGroupCreate
// Picks a new node-group ID under the node-group table lock, then creates the node group
// everywhere via _createNodeGroup.
// NOTE(review): the else line and the return statement are not visible in this listing.
832 static CkGroupID _nodeGroupCreate(envelope *env)
835 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock)); // change for proc 0 and other processors
836 if(CkMyNode() == 0) // should this be CkMyPe() or CkMyNode()?
837 groupNum.idx = CksvAccess(_numNodeGroups)++;
839 groupNum.idx = _getGroupIdx(CkNumNodes(),CkMyNode(),CksvAccess(_numNodeGroups)++);
840 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
841 _createNodeGroup(groupNum, env);
845 /**** generate the group idx when the group's creator pe is not pe0
846 **** the 32 bit index has msb set to 1 (+ve indices are used by proc 0)
847 **** remaining bits contain the group creator processor number and
848 **** the idx number which starts from 1(_numGroups or _numNodeGroups) on each proc ****/
// Packs (myNode, numGroups) into one int: myNode is shifted into the high bits and
// numGroups is added in the low n bits.
// NOTE(review): the declaration of idx, the statement at the gap between the two trailing
// comments, and the return are not visible in this listing.
// NOTE(review): x comes from floating-point log()/log(2) and may round up for some
// values; myNode<<n can overflow a 32-bit int; numGroups >= 2^n would spill into the
// node bits -- confirm these limits are acceptable.
850 int _getGroupIdx(int numNodes,int myNode,int numGroups)
853 int x = (int)ceil(log((double)numNodes)/log((double)2));// number of bits needed to store node number
854 int n = 32 - (x+1); // number of bits remaining for the index
855 idx = (myNode<<n) + numGroups; // add number of processors, shift by the no. of bits needed,
856 // then add the next available index
857 // of course this won't work when int is 8 bytes long on T3E
858 //idx |= 0x80000000; // set the most significant bit to 1
860 // if int is not 32 bits, wouldn't this be wrong?
// Public entry point for creating a group of chare type cIdx with constructor entry eIdx.
// Marks the message as BocInitMsg and hands it to _groupCreate.
// NOTE(review): one statement and the return are not visible in this listing.
865 CkGroupID CkCreateGroup(int cIdx, int eIdx, void *msg)
// The entry method must belong to the requested chare type.
867 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
868 envelope *env = UsrToEnv(msg);
869 env->setMsgtype(BocInitMsg);
871 env->setSrcPe(CkMyPe());
872 _TRACE_CREATION_N(env, CkNumPes());
873 CkGroupID gid = _groupCreate(env);
874 _TRACE_CREATION_DONE(1);
// Public entry point for creating a node group of chare type cIdx with constructor entry
// eIdx. Marks the message as NodeBocInitMsg and hands it to _nodeGroupCreate.
// NOTE(review): one statement and the return are not visible in this listing.
879 CkGroupID CkCreateNodeGroup(int cIdx, int eIdx, void *msg)
// The entry method must belong to the requested chare type.
881 CkAssert(cIdx == _entryTable[eIdx]->chareIdx);
882 envelope *env = UsrToEnv(msg);
883 env->setMsgtype(NodeBocInitMsg);
885 env->setSrcPe(CkMyPe());
886 _TRACE_CREATION_N(env, CkNumNodes());
887 CkGroupID gid = _nodeGroupCreate(env);
888 _TRACE_CREATION_DONE(1);
// Allocates raw storage for the chare type that env's entry method belongs to. In index
// mode the object and its type are appended to the per-PE tables and idx receives the
// new table index.
// NOTE(review): the return statement (and any allocation check) is not visible in this listing.
892 static inline void *_allocNewChare(envelope *env, int &idx)
894 int chareIdx = _entryTable[env->getEpIdx()]->chareIdx;
895 void *tmp=malloc(_chareTable[chareIdx]->size);
897 #ifndef CMK_CHARE_USE_PTR
898 CkpvAccess(chare_objs).push_back(tmp);
899 CkpvAccess(chare_types).push_back(chareIdx);
900 idx = CkpvAccess(chare_objs).size()-1;
902 setMemoryTypeChare(tmp);
906 // Method returns true if one or more group dependencies are unsatisfied
// Walks the group IDs listed in env's dependency array; each non-zero ID is looked up
// with _lookupGroupAndBufferIfNotThere, which buffers env when that group is missing.
// NOTE(review): the return statements are not visible in this listing.
907 inline bool isGroupDepUnsatisfied(const CkCoreState *ck, const envelope *env) {
908 int groupDepNum = env->getGroupDepNum();
909 if(groupDepNum != 0) {
910 CkGroupID *groupDepPtr = (CkGroupID *)(env->getGroupDepPtr());
911 for(int i=0;i<groupDepNum;i++) {
912 CkGroupID depID = groupDepPtr[i];
913 if (!depID.isZero() && !_lookupGroupAndBufferIfNotThere(ck, env, depID)) {
// Handles a NewChareMsg: allocates the chare and runs its constructor entry method.
// NOTE(review): the statement after the dependency check, any guard around ck->process(),
// and the declaration of idx are not visible in this listing.
921 static void _processNewChareMsg(CkCoreState *ck,envelope *env)
923 if(isGroupDepUnsatisfied(ck, env))
926 ck->process(); // ck->process() updates mProcessed count used in QD
928 void *obj = _allocNewChare(env, idx);
929 #ifndef CMK_CHARE_USE_PTR
// Tell the Chare constructor which table index it is being built for.
930 CkpvAccess(currentChareIdx) = idx;
932 _invokeEntry(env->getEpIdx(),env,obj);
934 _STATS_RECORD_PROCESS_CHARE_1();
937 void CkCreateLocalChare(int epIdx, envelope *env)
939 env->setEpIdx(epIdx);
940 _processNewChareMsg(NULL, env);
// Handles a NewVChareMsg: allocates the chare, sends a FillVidMsg carrying the real
// chare ID back to the PE that created the VidBlock, then runs the constructor.
// NOTE(review): the statement after the dependency check, the pointer-mode #else branch
// and the declaration/first fields of vid are not visible in this listing.
943 static void _processNewVChareMsg(CkCoreState *ck,envelope *env)
945 if(isGroupDepUnsatisfied(ck, env))
947 ck->process(); // ck->process() updates mProcessed count used in QD
949 void *obj = _allocNewChare(env, idx);
// Build the reply that tells the creator where the chare really lives.
950 CkChareID *pCid = (CkChareID *)
951 _allocMsg(FillVidMsg, sizeof(CkChareID));
952 pCid->onPE = CkMyPe();
953 #ifndef CMK_CHARE_USE_PTR
954 pCid->objPtr = (void*)(CmiIntPtr)idx;
958 // pCid->magic = _GETIDX(_entryTable[env->getEpIdx()]->chareIdx);
959 envelope *ret = UsrToEnv(pCid);
960 ret->setVidPtr(env->getVidPtr());
// The reply goes to the PE recorded by CkCreateChare via setByPe().
961 int srcPe = env->getByPe();
962 ret->setSrcPe(CkMyPe());
963 CmiSetHandler(ret, _charmHandlerIdx);
964 CmiSyncSendAndFree(srcPe, ret->getTotalsize(), (char *)ret);
965 #ifndef CMK_CHARE_USE_PTR
966 // register the remote vidblock for deletion when chare is deleted
969 vid.objPtr = env->getVidPtr();
970 CkpvAccess(vmap)[idx] = vid;
// Count the FillVidMsg for quiescence detection.
972 CpvAccess(_qd)->create();
973 #ifndef CMK_CHARE_USE_PTR
974 CkpvAccess(currentChareIdx) = idx;
976 _invokeEntry(env->getEpIdx(),env,obj);
977 _STATS_RECORD_PROCESS_CHARE_1();
980 /************** Receive: Chares *************/
// Delivers a message addressed to a plain chare or a mainchare: resolves the target
// object, then invokes the entry method.
// NOTE(review): the declaration of obj and the else/#else/#endif lines are not visible in
// this listing, so which of the getObjPtr() assignments runs in which configuration must
// be confirmed against the full source.
982 static inline void _processForPlainChareMsg(CkCoreState *ck,envelope *env)
984 if(isGroupDepUnsatisfied(ck, env))
986 ck->process(); // ck->process() updates mProcessed count used in QD
987 int epIdx = env->getEpIdx();
988 int mainIdx = _chareTable[_entryTable[epIdx]->chareIdx]->mainChareType();
990 if (mainIdx != -1) { // mainchare
991 CmiAssert(CkMyPe()==0);
992 obj = _mainTable[mainIdx]->getObj();
995 #ifndef CMK_CHARE_USE_PTR
// Index mode: for TypeChare targets the envelope's objPtr is an index into chare_objs.
996 if (_chareTable[_entryTable[epIdx]->chareIdx]->chareType == TypeChare)
997 obj = CkpvAccess(chare_objs)[(CmiIntPtr)env->getObjPtr()];
999 obj = env->getObjPtr();
1001 obj = env->getObjPtr();
1004 _invokeEntry(epIdx,env,obj);
1005 _STATS_RECORD_PROCESS_MSG_1();
1008 static inline void _processForChareMsg(CkCoreState *ck,envelope *env)
1010 int epIdx = env->getEpIdx();
1011 void *obj = env->getObjPtr();
1012 _invokeEntry(epIdx,env,obj);
// Handles a FillVidMsg: locates the VidBlock named in the envelope and fills it with the
// real chare ID (PE, object pointer/index) carried as the message payload.
// NOTE(review): the #else/#endif lines and the tail of the function are not visible in this listing.
1015 static inline void _processFillVidMsg(CkCoreState *ck,envelope *env)
1017 ck->process(); // ck->process() updates mProcessed count used in QD
1018 #ifndef CMK_CHARE_USE_PTR
// Index mode: the envelope's vidPtr is an index into the vidblocks table.
1019 VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1021 VidBlock *vptr = (VidBlock *) env->getVidPtr();
1022 _CHECK_VALID(vptr, "FillVidMsg: Not a valid VIdPtr\n");
1024 CkChareID *pcid = (CkChareID *) EnvToUsr(env);
1025 _CHECK_VALID(pcid, "FillVidMsg: Not a valid pCid\n");
// A NULL vptr is tolerated and simply skipped.
1026 if (vptr) vptr->fill(pcid->onPE, pcid->objPtr);
// Handles a message addressed to a virtual chare ID: locates the VidBlock named in the envelope.
// NOTE(review): what is done with the VidBlock afterwards is not visible in this listing.
1030 static inline void _processForVidMsg(CkCoreState *ck,envelope *env)
1032 ck->process(); // ck->process() updates mProcessed count used in QD
1033 #ifndef CMK_CHARE_USE_PTR
1034 VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1036 VidBlock *vptr = (VidBlock *) env->getVidPtr();
1037 _CHECK_VALID(vptr, "ForVidMsg: Not a valid VIdPtr\n");
// Handles a DeleteVidMsg: in index mode, fetches the VidBlock named in the envelope and
// clears its slot in the vidblocks table.
// NOTE(review): the disposal of the VidBlock and of the message is not visible in this listing.
1043 static inline void _processDeleteVidMsg(CkCoreState *ck,envelope *env)
1045 ck->process(); // ck->process() updates mProcessed count used in QD
1046 #ifndef CMK_CHARE_USE_PTR
1047 VidBlock *vptr = CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()];
1049 CkpvAccess(vidblocks)[(CmiIntPtr)env->getVidPtr()] = NULL;
1054 /************** Receive: Groups ****************/
1057 Return a pointer to the local BOC of "groupID".
1058 The message "env" passed in has some known dependency on this groupID
1059 (either it is to be delivered to this BOC, or it depends on this BOC being there).
1060 Therefore, if the return value is NULL, this function buffers the message so that
1061 it will be re-sent (by CkCreateLocalGroup) when this groupID is eventually constructed.
1062 The message passed in must have its handlers correctly set so that it can be re-enqueued at that point.
// Looks up this PE's member of groupID under the group-table lock; when it does not exist
// yet, env is appended to that group's pending queue.
// NOTE(review): env is taken as const but stored through a cast to a non-const pointer;
// the return statement is not visible in this listing.
1065 static inline IrrGroup *_lookupGroupAndBufferIfNotThere(const CkCoreState *ck, const envelope *env, const CkGroupID &groupID)
1068 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
1069 IrrGroup *obj = ck->localBranch(groupID);
1070 if (obj==NULL) { /* groupmember not yet created: stash message */
1071 ck->getGroupTable()->find(groupID).enqMsg((envelope *)env);
1073 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
1077 IrrGroup *lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
1079 return _lookupGroupAndBufferIfNotThere(ck, env, groupID);
// Invokes entry method epIdx on group member obj. If the load-balancing database reports
// an object currently being measured, it is stopped around the call and restarted afterwards.
// NOTE(review): the #if guard, the declaration of objstopped and the line that sets it are
// not visible in this listing.
1082 static inline void _deliverForBocMsg(CkCoreState *ck,int epIdx,envelope *env,IrrGroup *obj)
1085 // if there is a running obj being measured, stop it temporarily
1086 LDObjHandle objHandle;
1088 LBDatabase *the_lbdb = (LBDatabase *)CkLocalBranch(_lbdb);
1089 if (the_lbdb->RunningObject(&objHandle)) {
1091 the_lbdb->ObjectStop(objHandle);
1094 _invokeEntry(epIdx,env,obj);
1096 if (objstopped) the_lbdb->ObjectStart(objHandle);
1098 _STATS_RECORD_PROCESS_BRANCH_1();
// Handles a message addressed to a group member: if the member exists it is delivered
// now, otherwise the lookup helper has already buffered it.
// NOTE(review): the statements after the dependency check and the test on obj are not
// visible in this listing; groupID is not used in the visible lines (the lookup calls
// env->getGroupNum() again).
1101 static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
1103 if(isGroupDepUnsatisfied(ck, env))
1105 CkGroupID groupID = env->getGroupNum();
1106 IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
1108 ck->process(); // ck->process() updates mProcessed count used in QD
1109 _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
1113 static inline void _deliverForNodeBocMsg(CkCoreState *ck,envelope *env,void *obj)
1115 env->setMsgtype(ForChareMsg);
1116 env->setObjPtr(obj);
1117 _processForChareMsg(ck,env);
1118 _STATS_RECORD_PROCESS_NODE_BRANCH_1();
1121 static inline void _deliverForNodeBocMsg(CkCoreState *ck,int epIdx, envelope *env,void *obj)
1123 env->setEpIdx(epIdx);
1124 _deliverForNodeBocMsg(ck,env, obj);
// Handles a message addressed to a node-group member: looks the member up under the
// node-group table lock; if it does not exist yet the message is queued on the group's
// pending list, otherwise it is delivered as a ForChareMsg to the member.
// NOTE(review): the statements after the dependency check, the declaration of obj and the
// early return after buffering are not visible in this listing.
1127 static inline void _processForNodeBocMsg(CkCoreState *ck,envelope *env)
1129 if(isGroupDepUnsatisfied(ck, env))
1131 CkGroupID groupID = env->getGroupNum();
1134 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1135 obj = CksvAccess(_nodeGroupTable)->find(groupID).getObj();
1136 if(!obj) { // groupmember not yet created
1137 #if CMK_IMMEDIATE_MSG
// An immediate message loses its immediate flag before being buffered.
1138 if (CmiIsImmediate(env)) {
1139 //CmiDelayImmediate(); /* buffer immediate message */
1140 CmiResetImmediate(env); // note: this may not be SIG IO safe !
1143 CksvAccess(_nodeGroupTable)->find(groupID).enqMsg(env);
1144 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1147 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1148 ck->process(); // ck->process() updates mProcessed count used in QD
1149 env->setMsgtype(ForChareMsg);
1150 env->setObjPtr(obj);
1151 _processForChareMsg(ck,env);
1152 _STATS_RECORD_PROCESS_NODE_BRANCH_1();
// Handles a BocInitMsg: creates this PE's member of the group named in env.
// NOTE(review): the statement after the dependency check is not visible in this listing.
1155 void _processBocInitMsg(CkCoreState *ck,envelope *env)
1157 if(isGroupDepUnsatisfied(ck, env))
1159 CkGroupID groupID = env->getGroupNum();
1160 int epIdx = env->getEpIdx();
1161 ck->process(); // ck->process() updates mProcessed count used in QD
1162 CkCreateLocalGroup(groupID, epIdx, env);
// Handles a NodeBocInitMsg: creates this node's member of the node group named in env.
// NOTE(review): the statement after the dependency check is not visible in this listing.
1165 void _processNodeBocInitMsg(CkCoreState *ck,envelope *env)
1167 if(isGroupDepUnsatisfied(ck, env))
1169 ck->process(); // ck->process() updates mProcessed count used in QD
1170 CkGroupID groupID = env->getGroupNum();
1171 int epIdx = env->getEpIdx();
1172 CkCreateLocalNodeGroup(groupID, epIdx, env);
1175 /************** Receive: Arrays *************/
// Delivers a message to an array element. If the recipient is in the per-PE array_objs
// map the entry method is invoked on it directly; otherwise delivery goes through the
// array manager, which is looked up (and the message buffered if the manager is missing).
// NOTE(review): the declaration of opts, the else lines and the NULL test on mgr are not
// visible in this listing.
1176 static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
1177 ArrayObjMap& object_map = CkpvAccess(array_objs);
1178 auto iter = object_map.find(env->getRecipientID());
1179 if (iter != object_map.end()) {
1180 // First see if we already have a direct pointer to the object
1182 ck->process(); // ck->process() updates mProcessed count used in QD
1184 CkArrayMessage* msg = (CkArrayMessage*)EnvToUsr(env);
// Messages that took more than one hop are reported to the location manager.
1185 if (msg->array_hops()>1) {
1186 CProxy_ArrayBase(env->getArrayMgr()).ckLocMgr()->multiHop(msg);
// The last argument is true unless the CK_MSG_KEEP option is set.
1188 iter->second->ckInvokeEntry(env->getEpIdx(), msg, !(opts & CK_MSG_KEEP));
1190 // Otherwise fallback to delivery through the array manager
1191 CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getArrayMgr());
1194 ck->process(); // ck->process() updates mProcessed count used in QD
1195 mgr->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_inline);
1200 //BIGSIM_OOC DEBUGGING
// The replacement text is commented out ("//x"), so TELLMSGTYPE(...) expands
// to nothing and the CkPrintf calls wrapped in it are compiled out. Remove the
// "//" to enable the per-message-type trace output.
1201 #define TELLMSGTYPE(x) //x
1204 * This is the main converse-level handler used by all of Charm++.
1206 * \addtogroup CriticalPathFramework
// Dispatch routine for an incoming Charm++ envelope.
//   converseMsg - the received message, reinterpreted as an envelope.
//   ck          - the CkCoreState forwarded to the per-type handlers.
// After optional pre-processing (CkRdmaIssueRgets under CMK_ONESIDED_IMPL, the
// ck->watcher hook, message-logging hooks under _FAULT_MLOG_/_FAULT_CAUSAL_),
// the envelope is dispatched on env->getMsgtype() to the matching _process*
// routine. An unrecognised type aborts with CmiAbort.
// NOTE(review): several case labels, break statements, #endif lines and local
// declarations are not visible in this excerpt.
1208 void _processHandler(void *converseMsg,CkCoreState *ck)
1210 envelope *env = (envelope *) converseMsg;
1212 MESSAGE_PHASE_CHECK(env);
1214 #if CMK_ONESIDED_IMPL
1216 envelope *prevEnv = env;
1217 env = CkRdmaIssueRgets(prevEnv);
1219 // Within pe or logical node, env points to new message with data
1222 CkFreeMsg(EnvToUsr(prevEnv));
1224 // async rdma call in place, asynchronous return and ack handling
1230 //#if CMK_RECORD_REPLAY
// A registered watcher sees the message first; a false result stops processing.
1231 if (ck->watcher!=NULL) {
1232 if (!ck->watcher->processMessage(&env,ck)) return;
1235 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1239 MlogEntry *entry=NULL;
1240 if(env->getMsgtype() == ForBocMsg || env->getMsgtype() == ForNodeBocMsg
1241 || env->getMsgtype() == ForArrayEltMsg
1242 || env->getMsgtype() == ForChareMsg) {
1243 sender = env->sender;
1245 int result = preProcessReceivedMessage(env,&obj,&entry);
1251 #if USE_CRITICAL_PATH_HEADER_ARRAY
1252 CK_CRITICALPATH_START(env)
// Dispatch on the envelope's message type. Most cases unpack a packed
// envelope before calling the handler; the three Vid cases below do not.
1255 switch(env->getMsgtype()) {
1258 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
1259 // QD processing moved inside _processBocInitMsg because it is conditional
1261 if(env->isPacked()) CkUnpackMessage(&env);
1262 _processBocInitMsg(ck,env);
1264 case NodeBocInitMsg :
1265 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NodeBocInitMsg\n", CkMyPe());)
1266 if(env->isPacked()) CkUnpackMessage(&env);
1267 _processNodeBocInitMsg(ck,env);
1270 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForBocMsg\n", CkMyPe());)
1271 // QD processing moved inside _processForBocMsg because it is conditional
1272 if(env->isPacked()) CkUnpackMessage(&env);
1273 _processForBocMsg(ck,env);
1274 // stats record moved inside _processForBocMsg because it is conditional
1276 case ForNodeBocMsg :
1277 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForNodeBocMsg\n", CkMyPe());)
1278 // QD processing moved to _processForNodeBocMsg because it is conditional
1279 if(env->isPacked()) CkUnpackMessage(&env);
1280 _processForNodeBocMsg(ck,env);
1281 // stats record moved to _processForNodeBocMsg because it is conditional
1285 case ForArrayEltMsg:
1286 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForArrayEltMsg\n", CkMyPe());)
1287 if(env->isPacked()) CkUnpackMessage(&env);
1288 _processArrayEltMsg(ck,env);
1293 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewChareMsg\n", CkMyPe());)
1294 if(env->isPacked()) CkUnpackMessage(&env);
1295 _processNewChareMsg(ck,env);
1298 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: NewVChareMsg\n", CkMyPe());)
1299 if(env->isPacked()) CkUnpackMessage(&env);
1300 _processNewVChareMsg(ck,env);
1303 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForChareMsg\n", CkMyPe());)
1304 if(env->isPacked()) CkUnpackMessage(&env);
1305 _processForPlainChareMsg(ck,env);
1308 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: ForVidMsg\n", CkMyPe());)
1309 _processForVidMsg(ck,env);
1312 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: FillVidMsg\n", CkMyPe());)
1313 _processFillVidMsg(ck,env);
1316 TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: DeleteVidMsg\n", CkMyPe());)
1317 _processDeleteVidMsg(ck,env);
1321 CmiAbort("Fatal Charm++ Error> Unknown msg-type in _processHandler.\n");
1323 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1325 postProcessReceivedMessage(obj,sender,SN,entry);
1330 #if USE_CRITICAL_PATH_HEADER_ARRAY
1331 CK_CRITICALPATH_END()
1337 /******************** Message Send **********************/
// Describes an envelope through its out-parameters: the pack function
// (CkPackMessage, cast to CldPackFn), the total message size, the queueing
// strategy, the number of priority bits and a pointer to the priority data.
// All values are read from the envelope that converseMsg points to.
1339 void _infoFn(void *converseMsg, CldPackFn *pfn, int *len,
1340 int *queueing, int *priobits, unsigned int **prioptr)
1342 envelope *env = (envelope *)converseMsg;
1343 *pfn = (CldPackFn)CkPackMessage;
1344 *len = env->getTotalsize();
1345 *queueing = env->getQueueing();
1346 *priobits = env->getPriobits();
1347 *prioptr = (unsigned int *) env->getPrioPtr();
// Packs the message held in *pEnv, but only when it is not already packed and
// its message type has a pack routine registered in _msgTable.
// NOTE(review): the lines after the pack() call (e.g. how the result is stored
// back through pEnv and the packed flag is updated) are not visible here.
1350 void CkPackMessage(envelope **pEnv)
1352 envelope *env = *pEnv;
1353 if(!env->isPacked() && _msgTable[env->getMsgIdx()]->pack) {
1354 void *msg = EnvToUsr(env);
1355 _TRACE_BEGIN_PACK();
1356 msg = _msgTable[env->getMsgIdx()]->pack(msg);
// Unpacks the message held in *pEnv when its envelope is marked packed, by
// calling the unpack routine registered for its message type in _msgTable.
// The call is bracketed by the unpack trace macros.
// NOTE(review): the lines after _TRACE_END_UNPACK() (e.g. how the result is
// stored back through pEnv) are not visible here.
1364 void CkUnpackMessage(envelope **pEnv)
1366 envelope *env = *pEnv;
1367 int msgIdx = env->getMsgIdx();
1368 if(env->isPacked()) {
1369 void *msg = EnvToUsr(env);
1370 _TRACE_BEGIN_UNPACK();
1371 msg = _msgTable[msgIdx]->unpack(msg);
1372 _TRACE_END_UNPACK();
1379 //There's no reason for most messages to go through the Cld--
1380 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
1381 // Thus these accelerated versions of the Cld calls.
1382 #if CMK_OBJECT_QUEUE_AVAILABLE
1383 static int index_objectQHandler;
1385 int index_tokenHandler;
1386 int index_skipCldHandler;
// Converse handler installed by _skipCldEnqueue on outgoing messages.
// It restores the message's real handler from the X-handler slot and then
// enqueues the envelope with its own queueing strategy and priority: on
// CsdGridQueue when CMK_GRID_QUEUE_AVAILABLE and CmiGridQueueLookupMsg()
// matches, otherwise on CsdSchedQueue.
1388 void _skipCldHandler(void *converseMsg)
1390 envelope *env = (envelope *)(converseMsg);
1391 CmiSetHandler(converseMsg, CmiGetXHandler(converseMsg));
1392 #if CMK_GRID_QUEUE_AVAILABLE
1393 if (CmiGridQueueLookupMsg ((char *) converseMsg)) {
1394 CqsEnqueueGeneral ((Queue) CpvAccess (CsdGridQueue),
1395 env, env->getQueueing (), env->getPriobits (),
1396 (unsigned int *) env->getPrioPtr ());
1398 CqsEnqueueGeneral ((Queue) CpvAccess (CsdSchedQueue),
1399 env, env->getQueueing (), env->getPriobits (),
1400 (unsigned int *) env->getPrioPtr ());
1403 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1404 env, env->getQueueing(),env->getPriobits(),
1405 (unsigned int *)env->getPrioPtr());
1410 //static void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1411 // Made non-static to be used by ckmessagelogging
// Sends env to processor pe without routing it through the Cld layer.
//  - pe is this PE and no immediate handler is running: the envelope is put
//    straight onto the local scheduler queue (or, with
//    CMK_OBJECT_QUEUE_AVAILABLE, onto the target object's queue if it has one).
//  - otherwise: the envelope is packed when pe is negative or on a different
//    node, its current handler is saved in the X-handler slot, the skip-Cld
//    (or object-queue) handler is installed, and the message is sent or
//    broadcast through Converse.
// Under CMK_MESSAGE_LOGGING the "...AndFree" send variants are used only when
// CK_FREE_MSG_MLOG is set in env->flags.
// NOTE(review): #else/#endif lines, returns and closing braces are not
// visible in this excerpt, so the exact branch nesting should be confirmed.
1412 void _skipCldEnqueue(int pe,envelope *env, int infoFn)
1415 if (!ConverseDeliver(pe)) {
1422 if(pe == CkMyPe() ){
1423 if(!CmiNodeAlive(CkMyPe())){
1424 printf("[%d] Invalid processor sending itself a message \n",CkMyPe());
1429 if (pe == CkMyPe() && !CmiImmIsRunning()) {
1430 #if CMK_OBJECT_QUEUE_AVAILABLE
1431 Chare *obj = CkFindObjectPtr(env);
1432 if (obj && obj->CkGetObjQueue().queue()) {
1433 _enqObjQueue(obj, env);
1437 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),
1438 env, env->getQueueing(),env->getPriobits(),
1439 (unsigned int *)env->getPrioPtr());
1440 #if CMK_PERSISTENT_COMM
1441 CmiPersistentOneSend();
1444 if (pe < 0 || CmiNodeOf(pe) != CmiMyNode())
1445 CkPackMessage(&env);
1446 int len=env->getTotalsize();
// Save the real handler; _skipCldHandler restores it on the receiving side.
1447 CmiSetXHandler(env,CmiGetHandler(env));
1448 #if CMK_OBJECT_QUEUE_AVAILABLE
1449 CmiSetHandler(env,index_objectQHandler);
1451 CmiSetHandler(env,index_skipCldHandler);
1453 CmiSetInfo(env,infoFn);
1454 if (pe==CLD_BROADCAST) {
1455 #if CMK_MESSAGE_LOGGING
1456 if(env->flags & CK_FREE_MSG_MLOG)
1457 CmiSyncBroadcastAndFree(len, (char *)env);
1459 CmiSyncBroadcast(len, (char *)env);
1461 CmiSyncBroadcastAndFree(len, (char *)env);
1465 else if (pe==CLD_BROADCAST_ALL) {
1466 #if CMK_MESSAGE_LOGGING
1467 if(env->flags & CK_FREE_MSG_MLOG)
1468 CmiSyncBroadcastAllAndFree(len, (char *)env);
1470 CmiSyncBroadcastAll(len, (char *)env);
1472 CmiSyncBroadcastAllAndFree(len, (char *)env);
1477 #if CMK_MESSAGE_LOGGING
1478 if(env->flags & CK_FREE_MSG_MLOG)
1479 CmiSyncSendAndFree(pe, len, (char *)env);
1481 CmiSyncSend(pe, len, (char *)env);
1483 CmiSyncSendAndFree(pe, len, (char *)env);
// In CMK_BIGSIM_CHARM builds, later uses of _skipCldEnqueue in this file are
// redirected to _CldEnqueue by this macro.
1490 #if CMK_BIGSIM_CHARM
1491 # define _skipCldEnqueue _CldEnqueue
1494 // bypass the Charm++ priority queue, send as a Converse message
// Packs env and sends it to the npes processors listed in pes with
// CmiSyncListSendAndFree.
// NOTE(review): the body of the !ConverseDeliver(-1) branch is not visible in
// this excerpt.
1495 static void _noCldEnqueueMulti(int npes, int *pes, envelope *env)
1498 if (!ConverseDeliver(-1)) {
1503 CkPackMessage(&env);
1504 int len=env->getTotalsize();
1505 CmiSyncListSendAndFree(npes, pes, len, (char *)env);
// Sends env to pe as a plain Converse message, without Cld.
// A message for this PE is run at once through CmiHandleMessage. Otherwise the
// envelope is packed and passed to the matching "...AndFree" Converse call:
// broadcast, broadcast-all, or point-to-point send.
// NOTE(review): the body of the !ConverseDeliver(pe) branch and any return
// after CmiHandleMessage are not visible in this excerpt.
1508 static void _noCldEnqueue(int pe, envelope *env)
1511 if (pe == CkMyPe()) {
1512 CmiHandleMessage(env);
1516 if (!ConverseDeliver(pe)) {
1522 CkPackMessage(&env);
1523 int len=env->getTotalsize();
1524 if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, (char *)env); }
1525 else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, (char *)env); }
1526 else CmiSyncSendAndFree(pe, len, (char *)env);
1529 //static void _noCldNodeEnqueue(int node, envelope *env)
1530 //Made non-static to be used by ckmessagelogging
// Node-level counterpart of _noCldEnqueue: sends env to a node as a plain
// Converse message. A message for this node is run at once through
// CmiHandleMessage. Otherwise the envelope is packed and passed to the node
// broadcast, node broadcast-all, or node send call.
// Under CMK_MESSAGE_LOGGING the "...AndFree" variants are used only when
// CK_FREE_MSG_MLOG is set in env->flags.
// NOTE(review): #else/#endif lines and the bodies of some branches are not
// visible in this excerpt.
1531 void _noCldNodeEnqueue(int node, envelope *env)
1534 if (node == CkMyNode()) {
1535 CmiHandleMessage(env);
1539 if (!ConverseDeliver(node)) {
1545 CkPackMessage(&env);
1546 int len=env->getTotalsize();
1547 if (node==CLD_BROADCAST) {
1548 #if CMK_MESSAGE_LOGGING
1549 if(env->flags & CK_FREE_MSG_MLOG)
1550 CmiSyncNodeBroadcastAndFree(len, (char *)env);
1552 CmiSyncNodeBroadcast(len, (char *)env);
1554 CmiSyncNodeBroadcastAndFree(len, (char *)env);
1557 else if (node==CLD_BROADCAST_ALL) {
1558 #if CMK_MESSAGE_LOGGING
1559 if(env->flags & CK_FREE_MSG_MLOG)
1560 CmiSyncNodeBroadcastAllAndFree(len, (char *)env);
1562 CmiSyncNodeBroadcastAll(len, (char *)env);
1564 CmiSyncNodeBroadcastAllAndFree(len, (char *)env);
1569 #if CMK_MESSAGE_LOGGING
1570 if(env->flags & CK_FREE_MSG_MLOG)
1571 CmiSyncNodeSendAndFree(node, len, (char *)env);
1573 CmiSyncNodeSend(node, len, (char *)env);
1575 CmiSyncNodeSendAndFree(node, len, (char *)env);
// Stamps the envelope of msg for delivery to the chare identified by pCid:
// message type ForChareMsg, entry-point index eIdx, source PE, and the
// Converse handler (object-queue handler or _charmHandlerIdx).
// A negative pCid->onPE marks a virtual chare ID; the PE is decoded as
// -(onPE+1). When the vidblock is consulted locally and already has a local
// chare, the envelope is pointed at that chare. A vidblock on another PE turns
// the message into a ForVidMsg carrying the vid pointer. For a regular ID the
// envelope's object pointer is set to pCid->objPtr.
// NOTE(review): the return statements are not visible in this excerpt;
// callers store the result in a variable named destPE.
1580 static inline int _prepareMsg(int eIdx,void *msg,const CkChareID *pCid)
1582 envelope *env = UsrToEnv(msg);
1585 #if CMK_REPLAYSYSTEM
1588 env->setMsgtype(ForChareMsg);
1589 env->setEpIdx(eIdx);
1590 env->setSrcPe(CkMyPe());
1592 #if USE_CRITICAL_PATH_HEADER_ARRAY
1593 CK_CRITICALPATH_SEND(env)
1594 //CK_AUTOMATE_PRIORITY(env)
1597 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1599 #if CMK_OBJECT_QUEUE_AVAILABLE
1600 CmiSetHandler(env, index_objectQHandler);
1602 CmiSetHandler(env, _charmHandlerIdx);
1604 if (pCid->onPE < 0) { //Virtual chare ID (VID)
1605 int pe = -(pCid->onPE+1);
// Without CMK_CHARE_USE_PTR, objPtr holds an index into the vidblocks table.
1607 #ifndef CMK_CHARE_USE_PTR
1608 VidBlock *vblk = CkpvAccess(vidblocks)[(CmiIntPtr)pCid->objPtr];
1610 VidBlock *vblk = (VidBlock *) pCid->objPtr;
1613 if (NULL!=(objPtr=vblk->getLocalChare()))
1614 { //A ready local chare
1615 env->setObjPtr(objPtr);
1618 else { //The vidblock is not ready-- forget it
1622 } else { //Valid vidblock for another PE:
1623 env->setMsgtype(ForVidMsg);
1624 env->setVidPtr(pCid->objPtr);
1629 env->setObjPtr(pCid->objPtr);
// Same as _prepareMsg, then marks the envelope as an immediate message with
// CmiBecomeImmediate.
// NOTE(review): the return statement is not visible; presumably destPE.
1634 static inline int _prepareImmediateMsg(int eIdx,void *msg,const CkChareID *pCid)
1636 int destPE = _prepareMsg(eIdx, msg, pCid);
1638 envelope *env = UsrToEnv(msg);
1639 //criticalPath_send(env);
1640 #if USE_CRITICAL_PATH_HEADER_ARRAY
1641 CK_CRITICALPATH_SEND(env)
1642 //CK_AUTOMATE_PRIORITY(env)
1644 CmiBecomeImmediate(env);
// Sends msg to entry method entryIdx of the chare identified by pCid.
//  - CK_MSG_INLINE in opts: forwarded to CkSendMsgInline.
//  - With CMK_ERROR_CHECKING, CK_MSG_IMMEDIATE aborts unless the envelope is
//    an RDMA metadata message.
//  - Otherwise the envelope is prepared with _prepareMsg, a creation is
//    recorded on _qd, and the message is enqueued through _noCldEnqueue (when
//    CK_MSG_EXPEDITED or CK_MSG_IMMEDIATE is set) or through _CldEnqueue.
// Message-logging builds hand the envelope to sendChareMsg instead.
// NOTE(review): #else/#endif lines, returns and any check on destPE are not
// visible in this excerpt.
1650 void CkSendMsg(int entryIdx, void *msg,const CkChareID *pCid, int opts)
1652 if (opts & CK_MSG_INLINE) {
1653 CkSendMsgInline(entryIdx, msg, pCid, opts);
1656 envelope *env = UsrToEnv(msg);
1657 #if CMK_ERROR_CHECKING
1658 //Allow rdma metadata messages marked as immediate to go through
1659 if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
1660 CmiAbort("Immediate message is not allowed in Chare!");
1663 int destPE=_prepareMsg(entryIdx,msg,pCid);
1664 // Before it traced the creation only if destPE!=-1 (i.e it did not when the
1665 // VidBlock was not yet filled). The problem is that the creation was never
1666 // traced later when the VidBlock was filled. One solution is to trace the
1667 // creation here, the other to trace it in VidBlock->msgDeliver().
1668 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1670 CpvAccess(_qd)->create();
1672 sendChareMsg(env,destPE,_infoIdx,pCid);
1674 _TRACE_CREATION_1(env);
1676 CpvAccess(_qd)->create();
1677 if (opts & CK_MSG_SKIP_OR_IMM)
1678 _noCldEnqueue(destPE, env);
1680 _CldEnqueue(destPE, env, _infoIdx);
1682 _TRACE_CREATION_DONE(1);
// Inline variant of CkSendMsg. When the target chare is on this PE, the
// envelope is prepared, unpacked if necessary, and the entry method is invoked
// directly via _invokeEntryNoTrace, skipping QD handling and the scheduler.
// A chare on another PE cannot be inlined, so the message is sent through
// CkSendMsg with CK_MSG_INLINE cleared from opts.
// NOTE(review): the body of the !CmiNodeAlive() branch and the else/brace
// lines are not visible in this excerpt.
1687 void CkSendMsgInline(int entryIndex, void *msg, const CkChareID *pCid, int opts)
1689 if (pCid->onPE==CkMyPe())
1692 if(!CmiNodeAlive(CkMyPe())){
1697 //Just in case we need to breakpoint or use the envelope in some way
1698 _prepareMsg(entryIndex,msg,pCid);
1700 //Just directly call the chare (skip QD handling & scheduler)
1701 envelope *env = UsrToEnv(msg);
1702 if (env->isPacked()) CkUnpackMessage(&env);
1703 _STATS_RECORD_PROCESS_MSG_1();
1704 _invokeEntryNoTrace(entryIndex,env,pCid->objPtr);
1707 //No way to inline a cross-processor message:
1708 CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
// Stamps the envelope of msg for delivery to a branch of group gID: message
// type `type`, entry-point index eIdx, group number, source PE, and the
// _charmHandlerIdx Converse handler. With CMK_ERROR_CHECKING the envelope's
// reduction manager is set to a zeroed ID.
// NOTE(review): the return statement is not visible; callers use the result
// as the prepared envelope. The declaration of nodeRedMgr also appears only
// inside a commented-out region in this excerpt -- confirm where it is
// declared.
1712 static inline envelope *_prepareMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1714 envelope *env = UsrToEnv(msg);
1715 /*#if CMK_ERROR_CHECKING
1716 CkNodeGroupID nodeRedMgr;
1721 #if CMK_REPLAYSYSTEM
1724 env->setMsgtype(type);
1725 env->setEpIdx(eIdx);
1726 env->setGroupNum(gID);
1727 env->setSrcPe(CkMyPe());
1729 #if CMK_ERROR_CHECKING
1730 nodeRedMgr.setZero();
1731 env->setRednMgr(nodeRedMgr);
1734 //criticalPath_send(env);
1735 #if USE_CRITICAL_PATH_HEADER_ARRAY
1736 CK_CRITICALPATH_SEND(env)
1737 //CK_AUTOMATE_PRIORITY(env)
1740 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
1742 CmiSetHandler(env, _charmHandlerIdx);
// Same as _prepareMsgBranch, then marks the envelope as an immediate message
// with CmiBecomeImmediate.
// NOTE(review): the return statement is not visible; callers use the result
// as the prepared envelope.
1746 static inline envelope *_prepareImmediateMsgBranch(int eIdx,void *msg,CkGroupID gID,int type)
1748 envelope *env = _prepareMsgBranch(eIdx, msg, gID, type);
1749 #if USE_CRITICAL_PATH_HEADER_ARRAY
1750 CK_CRITICALPATH_SEND(env)
1751 //CK_AUTOMATE_PRIORITY(env)
1753 CmiBecomeImmediate(env);
// Prepares msg as a ForBocMsg for group gID and sends it to pe (default: all
// PEs). CK_MSG_IMMEDIATE selects the immediate preparation path. Messages with
// CK_MSG_EXPEDITED or CK_MSG_IMMEDIATE go through _noCldEnqueue, all others
// through _skipCldEnqueue. Message-logging builds use sendGroupMsg instead.
// QD accounting is left to the callers.
// NOTE(review): declarations of env/numPes and #else/#endif lines are not
// visible in this excerpt.
1757 static inline void _sendMsgBranch(int eIdx, void *msg, CkGroupID gID,
1758 int pe=CLD_BROADCAST_ALL, int opts = 0)
1762 if (opts & CK_MSG_IMMEDIATE) {
1763 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1766 env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1769 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1770 sendGroupMsg(env,pe,_infoIdx);
1772 _TRACE_ONLY(numPes = (pe==CLD_BROADCAST_ALL?CkNumPes():1));
1773 _TRACE_CREATION_N(env, numPes);
1774 if (opts & CK_MSG_SKIP_OR_IMM)
1775 _noCldEnqueue(pe, env);
1777 _skipCldEnqueue(pe, env, _infoIdx);
1778 _TRACE_CREATION_DONE(1);
// Prepares msg as a ForBocMsg for group gID and multicasts it to a list of PEs
// through _CldEnqueueMulti, recording a single multicast creation event.
// NOTE(review): the rest of the parameter list (npes, pes) is on a line not
// visible in this excerpt.
1782 static inline void _sendMsgBranchMulti(int eIdx, void *msg, CkGroupID gID,
1785 envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1786 _TRACE_CREATION_MULTICAST(env, npes, pes);
1787 _CldEnqueueMulti(npes, pes, env, _infoIdx);
1788 _TRACE_CREATION_DONE(1); // since it only creates one creation event.
// Sends msg to the branch of group gID on destPE as an immediate message.
// With CMK_IMMEDIATE_MSG and no SMP: a message for this PE is delivered via
// CkSendMsgBranchInline; otherwise the envelope is prepared as immediate, sent
// with _noCldEnqueue, and counted in the send statistics and core state.
// Without immediate-message support the message is sent inline instead.
// NOTE(review): the #else/#endif lines and the return after the local-PE
// inline call are not visible in this excerpt.
1792 void CkSendMsgBranchImmediate(int eIdx, void *msg, int destPE, CkGroupID gID)
1794 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1795 if (destPE==CkMyPe())
1797 CkSendMsgBranchInline(eIdx, msg, destPE, gID);
1800 //Can't inline-- send the usual way
1801 envelope *env = UsrToEnv(msg);
1803 _TRACE_ONLY(numPes = (destPE==CLD_BROADCAST_ALL?CkNumPes():1));
1804 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1805 _TRACE_CREATION_N(env, numPes);
1806 _noCldEnqueue(destPE, env);
1807 _STATS_RECORD_SEND_BRANCH_1();
1808 CkpvAccess(_coreState)->create();
1809 _TRACE_CREATION_DONE(1);
1811 // no support for immediate message, send inline
1812 CkSendMsgBranchInline(eIdx, msg, destPE, gID);
// Inline variant of CkSendMsgBranch. When destPE is this PE and the local
// branch of gID is available, the message is delivered directly with
// _deliverForBocMsg (the envelope is fully prepared only under
// CMK_ERROR_CHECKING). Otherwise it is sent through CkSendMsgBranch with
// CK_MSG_INLINE cleared from opts.
// NOTE(review): the null check on obj, the !CmiNodeAlive() branch body and
// the return after delivery are not visible in this excerpt.
1817 void CkSendMsgBranchInline(int eIdx, void *msg, int destPE, CkGroupID gID, int opts)
1819 if (destPE==CkMyPe())
1822 if(!CmiNodeAlive(CkMyPe())){
1826 IrrGroup *obj=(IrrGroup *)_localBranch(gID);
1828 { //Just directly call the group:
1829 #if CMK_ERROR_CHECKING
1830 envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1832 envelope *env=UsrToEnv(msg);
1834 _deliverForBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1838 //Can't inline-- send the usual way, clear CK_MSG_INLINE
1839 CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
// Sends msg to entry method eIdx of the branch of group gID on pe.
// CK_MSG_INLINE is routed to CkSendMsgBranchInline. CK_MSG_IMMEDIATE (except
// for RDMA metadata messages) is routed to CkSendMsgBranchImmediate. All other
// messages go through _sendMsgBranch and are then counted in the send
// statistics and the core state.
// NOTE(review): the return statements after the two redirects are not
// visible in this excerpt.
1843 void CkSendMsgBranch(int eIdx, void *msg, int pe, CkGroupID gID, int opts)
1845 if (opts & CK_MSG_INLINE) {
1846 CkSendMsgBranchInline(eIdx, msg, pe, gID, opts);
1849 envelope *env=UsrToEnv(msg);
1850 //Allow rdma metadata messages marked as immediate to go through
1851 if (opts & CK_MSG_IMMEDIATE && !env->isRdma()) {
1852 CkSendMsgBranchImmediate(eIdx,msg,pe,gID);
1855 _sendMsgBranch(eIdx, msg, gID, pe, opts);
1856 _STATS_RECORD_SEND_BRANCH_1();
1857 CkpvAccess(_coreState)->create();
// Multicasts msg as an immediate message to the branches of group gID on the
// npes processors in pes. With CMK_IMMEDIATE_MSG and no SMP the envelope is
// prepared as immediate and sent with _noCldEnqueueMulti; otherwise the
// regular _sendMsgBranchMulti path is used.
// NOTE(review): on the fallback path create(-npes) is followed by the
// unconditional create(npes) below, so the two cancel out -- confirm that
// this is the intended QD accounting. The #else/#endif lines are not visible
// in this excerpt.
1861 void CkSendMsgBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *pes)
1863 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
1864 envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForBocMsg);
1865 _TRACE_CREATION_MULTICAST(env, npes, pes);
1866 _noCldEnqueueMulti(npes, pes, env);
1867 _TRACE_CREATION_DONE(1); // since it only creates one creation event.
1869 _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1870 CpvAccess(_qd)->create(-npes);
1872 _STATS_RECORD_SEND_BRANCH_N(npes);
1873 CpvAccess(_qd)->create(npes);
// Multicasts msg to the branches of group gID on the npes processors in pes.
// CK_MSG_IMMEDIATE is routed to CkSendMsgBranchMultiImmediate. Otherwise the
// message goes through _sendMsgBranchMulti, and npes sends are recorded in
// the statistics and on _qd.
// NOTE(review): the return after the immediate redirect is not visible here.
1877 void CkSendMsgBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *pes, int opts)
1879 if (opts & CK_MSG_IMMEDIATE) {
1880 CkSendMsgBranchMultiImmediate(eIdx,msg,gID,npes,pes);
1884 _sendMsgBranchMulti(eIdx, msg, gID, npes, pes);
1885 _STATS_RECORD_SEND_BRANCH_N(npes);
1886 CpvAccess(_qd)->create(npes);
// Sends msg to the branches of group gID on every PE in the Converse group
// grp. Immediate messages are rejected with CmiAbort. The member list of grp
// is looked up with CmiLookupGroup so that npes sends can be traced and
// recorded in the statistics and on _qd; the send itself goes through
// _CldEnqueueGroup.
// NOTE(review): the declarations of npes and pes are not visible here.
1890 void CkSendMsgBranchGroup(int eIdx,void *msg,CkGroupID gID,CmiGroup grp, int opts)
1894 if (opts & CK_MSG_IMMEDIATE) {
1895 CmiAbort("CkSendMsgBranchGroup: immediate messages not supported!");
1899 envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForBocMsg);
1900 CmiLookupGroup(grp, &npes, &pes);
1901 _TRACE_CREATION_MULTICAST(env, npes, pes);
1902 _CldEnqueueGroup(grp, env, _infoIdx);
1903 _TRACE_CREATION_DONE(1); // since it only creates one creation event.
1904 _STATS_RECORD_SEND_BRANCH_N(npes);
1905 CpvAccess(_qd)->create(npes);
// Broadcasts msg to entry method eIdx of group gID on all PEs
// (CLD_BROADCAST_ALL), recording CkNumPes() sends in the statistics and on
// _qd.
1909 void CkBroadcastMsgBranch(int eIdx, void *msg, CkGroupID gID, int opts)
1911 _sendMsgBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
1912 _STATS_RECORD_SEND_BRANCH_N(CkNumPes());
1913 CpvAccess(_qd)->create(CkNumPes());
// Prepares msg as a ForNodeBocMsg for node group gID and sends it to `node`
// (default: all nodes). CK_MSG_IMMEDIATE selects the immediate preparation
// path. Messages with CK_MSG_EXPEDITED or CK_MSG_IMMEDIATE go through
// _noCldNodeEnqueue, all others through _CldNodeEnqueue. Message-logging
// builds use sendNodeGroupMsg instead. QD accounting is left to the callers.
// NOTE(review): declarations of env/numPes and #else/#endif lines are not
// visible in this excerpt.
1916 static inline void _sendMsgNodeBranch(int eIdx, void *msg, CkGroupID gID,
1917 int node=CLD_BROADCAST_ALL, int opts=0)
1921 if (opts & CK_MSG_IMMEDIATE) {
1922 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1925 env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1927 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1928 sendNodeGroupMsg(env,node,_infoIdx);
1930 numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1);
1931 _TRACE_CREATION_N(env, numPes);
1932 if (opts & CK_MSG_SKIP_OR_IMM) {
1933 _noCldNodeEnqueue(node, env);
1936 _CldNodeEnqueue(node, env, _infoIdx);
1937 _TRACE_CREATION_DONE(1);
// Prepares msg as a ForNodeBocMsg for node group gID and enqueues it for each
// of the npes nodes listed in `nodes`, one _CldNodeEnqueue call per node.
// NOTE(review): the same envelope pointer is passed on every iteration --
// confirm that _CldNodeEnqueue does not consume or free it.
1941 static inline void _sendMsgNodeBranchMulti(int eIdx, void *msg, CkGroupID gID,
1942 int npes, int *nodes)
1944 envelope *env = _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1945 _TRACE_CREATION_N(env, npes);
1946 for (int i=0; i<npes; i++) {
1947 _CldNodeEnqueue(nodes[i], env, _infoIdx);
1949 _TRACE_CREATION_DONE(1); // since it only creates one creation event.
// Sends msg to the branch of node group gID on `node` as an immediate message.
// With CMK_IMMEDIATE_MSG: a message for this node is delivered via
// CkSendMsgNodeBranchInline; otherwise the envelope is prepared as immediate,
// sent with _noCldNodeEnqueue, and counted in the send statistics and core
// state. Without immediate-message support the message is sent inline instead.
// NOTE(review): the #else/#endif lines and the return after the local-node
// inline call are not visible in this excerpt.
1953 void CkSendMsgNodeBranchImmediate(int eIdx, void *msg, int node, CkGroupID gID)
1955 #if CMK_IMMEDIATE_MSG
1956 if (node==CkMyNode())
1958 CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
1961 //Can't inline-- send the usual way
1962 envelope *env = UsrToEnv(msg);
1964 _TRACE_ONLY(numPes = (node==CLD_BROADCAST_ALL?CkNumNodes():1));
1965 env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1966 _TRACE_CREATION_N(env, numPes);
1967 _noCldNodeEnqueue(node, env);
1968 _STATS_RECORD_SEND_BRANCH_1();
1969 CkpvAccess(_coreState)->create();
1970 _TRACE_CREATION_DONE(1);
1972 // no support for immediate message, send inline
1973 CkSendMsgNodeBranchInline(eIdx, msg, node, gID);
// Inline variant of CkSendMsgNodeBranch. When `node` is this node and the
// message is not an RDMA message, the node-group object is looked up under
// _nodeGroupTableImmLock and the message is delivered directly with
// _deliverForNodeBocMsg (the envelope is fully prepared only under
// CMK_ERROR_CHECKING). Otherwise it is sent through CkSendMsgNodeBranch with
// CK_MSG_INLINE cleared from opts.
// NOTE(review): the null check on obj and the return after delivery are not
// visible in this excerpt.
1978 void CkSendMsgNodeBranchInline(int eIdx, void *msg, int node, CkGroupID gID, int opts)
1980 if (node==CkMyNode() && ((envelope *)(UsrToEnv(msg)))->isRdma() == false)
1982 CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
1983 void *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
1984 CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));
1986 { //Just directly call the group:
1987 #if CMK_ERROR_CHECKING
1988 envelope *env=_prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
1990 envelope *env=UsrToEnv(msg);
1992 _deliverForNodeBocMsg(CkpvAccess(_coreState),eIdx,env,obj);
1996 //Can't inline-- send the usual way
1997 CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
// Sends msg to entry method eIdx of the branch of node group gID on `node`.
// CK_MSG_INLINE is routed to CkSendMsgNodeBranchInline and CK_MSG_IMMEDIATE to
// CkSendMsgNodeBranchImmediate. All other messages go through
// _sendMsgNodeBranch and are then counted in the send statistics and the core
// state.
// NOTE(review): the return statements after the two redirects are not
// visible in this excerpt.
2001 void CkSendMsgNodeBranch(int eIdx, void *msg, int node, CkGroupID gID, int opts)
2003 if (opts & CK_MSG_INLINE) {
2004 CkSendMsgNodeBranchInline(eIdx, msg, node, gID, opts);
2007 if (opts & CK_MSG_IMMEDIATE) {
2008 CkSendMsgNodeBranchImmediate(eIdx, msg, node, gID);
2011 _sendMsgNodeBranch(eIdx, msg, gID, node, opts);
2012 _STATS_RECORD_SEND_NODE_BRANCH_1();
2013 CkpvAccess(_coreState)->create();
// Multicasts msg as an immediate message to the branches of node group gID on
// the npes nodes in `nodes`. With CMK_IMMEDIATE_MSG and no SMP the envelope is
// prepared as immediate and sent with _noCldEnqueueMulti; otherwise the
// regular _sendMsgNodeBranchMulti path is used.
// NOTE(review): on the fallback path create(-npes) is followed by the
// unconditional create(npes) below, so the two cancel out -- confirm that
// this is the intended QD accounting. The #else/#endif lines are not visible
// in this excerpt.
2017 void CkSendMsgNodeBranchMultiImmediate(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes)
2019 #if CMK_IMMEDIATE_MSG && ! CMK_SMP
2020 envelope *env = _prepareImmediateMsgBranch(eIdx,msg,gID,ForNodeBocMsg);
2021 _noCldEnqueueMulti(npes, nodes, env);
2023 _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2024 CpvAccess(_qd)->create(-npes);
2026 _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2027 CpvAccess(_qd)->create(npes);
// Multicasts msg to the branches of node group gID on the npes nodes in
// `nodes`. CK_MSG_IMMEDIATE is routed to CkSendMsgNodeBranchMultiImmediate.
// Otherwise the message goes through _sendMsgNodeBranchMulti, and npes sends
// are recorded in the statistics and on _qd.
// NOTE(review): the return after the immediate redirect is not visible here.
2031 void CkSendMsgNodeBranchMulti(int eIdx,void *msg,CkGroupID gID,int npes,int *nodes, int opts)
2033 if (opts & CK_MSG_IMMEDIATE) {
2034 CkSendMsgNodeBranchMultiImmediate(eIdx,msg,gID,npes,nodes);
2038 _sendMsgNodeBranchMulti(eIdx, msg, gID, npes, nodes);
2039 _STATS_RECORD_SEND_NODE_BRANCH_N(npes);
2040 CpvAccess(_qd)->create(npes);
// Broadcasts msg to entry method eIdx of node group gID on all nodes
// (CLD_BROADCAST_ALL), recording CkNumNodes() sends in the statistics and on
// _qd.
2044 void CkBroadcastMsgNodeBranch(int eIdx, void *msg, CkGroupID gID, int opts)
2046 _sendMsgNodeBranch(eIdx, msg, gID, CLD_BROADCAST_ALL, opts);
2047 _STATS_RECORD_SEND_NODE_BRANCH_N(CkNumNodes());
2048 CpvAccess(_qd)->create(CkNumNodes());
2051 //Needed by delegation manager:
// Public wrapper around _prepareMsg: prepares msg's envelope for the chare
// pCid and returns _prepareMsg's result unchanged.
2053 int CkChareMsgPrep(int eIdx, void *msg,const CkChareID *pCid)
2054 { return _prepareMsg(eIdx,msg,pCid); }
// Public wrapper around _prepareMsgBranch: prepares msg's envelope as a
// ForBocMsg for group gID. The prepared envelope is not returned.
2056 void CkGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2057 { _prepareMsgBranch(eIdx,msg,gID,ForBocMsg); }
// Public wrapper around _prepareMsgBranch: prepares msg's envelope as a
// ForNodeBocMsg for node group gID. The prepared envelope is not returned.
2059 void CkNodeGroupMsgPrep(int eIdx, void *msg, CkGroupID gID)
2060 { _prepareMsgBranch(eIdx,msg,gID,ForNodeBocMsg); }
// Registers the Converse handlers used by this file (_skipCldHandler,
// _TokenHandler and, with CMK_OBJECT_QUEUE_AVAILABLE, _ObjectQHandler),
// storing their indices with CmiAssignOnce, and allocates the per-PE
// TokenPool.
// NOTE(review): the #endif closing the CMK_OBJECT_QUEUE_AVAILABLE section is
// not visible in this excerpt -- confirm which lines it guards.
2062 void _ckModuleInit(void) {
2063 CmiAssignOnce(&index_skipCldHandler, CkRegisterHandler(_skipCldHandler));
2064 #if CMK_OBJECT_QUEUE_AVAILABLE
2065 CmiAssignOnce(&index_objectQHandler, CkRegisterHandler(_ObjectQHandler));
2067 CmiAssignOnce(&index_tokenHandler, CkRegisterHandler(_TokenHandler));
2068 CkpvInitialize(TokenPool*, _tokenPool);
2069 CkpvAccess(_tokenPool) = new TokenPool;
2073 /************** Send: Arrays *************/
// Prepares an outgoing array message: sets the envelope's message type to
// `type`, installs _charmHandlerIdx as the Converse handler, and records one
// creation on _qd.
2075 static void _prepareOutgoingArrayMsg(envelope *env,int type)
2079 env->setMsgtype(type);
2081 setMemoryOwnedBy(((char*)env)-sizeof(CmiChunkHeader), 0);
2083 CmiSetHandler(env, _charmHandlerIdx);
2084 CpvAccess(_qd)->create();
// Sends msg to pe as a ForArrayEltMsg. The envelope is prepared with
// _prepareOutgoingArrayMsg; CK_MSG_IMMEDIATE marks it immediate. Messages with
// CK_MSG_EXPEDITED or CK_MSG_IMMEDIATE go through _noCldEnqueue, all others
// through _skipCldEnqueue. Message-logging builds use sendArrayMsg instead.
// NOTE(review): the #else/#endif and else lines are not visible here.
2088 void CkArrayManagerDeliver(int pe,void *msg, int opts) {
2089 envelope *env = UsrToEnv(msg);
2090 _prepareOutgoingArrayMsg(env,ForArrayEltMsg);
2091 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2092 sendArrayMsg(env,pe,_infoIdx);
2094 if (opts & CK_MSG_IMMEDIATE)
2095 CmiBecomeImmediate(env);
2096 if (opts & CK_MSG_SKIP_OR_IMM)
2097 _noCldEnqueue(pe, env);
2099 _skipCldEnqueue(pe, env, _infoIdx);
// CkLocIterator that remembers a CkLocMgr; CkDeleteChares passes an instance
// to CkLocMgr::iterate, which calls addLocation for the locations it visits.
// NOTE(review): the member declarations and the body of addLocation are not
// visible in this excerpt; the class name suggests it destroys each element.
2103 class ElementDestroyer : public CkLocIterator {
2107 ElementDestroyer(CkLocMgr* mgr_):locMgr(mgr_){};
2108 void addLocation(CkLocation &loc) {
// Tears down the Charm++ objects known to this PE, in this order:
//  1. plain chares and vidblocks (only when CMK_CHARE_USE_PTR is not defined;
//     the table slots are reset to NULL),
//  2. array elements, by iterating an ElementDestroyer over every group that
//     reports isLocMgr(),
//  3. groups, while holding _groupTableImmLock,
//  4. node groups, on rank 0 of the node only.
// NOTE(review): the delete statements for chares and vidblocks, the
// declaration of i and the #endif are on lines not visible in this excerpt.
2113 void CkDeleteChares() {
2115 int numGroups = CkpvAccess(_groupIDTable)->size();
2117 // delete all plain chares
2118 #ifndef CMK_CHARE_USE_PTR
2119 for (i=0; i<CkpvAccess(chare_objs).size(); i++) {
2120 Chare *obj = (Chare*)CkpvAccess(chare_objs)[i];
2122 CkpvAccess(chare_objs)[i] = NULL;
2124 for (i=0; i<CkpvAccess(vidblocks).size(); i++) {
2125 VidBlock *obj = CkpvAccess(vidblocks)[i];
2127 CkpvAccess(vidblocks)[i] = NULL;
2131 // delete all array elements
2132 for(i=0;i<numGroups;i++) {
2133 IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2134 if(obj && obj->isLocMgr()) {
2135 CkLocMgr *mgr = (CkLocMgr*)obj;
2136 ElementDestroyer destroyer(mgr);
2137 mgr->iterate(destroyer);
2141 // delete all groups
2142 CmiImmediateLock(CkpvAccess(_groupTableImmLock));
2143 for(i=0;i<numGroups;i++) {
2144 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
2145 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
2146 if (obj) delete obj;
2148 CmiImmediateUnlock(CkpvAccess(_groupTableImmLock));
2150 // delete all node groups
// Only rank 0 deletes node groups, so each one is deleted once per node.
2151 if (CkMyRank() == 0) {
2152 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2153 for(i=0;i<numNodeGroups;i++) {
2154 CkGroupID gID = CksvAccess(_nodeGroupIDTable)[i];
2155 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find(gID).getObj();
2156 if (obj) delete obj;
// Forward declaration of CthEnqueueBigSimThread, compiled only in
// CMK_BIGSIM_CHARM builds; the definition is not in this excerpt.
2161 #if CMK_BIGSIM_CHARM
2162 void CthEnqueueBigSimThread(CthThreadToken* token, int s,
2163 int pb,unsigned int *prio);
2166 //------------------- External client support (e.g. Charm4py) ----------------
// File-private storage for the command-line arguments copied by StartCharmExt:
// ext_args owns the characters of each argument, ext_argv holds the char*
// pointers into ext_args that are handed to ConverseInit.
2168 static std::vector< std::vector<char> > ext_args;
2169 static std::vector<char*> ext_argv;
2171 // This is just a wrapper for ConverseInit that copies command-line args into a private
2173 // To be called from external clients like charm4py. This wrapper avoids issues with
// Entry point for external clients: copies argv into the file-private
// ext_args/ext_argv buffers (ext_argv gets one extra NULL slot at the end) and
// starts the runtime with ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0).
// On non-Windows builds without NODE_0_IS_CONVHOST, the NETSTART environment
// variable is read and /dev/null is opened first.
// NOTE(review): what is done with `ns` and the /dev/null descriptor is on
// lines not visible in this excerpt; per the existing comment it avoids
// duplicated output from process 0.
2175 extern "C" void StartCharmExt(int argc, char **argv) {
2176 #if !defined(_WIN32) && !NODE_0_IS_CONVHOST
2177 // only do this in net layers if using charmrun, so that output of process 0
2178 // doesn't get duplicated
2179 char *ns = getenv("NETSTART");
2182 if (-1 != (fd = open("/dev/null", O_RDWR))) {
2189 ext_args.resize(argc);
2190 ext_argv.resize(argc + 1, NULL);
2191 for (int i=0; i < argc; i++) {
// +1 leaves room for the terminating NUL copied by strcpy.
2192 ext_args[i].resize(strlen(argv[i]) + 1);
2193 strcpy(ext_args[i].data(), argv[i]);
2194 ext_argv[i] = ext_args[i].data();
2196 ConverseInit(argc, ext_argv.data(), _initCharm, 0, 0);
// Callback pointer, NULL until an external client installs one through
// registerCkRegisterMainModuleCallback. It is not invoked within this excerpt.
2199 void (*CkRegisterMainModuleCallback)() = NULL;
2200 extern "C" void registerCkRegisterMainModuleCallback(void (*cb)()) {
2201 CkRegisterMainModuleCallback = cb;
// Callback pointer, NULL until installed through
// registerMainchareCtorExtCallback. The MainchareExt constructor calls it with
// (thishandle.onPE, thishandle.objPtr, constructor entry index, argc, argv).
2204 void (*MainchareCtorExtCallback)(int, void*, int, int, char **) = NULL;
2205 extern "C" void registerMainchareCtorExtCallback(void (*cb)(int, void*, int, int, char **)) {
2206 MainchareCtorExtCallback = cb;
// Callback pointer, NULL until installed through
// registerReadOnlyRecvExtCallback. ReadOnlyExt::_roPup calls it when unpacking
// with (data size, pointer to the received bytes).
2209 void (*ReadOnlyRecvExtCallback)(int, char*) = NULL;
2210 extern "C" void registerReadOnlyRecvExtCallback(void (*cb)(int, char*)) {
2211 ReadOnlyRecvExtCallback = cb;
// Storage for the read-only data blob: ro_data is filled by
// ReadOnlyExt::setData and data_size holds its length in bytes.
2214 void* ReadOnlyExt::ro_data = NULL;
2215 size_t ReadOnlyExt::data_size = 0;
// Callback pointer, NULL until installed through
// registerChareMsgRecvExtCallback. It is not invoked within this excerpt.
2217 void (*ChareMsgRecvExtCallback)(int, void*, int, int, char *, int) = NULL;
2218 extern "C" void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char *, int)) {
2219 ChareMsgRecvExtCallback = cb;
// Callback pointer, NULL until installed through
// registerGroupMsgRecvExtCallback. The GroupExt and ArrayMapExt constructors
// call it with the group index, constructor entry index, message size, a
// pointer into the marshalled buffer, and one more int argument.
2222 void (*GroupMsgRecvExtCallback)(int, int, int, char *, int) = NULL;
2223 extern "C" void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int)) {
2224 GroupMsgRecvExtCallback = cb;
// Callback pointer, NULL until installed through
// registerArrayMsgRecvExtCallback. It is not invoked within this excerpt.
2227 void (*ArrayMsgRecvExtCallback)(int, int, int *, int, int, char *, int) = NULL;
2228 extern "C" void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int)) {
2229 ArrayMsgRecvExtCallback = cb;
// Callback pointer, NULL until installed through
// registerArrayElemLeaveExtCallback. It is not invoked within this excerpt.
2232 int (*ArrayElemLeaveExt)(int, int, int *, char**, int) = NULL;
2233 extern "C" void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int)) {
2234 ArrayElemLeaveExt = cb;
// Callback pointer, NULL until installed through
// registerArrayElemJoinExtCallback. It is not invoked within this excerpt.
2237 void (*ArrayElemJoinExt)(int, int, int *, int, char*, int) = NULL;
2238 extern "C" void registerArrayElemJoinExtCallback(void (*cb)(int, int, int *, int, char*, int)) {
2239 ArrayElemJoinExt = cb;
// Callback pointer, NULL until installed through
// registerArrayResumeFromSyncExtCallback. It is not invoked within this
// excerpt.
2242 void (*ArrayResumeFromSyncExtCallback)(int, int, int *) = NULL;
2243 extern "C" void registerArrayResumeFromSyncExtCallback(void (*cb)(int, int, int *)) {
2244 ArrayResumeFromSyncExtCallback = cb;
// Callback pointer, NULL until installed through
// registerCreateReductionTargetMsgExtCallback. It is not invoked within this
// excerpt.
2247 void (*CreateReductionTargetMsgExt)(void*, int, int, int, char**, int*) = NULL;
2248 extern "C" void registerCreateReductionTargetMsgExtCallback(void (*cb)(void*, int, int, int, char**, int*)) {
2249 CreateReductionTargetMsgExt = cb;
// Callback pointer, NULL until installed through
// registerPyReductionExtCallback. It is not invoked within this excerpt.
2252 int (*PyReductionExt)(char**, int*, int, char**) = NULL;
2253 extern "C" void registerPyReductionExtCallback(int (*cb)(char**, int*, int, char**)) {
2254 PyReductionExt = cb;
// Callback pointer, NULL until installed through
// registerArrayMapProcNumExtCallback. It is not invoked within this excerpt.
2257 int (*ArrayMapProcNumExtCallback)(int, int, const int *) = NULL;
2258 extern "C" void registerArrayMapProcNumExtCallback(int (*cb)(int, int, const int *)) {
2259 ArrayMapProcNumExtCallback = cb;
// C-linkage wrapper that returns CkMyPe().
2262 extern "C" int CkMyPeHook() { return CkMyPe(); }
// C-linkage wrapper that returns CkNumPes().
2263 extern "C" int CkNumPesHook() { return CkNumPes(); }
// Stores a private copy of the msgSize bytes at msg in ro_data and records the
// length in data_size.
// NOTE(review): the malloc result is not checked before memcpy, and a buffer
// left in ro_data by an earlier call is not freed here -- confirm that this
// is called at most once and that allocation failure is acceptable.
2265 void ReadOnlyExt::setData(void *msg, size_t msgSize) {
2266 ro_data = malloc(msgSize);
2267 memcpy(ro_data, msg, msgSize);
2268 data_size = msgSize;
// PUP routine for the external read-only blob; pup_er points to a PUP::er.
//  - Sizing/packing: the ro_data bytes (data_size of them) are pupped.
//  - Unpacking: asserts that this is not PE 0 and that ro_data is still NULL,
//    treats the pupper as a PUP::fromMem, passes the current read pointer and
//    data_size to ReadOnlyRecvExtCallback, then advances past the data.
// NOTE(review): the line that transfers data_size itself and the else/brace
// lines are not visible in this excerpt.
2271 void ReadOnlyExt::_roPup(void *pup_er) {
2272 PUP::er &p=*(PUP::er *)pup_er;
2273 if (!p.isUnpacking()) {
2274 //printf("[%d] Sizing/packing data, ro_data=%p, data_size=%d\n", CkMyPe(), ro_data, int(data_size));
2276 p((char*)ro_data, data_size);
2278 CkAssert(CkMyPe() != 0);
2279 CkAssert(ro_data == NULL);
2280 PUP::fromMem &p_mem = *(PUP::fromMem *)pup_er;
2282 //printf("[%d] Unpacking ro, size of data to unpack is %d\n", CkMyPe(), int(data_size));
2283 ReadOnlyRecvExtCallback(int(data_size), p_mem.get_current_pointer());
2284 p_mem.advance(data_size);
2288 CkpvExtern(int, _currentChareType);
// Constructor for an externally implemented main chare. It looks up the
// constructor entry index of the current chare type through _chareTable and
// _mainTable, then notifies the external client via MainchareCtorExtCallback
// with this chare's handle and the command-line arguments from m.
// NOTE(review): MainchareCtorExtCallback is called without a NULL check;
// presumably it is always registered before a main chare is constructed.
2290 MainchareExt::MainchareExt(CkArgMsg *m) {
2291 int cIdx = CkpvAccess(_currentChareType);
2292 //printf("Constructor of MainchareExt, chareId=(%d,%p), chareIdx=%d\n", thishandle.onPE, thishandle.objPtr, cIdx);
2293 int ctorEpIdx = _mainTable[_chareTable[cIdx]->mainChareType()]->entryIdx;
2294 MainchareCtorExtCallback(thishandle.onPE, thishandle.objPtr, ctorEpIdx, m->argc, m->argv);
// Constructor for an externally implemented group. impl_msg is treated as a
// CkMarshallMsg whose buffer starts with two ints (msgSize, dcopy_start). The
// external client is notified through GroupMsgRecvExtCallback with this
// group's index, the default-constructor entry index of its chare type,
// msgSize, and a pointer just past the two leading ints.
// NOTE(review): the final argument of the callback call is on a line not
// visible in this excerpt.
2298 GroupExt::GroupExt(void *impl_msg) {
2299 //printf("Constructor of GroupExt, gid=%d\n", thisgroup.idx);
2300 //int chareIdx = CkpvAccess(_groupTable)->find(thisgroup).getcIdx();
2301 int chareIdx = ckGetChareType();
2302 int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2303 CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2304 char *impl_buf = impl_msg_typed->msgBuf;
2305 PUP::fromMem implP(impl_buf);
2306 int msgSize; implP|msgSize;
2307 int dcopy_start; implP|dcopy_start;
2308 GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
// Constructor for an externally implemented array map. impl_msg is treated as
// a CkMarshallMsg whose buffer starts with two ints (msgSize, dcopy_start).
// The external client is notified through GroupMsgRecvExtCallback with this
// group's index, the default-constructor entry index of its chare type,
// msgSize, and a pointer just past the two leading ints.
// NOTE(review): the final argument of the callback call is on a line not
// visible in this excerpt.
2312 ArrayMapExt::ArrayMapExt(void *impl_msg) {
2313 //printf("Constructor of ArrayMapExt, gid=%d\n", thisgroup.idx);
2314 int chareIdx = ckGetChareType();
2315 int ctorEpIdx = _chareTable[chareIdx]->getDefaultCtor();
2316 CkMarshallMsg *impl_msg_typed = (CkMarshallMsg *)impl_msg;
2317 char *impl_buf = impl_msg_typed->msgBuf;
2318 PUP::fromMem implP(impl_buf);
2319 int msgSize; implP|msgSize;
2320 int dcopy_start; implP|dcopy_start;
2321 GroupMsgRecvExtCallback(thisgroup.idx, ctorEpIdx, msgSize, impl_buf+(2*sizeof(int)),
// Creates a group of chare type cIdx with constructor entry eIdx on behalf of
// an external client. The constructor arguments arrive as num_bufs byte
// buffers (bufs[i] of length buf_sizes[i]); at least one buffer is required.
// They are copied into a CkMarshallMsg sized for two ints plus the combined
// buffer length, the message is typed as BocInitMsg and passed to
// CkCreateGroup.
// NOTE(review): the initialisation of totalSize, the lines that write the two
// leading ints, and the return statement are not visible in this excerpt.
2327 int CkCreateGroupExt(int cIdx, int eIdx, int num_bufs, char **bufs, int *buf_sizes) {
2328 //static_cast<void>(impl_e_opts);
2329 CkAssert(num_bufs >= 1);
2331 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2332 int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2);
2333 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2334 PUP::toMem implP((void *)impl_msg->msgBuf);
2337 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2338 UsrToEnv(impl_msg)->setMsgtype(BocInitMsg);
2340 // UsrToEnv(impl_msg)->setGroupDep(impl_e_opts->getGroupDepID());
2341 CkGroupID gId = CkCreateGroup(cIdx, eIdx, impl_msg);
// Creates a chare array on behalf of an external client. The constructor
// arguments arrive as num_bufs byte buffers (at least one) and are copied into
// a CkMarshallMsg sized for two ints, one char and the combined buffer length.
// Array options are built from (ndims, dims) and a map group taken from
// map_gid; the message is typed as ArrayEltInitMsg and passed to
// CProxyElement_ArrayElement::ckCreateArray with entry index eIdx.
// NOTE(review): the conditions guarding the opts/setMap lines, the writes of
// the leading header fields (the extra char is presumably useAtSync), and the
// return statement are not visible in this excerpt.
2347 int CkCreateArrayExt(int cIdx, int ndims, int *dims, int eIdx, int num_bufs,
2348 char **bufs, int *buf_sizes, int map_gid, char useAtSync) {
2349 //static_cast<void>(impl_e_opts);
2350 CkAssert(num_bufs >= 1);
2352 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2353 int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2354 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2355 PUP::toMem implP((void *)impl_msg->msgBuf);
2359 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2360 CkArrayOptions opts;
2362 opts = CkArrayOptions(ndims, dims);
2365 map_gId.idx = map_gid;
2366 opts.setMap(CProxy_Group(map_gId));
2368 UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2369 //CkArrayID gId = ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
2370 CkGroupID gId = CProxyElement_ArrayElement::ckCreateArray((CkArrayMessage *)impl_msg, eIdx, opts);
// Insert one element into an existing chare array.
//   aid          - array identifier (presumably the source of gId below — confirm)
//   ndims, index - coordinates of the new element, wrapped in a CkArrayIndex
//   epIdx        - constructor entry point passed to ckInsertIdx
//   onPE         - processor argument passed to ckInsertIdx
//   num_bufs     - number of argument buffers; must be at least 1 (asserted)
//   bufs         - the argument buffers, packed into the message back to back
//   buf_sizes    - length in bytes of each entry of bufs
//   useAtSync    - presumably packed into the extra char the message reserves — confirm
// The marshalled message is sized for the summed buffer lengths plus two ints
// and one char, and is tagged ArrayEltInitMsg before insertion.
2376 void CkInsertArrayExt(int aid, int ndims, int *index, int epIdx, int onPE, int num_bufs,
2377 char **bufs, int *buf_sizes, char useAtSync) {
2378 CkAssert(num_bufs >= 1);
// Total payload length across all buffers.
2380 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2381 int marshall_msg_size = (sizeof(char)*totalSize + sizeof(int)*2 + sizeof(char));
2382 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2383 PUP::toMem implP((void *)impl_msg->msgBuf);
// Copy every caller buffer into the message in order.
2387 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2389 UsrToEnv(impl_msg)->setMsgtype(ArrayEltInitMsg);
2390 CkArrayIndex newIdx(ndims, index);
2393 CProxy_ArrayBase(gId).ckInsertIdx((CkArrayMessage *)impl_msg, epIdx, onPE, newIdx);
// Migrate one array element to processor toPe.
//   aid          - array identifier (presumably the source of gId below — confirm)
//   ndims, index - coordinates of the element to move
//   toPe         - destination passed to migrateMe
// The element is looked up with ckLocal() and asserted to be non-NULL, so the
// caller must invoke this where ckLocal() can find the element.
2397 void CkMigrateExt(int aid, int ndims, int *index, int toPe) {
2398 //printf("[charm] CkMigrateMeExt called with aid: %d, ndims: %d, index: %d, toPe: %d\n",
2399 //aid, ndims, *index, toPe);
2402 CkArrayIndex arrayIndex(ndims, index);
2403 CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2404 ArrayElement* arrayElement = arrayProxy.ckLocal();
2405 CkAssert(arrayElement != NULL);
2406 arrayElement->migrateMe(toPe);
// Call doneInserting() on the array proxy built from gId.
// NOTE(review): gId is presumably derived from aid — confirm.
2410 void CkArrayDoneInsertingExt(int aid) {
2413 CProxy_ArrayBase(gId).doneInserting();
// Return getRedNo() of the local branch of a group.
// NOTE(review): gId is presumably derived from g_id — confirm. The result of
// CkLocalBranch is cast to Group* and dereferenced without a NULL check.
2417 int CkGroupGetReductionNumber(int g_id) {
2420 return ((Group*)CkLocalBranch(gId))->getRedNo();
// Return getRedNo() of one array element.
//   aid          - array identifier (presumably the source of gId below — confirm)
//   ndims, index - coordinates of the element
// The element is looked up with ckLocal() and asserted to be non-NULL.
2424 int CkArrayGetReductionNumber(int aid, int ndims, int *index) {
2427 CkArrayIndex arrayIndex(ndims, index);
2428 CProxyElement_ArrayBase arrayProxy = CProxyElement_ArrayBase(gId, arrayIndex);
2429 ArrayElement* arrayElement = arrayProxy.ckLocal();
2430 CkAssert(arrayElement != NULL);
2431 return arrayElement->getRedNo();
// Send a single-buffer message to entry point epIdx of a singleton chare.
//   onPE, objPtr - copied into the CkChareID that addresses the target
//   msg, msgSize - payload bytes and their length
// The marshalled message is sized for the payload plus three ints and is
// delivered with CkSendMsg.
2435 void CkChareExtSend(int onPE, void *objPtr, int epIdx, char *msg, int msgSize) {
2436 //ckCheck(); // checks that gid is not zero
2437 int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2438 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2439 PUP::toMem implP((void *)impl_msg->msgBuf);
2443 implP(msg, msgSize);
// Address the destination chare.
2445 chareID.onPE = onPE;
2446 chareID.objPtr = objPtr;
2448 CkSendMsg(epIdx, impl_msg, &chareID);
// Multi-buffer variant of CkChareExtSend: the payload is the concatenation of
// num_bufs buffers (at least one, asserted) instead of a single buffer.
//   onPE, objPtr - copied into the CkChareID that addresses the target
//   bufs         - the buffers, packed into the message back to back
//   buf_sizes    - length in bytes of each entry of bufs
// The marshalled message is sized for the summed lengths plus three ints.
2452 void CkChareExtSend_multi(int onPE, void *objPtr, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2453 CkAssert(num_bufs >= 1);
2454 //ckCheck(); // checks that gid is not zero
2456 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2457 int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2458 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2459 PUP::toMem implP((void *)impl_msg->msgBuf);
// The length of the first buffer is written ahead of the buffer contents.
2462 implP | buf_sizes[0];
2463 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
// Address the destination chare.
2465 chareID.onPE = onPE;
2466 chareID.objPtr = objPtr;
2468 CkSendMsg(epIdx, impl_msg, &chareID);
// Send a single-buffer message to entry point epIdx of a group.
//   gid          - group identifier (presumably the source of gId below — confirm)
//   pe           - destination processor for the point-to-point case
//   msg, msgSize - payload bytes and their length
// The marshalled message is sized for the payload plus three ints. It is
// delivered either to every branch (CkBroadcastMsgBranch) or to the branch on
// pe (CkSendMsgBranch).
// NOTE(review): the test that selects between the two is presumably a
// sentinel value of pe — confirm.
2472 void CkGroupExtSend(int gid, int pe, int epIdx, char *msg, int msgSize) {
2473 //ckCheck(); // checks that gid is not zero
2474 int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2475 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2476 PUP::toMem implP((void *)impl_msg->msgBuf);
2480 implP(msg, msgSize);
2485 CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2487 CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
// Multi-buffer variant of CkGroupExtSend: the payload is the concatenation of
// num_bufs buffers (at least one, asserted) instead of a single buffer.
//   gid       - group identifier (presumably the source of gId below — confirm)
//   pe        - destination processor for the point-to-point case
//   bufs      - the buffers, packed into the message back to back
//   buf_sizes - length in bytes of each entry of bufs
// Delivery is either a broadcast to every branch or a send to the branch on pe.
2491 void CkGroupExtSend_multi(int gid, int pe, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2492 CkAssert(num_bufs >= 1);
2493 //ckCheck(); // checks that gid is not zero
2495 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2496 int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2497 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2498 PUP::toMem implP((void *)impl_msg->msgBuf);
// The length of the first buffer is written ahead of the buffer contents.
2501 implP | buf_sizes[0];
2502 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2507 CkBroadcastMsgBranch(epIdx, impl_msg, gId, 0);
2509 CkSendMsgBranch(epIdx, impl_msg, pe, gId, 0);
// Send a single-buffer message to entry point epIdx of a chare array.
//   aid          - array identifier (presumably the source of gId below — confirm)
//   idx, ndims   - coordinates of the target element for the point-to-point case
//   msg, msgSize - payload bytes and their length
// The marshalled message is sized for the payload plus three ints, tagged
// ForArrayEltMsg, and marked CkArray_IfNotThere_buffer. It is then either sent
// to the element at (ndims, idx) through ckSendWrapper or broadcast to the
// whole array with CkBroadcastMsgArray.
// NOTE(review): the condition that selects between send and broadcast should
// be confirmed against the full source.
2513 void CkArrayExtSend(int aid, int *idx, int ndims, int epIdx, char *msg, int msgSize) {
2514 int marshall_msg_size = (sizeof(char)*msgSize + 3*sizeof(int));
2515 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2516 PUP::toMem implP((void *)impl_msg->msgBuf);
2520 implP(msg, msgSize);
2521 UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2522 CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2523 impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2527 CkArrayIndex arrIndex(ndims, idx);
2528 // TODO is there a better function for this?
2529 CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2530 } else { // broadcast
2531 CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
// Multi-buffer variant of CkArrayExtSend: the payload is the concatenation of
// num_bufs buffers (at least one, asserted) instead of a single buffer.
//   aid        - array identifier (presumably the source of gId below — confirm)
//   idx, ndims - coordinates of the target element for the point-to-point case
//   bufs       - the buffers, packed into the message back to back
//   buf_sizes  - length in bytes of each entry of bufs
// The message is tagged ForArrayEltMsg and marked CkArray_IfNotThere_buffer,
// then either sent to one element through ckSendWrapper or broadcast with
// CkBroadcastMsgArray.
2536 void CkArrayExtSend_multi(int aid, int *idx, int ndims, int epIdx, int num_bufs, char **bufs, int *buf_sizes) {
2537 CkAssert(num_bufs >= 1);
2539 for (int i=0; i < num_bufs; i++) totalSize += buf_sizes[i];
2540 int marshall_msg_size = (sizeof(char)*totalSize + 3*sizeof(int));
2541 CkMarshallMsg *impl_msg = CkAllocateMarshallMsg(marshall_msg_size, NULL);
2542 PUP::toMem implP((void *)impl_msg->msgBuf);
// The length of the first buffer is written ahead of the buffer contents.
2545 implP | buf_sizes[0];
2546 for (int i=0; i < num_bufs; i++) implP(bufs[i], buf_sizes[i]);
2547 UsrToEnv(impl_msg)->setMsgtype(ForArrayEltMsg);
2548 CkArrayMessage *impl_amsg=(CkArrayMessage *)impl_msg;
2549 impl_amsg->array_setIfNotThere(CkArray_IfNotThere_buffer);
2553 CkArrayIndex arrIndex(ndims, idx);
2554 // TODO is there a better function for this?
2555 CProxyElement_ArrayBase::ckSendWrapper(gId, arrIndex, impl_amsg, epIdx, 0);
2556 } else { // broadcast
2557 CkBroadcastMsgArray(epIdx, impl_amsg, gId, 0);
2561 //------------------- Message Watcher (record/replay) ----------------
2565 CkpvDeclare(int, envelopeEventID);
2566 int _recplay_crc = 0;
2567 int _recplay_checksum = 0;
2568 int _recplay_logsize = 1024*1024;
2570 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
2571 #define REPLAYDEBUG(args) /* empty */
2573 CkMessageWatcher::~CkMessageWatcher() { if (next!=NULL) delete next;}
2575 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
2576 #include "BaseLB.h" /* For LBMigrateMsg message */
2578 #if CMK_REPLAYSYSTEM
2579 static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
2580 std::string fName = CkpvAccess(traceRoot);
2582 fName += std::to_string(CkMyPe());
2584 FILE *f = fopen(fName.c_str(), permissions);
2585 REPLAYDEBUG("openReplayfile " << fName.c_str());
2587 CkPrintf("[%d] Could not open replay file '%s' with permissions '%w'\n",
2588 CkMyPe(), fName.c_str(), permissions);
2589 CkAbort("openReplayFile> Could not open replay file");
// Watcher that records the order in which messages are processed.
// Each record is formatted as text into an in-memory buffer of
// _recplay_logsize bytes (curpos is the write offset) and written to the log
// file by flushLog(). Load-balancer migration messages go to a separate
// ".lb" file.
2594 class CkMessageRecorder : public CkMessageWatcher {
2595 unsigned int curpos;
2597 std::vector<char> buffer;
// Takes the already opened log stream; the buffer is sized from _recplay_logsize.
2599 CkMessageRecorder(FILE *f_): curpos(0), firstOpen(true), buffer(_recplay_logsize) { f=f_; }
// Writes the "-1 -1 -1" terminator record, emits an "sts" file through
// traceWriteSTS, and reports the closing time.
// NOTE(review): no check of the fopen("sts") result is visible here.
2600 ~CkMessageRecorder() {
2602 fprintf(f,"-1 -1 -1 ");
2605 FILE *stsfp = fopen("sts", "w");
2606 void traceWriteSTS(FILE *stsfp,int nUserEvents);
2607 traceWriteSTS(stsfp, 0);
2610 CkPrintf("[%d] closing log at %f.\n", CkMyPe(), CmiWallTimer());
// Write the buffered text to the log file; prints a notice unless verbose is 0.
// The buffer is printed as a C string, relying on the terminator sprintf writes.
2614 void flushLog(int verbose=1) {
2615 if (verbose) CkPrintf("[%d] flushing log\n", CkMyPe());
2616 fprintf(f, "%s", buffer.data());
// Record one envelope: source PE, total size, event id, a flag telling whether
// it is a node-group message, two integrity values, and the entry-point index.
// Only envelopes with a non-zero event id are recorded. The message is packed
// for the duration of the call and unpacked again if it arrived unpacked.
2619 virtual bool process(envelope **envptr,CkCoreState *ck) {
2620 if ((*envptr)->getEvent()) {
2621 bool wasPacked = (*envptr)->isPacked();
2622 if (!wasPacked) CkPackMessage(envptr);
2623 envelope *env = *envptr;
2624 unsigned int crc1=0, crc2=0;
// crc1 covers the envelope after the Converse header, crc2 the bytes after the envelope.
2626 //unsigned int crc = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2627 crc1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2628 crc2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2629 } else if (_recplay_checksum) {
2630 crc1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2631 crc2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
// NOTE(review): sprintf is unbounded; safety relies on flushing once fewer than
// 128 bytes remain. The comparison below mixes unsigned curpos with the int
// _recplay_logsize, so a log size under 128 would wrap — confirm it cannot occur.
2633 curpos+=sprintf(&buffer[curpos],"%d %d %d %d %x %x %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc1, crc2, env->getEpIdx());
2634 if (curpos > _recplay_logsize-128) flushLog();
2635 if (!wasPacked) CkUnpackMessage(envptr);
// Record a thread-resume token as "<this PE> -2 <serial number>".
2639 virtual bool process(CthThreadToken *token,CkCoreState *ck) {
2640 curpos+=sprintf(&buffer[curpos], "%d %d %d\n",CkMyPe(), -2, token->serialNo);
2641 if (curpos > _recplay_logsize-128) flushLog();
// Record a load-balancer migration message in the ".lb" file, which is opened
// for writing the first time and for appending afterwards.
2645 virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2647 if (firstOpen) f = openReplayFile("ckreplay_",".lb","w");
2648 else f = openReplayFile("ckreplay_",".lb","a");
2652 p | (*msg)->n_moves; // Need to store to be able to reload the message during replay
// Watcher that records the full content of every message it sees.
// File layout: a 2-byte marker holding sizeof(void*), then for each message a
// 4-byte total size followed by the raw packed envelope bytes.
// NOTE(review): none of the fwrite calls below have their result checked.
2660 class CkMessageDetailRecorder : public CkMessageWatcher {
// Writes the pointer-size marker to the already opened output stream.
2662 CkMessageDetailRecorder(FILE *f_) {
2664 /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
2665 * The value of 'x' is the pointer size.
2667 CmiUInt2 little = sizeof(void*);
2668 fwrite(&little, 2, 1, f);
// Closes the output stream.
2670 ~CkMessageDetailRecorder() {fclose(f);}
// Append one message to the file. The message is packed for writing and
// unpacked again afterwards if it arrived unpacked.
2672 virtual bool process(envelope **envptr, CkCoreState *ck) {
2673 bool wasPacked = (*envptr)->isPacked();
2674 if (!wasPacked) CkPackMessage(envptr);
2675 envelope *env = *envptr;
2676 CmiUInt4 size = env->getTotalsize();
2677 fwrite(&size, 4, 1, f);
2678 fwrite(env, env->getTotalsize(), 1, f);
2679 if (!wasPacked) CkUnpackMessage(envptr);
2684 extern "C" void CkMessageReplayQuiescence(void *rep, double time);
2685 extern "C" void CkMessageDetailReplayDone(void *rep, double time);
// Watcher that replays a previously recorded message order.
// It keeps the description of the next expected record (nextPE, nextSize,
// nextEvent, nexttype, nextEP, crc1, crc2) as read from the log file. Incoming
// messages and thread tokens that do not match the expected record are parked
// in delayedMessages / delayedTokens and retried whenever the expectation
// advances.
2687 class CkMessageReplay : public CkMessageWatcher {
2689 int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
2691 unsigned int crc1, crc2;
2693 /// Read the next message we need from the file:
// Every record starts with three ints. A regular message record carries four
// more fields (type, two hex integrity values, entry point). A record whose
// size field is -2 describes a thread awaken. Anything else must be the
// "-1 -1 -1" terminator, otherwise the replay aborts.
2694 void getNext(void) {
2695 if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
2697 // We are reading a regular message
2698 if (4!=fscanf(f,"%d%x%x%d", &nexttype,&crc1,&crc2,&nextEP)) {
2699 CkAbort("CkMessageReplay> Syntax error reading replay file");
2701 REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2702 } else if (nextSize == -2) {
2703 // We are reading a special message (right now only thread awaken)
2704 // Nothing to do since we have already read all info
2705 REPLAYDEBUG("getNext: "<<nextPE<<" " << nextSize << " " << nextEvent)
2706 } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
2707 CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
2708 CkAbort("CkMessageReplay> Unrecognized input");
2711 if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
2712 CkAbort("CkMessageReplay> Syntax error reading replay file");
2713 nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
2718 /// If this is the next message we need, advance and return true.
// A message matches when its source PE and event id equal the expected ones
// and a regular message is expected (nextSize >= 0). Differences in entry
// point, size, CRC or checksum are reported with CkPrintf.
2719 bool isNext(envelope *env) {
2720 if (nextPE!=env->getSrcPe()) return false;
2721 if (nextEvent!=env->getEvent()) return false;
2722 if (nextSize<0) return false; // not waiting for a regular message
2724 if (nextEP != env->getEpIdx()) {
2725 CkPrintf("[%d] CkMessageReplay> Message EP changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
2729 #if ! CMK_BIGSIM_CHARM
2730 if (nextSize!=env->getTotalsize())
2732 CkPrintf("[%d] CkMessageReplay> Message size changed during replay org: [%d %d %d %d] got: [%d %d %d %d]\n", CkMyPe(), nextPE, nextSize, nextEvent, nextEP, env->getSrcPe(), env->getTotalsize(), env->getEvent(), env->getEpIdx());
// Optional integrity check: recompute the two values the recorder stored
// (envelope after the Converse header, then the bytes after the envelope) and
// compare them with crc1/crc2. The message is packed while they are computed.
2735 if (_recplay_crc || _recplay_checksum) {
2736 bool wasPacked = env->isPacked();
2737 if (!wasPacked) CkPackMessage(&env);
2739 //unsigned int crcnew = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
2740 unsigned int crcnew1 = crc32_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2741 unsigned int crcnew2 = crc32_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2742 if (crcnew1 != crc1) {
2743 CkPrintf("CkMessageReplay %d> Envelope CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2745 if (crcnew2 != crc2) {
2746 CkPrintf("CkMessageReplay %d> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2748 } else if (_recplay_checksum) {
2749 unsigned int crcnew1 = checksum_initial(((unsigned char*)env)+CmiMsgHeaderSizeBytes, sizeof(*env)-CmiMsgHeaderSizeBytes);
2750 unsigned int crcnew2 = checksum_initial(((unsigned char*)env)+sizeof(*env), env->getTotalsize()-sizeof(*env));
2751 if (crcnew1 != crc1) {
2752 CkPrintf("CkMessageReplay %d> Envelope Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc1,crcnew1);
2754 if (crcnew2 != crc2) {
2755 CkPrintf("CkMessageReplay %d> Message Checksum changed during replay org: [0x%x] got: [0x%x]\n",CkMyPe(),crc2,crcnew2);
2758 if (!wasPacked) CkUnpackMessage(&env);
// A token matches when the expected record is a thread awaken (size -2) for
// this PE with the same serial number.
2763 bool isNext(CthThreadToken *token) {
2764 if (nextPE==CkMyPe() && nextSize==-2 && nextEvent==token->serialNo) return true;
2768 /// This is a (short) list of messages we aren't yet ready for:
2769 CkQ<envelope *> delayedMessages;
2770 /// This is a (short) list of tokens (i.e messages that awake user-threads) we aren't yet ready for:
2771 CkQ<CthThreadToken *> delayedTokens;
2773 /// Try to flush out any delayed messages
// Each parked item is dequeued once: the one that matches the expected record
// is handed back to the scheduler at the front of its queue, the others are
// appended to the delayed queue again.
2776 int len=delayedMessages.length();
2777 for (int i=0;i<len;i++) {
2778 envelope *env=delayedMessages.deq();
2779 if (isNext(env)) { /* this is the next message: process it */
2780 REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2781 CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
2784 else /* Not ready yet-- put it back in the
2787 REPLAYDEBUG("requeueing delayed message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" ep:"<<env->getEpIdx())
2788 delayedMessages.enq(env);
2791 } else if (nextSize==-2) {
2792 int len=delayedTokens.length();
2793 for (int i=0;i<len;++i) {
2794 CthThreadToken *token=delayedTokens.deq();
2795 if (isNext(token)) {
2796 REPLAYDEBUG("Dequeueing token: "<<token->serialNo)
2797 #if ! CMK_BIGSIM_CHARM
2798 CsdEnqueueLifo((void*)token);
2800 CthEnqueueBigSimThread(token,0,0,NULL);
2804 REPLAYDEBUG("requeueing delayed token: "<<token->serialNo)
2805 delayedTokens.enq(token);
// Takes the already opened log stream. On PE 0 quiescence detection is started
// with CkMessageReplayQuiescence as the callback.
2812 CkMessageReplay(FILE *f_) : lbFile(NULL) {
2816 REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
2818 if (CkMyPe()==0) CmiStartQD(CkMessageReplayQuiescence, this);
// Closes the log stream.
2821 ~CkMessageReplay() {fclose(f);}
// Decide what to do with an incoming envelope:
//  - event id 0: let it through (returns true);
//  - it is the expected message: advance the expectation and retry parked items;
//  - node-group message that is not expected here: forward it to the next PE
//    of this node, wrapping around to the node's first PE;
//  - otherwise: park it in delayedMessages.
2824 virtual bool process(envelope **envptr,CkCoreState *ck) {
2825 bool wasPacked = (*envptr)->isPacked();
2826 if (!wasPacked) CkPackMessage(envptr);
2827 envelope *env = *envptr;
2828 //CkAssert(*(int*)env == 0x34567890);
2829 REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx() << " ep:" << env->getEpIdx());
2830 if (env->getEvent() == 0) return true;
2831 if (isNext(env)) { /* This is the message we were expecting */
2832 REPLAYDEBUG("Executing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
2833 getNext(); /* Advance over this message */
2834 flush(); /* try to process queued-up stuff */
2835 if (!wasPacked) CkUnpackMessage(envptr);
2839 else if (env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg) {
2840 // try next rank, we can't just buffer the msg and left
2841 // we need to keep unprocessed msg on the fly
2842 int nextpe = CkMyPe()+1;
2843 if (nextpe == CkNodeFirst(CkMyNode())+CkMyNodeSize())
2844 nextpe = CkNodeFirst(CkMyNode());
2845 CmiSyncSendAndFree(nextpe,env->getTotalsize(),(char *)env);
2849 else /*!isNext(env) */ {
2850 REPLAYDEBUG("Queueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent()<<" "<<env->getEpIdx()
2851 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent << " " << nextEP)
2852 delayedMessages.enq(env);
// Thread tokens follow the same scheme: a token that is not the expected one
// is parked in delayedTokens.
2857 virtual bool process(CthThreadToken *token, CkCoreState *ck) {
2858 REPLAYDEBUG("ProcessToken token: "<<token->serialNo);
2859 if (isNext(token)) {
2860 REPLAYDEBUG("Executing token: "<<token->serialNo)
2865 REPLAYDEBUG("Queueing token: "<<token->serialNo
2866 <<" because we wanted "<<nextPE<<" "<<nextSize<<" "<<nextEvent)
2867 delayedTokens.enq(token);
// Load-balancer messages are read back from the ".lb" file, which is opened
// lazily on first use. When the recorded move count differs from the incoming
// message's, a new LBMigrateMsg sized for the recorded count is allocated.
2872 virtual bool process(LBMigrateMsg **msg,CkCoreState *ck) {
2873 if (lbFile == NULL) lbFile = openReplayFile("ckreplay_",".lb","r");
2874 if (lbFile != NULL) {
2876 PUP::fromDisk p(lbFile);
2878 if (num_moves != (*msg)->n_moves) {
2880 *msg = new (num_moves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
// Watcher that replays messages from a detail file written by
// CkMessageDetailRecorder: a 2-byte pointer-size marker followed by records of
// a 4-byte size and that many bytes of message data.
2888 class CkMessageDetailReplay : public CkMessageWatcher {
// Read the next record: its 4-byte size, then the message bytes into a buffer
// obtained from CmiAlloc. Returns NULL at end of file.
// NOTE(review): the diagnostics below pass size_t (nread) and CmiUInt4 (size)
// to %d and a long (tell) to %lld; these conversions do not match the
// argument types and should be %zu / %u / %ld.
2890 CmiUInt4 size; size_t nread;
2891 if ((nread=fread(&size, 4, 1, f)) < 1) {
2892 if (feof(f)) return NULL;
2893 CkPrintf("Broken record file (metadata) got %d\n",nread);
2896 void *env = CmiAlloc(size);
2897 long tell = ftell(f);
2898 if ((nread=fread(env, size, 1, f)) < 1) {
2899 CkPrintf("Broken record file (data) expecting %d, got %d (file position %lld)\n",size,nread,tell);
2902 //*(int*)env = 0x34567890; // set first integer as magic
// Takes the already opened detail stream, records the start time, checks that
// the stored pointer size matches this build, enqueues the first recorded
// message, and registers CkMessageDetailReplayDone to run on the
// CcdPROCESSOR_STILL_IDLE condition.
// NOTE(review): the result of the fread of the marker is not checked.
2907 CkMessageDetailReplay(FILE *f_) {
2909 starttime=CkWallTimer();
2910 /* This must match what CkMessageDetailRecorder did */
2912 fread(&little, 2, 1, f);
2913 if (little != sizeof(void*)) {
2914 CkAbort("Replaying on a different architecture from which recording was done!");
2917 CsdEnqueue(getNext());
2919 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE, (CcdVoidFn)CkMessageDetailReplayDone, (void*)this);
// Each processed message pulls the next recorded one from the file and
// enqueues it, if there is one.
2921 virtual bool process(envelope **env,CkCoreState *ck) {
2922 void *msg = getNext();
2923 if (msg != NULL) CsdEnqueue(msg);
// Quiescence callback registered by the CkMessageReplay constructor on PE 0;
// rep is the CkMessageReplay instance that registered it. Outside BigSim
// builds it reports that quiescence was detected.
2928 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
2929 #if ! CMK_BIGSIM_CHARM
2930 CkPrintf("[%d] Quiescence detected\n",CkMyPe());
2932 CkMessageReplay *replay = (CkMessageReplay*)rep;
2933 //CmiStartQD(CkMessageReplayQuiescence, replay);
// Condition callback registered by the CkMessageDetailReplay constructor; rep
// is that instance. Prints the wall-clock time elapsed since the replay's
// recorded starttime.
2936 extern "C" void CkMessageDetailReplayDone(void *rep, double time) {
2937 CkMessageDetailReplay *replay = (CkMessageDetailReplay *)rep;
2938 CkPrintf("[%d] Detailed replay finished after %f seconds. Exiting.\n",CkMyPe(),CkWallTimer()-replay->starttime);
// Offer a thread-resume token to this PE's watcher, if one is installed, and
// return the watcher's verdict from processThread.
// NOTE(review): the result when no watcher is installed is presumably true — confirm.
2943 static bool CpdExecuteThreadResume(CthThreadToken *token) {
2944 CkCoreState *ck = CkpvAccess(_coreState);
2945 if (ck->watcher!=NULL) {
2946 return ck->watcher->processThread(token,ck);
2951 CpvExtern(int, CthResumeNormalThreadIdx);
// Thread-resume handler used for record/replay debugging;
// CkMessageWatcherInit installs it in place of the normal handler when
// +record or +replay is given. It asks CpdExecuteThreadResume about the token
// so the watcher is notified of the resume.
2952 extern "C" void CthResumeNormalThreadDebug(CthThreadToken* token)
2954 CthThread t = token->thread;
2960 #if CMK_TRACE_ENABLED
2961 #if ! CMK_TRACE_IN_CHARM
2962 if(CpvAccess(traceOn))
2964 /* if(CpvAccess(_traceCoreOn))
2965 resumeTraceCore();*/
2969 CthSetPrev(t, CthSelf());
2971 /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
2972 if (CpdExecuteThreadResume(token)) {
2976 CthScheduledDecrement();
2977 CthSetPrev(CthSelf(), 0);
// Offer a load-balancer migration message to this PE's watcher, if one is
// installed. The watcher receives a pointer to the message pointer; the replay
// watcher's LBMigrateMsg handler assigns a new message through it.
2981 void CpdHandleLBMessage(LBMigrateMsg **msg) {
2982 CkCoreState *ck = CkpvAccess(_coreState);
2983 if (ck->watcher!=NULL) {
2984 ck->watcher->processLBMessage(msg, ck);
2988 #if CMK_BIGSIM_CHARM
2989 CpvExtern(int , CthResumeBigSimThreadIdx);
2992 #include "ckliststring.h"
// Parse the record/replay command-line options and install the matching
// watcher on ck.
//   +recplay-crc       enable CRC32 integrity values (set by rank 0)
//   +recplay-xor       enable XOR-checksum integrity values (set by rank 0)
//   +recplay-logsize   recorder buffer size (set by rank 0)
//   +record-detail     install CkMessageDetailRecorder on the listed processors
//   +record            install CkMessageRecorder
//   +replay-detail     install CkMessageDetailReplay
//   +replay            install CkMessageReplay
// The record/replay options abort when the build lacks CMK_REPLAYSYSTEM.
2993 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
2994 CmiArgGroup("Charm++","Record/Replay");
2995 bool forceReplay = false;
2998 if (CmiGetArgFlagDesc(argv,"+recplay-crc","Enable CRC32 checksum for message record-replay")) {
2999 if(CmiMyRank() == 0) _recplay_crc = 1;
3001 if (CmiGetArgFlagDesc(argv,"+recplay-xor","Enable simple XOR checksum for message record-replay")) {
3002 if(CmiMyRank() == 0) _recplay_checksum = 1;
3005 if(CmiGetArgIntDesc(argv,"+recplay-logsize",&tmplogsize,"Specify the size of the buffer used by the message recorder"))
3007 if(CmiMyRank() == 0) _recplay_logsize = tmplogsize;
3009 REPLAYDEBUG("CkMessageWatcherInit ");
3010 if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
3011 #if CMK_REPLAYSYSTEM
3012 CkListString list(procs);
3013 if (list.includes(CkMyPe())) {
3014 CkPrintf("Charm++> Recording full detail for processor %d\n",CkMyPe());
3015 CpdSetInitializeMemory(1);
3016 ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
3019 CkAbort("Option `+record-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3022 if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
3023 #if CMK_REPLAYSYSTEM
3024 if (CkMyPe() == 0) {
3025 CmiPrintf("Charm++> record mode.\n");
// Integrity values are turned off unless the charmdebug memory module is linked.
3026 if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3027 CmiPrintf("Charm++> Warning: disabling recording for message integrity detection (requires linking with -memory charmdebug)\n");
3028 _recplay_crc = _recplay_checksum = 0;
3031 CpdSetInitializeMemory(1);
// Route thread resumes through the debug handler so the watcher sees them.
3032 CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3033 ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
3035 CkAbort("Option `+record' requires that record-replay support be enabled at configure time (--enable-replay)");
3038 if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
3039 #if CMK_REPLAYSYSTEM
3041 CpdSetInitializeMemory(1);
3042 // Set the parameters of the processor
3043 #if CMK_SHARED_VARS_UNAVAILABLE
// The argument is parsed as "<pe>/<numpes>".
// NOTE(review): the scan for '/' has no end-of-string check, so an argument
// without a '/' reads past the terminator.
3044 _Cmi_mype = atoi(procs);
3045 while (procs[0]!='/') procs++;
3047 _Cmi_numpes = atoi(procs);
3049 CkAbort("+replay-detail available only for non-SMP build");
3052 ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
3054 CkAbort("Option `+replay-detail' requires that record-replay support be enabled at configure time (--enable-replay)");
3057 if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream") || forceReplay) {
3058 #if CMK_REPLAYSYSTEM
3059 if (CkMyPe() == 0) {
3060 CmiPrintf("Charm++> replay mode.\n");
3061 if (!CmiMemoryIs(CMI_MEMORY_IS_CHARMDEBUG)) {
3062 CmiPrintf("Charm++> Warning: disabling message integrity detection during replay (requires linking with -memory charmdebug)\n");
3063 _recplay_crc = _recplay_checksum = 0;
3066 CpdSetInitializeMemory(1);
3067 #if ! CMK_BIGSIM_CHARM
3068 CmiNumberHandler(CpvAccess(CthResumeNormalThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3070 CkNumberHandler(CpvAccess(CthResumeBigSimThreadIdx), (CmiHandler)CthResumeNormalThreadDebug);
3072 ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
3074 CkAbort("Option `+replay' requires that record-replay support be enabled at configure time (--enable-replay)");
// The two integrity modes are mutually exclusive.
// NOTE(review): the message names "+recplay-checksum", but the flag parsed
// above is "+recplay-xor"; the text should name the real option.
3077 if (_recplay_crc && _recplay_checksum) {
3078 CmiAbort("Both +recplay-crc and +recplay-checksum options specified, only one allowed.");
// Return the entry-point index a user message is addressed to.
// When the envelope's entry point is the array broadcast receiver
// (CkIndex_CkArray::recvBroadcast), the index stored in the envelope's
// array-broadcast field is returned instead.
// NOTE(review): otherwise the envelope's own entry-point index is presumably returned — confirm.
3083 int CkMessageToEpIdx(void *msg) {
3084 envelope *env=UsrToEnv(msg);
3085 int ep=env->getEpIdx();
3086 if (ep==CkIndex_CkArray::recvBroadcast(0))
3087 return env->getsetArrayBcastEp();
// Return sizeof(envelope), the size in bytes of the Charm++ message envelope.
3093 int getCharmEnvelopeSize() {
3094 return sizeof(envelope);
3097 /// Best-effort guess at whether @arg msg points at a charm envelope
// Applies a series of plausibility checks and returns 0 as soon as one fails:
//  - the size recorded by SIZEFIELD(msg) must be large enough for an envelope
//    and at least as large as the envelope's own total size;
//  - the total size must be at least sizeof(envelope);
//  - the entry-point index must lie strictly between 0 and _entryTable.size();
//  - the source PE must be below an upper bound (two alternative bounds appear below);
//  - the message type must lie strictly between 0 and LAST_CK_ENVELOPE_TYPE.
// NOTE(review): several comparisons mix signed values with unsigned sizes.
3099 int isCharmEnvelope(void *msg) {
3100 envelope *e = (envelope *)msg;
3101 if (SIZEFIELD(msg) < sizeof(envelope)) return 0;
3102 if (SIZEFIELD(msg) < e->getTotalsize()) return 0;
3103 if (e->getTotalsize() < sizeof(envelope)) return 0;
3104 if (e->getEpIdx()<=0 || e->getEpIdx()>=_entryTable.size()) return 0;
3106 if (e->getSrcPe()>=CkNumPes()+CkNumNodes()) return 0;
3108 if (e->getSrcPe()>=CkNumPes()) return 0;
3110 if (e->getMsgtype()<=0 || e->getMsgtype()>=LAST_CK_ENVELOPE_TYPE) return 0;
3114 #include "CkMarshall.def.h"