4 #include "cldb.neighbor.h"
9 #define USE_MULTICAST 0
10 #define IDLE_IMMEDIATE 1
11 #define TRACE_USEREVENTS 1
13 #define PERIOD 20 /* default: 30 */
16 static int LBPeriod
= PERIOD
; /* time to call load balancing */
17 static int overload_threshold
= MAXOVERLOAD
;
19 typedef struct CldProcInfo_s
{
21 int sent
; /* flag to disable idle work request */
22 int balanceEvt
; /* user event for balancing */
24 int idleEvt
; /* user event for idle balancing */
25 int idleprocEvt
; /* user event for processing idle req */
28 CMI_EXTERNC_VARIABLE
char *_lbtopo
; /* topology name string */
29 int _lbsteal
= 0; /* work stealing flag */
32 void gengraph(int, int, int, int *, int *);
34 CpvStaticDeclare(CldProcInfo
, CldData
);
35 CpvStaticDeclare(int, CldLoadResponseHandlerIndex
);
36 CpvStaticDeclare(int, CldAskLoadHandlerIndex
);
37 CpvStaticDeclare(int, MinLoad
);
38 CpvStaticDeclare(int, MinProc
);
39 CpvStaticDeclare(int, Mindex
);
40 CpvStaticDeclare(int, start
);
43 CpvStaticDeclare(loadmsg
*, msgpool
);
49 loadmsg
*getPool(void){
51 if (CpvAccess(msgpool
)!=NULL
) {
52 msg
= CpvAccess(msgpool
);
53 CpvAccess(msgpool
) = msg
->next
;
56 msg
= (loadmsg
*)CmiAlloc(sizeof(loadmsg
));
57 CmiSetHandler(msg
, CpvAccess(CldLoadResponseHandlerIndex
));
66 void putPool(loadmsg
*msg
)
68 msg
->next
= CpvAccess(msgpool
);
69 CpvAccess(msgpool
) = msg
;
74 void LoadNotifyFn(int l
)
76 CldProcInfo cldData
= CpvAccess(CldData
);
80 const char *CldGetStrategy(void)
85 /* since I am idle, ask for work from neighbors */
86 static void CldBeginIdle(void *dummy
)
88 CpvAccess(CldData
)->lastCheck
= CmiWallTimer();
91 static void CldEndIdle(void *dummy
)
93 CpvAccess(CldData
)->lastCheck
= -1;
96 static void CldStillIdle(void *dummy
, double curT
)
102 CldProcInfo cldData
= CpvAccess(CldData
);
105 double lt
= cldData
->lastCheck
;
106 /* only ask for work every 20ms */
107 if (cldData
->sent
&& (lt
!=-1 && now
-lt
< PERIOD
*0.001)) return;
108 cldData
->lastCheck
= now
;
110 myload
= CldCountTokens();
111 if (myload
> 0) return;
113 msg
.from_pe
= CmiMyPe();
114 CmiSetHandler(&msg
, CpvAccess(CldAskLoadHandlerIndex
));
115 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
117 CmiBecomeImmediate(&msg
);
118 for (i
=0; i
<CpvAccess(numNeighbors
); i
++) {
119 msg
.to_rank
= CmiRankOf(CpvAccess(neighbors
)[i
].pe
);
120 CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors
)[i
].pe
),sizeof(requestmsg
),(char *)&msg
);
124 CmiSyncMulticast(CpvAccess(neighborGroup
), sizeof(requestmsg
), &msg
);
128 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
129 traceUserBracketEvent(cldData
->idleEvt
, now
, CmiWallTimer());
133 /* immediate message handler, work at node level */
134 /* send some work to requested proc */
135 static void CldAskLoadHandler(requestmsg
*msg
)
137 int receiver
, rank
, recvIdx
, i
;
138 int myload
= CldCountTokens();
139 double now
= CmiWallTimer();
141 /* only give you work if I have more than 1 */
144 receiver
= msg
->from_pe
;
146 if (msg
->to_rank
!= -1) rank
= msg
->to_rank
;
147 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
149 if (CmiTryLock(CpvAccessOther(cldLock
, rank
))) {
150 CmiDelayImmediate(); /* postpone immediate message */
153 CmiUnlock(CpvAccessOther(cldLock
, rank
)); /* release lock, grab later */
155 sendLoad
= myload
/ CpvAccess(numNeighbors
) / 2;
156 if (sendLoad
< 1) sendLoad
= 1;
158 for (i
=0; i
<CpvAccess(numNeighbors
); i
++)
159 if (CpvAccess(neighbors
)[i
].pe
== receiver
) break;
161 if(i
<CpvAccess(numNeighbors
)) {CmiFree(msg
); return;} /* ? */
162 CpvAccess(neighbors
)[i
].load
+= sendLoad
;
163 CldMultipleSend(receiver
, sendLoad
, rank
, 0);
165 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
166 /* this is dangerous since projections logging is not thread safe */
168 CldProcInfo cldData
= CpvAccessOther(CldData
, rank
);
169 traceUserBracketEvent(cldData
->idleprocEvt
, now
, CmiWallTimer());
177 /* balancing by exchanging load among neighbors */
179 void CldSendLoad(void)
182 /* directly send load to neighbors */
183 double myload
= CldCountTokens();
184 int nNeighbors
= CpvAccess(numNeighbors
);
186 for (i
=0; i
<nNeighbors
; i
++) {
187 int neighbor_pe
= CpvAccess(neighbors
)[i
].pe
;
189 for (j
=0; j
<CpvAccessOther(numNeighbors
, neighbor_pe
); j
++)
190 if (CpvAccessOther(neighbors
, neighbor_pe
)[j
].pe
== CmiMyPe())
192 CpvAccessOther(neighbors
, neighbor_pe
)[j
].load
= myload
;
202 msg
.load
= CldCountTokens();
203 CmiSetHandler(&msg
, CpvAccess(CldLoadResponseHandlerIndex
));
204 CmiSyncMulticast(CpvAccess(neighborGroup
), sizeof(loadmsg
), &msg
);
205 CpvAccess(CldLoadBalanceMessages
) += CpvAccess(numNeighbors
);
208 int mype
= CmiMyPe();
209 int myload
= CldCountTokens();
210 for(i
=0; i
<CpvAccess(numNeighbors
); i
++) {
211 loadmsg
*msg
= getPool();
213 msg
->toindex
= CpvAccess(neighbors
)[i
].index
;
216 CmiSyncSendAndFree(CpvAccess(neighbors
)[i
].pe
, sizeof(loadmsg
), msg
);
227 int nNeighbors
= CpvAccess(numNeighbors
);
228 if (CpvAccess(start
) == -1)
229 CpvAccess(start
) = CmiMyPe() % nNeighbors
;
232 /* update load from neighbors for multicore */
233 for (i
=0; i
<nNeighbors
; i
++) {
234 CpvAccess(neighbors
)[i
].load
= CldLoadRank(CpvAccess(neighbors
)[i
].pe
);
237 CpvAccess(MinProc
) = CpvAccess(neighbors
)[CpvAccess(start
)].pe
;
238 CpvAccess(MinLoad
) = CpvAccess(neighbors
)[CpvAccess(start
)].load
;
239 sum
= CpvAccess(neighbors
)[CpvAccess(start
)].load
;
240 CpvAccess(Mindex
) = CpvAccess(start
);
241 for (i
=1; i
<nNeighbors
; i
++) {
242 CpvAccess(start
) = (CpvAccess(start
)+1) % nNeighbors
;
243 sum
+= CpvAccess(neighbors
)[CpvAccess(start
)].load
;
244 if (CpvAccess(MinLoad
) > CpvAccess(neighbors
)[CpvAccess(start
)].load
) {
245 CpvAccess(MinLoad
) = CpvAccess(neighbors
)[CpvAccess(start
)].load
;
246 CpvAccess(MinProc
) = CpvAccess(neighbors
)[CpvAccess(start
)].pe
;
247 CpvAccess(Mindex
) = CpvAccess(start
);
250 CpvAccess(start
) = (CpvAccess(start
)+2) % nNeighbors
;
251 myload
= CldCountTokens();
253 if (myload
< CpvAccess(MinLoad
)) {
254 CpvAccess(MinLoad
) = myload
;
255 CpvAccess(MinProc
) = CmiMyPe();
257 i
= (int)(1.0 + (((float)sum
) /((float)(nNeighbors
+1))));
261 void CldBalance(void *dummy
, double curT
)
263 int i
, j
, overload
, numToMove
=0, avgLoad
;
264 int totalUnderAvg
=0, numUnderAvg
=0, maxUnderAvg
=0;
266 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
267 double startT
= curT
;
270 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
271 avgLoad
= CldMinAvg();
273 overload = CldLoad() - avgLoad;
274 if (overload > CldCountTokens())
275 overload = CldCountTokens();
277 overload
= CldCountTokens() - avgLoad
;
279 if (overload
> overload_threshold
) {
280 int nNeighbors
= CpvAccess(numNeighbors
);
281 for (i
=0; i
<nNeighbors
; i
++)
282 if (CpvAccess(neighbors
)[i
].load
< avgLoad
) {
283 totalUnderAvg
+= avgLoad
-CpvAccess(neighbors
)[i
].load
;
284 if (avgLoad
- CpvAccess(neighbors
)[i
].load
> maxUnderAvg
)
285 maxUnderAvg
= avgLoad
- CpvAccess(neighbors
)[i
].load
;
288 if (numUnderAvg
> 0) {
289 int myrank
= CmiMyRank();
290 for (i
=0; ((i
<nNeighbors
) && (overload
>0)); i
++) {
291 j
= (i
+CpvAccess(Mindex
))%CpvAccess(numNeighbors
);
292 if (CpvAccess(neighbors
)[j
].load
< avgLoad
) {
293 numToMove
= (avgLoad
- CpvAccess(neighbors
)[j
].load
);
294 if (numToMove
> overload
)
295 numToMove
= overload
;
296 overload
-= numToMove
;
297 CpvAccess(neighbors
)[j
].load
+= numToMove
;
298 #if CMK_MULTICORE || CMK_USE_IBVERBS
299 CldSimpleMultipleSend(CpvAccess(neighbors
)[j
].pe
, numToMove
, myrank
);
301 CldMultipleSend(CpvAccess(neighbors
)[j
].pe
,
312 } /* end of numUnderAvg > 0 */
315 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
316 traceUserBracketEvent(CpvAccess(CldData
)->balanceEvt
, startT
, CmiWallTimer());
320 void CldBalancePeriod(void *dummy
, double curT
)
322 CldBalance(NULL
, curT
);
323 CcdCallFnAfterOnPE((CcdVoidFn
)CldBalancePeriod
, NULL
, LBPeriod
, CmiMyPe());
327 void CldLoadResponseHandler(loadmsg
*msg
)
331 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
332 double startT
= CmiWallTimer();
335 for(i
=0; i
<CpvAccess(numNeighbors
); i
++)
336 if (CpvAccess(neighbors
)[i
].pe
== msg
->pe
) {
337 CpvAccess(neighbors
)[i
].load
= msg
->load
;
342 int index
= msg
->toindex
;
344 for(i
=0; i
<CpvAccess(numNeighbors
); i
++)
345 if (CpvAccess(neighbors
)[i
].pe
== msg
->pe
) {
350 if (index
!= -1) { /* index can be -1, if neighbors table not init yet */
351 CpvAccess(neighbors
)[index
].load
= msg
->load
;
352 if (CpvAccess(neighbors
)[index
].index
== -1) CpvAccess(neighbors
)[index
].index
= msg
->fromindex
;
356 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
357 traceUserBracketEvent(CpvAccess(CldData
)->updateLoadEvt
, startT
, CmiWallTimer());
361 void CldBalanceHandler(void *msg
)
363 CldRestoreHandler((char *)msg
);
364 CldPutToken((char *)msg
);
367 void CldHandler(void *msg
)
369 CldInfoFn ifn
; CldPackFn pfn
;
370 int len
, queueing
, priobits
; unsigned int *prioptr
;
372 CldRestoreHandler((char *)msg
);
373 ifn
= (CldInfoFn
)CmiHandlerToFunction(CmiGetInfo(msg
));
374 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
375 CsdEnqueueGeneral(msg
, CQS_QUEUEING_LIFO
, priobits
, prioptr
);
376 /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
379 void CldEnqueueGroup(CmiGroup grp
, void *msg
, int infofn
)
381 int len
, queueing
, priobits
,i
; unsigned int *prioptr
;
382 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
384 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
387 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
389 CldSwitchHandler((char *)msg
, CpvAccess(CldHandlerIndex
));
390 CmiSetInfo(msg
,infofn
);
392 CmiSyncMulticastAndFree(grp
, len
, msg
);
395 void CldEnqueueMulti(int npes
, int *pes
, void *msg
, int infofn
)
397 int len
, queueing
, priobits
,i
; unsigned int *prioptr
;
398 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
400 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
403 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
405 CldSwitchHandler((char *)msg
, CpvAccess(CldHandlerIndex
));
406 CmiSetInfo(msg
,infofn
);
408 for(i=0;i<npes;i++) {
409 CmiSyncSend(pes[i], len, msg);
413 CmiSyncListSendAndFree(npes
, pes
, len
, msg
);
416 void CldEnqueue(int pe
, void *msg
, int infofn
)
418 int len
, queueing
, priobits
, avg
; unsigned int *prioptr
;
419 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
422 if ((pe
== CLD_ANYWHERE
) && (CmiNumPes() > 1)) {
424 if (CldCountTokens() < avg
)
427 pe
= CpvAccess(MinProc
);
428 #if CMK_NODE_QUEUE_AVAILABLE
429 if (CmiNodeOf(pe
) == CmiMyNode()) {
430 CldNodeEnqueue(CmiMyNode(), msg
, infofn
);
434 /* always pack the message because the message may be move away
435 to a different processor later by CldGetToken() */
436 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
437 if (pfn
&& CmiNumNodes()>1) {
439 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
441 if (pe
!= CmiMyPe()) {
442 CpvAccess(neighbors
)[CpvAccess(Mindex
)].load
++;
443 CpvAccess(CldRelocatedMessages
)++;
444 CmiSetInfo(msg
,infofn
);
445 CldSwitchHandler((char *)msg
, CpvAccess(CldBalanceHandlerIndex
));
446 CmiSyncSendAndFree(pe
, len
, msg
);
449 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
450 CmiSetInfo(msg
,infofn
);
451 CldPutToken((char *)msg
);
454 else if ((pe
== CmiMyPe()) || (CmiNumPes() == 1)) {
455 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
456 /*CmiSetInfo(msg,infofn);*/
457 CsdEnqueueGeneral(msg
, CQS_QUEUEING_LIFO
, priobits
, prioptr
);
458 /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
461 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
462 if (pfn
&& CmiNodeOf(pe
) != CmiMyNode()) {
464 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
466 CldSwitchHandler((char *)msg
, CpvAccess(CldHandlerIndex
));
467 CmiSetInfo(msg
,infofn
);
468 if (pe
==CLD_BROADCAST
)
469 CmiSyncBroadcastAndFree(len
, msg
);
470 else if (pe
==CLD_BROADCAST_ALL
)
471 CmiSyncBroadcastAllAndFree(len
, msg
);
472 else CmiSyncSendAndFree(pe
, len
, msg
);
476 void CldNodeEnqueue(int node
, void *msg
, int infofn
)
478 int len
, queueing
, priobits
, pe
, avg
; unsigned int *prioptr
;
479 CldInfoFn ifn
= (CldInfoFn
)CmiHandlerToFunction(infofn
);
481 if ((node
== CLD_ANYWHERE
) && (CmiNumPes() > 1)) {
483 if (CldCountTokens() < avg
)
486 pe
= CpvAccess(MinProc
);
487 node
= CmiNodeOf(pe
);
488 if (node
!= CmiMyNode()){
489 CpvAccess(neighbors
)[CpvAccess(Mindex
)].load
++;
490 CpvAccess(CldRelocatedMessages
)++;
491 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
494 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
496 CmiSetInfo(msg
,infofn
);
497 CldSwitchHandler((char *)msg
, CpvAccess(CldBalanceHandlerIndex
));
498 CmiSyncNodeSendAndFree(node
, len
, msg
);
501 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
502 /* CmiSetInfo(msg,infofn);
503 CldPutToken((char *)msg); */
504 CsdNodeEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
507 else if ((node
== CmiMyNode()) || (CmiNumPes() == 1)) {
508 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
509 // CmiSetInfo(msg,infofn);
510 CsdNodeEnqueueGeneral(msg
, queueing
, priobits
, prioptr
);
513 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
516 ifn(msg
, &pfn
, &len
, &queueing
, &priobits
, &prioptr
);
518 CldSwitchHandler((char *)msg
, CpvAccess(CldHandlerIndex
));
519 CmiSetInfo(msg
,infofn
);
520 if (node
==CLD_BROADCAST
) { CmiSyncNodeBroadcastAndFree(len
, msg
); }
521 else if (node
==CLD_BROADCAST_ALL
){CmiSyncNodeBroadcastAllAndFree(len
,msg
);}
522 else CmiSyncNodeSendAndFree(node
, len
, msg
);
526 void CldReadNeighborData(void)
532 if (CmiNumPes() <= 1)
534 sprintf(filename
, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
535 if ((fp
= fopen(filename
, "r")) == 0)
537 CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
540 if (fscanf(fp
, "%d", &CpvAccess(numNeighbors
)) != 1) {
541 CmiAbort("CLD> reading neighbor data failed!");
543 CpvAccess(neighbors
) =
544 (struct CldNeighborData_s
*)calloc(CpvAccess(numNeighbors
),
545 sizeof(struct CldNeighborData_s
));
546 pes
= (int *)calloc(CpvAccess(numNeighbors
), sizeof(int));
547 for (i
=0; i
<CpvAccess(numNeighbors
); i
++) {
548 if (fscanf(fp
, "%d", &(CpvAccess(neighbors
)[i
].pe
)) != 1) {
549 CmiAbort("CLD> reading neighbor data failed!");
551 pes
[i
] = CpvAccess(neighbors
)[i
].pe
;
552 CpvAccess(neighbors
)[i
].load
= 0;
555 CpvAccess(neighborGroup
) = CmiEstablishGroup(CpvAccess(numNeighbors
), pes
);
558 static void CldComputeNeighborData(void)
565 topofn
= LBTopoLookup(_lbtopo
);
566 if (topofn
== NULL
) {
568 CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo
);
570 sprintf(str
, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo
);
573 topo
= topofn(CmiNumPes());
574 npes
= getTopoMaxNeighbors(topo
);
575 pes
= (int *)malloc(npes
*sizeof(int));
576 getTopoNeighbors(topo
, CmiMyPe(), pes
, &npes
);
580 sprintf(buf
, "Neighors for PE %d (%d): ", CmiMyPe(), npes
);
581 ptr
= buf
+ strlen(buf
);
582 for (i
=0; i
<npes
; i
++) {
583 CmiAssert(pes
[i
] < CmiNumPes() && pes
[i
] != CmiMyPe());
584 sprintf(ptr
, " %d ", pes
[i
]);
592 CpvAccess(numNeighbors
) = npes
;
593 CpvAccess(neighbors
) =
594 (struct CldNeighborData_s
*)calloc(npes
, sizeof(struct CldNeighborData_s
));
595 for (i
=0; i
<npes
; i
++) {
596 CpvAccess(neighbors
)[i
].pe
= pes
[i
];
597 CpvAccess(neighbors
)[i
].load
= 0;
599 CpvAccess(neighbors
)[i
].index
= -1;
602 CpvAccess(neighborGroup
) = CmiEstablishGroup(npes
, pes
);
606 static void topo_callback(void)
608 CldComputeNeighborData();
612 CldBalancePeriod(NULL
, CmiWallTimer());
615 void CldGraphModuleInit(char **argv
)
617 CpvInitialize(CldProcInfo
, CldData
);
618 CpvInitialize(int, numNeighbors
);
619 CpvInitialize(int, MinLoad
);
620 CpvInitialize(int, Mindex
);
621 CpvInitialize(int, MinProc
);
622 CpvInitialize(int, start
);
623 CpvInitialize(CmiGroup
, neighborGroup
);
624 CpvInitialize(CldNeighborData
, neighbors
);
625 CpvInitialize(int, CldBalanceHandlerIndex
);
626 CpvInitialize(int, CldLoadResponseHandlerIndex
);
627 CpvInitialize(int, CldAskLoadHandlerIndex
);
629 CpvAccess(start
) = -1;
630 CpvAccess(CldData
) = (CldProcInfo
)CmiAlloc(sizeof(struct CldProcInfo_s
));
631 CpvAccess(CldData
)->lastCheck
= -1;
632 CpvAccess(CldData
)->sent
= 0;
633 #if CMK_TRACE_ENABLED
634 CpvAccess(CldData
)->balanceEvt
= traceRegisterUserEvent("CldBalance", -1);
635 CpvAccess(CldData
)->updateLoadEvt
= traceRegisterUserEvent("UpdateLoad", -1);
636 CpvAccess(CldData
)->idleEvt
= traceRegisterUserEvent("CldBalanceIdle", -1);
637 CpvAccess(CldData
)->idleprocEvt
= traceRegisterUserEvent("CldBalanceProcIdle", -1);
640 CpvAccess(MinLoad
) = 0;
641 CpvAccess(Mindex
) = 0;
642 CpvAccess(MinProc
) = CmiMyPe();
643 CpvAccess(CldBalanceHandlerIndex
) =
644 CmiRegisterHandler(CldBalanceHandler
);
645 CpvAccess(CldLoadResponseHandlerIndex
) =
646 CmiRegisterHandler((CmiHandler
)CldLoadResponseHandler
);
647 CpvAccess(CldAskLoadHandlerIndex
) =
648 CmiRegisterHandler((CmiHandler
)CldAskLoadHandler
);
650 /* communication thread */
651 if (CmiMyRank() == CmiMyNodeSize()) return;
653 CmiGetArgStringDesc(argv
, "+LBTopo", &_lbtopo
, "define load balancing topology");
654 if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo
);
656 if (CmiNumPes() > 1) {
661 sprintf(filename
, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
662 if ((fp
= fopen(filename
, "r")) == 0)
664 if (CmiMyPe() == 0) {
665 CmiPrintf("No proper graph%d directory exists in current directory.\n Generating... ", CmiNumPes());
666 gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
667 CmiPrintf("done.\n");
670 while (!(fp
= fopen(filename
, "r"))) ;
675 CldReadNeighborData();
678 CldComputeNeighborData();
682 CldBalancePeriod(NULL, CmiWallTimer());
684 CcdCallOnCondition(CcdTOPOLOGY_AVAIL
, (CcdVoidFn
)topo_callback
, NULL
);
688 if (CmiGetArgIntDesc(argv
, "+cldb_neighbor_period", &LBPeriod
, "time interval to do neighbor seed lb")) {
689 CmiAssert(LBPeriod
>0);
690 if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod
);
692 if (CmiGetArgIntDesc(argv
, "+cldb_neighbor_overload", &overload_threshold
, "neighbor seed lb's overload threshold")) {
693 CmiAssert(overload_threshold
>0);
694 if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold
);
698 _lbsteal
= CmiGetArgFlagDesc(argv
, "+workstealing", "Charm++> Enable work stealing at idle time");
700 /* register idle handlers - when idle, keep asking work from neighbors */
701 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,
702 (CcdVoidFn
) CldBeginIdle
, NULL
);
703 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,
704 (CcdVoidFn
) CldStillIdle
, NULL
);
706 CmiPrintf("Charm++> Work stealing is enabled. \n");
712 void CldModuleInit(char **argv
)
714 CpvInitialize(int, CldHandlerIndex
);
715 CpvInitialize(int, CldRelocatedMessages
);
716 CpvInitialize(int, CldLoadBalanceMessages
);
717 CpvInitialize(int, CldMessageChunks
);
718 CpvAccess(CldHandlerIndex
) = CmiRegisterHandler(CldHandler
);
719 CpvAccess(CldRelocatedMessages
) = CpvAccess(CldLoadBalanceMessages
) =
720 CpvAccess(CldMessageChunks
) = 0;
722 CpvInitialize(loadmsg
*, msgpool
);
723 CpvAccess(msgpool
) = NULL
;
725 CldModuleGeneralInit(argv
);
726 CldGraphModuleInit(argv
);
728 CpvAccess(CldLoadNotify
) = 1;