4 #include "cldb.neighbor.h"
9 #define USE_MULTICAST 0
10 #define IDLE_IMMEDIATE 1
11 #define TRACE_USEREVENTS 1
13 #define PERIOD 20 /* default: 30 */
16 static int LBPeriod
= PERIOD
; /* time to call load balancing */
17 static int overload_threshold
= MAXOVERLOAD
;
19 typedef struct CldProcInfo_s
{
21 int sent
; /* flag to disable idle work request */
22 int balanceEvt
; /* user event for balancing */
24 int idleEvt
; /* user event for idle balancing */
25 int idleprocEvt
; /* user event for processing idle req */
28 extern char *_lbtopo
; /* topology name string */
29 int _lbsteal
= 0; /* work stealing flag */
31 void gengraph(int, int, int, int *, int *);
33 CpvStaticDeclare(CldProcInfo
, CldData
);
34 CpvStaticDeclare(int, CldLoadResponseHandlerIndex
);
35 CpvStaticDeclare(int, CldAskLoadHandlerIndex
);
36 CpvStaticDeclare(int, MinLoad
);
37 CpvStaticDeclare(int, MinProc
);
38 CpvStaticDeclare(int, Mindex
);
39 CpvStaticDeclare(int, start
);
42 CpvStaticDeclare(loadmsg
*, msgpool
);
50 if (CpvAccess(msgpool
)!=NULL
) {
51 msg
= CpvAccess(msgpool
);
52 CpvAccess(msgpool
) = msg
->next
;
55 msg
= CmiAlloc(sizeof(loadmsg
));
56 CmiSetHandler(msg
, CpvAccess(CldLoadResponseHandlerIndex
));
65 void putPool(loadmsg
*msg
)
67 msg
->next
= CpvAccess(msgpool
);
68 CpvAccess(msgpool
) = msg
;
73 void LoadNotifyFn(int l
)
75 CldProcInfo cldData
= CpvAccess(CldData
);
79 char *CldGetStrategy(void)
84 /* since I am idle, ask for work from neighbors */
85 static void CldBeginIdle(void *dummy
)
87 CpvAccess(CldData
)->lastCheck
= CmiWallTimer();
90 static void CldEndIdle(void *dummy
)
92 CpvAccess(CldData
)->lastCheck
= -1;
95 static void CldStillIdle(void *dummy
, double curT
)
101 CldProcInfo cldData
= CpvAccess(CldData
);
104 double lt
= cldData
->lastCheck
;
105 /* only ask for work every 20ms */
106 if (cldData
->sent
&& (lt
!=-1 && now
-lt
< PERIOD
*0.001)) return;
107 cldData
->lastCheck
= now
;
109 myload
= CldCountTokens();
110 if (myload
> 0) return;
112 msg
.from_pe
= CmiMyPe();
113 CmiSetHandler(&msg
, CpvAccess(CldAskLoadHandlerIndex
));
114 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
116 CmiBecomeImmediate(&msg
);
117 for (i
=0; i
<CpvAccess(numNeighbors
); i
++) {
118 msg
.to_rank
= CmiRankOf(CpvAccess(neighbors
)[i
].pe
);
119 CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors
)[i
].pe
),sizeof(requestmsg
),(char *)&msg
);
123 CmiSyncMulticast(CpvAccess(neighborGroup
), sizeof(requestmsg
), &msg
);
127 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
128 traceUserBracketEvent(cldData
->idleEvt
, now
, CmiWallTimer());
132 /* immediate message handler, work at node level */
133 /* send some work to requested proc */
134 static void CldAskLoadHandler(requestmsg
*msg
)
136 int receiver
, rank
, recvIdx
, i
;
137 int myload
= CldCountTokens();
138 double now
= CmiWallTimer();
140 /* only give you work if I have more than 1 */
143 receiver
= msg
->from_pe
;
145 if (msg
->to_rank
!= -1) rank
= msg
->to_rank
;
146 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
148 if (CmiTryLock(CpvAccessOther(cldLock
, rank
))) {
149 CmiDelayImmediate(); /* postpone immediate message */
152 CmiUnlock(CpvAccessOther(cldLock
, rank
)); /* release lock, grab later */
154 sendLoad
= myload
/ CpvAccess(numNeighbors
) / 2;
155 if (sendLoad
< 1) sendLoad
= 1;
157 for (i
=0; i
<CpvAccess(numNeighbors
); i
++)
158 if (CpvAccess(neighbors
)[i
].pe
== receiver
) break;
160 if(i
<CpvAccess(numNeighbors
)) {CmiFree(msg
); return;} /* ? */
161 CpvAccess(neighbors
)[i
].load
+= sendLoad
;
162 CldMultipleSend(receiver
, sendLoad
, rank
, 0);
164 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
165 /* this is dangerous since projections logging is not thread safe */
167 CldProcInfo cldData
= CpvAccessOther(CldData
, rank
);
168 traceUserBracketEvent(cldData
->idleprocEvt
, now
, CmiWallTimer());
176 /* balancing by exchanging load among neighbors */
181 /* directly send load to neighbors */
182 double myload
= CldCountTokens();
183 int nNeighbors
= CpvAccess(numNeighbors
);
185 for (i
=0; i
<nNeighbors
; i
++) {
186 int neighbor_pe
= CpvAccess(neighbors
)[i
].pe
;
188 for (j
=0; j
<CpvAccessOther(numNeighbors
, neighbor_pe
); j
++)
189 if (CpvAccessOther(neighbors
, neighbor_pe
)[j
].pe
== CmiMyPe())
191 CpvAccessOther(neighbors
, neighbor_pe
)[j
].load
= myload
;
201 msg
.load
= CldCountTokens();
202 CmiSetHandler(&msg
, CpvAccess(CldLoadResponseHandlerIndex
));
203 CmiSyncMulticast(CpvAccess(neighborGroup
), sizeof(loadmsg
), &msg
);
204 CpvAccess(CldLoadBalanceMessages
) += CpvAccess(numNeighbors
);
207 int mype
= CmiMyPe();
208 int myload
= CldCountTokens();
209 for(i
=0; i
<CpvAccess(numNeighbors
); i
++) {
210 loadmsg
*msg
= getPool();
212 msg
->toindex
= CpvAccess(neighbors
)[i
].index
;
215 CmiSyncSendAndFree(CpvAccess(neighbors
)[i
].pe
, sizeof(loadmsg
), msg
);
226 int nNeighbors
= CpvAccess(numNeighbors
);
227 if (CpvAccess(start
) == -1)
228 CpvAccess(start
) = CmiMyPe() % nNeighbors
;
231 /* update load from neighbors for multicore */
232 for (i
=0; i
<nNeighbors
; i
++) {
233 CpvAccess(neighbors
)[i
].load
= CldLoadRank(CpvAccess(neighbors
)[i
].pe
);
236 CpvAccess(MinProc
) = CpvAccess(neighbors
)[CpvAccess(start
)].pe
;
237 CpvAccess(MinLoad
) = CpvAccess(neighbors
)[CpvAccess(start
)].load
;
238 sum
= CpvAccess(neighbors
)[CpvAccess(start
)].load
;
239 CpvAccess(Mindex
) = CpvAccess(start
);
240 for (i
=1; i
<nNeighbors
; i
++) {
241 CpvAccess(start
) = (CpvAccess(start
)+1) % nNeighbors
;
242 sum
+= CpvAccess(neighbors
)[CpvAccess(start
)].load
;
243 if (CpvAccess(MinLoad
) > CpvAccess(neighbors
)[CpvAccess(start
)].load
) {
244 CpvAccess(MinLoad
) = CpvAccess(neighbors
)[CpvAccess(start
)].load
;
245 CpvAccess(MinProc
) = CpvAccess(neighbors
)[CpvAccess(start
)].pe
;
246 CpvAccess(Mindex
) = CpvAccess(start
);
249 CpvAccess(start
) = (CpvAccess(start
)+2) % nNeighbors
;
250 myload
= CldCountTokens();
252 if (myload
< CpvAccess(MinLoad
)) {
253 CpvAccess(MinLoad
) = myload
;
254 CpvAccess(MinProc
) = CmiMyPe();
256 i
= (int)(1.0 + (((float)sum
) /((float)(nNeighbors
+1))));
260 void CldBalance(void *dummy
, double curT
)
262 int i
, j
, overload
, numToMove
=0, avgLoad
;
263 int totalUnderAvg
=0, numUnderAvg
=0, maxUnderAvg
=0;
265 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
266 double startT
= curT
;
269 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
270 avgLoad
= CldMinAvg();
272 overload = CldLoad() - avgLoad;
273 if (overload > CldCountTokens())
274 overload = CldCountTokens();
276 overload
= CldCountTokens() - avgLoad
;
278 if (overload
> overload_threshold
) {
279 int nNeighbors
= CpvAccess(numNeighbors
);
280 for (i
=0; i
<nNeighbors
; i
++)
281 if (CpvAccess(neighbors
)[i
].load
< avgLoad
) {
282 totalUnderAvg
+= avgLoad
-CpvAccess(neighbors
)[i
].load
;
283 if (avgLoad
- CpvAccess(neighbors
)[i
].load
> maxUnderAvg
)
284 maxUnderAvg
= avgLoad
- CpvAccess(neighbors
)[i
].load
;
287 if (numUnderAvg
> 0) {
288 int myrank
= CmiMyRank();
289 for (i
=0; ((i
<nNeighbors
) && (overload
>0)); i
++) {
290 j
= (i
+CpvAccess(Mindex
))%CpvAccess(numNeighbors
);
291 if (CpvAccess(neighbors
)[j
].load
< avgLoad
) {
292 numToMove
= (avgLoad
- CpvAccess(neighbors
)[j
].load
);
293 if (numToMove
> overload
)
294 numToMove
= overload
;
295 overload
-= numToMove
;
296 CpvAccess(neighbors
)[j
].load
+= numToMove
;
297 #if CMK_MULTICORE || CMK_USE_IBVERBS
298 CldSimpleMultipleSend(CpvAccess(neighbors
)[j
].pe
, numToMove
, myrank
);
300 CldMultipleSend(CpvAccess(neighbors
)[j
].pe
,
311 } /* end of numUnderAvg > 0 */
314 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
315 traceUserBracketEvent(CpvAccess(CldData
)->balanceEvt
, startT
, CmiWallTimer());
319 void CldBalancePeriod(void *dummy
, double curT
)
321 CldBalance(NULL
, curT
);
322 CcdCallFnAfterOnPE((CcdVoidFn
)CldBalancePeriod
, NULL
, LBPeriod
, CmiMyPe());
326 void CldLoadResponseHandler(loadmsg
*msg
)
330 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
331 double startT
= CmiWallTimer();
334 for(i
=0; i
<CpvAccess(numNeighbors
); i
++)
335 if (CpvAccess(neighbors
)[i
].pe
== msg
->pe
) {
336 CpvAccess(neighbors
)[i
].load
= msg
->load
;
341 int index
= msg
->toindex
;
343 for(i
=0; i
<CpvAccess(numNeighbors
); i
++)
344 if (CpvAccess(neighbors
)[i
].pe
== msg
->pe
) {
349 if (index
!= -1) { /* index can be -1, if neighbors table not init yet */
350 CpvAccess(neighbors
)[index
].load
= msg
->load
;
351 if (CpvAccess(neighbors
)[index
].index
== -1) CpvAccess(neighbors
)[index
].index
= msg
->fromindex
;
355 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
356 traceUserBracketEvent(CpvAccess(CldData
)->updateLoadEvt
, startT
, CmiWallTimer());
360 void CldBalanceHandler(void *msg
)
362 CldRestoreHandler(msg
);
366 void CldHandler(void *msg
)
368 CldInfoFn ifn
; CldPackFn pfn
;
369 int len
, queueing
, priobits
; unsigned int *prioptr
;
371 CldRestoreHandler(msg
);
372 ifn
= (CldInfoFn
)CmiHandlerToFunction(CmiGetInfo(msg
));
373 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
374 CsdEnqueueGeneral(msg
, CQS_QUEUEING_LIFO
, priobits
, prioptr
);
375 /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
378 void CldEnqueueGroup(CmiGroup grp
, void *msg
, int infofn
)
380 int len
, queueing
, priobits
,i
; unsigned int *prioptr
;
381 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
383 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
386 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
388 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
389 CmiSetInfo(msg
,infofn
);
391 CmiSyncMulticastAndFree(grp
, len
, msg
);
394 void CldEnqueueMulti(int npes
, int *pes
, void *msg
, int infofn
)
396 int len
, queueing
, priobits
,i
; unsigned int *prioptr
;
397 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
399 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
402 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
404 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
405 CmiSetInfo(msg
,infofn
);
407 for(i=0;i<npes;i++) {
408 CmiSyncSend(pes[i], len, msg);
412 CmiSyncListSendAndFree(npes
, pes
, len
, msg
);
415 void CldEnqueue(int pe
, void *msg
, int infofn
)
417 int len
, queueing
, priobits
, avg
; unsigned int *prioptr
;
418 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
421 if ((pe
== CLD_ANYWHERE
) && (CmiNumPes() > 1)) {
423 if (CldCountTokens() < avg
)
426 pe
= CpvAccess(MinProc
);
427 #if CMK_NODE_QUEUE_AVAILABLE
428 if (CmiNodeOf(pe
) == CmiMyNode()) {
429 CldNodeEnqueue(CmiMyNode(), msg
, infofn
);
433 /* always pack the message because the message may be move away
434 to a different processor later by CldGetToken() */
435 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
436 if (pfn
&& CmiNumNodes()>1) {
438 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
440 if (pe
!= CmiMyPe()) {
441 CpvAccess(neighbors
)[CpvAccess(Mindex
)].load
++;
442 CpvAccess(CldRelocatedMessages
)++;
443 CmiSetInfo(msg
,infofn
);
444 CldSwitchHandler(msg
, CpvAccess(CldBalanceHandlerIndex
));
445 CmiSyncSendAndFree(pe
, len
, msg
);
448 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
449 CmiSetInfo(msg
,infofn
);
453 else if ((pe
== CmiMyPe()) || (CmiNumPes() == 1)) {
454 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
455 /*CmiSetInfo(msg,infofn);*/
456 CsdEnqueueGeneral(msg
, CQS_QUEUEING_LIFO
, priobits
, prioptr
);
457 /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
460 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
461 if (pfn
&& CmiNodeOf(pe
) != CmiMyNode()) {
463 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
465 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
466 CmiSetInfo(msg
,infofn
);
467 if (pe
==CLD_BROADCAST
)
468 CmiSyncBroadcastAndFree(len
, msg
);
469 else if (pe
==CLD_BROADCAST_ALL
)
470 CmiSyncBroadcastAllAndFree(len
, msg
);
471 else CmiSyncSendAndFree(pe
, len
, msg
);
475 void CldNodeEnqueue(int node
, void *msg
, int infofn
)
477 int len
, queueing
, priobits
, pe
, avg
; unsigned int *prioptr
;
478 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
480 if ((node
== CLD_ANYWHERE
) && (CmiNumPes() > 1)) {
482 if (CldCountTokens() < avg
)
485 pe
= CpvAccess(MinProc
);
486 node
= CmiNodeOf(pe
);
487 if (node
!= CmiMyNode()){
488 CpvAccess(neighbors
)[CpvAccess(Mindex
)].load
++;
489 CpvAccess(CldRelocatedMessages
)++;
490 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
493 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
495 CmiSetInfo(msg
,infofn
);
496 CldSwitchHandler(msg
, CpvAccess(CldBalanceHandlerIndex
));
497 CmiSyncNodeSendAndFree(node
, len
, msg
);
500 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
501 /* CmiSetInfo(msg,infofn);
503 CsdNodeEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
506 else if ((node
== CmiMyNode()) || (CmiNumPes() == 1)) {
507 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
508 // CmiSetInfo(msg,infofn);
509 CsdNodeEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
512 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
515 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
517 CldSwitchHandler(msg
, CpvAccess(CldHandlerIndex
));
518 CmiSetInfo(msg
,infofn
);
519 if (node
==CLD_BROADCAST
) { CmiSyncNodeBroadcastAndFree(len
, msg
); }
520 else if (node
==CLD_BROADCAST_ALL
){CmiSyncNodeBroadcastAllAndFree(len
,msg
);}
521 else CmiSyncNodeSendAndFree(node
, len
, msg
);
525 void CldReadNeighborData()
531 if (CmiNumPes() <= 1)
533 sprintf(filename
, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
534 if ((fp
= fopen(filename
, "r")) == 0)
536 CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
539 fscanf(fp
, "%d", &CpvAccess(numNeighbors
));
540 CpvAccess(neighbors
) =
541 (struct CldNeighborData
*)calloc(CpvAccess(numNeighbors
),
542 sizeof(struct CldNeighborData
));
543 pes
= (int *)calloc(CpvAccess(numNeighbors
), sizeof(int));
544 for (i
=0; i
<CpvAccess(numNeighbors
); i
++) {
545 fscanf(fp
, "%d", &(CpvAccess(neighbors
)[i
].pe
));
546 pes
[i
] = CpvAccess(neighbors
)[i
].pe
;
547 CpvAccess(neighbors
)[i
].load
= 0;
550 CpvAccess(neighborGroup
) = CmiEstablishGroup(CpvAccess(numNeighbors
), pes
);
553 static void CldComputeNeighborData()
560 topofn
= LBTopoLookup(_lbtopo
);
561 if (topofn
== NULL
) {
563 CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo
);
565 sprintf(str
, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo
);
568 topo
= topofn(CmiNumPes());
569 npes
= getTopoMaxNeighbors(topo
);
570 pes
= (int *)malloc(npes
*sizeof(int));
571 getTopoNeighbors(topo
, CmiMyPe(), pes
, &npes
);
575 sprintf(buf
, "Neighors for PE %d (%d): ", CmiMyPe(), npes
);
576 ptr
= buf
+ strlen(buf
);
577 for (i
=0; i
<npes
; i
++) {
578 CmiAssert(pes
[i
] < CmiNumPes() && pes
[i
] != CmiMyPe());
579 sprintf(ptr
, " %d ", pes
[i
]);
587 CpvAccess(numNeighbors
) = npes
;
588 CpvAccess(neighbors
) =
589 (struct CldNeighborData
*)calloc(npes
, sizeof(struct CldNeighborData
));
590 for (i
=0; i
<npes
; i
++) {
591 CpvAccess(neighbors
)[i
].pe
= pes
[i
];
592 CpvAccess(neighbors
)[i
].load
= 0;
594 CpvAccess(neighbors
)[i
].index
= -1;
597 CpvAccess(neighborGroup
) = CmiEstablishGroup(npes
, pes
);
601 static void topo_callback()
603 CldComputeNeighborData();
607 CldBalancePeriod(NULL
, CmiWallTimer());
610 void CldGraphModuleInit(char **argv
)
612 CpvInitialize(CldProcInfo
, CldData
);
613 CpvInitialize(int, numNeighbors
);
614 CpvInitialize(int, MinLoad
);
615 CpvInitialize(int, Mindex
);
616 CpvInitialize(int, MinProc
);
617 CpvInitialize(int, start
);
618 CpvInitialize(CmiGroup
, neighborGroup
);
619 CpvInitialize(CldNeighborData
, neighbors
);
620 CpvInitialize(int, CldBalanceHandlerIndex
);
621 CpvInitialize(int, CldLoadResponseHandlerIndex
);
622 CpvInitialize(int, CldAskLoadHandlerIndex
);
624 CpvAccess(start
) = -1;
625 CpvAccess(CldData
) = (CldProcInfo
)CmiAlloc(sizeof(struct CldProcInfo_s
));
626 CpvAccess(CldData
)->lastCheck
= -1;
627 CpvAccess(CldData
)->sent
= 0;
628 #if CMK_TRACE_ENABLED
629 CpvAccess(CldData
)->balanceEvt
= traceRegisterUserEvent("CldBalance", -1);
630 CpvAccess(CldData
)->updateLoadEvt
= traceRegisterUserEvent("UpdateLoad", -1);
631 CpvAccess(CldData
)->idleEvt
= traceRegisterUserEvent("CldBalanceIdle", -1);
632 CpvAccess(CldData
)->idleprocEvt
= traceRegisterUserEvent("CldBalanceProcIdle", -1);
635 CpvAccess(MinLoad
) = 0;
636 CpvAccess(Mindex
) = 0;
637 CpvAccess(MinProc
) = CmiMyPe();
638 CpvAccess(CldBalanceHandlerIndex
) =
639 CmiRegisterHandler(CldBalanceHandler
);
640 CpvAccess(CldLoadResponseHandlerIndex
) =
641 CmiRegisterHandler((CmiHandler
)CldLoadResponseHandler
);
642 CpvAccess(CldAskLoadHandlerIndex
) =
643 CmiRegisterHandler((CmiHandler
)CldAskLoadHandler
);
645 /* communication thread */
646 if (CmiMyRank() == CmiMyNodeSize()) return;
648 CmiGetArgStringDesc(argv
, "+LBTopo", &_lbtopo
, "define load balancing topology");
649 if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo
);
651 if (CmiNumPes() > 1) {
656 sprintf(filename
, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
657 if ((fp
= fopen(filename
, "r")) == 0)
659 if (CmiMyPe() == 0) {
660 CmiPrintf("No proper graph%d directory exists in current directory.\n Generating... ", CmiNumPes());
661 gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
662 CmiPrintf("done.\n");
665 while (!(fp
= fopen(filename
, "r"))) ;
670 CldReadNeighborData();
673 CldComputeNeighborData();
677 CldBalancePeriod(NULL, CmiWallTimer());
679 CcdCallOnCondition(CcdTOPOLOGY_AVAIL
, (CcdVoidFn
)topo_callback
, NULL
);
683 if (CmiGetArgIntDesc(argv
, "+cldb_neighbor_period", &LBPeriod
, "time interval to do neighbor seed lb")) {
684 CmiAssert(LBPeriod
>0);
685 if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod
);
687 if (CmiGetArgIntDesc(argv
, "+cldb_neighbor_overload", &overload_threshold
, "neighbor seed lb's overload threshold")) {
688 CmiAssert(overload_threshold
>0);
689 if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold
);
693 _lbsteal
= CmiGetArgFlagDesc(argv
, "+workstealing", "Charm++> Enable work stealing at idle time");
695 /* register idle handlers - when idle, keep asking work from neighbors */
696 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,
697 (CcdVoidFn
) CldBeginIdle
, NULL
);
698 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,
699 (CcdVoidFn
) CldStillIdle
, NULL
);
701 CmiPrintf("Charm++> Work stealing is enabled. \n");
707 void CldModuleInit(char **argv
)
709 CpvInitialize(int, CldHandlerIndex
);
710 CpvInitialize(int, CldRelocatedMessages
);
711 CpvInitialize(int, CldLoadBalanceMessages
);
712 CpvInitialize(int, CldMessageChunks
);
713 CpvAccess(CldHandlerIndex
) = CmiRegisterHandler(CldHandler
);
714 CpvAccess(CldRelocatedMessages
) = CpvAccess(CldLoadBalanceMessages
) =
715 CpvAccess(CldMessageChunks
) = 0;
717 CpvInitialize(loadmsg
*, msgpool
);
718 CpvAccess(msgpool
) = NULL
;
720 CldModuleGeneralInit(argv
);
721 CldGraphModuleInit(argv
);
723 CpvAccess(CldLoadNotify
) = 1;