12 #include "ckfutures.h"
13 #include "TopoManager.h"
// _CHECK_VALID(p, msg): two alternative definitions.
//  - The first calls CkAbort(msg) when p compares equal to 0.
//  - The second expands to an empty do/while statement.
// NOTE(review): the #else/#endif separating the two definitions are not
// visible in this excerpt; presumably the first applies when
// CMK_ERROR_CHECKING is true -- confirm against the full file.
15 #if CMK_ERROR_CHECKING
16 #define _CHECK_VALID(p, msg) do {if((p)==0){CkAbort(msg);}} while(0)
18 #define _CHECK_VALID(p, msg) do { } while(0)
21 // Flag that tells the system if we are replaying using Record/Replay
22 extern "C" int _replaySystem
;
// Declared with C linkage. The _Cld* wrappers below test its result (with a
// PE, a node number, or -1) before enqueueing a message.
25 extern "C" int ConverseDeliver(int pe
);
// Wrapper around CldEnqueue that first tests ConverseDeliver(pe).
// Visible steps, in order:
//   1. the !ConverseDeliver(pe) test;
//   2. if the message, viewed as an envelope, reports isRdma(),
//      CkRdmaPrepareMsg is called with the address of msg and the target pe;
//   3. CldEnqueue(pe, msg, infofn).
// NOTE(review): the body of the !ConverseDeliver branch, any preprocessor
// guards around the RDMA step, and the closing braces are not visible in
// this excerpt.
26 inline void _CldEnqueue(int pe
, void *msg
, int infofn
) {
27 if (!ConverseDeliver(pe
)) {
32 if(((envelope
*)msg
)->isRdma()){
33 CkRdmaPrepareMsg((envelope
**)&msg
, pe
);
36 CldEnqueue(pe
, msg
, infofn
);
// Wrapper around CldEnqueueMulti: tests ConverseDeliver(-1), then forwards
// (npes, pes, msg, infofn) unchanged to CldEnqueueMulti.
// NOTE(review): the body of the !ConverseDeliver branch is not visible in
// this excerpt.
38 inline void _CldEnqueueMulti(int npes
, int *pes
, void *msg
, int infofn
) {
39 if (!ConverseDeliver(-1)) {
43 CldEnqueueMulti(npes
, pes
, msg
, infofn
);
// Wrapper around CldEnqueueGroup: tests ConverseDeliver(-1), then forwards
// (grp, msg, infofn) unchanged to CldEnqueueGroup.
// NOTE(review): the body of the !ConverseDeliver branch is not visible in
// this excerpt.
45 inline void _CldEnqueueGroup(CmiGroup grp
, void *msg
, int infofn
) {
46 if (!ConverseDeliver(-1)) {
50 CldEnqueueGroup(grp
, msg
, infofn
);
// Node-level counterpart of _CldEnqueue.
// Visible steps, in order:
//   1. the !ConverseDeliver(node) test (the node number is passed as the
//      argument);
//   2. if the message, viewed as an envelope, reports isRdma(),
//      CkRdmaPrepareMsg is called with the address of msg and
//      CmiNodeFirst(node);
//   3. CldNodeEnqueue(node, msg, infofn).
// NOTE(review): the body of the !ConverseDeliver branch and any preprocessor
// guards around the RDMA step are not visible in this excerpt.
52 inline void _CldNodeEnqueue(int node
, void *msg
, int infofn
) {
53 if (!ConverseDeliver(node
)) {
58 if(((envelope
*)msg
)->isRdma()){
59 CkRdmaPrepareMsg((envelope
**)&msg
, CmiNodeFirst(node
));
62 CldNodeEnqueue(node
, msg
, infofn
);
// Second definition of _CldEnqueue in this excerpt; unlike the earlier one it
// has no ConverseDeliver test. If the message, viewed as an envelope, reports
// isRdma(), CkRdmaPrepareMsg is called with the address of msg and pe; the
// message is then passed to CldEnqueue(pe, msg, infofn).
// NOTE(review): presumably this is the alternative branch of a preprocessor
// conditional whose directives are not visible here -- confirm.
66 inline void _CldEnqueue(int pe
, void *msg
, int infofn
) {
68 if(((envelope
*)msg
)->isRdma()){
69 CkRdmaPrepareMsg((envelope
**)&msg
, pe
);
72 CldEnqueue(pe
, msg
, infofn
);
// Second definition of _CldNodeEnqueue in this excerpt; it has no
// ConverseDeliver test. If the message, viewed as an envelope, reports
// isRdma(), CkRdmaPrepareMsg is called with the address of msg and
// CmiNodeFirst(node); the message is then passed to
// CldNodeEnqueue(node, msg, infofn).
// NOTE(review): presumably the alternative branch of a preprocessor
// conditional whose directives are not visible here -- confirm.
75 inline void _CldNodeEnqueue(int node
, void *msg
, int infofn
) {
77 if(((envelope
*)msg
)->isRdma()){
78 CkRdmaPrepareMsg((envelope
**)&msg
, CmiNodeFirst(node
));
81 CldNodeEnqueue(node
, msg
, infofn
);
// Macro forms: _CldEnqueueMulti and _CldEnqueueGroup expand directly to the
// corresponding Cld* names, with no ConverseDeliver test.
// NOTE(review): presumably in the same conditional branch as the second
// _CldEnqueue/_CldNodeEnqueue definitions -- the directives are not visible.
83 #define _CldEnqueueMulti CldEnqueueMulti
84 #define _CldEnqueueGroup CldEnqueueGroup
// Declared only when CMK_CHARE_USE_PTR is not defined. getLocalChareObj()
// indexes this vector with actualID.objPtr cast to CmiIntPtr.
87 #ifndef CMK_CHARE_USE_PTR
88 CkpvExtern(CkVec
<void *>, chare_objs
);
91 /// A set of "Virtual ChareID"'s
// State tag compared against by send(), getLocalChare() and
// getLocalChareObj(). pup() serialises it as an offset from FILLED
// (state - FILLED) and rebuilds it as FILLED + offset.
93 enum VidState
{FILLED
, UNFILLED
};
// Forwards one envelope to the chare recorded in actualID:
// marks it ForChareMsg, sets its object pointer to actualID.objPtr, enqueues
// it for actualID.onPE via _CldEnqueue using _infoIdx, and finally calls
// create() on CpvAccess(_qd).
97 void msgDeliver(envelope
*env
) {
98 // This was causing sync entry methods not to return properly in some cases
99 //env->setSrcPe(CkMyPe());
100 env
->setMsgtype(ForChareMsg
);
101 env
->setObjPtr(actualID
.objPtr
);
102 _CldEnqueue(actualID
.onPE
, env
, _infoIdx
);
103 CpvAccess(_qd
)->create();
// While state is UNFILLED the envelope is appended to msgQ.
// NOTE(review): the handling for any other state is not visible in this
// excerpt.
107 void send(envelope
*env
) {
108 if(state
==UNFILLED
) {
109 msgQ
->enq((void *)env
);
// Records the target (onPE, oPtr) in actualID, then dequeues envelopes from
// msgQ until deq() returns NULL.
// NOTE(review): the declaration of env, any update of state, and the loop
// body are not visible in this excerpt.
114 void fill(int onPE
, void *oPtr
) {
116 actualID
.onPE
= onPE
;
117 actualID
.objPtr
= oPtr
;
119 while(NULL
!=(env
=(envelope
*)msgQ
->deq())) {
// Returns actualID.objPtr when state is FILLED and actualID.onPE equals
// CkMyPe().
// NOTE(review): the value returned when that condition fails is not visible
// in this excerpt.
124 void *getLocalChare(void) {
125 if (state
==FILLED
&& actualID
.onPE
==CkMyPe())
126 return actualID
.objPtr
;
// Same FILLED-and-local test as getLocalChare(). With CMK_CHARE_USE_PTR
// defined it returns actualID.objPtr itself; the other visible return looks
// the object up in CkpvAccess(chare_objs), using objPtr cast to CmiIntPtr as
// the index.
// NOTE(review): the #else/#endif and the value returned when the condition
// fails are not visible in this excerpt.
129 void *getLocalChareObj(void) {
130 // returns actual object, different when CMK_CHARE_USE_PTR is false
131 if (state
==FILLED
&& actualID
.onPE
==CkMyPe())
132 #ifdef CMK_CHARE_USE_PTR
133 return actualID
.objPtr
;
135 return CkpvAccess(chare_objs
)[(CmiIntPtr
)actualID
.objPtr
];
// Pack/unpack routine. Visible steps:
//  - when not unpacking, s is set to the offset of state from FILLED;
//  - when unpacking, state is rebuilt as FILLED + s;
//  - when unpacking, msgQ is reset to NULL (flagged "fixme" in the original).
// NOTE(review): the declaration of s, the statements that actually transfer
// data through p, and whatever the #ifndef guards are not visible in this
// excerpt.
139 void pup(PUP::er
&p
) {
140 #ifndef CMK_CHARE_USE_PTR
142 if (!p
.isUnpacking()) s
= state
-FILLED
;
144 if (p
.isUnpacking()) state
= (VidState
)(FILLED
+s
);
145 if (p
.isUnpacking()) msgQ
= NULL
; // fixme
153 /// Message watcher: for record/replay support
// Watchers form a singly linked chain through `next`; the process##name
// helpers generated by PROCESS_MACRO call the next watcher before this one.
// The default constructor initialises both f and next to NULL.
// NOTE(review): the declaration of f is not visible in this excerpt.
154 class CkMessageWatcher
{
157 CkMessageWatcher
*next
;
159 CkMessageWatcher() : f(NULL
), next(NULL
) { }
160 virtual ~CkMessageWatcher();
162 * This message is about to be processed by Charm.
163 * If this function returns false, the message will not be processed.
164 * The message is processed by the watcher starting from the innermost one
165 * up to the outermost
// PROCESS_MACRO(name, type) generates
//   inline bool process##name(type *input, CkCoreState *ck)
// which starts with result = true, ANDs in next->process##name(input, ck)
// when next is non-NULL, then ANDs in this watcher's process(input, ck).
// Because `&=` does not short-circuit, process(input, ck) is invoked even
// when the next watcher already returned false.
// Instantiated below for Message (envelope*), Thread (CthThreadToken) and
// LBMessage (LBMigrateMsg*).
// NOTE(review): the tail of the macro (presumably returning result) is not
// visible in this excerpt.
167 #define PROCESS_MACRO(name,type) inline bool process##name(type *input,CkCoreState *ck) { \
168 bool result = true; \
169 if (next != NULL) result &= next->process##name(input, ck); \
170 result &= process(input, ck); \
174 PROCESS_MACRO(Message
,envelope
*);
175 PROCESS_MACRO(Thread
,CthThreadToken
);
176 PROCESS_MACRO(LBMessage
,LBMigrateMsg
*);
180 /** These are used internally by this class to call the correct subclass method */
// The envelope overload is pure virtual, so every concrete watcher must
// implement it; the thread-token and LB-migrate overloads default to
// returning true.
181 virtual bool process(envelope
**env
,CkCoreState
*ck
) =0;
182 virtual bool process(CthThreadToken
*token
, CkCoreState
*ck
) {return true;}
183 virtual bool process(LBMigrateMsg
**msg
, CkCoreState
*ck
) {return true;}
// Sets the next watcher in the chain to w.
185 inline void setNext(CkMessageWatcher
*w
) { next
= w
; }
188 /// All the state that's useful to have on the receive side in the Charm Core (ck.C)
// groupTable is initialised from CkpvAccess(_groupTable) in the constructor's
// initializer list.
190 GroupTable
*groupTable
;
// watcher is set to NULL by the constructor and deleted by the destructor.
193 CkMessageWatcher
*watcher
;
194 /** Adds an extra watcher (which wraps the previously existing one) */
// NOTE(review): the body of addWatcher is not visible in this excerpt.
195 inline void addWatcher(CkMessageWatcher
*w
) {
// Constructor initializer list: groupTable comes from CkpvAccess(_groupTable)
// and qd from CpvAccess(_qd); the constructor body sets watcher to NULL.
// NOTE(review): the constructor's own header line is not visible in this
// excerpt.
201 :groupTable(CkpvAccess(_groupTable
)),
202 qd(CpvAccess(_qd
)) { watcher
=NULL
; }
// The destructor deletes watcher.
203 ~CkCoreState() { delete watcher
;}
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns groupTable -- confirm.
205 inline GroupTable
*getGroupTable() {
// Looks up gID in groupTable and returns getObj() of the entry found.
208 inline IrrGroup
*localBranch(CkGroupID gID
) {
209 return groupTable
->find(gID
).getObj();
// Returns the qd pointer that the constructor initialised from CpvAccess(_qd).
212 inline QdState
*getQD() {return qd
;}
213 // when in interrupt based net version, use the extra copy
214 // of qd when inside an immediate handler function.
// When CmiImmIsRunning() is true, n (default 1) is forwarded to process() on
// CpvAccessOther(_qd, 1).
// NOTE(review): the branch taken otherwise is not visible in this excerpt.
215 inline void process(int n
=1) {
216 if (CmiImmIsRunning())
217 CpvAccessOther(_qd
, 1)->process(n
);
// Mirror of process(int): when CmiImmIsRunning() is true, n (default 1) is
// forwarded to create() on CpvAccessOther(_qd, 1).
// NOTE(review): the branch taken otherwise is not visible in this excerpt.
221 inline void create(int n
=1) {
222 if (CmiImmIsRunning())
223 CpvAccessOther(_qd
, 1)->create(n
);
// Extern declaration of the Ckpv variable _coreState, of type CkCoreState*.
229 CkpvExtern(CkCoreState
*, _coreState
);
// Declarations only: none of the functions below are defined in this excerpt.
231 void CpdHandleLBMessage(LBMigrateMsg
**msg
);
232 void CkMessageWatcherInit(char **argv
,CkCoreState
*ck
);
// Message-processing entry points; each takes the CkCoreState it operates on.
234 extern void _processHandler(void *converseMsg
,CkCoreState
*ck
);
235 extern void _processBocInitMsg(CkCoreState
*ck
,envelope
*msg
);
236 extern void _processNodeBocInitMsg(CkCoreState
*ck
,envelope
*msg
);
// Takes a message plus five out-pointer parameters (pfn, len, queueing,
// priobits, prioptr).
237 extern void _infoFn(void *msg
, CldPackFn
*pfn
, int *len
,
238 int *queueing
, int *priobits
, UInt
**prioptr
);
// Group / node-group creation entry points, each keyed by a CkGroupID.
239 extern void CkCreateLocalGroup(CkGroupID groupID
, int eIdx
, envelope
*env
);
240 extern void CkCreateLocalNodeGroup(CkGroupID groupID
, int eIdx
, envelope
*env
);
241 extern void _createGroup(CkGroupID groupID
, envelope
*env
);
242 extern void _createNodeGroup(CkGroupID groupID
, envelope
*env
);
// NOTE(review): the three int parameters are unnamed here; their meaning
// cannot be determined from this excerpt.
243 extern int _getGroupIdx(int,int,int);