10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
// Debug tracing macros. As written both expand to nothing because their
// payload sits behind `//`; remove that marker to enable tracing.
14 #define DEBUGF(x) // CmiPrintf x;
15 #define DEBUG(x) // x;
// Stats-collection switches. Under CMK_MEM_CHECKPOINT both the reduction path
// and the spanning-tree path are disabled (see the comment below); the second
// pair of defines (value 1) presumably belongs to the #else branch.
17 #if CMK_MEM_CHECKPOINT
18 /* can not handle reduction in inmem FT */
19 #define USE_REDUCTION 0
20 #define USE_LDB_SPANNING_TREE 0
22 #define USE_REDUCTION 1
23 #define USE_LDB_SPANNING_TREE 1
// Hooks provided by the message-logging fault-tolerance runtime; referenced
// only in _FAULT_MLOG_ / _FAULT_CAUSAL_ builds.
26 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
27 extern bool _restartFlag;
28 extern void getGlobalStep(CkGroupID );
29 extern void initMlogLBStep(CkGroupID );
30 extern int globalResumeCount;
31 extern void sendDummyMigrationCounts(int *);
// Per-PE grid-queue object; reset to NULL in MigrationDoneImpl.
34 #if CMK_GRID_QUEUE_AVAILABLE
35 CpvExtern(void *, CkGridObject);
// Location-update hook called for migration moves in ProcessReceiveMigration.
38 #if CMK_GLOBAL_LOCATION_UPDATE
39 extern void UpdateLocation(MigrateInfo& migData);
// Shrink/expand reallocation state, read and written by CheckForRealloc,
// ResumeFromReallocCheckpoint and WillIbekilled further down.
43 extern "C" void charmrun_realloc(char *s);
44 extern char willContinue;
45 extern realloc_state pending_realloc_state;
46 extern char * se_avail_vector;
47 extern "C" int mynewpe;
48 extern char *_shrinkexpand_basedir;
49 int numProcessAfterRestart;
// Group ID of this load balancer; assigned in CentralLB::initLB.
52 CkGroupID loadbalancer;
// Set to true by CentralLB::initLB.
54 bool load_balancer_created;
// Presumably registers the CentralLB creation function with the LB registry
// under this description (CreateLBFunc_Def macro) — confirm.
56 CreateLBFunc_Def(CentralLB, "CentralLB base class")
// At or below this many PEs, migration decisions are broadcast directly
// instead of being scattered down a tree (InitiateScatter,
// ScatterMigrationResults).
58 static int broadcastThreshold = 32;
// Forward declaration; ApplyDecision uses it to fill `info` (info.peLoads)
// for the post-LB load summary.
60 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count,
61 LBMigrateMsg *, LBInfo &info, int considerComm);
// Creates the CentralLB group. The argument 0 presumably converts to
// CkLBOptions with sequence number 0, which leaves the balancer turned on
// (initLB only turns off balancers whose sequence number is > 0) — confirm.
64 void CreateCentralLB()
66 CProxy_CentralLB::ckNew(0);
// Start-LB trampoline registered with the LB database through AddStartLBFn in
// initLB; `data` is the CentralLB instance supplied at registration.
70 void CentralLB::staticStartLB(void* data)
72 CentralLB *me = (CentralLB*)(data);
// Migration-notification trampoline registered through NotifyMigrated in
// initLB; forwards the arrived object's handle to CentralLB::Migrated.
76 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
78 CentralLB *me = (CentralLB*)(data);
79 me->Migrated(h, waitBarrier);
// Local-barrier trampoline registered through AddLocalBarrierReceiver in
// initLB; `data` is the CentralLB instance.
82 void CentralLB::staticAtSync(void* data)
84 CentralLB *me = (CentralLB*)(data);
// Common setup for a CentralLB branch. It records the group ID and registers
// the barrier, migration and start-LB callbacks with the LB database. It also
// sets up the optional load predictor and resets the migration bookkeeping.
88 void CentralLB::initLB(const CkLBOptions &opt)
92 thisProxy = CProxy_CentralLB(thisgroup);
93 // CkPrintf("Construct in %d\n",CkMyPe());
94 loadbalancer = thisgroup;
95 // create and turn on by default
97 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
98 notifier = theLbdb->getLBDB()->
99 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
100 startLbFnHdl = theLbdb->getLBDB()->
101 AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
103 // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
// A balancer with a sequence number > 0 starts out disabled.
104 if (opt.getSeqNo() > 0) turnOff();
107 statsMsgsList = NULL;
110 storedMigrateMsg = NULL;
111 reduction_started = false;
113 // for future predictor
114 if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
115 else predicted_model=0;
116 // register user interface callbacks
117 theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
119 myspeed = theLbdb->ProcessorSpeed();
// -1 is the idle value of the expected counters. MigrationDoneImpl restores
// migrates_expected to -1 after each step. ProcessReceiveMigration overwrites
// it with the real count.
121 migrates_completed = 0;
122 future_migrates_completed = 0;
123 migrates_expected = -1;
124 future_migrates_expected = -1;
125 cur_ld_balancer = _lb_args.central_pe(); // 0 default
131 if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
133 load_balancer_created = true;
// Hard-coded core topology for the TEMP_LDB temperature/frequency code.
// getTemp and cpufreq_sysfs_read are indexed by CkMyPe() modulo these values.
136 logicalCoresPerNode=physicalCoresPerNode=4;
137 logicalCoresPerChip=4;
// Frees the stats-message pointer array. Unregisters the migration notifier
// and start-LB callback that initLB installed with the LB database.
143 CentralLB::~CentralLB()
146 delete [] statsMsgsList;
// Look up the local LBDatabase branch again before unregistering.
148 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
151 RemoveNotifyMigrated(notifier);
153 RemoveStartLBFn((LDStartLBFn)(staticStartLB));
// Re-enables the barrier receiver, migration notifier and start-LB callback
// registered by initLB.
158 void CentralLB::turnOn()
162 TurnOnBarrierReceiver(receiver);
164 TurnOnNotifyMigrated(notifier);
166 TurnOnStartLBFn(startLbFnHdl);
// Disables the barrier receiver, migration notifier and start-LB callback
// registered by initLB; used by initLB for balancers with sequence number > 0.
170 void CentralLB::turnOff()
174 TurnOffBarrierReceiver(receiver);
176 TurnOffNotifyMigrated(notifier);
178 TurnOffStartLBFn(startLbFnHdl);
// NOTE(review): the body is not shown in this listing. It presumably records
// `speed` as this PE's speed (myspeed, which BuildStatsMsg reports) — confirm.
182 void CentralLB::SetPESpeed(int speed)
// NOTE(review): the body is not shown in this listing; presumably returns
// myspeed — confirm.
187 int CentralLB::GetPESpeed()
// Barrier callback: all local objects have reached AtSync. Balancing is
// skipped when QueryBalanceNow(step()) declines or only one PE exists.
// Otherwise, if this PE is alive, ProcessAtSync is invoked on this PE as an
// entry-method send.
192 void CentralLB::AtSync()
195 DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
196 #if CMK_MEM_CHECKPOINT
199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
200 CpvAccess(_currentObj)=this;
203 // if num of processor is only 1, nothing should happen
204 if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
208 if(CmiNodeAlive(CkMyPe())){
209 thisProxy [CkMyPe()].ProcessAtSync();
// Starts a stats-collection round on this PE.
// - reduction_started prevents a second start until SendStats clears it.
// - The current balancer PE records start_lb_time.
// - In the reduction path, every PE contributes its local object and comm
//   counts (sum_int).
// - ReceiveCounts receives the totals either on every PE or only on PE 0,
//   matching the two callbacks below (presumably chosen by `concurrent`).
214 void CentralLB::ProcessAtSync()
217 if (reduction_started) return; // reducton in progress
219 CmiAssert(CmiNodeAlive(CkMyPe()));
220 if (CkMyPe() == cur_ld_balancer) {
221 start_lb_time = CkWallTimer();
224 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
225 initMlogLBStep(thisgroup);
232 // reduction to get total number of objects and comm
233 // so that processor 0 can pre-allocate load balancing database
235 counts[0] = theLbdb->GetObjDataSz();
236 counts[1] = theLbdb->GetCommDataSz();
240 cb = CkCallback(CkReductionTarget(CentralLB, ReceiveCounts), thisProxy); // every PE receives counts
242 cb = CkCallback(CkReductionTarget(CentralLB, ReceiveCounts), thisProxy[0]);
243 contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
244 reduction_started = true;
251 #if defined(TEMP_LDB)
// Writes the frequency string `setting` to the cpufreq scaling_setspeed file
// of CPU `proc` through sysfs (TEMP_LDB builds only).
// NOTE(review): `fprintf(fd, setting)` below treats `setting` as a format
// string, so any '%' in it is interpreted as a conversion specifier.
// fputs(setting, fd) or fprintf(fd, "%s", setting) would write it literally.
252 static int cpufreq_sysfs_write (
253 const char *setting,int proc
257 sprintf(path,"/sys/devices/system/cpu/cpu%d/cpufreq/scaling_setspeed",proc);
258 FILE *fd = fopen (path, "w");
261 printf("PROC#%d ooooooo666 FILE OPEN ERROR file=%s\n",CkMyPe(),path);
264 // else CkPrintf("PROC#%d opened freq file=%s\n",proc,path);
266 fseek ( fd , 0 , SEEK_SET );
267 int numw=fprintf (fd, setting);
271 printf("FILE WRITING ERROR\n");
274 // else CkPrintf("Freq for Proc#%d set to %s numw=%d\n",proc,setting,numw);
// Opens the cpufreq scaling_setspeed sysfs file of a CPU for reading (TEMP_LDB
// builds only). BuildStatsMsg calls it with CkMyPe() % logicalCoresPerNode.
// NOTE(review): the path is formatted with `i`, not `proc`; confirm that `i`
// is initialized from `proc` on a line not shown here. If the goal is the
// current frequency, scaling_cur_freq may be the intended file — confirm.
280 static int cpufreq_sysfs_read (int proc)
285 sprintf(path,"/sys/devices/system/cpu/cpu%d/cpufreq/scaling_setspeed",i);
287 fd = fopen (path, "r");
290 printf("33 FILE OPEN ERROR file=%s\n",path);
// Reads the coretemp temp1_input sysfs file for `cpu` and returns the value
// divided by 1000 (the file presumably holds millidegrees Celsius).
// NOTE(review): the failure path below calls exit(0). That terminates the
// whole process with a success status and an uninformative message; CkAbort
// with a descriptive message would surface the error.
301 float CentralLB::getTemp(int cpu)
306 sprintf(path,"/sys/devices/platform/coretemp.%d/temp1_input",cpu);
309 printf("777 FILE OPEN ERROR file=%s\n",path);
313 if(f==NULL) {printf("ddddddddddddddddddddddddddd\n");exit(0);}
316 return atof(val)/1000;
321 // called only on 0 (or every PE if concurrent=true)
// Reduction target carrying the global (object count, comm count) totals.
// Pre-sizes statsData so per-PE stats can be copied in without reallocating.
// It then starts stats sending: locally when concurrent, otherwise by
// broadcasting SendStats to every PE.
322 void CentralLB::ReceiveCounts(int *counts, int n)
324 if (!concurrent) CmiAssert(CkMyPe() == 0);
325 if (statsData == NULL) statsData = new LDStats;
327 // check that only 2 counts are sent
329 int n_objs = counts[0];
330 int n_comm = counts[1];
333 statsData->objData.resize(n_objs);
334 statsData->from_proc.resize(n_objs);
335 statsData->to_proc.resize(n_objs);
336 statsData->commData.resize(n_comm);
338 DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
340 if (concurrent) SendStats();
341 else thisProxy.SendStats(); // broadcast call to let everybody start to send stats
// Builds this PE's CLBStatsMsg. It carries timing (total, idle, background),
// the PE speed, and the per-object and per-communication records. On the
// current balancer PE the message also carries the availability vector and
// the next balancer PE. The final assertion checks that no earlier message
// is still pending in statsMsg.
344 void CentralLB::BuildStatsMsg()
347 // build and send stats
348 const int osz = theLbdb->GetObjDataSz();
349 const int csz = theLbdb->GetCommDataSz();
351 int npes = CkNumPes();
352 CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
354 msg->from_pe = CkMyPe();
355 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
358 //msg->serial = CrnRand();
361 theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
362 theLbdb->IdleTime(&msg->idletime);
363 theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
366 theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
367 &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
// Variant without separate CPU times: the wall-time fields are passed for the
// CPU-time outputs as well.
369 theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
370 &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
372 #if defined(TEMP_LDB)
373 float mytemp=getTemp(CkMyPe()%physicalCoresPerNode);
374 int freq=cpufreq_sysfs_read (CkMyPe()%logicalCoresPerNode);
378 msg->pe_speed = myspeed;
381 DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
384 theLbdb->GetObjData(msg->objData);
386 theLbdb->GetCommData(msg->commData);
387 // theLbdb->ClearLoads();
388 DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
// Only the current balancer ships the availability vector and next balancer.
390 if(CkMyPe() == cur_ld_balancer) {
391 msg->avail_vector = new char[CkNumPes()];
392 LBDatabaseObj()->get_avail_vector(msg->avail_vector);
393 msg->next_lb = LBDatabaseObj()->new_lbbalancer();
396 CmiAssert(statsMsg == NULL);
402 // called on every processor
// Sends the stats message prepared by BuildStatsMsg toward the balancer and
// clears reduction_started for the next round.
// - USE_LDB_SPANNING_TREE builds (possibly subject to an extra condition not
//   shown) go through the spanning tree. The balancer PE hands its own
//   message straight to ReceiveStats.
// - Otherwise the message goes directly to cur_ld_balancer.
403 void CentralLB::SendStats()
406 CmiAssert(statsMsg != NULL);
407 reduction_started = false;
409 #if USE_LDB_SPANNING_TREE
412 if (CkMyPe() == cur_ld_balancer)
413 thisProxy[CkMyPe()].ReceiveStats(statsMsg);
415 thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
420 DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
421 thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
431 // enfore the barrier to wait until centralLB says no
434 theLbdb->getLBDB()->RegisteringObjects(h);
439 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
440 extern int donotCountMigration;
// Invoked (via staticMigrated or MissMigrate) when an object arrives on this
// PE.
// - Synchronous arrivals advance migrates_completed toward
//   migrates_expected.
// - Asynchronous arrivals (presumably waitBarrier == 0) advance
//   future_migrates_completed. CheckMigrationComplete is called once every
//   expected asynchronous arrival is in.
// - In fault-tolerance builds, donotCountMigration presumably suppresses the
//   counting.
443 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
445 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
446 if(donotCountMigration){
453 migrates_completed++;
454 DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
455 if (migrates_completed == migrates_expected) {
460 future_migrates_completed ++;
461 DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
462 if (future_migrates_completed == future_migrates_expected) {
463 CheckMigrationComplete();
// Sent to the destination PE when the source's Migrate call returned 0 (the
// object was already gone, see ProcessReceiveMigration). Calls Migrated with
// waitForBarrier passed through, so the expected-arrival counters still
// balance.
469 void CentralLB::MissMigrate(int waitForBarrier)
472 Migrated(h, waitForBarrier);
475 // build a complete data from bufferred messages
476 // not used when USE_REDUCTION = 1
// Sizes statsData from the accumulated n_objs/n_comm. It then copies the
// object and comm records of every buffered CLBStatsMsg, in PE order. Each
// object's source PE goes into both from_proc and to_proc, and migratable
// objects are counted.
477 void CentralLB::buildStats()
479 statsData->nprocs() = stats_msg_count;
481 statsData->objData.resize(statsData->n_objs);
482 statsData->from_proc.resize(statsData->n_objs);
483 statsData->to_proc.resize(statsData->n_objs);
484 statsData->commData.resize(statsData->n_comm);
489 // copy all data in individule message to this big structure
490 for (int pe=0; pe<CkNumPes(); pe++) {
492 CLBStatsMsg *msg = statsMsgsList[pe];
493 if(msg == NULL) continue;
494 for (i=0; i<msg->n_objs; i++) {
495 statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
496 statsData->objData[nobj] = msg->objData[i];
497 if (msg->objData[i].migratable) nmigobj++;
500 for (i=0; i<msg->n_comm; i++) {
501 statsData->commData[ncom] = msg->commData[i];
508 statsData->n_migrateobjs = nmigobj;
511 // deposit one processor data at a time, note database is pre-allocated
512 // to have enough space
513 // used when USE_REDUCTION = 1
// Copies one PE's message into statsData; NULL messages are ignored.
// - Per-PE timing, speed and availability go into procs[pe].
// - Object records are written at the running n_objs position, with
//   from_proc/to_proc set to pe.
// - Comm records are written at the running n_comm position.
// The assertions check that the capacity pre-allocated by ReceiveCounts is
// not exceeded.
514 void CentralLB::depositData(CLBStatsMsg *m)
517 if (m == NULL) return;
519 const int pe = m->from_pe;
520 struct ProcStats &procStat = statsData->procs[pe];
521 #if defined(TEMP_LDB)
522 procStat.pe_temp=m->pe_temp;
523 procStat.pe_speed=m->pe_speed;
527 procStat.total_walltime = m->total_walltime;
528 procStat.idletime = m->idletime;
529 procStat.bg_walltime = m->bg_walltime;
531 procStat.total_cputime = m->total_cputime;
532 procStat.bg_cputime = m->bg_cputime;
534 procStat.pe_speed = m->pe_speed;
536 //procStat.utilization = 1.0;
537 procStat.available = true;
538 procStat.n_objs = m->n_objs;
540 int &nobj = statsData->n_objs;
541 int &nmigobj = statsData->n_migrateobjs;
542 for (i=0; i<m->n_objs; i++) {
543 statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
544 statsData->objData[nobj] = m->objData[i];
545 if (m->objData[i].migratable) nmigobj++;
547 CmiAssert(nobj <= statsData->objData.capacity());
549 int &n_comm = statsData->n_comm;
550 for (i=0; i<m->n_comm; i++) {
551 statsData->commData[n_comm] = m->commData[i];
553 CmiAssert(n_comm <= statsData->commData.capacity());
// Concurrent mode: the balancer PE re-broadcasts the stats it receives to all
// PEs (see ReceiveStats). The balancer has already processed them, so it
// ignores the echo; every other PE handles them as if received directly.
558 void CentralLB::ReceiveStatsFromRoot(CkMarshalledCLBStatsMessage &msg) {
560 if (CkMyPe() == cur_ld_balancer) return;
561 else ReceiveStats(msg);
// Collects per-PE CLBStatsMsgs. A marshalled message may hold a batch when it
// comes from the spanning tree.
// - In concurrent mode the balancer PE first re-broadcasts the batch to every
//   PE.
// - Each message is stored in statsMsgsList[pe] and its per-PE data is
//   recorded in statsData.
// - Messages from dead PEs are skipped; duplicates are reported.
// - Once stats from every valid PE have arrived, the strategy is started on
//   this PE.
565 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
568 if (concurrent && (CkMyPe() == cur_ld_balancer)) {
569 thisProxy.ReceiveStatsFromRoot(msg); // broadcast stats to all other PEs
572 if (statsMsgsList == NULL) {
573 statsMsgsList = new CLBStatsMsg*[CkNumPes()];
574 CmiAssert(statsMsgsList != NULL);
575 for(int i=0; i < CkNumPes(); i++)
576 statsMsgsList[i] = 0;
578 if (statsData == NULL) statsData = new LDStats;
580 // loop through all CLBStatsMsg in the incoming msg
581 int count = msg.getCount();
582 for (int num = 0; num < count; num++)
584 CLBStatsMsg *m = msg.getMessage(num);
586 const int pe = m->from_pe;
587 DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
588 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
590 * if(m->step < step()){
591 * //TODO: if a processor is redoing an old load balance step..
592 * //tell it that the step is done and that it should not perform any migrations
593 * thisProxy[pe].ReceiveDummyMigration();
597 if(!CmiNodeAlive(pe)){
598 DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
// Only the balancer's message carries an availability vector; adopt it along
// with the next balancer PE.
602 if (m->avail_vector!=NULL) {
603 LBDatabaseObj()->set_avail_vector(m->avail_vector, m->next_lb);
606 if (statsMsgsList[pe] != 0) {
607 CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
610 statsMsgsList[pe] = m;
614 // store per processor data right away
615 struct ProcStats &procStat = statsData->procs[pe];
617 procStat.total_walltime = m->total_walltime;
618 procStat.idletime = m->idletime;
619 procStat.bg_walltime = m->bg_walltime;
621 procStat.total_cputime = m->total_cputime;
622 procStat.bg_cputime = m->bg_cputime;
624 procStat.pe_speed = m->pe_speed;
625 //procStat.utilization = 1.0;
626 procStat.available = true;
627 procStat.n_objs = m->n_objs;
629 statsData->n_objs += m->n_objs;
630 statsData->n_comm += m->n_comm;
631 #if defined(TEMP_LDB)
632 procStat.pe_temp=m->pe_temp;
633 procStat.pe_speed=m->pe_speed;
// Completion check: compare against the number of live PEs, not CkNumPes().
641 const int clients = CkNumValidPes();
642 DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
644 if (stats_msg_count == clients) {
645 DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
646 statsData->nprocs() = stats_msg_count;
648 thisProxy[CkMyPe()].t_LoadBalance();
650 thisProxy[CkMyPe()].LoadBalance();
655 /** added by Abhinav for receiving msgs via spanning tree */
// One hop of the spanning-tree stats gather. Incoming batches are buffered.
// Once numChildren + 1 batches have arrived (one per child plus this PE's own
// message from SendStats), the combined batch is forwarded upward: to
// ReceiveStats on PE 0, or to the parent's ReceiveStatsViaTree.
// NOTE(review): this path treats PE 0 as both tree root and balancer (see the
// assertion and the hard-coded thisProxy[0]). SendStats, however, routes by
// cur_ld_balancer, which ProcessReceiveMigration can change via next_lb.
// Confirm cur_ld_balancer is always 0 whenever the spanning tree is used.
656 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
659 CmiAssert(CkMyPe() != 0);
660 bufMsg.add(msg); // buffer messages
662 //CkPrintf("here %d\n", CkMyPe());
663 if (count_msgs == st.numChildren+1) {
666 thisProxy[0].ReceiveStats(bufMsg);
667 //CkPrintf("from %d\n", CkMyPe());
670 thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
// Replay-mode scratch table, indexed by PE, holding the LB database handle of
// that PE's objects. Allocated in LoadBalance and freed in ApplyDecision, both
// only when _replaySystem && !concurrent.
678 static LDHandle *loadBalancer_pointers;
// Runs the strategy on the collected statsData. Steps, in order:
//   1. Reset the stats-message list and the adaptive LB state.
//   2. Normalize PE speeds unless they are configured equal.
//   3. Apply the availability vector.
//   4. Drop comm records of deleted objects, then preprocess.
//   5. Optionally print a "before LB" summary.
//   6. Call Strategy(); the result is kept in storedMigrateMsg.
// In non-concurrent mode the decision is applied immediately.
681 void CentralLB::LoadBalance()
685 const int clients = CkNumPes();
691 for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
694 theLbdb->ResetAdaptive();
695 if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
697 if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer))
698 CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
699 lbname, cur_ld_balancer, step(), start_lb_time,
700 CmiMemoryUsage()/(1024.0*1024.0));
702 // if we are in simulation mode read data
703 if (LBSimulation::doSimulation) simulationRead();
705 char *availVector = LBDatabaseObj()->availVector();
706 for(proc = 0; proc < clients; proc++)
707 statsData->procs[proc].available = (bool)availVector[proc];
710 removeCommDataOfDeletedObjs(statsData);
711 preprocess(statsData);
713 // CkPrintf("Before Calling Strategy\n");
715 if (_lb_args.printSummary()) {
716 LBInfo info(clients);
717 // not take comm data
718 info.getInfo(statsData, clients, 0);
719 LBRealType mLoad, mCpuLoad, totalLoad;
720 info.getSummary(mLoad, mCpuLoad, totalLoad);
722 statsData->computeNonlocalComm(nmsgs, nbytes);
723 CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
724 // if (_lb_args.debug() > 1) {
725 // for (int i=0; i<statsData->n_objs; i++)
726 // CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
// Replay support: remember each source PE's LB database handle so that
// ApplyDecision can patch the handles in the generated moves.
731 if (_replaySystem && !concurrent) {
732 loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
733 for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
737 storedMigrateMsg = Strategy(statsData);
739 if (!concurrent) ApplyDecision(); // immediately apply the migration decision
// Produces the final LBMigrateMsg and distributes it.
// - The message comes either from createMigrateMsg(statsData) or from the one
//   Strategy left in storedMigrateMsg (presumably chosen by `concurrent`).
// - It is stamped with the availability vector and the next balancer PE.
// - With printSummary, it is annotated with predicted per-PE loads.
// - Delivery is a scatter tree (CMK_SCATTER_LB_RESULTS), a broadcast, or
//   per-PE split messages.
743 void CentralLB::ApplyDecision() {
745 const int clients = CkNumPes();
747 LBMigrateMsg *migrateMsg;
749 migrateMsg = createMigrateMsg(statsData);
750 if (_lb_args.debug()) printStrategyStats(migrateMsg);
752 migrateMsg = storedMigrateMsg;
753 storedMigrateMsg = NULL;
756 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
757 migrateMsg->step = step();
761 CpdHandleLBMessage(&migrateMsg);
// Replay support: restore LB database handles saved in LoadBalance, then free
// the table.
762 if (_replaySystem && !concurrent) {
763 for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
764 free(loadBalancer_pointers);
768 LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
769 migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
771 // if this is the step at which we need to dump the database
774 // calculate predicted load
775 // very time consuming though, so only happen when debugging is on
776 if (_lb_args.printSummary()) {
777 LBInfo info(clients);
778 // not take comm data
779 getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
780 LBRealType mLoad, mCpuLoad, totalLoad;
781 info.getSummary(mLoad, mCpuLoad, totalLoad);
783 statsData->computeNonlocalComm(nmsgs, nbytes);
784 CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
785 for (int i=0; i<clients; i++)
786 migrateMsg->expectedLoad[i] = info.peLoads[i];
789 DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
790 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
792 migrateMsg->lbDecisionCount = lbDecisionCount;
795 envelope *env = UsrToEnv(migrateMsg);
796 #if CMK_SCATTER_LB_RESULTS
797 InitiateScatter(migrateMsg);
801 thisProxy.ReceiveMigration(migrateMsg);
804 // split the migration for each processor
805 for (int p=0; p<CkNumPes(); p++) {
806 LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
807 thisProxy[p].ReceiveMigration(m);
812 // Zero out data structures for next cycle
813 // CkPrintf("zeroing out data\n");
// Alternative LoadBalance entry invoked from ReceiveStats. The body is not
// shown in this listing; presumably a wrapper around LoadBalance — confirm.
819 void CentralLB::t_LoadBalance()
// Starts tree-based delivery of migration decisions.
// - With at most broadcastThreshold PEs, the message is simply broadcast.
// - Otherwise the PE range is halved at middlePe. Moves are split by source
//   PE into left and right LBScatterMsgs. Each carries, for its half, the
//   number of objects arriving at each PE (numMigratesPerPe).
// - The right half goes to middlePe; the left half is processed on this PE via
//   ScatterMigrationResults.
// NOTE(review): ScatterMigrationResults asserts firstPeInSpan == CkMyPe() and
// the left span starts at PE 0, so this must run on PE 0. Confirm ApplyDecision
// only reaches here on PE 0 when CMK_SCATTER_LB_RESULTS is enabled.
824 void CentralLB::InitiateScatter(LBMigrateMsg *msg) {
826 if (CkNumPes() <= broadcastThreshold) {
827 thisProxy.ReceiveMigration(msg);
831 int middlePe = CkNumPes() / 2;
833 // allocate maximum possible size to avoid later copies
834 // the messages will be resized before sending
835 LBScatterMsg *leftMsg = new (middlePe, msg->n_moves)
836 LBScatterMsg(0, middlePe - 1);
837 LBScatterMsg *rightMsg = new (CkNumPes() - middlePe, msg->n_moves)
838 LBScatterMsg(middlePe, CkNumPes() - 1);
// Incoming-object tally per destination PE. ProcessMigrationDecision reads its
// entry as migrates_expected.
840 int *migrateTally = new int[CkNumPes()];
841 memset(migrateTally, 0, CkNumPes() * sizeof(int));
843 for (int i = 0; i < msg->n_moves; i++) {
844 MigrateInfo* item = (MigrateInfo*) &msg->moves[i];
845 migrateTally[item->to_pe]++;
846 if (item->from_pe < middlePe) {
847 leftMsg->moves[leftMsg->numMigrates++] = *item;
850 rightMsg->moves[rightMsg->numMigrates++] = *item;
854 memcpy(leftMsg->numMigratesPerPe, migrateTally, middlePe * sizeof(int));
855 memcpy(rightMsg->numMigratesPerPe, &migrateTally[middlePe], (CkNumPes() - middlePe) * sizeof(int));
857 delete [] migrateTally;
859 // shrink the size of the messages
860 envelope *env = UsrToEnv(rightMsg);
861 env->shrinkUsersize((msg->n_moves - rightMsg->numMigrates) * sizeof(MigrateDecision));
863 // left message is not getting sent yet, but better resize it now
864 // before we lose track of its original size
865 env = UsrToEnv(leftMsg);
866 env->shrinkUsersize((msg->n_moves - leftMsg->numMigrates) * sizeof(MigrateDecision));
868 // send out results for right half of PEs first
869 // to overlap communication with computation
870 thisProxy[middlePe].ScatterMigrationResults(rightMsg);
873 ScatterMigrationResults(leftMsg);
// Recursive scatter step, executed on the first PE of the span in `msg`.
// - Spans of at most broadcastThreshold PEs: every PE in the span receives a
//   full copy, and the original message goes to the last PE. Receivers index
//   numMigratesPerPe by CkMyPe() - firstPeInSpan.
// - Larger spans are split at middlePe by source PE. A new message carrying
//   the right half is sent to middlePe, and `msg` is reused in place for the
//   left half.
876 void CentralLB::ScatterMigrationResults(LBScatterMsg *msg) {
878 int finished = false;
880 CkAssert(msg->firstPeInSpan == CkMyPe());
881 int numPesInSpan = msg->lastPeInSpan - msg->firstPeInSpan + 1 ;
883 if (numPesInSpan <= broadcastThreshold) {
884 for (int i = msg->firstPeInSpan; i < msg->lastPeInSpan; i++) {
885 // TODO: multicast without allocating new message each time
886 LBScatterMsg *msgCopy = new (numPesInSpan, msg->numMigrates)
887 LBScatterMsg(msg->firstPeInSpan, msg->lastPeInSpan);
888 msgCopy->numMigrates = msg->numMigrates;
889 memcpy(msgCopy->numMigratesPerPe, msg->numMigratesPerPe,
890 numPesInSpan * sizeof(int));
891 memcpy(msgCopy->moves, msg->moves,
892 msg->numMigrates * sizeof(MigrateDecision));
893 thisProxy[i].ReceiveMigration(msgCopy);
895 // use original message for last send
896 thisProxy[msg->lastPeInSpan].ReceiveMigration(msg);
900 int middlePe = (msg->firstPeInSpan + msg->lastPeInSpan + 1) / 2;
901 // reuse received message, taking care not to overwrite needed data
902 LBScatterMsg *leftMsg = msg;
903 int numMigrates = leftMsg->numMigrates;
904 int numPesInRightSpan = leftMsg->lastPeInSpan - middlePe + 1;
905 LBScatterMsg *rightMsg =
906 new (numPesInRightSpan, leftMsg->numMigrates)
907 LBScatterMsg(middlePe, leftMsg->lastPeInSpan);
908 leftMsg->numMigrates = 0;
909 leftMsg->lastPeInSpan = middlePe - 1;
// Left moves are compacted in place; the write index (numMigrates) never
// overtakes the read index i, so unread entries are not overwritten.
910 for (int i = 0; i < numMigrates; i++) {
911 if (leftMsg->moves[i].fromPe < middlePe) {
912 leftMsg->moves[leftMsg->numMigrates++] = leftMsg->moves[i];
915 rightMsg->moves[rightMsg->numMigrates++] = leftMsg->moves[i];
// The left message keeps its numMigratesPerPe array untouched. firstPeInSpan
// is unchanged, so its indexing stays valid.
919 memcpy(rightMsg->numMigratesPerPe,
920 &leftMsg->numMigratesPerPe[middlePe - leftMsg->firstPeInSpan],
921 (numPesInRightSpan) * sizeof(int));
923 // shrink the size of the messages
924 envelope *env = UsrToEnv(rightMsg);
925 env->shrinkUsersize((numMigrates - rightMsg->numMigrates)
926 * sizeof(MigrateDecision));
928 // left message is not getting sent yet, but better resize it now
929 // before we lose track of its original size
930 env = UsrToEnv(leftMsg);
931 env->shrinkUsersize((numMigrates - leftMsg->numMigrates)
932 * sizeof(MigrateDecision));
934 thisProxy[middlePe].ScatterMigrationResults(rightMsg);
941 // test if sender and receiver in a commData is nonmigratable.
// Scans the per-PE object arrays (`count` PEs, `len[pe]` objects each) for an
// object whose ID matches either the sender or the receiver's destination
// object of `commData`.
// NOTE(review): the statements run on a match and after the scan are not
// shown in this listing — confirm what is returned in each case.
942 static bool isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
945 for (int pe=0 ; pe<count; pe++)
947 for (int i=0; i<len[pe]; i++)
948 if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
949 LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID()))
956 // rebuild LDStats and remove all non-migratble objects and related things
// Returns early when no non-migratable object is found. Otherwise:
// - Only migratable objects are kept. Despite its name, the `nonmig` vector
//   below holds the *migratable* ones.
// - Each removed object's wall/CPU time is added to its PE's background load.
// - Comm records are dropped when the sender, or an object receiver, is
//   non-migratable. When complete_flag is false, records whose receiver
//   object is not present are dropped too.
// - The comm hash is rebuilt over the compacted data.
// NOTE(review): the diagnostic CmiPrintf near the end passes its arguments in
// an order that does not match the labels. "%d comms" receives
// stats->n_objs, "n_objs:%d" receives stats->n_migrateobjs, and
// "migratable:%d" receives the removed-comm count. The intended order is
// presumably (removed objs, removed comms, n_objs, n_migrateobjs).
957 void CentralLB::removeNonMigratable(LDStats* stats, int count)
961 // check if we have non-migratable objects
963 for (i=0; i<stats->n_objs; i++)
965 LDObjData &odata = stats->objData[i];
966 if (!odata.migratable) {
970 if (have == 0) return;
972 CkVec<LDObjData> nonmig;
973 CkVec<int> new_from_proc, new_to_proc;
974 nonmig.resize(stats->n_migrateobjs);
975 new_from_proc.resize(stats->n_migrateobjs);
976 new_to_proc.resize(stats->n_migrateobjs);
978 for (i=0; i<stats->n_objs; i++)
980 LDObjData &odata = stats->objData[i];
981 if (odata.migratable) {
982 nonmig[n_objs] = odata;
983 new_from_proc[n_objs] = stats->from_proc[i];
984 new_to_proc[n_objs] = stats->to_proc[i];
988 stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
990 stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
994 CmiAssert(stats->n_migrateobjs == n_objs);
996 stats->makeCommHash();
998 CkVec<LDCommData> newCommData;
999 newCommData.resize(stats->n_comm);
1001 for (i=0; i<stats->n_comm; i++)
1003 LDCommData& cdata = stats->commData[i];
1004 if (!cdata.from_proc())
1006 int idx = stats->getSendHash(cdata);
1007 CmiAssert(idx != -1);
1008 if (!stats->objData[idx].migratable) continue;
1010 switch (cdata.receiver.get_type()) {
1014 int idx = stats->getRecvHash(cdata);
1015 if (stats->complete_flag)
1016 CmiAssert(idx != -1);
1017 else if (idx == -1) continue; // receiver not in this group
1018 if (!stats->objData[idx].migratable) continue;
1021 case LD_OBJLIST_MSG: // object message FIXME add multicast
1024 newCommData[n_comm] = cdata;
1028 if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1031 stats->objData = nonmig;
1032 stats->from_proc = new_from_proc;
1033 stats->to_proc = new_to_proc;
1034 stats->n_objs = n_objs;
1036 stats->commData = newCommData;
1037 stats->n_comm = n_comm;
1039 stats->deleteCommHash();
1040 stats->makeCommHash();
1045 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1046 extern int restarted;
// Scatter-path receipt of this PE's share of the migration decisions. PE 0
// records the strategy cost, and the message is stashed in storedScatterMsg.
// All PEs then synchronize on a reduction whose target is
// ProcessMigrationDecision. Fault-tolerance builds presumably take the
// parallel-recovery path instead.
1049 void CentralLB::ReceiveMigration(LBScatterMsg *m) {
1051 if (CkMyPe() == 0) theLbdb->SetStrategyCost(CkWallTimer() - strat_start_time);
1052 // Zero out data structures for next cycle
1056 storedMigrateMsg = NULL;
1057 storedScatterMsg = m;
1058 #if CMK_MEM_CHECKPOINT
1061 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1062 restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
1064 contribute(CkCallback(CkReductionTarget(CentralLB, ProcessMigrationDecision),
// Broadcast/split-path receipt of the migration decisions. PE 0 records the
// strategy cost, and the message is stashed in storedMigrateMsg. All PEs then
// synchronize on a reduction whose target is ProcessReceiveMigration.
// Fault-tolerance builds presumably take the parallel-recovery path instead.
1070 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1073 if (CkMyPe() == 0) theLbdb->SetStrategyCost(CkWallTimer() - strat_start_time);
1074 // Zero out data structures for next cycle
1078 storedMigrateMsg = m;
1079 #if CMK_MEM_CHECKPOINT
1082 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1083 restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
1085 contribute(CkCallback(CkReductionTarget(CentralLB, ProcessReceiveMigration),
// Reduction target for the scatter path.
// - migrates_expected is set from this PE's entry in numMigratesPerPe.
// - Every object whose fromPe is this PE is migrated. If Migrate returns 0,
//   the run aborts, because asynchronous arrival is not supported in
//   scattering mode.
// - The final check covers the case where nothing is expected or every
//   arrival is already in.
1090 void CentralLB::ProcessMigrationDecision() {
1092 LBScatterMsg *m = storedScatterMsg;
1093 CkAssert(m != NULL);
1095 migrates_expected = m->numMigratesPerPe[CkMyPe() - m->firstPeInSpan];
1096 future_migrates_expected = 0;
1098 for(int i = 0; i < m->numMigrates; i++) {
1099 MigrateDecision& move = m->moves[i];
1100 const int me = CkMyPe();
1101 if (move.fromPe == me) {
1102 DEBUGF(("[%d] migrating object to %d\n", move.fromPe, move.toPe));
1103 // migrate object, in case it is already gone, inform toPe
1104 LDObjHandle objInfo = theLbdb->GetObjHandle(move.dbIndex);
1106 if (theLbdb->Migrate(objInfo,move.toPe) == 0) {
1107 CkAbort("Error: Async arrival not supported in scattering mode\n");
1112 if (migrates_expected == 0 || migrates_completed == migrates_expected) {
// Reduction target for the broadcast/split path.
// - Copies the expected per-PE loads into lastLBInfo.
// - Walks every move:
//   - An object leaving this PE is migrated. If Migrate returns 0 (the object
//     is already gone), MissMigrate is sent to the destination.
//   - An object arriving here is counted into migrates_expected or
//     future_migrates_expected, depending on async_arrival.
// - Doubles the LB period when no moves were made.
// - Switches to the next balancer PE.
// - Finishes the step early if all synchronous arrivals are already in.
1120 void CentralLB::ProcessReceiveMigration()
1124 LBMigrateMsg *m = storedMigrateMsg;
1127 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1130 DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1131 // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1132 if(step() > m->step){
1134 envelope *env = UsrToEnv(m);
1137 lbDecisionCount = m->lbDecisionCount;
1140 if (_lb_args.debug() > 1)
1141 if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
// Publish the strategy's expected load for every PE.
1143 for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1144 CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1146 if(!CmiNodeAlive(CkMyPe())){
1150 migrates_expected = 0;
1151 future_migrates_expected = 0;
1152 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1155 LBDB *_myLBDB = theLbdb->getLBDB();
1157 dummyCounts = new int[CmiNumPes()];
1158 memset(dummyCounts,0,sizeof(int)*CmiNumPes());
// Classify each move relative to this PE: outgoing, incoming, or neither.
1161 for(i=0; i < m->n_moves; i++) {
1162 MigrateInfo& move = m->moves[i];
1163 const int me = CkMyPe();
1164 if (move.from_pe == me && move.to_pe != me) {
1165 DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1166 // migrate object, in case it is already gone, inform toPe
1167 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1168 if (theLbdb->Migrate(move.obj,move.to_pe) == 0)
1169 thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1172 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1173 theLbdb->Migrate(move.obj,move.to_pe);
1176 if(_myLBDB->validObjHandle(move.obj)){
1177 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1178 theLbdb->Migrate(move.obj,move.to_pe);
1181 DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1182 dummyCounts[move.to_pe]++;
1187 } else if (move.from_pe != me && move.to_pe == me) {
1188 DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1189 if (!move.async_arrival) migrates_expected++;
1190 else future_migrates_expected++;
1193 #if CMK_GLOBAL_LOCATION_UPDATE
1194 UpdateLocation(move);
1199 DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1200 // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1202 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1204 sendDummyMigrationCounts(dummyCounts);
1205 _restartFlag =false;
1206 delete []dummyCounts;
// Nothing moved: balance half as often from now on.
1212 if (m->n_moves ==0) {
1213 theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
// Hand the balancer role to next_lb. A non-zero new balancer adopts the
// availability vector carried in the message.
1216 cur_ld_balancer = m->next_lb;
1217 if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1218 LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1221 if (migrates_expected == 0 || migrates_completed == migrates_expected)
1225 // CkEvacuatedElement();
1226 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1227 // migrates_expected = 0;
1228 // // ResumeClients(1);
1233 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Fault-tolerance path. According to the commented-out caller in ReceiveStats,
// it tells a PE that re-ran an old LB step that the step is done. It resumes
// clients on this PE with balancing = 1.
1234 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1235 DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1236 //TODO: this is gonna be important when a crash happens during checkpoint
1237 //the globalDecisionCount would have to be saved and compared against
1238 //a future recvMigration
1240 thisProxy[CkMyPe()].ResumeClients(1);
1244 // We assume that bit vector would have been aptly set async by either scheduler or charmrun.
// Barrier target on PE 0 (see MigrationDone) once migration has finished.
// If a reallocation request has been received, this:
// - marks the request in progress,
// - reports the LB step timing,
// - checkpoints to _shrinkexpand_basedir, continuing at
//   ResumeFromReallocCheckpoint.
// The broadcast of MigrationDoneImpl(1) presumably handles the
// no-reallocation case.
1245 void CentralLB::CheckForRealloc(){
1246 #if CMK_SHRINK_EXPAND
1247 if(pending_realloc_state == REALLOC_MSG_RECEIVED) {
1248 pending_realloc_state = REALLOC_IN_PROGRESS; //in progress
1249 CkPrintf("Load balancer invoking charmrun to handle reallocation on pe %d\n", CkMyPe());
1250 double end_lb_time = CkWallTimer();
1251 CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1252 lbname, cur_ld_balancer, step()-1, end_lb_time, end_lb_time-start_lb_time);
1254 CkCallback cb(CkIndex_CentralLB::ResumeFromReallocCheckpoint(), thisProxy[0]);
1255 CkStartCheckpoint(_shrinkexpand_basedir, cb);
1258 thisProxy.MigrationDoneImpl(1);
// Checkpoint continuation. Copies the availability vector (one char per PE)
// from se_avail_vector into a std::vector and frees the C buffer. Broadcasts
// the vector, with the post-restart process count, to WillIbekilled.
1263 void CentralLB::ResumeFromReallocCheckpoint(){
1264 #if CMK_SHRINK_EXPAND
1265 std::vector<char> avail(se_avail_vector, se_avail_vector + CkNumPes());
1266 free(se_avail_vector);
1267 thisProxy.WillIbekilled(avail, numProcessAfterRestart);
1273 #if CMK_SHRINK_EXPAND
// Returns this PE's rank after the shrink: CkMyPe() minus the number of
// unavailable PEs (avail[i] == 0) with a lower index. `count` is declared on a
// line not shown and presumably starts at 0.
// NOTE(review): `avail` is taken by value, copying the whole vector on every
// call; a const reference would avoid the copy.
1274 int GetNewPeNumber(std::vector<char> avail){
1275 int mype = CkMyPe();
1277 for (int i =0; i <mype; i++){
1278 if(avail[i] ==0) count++;
1280 return (mype - count);
// Broadcast from ResumeFromReallocCheckpoint. It:
// - records the post-restart process count,
// - computes this PE's new rank,
// - sets willContinue from this PE's availability flag,
// - builds a callback targeting StartCleanup on PE 0 (presumably for a
//   reduction contribution on a line not shown).
1284 void CentralLB::WillIbekilled(std::vector<char> avail, int newnumProcessAfterRestart){
1285 #if CMK_SHRINK_EXPAND
1286 numProcessAfterRestart = newnumProcessAfterRestart;
1287 mynewpe = GetNewPeNumber(avail);
1288 willContinue = avail[CkMyPe()];
1289 CkCallback cb(CkIndex_CentralLB::StartCleanup(), thisProxy[0]);
// Callback target on PE 0, named in WillIbekilled (CMK_SHRINK_EXPAND only).
// NOTE(review): the body is not shown in this listing — confirm its behavior.
1294 void CentralLB::StartCleanup(){
1295 #if CMK_SHRINK_EXPAND
// Called when this PE's expected migrations are complete. With
// CMK_SHRINK_EXPAND it first synchronizes on a barrier targeting
// CheckForRealloc on PE 0. Otherwise (presumably the #else path) it finishes
// the step directly via MigrationDoneImpl.
1299 void CentralLB::MigrationDone(int balancing)
1301 #if CMK_SHRINK_EXPAND
1302 // barrier to check for reallocation
1303 CkCallback cb(CkIndex_CentralLB::CheckForRealloc(), thisProxy[0]);
1307 MigrationDoneImpl(balancing);
1310 void CentralLB::MigrationDoneImpl (int balancing)
1314 migrates_completed = 0;
1315 migrates_expected = -1;
1317 if (balancing) theLbdb->ClearLoads();
1318 // Increment to next step
1320 DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1321 // if sync resume, invoke a barrier
1323 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1324 savedBalancing = balancing;
1325 startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1328 LBDatabase::Object()->MigrationDone(); // call registered callbacks
1330 LoadbalanceDone(balancing); // callback
1331 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1332 // if sync resume invoke a barrier
1333 if (balancing && _lb_args.syncResume()) {
1334 contribute(CkCallback(CkReductionTarget(CentralLB, ResumeClients),
1338 if(CmiNodeAlive(CkMyPe())){
1339 thisProxy [CkMyPe()].ResumeClients(balancing);
1342 #if CMK_GRID_QUEUE_AVAILABLE
1343 CmiGridQueueDeregisterAll ();
1344 CpvAccess(CkGridObject) = NULL;
1345 #endif // if CMK_GRID_QUEUE_AVAILABLE
1346 #endif // if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1347 #endif // if CMK_LBDB_ON
1350 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1351 void CentralLB::endMigrationDone(int balancing){
1352 DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1355 if (balancing && _lb_args.syncResume()) {
1356 contribute(CkCallback(CkReductionTarget(CentralLB, ResumeClients),
1360 if(CmiNodeAlive(CkMyPe())){
1361 DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1362 thisProxy [CkMyPe()].ResumeClients(balancing);
1369 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1370 void resumeCentralLbAfterChkpt(void *_lb){
1371 CentralLB *lb= (CentralLB *)_lb;
1372 CpvAccess(_currentObj)=lb;
1373 lb->endMigrationDone(lb->savedBalancing);
1375 void resumeAfterRestoreParallelRecovery(void *_lb){
1376 CentralLB *lb= (CentralLB *)_lb;
1377 lb->ProcessReceiveMigration();
1382 void CentralLB::ResumeClients()
1387 void CentralLB::ResumeClients(int balancing)
1390 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1392 globalResumeCount = resumeCount;
1394 DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1396 theLbdb->ResumeClients();
1399 CheckMigrationComplete();
1400 if (future_migrates_expected == 0 ||
1401 future_migrates_expected == future_migrates_completed) {
1402 CheckMigrationComplete();
1409 migration of objects contains two different kinds:
1410 (1) objects want to make a barrier for migration completion
1411 (waitForBarrier is true)
1412 migrationDone() to finish and resumeClients
1413 (2) objects don't need a barrier
1414 However, next load balancing can only happen when both migrations complete
1416 void CentralLB::CheckMigrationComplete()
1421 double end_lb_time = CkWallTimer();
1422 if (_lb_args.debug() && CkMyPe()==0) {
1423 CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1424 lbname, CkMyPe(), step()-1, end_lb_time,
1425 end_lb_time-start_lb_time);
1428 theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
1431 future_migrates_expected = -1;
1432 future_migrates_completed = 0;
1435 DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1436 // release local barrier so that the next load balancer can go
1439 theLbdb->getLBDB()->DoneRegisteringObjects(h);
1440 // switch to the next load balancer in the list
1441 // subtle: called from Migrated() may result in Migrated() called in next LB
1442 theLbdb->nextLoadbalancer(seqno);
1447 // Remove edges from commData in LDStats which contains deleted elements
1448 void CentralLB::removeCommDataOfDeletedObjs(LDStats* stats) {
1449 stats->makeCommHash();
1451 CkVec<LDCommData> newCommData;
1452 newCommData.resize(stats->n_comm);
1454 for (int i=0; i<stats->n_comm; i++) {
1455 LDCommData& cdata = stats->commData[i];
1456 switch (cdata.receiver.get_type()) {
1460 if (!cdata.from_proc()) {
1461 int sidx = stats->getSendHash(cdata);
1462 int ridx = stats->getRecvHash(cdata);
1463 if (sidx == -1 || ridx == -1) continue;
1467 case LD_OBJLIST_MSG: {
1468 int sidx = stats->getSendHash(cdata);
1469 if (sidx == -1) continue;
1471 LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
1472 for (int id=0; id<nobjs; id++) {
1473 int idx = stats->getHash(objs[id]);
1476 objs[id] = objs[nobjs-1];
1481 if(nobjs == 0) continue;
1482 cdata.receiver.dest.destObjs.len = nobjs;
1487 stats->commData[n_comm] = cdata;
1491 stats->commData.resize(n_comm);
1492 stats->n_comm = n_comm;
1495 void CentralLB::preprocess(LDStats* stats)
1497 if (_lb_args.ignoreBgLoad())
1498 stats->clearBgLoad();
1500 // Call the predictor for the future
1501 if (_lb_predict) FuturePredictor(statsData);
1504 void CentralLB::printStrategyStats(LBMigrateMsg *msg) {
1506 envelope *env = UsrToEnv(msg);
1508 double strat_end_time = CkWallTimer();
1509 double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1510 CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1511 lbname, CkMyPe(), (int)lbdbMemsize, (int)(useMem()/1000));
1512 CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, CkMyPe(), msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1513 CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1514 lbname, CkMyPe(), strat_end_time, strat_end_time-strat_start_time);
1518 // default load balancing strategy
1519 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1522 strat_start_time = CkWallTimer();
1523 if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer))
1524 CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1529 if ((_lb_args.debug()>2) && (CkMyPe() == cur_ld_balancer)) {
1530 CkPrintf("CharmLB> Obj Map:\n");
1531 for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1535 if (concurrent) return NULL; // migrate msg will only be created on PE with best solution
1537 LBMigrateMsg *msg = createMigrateMsg(stats);
1539 /* Extra feature for MetaBalancer
1540 if (_lb_args.metaLbOn()) {
1541 int clients = CkNumPes();
1542 LBInfo info(clients);
1543 getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1544 LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1545 info.getSummary(mLoad, mCpuLoad, totalLoad);
1546 theLbdb->UpdateDataAfterLB(mLoad, mCpuLoad, totalLoad/clients);
1550 double strat_end_time = CkWallTimer();
1551 theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1553 if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer)) {
1554 printStrategyStats(msg);
1562 void CentralLB::changeFreq(int r)
1564 CkAbort("ERROR: changeFreq in CentralLB should never be called!\n");
1567 void CentralLB::changeFreq(int nFreq)
1570 //CkPrintf("PROC#%d in changeFreq numProcs=%d\n",CkMyPe(),nFreq);
1571 // for(int i=0;i<numProcs;i++)
1573 // if(procFreq[i]!=procFreqNew[i])
1576 sprintf(newfreq,"%d",nFreq);
1577 cpufreq_sysfs_write(newfreq,CkMyPe()%physicalCoresPerNode);//i%physicalCoresPerNode);
1578 // CkPrintf("PROC#%d freq changing from %d to %d temp=%f\n",i,procFreq[i],procFreqNew[i],procTemp[i]);
1582 CmiAbort("You should never call CentralLB::changeFreq without using the flag TEMP_LDB\n");
1587 void CentralLB::work(LDStats* stats)
1589 // does nothing but print the database
1593 // generate migrate message from stats->from_proc and to_proc
1594 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1597 CkVec<MigrateInfo*> migrateInfo;
1598 for (i=0; i<stats->n_objs; i++) {
1599 LDObjData &objData = stats->objData[i];
1600 int frompe = stats->from_proc[i];
1601 int tope = stats->to_proc[i];
1602 if (frompe != tope) {
1603 // CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1604 // CkMyPe(),obj,pe,dest);
1605 MigrateInfo *migrateMe = new MigrateInfo;
1606 migrateMe->obj = objData.handle;
1607 migrateMe->from_pe = frompe;
1608 migrateMe->to_pe = tope;
1609 migrateMe->async_arrival = objData.asyncArrival;
1610 migrateInfo.insertAtEnd(migrateMe);
1614 int migrate_count=migrateInfo.length();
1615 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1616 msg->n_moves = migrate_count;
1617 for(i=0; i < migrate_count; i++) {
1618 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1619 msg->moves[i] = *item;
1626 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1631 for (i=0; i<m->n_moves; i++) {
1632 MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1633 if (item->from_pe == p || item->to_pe == p) nmoves++;
1635 for (i=0; i<CkNumPes();i++) {
1636 if (!m->avail_vector[i]) nunavail++;
1639 if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1640 else msg = new(nmoves,0,0,0) LBMigrateMsg;
1641 msg->n_moves = nmoves;
1642 msg->level = m->level;
1643 msg->next_lb = m->next_lb;
1644 for (i=0,nmoves=0; i<m->n_moves; i++) {
1645 MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1646 if (item->from_pe == p || item->to_pe == p) {
1647 msg->moves[nmoves] = *item;
1651 // copy processor data
1653 for (i=0; i<CkNumPes();i++) {
1654 msg->avail_vector[i] = m->avail_vector[i];
1655 msg->expectedLoad[i] = m->expectedLoad[i];
1660 void CentralLB::simulationWrite() {
1661 if(step() == LBSimulation::dumpStep)
1663 // here we are supposed to dump the database
1664 int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1665 char *dumpFileName = (char *)malloc(dumpFileSize);
1666 while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1669 dumpFileName = (char *)malloc(dumpFileSize);
1671 writeStatsMsgs(dumpFileName);
1673 CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1674 ++LBSimulation::dumpStep;
1675 --LBSimulation::dumpStepSize;
1676 if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1677 CmiPrintf("Charm++> Exiting...\n");
1684 void CentralLB::simulationRead() {
1685 if (concurrent) CkAbort("Error: LB simulation not supported in concurrent mode");
1686 LBSimulation *simResults = NULL, *realResults;
1687 LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1688 voidMessage->n_moves=0;
1689 for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1690 // here we are supposed to read the data from the dump database
1691 int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1692 char *simFileName = (char *)malloc(simFileSize);
1693 while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1696 simFileName = (char *)malloc(simFileSize);
1698 readStatsMsgs(simFileName);
1700 // allocate simResults (only the first step)
1701 if (simResults == NULL) {
1702 simResults = new LBSimulation(LBSimulation::simProcs);
1703 realResults = new LBSimulation(LBSimulation::simProcs);
1706 // should be the same number of procs of the original simulation!
1707 if (!LBSimulation::procsChanged) {
1708 // it means we have a previous step, so in simResults there is data.
1709 // we can now print the real effects of the load balancer during the simulation
1710 // or print the difference between the predicted data and the real one.
1711 realResults->reset();
1712 // reset to_proc of statsData to be equal to from_proc
1713 for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1714 findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1715 simResults->PrintDifferences(realResults,statsData);
1717 simResults->reset();
1720 // now pass it to the strategy routine
1721 double startT = CkWallTimer();
1722 preprocess(statsData);
1723 CmiPrintf("%s> Strategy starts ... \n", lbname);
1724 LBMigrateMsg* migrateMsg = Strategy(statsData);
1725 CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1726 lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1728 // now calculate the results of the load balancing simulation
1729 findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1731 // now we have the simulation data, so print it and loop
1732 CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1733 // **CWL** Officially recording my disdain here for using ints for bool
1734 if (LBSimulation::showDecisionsOnly) {
1735 simResults->PrintDecisions(migrateMsg, simFileName,
1736 LBSimulation::simProcs);
1738 simResults->PrintSimulationResults();
1743 CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1745 // deallocate simResults
1747 CmiPrintf("Charm++> Exiting...\n");
1751 void CentralLB::readStatsMsgs(const char* filename)
1755 FILE *f = fopen(filename, "r");
1757 CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1761 // at this stage, we need to rebuild the statsMsgList and
1762 // statsDataList structures. For that first deallocate the
1764 if (statsMsgsList) {
1765 for(i = 0; i < stats_msg_count; i++)
1766 delete statsMsgsList[i];
1767 delete[] statsMsgsList;
1771 PUP::fromDisk pd(f);
1772 PUP::machineInfo machInfo;
1774 pd((char *)&machInfo, sizeof(machInfo)); // read machine info
1775 PUP::xlater p(machInfo, pd);
1777 if (_lb_args.lbversion() > 1) {
1778 p|_lb_args.lbversion(); // write version number
1779 CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1780 CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1784 CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1785 if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1786 if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1788 // LBSimulation::simProcs must be set
1791 CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1792 CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1794 // file f is closed in the destructor of PUP::fromDisk
1795 CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1799 void CentralLB::writeStatsMsgs(const char* filename)
1802 FILE *f = fopen(filename, "w");
1804 CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1808 const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1810 p((char *)&machInfo, sizeof(machInfo)); // machine info
1812 p|_lb_args.lbversion(); // write version number
1818 CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1822 // calculate the predicted wallclock/cpu load for every processors
1823 // considering communication overhead if considerComm is true
1824 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count,
1825 LBMigrateMsg *msg, LBInfo &info,
1829 stats->makeCommHash();
1831 // update to_proc according to migration msgs
1832 for(int i = 0; i < msg->n_moves; i++) {
1833 MigrateInfo &mInfo = msg->moves[i];
1834 int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1835 CmiAssert(idx != -1);
1836 stats->to_proc[idx] = mInfo.to_pe;
1839 info.getInfo(stats, count, considerComm);
1844 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1846 CkAssert(simResults != NULL && count == simResults->numPes);
1847 // estimate the new loads of the processors. As a first approximation, this is the
1848 // sum of the cpu times of the objects on that processor
1849 double startT = CkWallTimer();
1850 getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1851 CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1854 void CentralLB::pup(PUP::er &p) {
1855 if (p.isUnpacking()) {
1856 initLB(CkLBOptions(seqno));
1858 p|reduction_started;
1860 if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1863 if (p.isUnpacking())
1864 statsMsg = new CLBStatsMsg;
1867 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1868 p | lbDecisionCount;
1874 int CentralLB::useMem() {
1875 return sizeof(CentralLB) + statsData->useMem() +
1876 CkNumPes() * sizeof(CLBStatsMsg *);
1881 CLBStatsMsg is not a real message now.
1882 CLBStatsMsg is used for all processors to fill in their local load and comm
1883 statistics and send to processor 0
1886 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1889 objData = new LDObjData[osz];
1890 commData = new LDCommData[csz];
1891 avail_vector = NULL;
1894 CLBStatsMsg::~CLBStatsMsg() {
1897 delete [] avail_vector;
1900 void CLBStatsMsg::pup(PUP::er &p) {
1906 #if defined(TEMP_LDB)
1915 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1919 if (p.isUnpacking()) objData = new LDObjData[n_objs];
1920 for (i=0; i<n_objs; i++) p|objData[i];
1922 if (p.isUnpacking()) commData = new LDCommData[n_comm];
1923 for (i=0; i<n_comm; i++) p|commData[i];
1925 int has_avail_vector;
1926 if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1928 if (p.isUnpacking()) {
1929 if (has_avail_vector) avail_vector = new char[CkNumPes()];
1930 else avail_vector = NULL;
1932 if (has_avail_vector) p(avail_vector, CkNumPes());
1937 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1938 // the entry function, it is just used to use to pup.
1939 // I don't use CLBStatsMsg directly as marshalled parameter because
1940 // I want the data pointer stored and not to be freed by the Charm++.
1941 void CkMarshalledCLBStatsMessage::free() {
1942 int count = msgs.size();
1943 for (int i=0; i<count; i++) {
1950 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1952 int count = m.getCount();
1953 for (int i=0; i<count; i++) add(m.getMessage(i));
1956 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1958 int count = msgs.size();
1960 for (int i=0; i<count; i++) {
1962 if (p.isUnpacking()) msg = new CLBStatsMsg;
1964 msg = msgs[i]; CmiAssert(msg!=NULL);
1967 if (p.isUnpacking()) add(msg);
1971 SpanningTree::SpanningTree()
1973 double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1974 arity = (int)ceil(sq/2);
1975 calcParent(CkMyPe());
1976 calcNumChildren(CkMyPe());
1979 void SpanningTree::calcParent(int n)
1982 if(n != 0 && arity > 0)
1983 parent = (n-1)/arity;
1986 void SpanningTree::calcNumChildren(int n)
1989 if (arity == 0) return;
1990 int fullNode=(CkNumPes()-1-arity)/arity;
1992 numChildren = arity;
1994 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1999 #include "CentralLB.def.h"