1 /** \file CommAwareRefineLB.C
3 * Written by Harshitha Menon
5 * This load balancer strategy is Refine, but it also takes into consideration
6 * the communication between the processors.
7 * The following are the steps in the loadbalancing strategy
9 * 1. Construct a max heap of processor load whose load is greater than avg
10 * 2. Construct a sorted array of processor load whose load is less than avg
11 * 3. Pick the heaviest processor from the heap, randomly select a chare in
12 * that processor and decide on which processor in the underloaded processor
13 * list to transfer it to based on the one with which it is
14 * heavily communicating.
15 * 4. If the load of the processors after the transfer is less than the avg
16 * load, then push it into the underloaded processor list, else push it into the max heap of overloaded processors.
25 #include "CommAwareRefineLB.h"

#include <algorithm>
#include <map>
#include <vector>
35 #define THRESHOLD 0.02
36 #define SWAP_MULTIPLIER 5
38 inline void eraseObjFromParrObjs(vector<int> & parr_objs, int remove_objid);
39 inline void printMapping(vector<Vertex> &vertices);
40 inline void removeFromArray(int pe_id, vector<int> &array);
41 inline int popFromProcHeap(vector<int> & parr_above_avg, ProcArray *parr);
42 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, vector<int> *parr_objs, ObjGraph *ogr, ProcArray* parr);
43 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
44 vector<int> &parr_above_avg, vector<int> &parr_below_avg,
45 vector<bool> &proc_load_info, ProcArray *parr);
46 inline void getPossiblePes(vector<int>& possible_pes, int randomly_obj_id,
47 ObjGraph *ogr, ProcArray* parr);
49 double upper_threshold;
50 double lower_threshold;
52 CreateLBFunc_Def(CommAwareRefineLB, "always assign the heaviest obj onto lightest loaded processor.")
// Constructor: registers this strategy under the name "CommAwareRefineLB"
// and announces its creation on the local PE.
// NOTE(review): lines are missing from this dump (braces / CkPrintf guard);
// keep byte-identical and only document.
54 CommAwareRefineLB::CommAwareRefineLB(const CkLBOptions &opt): CBase_CommAwareRefineLB(opt)
56   lbname = "CommAwareRefineLB";
58     CkPrintf("[%d] CommAwareRefineLB created\n",CkMyPe());
// Called by the runtime before each LB step to decide whether to balance.
// The return statement is not visible in this dump — presumably it always
// returns true (unconditional balancing); TODO confirm against upstream.
61 bool CommAwareRefineLB::QueryBalanceNow(int _step)
63   // CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
// Comparator over indices into parr->procs, used with std::make_heap /
// push_heap / pop_heap. Because it orders by ascending total load ("less"),
// the STL heap operations build a max-heap: the heaviest processor sits at
// the front, which is what popFromProcHeap() relies on.
67 class ProcLoadGreater {
69     ProcLoadGreater(ProcArray *parr) : parr(parr) {
71     bool operator()(int lhs, int rhs) {
72       return (parr->procs[lhs].getTotalLoad() < parr->procs[rhs].getTotalLoad());
// Comparator for sorting Vertex objects in descending order of their load.
// NOTE(review): takes both arguments by value — const Vertex& would avoid
// copies; left unchanged here.
79 class ObjLoadGreater {
81     bool operator()(Vertex v1, Vertex v2) {
82       return (v1.getVertexLoad() > v2.getVertexLoad());
// Constructors of PeCommInfo (class header is outside this view): a record
// of how much one object communicates with a given PE.
// NOTE(review): the default constructor zeroes the counters but leaves
// pe_id uninitialized — verify that callers always use the pe_id overload.
88     PeCommInfo() : num_msg(0), num_bytes(0) {
91     PeCommInfo(int pe_id) : pe_id(pe_id), num_msg(0) , num_bytes(0) {
96     // TODO: Should probably have a communication cost
99 // Consists of communication information of an object which is maintained
100 // as a list of PeCommInfo containing the processor id and the bytes transferred
101 class ObjPeCommInfo {
    // One entry per distinct PE this object talks to; filled and sorted
    // (heaviest communication first) by getPossiblePes().
107     vector<PeCommInfo> pcomm;
// Comparator used by std::sort in getPossiblePes(): orders PeCommInfo
// entries by descending byte volume so the heaviest-communicating PE
// comes first. NOTE(review): by-value parameters copy each PeCommInfo;
// const& would be cheaper.
110 class ProcCommGreater {
112     bool operator()(PeCommInfo p1, PeCommInfo p2) {
113       // TODO(Harshitha): Should probably consider total communication cost
114       return (p1.num_bytes > p2.num_bytes);
// Debug helper: prints every processor's id, total load, and overhead,
// classified against the file-scope globals upper_threshold /
// lower_threshold (set in CommAwareRefineLB::work before this is useful).
// NOTE(review): loop-variable declarations and some braces are missing
// from this dump; code kept byte-identical.
118 void PrintProcLoad(ProcArray *parr) {
121   for (vert = 0; vert < parr->procs.size(); vert++) {
122     pe_load = parr->procs[vert].getTotalLoad();
123     if (pe_load > upper_threshold) {
124       CkPrintf("Above load : %d load : %E overhead : %E\n",
125           parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
126           parr->procs[vert].overhead());
127     } else if (pe_load < lower_threshold) {
128       CkPrintf("Below load : %d load : %E overhead : %E\n",
129           parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
130           parr->procs[vert].overhead());
        // (final branch: load within [lower_threshold, upper_threshold])
132       CkPrintf("Within avg load : %d load : %E overhead : %E\n",
133           parr->procs[vert].getProcId(), parr->procs[vert].getTotalLoad(),
134           parr->procs[vert].overhead());
// Debug helper: for each processor index i, prints the object ids currently
// mapped to it (parr_objs[i] is the per-PE object list built in work()).
139 void PrintProcObj(ProcArray *parr, vector<int>* parr_objs) {
141   CkPrintf("---------------------\n");
142   for (i = 0; i < parr->procs.size(); i++) {
143     CkPrintf("[%d] contains ", i);
144     for (j = 0; j < parr_objs[i].size(); j++) {
145       CkPrintf(" %d, ", parr_objs[i][j]);
149   CkPrintf("---------------------\n");
// Main strategy entry point. Builds the processor array and object graph
// from the runtime's LDStats, classifies PEs as over-/under-loaded against
// thresholds around the average load, then repeatedly moves a randomly
// chosen object off the heaviest PE onto an underloaded PE it communicates
// with most (falling back to any underloaded PE), until no overloaded PEs
// remain or the swap budget is exhausted.
// NOTE(review): many lines are missing from this dump (local declarations
// such as vert/curr_pe/pe_load/obj_allocated/randd/num_tries/obj_load, and
// numerous closing braces); the visible code is kept byte-identical.
153 void CommAwareRefineLB::work(LDStats* stats) {
154   /** ========================== INITIALIZATION ============================= */
155   ProcArray *parr = new ProcArray(stats);       // Processor Array
156   ObjGraph *ogr = new ObjGraph(stats);          // Object Graph
157   double avgload = parr->getAverageLoad();      // Average load of processors
159   // Sets to false if it is overloaded, else to true
160   vector<bool> proc_load_info(parr->procs.size(), false);
162   // Create an array of vectors for each processor mapping to the objects in
164   vector<int>* parr_objs = new vector<int>[parr->procs.size()];
    // These assign the file-scope globals (also read by PrintProcLoad):
    // "over" means above avg by THRESHOLD fraction; "under" means below avg.
166   upper_threshold = avgload + (avgload * THRESHOLD);
167   //lower_threshold = avgload - (avgload * THRESHOLD * THRESHOLD);
168   lower_threshold = avgload;
170   int less_loaded_counter = 0;
173   /** ============================= STRATEGY ================================ */
175   if (_lb_args.debug()>1)
176     CkPrintf("[%d] In CommAwareRefineLB strategy\n",CkMyPe());
178   CkPrintf("-- Average load %E\n", avgload);
183   // Iterate over all the chares and construct the peid, vector<chareid> array
184   for(vert = 0; vert < ogr->vertices.size(); vert++) {
185     curr_pe = ogr->vertices[vert].getCurrentPe();
186     parr_objs[curr_pe].push_back(vert);
        // Start from the identity mapping; transfers update NewPe later.
187     ogr->vertices[vert].setNewPe(curr_pe);
190   vector<int> parr_above_avg;
191   vector<int> parr_below_avg;
195   // Insert into parr_above_avg if the processor fits under the criteria of
196   // overloaded processor.
197   // Insert the processor id into parr_below_avg if the processor is underloaded
198   for (vert = 0; vert < parr->procs.size(); vert++) {
199     pe_load = parr->procs[vert].getTotalLoad();
200     if (pe_load > upper_threshold) {
201       // Pushing ProcInfo into this list
202       parr_above_avg.push_back(vert);
203     } else if (pe_load < lower_threshold) {
204       parr_below_avg.push_back(parr->procs[vert].getProcId());
205       proc_load_info[parr->procs[vert].getProcId()] = true;
206       less_loaded_counter++;
    // Max-heap keyed on total load: heaviest PE at the front
    // (ProcLoadGreater's "less" ordering + STL heap = max-heap).
210   std::make_heap(parr_above_avg.begin(), parr_above_avg.end(),
211       ProcLoadGreater(parr));
217   // Allow as many swaps as there are chares
218   int total_swaps = ogr->vertices.size() * SWAP_MULTIPLIER;
222   // Keep on loadbalancing until the number of above avg processors is 0
223   while (parr_above_avg.size() != 0 && total_swaps > 0 && parr_below_avg.size() != 0) {
224     // CkPrintf("Above avg : %d Below avg : %d Total swaps: %d\n", parr_above_avg.size(),
225     //     parr_below_avg.size(), total_swaps);
226     obj_allocated = false;
229     // Pop the heaviest processor
230     int p_index = popFromProcHeap(parr_above_avg, parr);
231     ProcInfo& p = parr->procs[p_index];
        // Retry with different random objects; bounded by the number of
        // objects currently on this PE.
233     while (!obj_allocated && num_tries < parr_objs[p.getProcId()].size()) {
235       // It might so happen that due to overhead load, it might not have any
236       // more objects in its list
237       if (parr_objs[p.getProcId()].size() == 0) {
238         // CkPrintf("No obj left to be allocated\n");
239         obj_allocated = true;
          // Pick a random object on the overloaded PE as the migration candidate.
244       random = randd % parr_objs[p.getProcId()].size();
245       randomly_obj_id = parr_objs[p.getProcId()][random];
246       obj_load = ogr->vertices[randomly_obj_id].getVertexLoad();
248       // CkPrintf("Heavy %d: Parr obj size : %d random : %d random obj id : %d\n", p_index,
249       //     parr_objs[p.getProcId()].size(), randd, randomly_obj_id);
          // Candidate destinations, ordered by communication volume with this
          // object (expanded to whole SMP nodes by getPossiblePes).
250       vector<int> possible_pes;
251       getPossiblePes(possible_pes, randomly_obj_id, ogr, parr);
252       for (i = 0; i < possible_pes.size(); i++) {
254         // If the heaviest communicating processor is there in the list, then
255         // assign it to that.
256         possible_pe = possible_pes[i];
            // Only transfer if the destination stays under the upper threshold.
258         if ((parr->procs[possible_pe].getTotalLoad() + obj_load) < upper_threshold) {
259           // CkPrintf("** Transfered %d(Load %lf) from %d:%d(Load %lf) to %d:%d(Load %lf)\n",
260           //     randomly_obj_id, obj_load, CkNodeOf(p.getProcId()), p.getProcId(), p.getTotalLoad(),
261           //     CkNodeOf(possible_pe), possible_pe,
262           //     parr->procs[possible_pe].getTotalLoad());
264           handleTransfer(randomly_obj_id, p, possible_pe, parr_objs, ogr, parr);
265           obj_allocated = true;
267           updateLoadInfo(p_index, possible_pe, upper_threshold, lower_threshold,
268               parr_above_avg, parr_below_avg, proc_load_info, parr);
274     // Since there is no processor in the least loaded list with which this
275     // chare communicates, pick a random least loaded processor.
276     if (!obj_allocated) {
277       //CkPrintf(":( Could not transfer to the nearest communicating ones\n");
278       for (int x = 0; x < parr_below_avg.size(); x++) {
279         int random_pe = parr_below_avg[x];
280         if ((parr->procs[random_pe].getTotalLoad() + obj_load) < upper_threshold) {
281           obj_allocated = true;
283           handleTransfer(randomly_obj_id, p, random_pe, parr_objs, ogr, parr);
284           updateLoadInfo(p_index, random_pe, upper_threshold, lower_threshold,
285               parr_above_avg, parr_below_avg, proc_load_info, parr);
    // Give up on this PE: it is deliberately NOT pushed back on the heap
    // (the push is commented out), so it will not be retried.
293     if (!obj_allocated) {
294       //CkPrintf("!!!! Could not handle the heavy proc %d so giving up\n", p_index);
295       // parr_above_avg.push_back(p_index);
296       // std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
297       //     ProcLoadGreater(parr));
301   //CkPrintf("CommAwareRefine> After lb max load: %lf avg load: %lf\n", max_load, avg_load/parr->procs.size());
303   /** ============================== CLEANUP ================================ */
304   ogr->convertDecisions(stats);         // Send decisions back to LDStats
/**
 * Remove the first occurrence of \p remove_objid from \p parr_objs.
 *
 * Object ids are unique within a processor's object list, so erasing the
 * first match is sufficient. The visible original erased inside an index
 * loop with no break, which both skips the element after an erase and could
 * remove multiple entries; std::find + a single erase avoids that hazard.
 * No-op if the id is not present.
 */
inline void eraseObjFromParrObjs(std::vector<int> & parr_objs, int remove_objid) {
  std::vector<int>::iterator it =
      std::find(parr_objs.begin(), parr_objs.end(), remove_objid);
  if (it != parr_objs.end()) {
    parr_objs.erase(it);
  }
}
// Debug helper: prints, for every object (vertex) index, its original PE
// (getCurrentPe) and the PE chosen by this strategy (getNewPe).
319 inline void printMapping(vector<Vertex> &vertices) {
320   for (int i = 0; i < vertices.size(); i++) {
321     CkPrintf("%d: old map : %d new map : %d\n", i, vertices[i].getCurrentPe(),
322         vertices[i].getNewPe());
/**
 * Remove the first occurrence of \p pe_id from \p array (no-op if absent).
 *
 * Used by updateLoadInfo() to drop a PE from the underloaded list; each PE
 * appears at most once there, so first-match removal is the full job. The
 * visible original erased inside an index loop with no break (skips the
 * following element, may erase more than one); std::find + erase is the
 * safe equivalent.
 */
inline void removeFromArray(int pe_id, std::vector<int> &array) {
  std::vector<int>::iterator it = std::find(array.begin(), array.end(), pe_id);
  if (it != array.end()) {
    array.erase(it);
  }
}
334 inline int popFromProcHeap(vector<int> & parr_above_avg, ProcArray *parr) {
335 int p_index = parr_above_avg.front();
336 std::pop_heap(parr_above_avg.begin(), parr_above_avg.end(),
337 ProcLoadGreater(parr));
338 parr_above_avg.pop_back();
// Commit the migration of object randomly_obj_id from overloaded PE `p` to
// `possible_pe`: records the decision in the object graph (setNewPe),
// moves the id between the per-PE object lists, and shifts the object's
// load between the two ProcInfo totals so subsequent threshold checks see
// the updated loads. NOTE(review): the closing brace is outside this dump;
// code kept byte-identical.
343 inline void handleTransfer(int randomly_obj_id, ProcInfo& p, int possible_pe, vector<int>* parr_objs, ObjGraph *ogr, ProcArray* parr) {
344   ogr->vertices[randomly_obj_id].setNewPe(possible_pe);
345   parr_objs[possible_pe].push_back(randomly_obj_id);
346   ProcInfo &possible_pe_procinfo = parr->procs[possible_pe];
348   p.totalLoad() -= ogr->vertices[randomly_obj_id].getVertexLoad();
349   possible_pe_procinfo.totalLoad() += ogr->vertices[randomly_obj_id].getVertexLoad();
350   eraseObjFromParrObjs(parr_objs[p.getProcId()], randomly_obj_id);
351   //CkPrintf("After transfered %d from %d : Load %E to %d : Load %E\n", randomly_obj_id, p.getProcId(), p.getTotalLoad(),
352   //    possible_pe, possible_pe_procinfo.getTotalLoad());
// After a transfer, re-classify both the source PE (p_index) and the
// destination PE (possible_pe) against the thresholds, updating the
// max-heap, the underloaded list, and proc_load_info accordingly.
// NOTE(review): the upper_threshold / lower_threshold parameters shadow the
// file-scope globals of the same names; callers pass the globals anyway.
355 inline void updateLoadInfo(int p_index, int possible_pe, double upper_threshold, double lower_threshold,
356     vector<int>& parr_above_avg, vector<int>& parr_below_avg,
357     vector<bool> &proc_load_info, ProcArray *parr) {
359   ProcInfo& p = parr->procs[p_index];
360   ProcInfo& possible_pe_procinfo = parr->procs[possible_pe];
362   // If the updated load is still greater than the average by the
363   // threshold value, then push it back to the max heap
364   if (p.getTotalLoad() > upper_threshold) {
365     parr_above_avg.push_back(p_index);
366     std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
367         ProcLoadGreater(parr));
368     //CkPrintf("\t Pushing pe : %d to max heap\n", p.getProcId());
      // Source PE dropped below the average: it becomes a donation target.
369   } else if (p.getTotalLoad() < lower_threshold) {
370     parr_below_avg.push_back(p_index);
371     proc_load_info[p_index] = true;
372     //CkPrintf("\t Adding pe : %d to less loaded\n", p.getProcId());
375   // If the newly assigned processor's load is greater than the average
376   // by the threshold value, then push it into the max heap.
377   if (possible_pe_procinfo.getTotalLoad() > upper_threshold) {
378     // TODO: It should be the index in procarray :(
379     parr_above_avg.push_back(possible_pe);
380     std::push_heap(parr_above_avg.begin(), parr_above_avg.end(),
381         ProcLoadGreater(parr));
382     removeFromArray(possible_pe, parr_below_avg);
383     proc_load_info[possible_pe] = false;
384     //CkPrintf("\t Pusing pe : %d to max heap\n", possible_pe);
      // NOTE(review): a line is missing from this dump between the branch
      // below and the removal; in the upstream source the removeFromArray /
      // flag-clear belongs to a final `else` (load now between thresholds),
      // not to the below-threshold branch — verify against upstream.
385   } else if (possible_pe_procinfo.getTotalLoad() < lower_threshold) {
387     removeFromArray(possible_pe, parr_below_avg);
388     proc_load_info[possible_pe] = false;
389     //CkPrintf("\t Removing from lower list pe : %d\n", possible_pe);
394 inline void getPossiblePes(vector<int>& possible_pes, int vert,
395 ObjGraph *ogr, ProcArray* parr) {
396 std::map<int, int> tmp_map_pid_index;
400 ObjPeCommInfo objpcomm;
401 // CkPrintf("%d sends msgs to %d and recv msgs from %d\n", vert,
402 // ogr->vertices[vert].sendToList.size(),
403 // ogr->vertices[vert].recvFromList.size());
405 for (i = 0; i < ogr->vertices[vert].sendToList.size(); i++) {
406 nbrid = ogr->vertices[vert].sendToList[i].getNeighborId();
407 j = ogr->vertices[nbrid].getNewPe(); // Fix me!! New PE
408 // TODO: Should it index with vertexId?
409 if (tmp_map_pid_index.count(j) == 0) {
410 tmp_map_pid_index[j] = counter;
411 PeCommInfo pecomminf(j);
412 // TODO: Shouldn't it use vertexId instead of vert?
413 objpcomm.pcomm.push_back(pecomminf);
416 index = tmp_map_pid_index[j];
418 objpcomm.pcomm[index].num_msg +=
419 ogr->vertices[vert].sendToList[i].getNumMsgs();
420 objpcomm.pcomm[index].num_bytes +=
421 ogr->vertices[vert].sendToList[i].getNumBytes();
424 for (i = 0; i < ogr->vertices[vert].recvFromList.size(); i++) {
425 nbrid = ogr->vertices[vert].recvFromList[i].getNeighborId();
426 j = ogr->vertices[nbrid].getNewPe();
428 if (tmp_map_pid_index.count(j) == 0) {
429 tmp_map_pid_index[j] = counter;
430 PeCommInfo pecomminf(j);
431 // TODO: Shouldn't it use vertexId instead of vert?
432 objpcomm.pcomm.push_back(pecomminf);
435 index = tmp_map_pid_index[j];
437 objpcomm.pcomm[index].num_msg +=
438 ogr->vertices[vert].sendToList[i].getNumMsgs();
439 objpcomm.pcomm[index].num_bytes +=
440 ogr->vertices[vert].sendToList[i].getNumBytes();
443 // Sort the pe communication vector for this chare
444 std::sort(objpcomm.pcomm.begin(), objpcomm.pcomm.end(),
451 //CkPrintf("%d talks to %d pes and possible pes are :\n", vert,
452 // objpcomm.pcomm.size());
453 for (i = 0; i < objpcomm.pcomm.size(); i++) {
454 pe_id = objpcomm.pcomm[i].pe_id;
455 node_id = CkNodeOf(pe_id);
456 node_size = CkNodeSize(node_id);
457 node_first = CkNodeFirst(node_id);
458 // CkPrintf("smp details pe_id %d, node_id %d, node_size %d, node_first %d\n",
459 // pe_id, node_id, node_size, node_first);
460 for (j = 0; j < node_size; j++) {
461 possible_pes.push_back(node_first + j);
462 //CkPrintf("\t %d:%d (comm: %d)\n",node_id, node_first+j, objpcomm.pcomm[i].num_bytes);
468 #include "CommAwareRefineLB.def.h"