8 * support processor avail bitvector
9 * support nonmigratable attrib
10 * support background load
12 rewritten by Gengbin Zheng to use the new load balancer database and hash table;
13 modified to recognize the nonmigratable attrib of an object
14 by Gengbin Zheng, 7/28/2003
19 #include "GreedyCommLB.h"
// Invokes the CreateLBFunc_Def macro for the GreedyCommLB class, supplying the
// one-line description string of this strategy.
22 CreateLBFunc_Def(GreedyCommLB, "Greedy algorithm which takes communication graph into account")
// Sets the balancer name and caches the two communication-cost coefficients
// from the global load-balancer arguments (_lb_args).
// Elsewhere in this file alpha is multiplied by a message count and beta by a
// data size to estimate communication time.
24 void GreedyCommLB::init()
26 lbname = (char*)"GreedyCommLB";
27 alpha = _lb_args.alpha();
28 beta = _lb_args.beta();
// Constructor taking load-balancer options; the options are forwarded to the
// CBase_GreedyCommLB base class. The body contains a CkPrintf that reports
// creation together with the value of CkMyPe().
32 GreedyCommLB::GreedyCommLB(const CkLBOptions &opt): CBase_GreedyCommLB(opt)
36 CkPrintf("[%d] GreedyCommLB created\n",CkMyPe());
// Migration constructor: forwards the CkMigrateMessage to the base class.
39 GreedyCommLB::GreedyCommLB(CkMigrateMessage *m):CBase_GreedyCommLB(m) {
// Boolean query that receives the current step number in `_step`.
// A disabled debug trace that would print the step per PE is kept below.
43 bool GreedyCommLB::QueryBalanceNow(int _step)
45 // CkPrintf("[%d] Balancing on step %d\n",CkMyPe(),_step);
49 // assign id to processor pe, load including both computation and communication
// Marks object `id` as assigned and adds `load` to the running load of
// processor `pe`.
//   pe   - index into processors[]
//   id   - index into assigned_array[]
//   load - amount added to processors[pe].load
50 void GreedyCommLB::alloc(int pe,int id,double load){
51 // CkPrintf("alloc %d ,%d\n",pe,id);
// 1 means "assigned"; the other routines test this array against 0.
52 assigned_array[id] = 1;
53 processors[pe].load += load;
56 // communication cost when obj id is put on proc pe
// Walks the adjacency list of object `id` in object_graph and estimates the
// communication time as alpha * (message count) + beta * (data volume).
//   stats - load-balancing statistics; stats->to_proc[] gives the processor
//           currently recorded for each object
//   id    - index of the object being evaluated
//   pe    - candidate processor for that object
57 double GreedyCommLB::compute_com(LDStats* stats, int id, int pe){
58 int j,com_data=0,com_msg=0;
// Traversal starts at the node after the per-object head entry.
62 ptr = object_graph[id].next;
// The loop ends at the NULL terminator and is additionally capped at 2*nobj
// iterations.
64 for(j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
65 int destObj = ptr->id;
// Neighbours are classified by assignment state and by whether they are
// recorded on the candidate processor itself.
66 if(assigned_array[destObj] == 0) // this obj has not been assigned
68 if (stats->to_proc[destObj] == pe) // this obj is assigned to same pe
// Accumulate the data volume carried by this edge.
70 com_data += ptr->data;
// Cost model: alpha per message plus beta per unit of data.
74 total_time = alpha*com_msg + beta*com_data;
78 // update all communicating processors after assigning obj id on proc pe
// For the neighbours of object `id` in object_graph, charges the communication
// time of the connecting edge to the processor recorded for that neighbour.
//   stats - load-balancing statistics; stats->to_proc[] is read for each
//           neighbour
//   id    - index of the object that was just placed
//   pe    - processor the object was placed on
79 void GreedyCommLB::update(LDStats* stats, int id, int pe){
// Traversal starts at the node after the per-object head entry.
80 graph * ptr = object_graph[id].next;
// The loop ends at the NULL terminator and is additionally capped at 2*nobj
// iterations.
82 for(int j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
83 int destObj = ptr->id;
84 if(assigned_array[destObj] == 0) // this obj has not been assigned
86 int destPe = stats->to_proc[destObj];
87 if (destPe == pe) // this obj is assigned to same pe
// Edge cost uses the same model as compute_com: alpha per message plus beta
// per unit of data.
89 int com_data = ptr->data;
90 int com_msg = ptr->nmsg;
91 double com_time = alpha*com_msg + beta*com_data;
// The neighbour's processor pays for talking to the newly placed object.
92 processors[destPe].load += com_time;
96 // add comm between obj x and y
// Records a communication edge between objects x and y in object_graph.
//   x, y - object indices into object_graph[]
//   data - data volume of the communication (the caller passes a byte count)
//   nmsg - number of messages
// The same linking step is applied to both lists, so the edge is reachable
// from x and from y.
98 void GreedyCommLB::add_graph(int x, int y, int data, int nmsg){
// First endpoint: start from the head entry of x's list.
101 ptr = &(object_graph[x]);
// The node `temp` takes over the head's current successor.
108 temp->next = ptr->next;
// Second endpoint: repeat for the list headed at y.
112 ptr = &(object_graph[y]);
119 temp->next = ptr->next;
// File-local helper that resets the per-object bookkeeping before a balancing
// pass.
//   assign       - per-object array of length b
//   object_graph - array of b list-head entries
//   l            - NOTE(review): does not appear to be used here (the caller
//                  passes the processor count) - confirm
//   b            - number of objects
124 static void init_data(int *assign, graph * object_graph, int l, int b){
// NOTE(review): presumably clears assign[obj] for every object - confirm.
125 for(int obj=0;obj < b;obj++)
// Reset every list head to an empty list with zero data and message counts.
128 for(int j=0;j<b;j++){
129 object_graph[j].data = 0;
130 object_graph[j].nmsg = 0;
131 object_graph[j].next = NULL;
// Strategy entry point. In order, this routine:
//   1. sizes and resets the per-object and per-processor working arrays,
//   2. builds the object communication graph from stats->commData,
//   3. keeps every non-migratable object on the processor it came from and
//      queues the remaining objects in a max-heap,
//   4. places the queued objects one at a time, comparing the minimum entry of
//      the processor heap with the processors that host already-placed
//      communication partners,
//   5. releases the working structures.
// The resulting placement is recorded through stats->assign().
135 void GreedyCommLB::work(LDStats* stats)
// NOTE(review): the format string has no conversion specifier, so the
// CkMyPe() argument is not printed - confirm whether "[%d]" was intended.
141 if (_lb_args.debug()) CkPrintf("In GreedyCommLB strategy\n",CkMyPe());
142 npe = stats->nprocs();
143 nobj = stats->n_objs;
145 // nmigobj is calculated as the number of migratable objects
146 // ObjectHeap maxh is of size nmigobj
147 nmigobj = stats->n_migrateobjs;
// Needed before the stats->getHash() lookups used below.
149 stats->makeCommHash();
151 assigned_array = new int[nobj];
153 object_graph = new graph[nobj];
155 init_data(assigned_array,object_graph,npe,nobj);
// NOTE(review): the macro body carries a trailing ';'. It expands harmlessly
// in the plain assignment below but would break inside a larger expression.
157 #define MAXDOUBLE 1e10;
// Per-processor records; the initial load depends on availability and on
// whether background load is taken into account.
160 processors = new processorInfo[npe];
161 for (int p=0; p<npe; p++) {
162 processors[p].Id = p;
163 processors[p].backgroundLoad = stats->procs[p].bg_walltime;
164 processors[p].computeLoad = 0;
165 processors[p].pe_speed = stats->procs[p].pe_speed;
166 if (!stats->procs[p].available) {
// Unavailable processors get a very large load value.
167 processors[p].load = MAXDOUBLE;
170 processors[p].load = 0;
171 if (!_lb_args.ignoreBgLoad())
172 processors[p].load = processors[p].backgroundLoad;
177 // assign communication graph
// Each communication record is translated into graph edges between object
// indices obtained from stats->getHash(); -1 marks a failed lookup.
178 for(com =0; com< stats->n_comm;com++) {
179 int xcoord=0,ycoord=0;
180 LDCommData &commData = stats->commData[com];
// Case 1: object-to-object message with a single destination object.
181 if((!commData.from_proc())&&(commData.recv_type()==LD_OBJ_MSG))
183 xcoord = stats->getHash(commData.sender);
184 ycoord = stats->getHash(commData.receiver.get_destObj());
// NOTE(review): a failed lookup is skipped here when ignoreBgLoad() is set or
// the stats are incomplete, while the multicast branch below tests
// migObjOnly() instead - confirm that the difference is intentional.
185 if((xcoord == -1)||(ycoord == -1))
186 if (_lb_args.ignoreBgLoad() || stats->complete_flag==0) continue;
187 else CkAbort("Error in search\n");
188 add_graph(xcoord,ycoord,commData.bytes, commData.messages);
// Case 2: message addressed to a list of objects; one edge is added per
// destination, each with the record's full byte and message counts.
190 else if (commData.recv_type()==LD_OBJLIST_MSG) {
192 LDObjKey *objs = commData.receiver.get_destObjs(nobjs);
193 xcoord = stats->getHash(commData.sender);
194 for (int i=0; i<nobjs; i++) {
195 ycoord = stats->getHash(objs[i]);
196 if((xcoord == -1)||(ycoord == -1))
197 if (_lb_args.migObjOnly()) continue;
198 else CkAbort("Error in search\n");
199 //printf("Multicast: %d => %d %d %d\n", xcoord, ycoord, commData.bytes, commData.messages);
200 add_graph(xcoord,ycoord,commData.bytes, commData.messages);
205 // only build heap with migratable objects,
206 // mapping nonmigratable objects to the same processors
207 ObjectHeap maxh(nmigobj+1);
208 for(obj=0; obj < stats->n_objs; obj++) {
209 LDObjData &objData = stats->objData[obj];
210 int onpe = stats->from_proc[obj];
211 if (!objData.migratable) {
// A pinned object on an unavailable processor cannot be handled: abort.
// NOTE(review): the message reads "is not be able to" - wording slip.
212 if (!stats->procs[onpe].available) {
213 CmiAbort("Load balancer is not be able to move a nonmigratable object out of an unavailable processor.\n");
// Account for the pinned object on its current processor right away.
215 alloc(onpe, obj, objData.wallTime);
216 update(stats, obj, onpe); // update communication cost on other pes
// Other objects get a heap record whose value is the object's wall time.
219 x = new ObjectRecord;
222 x->val = objData.wallTime;
// Heap holding the available processors only.
228 minHeap *lightProcessors = new minHeap(npe);
229 for (i=0; i<npe; i++)
230 if (stats->procs[i].available)
231 lightProcessors->insert((InfoRecord *) &(processors[i]));
233 int id,maxid,minpe=0;
234 double temp,total_time,min_temp;
235 // for(pe=0;pe < count;pe++)
236 // CkPrintf("avail for %d = %d\n",pe,stats[pe].available);
// Scratch array: communication cost between the current object and each
// processor. Only touched entries are cleared again at the end of every
// iteration, so it is zeroed in full just once here.
238 double *pe_comm = new double[npe];
239 for (int i=0; i<npe; i++) pe_comm[i] = 0.0;
// Greedy placement: one iteration per migratable object, taken from the
// max-heap.
241 for(id = 0;id<nmigobj;id++){
242 x = maxh.deleteMax();
// Initial candidate: the minimum entry of the processor heap. It is removed
// here and re-inserted after the placement.
246 processorInfo *donor = (processorInfo *) lightProcessors->deleteMin();
248 int first_avail_pe = donor->Id;
249 temp = compute_com(stats, maxid, first_avail_pe);
251 //total_time = temp + alloc_array[first_avail_pe][nobj];
// Candidate cost = communication cost on that processor + its current load.
252 total_time = temp + donor->load;
253 minpe = first_avail_pe;
255 // search all procs for best
256 // optimization: only search processors that it communicates
257 // and the minimum of all others
259 graph * ptr = object_graph[maxid].next;
261 // find out all processors that this obj communicates
262 double commload = 0.0; // total comm load
263 for(int com=0;(com<2*nobj)&&(ptr != NULL);com++,ptr=ptr->next){
264 int destObj = ptr->id;
265 if(assigned_array[destObj] == 0) // this obj has not been assigned
267 int destPe = stats->to_proc[destObj];
// Partners on unavailable processors are ignored.
268 if(stats->procs[destPe].available == 0) continue;
270 double cload = alpha*ptr->nmsg + beta*ptr->data;
271 pe_comm[destPe] += cload;
// Remember each partner processor once; commPes is scanned linearly.
275 for (int pp=0; pp<commPes.size(); pp++)
276 if (destPe == commPes[pp]) { exist=1; break; } // duplicated
277 if (!exist) commPes.push_back(destPe);
// Evaluate every partner processor as an alternative target.
281 for(k = 0; k < commPes.size(); k++){
283 processorInfo *commpe = (processorInfo *) &processors[pe];
// Communication that stays remote if the object lands on pe: the total minus
// the share exchanged with pe itself.
285 temp = commload - pe_comm[pe];
287 //CkPrintf("check id = %d, processor = %d,com = %lf, pro = %lf, comp=%lf\n", maxid,pe,temp,alloc_array[pe][nobj],total_time);
// Keep the alternative only when it is strictly cheaper.
288 if(total_time > (temp + commpe->load)){
290 total_time = temp + commpe->load;
294 /* CkPrintf("check id = %d, processor = %d, obj = %lf com = %lf, pro = %lf, comp=%lf\n", maxid,minpe,x->load,min_temp,alloc_array[minpe][nobj],total_time); */
296 // CkPrintf("before 2nd alloc\n");
// Commit the decision; the chosen processor's load grows by the object's
// value plus min_temp.
297 stats->assign(maxid, minpe);
299 alloc(minpe, maxid, x->val + min_temp);
301 // now that maxid assigned to minpe, update other pes load
302 update(stats, maxid, minpe);
// Return the initial candidate to the heap, then refresh the heap entries of
// the partner processors and reset their scratch entries.
305 lightProcessors->insert(donor);
306 for(k = 0; k < commPes.size(); k++) {
308 processorInfo *commpe = (processorInfo *) &processors[pe];
309 lightProcessors->update(commpe);
310 pe_comm[pe] = 0.0; // clear
// Release the working structures allocated above.
319 delete [] processors;
320 delete [] assigned_array;
322 delete lightProcessors;
// Walk each object's adjacency list before releasing the array of heads.
324 for(int oindex= 0; oindex < nobj; oindex++){
325 graph * ptr = &object_graph[oindex];
334 delete [] object_graph;
338 #include "GreedyCommLB.def.h"