4 #include "cldb.workstealing.h"
8 #define TRACE_USEREVENTS 1
11 void CldMultipleSendPrio(int pe
, int numToSend
, int rank
, int immed
);
13 typedef struct CldProcInfo_s
{
14 int askEvt
; /* user event for askLoad */
15 int askNoEvt
; /* user event for askNoLoad */
16 int idleEvt
; /* user event for idle balancing */
19 static int WS_Threshold
= -1;
20 static int _steal_prio
= 0;
21 static int _stealonly1
= 0;
22 static int _steal_immediate
= 0;
23 static int workstealingproactive
= 0;
25 CpvStaticDeclare(CldProcInfo
, CldData
);
26 CpvStaticDeclare(int, CldAskLoadHandlerIndex
);
27 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex
);
28 CpvStaticDeclare(int, isStealing
);
31 char *CldGetStrategy(void)
33 return "work stealing";
37 static void StealLoad()
46 if (CpvAccess(isStealing
)) return; /* already stealing, return */
48 CpvAccess(isStealing
) = 1;
50 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
58 victim
= (((CrnRand()+mype
)&0x7FFFFFFF)%numpes
);
59 }while(victim
== mype
);
61 CmiSetHandler(&msg
, CpvAccess(CldAskLoadHandlerIndex
));
64 if (_steal_immediate
) CmiBecomeImmediate(&msg
);
66 /* msg.to_rank = CmiRankOf(victim); */
68 CmiSyncSend(victim
, sizeof(requestmsg
),(char *)&msg
);
70 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
71 traceUserBracketEvent(CpvAccess(CldData
)->idleEvt
, now
, CmiWallTimer());
75 void LoadNotifyFn(int l
)
77 if(CldCountTokens() <= WS_Threshold
)
81 /* since I am idle, ask for work from neighbors */
82 static void CldBeginIdle(void *dummy
)
84 //if (CldCountTokens() == 0) StealLoad();
85 CmiAssert(CldCountTokens()==0);
89 /* immediate message handler, work at node level */
90 /* send some work to requested proc */
91 static void CldAskLoadHandler(requestmsg
*msg
)
93 int receiver
, rank
, recvIdx
, i
;
96 /* int myload = CldLoad(); */
98 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
102 /* rank = msg->to_rank; */
103 CmiAssert(msg
->to_pe
!=-1);
104 rank
= CmiRankOf(msg
->to_pe
);
106 myload
= CldCountTokensRank(rank
);
108 receiver
= msg
->from_pe
;
109 /* only give you work if I have more than 1 */
110 if (myload
>LOADTHRESH
) {
111 if(_stealonly1
) sendLoad
= 1;
112 else sendLoad
= myload
/2;
114 #if ! CMK_USE_IBVERBS
116 CldMultipleSendPrio(receiver
, sendLoad
, rank
, 0);
118 CldMultipleSend(receiver
, sendLoad
, rank
, 0);
120 CldSimpleMultipleSend(receiver
, sendLoad
, rank
);
125 { /* send ack indicating there is no task */
127 msg
->to_pe
= msg
->from_pe
;
129 /*msg->to_rank = CmiMyRank(); */
131 CmiSetHandler(msg
, CpvAccess(CldAckNoTaskHandlerIndex
));
132 CmiSyncSendAndFree(receiver
, sizeof(requestmsg
),(char *)msg
);
135 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
136 traceUserBracketEvent(CpvAccess(CldData
)->askEvt
, now
, CmiWallTimer());
140 void CldAckNoTaskHandler(requestmsg
*msg
)
144 /* int notaskpe = msg->from_pe; */
145 int mype
= CmiMyPe();
146 int numpes
= CmiNumPes();
148 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
149 now
= CmiWallTimer();
153 /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
154 victim
= (((CrnRand()+mype
)&0x7FFFFFFF)%numpes
);
155 } while(victim
== mype
);
158 #if CMK_IMMEDIATE_MSG
160 if (_steal_immediate
) CmiBecomeImmediate(msg
);
162 /*msg->to_rank = CmiRankOf(victim); */
165 CmiSetHandler(msg
, CpvAccess(CldAskLoadHandlerIndex
));
166 CmiSyncSendAndFree(victim
, sizeof(requestmsg
),(char *)msg
);
168 CpvAccess(isStealing
) = 1;
170 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
171 traceUserBracketEvent(CpvAccess(CldData
)->askNoEvt
, now
, CmiWallTimer());
175 void CldHandler(void *msg
)
177 CldInfoFn ifn
; CldPackFn pfn
;
178 int len
, queueing
, priobits
; unsigned int *prioptr
;
180 CldRestoreHandler(msg
);
181 ifn
= (CldInfoFn
)CmiHandlerToFunction(CmiGetInfo(msg
));
182 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
183 CsdEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
184 /* CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
187 #define CldPUTTOKEN(msg) \
189 CldPutTokenPrio(msg); \
193 void CldBalanceHandler(void *msg
)
195 CldRestoreHandler(msg
);
197 CpvAccess(isStealing
) = 0; /* fixme: this may not be right */
200 void CldEnqueueGroup(CmiGroup grp
, void *msg
, int infofn
)
202 int len
, queueing
, priobits
,i
; unsigned int *prioptr
;
203 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
205 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
208 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
210 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
211 CmiSetInfo(msg
,infofn
);
213 CmiSyncMulticastAndFree(grp
, len
, msg
);
216 void CldEnqueueMulti(int npes
, int *pes
, void *msg
, int infofn
)
218 int len
, queueing
, priobits
,i
; unsigned int *prioptr
;
219 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
221 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
224 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
226 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
227 CmiSetInfo(msg
,infofn
);
228 CmiSyncListSendAndFree(npes
, pes
, len
, msg
);
231 void CldEnqueue(int pe
, void *msg
, int infofn
)
233 int len
, queueing
, priobits
, avg
; unsigned int *prioptr
;
234 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
237 if ((pe
== CLD_ANYWHERE
) && (CmiNumPes() > 1)) {
239 /* always pack the message because the message may be move away
240 to a different processor later by CldGetToken() */
241 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
242 if (pfn
&& CmiNumNodes()>1) {
244 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
246 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
247 CmiSetInfo(msg
,infofn
);
250 else if ((pe
== CmiMyPe()) || (CmiNumPes() == 1)) {
251 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
252 CsdEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
255 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
256 if (pfn
&& CmiNodeOf(pe
) != CmiMyNode()) {
258 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
260 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
261 CmiSetInfo(msg
,infofn
);
262 if (pe
==CLD_BROADCAST
)
263 CmiSyncBroadcastAndFree(len
, msg
);
264 else if (pe
==CLD_BROADCAST_ALL
)
265 CmiSyncBroadcastAllAndFree(len
, msg
);
266 else CmiSyncSendAndFree(pe
, len
, msg
);
270 void CldNodeEnqueue(int node
, void *msg
, int infofn
)
272 int len
, queueing
, priobits
, pe
, avg
; unsigned int *prioptr
;
273 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
275 if ((node
== CLD_ANYWHERE
) && (CmiNumPes() > 1)) {
277 node
= CmiNodeOf(pe
);
278 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
279 CsdNodeEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
281 else if ((node
== CmiMyNode()) || (CmiNumPes() == 1)) {
282 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
283 CsdNodeEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
286 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
289 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
291 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
292 CmiSetInfo(msg
,infofn
);
293 if (node
==CLD_BROADCAST
) { CmiSyncNodeBroadcastAndFree(len
, msg
); }
294 else if (node
==CLD_BROADCAST_ALL
){CmiSyncNodeBroadcastAllAndFree(len
,msg
);}
295 else CmiSyncNodeSendAndFree(node
, len
, msg
);
300 void CldGraphModuleInit(char **argv
)
302 CpvInitialize(CldProcInfo
, CldData
);
303 CpvInitialize(int, CldAskLoadHandlerIndex
);
304 CpvInitialize(int, CldAckNoTaskHandlerIndex
);
305 CpvInitialize(int, CldBalanceHandlerIndex
);
307 CpvAccess(CldData
) = (CldProcInfo
)CmiAlloc(sizeof(struct CldProcInfo_s
));
308 #if CMK_TRACE_ENABLED
309 CpvAccess(CldData
)->askEvt
= traceRegisterUserEvent("CldAskLoad", -1);
310 CpvAccess(CldData
)->idleEvt
= traceRegisterUserEvent("StealLoad", -1);
311 CpvAccess(CldData
)->askNoEvt
= traceRegisterUserEvent("CldAckNoTask", -1);
314 CpvAccess(CldBalanceHandlerIndex
) =
315 CmiRegisterHandler(CldBalanceHandler
);
316 CpvAccess(CldAskLoadHandlerIndex
) =
317 CmiRegisterHandler((CmiHandler
)CldAskLoadHandler
);
319 CpvAccess(CldAckNoTaskHandlerIndex
) =
320 CmiRegisterHandler((CmiHandler
)CldAckNoTaskHandler
);
322 /* communication thread */
323 if (CmiMyRank() == CmiMyNodeSize()) return;
325 _stealonly1
= CmiGetArgFlagDesc(argv
, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
327 if(CmiGetArgIntDesc(argv
, "+WSThreshold", &WS_Threshold
, "The number of minimum load before stealing"))
329 CmiAssert(WS_Threshold
>=0);
332 _steal_immediate
= CmiGetArgFlagDesc(argv
, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
334 _steal_prio
= CmiGetArgFlagDesc(argv
, "+WSPriority", "Charm++> Work Stealing, using priority");
336 /* register idle handlers - when idle, keep asking work from neighbors */
338 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,
339 (CcdVoidFn
) CldBeginIdle
, NULL
);
340 if(WS_Threshold
>= 0 && CmiMyPe() == 0)
341 CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold
);
342 #if CMK_IMMEDIATE_MSG
343 if(_steal_immediate
&& CmiMyPe() == 0)
344 CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold
);
349 void CldModuleInit(char **argv
)
351 CpvInitialize(int, CldHandlerIndex
);
352 CpvInitialize(int, CldRelocatedMessages
);
353 CpvInitialize(int, CldLoadBalanceMessages
);
354 CpvInitialize(int, CldMessageChunks
);
355 CpvAccess(CldHandlerIndex
) = CmiRegisterHandler(CldHandler
);
356 CpvAccess(CldRelocatedMessages
) = CpvAccess(CldLoadBalanceMessages
) =
357 CpvAccess(CldMessageChunks
) = 0;
359 CldModuleGeneralInit(argv
);
360 CldGraphModuleInit(argv
);
362 CpvAccess(CldLoadNotify
) = 1;
364 CpvInitialize(int, isStealing
);
365 CpvAccess(isStealing
) = 0;