13 #include "LBDBManager.h"
// Vacate/unvacate test hook consumed by QueryBalanceNow(): on the PE whose
// number equals VACATE_PROC, when the CkWallTimer() value lies strictly
// between VACATE_AFTER and VACATE_AFTER+UNVACATE_AFTER, that PE prints a
// "vacating" message (and, presumably, sets `vacate`, which AssembleStats()
// forwards to neighbors as vacate_me).
15 // Temporary vacating flags
16 // Set PROC to -1 to disable
18 #define VACATE_PROC -1
19 //#define VACATE_PROC (CkNumPes()/2)
// Compared directly against CkWallTimer(); units presumably seconds — confirm.
20 #define VACATE_AFTER 30
21 #define UNVACATE_AFTER 15
// NOTE(review): presumably registers WSLB with the load-balancer registry
// under this description string; the macro is defined elsewhere — confirm.
23 CreateLBFunc_Def(WSLB, "Workstation load balancer")
// Static trampoline for migration notifications. The constructor registers
// it through NotifyMigrated() with `this` as the opaque `data` pointer; this
// function casts `data` back to the WSLB instance and forwards `h` and
// `waitBarrier` to Migrated().
25 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
27 WSLB *me = (WSLB*)(data);
29 me->Migrated(h, waitBarrier);
// Static trampoline for the local AtSync barrier. The constructor registers
// it through AddLocalBarrierReceiver() with `this` as the opaque `data`
// pointer, which is cast back to the WSLB instance here.
// NOTE(review): this listing shows only the cast; the call that forwards to
// the instance (presumably me->AtSync()) is not present here — confirm.
32 void WSLB::staticAtSync(void* data)
34 WSLB *me = (WSLB*)(data);
// Constructs this PE's WSLB branch. The constructor:
//  - caches the group proxy;
//  - registers the AtSync barrier receiver and the migration notifier;
//  - resolves the neighbor topology named by _lbtopo;
//  - resets the migration bookkeeping counters and the local stats sizes;
//  - calls theLbdb->CollectStatsOn().
39 WSLB::WSLB(const CkLBOptions &opt) : CBase_WSLB(opt)
42 thisProxy = CProxy_WSLB(thisgroup);
45 CkPrintf("[%d] WSLB created\n",CkMyPe());
// Callbacks are routed through the static trampolines; `this` is handed
// back to them as the opaque data pointer. `notifier` is kept so the
// teardown code can unregister it.
49 AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
50 notifier = theLbdb->getLBDB()->
51 NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
// Look up the topology factory by name; only PE 0 prints the error.
// NOTE(review): the null check on `topofn` and any abort after the error
// message are not visible in this listing — confirm they exist, otherwise
// topofn(...) below is called through a null pointer.
54 LBtopoFn topofn = LBTopoLookup(_lbtopo);
56 if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
59 topo = topofn(CkNumPes());
61 // I had to move neighbor initialization outside the constructor
62 // in order to get the virtual functions of any derived classes
63 // so I'll just set them to illegal values here.
// NOTE(review): the "illegal value" assignments mentioned above (e.g. the
// neighbor_pes == 0 sentinel that FindNeighbors() tests) are not present
// in this listing.
// migrates_expected is -1 between rounds (MigrationDone() restores it), so
// Migrated() cannot match it; ReceiveMigration() sets it to 0 when the
// first migration message of a round arrives.
68 migrates_completed = 0;
69 migrates_expected = -1;
70 mig_msgs_received = 0;
73 myStats.proc_speed = theLbdb->ProcessorSpeed();
75 // gethostname(hostname,79);
76 // CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
77 myStats.obj_data_sz = 0;
78 myStats.comm_data_sz = 0;
79 receive_stats_ready = 0;
85 theLbdb->CollectStatsOn();
// Teardown code. NOTE(review): the enclosing signature line is not visible
// in this listing; it is presumably WSLB::~WSLB() — confirm.
// Re-fetches the local LBDatabase branch, unregisters the migration notifier
// stored by the constructor, and frees the arrays allocated in
// FindNeighbors().
92 theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
95 RemoveNotifyMigrated(notifier);
97 // RemoveStartLBFn((LDStartLBFn)(staticStartLB));
// delete[] on a null pointer is already a no-op, so these null guards are
// redundant (harmless). Only the arrays are freed here, not any
// WSLBStatsMsg / LBMigrateMsg they may still point to.
99 if (statsMsgsList) delete [] statsMsgsList;
100 if (statsDataList) delete [] statsDataList;
101 if (neighbor_pes) delete [] neighbor_pes;
102 if (mig_msgs) delete [] mig_msgs;
// Lazily builds the per-neighbor state the first time it is needed, i.e.
// while neighbor_pes is still 0. Callers are AtSync, ReceiveStats and
// ReceiveMigration. It lives outside the constructor so that derived-class
// virtual functions are in effect (see the constructor's note).
//
// Allocates, each sized by topo->max_neighbors():
//   statsMsgsList - pending WSLBStatsMsg* per neighbor slot, all set to 0
//                   (ReceiveStats treats a non-zero slot as a duplicate)
//   statsDataList - one LDStats record per neighbor slot
//   neighbor_pes  - filled by topo->neighbors() with this PE's neighbors
// topo->neighbors() also writes the actual neighbor count into
// mig_msgs_expected (presumably an out-parameter — confirm), and mig_msgs
// gets exactly that many LBMigrateMsg* slots: one migration message is
// expected from each neighbor per round.
106 void WSLB::FindNeighbors()
108 if (neighbor_pes == 0) { // Neighbors never initialized, so init them
109 // and other things that depend on the number
111 int maxneighbors = topo->max_neighbors();
112 statsMsgsList = new WSLBStatsMsg*[maxneighbors];
113 for(int i=0; i < maxneighbors; i++)
114 statsMsgsList[i] = 0;
115 statsDataList = new LDStats[maxneighbors];
117 neighbor_pes = new int[maxneighbors];
118 topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
119 mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
// Body of the per-step entry point.
// NOTE(review): the signature line is missing from this listing; it is
// presumably void WSLB::AtSync(), reached through staticAtSync — confirm.
//
// It records and prints the step's start time and makes sure the neighbor
// state exists. Unless QueryBalanceNow() declines or this PE has no
// neighbors, it then:
//  - multicasts this PE's statistics to every PE in neighbor_pes;
//  - calls ReceiveStats(0) to mark its own contribution as ready.
127 // CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
130 start_lb_time = CkWallTimer();
131 CkPrintf("Load balancing step %d starting at %f\n",
132 step(),start_lb_time);
135 if (neighbor_pes == 0) FindNeighbors();
// NOTE(review): the body of this early-exit branch is not visible in this
// listing; presumably it resumes clients without balancing — confirm.
137 if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
142 WSLBStatsMsg* msg = AssembleStats();
144 thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
146 // Tell our own node that we are ready
147 ReceiveStats((WSLBStatsMsg*)0);
// Snapshots this PE's load statistics from the LB database into myStats and
// packs a summary into a newly allocated WSLBStatsMsg for the neighbors.
//
// Side effects:
//  - allocates myStats.objData and myStats.commData with new[] on every call.
//    NOTE(review): no matching delete[] is visible in this listing — check
//    for a leak across load-balancing steps.
//  - updates the PI-controlled `usage` estimate and its integral term.
// NOTE(review): the `return msg;` is not visible in this listing.
151 WSLBStatsMsg* WSLB::AssembleStats()
155 theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
156 theLbdb->IdleTime(&myStats.idletime);
157 theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
158 myStats.obj_data_sz = theLbdb->GetObjDataSz();
159 myStats.objData = new LDObjData[myStats.obj_data_sz];
160 theLbdb->GetObjData(myStats.objData);
162 myStats.comm_data_sz = theLbdb->GetCommDataSz();
163 myStats.commData = new LDCommData[myStats.comm_data_sz];
164 theLbdb->GetCommData(myStats.commData);
// Total wall/CPU time spent in this PE's objects.
166 myStats.obj_walltime = myStats.obj_cputime = 0;
167 for(int i=0; i < myStats.obj_data_sz; i++) {
168 myStats.obj_walltime += myStats.objData[i].wallTime;
169 myStats.obj_cputime += myStats.objData[i].cpuTime;
172 WSLBStatsMsg* msg = new WSLBStatsMsg;
174 // Calculate usage percentage
// Busy wall time = total wall time minus idle time. "Usage" below is the
// ratio total CPU time / busy wall time (a fraction, not a percentage).
175 double myload = myStats.total_walltime - myStats.idletime;
177 // for(i=0; i < myStats.obj_data_sz; i++) {
178 // myobjcpu += myStats.objData[i].cpuTime;
179 // myobjwall += myStats.objData[i].wallTime;
181 // if (myobjwall > 0)
182 // myusage = myobjcpu / myobjwall;
// NOTE(review): if the PE was idle for the whole interval (myload == 0)
// this divides by zero and yields inf/NaN, which then feeds the controller
// below. A guard may sit on a line missing from this listing — confirm.
186 myusage = myStats.total_cputime / myload;
188 // Apply proportional-integral control on usage changes
// Proportional gain 0.1 and integral gain 0.01 smooth `usage` toward the
// measured myusage.
189 const double usage_err = myusage - usage;
190 usage_int_err += usage_err;
191 usage += usage_err * 0.1 + usage_int_err * 0.01;
192 // CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
194 // Allow usage to decrease quickly, but increase slowly
195 // if (myusage > usage)
196 // usage += (myusage-usage) * 0.1;
197 // else usage = myusage;
200 // CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
201 // CkMyPe(),myload,myusage,usage);
// NOTE(review): ReceiveStats() reads m->usage, but no assignment to
// msg->usage is visible in this listing (presumably msg->usage = usage on
// a missing line) — confirm it is set before the message is sent.
203 msg->from_pe = CkMyPe();
204 // msg->serial = rand();
205 msg->serial = CrnRand();
206 msg->proc_speed = myStats.proc_speed;
207 msg->total_walltime = myStats.total_walltime;
208 msg->total_cputime = myStats.total_cputime;
209 msg->idletime = myStats.idletime;
210 msg->bg_walltime = myStats.bg_walltime;
211 msg->bg_cputime = myStats.bg_cputime;
212 msg->obj_walltime = myStats.obj_walltime;
213 msg->obj_cputime = myStats.obj_cputime;
214 msg->vacate_me = vacate;
217 if (_lb_args.debug()) {
219 "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
220 CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
221 msg->idletime,msg->bg_walltime,msg->bg_cputime,
222 msg->obj_walltime,msg->obj_cputime);
225 // CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
226 // CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
// Migration-notification handler, reached through staticMigrated.
// It counts completed migrations. The round can finish once the count equals
// migrates_expected, the number of incoming moves tallied by
// ReceiveMigration(). Between rounds migrates_expected is -1, so this test
// cannot fire then; ReceiveMigration() rechecks the counters itself.
// `h` and `waitBarrier` are not used in the visible lines.
// NOTE(review): the body of the completion branch is not visible in this
// listing; presumably it calls MigrationDone() — confirm.
233 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
236 migrates_completed++;
237 // CkPrintf("[%d] An object migrated! %d %d\n",
238 // CkMyPe(),migrates_completed,migrates_expected);
239 if (migrates_completed == migrates_expected) {
// Collects one statistics message from each neighbor (m != 0) and a local
// "ready" signal from AtSync (m == 0).
// A message is reported as unexpected if it comes from a PE that is not in
// neighbor_pes, or if it is a second message from the same neighbor in one
// round.
// Once every neighbor has reported and the local signal has arrived, it:
//  - runs Strategy();
//  - starts this PE's outgoing migrations;
//  - multicasts the migration plan to all neighbors;
//  - deletes the round's stats messages;
//  - clears the loads in the LB database.
245 void WSLB::ReceiveStats(WSLBStatsMsg *m)
248 if (neighbor_pes == 0) FindNeighbors();
250 if (m == 0) { // This is from our own node
251 receive_stats_ready = 1;
253 const int pe = m->from_pe;
254 // CkPrintf("Stats msg received, %d %d %d %d %p\n",
255 // pe,stats_msg_count,m->n_objs,m->serial,m);
// Map the sender's PE number to its slot in neighbor_pes / statsMsgsList.
// NOTE(review): the declaration of peslot and the assignment inside the
// match are not visible in this listing.
257 for(int i=0; i < mig_msgs_expected; i++) {
258 if (pe == neighbor_pes[i]) {
// NOTE(review): when peslot == -1 the stores below would index element -1.
// Confirm that an early return (and disposal of m) follows the error
// print; neither is visible in this listing.
263 if (peslot == -1 || statsMsgsList[peslot] != 0) {
264 CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
267 statsMsgsList[peslot] = m;
268 statsDataList[peslot].from_pe = m->from_pe;
269 statsDataList[peslot].total_walltime = m->total_walltime;
270 statsDataList[peslot].total_cputime = m->total_cputime;
271 statsDataList[peslot].idletime = m->idletime;
272 statsDataList[peslot].bg_walltime = m->bg_walltime;
273 statsDataList[peslot].bg_cputime = m->bg_cputime;
274 statsDataList[peslot].proc_speed = m->proc_speed;
275 statsDataList[peslot].obj_walltime = m->obj_walltime;
276 statsDataList[peslot].obj_cputime = m->obj_cputime;
277 statsDataList[peslot].vacate_me = m->vacate_me;
278 statsDataList[peslot].usage = m->usage;
// NOTE(review): the increment and reset of stats_msg_count are not visible
// in this listing.
283 const int clients = mig_msgs_expected;
284 if (stats_msg_count == clients && receive_stats_ready) {
285 double strat_start_time = CkWallTimer();
286 receive_stats_ready = 0;
287 LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
// Start migrations of objects leaving this PE. A move whose from_pe is a
// different PE is reported as a strategy error and not executed; moves
// with from_pe == to_pe == this PE are silently ignored.
291 // Migrate messages from me to elsewhere
292 for(i=0; i < migrateMsg->n_moves; i++) {
293 MigrateInfo& move = migrateMsg->moves[i];
294 const int me = CkMyPe();
295 if (move.from_pe == me && move.to_pe != me) {
296 theLbdb->Migrate(move.obj,move.to_pe);
297 } else if (move.from_pe != me) {
298 CkPrintf("[%d] error, strategy wants to move from %d to %d\n",
299 me,move.from_pe,move.to_pe);
// Every neighbor gets the full plan. Each one counts the moves whose to_pe
// is itself in ReceiveMigration(). migrateMsg is not touched after this send.
303 // Now, send migrate messages to neighbors
304 thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
// NOTE(review): resetting statsMsgsList[i] back to 0 is not visible here.
// ReceiveStats treats a non-zero slot as a duplicate, so the reset must
// happen on a missing line — confirm.
306 // Zero out data structures for next cycle
307 for(i=0; i < clients; i++) {
308 delete statsMsgsList[i];
313 theLbdb->ClearLoads();
315 double strat_end_time = CkWallTimer();
316 CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
// Collects the migration plan broadcast by each neighbor (mig_msgs_expected
// messages per round). When all have arrived, it counts the moves whose
// destination is this PE into migrates_expected. If nothing is incoming, or
// every incoming object has already arrived, it completes the round.
// NOTE(review): the stored LBMigrateMsg objects are not visibly deleted.
322 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
325 if (neighbor_pes == 0) FindNeighbors();
// First message of a round: start counting incoming migrations from zero
// (migrates_expected is -1 between rounds).
327 if (mig_msgs_received == 0) migrates_expected = 0;
// NOTE(review): the message is stored before the overflow check below.
// mig_msgs has exactly mig_msgs_expected slots, so a surplus message would
// write one past the end before the error is reported (assuming the
// increment, not visible here, follows this store). Check the bound first.
329 mig_msgs[mig_msgs_received] = msg;
331 // CkPrintf("[%d] Received migration msg %d of %d\n",
332 // CkMyPe(),mig_msgs_received,mig_msgs_expected);
334 if (mig_msgs_received > mig_msgs_expected) {
335 CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
339 if (mig_msgs_received != mig_msgs_expected) {
343 // CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
344 for(int neigh=0; neigh < mig_msgs_received;neigh++) {
345 LBMigrateMsg* m = mig_msgs[neigh];
346 for(int i=0; i < m->n_moves; i++) {
347 MigrateInfo& move = m->moves[i];
348 const int me = CkMyPe();
// Incoming move from another PE. NOTE(review): the branch body (presumably
// migrates_expected++) is not visible in this listing.
349 if (move.from_pe != me && move.to_pe == me) {
356 // CkPrintf("[%d] in ReceiveMigration %d expected\n",
357 // CkMyPe(),migrates_expected);
358 mig_msgs_received = 0;
// Finish immediately if nothing is incoming or everything already arrived.
// NOTE(review): the guarded statement (presumably MigrationDone()) is not
// visible in this listing.
359 if (migrates_expected == 0 || migrates_expected == migrates_completed)
// Ends the migration round. It:
//  - prints the step's finish time and its duration since AtSync recorded
//    start_lb_time;
//  - restores the between-rounds counter values (completed = 0,
//    expected = -1);
//  - invokes ResumeClients() on this PE's own branch through the group
//    proxy rather than by a direct call.
// NOTE(review): the step increment announced below is not visible in this
// listing.
365 void WSLB::MigrationDone()
369 double end_lb_time = CkWallTimer();
370 CkPrintf("Load balancing step %d finished at %f duration %f\n",
371 step(),end_lb_time,end_lb_time - start_lb_time);
373 migrates_completed = 0;
374 migrates_expected = -1;
375 // Increment to next step
377 thisProxy [CkMyPe()].ResumeClients();
// Invoked through the group proxy from MigrationDone(); forwards to
// theLbdb->ResumeClients().
381 void WSLB::ResumeClients()
384 theLbdb->ResumeClients();
// Decides whether this PE takes part in balancing at `step`; AtSync skips
// the stats exchange when it returns false. It also drives the vacate test
// hook configured by VACATE_PROC / VACATE_AFTER / UNVACATE_AFTER.
// NOTE(review): the condition guarding the first_step_time assignment, any
// assignments to `vacate`, and the return statement are not visible in
// this listing.
// NOTE(review): the vacate window compares the raw CkWallTimer() value
// `now` against VACATE_AFTER, not now - first_step_time, and
// first_step_time is not read in the visible lines — confirm this is
// intended.
388 bool WSLB::QueryBalanceNow(int step)
391 double now = CkWallTimer();
394 first_step_time = now;
395 else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
396 && now < (VACATE_AFTER+UNVACATE_AFTER)) {
398 CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
402 CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
// Greedy neighborhood strategy over the `count` neighbor records gathered
// by ReceiveStats().
//  1. Each non-vacating neighbor's total_walltime and idletime are rescaled
//     in place by (my proc_speed * usage) / (his proc_speed * his usage),
//     and the load is averaged over this PE plus its non-vacating neighbors.
//     The accumulation into avgload is on a line not visible here.
//  2. If this PE is vacating or above the average, it repeatedly takes its
//     heaviest remaining object (load inflated by load_factor) and the
//     least-loaded non-vacating neighbor. The move is accepted when the
//     neighbor would still be lighter than this PE afterwards, or always
//     when vacating.
//  3. The chosen moves are copied into a newly allocated LBMigrateMsg.
// Note: the contents of `stats` are modified in place.
409 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
412 // CkPrintf("[%d] Strategy starting\n",CkMyPe());
413 // Compute the average load to see if we are overloaded relative
415 const double load_factor = 1.05;
418 double myload = myStats.total_walltime - myStats.idletime;
419 double avgload = myload;
420 int unvacated_neighbors = 0;
422 for(i=0; i < count; i++) {
423 // If the neighbor is vacating, skip him
424 if (stats[i].vacate_me)
427 // Scale times we need appropriately for relative proc speeds
428 double hisload = stats[i].total_walltime - stats[i].idletime;
429 const double hisusage = stats[i].usage;
// NOTE(review): a neighbor reporting usage == 0 or proc_speed == 0 makes
// this a division by zero.
// NOTE(review): hisload is computed before the rescaling below. If it feeds
// avgload, the average mixes unscaled neighbor loads with the scaled values
// later placed in the processor heap — confirm.
431 const double scale = (myStats.proc_speed * usage)
432 / (stats[i].proc_speed * hisusage);
435 stats[i].total_walltime *= scale;
436 stats[i].idletime *= scale;
438 // CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
439 // CkMyPe(),i,hisload,hisusage);
441 unvacated_neighbors++;
443 if (vacate && unvacated_neighbors == 0)
444 CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
446 avgload /= (unvacated_neighbors+1);
448 CkVec<MigrateInfo*> migrateInfo;
450 // If we want to vacate, we always dump our load, otherwise
451 // only if we are overloaded
453 if (vacate || myload > avgload) {
454 // CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
455 // CkMyPe(),myload,avgload);
457 // First, build heaps of other processors and my objects
458 // Then assign objects to other processors until either
459 // - The smallest remaining object would put me below average, or
460 // - I only have 1 object left, or
461 // - The smallest remaining object would put someone else
// Processor heap keyed on the rescaled neighbor loads computed above.
465 minHeap procs(count);
466 for(i=0; i < count; i++) {
467 // If all my neighbors vacate, I won't have anyone to give work
469 if (!stats[i].vacate_me) {
470 InfoRecord* item = new InfoRecord;
471 item->load = stats[i].total_walltime - stats[i].idletime;
472 item->Id = stats[i].from_pe;
477 maxHeap objs(myStats.obj_data_sz);
478 for(i=0; i < myStats.obj_data_sz; i++) {
479 InfoRecord* item = new InfoRecord;
480 item->load = myStats.objData[i].wallTime;
485 int objs_here = myStats.obj_data_sz;
487 // if (objs_here <= 1) break; // For now, always leave 1 object
492 // Get the lightest-loaded processor
493 p = procs.deleteMin();
495 // CkPrintf("[%d] No destination PE found!\n",CkMyPe());
499 // Get the biggest object
500 bool objfound = false;
502 obj = objs.deleteMax();
505 objload = load_factor * obj->load;
507 double new_p_load = p->load + objload;
508 double my_new_load = myload - objload;
510 // If we're vacating, the biggest object is always good.
511 // Otherwise, only take it if it doesn't produce overload
512 if (vacate || new_p_load < my_new_load) {
515 // This object is too big, so throw it away
516 // CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
517 // CkMyPe(),obj->load,p->Id,p->load,avgload);
523 // CkPrintf("[%d] No suitable object found!\n",CkMyPe());
527 const int me = CkMyPe();
528 // Apparently we can give this object to this processor
529 if (_lb_args.debug())
530 CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
531 CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
533 MigrateInfo* migrateMe = new MigrateInfo;
534 migrateMe->obj = myStats.objData[obj->Id].handle;
535 migrateMe->from_pe = me;
536 migrateMe->to_pe = p->Id;
537 migrateInfo.insertAtEnd(migrateMe);
541 // We may want to assign more to this processor, so lets
542 // update it and put it back in the heap
547 // This object is assigned, so we delete it from the heap
550 } while(vacate || myload > avgload);
552 // Now empty out the heaps
// NOTE(review): the loop bodies (presumably deleting the leftover
// InfoRecords) are not visible in this listing.
554 while (NULL!=(p=procs.deleteMin()))
557 while (NULL!=(obj=objs.deleteMax()))
561 // Now build the message to actually perform the migrations
562 int migrate_count=migrateInfo.length();
563 // if (migrate_count) {
564 // CkPrintf("PE %d: Sent away %d of %d objects\n",
565 // CkMyPe(),migrate_count,myStats.obj_data_sz);
// NOTE(review): the MigrateInfo records in migrateInfo were allocated with
// new, and their deletion is not visible here. The `return msg;` is also
// not visible in this listing.
567 LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
568 msg->n_moves = migrate_count;
569 for(i=0; i < migrate_count; i++) {
570 MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
571 msg->moves[i] = *item;
582 #include "WSLB.def.h"