/*
 * src/conv-ldb/cldb.neighbor.c — neighbor-based seed load balancer for Converse.
 * (Imported from charm.git; last commit: "Fix memory leaks in trace projections
 * and summary".)
 */
1 #include <stdlib.h>
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
/* Compile-time configuration for the neighbor seed load balancer. */
#define USE_MULTICAST 0   /* 1: advertise load via one multicast; 0: per-neighbor pooled messages */
#define IDLE_IMMEDIATE 1  /* use immediate (node-level) messages for idle work requests */
#define TRACE_USEREVENTS 1 /* emit projections user events when tracing is enabled */

#define PERIOD 20 /* balancing period in ms (default: 30) */
#define MAXOVERLOAD 1 /* default overload threshold, in tokens above average */

static int LBPeriod = PERIOD; /* time to call load balancing (overridable via +cldb_neighbor_period) */
static int overload_threshold = MAXOVERLOAD; /* overridable via +cldb_neighbor_overload */
/* Per-PE bookkeeping for the neighbor strategy. */
typedef struct CldProcInfo_s {
  double lastCheck;     /* wall time of last idle check; -1 when not idle */
  int sent;             /* flag to disable duplicate idle work requests */
  int balanceEvt;       /* projections user event for balancing */
  int updateLoadEvt;    /* projections user event for load updates */
  int idleEvt;          /* projections user event for idle balancing */
  int idleprocEvt;      /* projections user event for processing idle req */
} *CldProcInfo;
extern char *_lbtopo;  /* topology name string (set via +LBTopo) */
int _lbsteal = 0;      /* work stealing flag (set via +workstealing) */

void gengraph(int, int, int, int *, int *);

CpvStaticDeclare(CldProcInfo, CldData);
CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
CpvStaticDeclare(int, CldAskLoadHandlerIndex);
CpvStaticDeclare(int, MinLoad);  /* smallest load seen in the last CldMinAvg scan */
CpvStaticDeclare(int, MinProc);  /* PE holding MinLoad */
CpvStaticDeclare(int, Mindex);   /* index of MinProc in the neighbor table */
CpvStaticDeclare(int, start);    /* rotating scan start; -1 until first use */

#if ! USE_MULTICAST
CpvStaticDeclare(loadmsg *, msgpool); /* free list of reusable load messages */
44 static
45 #if CMK_C_INLINE
46 inline
47 #endif
48 loadmsg *getPool(){
49 loadmsg *msg;
50 if (CpvAccess(msgpool)!=NULL) {
51 msg = CpvAccess(msgpool);
52 CpvAccess(msgpool) = msg->next;
54 else {
55 msg = CmiAlloc(sizeof(loadmsg));
56 CmiSetHandler(msg, CpvAccess(CldLoadResponseHandlerIndex));
58 return msg;
/* Return a load message to the per-PE free list for later reuse by getPool. */
static
#if CMK_C_INLINE
inline
#endif
void putPool(loadmsg *msg)
{
  msg->next = CpvAccess(msgpool);
  CpvAccess(msgpool) = msg;
}
#endif
73 void LoadNotifyFn(int l)
75 CldProcInfo cldData = CpvAccess(CldData);
76 cldData->sent = 0;
/* Name of this seed load-balancing strategy. */
char *CldGetStrategy(void)
{
  return "neighbor";
}
/* since I am idle, ask for work from neighbors */
/* Idle-begin hook: record the time so CldStillIdle can throttle requests. */
static void CldBeginIdle(void *dummy)
{
  CpvAccess(CldData)->lastCheck = CmiWallTimer();
}
/* Idle-end hook: clear the last-check timestamp (-1 means "not idle"). */
static void CldEndIdle(void *dummy)
{
  CpvAccess(CldData)->lastCheck = -1;
}
/* Still-idle hook: if we hold no work tokens, send a work request to every
 * neighbor — at most once per PERIOD ms, and not again until work arrives
 * (LoadNotifyFn clears the `sent` flag). */
static void CldStillIdle(void *dummy, double curT)
{
  int i;
  double startT;   /* unused; kept from original */
  requestmsg msg;
  int myload;
  CldProcInfo cldData = CpvAccess(CldData);

  double now = curT;
  double lt = cldData->lastCheck;
  /* only ask for work every 20ms */
  if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
  cldData->lastCheck = now;

  myload = CldCountTokens();
  if (myload > 0) return;   /* we still have work; no need to steal */

  msg.from_pe = CmiMyPe();
  CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
#if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
  /* fixme */
  CmiBecomeImmediate(&msg);
  /* immediate messages are node-level: address each neighbor's rank explicitly */
  for (i=0; i<CpvAccess(numNeighbors); i++) {
    msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
    CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
  }
#else
  msg.to_rank = -1;
  CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
#endif
  cldData->sent = 1;

#if CMK_TRACE_ENABLED && TRACE_USEREVENTS
  traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
#endif
}
132 /* immediate message handler, work at node level */
133 /* send some work to requested proc */
134 static void CldAskLoadHandler(requestmsg *msg)
136 int receiver, rank, recvIdx, i;
137 int myload = CldCountTokens();
138 double now = CmiWallTimer();
140 /* only give you work if I have more than 1 */
141 if (myload>0) {
142 int sendLoad;
143 receiver = msg->from_pe;
144 rank = CmiMyRank();
145 if (msg->to_rank != -1) rank = msg->to_rank;
146 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
147 /* try the lock */
148 if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
149 CmiDelayImmediate(); /* postpone immediate message */
150 return;
152 CmiUnlock(CpvAccessOther(cldLock, rank)); /* release lock, grab later */
153 #endif
154 sendLoad = myload / CpvAccess(numNeighbors) / 2;
155 if (sendLoad < 1) sendLoad = 1;
156 sendLoad = 1;
157 for (i=0; i<CpvAccess(numNeighbors); i++)
158 if (CpvAccess(neighbors)[i].pe == receiver) break;
160 if(i<CpvAccess(numNeighbors)) {CmiFree(msg); return;} /* ? */
161 CpvAccess(neighbors)[i].load += sendLoad;
162 CldMultipleSend(receiver, sendLoad, rank, 0);
163 #if 0
164 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
165 /* this is dangerous since projections logging is not thread safe */
167 CldProcInfo cldData = CpvAccessOther(CldData, rank);
168 traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
170 #endif
171 #endif
173 CmiFree(msg);
/* balancing by exchanging load among neighbors */
/* Advertise our current token count to every neighbor: written directly into
 * their tables in multicore (shared-memory) builds, otherwise sent as
 * messages (one multicast, or one pooled message per neighbor). */
void CldSendLoad()
{
#if CMK_MULTICORE
  /* directly send load to neighbors */
  double myload = CldCountTokens();
  int nNeighbors = CpvAccess(numNeighbors);
  int i;
  for (i=0; i<nNeighbors; i++) {
    int neighbor_pe = CpvAccess(neighbors)[i].pe;
    int j, found=0;
    /* find our slot in this neighbor's table and update it in place */
    for (j=0; j<CpvAccessOther(numNeighbors, neighbor_pe); j++)
      if (CpvAccessOther(neighbors, neighbor_pe)[j].pe == CmiMyPe())
      {
        CpvAccessOther(neighbors, neighbor_pe)[j].load = myload;
        found = 1;
        break;
      }
  }
#else
#if USE_MULTICAST
  loadmsg msg;

  msg.pe = CmiMyPe();
  msg.load = CldCountTokens();
  CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
  CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
  CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
#else
  int i;
  int mype = CmiMyPe();
  int myload = CldCountTokens();
  for(i=0; i<CpvAccess(numNeighbors); i++) {
    loadmsg *msg = getPool();  /* pooled to avoid an alloc per report */
    msg->fromindex = i;
    msg->toindex = CpvAccess(neighbors)[i].index;
    msg->pe = mype;
    msg->load = myload;
    CmiSyncSendAndFree(CpvAccess(neighbors)[i].pe, sizeof(loadmsg), msg);
  }
#endif
#endif
}
/* Scan the neighbor table starting from a rotating offset (to avoid always
 * favoring the same neighbor), record the least-loaded PE in
 * MinProc/MinLoad/Mindex (self included), and return the average load over
 * neighbors + self, rounded up. */
int CldMinAvg()
{
  int sum=0, i;
  int myload;

  int nNeighbors = CpvAccess(numNeighbors);
  if (CpvAccess(start) == -1)
    CpvAccess(start) = CmiMyPe() % nNeighbors;

#if 0
  /* update load from neighbors for multicore */
  for (i=0; i<nNeighbors; i++) {
    CpvAccess(neighbors)[i].load = CldLoadRank(CpvAccess(neighbors)[i].pe);
  }
#endif
  CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
  CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
  sum = CpvAccess(neighbors)[CpvAccess(start)].load;
  CpvAccess(Mindex) = CpvAccess(start);
  for (i=1; i<nNeighbors; i++) {
    CpvAccess(start) = (CpvAccess(start)+1) % nNeighbors;
    sum += CpvAccess(neighbors)[CpvAccess(start)].load;
    if (CpvAccess(MinLoad) > CpvAccess(neighbors)[CpvAccess(start)].load) {
      CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
      CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
      CpvAccess(Mindex) = CpvAccess(start);
    }
  }
  /* advance the rotating start point for the next call */
  CpvAccess(start) = (CpvAccess(start)+2) % nNeighbors;
  myload = CldCountTokens();
  sum += myload;
  /* self may be the least-loaded "neighbor" */
  if (myload < CpvAccess(MinLoad)) {
    CpvAccess(MinLoad) = myload;
    CpvAccess(MinProc) = CmiMyPe();
  }
  /* ceiling of average over nNeighbors + self */
  i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
  return i;
}
/* One balancing round: compute the neighborhood average and, when this PE is
 * overloaded beyond overload_threshold, push tokens to under-loaded
 * neighbors, starting from the least-loaded one (Mindex).  Finishes by
 * broadcasting our own load via CldSendLoad. */
void CldBalance(void *dummy, double curT)
{
  int i, j, overload, numToMove=0, avgLoad;
  int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;

#if CMK_TRACE_ENABLED && TRACE_USEREVENTS
  double startT = curT;
#endif

  /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
  avgLoad = CldMinAvg();

  overload = CldLoad() - avgLoad;
  if (overload > CldCountTokens())
    overload = CldCountTokens();
  /* NOTE(review): the assignment below overwrites the value computed just
   * above; the two computations look like alternatives that were once
   * separated by preprocessor conditionals — confirm which is intended. */
  overload = CldCountTokens() - avgLoad;

  if (overload > overload_threshold) {
    int nNeighbors = CpvAccess(numNeighbors);
    /* measure how far below average the neighborhood is */
    for (i=0; i<nNeighbors; i++)
      if (CpvAccess(neighbors)[i].load < avgLoad) {
        totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
        if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
          maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
        numUnderAvg++;
      }
    if (numUnderAvg > 0) {
      int myrank = CmiMyRank();
      /* spread tokens, visiting neighbors starting at the least-loaded one */
      for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
        j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
        if (CpvAccess(neighbors)[j].load < avgLoad) {
          numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
          if (numToMove > overload)
            numToMove = overload;
          overload -= numToMove;
          CpvAccess(neighbors)[j].load += numToMove;
#if CMK_MULTICORE || CMK_USE_IBVERBS
          CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove, myrank);
#else
          /* assumes the final argument selects immediate-message use
           * (0 on SMP, 1 otherwise); the original conditional lines were
           * lost in extraction — TODO confirm against CldMultipleSend */
          CldMultipleSend(CpvAccess(neighbors)[j].pe,
                          numToMove, myrank,
#if CMK_SMP
                          0
#else
                          1
#endif
                          );
#endif
        }
      }
    } /* end of numUnderAvg > 0 */
  }
  CldSendLoad();
#if CMK_TRACE_ENABLED && TRACE_USEREVENTS
  traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
#endif
}
/* Periodic driver: run one balancing round, then re-schedule ourselves on
 * this PE after LBPeriod ms. */
void CldBalancePeriod(void *dummy, double curT)
{
  CldBalance(NULL, curT);
  CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, LBPeriod, CmiMyPe());
}
/* Handler for a neighbor's load report: update that neighbor's entry in our
 * table, then free (multicast build) or recycle (pooled build) the message. */
void CldLoadResponseHandler(loadmsg *msg)
{
  int i;

#if CMK_TRACE_ENABLED && TRACE_USEREVENTS
  double startT = CmiWallTimer();
#endif
#if USE_MULTICAST
  for(i=0; i<CpvAccess(numNeighbors); i++)
    if (CpvAccess(neighbors)[i].pe == msg->pe) {
      CpvAccess(neighbors)[i].load = msg->load;
      break;
    }
  CmiFree(msg);
#else
  int index = msg->toindex;
  if (index == -1) {
    /* sender did not yet know its slot in our table; search by PE number */
    for(i=0; i<CpvAccess(numNeighbors); i++)
      if (CpvAccess(neighbors)[i].pe == msg->pe) {
        index = i;
        break;
      }
  }
  if (index != -1) {   /* index can be -1, if neighbors table not init yet */
    CpvAccess(neighbors)[index].load = msg->load;
    /* learn our slot in the sender's table for future reports */
    if (CpvAccess(neighbors)[index].index == -1) CpvAccess(neighbors)[index].index = msg->fromindex;
  }
  putPool(msg);  /* recycle instead of freeing */
#endif
#if CMK_TRACE_ENABLED && TRACE_USEREVENTS
  traceUserBracketEvent(CpvAccess(CldData)->updateLoadEvt, startT, CmiWallTimer());
#endif
}
/* Receive a token shipped by a neighbor: restore the message's original
 * handler and add it to the local token queue. */
void CldBalanceHandler(void *msg)
{
  CldRestoreHandler(msg);
  CldPutToken(msg);
}
/* Generic handler for seed messages sent to a specific PE: restore the
 * original handler and enqueue the message in the local scheduler queue. */
void CldHandler(void *msg)
{
  CldInfoFn ifn; CldPackFn pfn;
  int len, queueing, priobits; unsigned int *prioptr;

  CldRestoreHandler(msg);
  ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
  ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
  /* deliberately LIFO rather than the message's own queueing strategy */
  CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
  /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
}
/* Multicast a seed message to every PE in a Converse group.
 * Packs the message first (if the info function supplies a pack function),
 * then reroutes it through CldHandler on the receivers. */
void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
{
  int len, queueing, priobits,i; unsigned int *prioptr;
  CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
  CldPackFn pfn;
  ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
  if (pfn) {
    pfn(&msg);  /* may reallocate msg; re-query its info afterwards */
    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
  }
  CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
  CmiSetInfo(msg,infofn);

  CmiSyncMulticastAndFree(grp, len, msg);
}
394 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
396 int len, queueing, priobits,i; unsigned int *prioptr;
397 CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
398 CldPackFn pfn;
399 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
400 if (pfn) {
401 pfn(&msg);
402 ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
404 CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
405 CmiSetInfo(msg,infofn);
407 for(i=0;i<npes;i++) {
408 CmiSyncSend(pes[i], len, msg);
410 CmiFree(msg);
412 CmiSyncListSendAndFree(npes, pes, len, msg);
/* Enqueue a seed message.  CLD_ANYWHERE lets the balancer pick a target:
 * the least-loaded neighbor (MinProc) if we are above average, else self.
 * Otherwise the message goes to the named PE or is broadcast. */
void CldEnqueue(int pe, void *msg, int infofn)
{
  int len, queueing, priobits, avg; unsigned int *prioptr;
  CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
  CldPackFn pfn;

  if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
    avg = CldMinAvg();
    if (CldCountTokens() < avg)
      pe = CmiMyPe();        /* below average: keep the seed local */
    else
      pe = CpvAccess(MinProc);
#if CMK_NODE_QUEUE_AVAILABLE
    if (CmiNodeOf(pe) == CmiMyNode()) {
      CldNodeEnqueue(CmiMyNode(), msg, infofn);
      return;
    }
#endif
    /* always pack the message because the message may be move away
       to a different processor later by CldGetToken() */
    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    if (pfn && CmiNumNodes()>1) {
      pfn(&msg);
      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    }
    if (pe != CmiMyPe()) {
      /* account for the seed we are about to ship */
      CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
      CpvAccess(CldRelocatedMessages)++;
      CmiSetInfo(msg,infofn);
      CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
      CmiSyncSendAndFree(pe, len, msg);
    }
    else {
      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
      CmiSetInfo(msg,infofn);
      CldPutToken(msg);   /* keep as a movable token */
    }
  }
  else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    /*CmiSetInfo(msg,infofn);*/
    CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
    /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
  }
  else {
    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
      pfn(&msg);
      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    }
    CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
    CmiSetInfo(msg,infofn);
    if (pe==CLD_BROADCAST)
      CmiSyncBroadcastAndFree(len, msg);
    else if (pe==CLD_BROADCAST_ALL)
      CmiSyncBroadcastAllAndFree(len, msg);
    else CmiSyncSendAndFree(pe, len, msg);
  }
}
/* Enqueue a seed message at node granularity.  CLD_ANYWHERE picks the node
 * of the least-loaded PE; otherwise the message goes to the named node or
 * is broadcast. */
void CldNodeEnqueue(int node, void *msg, int infofn)
{
  int len, queueing, priobits, pe, avg; unsigned int *prioptr;
  CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
  CldPackFn pfn;
  if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
    avg = CldMinAvg();
    if (CldCountTokens() < avg)
      pe = CmiMyPe();        /* below average: keep the seed local */
    else
      pe = CpvAccess(MinProc);
    node = CmiNodeOf(pe);
    if (node != CmiMyNode()){
      /* account for the seed we are about to ship */
      CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
      CpvAccess(CldRelocatedMessages)++;
      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
      if (pfn) {
        pfn(&msg);  /* may reallocate msg; re-query its info afterwards */
        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
      }
      CmiSetInfo(msg,infofn);
      CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
      CmiSyncNodeSendAndFree(node, len, msg);
    }
    else {
      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
      /* CmiSetInfo(msg,infofn);
         CldPutToken(msg); */
      CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
    }
  }
  else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    /* CmiSetInfo(msg,infofn); */
    CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
  }
  else {
    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    if (pfn) {
      pfn(&msg);
      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
    }
    CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
    CmiSetInfo(msg,infofn);
    if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
    else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
    else CmiSyncNodeSendAndFree(node, len, msg);
  }
}
525 void CldReadNeighborData()
527 FILE *fp;
528 char filename[25];
529 int i, *pes;
531 if (CmiNumPes() <= 1)
532 return;
533 sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
534 if ((fp = fopen(filename, "r")) == 0)
536 CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
537 return;
539 fscanf(fp, "%d", &CpvAccess(numNeighbors));
540 CpvAccess(neighbors) =
541 (struct CldNeighborData *)calloc(CpvAccess(numNeighbors),
542 sizeof(struct CldNeighborData));
543 pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
544 for (i=0; i<CpvAccess(numNeighbors); i++) {
545 fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe));
546 pes[i] = CpvAccess(neighbors)[i].pe;
547 CpvAccess(neighbors)[i].load = 0;
549 fclose(fp);
550 CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
/* Build this PE's neighbor table from the selected virtual topology
 * (_lbtopo) and establish the neighbor multicast group.  Aborts on an
 * unknown topology name. */
static void CldComputeNeighborData()
{
  int i, npes;
  int *pes;
  LBtopoFn topofn;
  void *topo;

  topofn = LBTopoLookup(_lbtopo);
  if (topofn == NULL) {
    char str[1024];
    CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
    printoutTopo();
    sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
    CmiAbort(str);
  }
  topo = topofn(CmiNumPes());
  npes = getTopoMaxNeighbors(topo);   /* upper bound; actual count returned below */
  pes = (int *)malloc(npes*sizeof(int));
  getTopoNeighbors(topo, CmiMyPe(), pes, &npes);
#if 0
  /* debug dump of the neighbor list */
  {
  char buf[512], *ptr;
  sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npes);
  ptr = buf + strlen(buf);
  for (i=0; i<npes; i++) {
    CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
    sprintf(ptr, " %d ", pes[i]);
    ptr += strlen(ptr);
  }
  strcat(ptr, "\n");
  CmiPrintf(buf);
  }
#endif

  CpvAccess(numNeighbors) = npes;
  CpvAccess(neighbors) =
    (struct CldNeighborData *)calloc(npes, sizeof(struct CldNeighborData));
  for (i=0; i<npes; i++) {
    CpvAccess(neighbors)[i].pe = pes[i];
    CpvAccess(neighbors)[i].load = 0;
#if ! USE_MULTICAST
    CpvAccess(neighbors)[i].index = -1;  /* our slot in the neighbor's table; learned later */
#endif
  }
  CpvAccess(neighborGroup) = CmiEstablishGroup(npes, pes);
  free(pes);
}
/* Invoked once topology information becomes available: build the neighbor
 * table and start the periodic balancing loop. */
static void topo_callback()
{
  CldComputeNeighborData();
#if CMK_MULTICORE
  CmiNodeBarrier();   /* all ranks must see the table before balancing */
#endif
  CldBalancePeriod(NULL, CmiWallTimer());
}
/* Initialize the neighbor strategy: per-PE state, trace user events,
 * message handlers, command-line options, neighbor topology, and the
 * optional idle-time work-stealing hooks. */
void CldGraphModuleInit(char **argv)
{
  CpvInitialize(CldProcInfo, CldData);
  CpvInitialize(int, numNeighbors);
  CpvInitialize(int, MinLoad);
  CpvInitialize(int, Mindex);
  CpvInitialize(int, MinProc);
  CpvInitialize(int, start);
  CpvInitialize(CmiGroup, neighborGroup);
  CpvInitialize(CldNeighborData, neighbors);
  CpvInitialize(int, CldBalanceHandlerIndex);
  CpvInitialize(int, CldLoadResponseHandlerIndex);
  CpvInitialize(int, CldAskLoadHandlerIndex);

  CpvAccess(start) = -1;
  CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
  CpvAccess(CldData)->lastCheck = -1;
  CpvAccess(CldData)->sent = 0;
#if CMK_TRACE_ENABLED
  CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
  CpvAccess(CldData)->updateLoadEvt = traceRegisterUserEvent("UpdateLoad", -1);
  CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
  CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
#endif

  CpvAccess(MinLoad) = 0;
  CpvAccess(Mindex) = 0;
  CpvAccess(MinProc) = CmiMyPe();
  CpvAccess(CldBalanceHandlerIndex) =
    CmiRegisterHandler(CldBalanceHandler);
  CpvAccess(CldLoadResponseHandlerIndex) =
    CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
  CpvAccess(CldAskLoadHandlerIndex) =
    CmiRegisterHandler((CmiHandler)CldAskLoadHandler);

  /* communication thread does not take part in balancing */
  if (CmiMyRank() == CmiMyNodeSize()) return;

  CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
  if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);

  if (CmiNumPes() > 1) {
#if 0
    /* legacy file-based neighbor initialization (graphN/graphP files) */
    FILE *fp;
    char filename[20];

    sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
    if ((fp = fopen(filename, "r")) == 0)
    {
      if (CmiMyPe() == 0) {
        CmiPrintf("No proper graph%d directory exists in current directory.\n Generating... ", CmiNumPes());
        gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
        CmiPrintf("done.\n");
      }
      else {
        while (!(fp = fopen(filename, "r"))) ;
        fclose(fp);
      }
    }
    else fclose(fp);
    CldReadNeighborData();
#endif
    /* NOTE(review): the extracted source showed both the direct init below
     * and the CcdTOPOLOGY_AVAIL callback; they appear to be alternatives
     * once selected by preprocessor conditionals lost in extraction —
     * confirm against the upstream file. */
#if 1
    CldComputeNeighborData();
#if CMK_MULTICORE
    CmiNodeBarrier();
#endif
    CldBalancePeriod(NULL, CmiWallTimer());
#else
    CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)topo_callback, NULL);
#endif
  }

  if (CmiGetArgIntDesc(argv, "+cldb_neighbor_period", &LBPeriod, "time interval to do neighbor seed lb")) {
    CmiAssert(LBPeriod>0);
    if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod);
  }
  if (CmiGetArgIntDesc(argv, "+cldb_neighbor_overload", &overload_threshold, "neighbor seed lb's overload threshold")) {
    CmiAssert(overload_threshold>0);
    if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold);
  }

#if 1
  _lbsteal = CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
  if (_lbsteal) {
    /* register idle handlers - when idle, keep asking work from neighbors */
    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
                           (CcdVoidFn) CldBeginIdle, NULL);
    CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
                           (CcdVoidFn) CldStillIdle, NULL);
    if (CmiMyPe() == 0)
      CmiPrintf("Charm++> Work stealing is enabled. \n");
  }
#endif
}
/* Converse module entry point: register the generic Cld handler, zero the
 * statistics counters, initialize the message pool, then run the
 * graph-module initialization. */
void CldModuleInit(char **argv)
{
  CpvInitialize(int, CldHandlerIndex);
  CpvInitialize(int, CldRelocatedMessages);
  CpvInitialize(int, CldLoadBalanceMessages);
  CpvInitialize(int, CldMessageChunks);
  CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
  CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) =
    CpvAccess(CldMessageChunks) = 0;

  CpvInitialize(loadmsg *, msgpool);
  CpvAccess(msgpool) = NULL;   /* empty free list; see getPool/putPool */

  CldModuleGeneralInit(argv);
  CldGraphModuleInit(argv);

  /* enable LoadNotifyFn callbacks on load changes */
  CpvAccess(CldLoadNotify) = 1;
}