13 #include "LBDBManager.h"
// Compile-time settings for the temporary "vacate" test feature, followed by
// an externally defined flag and the balancer's registration macro.
15 // Temporary vacating flags
16 // Set PROC to -1 to disable
// VACATE_PROC is compared against CkMyPe() in QueryBalanceNow to pick the PE
// that vacates; per the note above, -1 disables the feature.
18 #define VACATE_PROC -1
19 //#define VACATE_PROC (CkNumPes()/2)
// Both thresholds are compared against a CkWallTimer() reading in
// QueryBalanceNow (presumably seconds -- TODO confirm the timer's unit).
20 #define VACATE_AFTER 30
21 #define UNVACATE_AFTER 15
// Defined elsewhere; the constructor checks it before printing its banner.
23 extern int quietModeRequested;
// NOTE(review): presumably expands to the creation/registration function for
// this balancer -- confirm against the macro's definition.
25 CreateLBFunc_Def(WSLB, "Workstation load balancer")
// Static trampoline for the migration notification registered in the
// constructor. `data` is the opaque pointer supplied at registration (the
// WSLB instance); it is cast back and the call is forwarded to Migrated().
27 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
29 WSLB *me = (WSLB*)(data);
31 me->Migrated(h, waitBarrier);
// Static trampoline registered through AddLocalBarrierReceiver in the
// constructor. `data` is the opaque pointer supplied at registration (the
// WSLB instance).
// NOTE(review): the forwarding call is not visible in this extract --
// presumably me->AtSync(); confirm.
34 void WSLB::staticAtSync(void* data)
36 WSLB *me = (WSLB*)(data);
// Constructor: creates the group proxy, registers the barrier and migration
// callbacks, resolves the topology and resets per-step bookkeeping. The
// neighbor arrays are not allocated here (see the original author's note
// below); FindNeighbors() allocates them on first use.
41 WSLB::WSLB(const CkLBOptions &opt) : CBase_WSLB(opt)
44 thisProxy = CProxy_WSLB(thisgroup);
// Only PE 0 announces creation, and only when quiet mode was not requested.
46 if (CkMyPe() == 0 && !quietModeRequested)
47 CkPrintf("CharmLB> WSLB created.\n");
// Register the static trampolines, passing `this` as the opaque argument
// they cast back to a WSLB*.
51 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
// The returned value is kept in `notifier`; it is later handed to
// RemoveNotifyMigrated in the teardown code.
52 notifier = theLbdb->getLBDB()->
53 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
// Look up the topology factory named by _lbtopo.
// NOTE(review): the guard around the error print below is not visible in
// this extract -- presumably a failed-lookup check; confirm.
56 LBtopoFn topofn = LBTopoLookup(_lbtopo);
58 if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
// Build the topology object for the full PE count.
61 topo = topofn(CkNumPes());
63 // I had to move neighbor initialization outside the constructor
64 // in order to get the virtual functions of any derived classes
65 // so I'll just set them to illegal values here.
// Migration bookkeeping. migrates_expected starts at -1; ReceiveMigration
// sets it to 0 when the first migration message of a step arrives.
70 migrates_completed = 0;
71 migrates_expected = -1;
72 mig_msgs_received = 0;
// Cache this processor's speed; it is sent to neighbors in every stats
// message and used by Strategy() to scale their loads.
75 myStats.proc_speed = theLbdb->ProcessorSpeed();
77 // gethostname(hostname,79);
78 // CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
79 myStats.obj_data_sz = 0;
80 myStats.comm_data_sz = 0;
81 receive_stats_ready = 0;
// Turn on statistics collection in the load database.
87 theLbdb->CollectStatsOn();
// Teardown code: re-fetches the local load-database branch, unregisters the
// migration notifier and frees the arrays that FindNeighbors() allocates.
// NOTE(review): the enclosing function header is not visible in this
// extract -- presumably the WSLB destructor; confirm.
94 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
// `notifier` is the value stored by the constructor's NotifyMigrated call.
97 RemoveNotifyMigrated(notifier);
99 // RemoveStartLBFn((LDStartLBFn)(staticStartLB));
// delete[] on a null pointer is a no-op in C++, so these guards are
// redundant but harmless.
101 if (statsMsgsList) delete [] statsMsgsList;
102 if (statsDataList) delete [] statsDataList;
103 if (neighbor_pes) delete [] neighbor_pes;
104 if (mig_msgs) delete [] mig_msgs;
// Allocates the neighbor-dependent arrays the first time it is called
// (detected by neighbor_pes == 0). Sizes come from the topology object
// created in the constructor.
108 void WSLB::FindNeighbors()
110 if (neighbor_pes == 0) { // Neighbors never initialized, so init them
111 // and other things that depend on the number
113 int maxneighbors = topo->max_neighbors();
// One incoming-stats-message slot per potential neighbor, cleared to null so
// ReceiveStats can detect duplicates.
114 statsMsgsList = new WSLBStatsMsg*[maxneighbors];
115 for(int i=0; i < maxneighbors; i++)
116 statsMsgsList[i] = 0;
117 statsDataList = new LDStats[maxneighbors];
119 neighbor_pes = new int[maxneighbors];
// Presumably fills neighbor_pes and stores the actual neighbor count in
// mig_msgs_expected (that variable is used as the neighbor count elsewhere
// in this file) -- confirm against the topology interface.
120 topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
// One pending-migration-message slot per neighbor.
121 mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
// Start of a load-balancing step on this PE: timestamps the step, makes sure
// the neighbor arrays exist, and sends this PE's statistics out.
// NOTE(review): the enclosing function header is not visible in this
// extract -- presumably WSLB::AtSync(); confirm.
129 // CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
// MigrationDone() reports the step duration relative to this timestamp.
132 start_lb_time = CkWallTimer();
133 CkPrintf("Load balancing step %d starting at %f\n",
134 step(),start_lb_time);
137 if (neighbor_pes == 0) FindNeighbors();
// Taken when QueryBalanceNow() declines this step or there are no neighbors.
// NOTE(review): the body of this branch is not visible in this extract.
139 if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
// Build this PE's stats message and send it using the neighbor count and
// neighbor PE list (presumably delivered to exactly those PEs -- confirm).
144 WSLBStatsMsg* msg = AssembleStats();
146 thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
148 // Tell our own node that we are ready
// A null message is the local "ready" signal; ReceiveStats handles m == 0.
149 ReceiveStats((WSLBStatsMsg*)0);
// Gathers this PE's timing and object/communication data from the load
// database into myStats, updates the smoothed `usage` estimate, and fills a
// newly allocated WSLBStatsMsg with the summary fields.
// NOTE(review): the return statement is not visible in this extract; the
// at-sync code uses the result as the message to send.
153 WSLBStatsMsg* WSLB::AssembleStats()
157 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
158 theLbdb->IdleTime(&myStats.idletime);
159 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
// Snapshot the per-object records.
// NOTE(review): a fresh array is allocated on every call; the matching
// delete[] is not visible in this extract -- confirm it exists.
160 myStats.obj_data_sz = theLbdb->GetObjDataSz();
161 myStats.objData = new LDObjData[myStats.obj_data_sz];
162 theLbdb->GetObjData(myStats.objData);
// Same for the communication records (same review note applies).
164 myStats.comm_data_sz = theLbdb->GetCommDataSz();
165 myStats.commData = new LDCommData[myStats.comm_data_sz];
166 theLbdb->GetCommData(myStats.commData);
// Sum wall and cpu time over all local objects.
168 myStats.obj_walltime = myStats.obj_cputime = 0;
169 for(int i=0; i < myStats.obj_data_sz; i++) {
170 myStats.obj_walltime += myStats.objData[i].wallTime;
171 myStats.obj_cputime += myStats.objData[i].cpuTime;
174 WSLBStatsMsg* msg = new WSLBStatsMsg;
176 // Calculate usage percentage
// Load is the non-idle portion of the wall time.
177 double myload = myStats.total_walltime - myStats.idletime;
179 // for(i=0; i < myStats.obj_data_sz; i++) {
180 // myobjcpu += myStats.objData[i].cpuTime;
181 // myobjwall += myStats.objData[i].wallTime;
183 // if (myobjwall > 0)
184 // myusage = myobjcpu / myobjwall;
// Instantaneous usage: cpu time per unit of non-idle wall time.
// NOTE(review): myload can be zero; any guard around this division is not
// visible in this extract -- confirm.
188 myusage = myStats.total_cputime / myload;
190 // Apply proportional-integral control on usage changes
// `usage` moves toward myusage with gain 0.1 on the current error and 0.01
// on the accumulated error.
191 const double usage_err = myusage - usage;
192 usage_int_err += usage_err;
193 usage += usage_err * 0.1 + usage_int_err * 0.01;
194 // CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
196 // Allow usage to decrease quickly, but increase slowly
197 // if (myusage > usage)
198 // usage += (myusage-usage) * 0.1;
199 // else usage = myusage;
202 // CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
203 // CkMyPe(),myload,myusage,usage);
// Copy the summary into the outgoing message.
205 msg->from_pe = CkMyPe();
206 // msg->serial = rand();
207 msg->serial = CrnRand();
208 msg->proc_speed = myStats.proc_speed;
209 msg->total_walltime = myStats.total_walltime;
210 msg->total_cputime = myStats.total_cputime;
211 msg->idletime = myStats.idletime;
212 msg->bg_walltime = myStats.bg_walltime;
213 msg->bg_cputime = myStats.bg_cputime;
214 msg->obj_walltime = myStats.obj_walltime;
215 msg->obj_cputime = myStats.obj_cputime;
216 msg->vacate_me = vacate;
// NOTE(review): ReceiveStats reads m->usage, but an assignment to msg->usage
// is not visible in this extract -- confirm it is set.
219 if (_lb_args.debug()) {
221 "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
222 CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
223 msg->idletime,msg->bg_walltime,msg->bg_cputime,
224 msg->obj_walltime,msg->obj_cputime);
227 // CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
228 // CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
// Called through staticMigrated when an object migration is reported. Counts
// the migration and compares the running total with migrates_expected.
// NOTE(review): the action taken when the counts match is not visible in
// this extract -- presumably MigrationDone(); confirm.
235 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
238 migrates_completed++;
239 // CkPrintf("[%d] An object migrated! %d %d\n",
240 // CkMyPe(),migrates_completed,migrates_expected);
241 if (migrates_completed == migrates_expected) {
// Receives either a neighbor's stats message or a null pointer, which is the
// local "ready" signal this PE sends to itself. Once every neighbor has
// reported and the local signal is set, it runs Strategy(), starts the
// outgoing migrations, forwards the decisions to the neighbors and resets
// state for the next cycle.
247 void WSLB::ReceiveStats(WSLBStatsMsg *m)
250 if (neighbor_pes == 0) FindNeighbors();
252 if (m == 0) { // This is from our own node
253 receive_stats_ready = 1;
255 const int pe = m->from_pe;
256 // CkPrintf("Stats msg received, %d %d %d %d %p\n",
257 // pe,stats_msg_count,m->n_objs,m->serial,m);
// Find which neighbor slot the sender occupies.
// NOTE(review): the declaration and assignment of peslot are not visible in
// this extract.
259 for(int i=0; i < mig_msgs_expected; i++) {
260 if (pe == neighbor_pes[i]) {
// Reject senders that are not neighbors (peslot == -1) and duplicates for a
// slot that is already filled.
265 if (peslot == -1 || statsMsgsList[peslot] != 0) {
266 CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
// Keep the message and copy its summary fields into the per-neighbor record
// that Strategy() consumes.
269 statsMsgsList[peslot] = m;
270 statsDataList[peslot].from_pe = m->from_pe;
271 statsDataList[peslot].total_walltime = m->total_walltime;
272 statsDataList[peslot].total_cputime = m->total_cputime;
273 statsDataList[peslot].idletime = m->idletime;
274 statsDataList[peslot].bg_walltime = m->bg_walltime;
275 statsDataList[peslot].bg_cputime = m->bg_cputime;
276 statsDataList[peslot].proc_speed = m->proc_speed;
277 statsDataList[peslot].obj_walltime = m->obj_walltime;
278 statsDataList[peslot].obj_cputime = m->obj_cputime;
279 statsDataList[peslot].vacate_me = m->vacate_me;
280 statsDataList[peslot].usage = m->usage;
// Proceed only when all neighbors have reported and the local flag is set.
// NOTE(review): the update of stats_msg_count is not visible in this extract.
285 const int clients = mig_msgs_expected;
286 if (stats_msg_count == clients && receive_stats_ready) {
287 double strat_start_time = CkWallTimer();
288 receive_stats_ready = 0;
289 LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
293 // Migrate messages from me to elsewhere
294 for(i=0; i < migrateMsg->n_moves; i++) {
295 MigrateInfo& move = migrateMsg->moves[i];
296 const int me = CkMyPe();
// Moves that start here and go elsewhere are executed; a move that does not
// start on this PE is reported as an error.
297 if (move.from_pe == me && move.to_pe != me) {
298 theLbdb->Migrate(move.obj,move.to_pe);
299 } else if (move.from_pe != me) {
300 CkPrintf("[%d] error, strategy wants to move from %d to %d\n",
301 me,move.from_pe,move.to_pe);
305 // Now, send migrate messages to neighbors
306 thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
308 // Zero out data structures for next cycle
// Frees each stored stats message.
// NOTE(review): resetting the slots and stats_msg_count is not visible in
// this extract.
309 for(i=0; i < clients; i++) {
310 delete statsMsgsList[i];
315 theLbdb->ClearLoads();
317 double strat_end_time = CkWallTimer();
318 CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
// Collects one migration-decision message per neighbor. When all have
// arrived, it scans them for moves whose destination is this PE, then checks
// whether the step's inbound migrations are already complete.
324 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
327 if (neighbor_pes == 0) FindNeighbors();
// First message of a step: migrates_expected leaves its idle value (-1, set
// by the constructor and MigrationDone) and starts from 0.
329 if (mig_msgs_received == 0) migrates_expected = 0;
331 mig_msgs[mig_msgs_received] = msg;
333 // CkPrintf("[%d] Received migration msg %d of %d\n",
334 // CkMyPe(),mig_msgs_received,mig_msgs_expected);
// NOTE(review): this overflow check runs after the store into mig_msgs
// above, and the increment of mig_msgs_received is not visible in this
// extract -- confirm the store cannot write past the end of the array.
336 if (mig_msgs_received > mig_msgs_expected) {
337 CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
// Taken while some neighbor messages are still outstanding.
// NOTE(review): the branch body is not visible -- presumably an early return.
341 if (mig_msgs_received != mig_msgs_expected) {
345 // CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
// Walk every collected message and pick out the moves that land on this PE.
// NOTE(review): what is done for each such move is not visible in this
// extract -- presumably migrates_expected is incremented; confirm.
346 for(int neigh=0; neigh < mig_msgs_received;neigh++) {
347 LBMigrateMsg* m = mig_msgs[neigh];
348 for(int i=0; i < m->n_moves; i++) {
349 MigrateInfo& move = m->moves[i];
350 const int me = CkMyPe();
351 if (move.from_pe != me && move.to_pe == me) {
358 // CkPrintf("[%d] in ReceiveMigration %d expected\n",
359 // CkMyPe(),migrates_expected);
360 mig_msgs_received = 0;
// True when nothing is inbound or every expected object has already arrived.
// NOTE(review): the action taken is not visible -- presumably MigrationDone().
361 if (migrates_expected == 0 || migrates_expected == migrates_completed)
// Finishes a load-balancing step: prints the elapsed time since
// start_lb_time, resets the migration counters and invokes ResumeClients on
// this PE through the proxy.
367 void WSLB::MigrationDone()
371 double end_lb_time = CkWallTimer();
372 CkPrintf("Load balancing step %d finished at %f duration %f\n",
373 step(),end_lb_time,end_lb_time - start_lb_time);
// Same idle values the constructor assigns.
375 migrates_completed = 0;
376 migrates_expected = -1;
377 // Increment to next step
// NOTE(review): the step increment itself is not visible in this extract.
// ResumeClients is sent through the proxy to this PE's own element rather
// than called directly.
379 thisProxy [CkMyPe()].ResumeClients();
// Forwards to the load database's ResumeClients(). MigrationDone() invokes
// this through the group proxy at the end of a load-balancing step.
383 void WSLB::ResumeClients()
386 theLbdb->ResumeClients();
// Called at sync time with the current step; a false result makes the caller
// skip balancing. It also drives the test-only vacate window controlled by
// the VACATE_* macros.
// NOTE(review): the return statements and several branch conditions are not
// visible in this extract.
390 bool WSLB::QueryBalanceNow(int step)
393 double now = CkWallTimer();
// NOTE(review): the condition guarding this assignment is not visible --
// presumably it runs only on the first step; confirm.
396 first_step_time = now;
// Vacate window: only on PE VACATE_PROC, and only while the wall timer is
// between VACATE_AFTER and VACATE_AFTER+UNVACATE_AFTER.
397 else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
398 && now < (VACATE_AFTER+UNVACATE_AFTER)) {
400 CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
404 CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
// Local balancing strategy. Compares this PE's load (total_walltime minus
// idletime) with the average over itself and its non-vacating neighbors. If
// this PE is vacating or above average, it picks objects to hand to
// neighbors and records each choice as a MigrateInfo. Finally it builds an
// LBMigrateMsg holding the chosen moves.
//   stats - per-neighbor records filled in by ReceiveStats; the time fields
//           are rescaled in place here
//   count - number of entries in stats
// NOTE(review): the return statement is not visible in this extract;
// ReceiveStats uses the result as the migration message.
411 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
414 // CkPrintf("[%d] Strategy starting\n",CkMyPe());
415 // Compute the average load to see if we are overloaded relative
// Multiplier applied to an object's load when estimating the effect of
// moving it (see objload below).
417 const double load_factor = 1.05;
420 double myload = myStats.total_walltime - myStats.idletime;
421 double avgload = myload;
422 int unvacated_neighbors = 0;
424 for(i=0; i < count; i++) {
425 // If the neighbor is vacating, skip him
426 if (stats[i].vacate_me)
429 // Scale times we need appropriately for relative proc speeds
430 double hisload = stats[i].total_walltime - stats[i].idletime;
431 const double hisusage = stats[i].usage;
// Ratio of this PE's effective speed (proc_speed * usage) to the neighbor's.
// NOTE(review): no guard against stats[i].proc_speed * hisusage being zero
// is visible in this extract -- confirm.
433 const double scale = (myStats.proc_speed * usage)
434 / (stats[i].proc_speed * hisusage);
// Rescale the neighbor's times in place so later comparisons use this PE's
// time scale.
437 stats[i].total_walltime *= scale;
438 stats[i].idletime *= scale;
440 // CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
441 // CkMyPe(),i,hisload,hisusage);
443 unvacated_neighbors++;
445 if (vacate && unvacated_neighbors == 0)
446 CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
// Average over this PE plus the non-vacating neighbors.
// NOTE(review): the accumulation of neighbor loads into avgload is not
// visible in this extract.
448 avgload /= (unvacated_neighbors+1);
450 CkVec<MigrateInfo*> migrateInfo;
452 // If we want to vacate, we always dump our load, otherwise
453 // only if we are overloaded
455 if (vacate || myload > avgload) {
456 // CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
457 // CkMyPe(),myload,avgload);
459 // First, build heaps of other processors and my objects
460 // Then assign objects to other processors until either
461 // - The smallest remaining object would put me below average, or
462 // - I only have 1 object left, or
463 // - The smallest remaining object would put someone else
// Candidate destinations: one record per non-vacating neighbor, carrying its
// scaled load and PE number.
467 minHeap procs(count);
468 for(i=0; i < count; i++) {
469 // If all my neighbors vacate, I won't have anyone to give work
471 if (!stats[i].vacate_me) {
472 InfoRecord* item = new InfoRecord;
473 item->load = stats[i].total_walltime - stats[i].idletime;
474 item->Id = stats[i].from_pe;
// Candidate objects: one record per local object, weighted by its wall time.
// NOTE(review): the assignment of item->Id and the heap insert are not
// visible in this extract; obj->Id is later used as an index into
// myStats.objData.
479 maxHeap objs(myStats.obj_data_sz);
480 for(i=0; i < myStats.obj_data_sz; i++) {
481 InfoRecord* item = new InfoRecord;
482 item->load = myStats.objData[i].wallTime;
487 int objs_here = myStats.obj_data_sz;
489 // if (objs_here <= 1) break; // For now, always leave 1 object
494 // Get the lightest-loaded processor
495 p = procs.deleteMin();
497 // CkPrintf("[%d] No destination PE found!\n",CkMyPe());
501 // Get the biggest object
502 bool objfound = false;
504 obj = objs.deleteMax();
// Estimated cost of the object, inflated by load_factor.
507 objload = load_factor * obj->load;
// Projected loads on the destination and on this PE if the object moved.
509 double new_p_load = p->load + objload;
510 double my_new_load = myload - objload;
512 // If we're vacating, the biggest object is always good.
513 // Otherwise, only take it if it doesn't produce overload
514 if (vacate || new_p_load < my_new_load) {
517 // This object is too big, so throw it away
518 // CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
519 // CkMyPe(),obj->load,p->Id,p->load,avgload);
525 // CkPrintf("[%d] No suitable object found!\n",CkMyPe());
529 const int me = CkMyPe();
530 // Apparently we can give this object to this processor
531 if (_lb_args.debug())
532 CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
533 CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
// Record the move from this PE to the chosen destination.
// NOTE(review): MigrateInfo records are heap-allocated here and copied by
// value into the message below; their deletion is not visible in this
// extract -- confirm they are freed.
535 MigrateInfo* migrateMe = new MigrateInfo;
536 migrateMe->obj = myStats.objData[obj->Id].handle;
537 migrateMe->from_pe = me;
538 migrateMe->to_pe = p->Id;
539 migrateInfo.insertAtEnd(migrateMe);
543 // We may want to assign more to this processor, so lets
544 // update it and put it back in the heap
549 // This object is assigned, so we delete it from the heap
// Keep going while vacating or still above the average.
// NOTE(review): the update of myload inside the loop is not visible in this
// extract.
552 } while(vacate || myload > avgload);
554 // Now empty out the heaps
// NOTE(review): the loop bodies are not visible -- presumably they delete
// the remaining InfoRecord objects; confirm.
556 while (NULL!=(p=procs.deleteMin()))
559 while (NULL!=(obj=objs.deleteMax()))
563 // Now build the message to actually perform the migrations
564 int migrate_count=migrateInfo.length();
565 // if (migrate_count) {
566 // CkPrintf("PE %d: Sent away %d of %d objects\n",
567 // CkMyPe(),migrate_count,myStats.obj_data_sz);
// The first placement argument is the number of moves; the meaning of the
// remaining arguments is defined by LBMigrateMsg's allocator (not shown).
569 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
570 msg->n_moves = migrate_count;
// Copy each recorded move by value into the message.
571 for(i=0; i < migrate_count; i++) {
572 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
573 msg->moves[i] = *item;
584 #include "WSLB.def.h"