/*
 * FIXME: (OSL, 2/28/2003)
 *   Messages sent from the communications thread or SIGIO
 *   (i.e., messages sent from immediate messages) add
 *   mCreated and mProcessed to the communication thread's
 *   CpvAccess(cQdState), not to any real processor's
 *   CpvAccess(cQdState).  Thus processor rank 0 should
 *   add CpvAccessOther(cQdState,CmiNodeSize())'s counts to its own.
 *
 *   Until this is fixed, if you send or receive immediate
 *   messages (e.g., from reductions) you CANNOT use converse
 *   quiescence detection.
 */
15 #include "conv-config.h"
20 #include "quiescence.h"
/* Debug trace hook; expands to nothing while the printf stays commented out. */
#define DEBUGF(x) /*printf x*/

/* Quiescence-detection state, reached through CpvAccess(cQdState). */
CpvDeclare(CQdState, cQdState);

/* Handler indices obtained from CmiRegisterHandler() for the QD protocol
   handler and for the quiescence announcement handler. */
unsigned int _CQdHandlerIdx;
unsigned int _CQdAnnounceHandlerIdx;
32 int CQdMsgGetPhase(CQdMsg msg
)
33 { return msg
->phase
; }
35 void CQdMsgSetPhase(CQdMsg msg
, int p
)
38 CmiInt8
CQdMsgGetCreated(CQdMsg msg
)
39 { CmiAssert(msg
->phase
==1); return msg
->u
.p1
.created
; }
41 void CQdMsgSetCreated(CQdMsg msg
, CmiInt8 c
)
42 { CmiAssert(msg
->phase
==1); msg
->u
.p1
.created
= c
; }
44 CmiInt8
CQdMsgGetProcessed(CQdMsg msg
)
45 { CmiAssert(msg
->phase
==1); return msg
->u
.p1
.processed
; }
47 void CQdMsgSetProcessed(CQdMsg msg
, CmiInt8 p
)
48 { CmiAssert(msg
->phase
==1); msg
->u
.p1
.processed
= p
; }
50 char CQdMsgGetDirty(CQdMsg msg
)
51 { CmiAssert(msg
->phase
==2); return msg
->u
.p2
.dirty
; }
53 void CQdMsgSetDirty(CQdMsg msg
, char d
)
54 { CmiAssert(msg
->phase
==2); msg
->u
.p2
.dirty
= d
; }
57 CmiInt8
CQdGetCreated(CQdState state
)
58 { return state
->mCreated
; }
60 void CQdCreate(CQdState state
, CmiInt8 n
)
61 { state
->mCreated
+= n
; }
63 CmiInt8
CQdGetProcessed(CQdState state
)
64 { return state
->mProcessed
; }
66 void CQdProcess(CQdState state
, CmiInt8 n
)
67 { state
->mProcessed
+= n
; }
69 void CQdPropagate(CQdState state
, CQdMsg msg
)
72 CmiSetHandler(msg
, _CQdHandlerIdx
);
73 for(i
=0; i
<state
->nChildren
; i
++) {
75 CmiSyncSend(state
->children
[i
], sizeof(struct ConvQdMsg
), (char *)msg
);
79 int CQdGetParent(CQdState state
)
80 { return state
->parent
; }
82 CmiInt8
CQdGetCCreated(CQdState state
)
83 { return state
->cCreated
; }
85 CmiInt8
CQdGetCProcessed(CQdState state
)
86 { return state
->cProcessed
; }
88 void CQdSubtreeCreate(CQdState state
, CmiInt8 c
)
89 { state
->cCreated
+= c
; }
91 void CQdSubtreeProcess(CQdState state
, CmiInt8 p
)
92 { state
->cProcessed
+= p
; }
94 int CQdGetStage(CQdState state
)
95 { return state
->stage
; }
97 void CQdSetStage(CQdState state
, int p
)
100 void CQdReported(CQdState state
)
101 { state
->nReported
++; }
103 int CQdAllReported(CQdState state
)
104 { return state
->nReported
==(state
->nChildren
+1);}
106 void CQdReset(CQdState state
)
108 state
->nReported
=0; state
->cCreated
=0;
109 state
->cProcessed
=0; state
->cDirty
=0;
112 void CQdMarkProcessed(CQdState state
)
113 { state
->oProcessed
= state
->mProcessed
; }
115 char CQdIsDirty(CQdState state
)
116 { return ((state
->mProcessed
> state
->oProcessed
) || state
->cDirty
); }
118 void CQdSubtreeSetDirty(CQdState state
, char d
)
119 { state
->cDirty
= state
->cDirty
|| d
; }
121 CQdState
CQdStateCreate(void)
123 CQdState state
= (CQdState
) malloc(sizeof(struct ConvQdState
));
126 state
->mProcessed
= 0;
128 state
->nReported
= 0;
129 state
->oProcessed
= 0;
131 state
->cProcessed
= 0;
133 state
->nChildren
= CmiNumSpanTreeChildren(CmiMyPe());
134 state
->parent
= CmiSpanTreeParent(CmiMyPe());
135 /* fixed bug on SP3, when nChildren is 0, NULL will be returned by malloc */
136 if (state
->nChildren
) {
137 state
->children
= (int *) malloc(state
->nChildren
*sizeof(int));
138 _MEMCHECK(state
->children
);
141 state
->children
= NULL
;
142 CmiSpanTreeChildren(CmiMyPe(), state
->children
);
148 static void CQdBcastQD1(CQdState state
, CQdMsg msg
)
150 CQdMsgSetPhase(msg
, 0);
151 CQdPropagate(state
, msg
);
152 CQdMsgSetPhase(msg
, 1);
153 CQdMsgSetCreated(msg
, CQdGetCreated(state
));
154 CQdMsgSetProcessed(msg
, CQdGetProcessed(state
));
155 CQdCreate(state
, -1);
156 CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg
), (char *) msg
);
157 CQdMarkProcessed(state
);
159 CQdSetStage(state
, 1);
163 static void CQdBcastQD2(CQdState state
, CQdMsg msg
)
165 CQdMsgSetPhase(msg
, 1);
166 CQdPropagate(state
, msg
);
167 CQdMsgSetPhase(msg
, 2);
168 CQdMsgSetDirty(msg
, CQdIsDirty(state
));
169 CQdCreate(state
, -1);
170 CmiSyncSendAndFree(CmiMyPe(), sizeof(struct ConvQdMsg
), (char *) msg
);
172 CQdSetStage(state
, 2);
176 static void CQdHandlePhase0(CQdState state
, CQdMsg msg
)
178 CmiAssert(CmiMyPe()==0 || CQdGetStage(state
)==0);
179 if(CQdGetStage(state
)==0)
180 CQdBcastQD1(state
, msg
);
186 static void CQdHandlePhase1(CQdState state
, CQdMsg msg
)
188 switch(CQdGetStage(state
)) {
190 CmiAssert(CmiMyPe()!=0);
191 CQdBcastQD2(state
, msg
);
194 CQdSubtreeCreate(state
, CQdMsgGetCreated(msg
));
195 CQdSubtreeProcess(state
, CQdMsgGetProcessed(msg
));
198 if(CQdAllReported(state
)) {
200 if(CQdGetCCreated(state
) == CQdGetCProcessed(state
))
201 CQdBcastQD2(state
, msg
);
203 CQdBcastQD1(state
, msg
);
206 CQdMsgSetCreated(msg
, CQdGetCCreated(state
));
207 CQdMsgSetProcessed(msg
, CQdGetCProcessed(state
));
208 CQdCreate(state
, -1);
209 CmiSyncSendAndFree(CQdGetParent(state
),
210 sizeof(struct ConvQdMsg
), (char *) msg
);
211 DEBUGF(("PE = %d, My parent = %d\n", CmiMyPe(), CQdGetParent(state
)));
213 CQdSetStage(state
, 0);
220 CmiAbort("Internal QD Error. Contact Developers.!\n");
225 static void CQdHandlePhase2(CQdState state
, CQdMsg msg
)
227 CmiAssert(CQdGetStage(state
)==2);
228 CQdSubtreeSetDirty(state
, CQdMsgGetDirty(msg
));
230 if(CQdAllReported(state
)) {
232 if(CQdIsDirty(state
))
233 CQdBcastQD1(state
, msg
);
235 CmiSetHandler(msg
, _CQdAnnounceHandlerIdx
);
236 CQdCreate(state
, 0-CmiNumPes());
237 CmiSyncBroadcastAllAndFree(sizeof(struct ConvQdMsg
), (char *) msg
);
239 CQdSetStage(state
, 0);
243 CQdMsgSetDirty(msg
, CQdIsDirty(state
));
244 CQdCreate(state
, -1);
245 CmiSyncSendAndFree(CQdGetParent(state
),
246 sizeof(struct ConvQdMsg
), (char *) msg
);
248 CQdSetStage(state
, 0);
256 static void CQdCallWhenIdle(CQdMsg msg
)
258 CQdState state
= CpvAccess(cQdState
);
260 switch(CQdMsgGetPhase(msg
)) {
261 case 0 : CQdHandlePhase0(state
, msg
); break;
262 case 1 : CQdHandlePhase1(state
, msg
); break;
263 case 2 : CQdHandlePhase2(state
, msg
); break;
264 default: CmiAbort("Internal QD Error. Contact Developers.!\n");
269 void CQdHandler(CQdMsg msg
)
271 CQdProcess(CpvAccess(cQdState
), -1);
272 CcdCallOnCondition(CcdPROCESSOR_STILL_IDLE
,
273 (CcdVoidFn
)CQdCallWhenIdle
, (void*) msg
);
277 void CQdRegisterCallback(CQdVoidFn fn
, void *arg
)
279 CcdCallOnCondition(CcdQUIESCENCE
, fn
, arg
);
282 void CQdAnnounceHandler(CQdMsg msg
)
284 CQdProcess(CpvAccess(cQdState
), -1);
285 CcdRaiseCondition(CcdQUIESCENCE
);
288 void CQdCpvInit(void) {
289 CpvInitialize(CQdState
, cQdState
);
290 CpvAccess(cQdState
) = CQdStateCreate();
/*
 * Register CQdHandler and CQdAnnounceHandler with CmiRegisterHandler() and
 * store the returned indices in _CQdHandlerIdx and _CQdAnnounceHandlerIdx.
 * NOTE(review): no enclosing function header is visible for these statements
 * in this view -- presumably they belong to an initialisation routine whose
 * opening lines are missing; confirm against upstream before building.
 */
296 _CQdHandlerIdx
= CmiRegisterHandler((CmiHandler
)CQdHandler
);
297 _CQdAnnounceHandlerIdx
=
298 CmiRegisterHandler((CmiHandler
)CQdAnnounceHandler
);
301 void CmiStartQD(CQdVoidFn fn
, void *arg
)
303 CQdMsg msg
= (CQdMsg
) CmiAlloc(sizeof(struct ConvQdMsg
));
304 CQdRegisterCallback(fn
, arg
);
305 CQdMsgSetPhase(msg
, 0);
306 CmiSetHandler(msg
, _CQdHandlerIdx
);
307 CQdCreate(CpvAccess(cQdState
), -1);
308 CmiSyncSendAndFree(0, sizeof(struct ConvQdMsg
), (char *)msg
);