8 #include "LBDBManager.h"
10 #include "GreedyCommLB.h"
11 #include "RefineCommLB.h"
// Debug-print hook. The trailing `// CmiPrintf x;` is a comment, so DEBUGF
// currently expands to nothing. Callers pass a parenthesized argument list,
// e.g. DEBUGF(("fmt %d\n", v)); to enable output, make the body `CmiPrintf x;`.
14 #define DEBUGF(x) // CmiPrintf x;
// Defines the creation hook for HbmLB with the description
// "HybridBase load balancer" (presumably registers it with the LB factory).
16 CreateLBFunc_Def(HbmLB, "HybridBase load balancer")
// C-style trampoline for the "object migrated" notification: it is registered
// via NotifyMigrated with `(void*)(this)` as `data`, so `data` is cast back to
// the owning HbmLB and the call is forwarded to the member Migrated().
18 void HbmLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
20 HbmLB *me = (HbmLB*)(data);
22 me->Migrated(h, waitBarrier);
// C-style barrier callback passed to AddLocalBarrierReceiver; `data` is the
// owning HbmLB instance.
// NOTE(review): the remainder of the body is not visible in this excerpt;
// presumably it forwards to me->AtSync() -- confirm.
25 void HbmLB::staticAtSync(void* data)
27 HbmLB *me = (HbmLB*)(data);
// Constructor: sets the balancer name, caches the group proxy, registers the
// static barrier and object-migration callbacks, and allocates the
// HypercubeTree that defines the balancing hierarchy.
// NOTE(review): this excerpt is missing lines (braces, the start of the
// AddLocalBarrierReceiver statement, and further initialization); the final
// statsOn line presumably still belongs to this constructor -- confirm.
32 HbmLB::HbmLB(const CkLBOptions &opt): CBase_HbmLB(opt)
35 lbname = (char *)"HbmLB";
36 thisProxy = CProxy_HbmLB(thisgroup);
38 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),
// Keep the returned handle so the migration callback can later be removed
// with RemoveNotifyMigrated(notifier).
40 notifier = theLbdb->getLBDB()->
41 NotifyMigrated((LDMigratedFn)(staticMigrated), (void*)(this));
// NOTE(review): owned via raw `new`; confirm the destructor deletes it.
44 tree = new HypercubeTree;
// Enable LB statistics collection when the statsOn option is set.
57 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
// NOTE(review): the enclosing definition's header is not visible here; these
// lines look like destructor cleanup. They re-fetch the local LB database
// branch and unregister the migration callback saved in `notifier`.
64 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
67 RemoveNotifyMigrated(notifier);
// Removal of a start-LB callback is intentionally disabled (kept for reference).
69 // RemoveStartLBFn((LDStartLBFn)(staticStartLB));
// Lazily builds the per-level topology from the hypercube tree.
// For each level it records the parent of `mype`. When `mype` is the root of
// that level's subtree, it also records the children and allocates an
// LDStats with nChildren+1 processor slots.
// NOTE(review): the initialization of `mype` and the code that sets
// foundNeighbors are not visible in this excerpt.
75 // get tree information
76 void HbmLB::FindNeighbors()
78 if (foundNeighbors == 0) { // Neighbors never initialized, so init them
79 // and other things that depend on the number
82 int nlevels = tree->numLevels();
84 for (int level=0; level<nlevels; level++)
86 LevelData *data = new LevelData;
87 data->parent = tree->parent(mype, level);
// Only the root of a level's subtree tracks children and their stats.
88 if (tree->isroot(mype, level)) {
89 data->nChildren = tree->numChildren(mype, level);
90 data->children = new int[data->nChildren];
91 tree->getChildren(mype, level, data->children, data->nChildren);
// One processor slot per child plus one extra trailing slot (index nChildren).
92 data->statsData = new LDStats(data->nChildren+1);
// The extra slot is marked unavailable. Presumably it is the "fake root"
// processor that objects from outside the group are attributed to -- confirm.
94 ProcStats &procStat = data->statsData->procs[data->nChildren];
95 procStat.available = false;
97 levelData.push_back(data);
98 DEBUGF(("[%d] level: %d nchildren:%d - %d %d\n", CkMyPe(), level, data->nChildren, data->nChildren>0?data->children[0]:-1, data->nChildren>1?data->children[1]:-1));
// NOTE(review): the enclosing header is not visible; these lines appear to
// belong to AtSync(). When QueryBalanceNow(step()) is false or there is only
// one PE, an early-exit path runs (its body is not visible here). Otherwise
// ProcessAtSync() is invoked on this PE through the group proxy.
108 // CkPrintf("[%d] HbmLB At Sync step %d!!!!\n",CkMyPe(),mystep);
112 // Nothing should happen on a single processor or on a non-balancing step.
113 if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
118 thisProxy[CkMyPe()].ProcessAtSync();
// Starts a balancing step on this PE. It snapshots the local object and
// communication data into myStats, then sends the summed object wall time to
// the level-0 parent via ReceiveStats(tload, CkMyPe(), 0).
// NOTE(review): braces and the declarations of `i` and `tload` are missing
// from this excerpt.
122 void HbmLB::ProcessAtSync()
// Stored for the duration report printed when balancing finishes.
129 start_lb_time = CkWallTimer();
130 if (_lb_args.debug())
131 CkPrintf("[%s] Load balancing step %d starting at %f\n",
132 lbName(), step(), CkWallTimer());
// NOTE(review): these timing values are read but not used by any visible line.
136 LBRealType total_walltime, total_cputime, idletime, bg_walltime, bg_cputime;
137 theLbdb->TotalTime(&total_walltime,&total_cputime);
138 theLbdb->IdleTime(&idletime);
139 theLbdb->BackgroundLoad(&bg_walltime,&bg_cputime);
// Snapshot the per-object data; every object's from/to PE is relative 0.
141 myStats.n_objs = theLbdb->GetObjDataSz();
142 myStats.objData.resize(myStats.n_objs);
143 myStats.from_proc.resize(myStats.n_objs);
144 myStats.to_proc.resize(myStats.n_objs);
145 theLbdb->GetObjData(myStats.objData.getVec());
146 for (i=0; i<myStats.n_objs; i++)
147 myStats.from_proc[i] = myStats.to_proc[i] = 0; // only one PE
// Snapshot the communication records.
149 myStats.n_comm = theLbdb->GetCommDataSz();
150 myStats.commData.resize(myStats.n_comm);
151 theLbdb->GetCommData(myStats.commData.getVec());
153 myStats.complete_flag = 0;
156 DEBUGF(("[%d] Send stats to parent %d\n", CkMyPe(), levelData[0]->parent));
// This PE's load is the sum of its objects' wall times.
158 for (i=0; i<myStats.n_objs; i++) tload += myStats.objData[i].wallTime;
159 thisProxy[levelData[0]->parent].ReceiveStats(tload, CkMyPe(), 0);
// Receives the aggregate load `t` of one child subtree (reported by PE
// `frompe` from level `fromlevel`) and stores it in statsList at level
// fromlevel+1. When stats_msg_count reaches that level's nChildren,
// Loadbalancing(atlevel) is invoked on this PE.
// The tree is assumed binary: the child index must be 0 or 1.
// NOTE(review): the increment and reset of stats_msg_count and the use of
// `parent` are not visible in this excerpt.
163 void HbmLB::ReceiveStats(double t, int frompe, int fromlevel)
168 int atlevel = fromlevel + 1;
// Only the root of the level-`atlevel` subtree may receive these stats.
169 CmiAssert(tree->isroot(CkMyPe(), atlevel));
171 DEBUGF(("[%d] ReceiveStats from PE %d from level: %d\n", CkMyPe(), frompe, fromlevel));
172 int neighborIdx = NeighborIndex(frompe, atlevel);
173 CmiAssert(neighborIdx==0 || neighborIdx==1);
174 LevelData *lData = levelData[atlevel];
175 lData->statsList[neighborIdx] = t;
177 int &stats_msg_count = levelData[atlevel]->stats_msg_count;
180 DEBUGF(("[%d] ReceiveStats at level: %d %d/%d\n", CkMyPe(), atlevel, stats_msg_count, levelData[atlevel]->nChildren));
181 if (stats_msg_count == levelData[atlevel]->nChildren)
184 int parent = levelData[atlevel]->parent;
187 thisProxy[CkMyPe()].Loadbalancing(atlevel);
// Absolute value of x. Unlike std::fabs, +0.0 yields -0.0 and NaN yields
// -NaN; this does not matter for the load comparisons here.
194 inline double myabs(double x) { return x>0.0?x:-x; }
// Larger of x and y; returns y when they compare equal or either is NaN.
195 inline double mymax(double x, double y) { return x>y?x:y; }
// Balances the two child subtrees of this PE at `atlevel` (atlevel >= 1).
// If |lload - rload| exceeds 2% of their mean, the amount
// |lload - rload| / 2^atlevel is sent via ReceiveMigrationDelta() to one
// child, chosen as `overloaded`.
// NOTE(review): the condition that picks children[1] over children[0] and
// the branch structure around LoadbalancingDone(atlevel) are not visible.
// Presumably LoadbalancingDone runs only when no balancing was needed.
// If both loads are 0, diff/avg is 0/0 = NaN and the test is false, so no
// balancing is attempted.
197 // LDStats data sent to parent contains real PE
198 // LDStats in parent should contain relative PE
199 void HbmLB::Loadbalancing(int atlevel)
202 CmiAssert(atlevel >= 1);
204 LevelData *lData = levelData[atlevel];
205 LDStats *statsData = lData->statsData;
206 CmiAssert(statsData);
208 // At this point every object's processor location is relative, and
209 // all objects coming from outside the group belong to the fake root proc.
211 // clear background load if needed
212 if (_lb_args.ignoreBgLoad()) statsData->clearBgLoad();
214 currentLevel = atlevel;
// NOTE(review): this local shadows the start_lb_time that ProcessAtSync
// assigns, and no visible line reads it.
216 double start_lb_time(CkWallTimer());
218 double lload = lData->statsList[0];
219 double rload = lData->statsList[1];
221 double diff = myabs(lload-rload);
// NOTE(review): `maxl` is not used by any visible line.
222 double maxl = mymax(lload, rload);
223 double avg = (lload+rload)/2.0;
// NOTE(review): printed unconditionally on every balancing step; consider
// DEBUGF or guarding with _lb_args.debug().
224 CkPrintf("[%d] lload: %f rload: %f atlevel: %d\n", CkMyPe(), lload, rload, atlevel);
225 if (diff/avg > 0.02) {
226 // we need to perform load balancing
// 2^atlevel; an integer shift (1 << atlevel) would avoid the pow() call.
227 int numpes = (int)pow(2.0, atlevel);
228 double delta = myabs(lload-rload) / numpes;
230 int overloaded = lData->children[0];
232 overloaded = lData->children[1];
234 DEBUGF(("[%d] branch %d is overloaded by %f... \n", CkMyPe(), overloaded, delta));
235 thisProxy[overloaded].ReceiveMigrationDelta(delta, atlevel, atlevel);
238 LoadbalancingDone(atlevel);
// Finishes balancing at `atlevel`. A node that has a parent (parent != -1)
// reports the combined load of its two children to that parent via
// ReceiveStats(). The trailing lines broadcast ReceiveResumeClients to all
// PEs; presumably this is the root (else) branch, whose braces are not visible.
242 // Called once all responses from the underloaded PEs have been received.
243 void HbmLB::LoadbalancingDone(int atlevel)
245 LevelData *lData = levelData[atlevel];
246 DEBUGF(("[%d] LoadbalancingDone at level: %d\n", CkMyPe(), atlevel));
247 if (lData->parent != -1) {
249 double lload = lData->statsList[0];
250 double rload = lData->statsList[1];
251 double totalLoad = lload + rload;
252 thisProxy[lData->parent].ReceiveStats(totalLoad, CkMyPe(), atlevel);
255 // done now, broadcast via tree to resume all
256 // thisProxy.ReceiveResumeClients(1, tree->numLevels()-1, lData->nChildren, lData->children);
257 thisProxy.ReceiveResumeClients(1, tree->numLevels()-1);
// Resumes clients after balancing has finished.
// NOTE(review): several lines are missing here. The visible statements look
// like alternative implementations, presumably separated by preprocessor
// conditionals or if/else branches that are not shown:
//  - tree-based: forward to `lData->children` one level down (the 4-argument
//    proxy call presumably targets only those PEs), resuming at the leaves;
//  - direct: ResumeClients(balancing);
//  - when balancing with syncResume, a max_double reduction over `maxLoad`
//    delivers its result to the CkReductionMsg* ResumeClients entry;
//    otherwise ResumeClients(balancing) is invoked on this PE via the proxy.
261 void HbmLB::ReceiveResumeClients(int balancing, int fromlevel){
263 int atlevel = fromlevel-1;
264 LevelData *lData = levelData[atlevel];
266 thisProxy.ReceiveResumeClients(balancing, atlevel, lData->nChildren, lData->children);
268 ResumeClients(balancing);
270 ResumeClients(balancing); // it is always syncResume
272 if (balancing && _lb_args.syncResume()) {
274 CkCallback cb(CkIndex_HbmLB::ResumeClients((CkReductionMsg*)NULL),
276 contribute(sizeof(double), &maxLoad, CkReduction::max_double, cb);
279 thisProxy[CkMyPe()].ResumeClients(balancing);
// Propagates a request to shed `t` units of load down the tree (`lblevel` is
// the level being balanced; `fromlevel` is the level the request came from).
// An interior node forwards the request to its children one level down. A
// leaf picks local objects to move to matchPE, which is this PE with bit
// (lblevel-1) flipped, i.e. its counterpart in the other half of the
// level-`lblevel` subtree. The leaf first sends the count via
// ReceiveMigrationCount, then each object's stats and comm records via
// ObjMigrated, and finally calls Migrate.
// NOTE(review): the selection body (how objects are added to `migs` and how
// `t` is reduced) and the computation of `idx` are not visible.
// removeObject() runs inside the loop; if `migs` holds ascending indices into
// the original array, each removal shifts later indices, so `idx` must
// compensate (e.g. migs[i]-i) -- confirm.
285 // pick objects to migrate "t" amount of work
286 void HbmLB::ReceiveMigrationDelta(double t, int lblevel, int fromlevel)
290 int atlevel = fromlevel-1;
291 LevelData *lData = levelData[atlevel];
293 thisProxy.ReceiveMigrationDelta(t, lblevel, atlevel, lData->nChildren, lData->children);
297 // I am a leaf: find objects to migrate
300 CkVec<LDObjData> &objData = myStats.objData;
301 for (i=0; i<myStats.n_objs; i++) {
302 LDObjData &oData = objData[i];
// Only objects lighter than the remaining amount are candidates.
303 if (oData.wallTime < t) {
310 int nmigs = migs.size();
312 int matchPE = CkMyPe() ^ (1<<(lblevel-1));
314 DEBUGF(("[%d] migrating %d objs to %d at lblevel %d! \n", CkMyPe(),nmigs,matchPE,lblevel));
// Tell the receiver how many objects to expect before sending any.
315 thisProxy[matchPE].ReceiveMigrationCount(nmigs, lblevel);
318 for (i=0; i<nmigs; i++) {
320 LDObjData &oData = objData[idx];
321 CkVec<LDCommData> comms;
322 collectCommData(idx, comms);
// Send the stats ahead; the object itself moves via Migrate().
323 thisProxy[matchPE].ObjMigrated(oData, comms.getVec(), comms.size());
324 theLbdb->Migrate(oData.handle, matchPE);
325 // TODO modify LDStats
326 DEBUGF(("myStats.removeObject: %d, %d, %d\n", migs[i], i, objData.size()));
// `oData` refers into objData and is not used after this removal.
327 myStats.removeObject(idx);
// Appends to `comms` every record in myStats.commData whose sender matches the
// object at index `objIdx` by both objID() and omID(). Records for which
// from_proc() is true are skipped.
// NOTE(review): `lData` is not used by any visible line.
333 void HbmLB::collectCommData(int objIdx, CkVec<LDCommData> &comms)
336 LevelData *lData = levelData[0];
338 LDObjData &objData = myStats.objData[objIdx];
340 for (int com=0; com<myStats.n_comm; com++) {
341 LDCommData &cdata = myStats.commData[com];
342 if (cdata.from_proc()) continue;
343 if (cdata.sender.objID() == objData.objID() && cdata.sender.omID() == objData.omID())
344 comms.push_back(cdata);
// Receives the stats of an object being migrated to this PE, plus its `n`
// comm records, and appends them to myStats. The object's handle is set to
// the sentinel -100 until the real handle is known (MigrationDone later
// replaces sentinel handles by matching omID/objID).
// NOTE(review): the completion path after migrationDone() is not visible.
350 // An object arrives with only its objdata (and comm records).
351 void HbmLB::ObjMigrated(LDObjData data, LDCommData *cdata, int n)
353 LevelData *lData = levelData[0];
355 CkVec<LDObjData> &oData = myStats.objData;
357 // need to update LDObjHandle later
358 lData->obj_completed++;
359 data.handle.handle = -100;
360 oData.push_back(data);
362 if (data.migratable) myStats.n_migrateobjs++;
363 myStats.from_proc.push_back(-1); // not important
364 myStats.to_proc.push_back(0);
366 // copy into comm data
368 CkVec<LDCommData> &cData = myStats.commData;
369 for (int i=0; i<n; i++)
370 cData.push_back(cdata[i]);
// commData changed; presumably this invalidates a cached lookup hash.
372 myStats.deleteCommHash();
375 if (lData->migrationDone()) {
376 // migration done finally
// Records how many objects this PE should expect to receive (`count`) and
// checks whether migration is already complete.
// NOTE(review): the use of `lblevel` and the completion body are not visible
// in this excerpt.
381 void HbmLB::ReceiveMigrationCount(int count, int lblevel)
385 LevelData *lData = levelData[0];
386 lData->migrates_expected = count;
387 if (lData->migrationDone()) {
388 // migration done finally
// Member target of staticMigrated: counts an object that has migrated in,
// remembers its handle in newObjs, and checks for migration completion.
// NOTE(review): `waitBarrier` is not used in the visible lines, and the
// completion body is not visible.
393 void HbmLB::Migrated(LDObjHandle h, int waitBarrier)
395 LevelData *lData = levelData[0];
397 lData->migrates_completed++;
398 newObjs.push_back(h);
399 DEBUGF(("[%d] An object migrated! %d %d\n", CkMyPe(),lData->migrates_completed,lData->migrates_expected));
400 if (lData->migrationDone()) {
401 // migration done finally
// Aggregates "migration finished" notifications up the tree, counting
// reports at level fromlevel+1.
// - Below the balancing level (atlevel < lblevel), a node forwards one report
//   to its parent once all nChildren have reported.
// - Otherwise (the else is not visible), the node waits for nChildren/2
//   reports and then calls LoadbalancingDone(atlevel).
// NOTE(review): the forwarding call passes the member `lbLevel`, not the
// parameter `lblevel`. They are presumably equal, but passing the parameter
// would be clearer -- confirm.
406 void HbmLB::NotifyObjectMigrationDone(int fromlevel, int lblevel)
409 int atlevel = fromlevel + 1;
410 LevelData *lData = levelData[atlevel];
412 lData->mig_reported ++;
413 DEBUGF(("[%d] HbmLB::NotifyObjectMigrationDone at level: %d lblevel: %d reported: %d!\n", CkMyPe(), atlevel, lblevel, lData->mig_reported));
414 if (atlevel < lblevel) {
415 if (lData->mig_reported == lData->nChildren) {
416 lData->mig_reported = 0;
417 thisProxy[lData->parent].NotifyObjectMigrationDone(atlevel, lbLevel);
421 if (lData->mig_reported == lData->nChildren/2) { // half tree
422 lData->mig_reported = 0;
423 // load balancing done at this level
424 LoadbalancingDone(atlevel);
// Runs once all expected incoming migrations have completed on this PE.
// It resets the level-0 migration counters (the statement appears in two
// branches that depend on whether lbLevel is the top level; the branch
// bodies are only partly visible). It then replaces every sentinel
// (-100) handle in myStats with the real handle from newObjs, matched by
// omID/objID, and notifies the level-0 parent via NotifyObjectMigrationDone.
// NOTE(review): declarations of i/j/count and the newObjs cleanup are not visible.
429 // migration done at current lbLevel
430 void HbmLB::MigrationDone(int balancing)
434 LevelData *lData = levelData[0];
436 DEBUGF(("[%d] HbmLB::MigrationDone lbLevel:%d numLevels:%d!\n", CkMyPe(), lbLevel, tree->numLevels()));
438 CmiAssert(newObjs.size() == lData->migrates_expected);
441 if (lbLevel == tree->numLevels()-1) {
447 lData->migrates_expected = -1;
448 lData->migrates_completed = 0;
449 lData->obj_completed = 0;
452 lData->migrates_expected = -1;
453 lData->migrates_completed = 0;
454 lData->obj_completed = 0;
457 CkVec<LDObjData> &oData = myStats.objData;
// Every sentinel entry must correspond to exactly one newly arrived handle.
461 for (i=0; i<oData.size(); i++)
462 if (oData[i].handle.handle == -100) count++;
463 CmiAssert(count == newObjs.size());
// Patch each sentinel handle with the matching real handle (linear search).
465 for (i=0; i<oData.size(); i++) {
466 if (oData[i].handle.handle == -100) {
467 LDObjHandle &handle = oData[i].handle;
468 for (j=0; j<newObjs.size(); j++) {
469 if (handle.omID() == newObjs[j].omID() &&
470 handle.objID() == newObjs[j].objID()) {
475 CmiAssert(j<newObjs.size());
480 thisProxy[lData->parent].NotifyObjectMigrationDone(0, lbLevel);
// Reduction-result overload: on PE 0 with printSummary enabled, prints the
// reduced maximum load for the previous step (step()-1).
// NOTE(review): the rest of the body is not visible.
484 void HbmLB::ResumeClients(double result)
486 if (CkMyPe() == 0 && _lb_args.printSummary()) {
487 double mload = result;
488 CkPrintf("[%d] MAX Load: %f at step %d.\n", CkMyPe(), mload, step()-1);
// Final step of a balancing round on each PE:
// - PE 0 logs the LB duration when debug is on;
// - when balancing with printSummary, every PE prints its local load summary
//   and reports it to PE 0 via reportLBQulity;
// - finally, loads are cleared and the LB database resumes its clients.
// NOTE(review): declarations of `info`, `count`, `nmsgs` and `nbytes` are not
// visible, and `lData` is not used by any visible line.
493 void HbmLB::ResumeClients(int balancing)
496 DEBUGF(("[%d] ResumeClients. \n", CkMyPe()));
500 LevelData *lData = levelData[0];
503 if (CkMyPe() == 0 && balancing) {
504 double end_lb_time = CkWallTimer();
505 if (_lb_args.debug())
// Uses the start_lb_time stored when the step began.
506 CkPrintf("[%s] Load balancing step %d finished at %f duration %f\n",
507 lbName(), step()-1,end_lb_time,end_lb_time - start_lb_time);
509 if (balancing && _lb_args.printSummary()) {
512 LDStats *stats = &myStats;
513 info.getInfo(stats, count, 0); // no comm cost
514 LBRealType mLoad, mCpuLoad, totalLoad;
515 info.getSummary(mLoad, mCpuLoad, totalLoad);
517 stats->computeNonlocalComm(nmsgs, nbytes);
518 CkPrintf("[%d] Load with %d objs: max (with comm): %f max (obj only): %f total: %f on %d processors at step %d useMem: %fKB nonlocal: %d %.2fKB.\n", CkMyPe(), stats->n_objs, mLoad, mCpuLoad, totalLoad, count, step()-1, (1.0*useMem())/1024, nmsgs, nbytes/1024.0);
519 thisProxy[0].reportLBQulity(mLoad, mCpuLoad, totalLoad, nmsgs, 1.0*nbytes/1024.0);
523 theLbdb->ClearLoads();
525 theLbdb->ResumeClients();
// Accumulates per-PE load reports on PE 0: maximum of mload and mCpuLoad, and
// sums of total load, nonlocal message count and KB. Prints a global summary
// once pecount equals CkNumPes().
// NOTE(review): the increment of `pecount` and any reset after printing are
// not visible. Because `pecount` is a function-local static, it persists
// across LB steps; confirm it is reset, or later steps never print.
// NOTE(review): maxCommCount/maxCommBytes are summed, not maxed, despite
// their names. "Qulity" is a typo but is part of the entry-method interface.
529 // only called on PE 0
530 void HbmLB::reportLBQulity(double mload, double mCpuLoad, double totalload, int nmsgs, double bytes)
532 static int pecount=0;
533 CmiAssert(CkMyPe() == 0);
534 if (mload > maxLoad) maxLoad = mload;
535 if (mCpuLoad > maxCpuLoad) maxCpuLoad = mCpuLoad;
536 totalLoad += totalload;
537 maxCommCount += nmsgs;
538 maxCommBytes += bytes; // KB
540 if (pecount == CkNumPes()) {
541 CkPrintf("[%d] Load Summary: max (with comm): %f max (obj only): %f total: %f at step %d nonlocal: %d msgs, %.2fKB reported from %d PEs.\n", CkMyPe(), maxLoad, maxCpuLoad, totalLoad, step(), maxCommCount, maxCommBytes, pecount);
// LDStats-based strategy hook. The only visible statement prints a
// diagnostic; this balancer does its work through the tree-based entry
// methods instead.
551 void HbmLB::work(LDStats* stats)
554 CkPrintf("[%d] HbmLB::work called!\n", CkMyPe());
// Looks up the position of `pe` among this PE's children at `atlevel`.
// NOTE(review): the return statements are not visible; presumably returns i
// on a match and -1 when `pe` is not a child -- confirm.
558 int HbmLB::NeighborIndex(int pe, int atlevel)
561 for(int i=0; i < levelData[atlevel]->nChildren; i++) {
562 if (pe == levelData[atlevel]->children[i]) {
// NOTE(review): the enclosing header is not visible. These lines appear to
// belong to the memory-usage accounting: they add useMem() of every non-null
// LevelData entry to `memused`.
574 for (i=0; i<levelData.size(); i++)
575 if (levelData[i]) memused+=levelData[i]->useMem();
579 #include "HbmLB.def.h"