12 #include "ckfutures.h"
13 #include "TopoManager.h"
15 #if CMK_ERROR_CHECKING
16 #define _CHECK_VALID(p, msg) do {if((p)==0){CkAbort(msg);}} while(0)
18 #define _CHECK_VALID(p, msg) do { } while(0)
21 // Flag that tells the system if we are replaying using Record/Replay
22 CMI_EXTERNC_VARIABLE
int _replaySystem
;
25 extern "C" int ConverseDeliver(int pe
);
26 inline void _CldEnqueue(int pe
, void *msg
, int infofn
) {
27 if (!ConverseDeliver(pe
)) {
32 if(((envelope
*)msg
)->isRdma()){
33 CkRdmaPrepareMsg((envelope
**)&msg
, pe
);
36 CldEnqueue(pe
, msg
, infofn
);
38 inline void _CldEnqueueMulti(int npes
, int *pes
, void *msg
, int infofn
) {
39 if (!ConverseDeliver(-1)) {
43 CldEnqueueMulti(npes
, pes
, msg
, infofn
);
45 inline void _CldEnqueueGroup(CmiGroup grp
, void *msg
, int infofn
) {
46 if (!ConverseDeliver(-1)) {
50 CldEnqueueGroup(grp
, msg
, infofn
);
52 inline void _CldNodeEnqueue(int node
, void *msg
, int infofn
) {
53 if (!ConverseDeliver(node
)) {
58 if(((envelope
*)msg
)->isRdma()){
59 CkRdmaPrepareMsg((envelope
**)&msg
, CmiNodeFirst(node
));
62 CldNodeEnqueue(node
, msg
, infofn
);
66 inline void _CldEnqueue(int pe
, void *msg
, int infofn
) {
68 if(((envelope
*)msg
)->isRdma()){
69 CkRdmaPrepareMsg((envelope
**)&msg
, pe
);
72 CldEnqueue(pe
, msg
, infofn
);
75 inline void _CldNodeEnqueue(int node
, void *msg
, int infofn
) {
77 if(((envelope
*)msg
)->isRdma()){
78 CkRdmaPrepareMsg((envelope
**)&msg
, CmiNodeFirst(node
));
81 CldNodeEnqueue(node
, msg
, infofn
);
83 #define _CldEnqueueMulti CldEnqueueMulti
84 #define _CldEnqueueGroup CldEnqueueGroup
87 #ifndef CMK_CHARE_USE_PTR
88 CkpvExtern(std::vector
<void *>, chare_objs
);
91 #include <unordered_map>
92 typedef std::unordered_map
<CmiUInt8
, ArrayElement
*> ArrayObjMap
;
93 CkpvExtern(ArrayObjMap
, array_objs
);
95 /// A set of "Virtual ChareID"'s
97 enum VidState
: uint8_t {FILLED
, UNFILLED
};
101 void msgDeliver(envelope
*env
) {
102 // This was causing sync entry methods not to return properly in some cases
103 //env->setSrcPe(CkMyPe());
104 env
->setMsgtype(ForChareMsg
);
105 env
->setObjPtr(actualID
.objPtr
);
106 _CldEnqueue(actualID
.onPE
, env
, _infoIdx
);
107 CpvAccess(_qd
)->create();
111 void send(envelope
*env
) {
112 if(state
==UNFILLED
) {
113 msgQ
->enq((void *)env
);
118 void fill(int onPE
, void *oPtr
) {
120 actualID
.onPE
= onPE
;
121 actualID
.objPtr
= oPtr
;
123 while(NULL
!=(env
=(envelope
*)msgQ
->deq())) {
128 void *getLocalChare(void) {
129 if (state
==FILLED
&& actualID
.onPE
==CkMyPe())
130 return actualID
.objPtr
;
133 void *getLocalChareObj(void) {
134 // returns actual object, different when CMK_CHARE_USE_PTR is false
135 if (state
==FILLED
&& actualID
.onPE
==CkMyPe())
136 #ifdef CMK_CHARE_USE_PTR
137 return actualID
.objPtr
;
139 return CkpvAccess(chare_objs
)[(CmiIntPtr
)actualID
.objPtr
];
143 void pup(PUP::er
&p
) {
144 #ifndef CMK_CHARE_USE_PTR
146 if (!p
.isUnpacking()) s
= state
-FILLED
;
148 if (p
.isUnpacking()) state
= (VidState
)(FILLED
+s
);
149 if (p
.isUnpacking()) msgQ
= NULL
; // fixme
157 /// Message watcher: for record/replay support
158 class CkMessageWatcher
{
161 CkMessageWatcher
*next
;
163 CkMessageWatcher() : f(NULL
), next(NULL
) { }
164 virtual ~CkMessageWatcher();
166 * This message is about to be processed by Charm.
167 * If this function returns false, the message will not be processed.
168 * The message is processed by the watcher starting from the innermost one
169 * up to the outermost
171 #define PROCESS_MACRO(name,type) inline bool process##name(type *input,CkCoreState *ck) { \
172 bool result = true; \
173 if (next != NULL) result &= next->process##name(input, ck); \
174 result &= process(input, ck); \
178 PROCESS_MACRO(Message
,envelope
*);
179 PROCESS_MACRO(Thread
,CthThreadToken
);
180 PROCESS_MACRO(LBMessage
,LBMigrateMsg
*);
184 /** These are used internally by this class to call the correct subclass method */
185 virtual bool process(envelope
**env
,CkCoreState
*ck
) =0;
186 virtual bool process(CthThreadToken
*token
, CkCoreState
*ck
) {return true;}
187 virtual bool process(LBMigrateMsg
**msg
, CkCoreState
*ck
) {return true;}
189 inline void setNext(CkMessageWatcher
*w
) { next
= w
; }
192 /// All the state that's useful to have on the receive side in the Charm Core (ck.C)
194 GroupTable
*groupTable
;
197 CkMessageWatcher
*watcher
;
198 /** Adds an extra watcher (which wrap the previously existing one) */
199 inline void addWatcher(CkMessageWatcher
*w
) {
205 :groupTable(CkpvAccess(_groupTable
)),
206 qd(CpvAccess(_qd
)) { watcher
=NULL
; }
207 ~CkCoreState() { delete watcher
;}
209 inline GroupTable
*getGroupTable() const {
212 inline IrrGroup
*localBranch(CkGroupID gID
) const {
213 return groupTable
->find(gID
).getObj();
216 inline QdState
*getQD() {return qd
;}
217 // when in interrupt based net version, use the extra copy
218 // of qd when inside an immediate handler function.
219 inline void process(int n
=1) {
220 if (CmiImmIsRunning())
221 CpvAccessOther(_qd
, 1)->process(n
);
225 inline void create(int n
=1) {
226 if (CmiImmIsRunning())
227 CpvAccessOther(_qd
, 1)->create(n
);
233 CkpvExtern(CkCoreState
*, _coreState
);
235 void CpdHandleLBMessage(LBMigrateMsg
**msg
);
236 void CkMessageWatcherInit(char **argv
,CkCoreState
*ck
);
238 extern void _processHandler(void *converseMsg
,CkCoreState
*ck
);
239 extern void _processBocInitMsg(CkCoreState
*ck
,envelope
*msg
);
240 extern void _processNodeBocInitMsg(CkCoreState
*ck
,envelope
*msg
);
241 extern void _infoFn(void *msg
, CldPackFn
*pfn
, int *len
,
242 int *queueing
, int *priobits
, UInt
**prioptr
);
243 extern void CkCreateLocalGroup(CkGroupID groupID
, int eIdx
, envelope
*env
);
244 extern void CkCreateLocalNodeGroup(CkGroupID groupID
, int eIdx
, envelope
*env
);
245 extern void _createGroup(CkGroupID groupID
, envelope
*env
);
246 extern void _createNodeGroup(CkGroupID groupID
, envelope
*env
);
247 extern int _getGroupIdx(int,int,int);
248 static inline IrrGroup
*_lookupGroupAndBufferIfNotThere(const CkCoreState
*ck
, const envelope
*env
,const CkGroupID
&groupID
);
251 void QdCreate(int n
);
252 void QdProcess(int n
);