Cleanup #48: Move SdagConstruct::numberNodes into each subclass
[charm.git] / src / ck-ldb / Refiner.C
blob178a58fe67b7bf0e41987ae951756b36afef9bf8
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
6 /** This code is derived from RefineLB.C, and RefineLB.C should
7  be rewritten to use this, so there is no code duplication
8 */
10 #include "Refiner.h"
12 int* Refiner::AllocProcs(int count, BaseLB::LDStats* stats)
14   return new int[stats->n_objs];
17 void Refiner::FreeProcs(int* bufs)
19   delete [] bufs;
22 void Refiner::create(int count, BaseLB::LDStats* stats, int* procs)
24   int i;
26   // now numComputes is all the computes: both migratable and nonmigratable.
27   // afterwards, nonmigratable computes will be taken off
29   numAvail = 0;
30   for(i=0; i < P; i++) {
31     processors[i].Id = i;
32     processors[i].backgroundLoad = stats->procs[i].bg_walltime;
33     processors[i].load = processors[i].backgroundLoad;
34     processors[i].computeLoad = 0;
35     processors[i].computeSet = new Set();
36     processors[i].pe_speed = stats->procs[i].pe_speed;
37 //    processors[i].utilization = stats->procs[i].utilization;
38     processors[i].available = stats->procs[i].available;
39     if (processors[i].available == true) numAvail++;
40   }
42   for (i=0; i<stats->n_objs; i++)
43   {
44         LDObjData &odata = stats->objData[i];
45         computes[i].Id = i;
46         computes[i].id = odata.objID();
47 //        computes[i].handle = odata.handle;
48         computes[i].load = odata.wallTime;     // was cpuTime
49         computes[i].processor = -1;
50         computes[i].oldProcessor = procs[i];
51         computes[i].migratable = odata.migratable;
52         if (computes[i].oldProcessor >= P)  {
53           if (stats->complete_flag)
54             CmiAbort("LB Panic: the old processor in RefineLB cannot be found, is this in a simulation mode?");
55           else {
56               // an object from outside domain, randomize its location
57             computes[i].oldProcessor = CrnRand()%P;
58           }
59         }
60   }
61 //  for (i=0; i < numComputes; i++)
62 //      processors[computes[i].oldProcessor].computeLoad += computes[i].load;
65 void Refiner::assign(computeInfo *c, int processor)
67   assign(c, &(processors[processor]));
70 void Refiner::assign(computeInfo *c, processorInfo *p)
72    c->processor = p->Id;
73    p->computeSet->insert((InfoRecord *) c);
74    p->computeLoad += c->load;
75    p->load = p->computeLoad + p->backgroundLoad;
78 void  Refiner::deAssign(computeInfo *c, processorInfo *p)
80    c->processor = -1;
81    p->computeSet->remove(c);
82    p->computeLoad -= c->load;
83    p->load = p->computeLoad + p->backgroundLoad;
86 double Refiner::computeAverageLoad() {
87   computeAverage();
88   return averageLoad;
91 void Refiner::computeAverage()
93   int i;
94   double total = 0.;
95   for (i=0; i<numComputes; i++) total += computes[i].load;
97   for (i=0; i<P; i++)
98     if (processors[i].available == true) 
99         total += processors[i].backgroundLoad;
101   averageLoad = total/numAvail;
104 double Refiner::computeMax()
106   int i;
107   double max = -1.0;
108   for (i=0; i<P; i++) {
109     if (processors[i].available == true && processors[i].load > max)
110       max = processors[i].load;
111   }
112   return max;
115 int Refiner::isHeavy(processorInfo *p)
117   if (p->available == true) 
118      return p->load > overLoad*averageLoad;
119   else {
120      return p->computeSet->numElements() != 0;
121   }
124 int Refiner::isLight(processorInfo *p)
126   if (p->available == true) 
127      return p->load < averageLoad;
128   else 
129      return 0;
132 // move the compute jobs out from unavailable PE
133 void Refiner::removeComputes()
135   int first;
136   Iterator nextCompute;
138   if (numAvail < P) {
139     if (numAvail == 0) CmiAbort("No processor available!");
140     for (first=0; first<P; first++)
141       if (processors[first].available == true) break;
142     for (int i=0; i<P; i++) {
143       if (processors[i].available == false) {
144           computeInfo *c = (computeInfo *)
145                    processors[i].computeSet->iterator((Iterator *)&nextCompute);
146           while (c) {
147             deAssign(c, &processors[i]);
148             assign(c, &processors[first]);
149             nextCompute.id++;
150             c = (computeInfo *)
151                    processors[i].computeSet->next((Iterator *)&nextCompute);
152           }
153       }
154     }
155   }
158 int Refiner::refine()
160   int i;
161   int finish = 1;
162   maxHeap *heavyProcessors = new maxHeap(P);
164   Set *lightProcessors = new Set();
165   for (i=0; i<P; i++) {
166     if (isHeavy(&processors[i])) {  
167       //      CkPrintf("Processor %d is HEAVY: load:%f averageLoad:%f!\n",
168       //               i, processors[i].load, averageLoad);
169       heavyProcessors->insert((InfoRecord *) &(processors[i]));
170     } else if (isLight(&processors[i])) {
171       //      CkPrintf("Processor %d is LIGHT: load:%f averageLoad:%f!\n",
172       //               i, processors[i].load, averageLoad);
173       lightProcessors->insert((InfoRecord *) &(processors[i]));
174     }
175   }
176   int done = 0;
178   while (!done) {
179     double bestSize;
180     computeInfo *bestCompute;
181     processorInfo *bestP;
182     
183     processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
184     if (!donor) break;
186     //find the best pair (c,receiver)
187     Iterator nextProcessor;
188     processorInfo *p = (processorInfo *) 
189       lightProcessors->iterator((Iterator *) &nextProcessor);
190     bestSize = 0;
191     bestP = 0;
192     bestCompute = 0;
194     while (p) {
195       Iterator nextCompute;
196       nextCompute.id = 0;
197       computeInfo *c = (computeInfo *) 
198         donor->computeSet->iterator((Iterator *)&nextCompute);
199       // iout << iINFO << "Considering Procsessor : " 
200       //      << p->Id << "\n" << endi;
201       while (c) {
202         if (!c->migratable) {
203           nextCompute.id++;
204           c = (computeInfo *) 
205             donor->computeSet->next((Iterator *)&nextCompute);
206           continue;
207         }
208         //CkPrintf("c->load: %f p->load:%f overLoad*averageLoad:%f \n",
209         //c->load, p->load, overLoad*averageLoad);
210         if ( c->load + p->load < overLoad*averageLoad) {
211           // iout << iINFO << "Considering Compute : " 
212           //      << c->Id << " with load " 
213           //      << c->load << "\n" << endi;
214           if(c->load > bestSize) {
215             bestSize = c->load;
216             bestCompute = c;
217             bestP = p;
218           }
219         }
220         nextCompute.id++;
221         c = (computeInfo *) 
222           donor->computeSet->next((Iterator *)&nextCompute);
223       }
224       p = (processorInfo *) 
225         lightProcessors->next((Iterator *) &nextProcessor);
226     }
228     if (bestCompute) {
229       //      CkPrintf("Assign: [%d] with load: %f from %d to %d \n",
230       //               bestCompute->id.id[0], bestCompute->load, 
231       //               donor->Id, bestP->Id);
232       deAssign(bestCompute, donor);      
233       assign(bestCompute, bestP);
234     } else {
235       finish = 0;
236       break;
237     }
239     if (bestP->load > averageLoad)
240       lightProcessors->remove(bestP);
241     
242     if (isHeavy(donor))
243       heavyProcessors->insert((InfoRecord *) donor);
244     else if (isLight(donor))
245       lightProcessors->insert((InfoRecord *) donor);
246   }  
248   delete heavyProcessors;
249   delete lightProcessors;
251   return finish;
254 int Refiner::multirefine(bool reset)
256   computeAverage();
257   double avg = averageLoad;
258   double max = computeMax();
260   const double overloadStep = 0.01;
261   const double overloadStart = overLoad;
262   double dCurOverload = max / avg;
263                                                                                 
264   int minOverload = 0;
265   int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
266   double dMinOverload = minOverload * overloadStep + overloadStart;
267   double dMaxOverload = maxOverload * overloadStep + overloadStart;
268   int curOverload;
269   int refineDone = 0;
270   if (_lb_args.debug()>=1)
271     CmiPrintf("dMinOverload: %f dMaxOverload: %f\n", dMinOverload, dMaxOverload);
272                                                                                 
273   overLoad = dMinOverload;
274   if (refine())
275     refineDone = 1;
276   else {
277     overLoad = dMaxOverload;
278     if (!refine()) {
279       CmiPrintf("ERROR: Could not refine at max overload\n");
280       refineDone = 1;
281     }
282   }
283                                                                                 
284   // Scan up, until we find a refine that works
285   while (!refineDone) {
286     if (maxOverload - minOverload <= 1)
287       refineDone = 1;
288     else {
289       curOverload = (maxOverload + minOverload ) / 2;
290                                                                                 
291       overLoad = curOverload * overloadStep + overloadStart;
292       if (_lb_args.debug()>=1)
293       CmiPrintf("Testing curOverload %d = %f [min,max]= %d, %d\n", curOverload, overLoad, minOverload, maxOverload);
295       // Reset the processors datastructure to the original
296       if (reset) {
297         int i;
298         for (i = 0; i < P; i++) {
299           processors[i].computeLoad = 0;
300           delete processors[i].computeSet;
301           processors[i].computeSet = new Set();
302         }
303         for (i = 0; i < numComputes; i++)
304           assign((computeInfo *) &(computes[i]),
305               (processorInfo *) &(processors[computes[i].oldProcessor]));
306       }
308       if (refine())
309         maxOverload = curOverload;
310       else
311         minOverload = curOverload;
312     }
313   }
314   return 1;
317 void Refiner::Refine(int count, BaseLB::LDStats* stats, 
318                      int* cur_p, int* new_p)
320   //  CkPrintf("[%d] Refiner strategy\n",CkMyPe());
322   P = count;
323   numComputes = stats->n_objs;
324   computes = new computeInfo[numComputes];
325   processors = new processorInfo[count];
327   create(count, stats, cur_p);
329   int i;
330   for (i=0; i<numComputes; i++)
331     assign((computeInfo *) &(computes[i]),
332            (processorInfo *) &(processors[computes[i].oldProcessor]));
334   removeComputes();
336   computeAverage();
338   if (_lb_args.debug()>2)  {
339     CkPrintf("Old PE load (bg load): ");
340     for (i=0; i<count; i++) CkPrintf("%d:%f(%f) ", i, processors[i].load, processors[i].backgroundLoad);
341     CkPrintf("\n");
342   }
344   // Perform multi refine but reset it to the original state before changing the
345   // refinement load balancing threshold.
346   multirefine(true);
348   int nmoves = 0;
349   for (int pe=0; pe < P; pe++) {
350     Iterator nextCompute;
351     nextCompute.id = 0;
352     computeInfo *c = (computeInfo *)
353       processors[pe].computeSet->iterator((Iterator *)&nextCompute);
354     while(c) {
355       new_p[c->Id] = c->processor;
356       if (new_p[c->Id] != cur_p[c->Id]) nmoves++;
357 //      if (c->oldProcessor != c->processor)
358 //      CkPrintf("Refiner::Refine: from %d to %d\n", c->oldProcessor, c->processor);
359       nextCompute.id++;
360       c = (computeInfo *) processors[pe].computeSet->
361                      next((Iterator *)&nextCompute);
362     }
363   }
364   if (_lb_args.debug()>2)  {
365     CkPrintf("New PE load: ");
366     for (i=0; i<count; i++) CkPrintf("%f ", processors[i].load);
367     CkPrintf("\n");
368   }
369   if (_lb_args.debug()>1) 
370     CkPrintf("Refiner: moving %d obejcts. \n", nmoves);
371   delete [] computes;
372   delete [] processors;
376 /*@}*/