AMPI: support Cartesian comms that are 0-D or subsets of oldcomm
[charm.git] / src / conv-ldb / cldb.workstealing.c
blobacae6e73795ff8613bda42a8176fbd7f87a52cec
1 #include <stdlib.h>
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
8 #define TRACE_USEREVENTS 1
9 #define LOADTHRESH 3
11 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed);
13 typedef struct CldProcInfo_s {
14 int askEvt; /* user event for askLoad */
15 int askNoEvt; /* user event for askNoLoad */
16 int idleEvt; /* user event for idle balancing */
17 } *CldProcInfo;
19 static int WS_Threshold = -1;
20 static int _steal_prio = 0;
21 static int _stealonly1 = 0;
22 static int _steal_immediate = 0;
23 static int workstealingproactive = 0;
25 CpvStaticDeclare(CldProcInfo, CldData);
26 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
27 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
28 CpvStaticDeclare(int, isStealing);
31 char *CldGetStrategy(void)
33 return "work stealing";
37 static void StealLoad(void)
39 int i;
40 double now;
41 requestmsg msg;
42 int victim;
43 int mype;
44 int numpes;
46 if (CpvAccess(isStealing)) return; /* already stealing, return */
48 CpvAccess(isStealing) = 1;
50 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
51 now = CmiWallTimer();
52 #endif
54 mype = CmiMyPe();
55 msg.from_pe = mype;
56 numpes = CmiNumPes();
57 do{
58 victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
59 }while(victim == mype);
61 CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
62 #if CMK_IMMEDIATE_MSG
63 /* fixme */
64 if (_steal_immediate) CmiBecomeImmediate(&msg);
65 #endif
66 /* msg.to_rank = CmiRankOf(victim); */
67 msg.to_pe = victim;
68 CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
70 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
71 traceUserBracketEvent(CpvAccess(CldData)->idleEvt, now, CmiWallTimer());
72 #endif
75 void LoadNotifyFn(int l)
77 if(CldCountTokens() <= WS_Threshold)
78 StealLoad();
81 /* since I am idle, ask for work from neighbors */
82 static void CldBeginIdle(void *dummy)
84 //if (CldCountTokens() == 0) StealLoad();
85 CmiAssert(CldCountTokens()==0);
86 StealLoad();
89 /* immediate message handler, work at node level */
90 /* send some work to requested proc */
91 static void CldAskLoadHandler(requestmsg *msg)
93 int receiver, rank, recvIdx, i;
94 int myload, sendLoad;
95 double now;
96 /* int myload = CldLoad(); */
98 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
99 now = CmiWallTimer();
100 #endif
102 /* rank = msg->to_rank; */
103 CmiAssert(msg->to_pe!=-1);
104 rank = CmiRankOf(msg->to_pe);
105 CmiAssert(rank!=-1);
106 myload = CldCountTokensRank(rank);
108 receiver = msg->from_pe;
109 /* only give you work if I have more than 1 */
110 if (myload>LOADTHRESH) {
111 if(_stealonly1) sendLoad = 1;
112 else sendLoad = myload/2;
113 if(sendLoad > 0) {
114 #if ! CMK_USE_IBVERBS
115 if (_steal_prio)
116 CldMultipleSendPrio(receiver, sendLoad, rank, 0);
117 else
118 CldMultipleSend(receiver, sendLoad, rank, 0);
119 #else
120 CldSimpleMultipleSend(receiver, sendLoad, rank);
121 #endif
123 CmiFree(msg);
124 }else
125 { /* send ack indicating there is no task */
126 int pe = msg->to_pe;
127 msg->to_pe = msg->from_pe;
128 msg->from_pe = pe;
129 /*msg->to_rank = CmiMyRank(); */
131 CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
132 CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
135 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
136 traceUserBracketEvent(CpvAccess(CldData)->askEvt, now, CmiWallTimer());
137 #endif
140 void CldAckNoTaskHandler(requestmsg *msg)
142 double now;
143 int victim;
144 /* int notaskpe = msg->from_pe; */
145 int mype = CmiMyPe();
146 int numpes = CmiNumPes();
148 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
149 now = CmiWallTimer();
150 #endif
153 /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
154 victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
155 } while(victim == mype);
157 /* reuse msg */
158 #if CMK_IMMEDIATE_MSG
159 /* fixme */
160 if (_steal_immediate) CmiBecomeImmediate(msg);
161 #endif
162 /*msg->to_rank = CmiRankOf(victim); */
163 msg->to_pe = victim;
164 msg->from_pe = mype;
165 CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
166 CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
168 CpvAccess(isStealing) = 1;
170 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
171 traceUserBracketEvent(CpvAccess(CldData)->askNoEvt, now, CmiWallTimer());
172 #endif
175 void CldHandler(void *msg)
177 CldInfoFn ifn; CldPackFn pfn;
178 int len, queueing, priobits; unsigned int *prioptr;
180 CldRestoreHandler(msg);
181 ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
182 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
183 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
184 /* CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
187 #define CldPUTTOKEN(msg) \
188 if (_steal_prio) \
189 CldPutTokenPrio(msg); \
190 else \
191 CldPutToken(msg);
193 void CldBalanceHandler(void *msg)
195 CldRestoreHandler(msg);
196 CldPUTTOKEN(msg);
197 CpvAccess(isStealing) = 0; /* fixme: this may not be right */
200 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
202 int len, queueing, priobits,i; unsigned int *prioptr;
203 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
204 CldPackFn pfn;
205 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
206 if (pfn) {
207 pfn(&msg);
208 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
210 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
211 CmiSetInfo(msg,infofn);
213 CmiSyncMulticastAndFree(grp, len, msg);
216 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
218 int len, queueing, priobits,i; unsigned int *prioptr;
219 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
220 CldPackFn pfn;
221 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
222 if (pfn) {
223 pfn(&msg);
224 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
226 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
227 CmiSetInfo(msg,infofn);
228 CmiSyncListSendAndFree(npes, pes, len, msg);
231 void CldEnqueue(int pe, void *msg, int infofn)
233 int len, queueing, priobits, avg; unsigned int *prioptr;
234 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
235 CldPackFn pfn;
237 if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
238 pe = CmiMyPe();
239 /* always pack the message because the message may be move away
240 to a different processor later by CldGetToken() */
241 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
242 if (pfn && CmiNumNodes()>1) {
243 pfn(&msg);
244 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
246 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
247 CmiSetInfo(msg,infofn);
248 CldPUTTOKEN(msg);
250 else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
251 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
252 CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
254 else {
255 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
256 if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
257 pfn(&msg);
258 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
260 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
261 CmiSetInfo(msg,infofn);
262 if (pe==CLD_BROADCAST)
263 CmiSyncBroadcastAndFree(len, msg);
264 else if (pe==CLD_BROADCAST_ALL)
265 CmiSyncBroadcastAllAndFree(len, msg);
266 else CmiSyncSendAndFree(pe, len, msg);
270 void CldNodeEnqueue(int node, void *msg, int infofn)
272 int len, queueing, priobits, pe, avg; unsigned int *prioptr;
273 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
274 CldPackFn pfn;
275 if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
276 pe = CmiMyPe();
277 node = CmiNodeOf(pe);
278 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
279 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
281 else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
282 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
283 CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
285 else {
286 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
287 if (pfn) {
288 pfn(&msg);
289 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
291 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
292 CmiSetInfo(msg,infofn);
293 if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
294 else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
295 else CmiSyncNodeSendAndFree(node, len, msg);
300 void CldGraphModuleInit(char **argv)
302 CpvInitialize(CldProcInfo, CldData);
303 CpvInitialize(int, CldAskLoadHandlerIndex);
304 CpvInitialize(int, CldAckNoTaskHandlerIndex);
305 CpvInitialize(int, CldBalanceHandlerIndex);
307 CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
308 #if CMK_TRACE_ENABLED
309 CpvAccess(CldData)->askEvt = traceRegisterUserEvent("CldAskLoad", -1);
310 CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("StealLoad", -1);
311 CpvAccess(CldData)->askNoEvt = traceRegisterUserEvent("CldAckNoTask", -1);
312 #endif
314 CpvAccess(CldBalanceHandlerIndex) =
315 CmiRegisterHandler(CldBalanceHandler);
316 CpvAccess(CldAskLoadHandlerIndex) =
317 CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
319 CpvAccess(CldAckNoTaskHandlerIndex) =
320 CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
322 /* communication thread */
323 if (CmiMyRank() == CmiMyNodeSize()) return;
325 _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
327 if(CmiGetArgIntDesc(argv, "+WSThreshold", &WS_Threshold, "The number of minimum load before stealing"))
329 CmiAssert(WS_Threshold>=0);
332 _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
334 _steal_prio = CmiGetArgFlagDesc(argv, "+WSPriority", "Charm++> Work Stealing, using priority");
336 /* register idle handlers - when idle, keep asking work from neighbors */
337 if(CmiNumPes() > 1)
338 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
339 (CcdVoidFn) CldBeginIdle, NULL);
340 if(WS_Threshold >= 0 && CmiMyPe() == 0)
341 CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold);
342 #if CMK_IMMEDIATE_MSG
343 if(_steal_immediate && CmiMyPe() == 0)
344 CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold);
345 #endif
349 void CldModuleInit(char **argv)
351 CpvInitialize(int, CldHandlerIndex);
352 CpvInitialize(int, CldRelocatedMessages);
353 CpvInitialize(int, CldLoadBalanceMessages);
354 CpvInitialize(int, CldMessageChunks);
355 CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
356 CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) =
357 CpvAccess(CldMessageChunks) = 0;
359 CldModuleGeneralInit(argv);
360 CldGraphModuleInit(argv);
362 CpvAccess(CldLoadNotify) = 1;
364 CpvInitialize(int, isStealing);
365 CpvAccess(isStealing) = 0;
368 void CldCallback(void)