Cleanup #48: Move SdagConstruct::numberNodes into each subclass
[charm.git] / src / ck-ldb / CommAwareRefineLB.C
blob64fbf1f3469c9c46223b8c1ad612814c675ed079
1 /** \file CommAwareRefineLB.C
2  *
3  *  Written by Harshitha Menon
4  *  
5  *  This Loadbalancer strategy is Refine but taking into consideration the
6  *  Communication between the processors.
7  *  The following are the steps in the loadbalancing strategy
8  *
9  *  1. Construct a max heap of processor load whose load is greater than avg
10  *  2. Construct a sorted array of processor load whose load is less than avg
11  *  3. Pick the heaviest processor from the heap, randomly select a chare in
12  *  that processor and decide on which processor in the underloaded processor
13  *  list to transfer it to based on the one with which it is 
14  *  heavily communicating.
15  *  4. If the load of the processors after the transfer is less than the avg
16  *  load, then push it into the underloaded processor list, else push it into
17  *  the max heap.
18  */
20 /**
21  * \addtogroup CkLdb
23 /*@{*/
25 #include "CommAwareRefineLB.h"
26 #include "ckgraph.h"
27 #include <algorithm>
28 #include <map>
30 #include <vector>
31 using std::vector;
33 #include <time.h>
35 #define THRESHOLD 0.02
36 #define SWAP_MULTIPLIER 5 
38 inline void eraseObjFromParrObjs(vector<int> & parr_objs, int remove_objid);
39 inline void printMapping(vector<Vertex> &vertices);
40 inline void removeFromArray(int pe_id, vector<int> &array);
41 inline int popFromProcHeap(vector<int> & parr_above_avg, ProcArray *parr);
42 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, vector<int> *parr_objs, ObjGraph *ogr, ProcArray* parr);
43 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
44                            vector<int> &parr_above_avg, vector<int> &parr_below_avg,
45                            vector<bool> &proc_load_info, ProcArray *parr);
46 inline void getPossiblePes(vector<int>& possible_pes, int randomly_obj_id,
47     ObjGraph *ogr, ProcArray* parr);
49 double upper_threshold;
50 double lower_threshold;
52 CreateLBFunc_Def(CommAwareRefineLB, "always assign the heaviest obj onto lightest loaded processor.")
54 CommAwareRefineLB::CommAwareRefineLB(const CkLBOptions &opt): CBase_CommAwareRefineLB(opt)
56   lbname = "CommAwareRefineLB";
57   if (CkMyPe()==0)
58     CkPrintf("[%d] CommAwareRefineLB created\n",CkMyPe());
61 bool CommAwareRefineLB::QueryBalanceNow(int _step)
63   //  CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
64   return true;
67 class ProcLoadGreater {
68   public:
69     ProcLoadGreater(ProcArray *parr) : parr(parr) {
70     }
71     bool operator()(int lhs, int rhs) {
72       return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
73     }
75   private:
76     ProcArray *parr;
79 class ObjLoadGreater {
80   public:
81     bool operator()(Vertex v1, Vertex v2) {
82       return (v1.getVertexLoad() > v2.getVertexLoad());
83     }
86 class PeCommInfo {
87   public:
88     PeCommInfo() : num_msg(0), num_bytes(0) {
89     }
91     PeCommInfo(int pe_id) : pe_id(pe_id), num_msg(0) , num_bytes(0) {
92     }
93     int pe_id;
94     int num_msg;
95     int num_bytes;
96     // TODO: Should probably have a communication cost
99 // Consists of communication information of an object with is maintained
100 // as a list of PeCommInfo containing the processor id and the bytes transferred
101 class ObjPeCommInfo {
102   public:
103     ObjPeCommInfo() {
104     }
106     int obj_id;
107     vector<PeCommInfo> pcomm;
110 class ProcCommGreater {
111   public:
112     bool operator()(PeCommInfo p1, PeCommInfo p2) {
113       // TODO(Harshitha): Should probably consider total communication cost
114       return (p1.num_bytes > p2.num_bytes);
115     }
118 void PrintProcLoad(ProcArray *parr) {
119   int vert;
120   double pe_load;
121   for (vert = 0; vert < parr->procs.size(); vert++) {
122     pe_load = parr->procs[vert].getTotalLoad();
123     if (pe_load > upper_threshold) {
124       CkPrintf("Above load : %d load : %E overhead : %E\n",
125         parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
126         parr->procs[vert].overhead());
127     } else if (pe_load < lower_threshold) {
128       CkPrintf("Below load : %d load : %E overhead : %E\n",
129         parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
130         parr->procs[vert].overhead());
131     } else {
132       CkPrintf("Within avg load : %d load : %E overhead : %E\n",
133         parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
134         parr->procs[vert].overhead());
135     }
136   }
139 void PrintProcObj(ProcArray *parr, vector<int>* parr_objs) {
140   int i, j;
141   CkPrintf("---------------------\n");
142   for (i = 0; i < parr->procs.size(); i++) {
143     CkPrintf("[%d] contains ", i);
144     for (j = 0; j < parr_objs[i].size(); j++) {
145       CkPrintf(" %d, ", parr_objs[i][j]);
146     }
147     CkPrintf("\n");
148   }
149   CkPrintf("---------------------\n");
153 void CommAwareRefineLB::work(LDStats* stats) {
154   /** ========================== INITIALIZATION ============================= */
155   ProcArray *parr = new ProcArray(stats);       // Processor Array
156   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
157   double avgload = parr->getAverageLoad();      // Average load of processors
159   // Sets to false if it is overloaded, else to true
160   vector<bool> proc_load_info(parr->procs.size(), false);
162   // Create an array of vectors for each processor mapping to the objects in
163   // that processor
164   vector<int>* parr_objs = new vector<int>[parr->procs.size()];
166   upper_threshold = avgload + (avgload * THRESHOLD);
167   //lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
168   lower_threshold = avgload;
170   int less_loaded_counter = 0;
172   srand(time(NULL));
173   /** ============================= STRATEGY ================================ */
175   if (_lb_args.debug()>1) 
176     CkPrintf("[%d] In CommAwareRefineLB strategy\n",CkMyPe());
178   CkPrintf("-- Average load %E\n", avgload);
180   int vert, i, j;
181   int curr_pe;
183   // Iterate over all the chares and construct the peid, vector<chareid> array
184   for(vert = 0; vert < ogr->vertices.size(); vert++) {
185     curr_pe = ogr->vertices[vert].getCurrentPe();
186     parr_objs[curr_pe].push_back(vert);
187     ogr->vertices[vert].setNewPe(curr_pe);
188   }
190   vector<int> parr_above_avg;
191   vector<int> parr_below_avg;
193   double pe_load;  
195   // Insert into parr_above_avg if the processor fits under the criteria of
196   // overloaded processor.
197   // Insert the processor id into parr_below_avg if the processor is underloaded 
198   for (vert = 0; vert < parr->procs.size(); vert++) {
199     pe_load = parr->procs[vert].getTotalLoad();
200     if (pe_load > upper_threshold) {
201       // Pushing ProcInfo into this list
202       parr_above_avg.push_back(vert);
203     } else if (pe_load < lower_threshold) {
204       parr_below_avg.push_back(parr->procs[vert].getProcId());
205       proc_load_info[parr->procs[vert].getProcId()] = true;
206       less_loaded_counter++;
207     }
208   }
210   std::make_heap(parr_above_avg.begin(), parr_above_avg.end(),
211       ProcLoadGreater(parr));
213   int random;
214   int randomly_obj_id;
215   bool obj_allocated;
216   int num_tries;
217   // Allow as many swaps as there are chares
218   int total_swaps = ogr->vertices.size() * SWAP_MULTIPLIER;
219   int possible_pe;
220   double obj_load;
222   // Keep on loadbalancing until the number of above avg processors is 0
223   while (parr_above_avg.size() != 0 && total_swaps > 0 && parr_below_avg.size() != 0) {
224     // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
225     //    parr_below_avg.size(), total_swaps);
226     obj_allocated = false;
227     num_tries = 0;
229     // Pop the heaviest processor
230     int p_index = popFromProcHeap(parr_above_avg, parr);
231     ProcInfo& p = parr->procs[p_index];
233     while (!obj_allocated && num_tries < parr_objs[p.getProcId()].size()) {
235       // It might so happen that due to overhead load, it might not have any
236       // more objects in its list
237       if (parr_objs[p.getProcId()].size() == 0) {
238         // CkPrintf("No obj left to be allocated\n");
239         obj_allocated = true;
240         break;
241       }
243       int randd = rand();
244       random = randd % parr_objs[p.getProcId()].size();
245       randomly_obj_id = parr_objs[p.getProcId()][random];
246       obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
248       // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
249       //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
250       vector<int> possible_pes;
251       getPossiblePes(possible_pes, randomly_obj_id, ogr, parr);
252       for (i = 0; i < possible_pes.size(); i++) {
254         // If the heaviest communicating processor is there in the list, then
255         // assign it to that.
256         possible_pe = possible_pes[i];
258         if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
259          // CkPrintf("**  Transfered %d(Load %lf) from %d:%d(Load %lf) to %d:%d(Load %lf)\n",
260          //     randomly_obj_id, obj_load, CkNodeOf(p.getProcId()), p.getProcId(), p.getTotalLoad(),
261          //     CkNodeOf(possible_pe), possible_pe,
262          //     parr->procs[possible_pe].getTotalLoad());
264           handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
265           obj_allocated = true;
266           total_swaps--;
267           updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
268               parr_above_avg, parr_below_avg, proc_load_info, parr);
270           break;
271         }
272       }
274       // Since there is no processor in the least loaded list with which this
275       // chare communicates, pick a random least loaded processor.
276       if (!obj_allocated) {
277         //CkPrintf(":( Could not transfer to the nearest communicating ones\n");
278         for (int x = 0; x < parr_below_avg.size(); x++) {
279           int random_pe = parr_below_avg[x];
280           if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
281             obj_allocated = true;
282             total_swaps--;
283             handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
284             updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
285                 parr_above_avg, parr_below_avg, proc_load_info, parr);
286             break;
287           }
288           num_tries++;
289         }
290       }
291     }
293     if (!obj_allocated) {
294       //CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
295       // parr_above_avg.push_back(p_index);
296       // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
297       //     ProcLoadGreater(parr));
298     }
299   }
301   //CkPrintf("CommAwareRefine> After lb max load: %lf avg load: %lf\n", max_load, avg_load/parr->procs.size());
303   /** ============================== CLEANUP ================================ */
304   ogr->convertDecisions(stats);         // Send decisions back to LDStats
305   delete parr;
306   delete ogr;
307   delete[] parr_objs;
310 inline void eraseObjFromParrObjs(vector<int> & parr_objs, int remove_objid) {
311   for (int i = 0; i < parr_objs.size(); i++) {
312     if (parr_objs[i] == remove_objid) {
313       parr_objs.erase(parr_objs.begin() + i);
314       return;
315     }
316   }
319 inline void printMapping(vector<Vertex> &vertices) {
320   for (int i = 0; i < vertices.size(); i++) {
321     CkPrintf("%d: old map : %d new map : %d\n", i, vertices[i].getCurrentPe(),
322         vertices[i].getNewPe());
323   }
326 inline void removeFromArray(int pe_id, vector<int> &array) {
327   for (int i = 0; i < array.size(); i++) {
328     if (array[i] == pe_id) {
329       array.erase(array.begin() + i);
330     }
331   }
334 inline int popFromProcHeap(vector<int> & parr_above_avg, ProcArray *parr) {
335   int p_index = parr_above_avg.front();
336   std::pop_heap(parr_above_avg.begin(), parr_above_avg.end(),
337       ProcLoadGreater(parr));
338   parr_above_avg.pop_back();
339   return p_index;
342     
343 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, vector<int>* parr_objs, ObjGraph *ogr, ProcArray* parr) {
344   ogr->vertices[randomly_obj_id].setNewPe(possible_pe);
345   parr_objs[possible_pe].push_back(randomly_obj_id);
346   ProcInfo &possible_pe_procinfo = parr->procs[possible_pe];
348   p.totalLoad() -= ogr->vertices[randomly_obj_id].getVertexLoad();
349   possible_pe_procinfo.totalLoad() += ogr->vertices[randomly_obj_id].getVertexLoad();
350   eraseObjFromParrObjs(parr_objs[p.getProcId()], randomly_obj_id);
351   //CkPrintf("After transfered %d from %d : Load %E to %d : Load %E\n", randomly_obj_id, p.getProcId(), p.getTotalLoad(),
352   //    possible_pe, possible_pe_procinfo.getTotalLoad());
355 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
356                            vector<int>& parr_above_avg, vector<int>& parr_below_avg,
357                            vector<bool> &proc_load_info, ProcArray *parr) {
359   ProcInfo& p = parr->procs[p_index];
360   ProcInfo& possible_pe_procinfo = parr->procs[possible_pe];
362   // If the updated load is still greater than the average by the
363   // threshold value, then push it back to the max heap
364   if (p.getTotalLoad() > upper_threshold) {
365     parr_above_avg.push_back(p_index);
366     std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
367         ProcLoadGreater(parr));
368     //CkPrintf("\t Pushing pe : %d to max heap\n", p.getProcId());
369   } else if (p.getTotalLoad() < lower_threshold) {
370     parr_below_avg.push_back(p_index);
371     proc_load_info[p_index] = true;
372     //CkPrintf("\t Adding pe : %d to less loaded\n", p.getProcId());
373   }
375   // If the newly assigned processor's load is greater than the average
376   // by the threshold value, then push it into the max heap.
377   if (possible_pe_procinfo.getTotalLoad() > upper_threshold) {
378     // TODO: It should be the index in procarray :(
379     parr_above_avg.push_back(possible_pe);
380     std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
381         ProcLoadGreater(parr));
382     removeFromArray(possible_pe, parr_below_avg);
383     proc_load_info[possible_pe] = false;
384     //CkPrintf("\t Pusing pe : %d to max heap\n", possible_pe);
385   } else if (possible_pe_procinfo.getTotalLoad() < lower_threshold) {
386   } else {
387     removeFromArray(possible_pe, parr_below_avg);
388     proc_load_info[possible_pe] = false;
389     //CkPrintf("\t Removing from lower list pe : %d\n", possible_pe);
390   }
394 inline void getPossiblePes(vector<int>& possible_pes, int vert,
395     ObjGraph *ogr, ProcArray* parr) {
396   std::map<int, int> tmp_map_pid_index;
397   int counter = 0;
398   int index;
399   int i, j, nbrid;
400   ObjPeCommInfo objpcomm;
401  // CkPrintf("%d sends msgs to %d and recv msgs from %d\n", vert,
402  //   ogr->vertices[vert].sendToList.size(),
403  //   ogr->vertices[vert].recvFromList.size());
404   
405   for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
406     nbrid = ogr->vertices[vert].sendToList[i].getNeighborId();
407     j = ogr->vertices[nbrid].getNewPe(); // Fix me!! New PE
408     // TODO: Should it index with vertexId?
409     if (tmp_map_pid_index.count(j) == 0) {
410       tmp_map_pid_index[j] = counter;
411       PeCommInfo pecomminf(j);
412       // TODO: Shouldn't it use vertexId instead of vert?
413       objpcomm.pcomm.push_back(pecomminf);
414       counter++;
415     }
416     index = tmp_map_pid_index[j];
418     objpcomm.pcomm[index].num_msg +=
419       ogr->vertices[vert].sendToList[i].getNumMsgs();
420     objpcomm.pcomm[index].num_bytes +=
421       ogr->vertices[vert].sendToList[i].getNumBytes();
422   }
424   for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
425     nbrid = ogr->vertices[vert].recvFromList[i].getNeighborId();
426     j = ogr->vertices[nbrid].getNewPe();
428     if (tmp_map_pid_index.count(j) == 0) {
429       tmp_map_pid_index[j] = counter;
430       PeCommInfo pecomminf(j);
431       // TODO: Shouldn't it use vertexId instead of vert?
432       objpcomm.pcomm.push_back(pecomminf);
433       counter++;
434     }
435     index = tmp_map_pid_index[j];
437     objpcomm.pcomm[index].num_msg +=
438       ogr->vertices[vert].sendToList[i].getNumMsgs();
439     objpcomm.pcomm[index].num_bytes +=
440       ogr->vertices[vert].sendToList[i].getNumBytes();
441   }
443   // Sort the pe communication vector for this chare
444   std::sort(objpcomm.pcomm.begin(), objpcomm.pcomm.end(),
445       ProcCommGreater());
447   int pe_id;
448   int node_id;
449   int node_size;
450   int node_first;
451   //CkPrintf("%d talks to %d pes and possible pes are :\n", vert,
452   //    objpcomm.pcomm.size());
453   for (i = 0; i < objpcomm.pcomm.size(); i++) {
454     pe_id = objpcomm.pcomm[i].pe_id;
455     node_id = CkNodeOf(pe_id);
456     node_size = CkNodeSize(node_id);
457     node_first = CkNodeFirst(node_id);
458    // CkPrintf("smp details pe_id %d, node_id %d, node_size %d, node_first %d\n",
459    //   pe_id, node_id, node_size, node_first);
460     for (j = 0; j < node_size; j++) {
461       possible_pes.push_back(node_first + j);
462       //CkPrintf("\t %d:%d (comm: %d)\n",node_id, node_first+j, objpcomm.pcomm[i].num_bytes); 
463     }
464   }
468 #include "CommAwareRefineLB.def.h"
470 /*@}*/