build: fix travis MPI/SMP build
[charm.git] / src / ck-ldb / WSLB.C
blobcfa995f595423857f88f9ab69a1837c7f59ab072
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
6 #ifndef _WIN32
7 #include <unistd.h>
8 #endif
10 #include "elements.h"
11 #include "ckheap.h"
12 #include "WSLB.h"
13 #include "LBDBManager.h"
15 // Temporary vacating flags
16 // Set PROC to -1 to disable
18 #define VACATE_PROC -1
19 //#define VACATE_PROC (CkNumPes()/2)
20 #define VACATE_AFTER 30
21 #define UNVACATE_AFTER 15
23 extern int quietModeRequested;
25 CreateLBFunc_Def(WSLB, "Workstation load balancer")
27 void WSLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
29   WSLB *me = (WSLB*)(data);
31   me->Migrated(h, waitBarrier);
34 void WSLB::staticAtSync(void* data)
36   WSLB *me = (WSLB*)(data);
38   me->AtSync();
41 WSLB::WSLB(const CkLBOptions &opt) : CBase_WSLB(opt)
43 #if CMK_LBDB_ON
44   thisProxy = CProxy_WSLB(thisgroup);
45   lbname = "WSLB";
46   if (CkMyPe() == 0 && !quietModeRequested)
47     CkPrintf("CharmLB> WSLB created.\n");
49   mystep = 0;
50   theLbdb->
51     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
52   notifier = theLbdb->getLBDB()->
53     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
56    LBtopoFn topofn = LBTopoLookup(_lbtopo);
57   if (topofn == NULL) {
58     if (CkMyPe()==0) CmiPrintf("LB> Fatal error: Unknown topology: %s.\n", _lbtopo);
59     CmiAbort("");
60   }
61   topo = topofn(CkNumPes());
63   // I had to move neighbor initialization outside the constructor
64   // in order to get the virtual functions of any derived classes
65   // so I'll just set them to illegal values here.
66   neighbor_pes = NULL;
67   stats_msg_count = 0;
68   statsMsgsList = NULL;
69   statsDataList = NULL;
70   migrates_completed = 0;
71   migrates_expected = -1;
72   mig_msgs_received = 0;
73   mig_msgs = NULL;
75   myStats.proc_speed = theLbdb->ProcessorSpeed();
76 //  char hostname[80];
77 //  gethostname(hostname,79);
78 //  CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
79   myStats.obj_data_sz = 0;
80   myStats.comm_data_sz = 0;
81   receive_stats_ready = 0;
83   vacate = false;
84   usage = 1.0;
85   usage_int_err = 0.;
87   theLbdb->CollectStatsOn();
88 #endif
91 WSLB::~WSLB()
93 #if CMK_LBDB_ON
94   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
95   if (theLbdb) {
96     theLbdb->getLBDB()->
97       RemoveNotifyMigrated(notifier);
98     //theLbdb->
99     //  RemoveStartLBFn((LDStartLBFn)(staticStartLB));
100   }
101   if (statsMsgsList) delete [] statsMsgsList;
102   if (statsDataList) delete [] statsDataList;
103   if (neighbor_pes)  delete [] neighbor_pes;
104   if (mig_msgs)      delete [] mig_msgs;
105 #endif
108 void WSLB::FindNeighbors()
110   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
111                            // and other things that depend on the number
112                            // of neighbors
113     int maxneighbors = topo->max_neighbors();
114     statsMsgsList = new WSLBStatsMsg*[maxneighbors];
115     for(int i=0; i < maxneighbors; i++)
116       statsMsgsList[i] = 0;
117     statsDataList = new LDStats[maxneighbors];
119     neighbor_pes = new int[maxneighbors];
120     topo->neighbors(CkMyPe(), neighbor_pes, mig_msgs_expected);
121     mig_msgs = new LBMigrateMsg*[mig_msgs_expected];
122   }
126 void WSLB::AtSync()
128 #if CMK_LBDB_ON
129   //  CkPrintf("[%d] WSLB At Sync step %d!!!!\n",CkMyPe(),mystep);
131   if (CkMyPe() == 0) {
132     start_lb_time = CkWallTimer();
133     CkPrintf("Load balancing step %d starting at %f\n",
134              step(),start_lb_time);
135   }
137   if (neighbor_pes == 0) FindNeighbors();
139   if (!QueryBalanceNow(step()) || mig_msgs_expected == 0) {
140     MigrationDone();
141     return;
142   }
144   WSLBStatsMsg* msg = AssembleStats();
146   thisProxy.ReceiveStats(msg,mig_msgs_expected,neighbor_pes);
148   // Tell our own node that we are ready
149   ReceiveStats((WSLBStatsMsg*)0);
150 #endif
153 WSLBStatsMsg* WSLB::AssembleStats()
155 #if CMK_LBDB_ON
156   // Get stats
157   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
158   theLbdb->IdleTime(&myStats.idletime);
159   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
160   myStats.obj_data_sz = theLbdb->GetObjDataSz();
161   myStats.objData = new LDObjData[myStats.obj_data_sz];
162   theLbdb->GetObjData(myStats.objData);
164   myStats.comm_data_sz = theLbdb->GetCommDataSz();
165   myStats.commData = new LDCommData[myStats.comm_data_sz];
166   theLbdb->GetCommData(myStats.commData);
168   myStats.obj_walltime = myStats.obj_cputime = 0;
169   for(int i=0; i < myStats.obj_data_sz; i++) {
170     myStats.obj_walltime += myStats.objData[i].wallTime;
171     myStats.obj_cputime += myStats.objData[i].cpuTime;
172   }    
174   WSLBStatsMsg* msg = new WSLBStatsMsg;
176   // Calculate usage percentage
177   double myload = myStats.total_walltime - myStats.idletime;
178   double myusage;
179 //   for(i=0; i < myStats.obj_data_sz; i++) {
180 //     myobjcpu += myStats.objData[i].cpuTime;
181 //     myobjwall += myStats.objData[i].wallTime;
182 //   }
183 //   if (myobjwall > 0)
184 //     myusage = myobjcpu / myobjwall;
185 //   else
187   if (myload > 0)
188     myusage = myStats.total_cputime / myload;
189   else myusage = 1.0;
190   // Apply proportional-integral control on usage changes
191   const double usage_err = myusage - usage;
192   usage_int_err += usage_err;
193   usage += usage_err * 0.1 + usage_int_err * 0.01;
194   //  CkPrintf("[%d] Usage err = %f %f\n",CkMyPe(),usage_err,usage_int_err);
196   // Allow usage to decrease quickly, but increase slowly
197   //   if (myusage > usage)
198   //     usage += (myusage-usage) * 0.1;
199   //   else usage = myusage;
202   //  CkPrintf("PE %d myload = %f myusage = %f usage = %f\n",
203   //       CkMyPe(),myload,myusage,usage);
205   msg->from_pe = CkMyPe();
206   // msg->serial = rand();
207   msg->serial = CrnRand();
208   msg->proc_speed = myStats.proc_speed;
209   msg->total_walltime = myStats.total_walltime;
210   msg->total_cputime = myStats.total_cputime;
211   msg->idletime = myStats.idletime;
212   msg->bg_walltime = myStats.bg_walltime;
213   msg->bg_cputime = myStats.bg_cputime;
214   msg->obj_walltime = myStats.obj_walltime;
215   msg->obj_cputime = myStats.obj_cputime;
216   msg->vacate_me = vacate;
217   msg->usage = usage;
219   if (_lb_args.debug()) {
220     CkPrintf(
221       "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
222       CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
223       msg->idletime,msg->bg_walltime,msg->bg_cputime,
224       msg->obj_walltime,msg->obj_cputime);
225   }
227   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
228   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
229   return msg;
230 #else
231   return NULL;
232 #endif
235 void WSLB::Migrated(LDObjHandle h, int waitBarrier)
237 #if CMK_LBDB_ON
238   migrates_completed++;
239   //  CkPrintf("[%d] An object migrated! %d %d\n",
240   //       CkMyPe(),migrates_completed,migrates_expected);
241   if (migrates_completed == migrates_expected) {
242     MigrationDone();
243   }
244 #endif
247 void WSLB::ReceiveStats(WSLBStatsMsg *m)
249 #if CMK_LBDB_ON
250   if (neighbor_pes == 0) FindNeighbors();
252   if (m == 0) { // This is from our own node
253     receive_stats_ready = 1;
254   } else {
255     const int pe = m->from_pe;
256     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
257     //             pe,stats_msg_count,m->n_objs,m->serial,m);
258     int peslot = -1;
259     for(int i=0; i < mig_msgs_expected; i++) {
260       if (pe == neighbor_pes[i]) {
261         peslot = i;
262         break;
263       }
264     }
265     if (peslot == -1 || statsMsgsList[peslot] != 0) {
266       CkPrintf("*** Unexpected WSLBStatsMsg in ReceiveStats from PE %d ***\n",
267                pe);
268     } else {
269       statsMsgsList[peslot] = m;
270       statsDataList[peslot].from_pe = m->from_pe;
271       statsDataList[peslot].total_walltime = m->total_walltime;
272       statsDataList[peslot].total_cputime = m->total_cputime;
273       statsDataList[peslot].idletime = m->idletime;
274       statsDataList[peslot].bg_walltime = m->bg_walltime;
275       statsDataList[peslot].bg_cputime = m->bg_cputime;
276       statsDataList[peslot].proc_speed = m->proc_speed;
277       statsDataList[peslot].obj_walltime = m->obj_walltime;
278       statsDataList[peslot].obj_cputime = m->obj_cputime;
279       statsDataList[peslot].vacate_me = m->vacate_me;
280       statsDataList[peslot].usage = m->usage;
281       stats_msg_count++;
282     }
283   }
285   const int clients = mig_msgs_expected;
286   if (stats_msg_count == clients && receive_stats_ready) {
287     double strat_start_time = CkWallTimer();
288     receive_stats_ready = 0;
289     LBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
291     int i;
293     // Migrate messages from me to elsewhere
294     for(i=0; i < migrateMsg->n_moves; i++) {
295       MigrateInfo& move = migrateMsg->moves[i];
296       const int me = CkMyPe();
297       if (move.from_pe == me && move.to_pe != me) {
298         theLbdb->Migrate(move.obj,move.to_pe);
299       } else if (move.from_pe != me) {
300         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
301                  me,move.from_pe,move.to_pe);
302       }
303     }
304     
305     // Now, send migrate messages to neighbors
306     thisProxy.ReceiveMigration(migrateMsg,mig_msgs_expected,neighbor_pes);
307     
308     // Zero out data structures for next cycle
309     for(i=0; i < clients; i++) {
310       delete statsMsgsList[i];
311       statsMsgsList[i]=0;
312     }
313     stats_msg_count=0;
315     theLbdb->ClearLoads();
316     if (CkMyPe() == 0) {
317       double strat_end_time = CkWallTimer();
318       CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
319     }
320   }
321 #endif  
324 void WSLB::ReceiveMigration(LBMigrateMsg *msg)
326 #if CMK_LBDB_ON
327   if (neighbor_pes == 0) FindNeighbors();
329   if (mig_msgs_received == 0) migrates_expected = 0;
331   mig_msgs[mig_msgs_received] = msg;
332   mig_msgs_received++;
333   //  CkPrintf("[%d] Received migration msg %d of %d\n",
334   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
336   if (mig_msgs_received > mig_msgs_expected) {
337     CkPrintf("[%d] WSLB Error! Too many migration messages received\n",
338              CkMyPe());
339   }
341   if (mig_msgs_received != mig_msgs_expected) {
342     return;
343   }
345   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
346   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
347     LBMigrateMsg* m = mig_msgs[neigh];
348     for(int i=0; i < m->n_moves; i++) {
349       MigrateInfo& move = m->moves[i];
350       const int me = CkMyPe();
351       if (move.from_pe != me && move.to_pe == me) {
352         migrates_expected++;
353       }
354     }
355     delete m;
356     mig_msgs[neigh]=0;
357   }
358   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
359   //       CkMyPe(),migrates_expected);
360   mig_msgs_received = 0;
361   if (migrates_expected == 0 || migrates_expected == migrates_completed)
362     MigrationDone();
363 #endif
367 void WSLB::MigrationDone()
369 #if CMK_LBDB_ON
370   if (CkMyPe() == 0) {
371     double end_lb_time = CkWallTimer();
372     CkPrintf("Load balancing step %d finished at %f duration %f\n",
373              step(),end_lb_time,end_lb_time - start_lb_time);
374   }
375   migrates_completed = 0;
376   migrates_expected = -1;
377   // Increment to next step
378   mystep++;
379   thisProxy [CkMyPe()].ResumeClients();
380 #endif
383 void WSLB::ResumeClients()
385 #if CMK_LBDB_ON
386   theLbdb->ResumeClients();
387 #endif
390 bool WSLB::QueryBalanceNow(int step)
392 #if CMK_LBDB_ON
393   double now = CkWallTimer();
395   if (step==0)
396     first_step_time = now;
397   else if (CkMyPe() == VACATE_PROC && now > VACATE_AFTER
398            && now < (VACATE_AFTER+UNVACATE_AFTER)) {
399     if (vacate == false) 
400       CkPrintf("PE %d vacating at %f\n",CkMyPe(),now);
401     vacate = true;
402   } else {
403     if (vacate == true)
404       CkPrintf("PE %d unvacating at %f\n",CkMyPe(),now);
405     vacate = false;
406   }
407 #endif
408   return true;
411 LBMigrateMsg* WSLB::Strategy(WSLB::LDStats* stats, int count)
413 #if CMK_LBDB_ON
414   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
415   // Compute the average load to see if we are overloaded relative
416   // to our neighbors
417   const double load_factor = 1.05;
418   double objload;
420   double myload = myStats.total_walltime - myStats.idletime;
421   double avgload = myload;
422   int unvacated_neighbors = 0;
423   int i;
424   for(i=0; i < count; i++) {
425     // If the neighbor is vacating, skip him
426     if (stats[i].vacate_me)
427       continue;
429     // Scale times we need appropriately for relative proc speeds
430     double hisload = stats[i].total_walltime - stats[i].idletime;
431     const double hisusage = stats[i].usage;
433     const double scale =  (myStats.proc_speed * usage) 
434       / (stats[i].proc_speed * hisusage);
436     hisload *= scale;
437     stats[i].total_walltime *= scale;
438     stats[i].idletime *= scale;
440     //    CkPrintf("PE %d %d hisload = %f hisusage = %f\n",
441     //       CkMyPe(),i,hisload,hisusage);
442     avgload += hisload;
443     unvacated_neighbors++;
444   }
445   if (vacate && unvacated_neighbors == 0)
446     CkPrintf("[%d] ALL NEIGHBORS WANT TO VACATE!!!\n",CkMyPe());
448   avgload /= (unvacated_neighbors+1);
450   CkVec<MigrateInfo*> migrateInfo;
452   // If we want to vacate, we always dump our load, otherwise
453   // only if we are overloaded
455   if (vacate || myload > avgload) {
456     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
457     //       CkMyPe(),myload,avgload);
459     // First, build heaps of other processors and my objects
460     // Then assign objects to other processors until either
461     //   - The smallest remaining object would put me below average, or
462     //   - I only have 1 object left, or
463     //   - The smallest remaining object would put someone else 
464     //     above average
466     // Build heaps
467     minHeap procs(count);
468     for(i=0; i < count; i++) {
469       // If all my neighbors vacate, I won't have anyone to give work 
470       // to
471       if (!stats[i].vacate_me) {
472         InfoRecord* item = new InfoRecord;
473         item->load = stats[i].total_walltime - stats[i].idletime;
474         item->Id =  stats[i].from_pe;
475         procs.insert(item);
476       }
477     }
478       
479     maxHeap objs(myStats.obj_data_sz);
480     for(i=0; i < myStats.obj_data_sz; i++) {
481       InfoRecord* item = new InfoRecord;
482       item->load = myStats.objData[i].wallTime;
483       item->Id = i;
484       objs.insert(item);
485     }
487     int objs_here = myStats.obj_data_sz;
488     do {
489       //      if (objs_here <= 1) break;  // For now, always leave 1 object
491       InfoRecord* p;
492       InfoRecord* obj;
494       // Get the lightest-loaded processor
495       p = procs.deleteMin();
496       if (p == 0) {
497         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
498         break;
499       }
501       // Get the biggest object
502       bool objfound = false;
503       do {
504         obj = objs.deleteMax();
505         if (obj == 0) break;
507         objload = load_factor * obj->load;
509         double new_p_load = p->load + objload;
510         double my_new_load = myload - objload;
512         // If we're vacating, the biggest object is always good.
513         // Otherwise, only take it if it doesn't produce overload
514         if (vacate || new_p_load < my_new_load) {
515           objfound = true;
516         } else {
517           // This object is too big, so throw it away
518 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
519 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
520           delete obj;
521         }
522       } while (!objfound);
524       if (!objfound) {
525         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
526         break;
527       }
529       const int me = CkMyPe();
530       // Apparently we can give this object to this processor
531       if (_lb_args.debug())
532       CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
533                CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
535       MigrateInfo* migrateMe = new MigrateInfo;
536       migrateMe->obj = myStats.objData[obj->Id].handle;
537       migrateMe->from_pe = me;
538       migrateMe->to_pe = p->Id;
539       migrateInfo.insertAtEnd(migrateMe);
541       objs_here--;
542       
543       // We may want to assign more to this processor, so lets
544       // update it and put it back in the heap
545       p->load += objload;
546       myload -= objload;
547       procs.insert(p);
548       
549       // This object is assigned, so we delete it from the heap
550       delete obj;
552     } while(vacate || myload > avgload);
554     // Now empty out the heaps
555     InfoRecord* p;
556     while (NULL!=(p=procs.deleteMin()))
557       delete p;
558     InfoRecord* obj;
559     while (NULL!=(obj=objs.deleteMax()))
560       delete obj;
561   }  
563   // Now build the message to actually perform the migrations
564   int migrate_count=migrateInfo.length();
565   //  if (migrate_count) {
566   //    CkPrintf("PE %d: Sent away %d of %d objects\n",
567   //         CkMyPe(),migrate_count,myStats.obj_data_sz);
568   //  }
569   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
570   msg->n_moves = migrate_count;
571   for(i=0; i < migrate_count; i++) {
572     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
573     msg->moves[i] = *item;
574     delete item;
575     migrateInfo[i] = 0;
576   }
578   return msg;
579 #else
580   return NULL;
581 #endif
584 #include "WSLB.def.h"
587 /*@}*/