Clean up C linkage specific to the C++ migration
[charm.git] / src / ck-ldb / TopoCentLB.C
blob229425d5769cfe4aec24517eaf9ed2dbcccf3ec5
1 /**************************************************************************
2 ** Amit Sharma (asharma6@uiuc.edu)
3 ** November 23, 2004
4 **
5 ** This is a topology conscious load balancer.
6 ** It migrates objects to new processors based on the topology in which the processors are connected.
7 ****************************************************************************/
9 #include <math.h>
10 #include <stdlib.h>
11 #include "TopoCentLB.decl.h"
12 #include "TopoCentLB.h"
14 #define alpha PER_MESSAGE_SEND_OVERHEAD_DEFAULT  /*Startup time per message, seconds*/
15 #define beta PER_BYTE_SEND_OVERHEAD_DEFAULT     /*Long-message time per byte, seconds*/
16 #define DEG_THRES 0.50
18 //#define MAX_EDGE
19 //#define RAND_COMM
20 #define make_mapping 0
22 extern int quietModeRequested;
24 CreateLBFunc_Def(TopoCentLB,"Balance objects based on the network topology")
27 /*static void lbinit (void)
29   LBRegisterBalancer ("TopoCentLB",
30                       CreateTopoCentLB,
31                       AllocateTopoCentLB,
32                       "Balance objects based on the network topology");
33 }*/
36 TopoCentLB::TopoCentLB(const CkLBOptions &opt) : CBase_TopoCentLB (opt)
38   lbname = "TopoCentLB";
39   if (CkMyPe () == 0 && !quietModeRequested) {
40     CkPrintf("CharmLB> TopoCentLB created.\n");
41   }
45 bool TopoCentLB::QueryBalanceNow (int _step)
47   return true;
50 TopoCentLB::~TopoCentLB(){
51         if(topo) delete topo;
54 /*This routine partitions the task graph minimizing the communication and balancing the object load on all partitions*/
55 /*It uses METIS library to accomplish that*/
56 void TopoCentLB::computePartitions(CentralLB::LDStats *stats,int count,int *newmap)
58         
59   int numobjs = stats->n_objs;
60         int i, j, m;
62   // allocate space for the computing data
63   double *objtime = new double[numobjs];
64   int *objwt = new int[numobjs];
65   int *origmap = new int[numobjs];
66   LDObjHandle *handles = new LDObjHandle[numobjs];
67   
68         for(i=0;i<numobjs;i++) {
69     objtime[i] = 0.0;
70     objwt[i] = 0;
71     origmap[i] = 0;
72   }
74   //Prepare compute loads for METIS library
75   for (i=0; i<stats->n_objs; i++) {
76     LDObjData &odata = stats->objData[i];
77     if (!odata.migratable) 
78       CmiAbort("MetisLB doesnot dupport nonmigratable object.\n");
79     int frompe = stats->from_proc[i];
80     origmap[i] = frompe;
81     objtime[i] = odata.wallTime*stats->procs[frompe].pe_speed;
82     handles[i] = odata.handle;
83   }
85   // to convert the weights on vertices to integers
86   double max_objtime = objtime[0];
87   for(i=0; i<numobjs; i++) {
88     if(max_objtime < objtime[i])
89       max_objtime = objtime[i];
90   }
91         int maxobj=0;
92         int totalwt=0;
93   double ratio = 1000.0/max_objtime;
94   for(i=0; i<numobjs; i++) {
95       objwt[i] = (int)(objtime[i]*ratio);
96                         if(maxobj<objwt[i])
97                                 maxobj=objwt[i];
98                         totalwt+=objwt[i];
99   }
100         
101   int **comm = new int*[numobjs];
102   for (i=0; i<numobjs; i++) {
103     comm[i] = new int[numobjs];
104     for (j=0; j<numobjs; j++)  {
105       comm[i][j] = 0;
106     }
107   }
109   //Prepare communication for METIS library
110   const int csz = stats->n_comm;
111   for(i=0; i<csz; i++) {
112       LDCommData &cdata = stats->commData[i];
113       //if(cdata.from_proc() || cdata.receiver.get_type() != LD_OBJ_MSG)
114         //continue;
115                         if(!cdata.from_proc() && cdata.receiver.get_type() == LD_OBJ_MSG){
116         int senderID = stats->getHash(cdata.sender);
117         int recverID = stats->getHash(cdata.receiver.get_destObj());
118         CmiAssert(senderID < numobjs);
119         CmiAssert(recverID < numobjs);
120                                 comm[senderID][recverID] += cdata.messages;
121         comm[recverID][senderID] += cdata.messages;
122                                 //Use bytes or messages -- do i include messages for objlist too...??
123                         }
124                         else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
125                                 //CkPrintf("in objlist..\n");
126         int nobjs;
127         LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
128         int senderID = stats->getHash(cdata.sender);
129         for (j=0; j<nobjs; j++) {
130            int recverID = stats->getHash(objs[j]);
131            if((senderID == -1)||(recverID == -1)) {
132               if (_lb_args.migObjOnly()) continue;
133               else CkAbort("Error in search\n");
134            }
135            comm[senderID][recverID] += cdata.messages;
136            comm[recverID][senderID] += cdata.messages;
137         }
138                         }
139                 }
141 // ignore messages sent from an object to itself
142   for (i=0; i<numobjs; i++)
143     comm[i][i] = 0;
145   // construct the graph in CSR format
146   int *xadj = new int[numobjs+1];
147   int numedges = 0;
148   for(i=0;i<numobjs;i++) {
149     for(j=0;j<numobjs;j++) {
150       if(comm[i][j] != 0)
151         numedges++;
152     }
153   }
154   int *adjncy = new int[numedges];
155   int *edgewt = new int[numedges];
156         int factor = 10;
157   xadj[0] = 0;
158   int count4all = 0;
159   for (i=0; i<numobjs; i++) {
160     for (j=0; j<numobjs; j++) { 
161       if (comm[i][j] != 0) { 
162         adjncy[count4all] = j;
163         edgewt[count4all++] = comm[i][j]/factor;
164       }
165     }
166     xadj[i+1] = count4all;
167   }
169   //Call METIS routine
170   int wgtflag = 3; // Weights both on vertices and edges
171   int numflag = 0; // C Style numbering
172   int options[5];
173   int edgecut;
174   options[0] = 0;
176   if (count < 1) {
177     CkPrintf("error: Number of Pe less than 1!");
178   }
179   else if (count == 1) {
180         for(m=0;m<numobjs;m++) 
181                         newmap[i] = origmap[i];
182   }
183   else {
184                 /*
185         if (count > 8)
186                         METIS_PartGraphKway(&numobjs, xadj, adjncy, objwt, edgewt, 
187                             &wgtflag, &numflag, &count, options, 
188                             &edgecut, newmap);
189           else
190                         METIS_PartGraphRecursive(&numobjs, xadj, adjncy, objwt, edgewt, 
191                                  &wgtflag, &numflag, &count, options, 
192                                  &edgecut, newmap);
193                 */
194                 METIS_PartGraphRecursive(&numobjs, xadj, adjncy, objwt, edgewt,
195                                  &wgtflag, &numflag, &count, options,
196                                  &edgecut, newmap);
197   }
198          
200   //Debugging code: Checking load on each partition
201   if(_lb_args.debug() >=2){
202           int total=0;
203           int *chkwt = new int[count];
204           for(i=0;i<count;i++)
205                   chkwt[i]=0;
206           for(i=0;i<numobjs;i++){
207                 chkwt[newmap[i]] += objwt[i];
208                   total += objwt[i];
209           }
210           for(i=0;i<count;i++)
211                   CkPrintf("%d -- %d\n",i,chkwt[i]);
212           CkPrintf("Totalwt of all partitions after call to METIS:%d, Avg is %d\n",total,total/count);
213   }
215   //Clean up all the variables allocated in this routine
216   for(i=0;i<numobjs;i++)
217     delete[] comm[i];
218   delete[] comm;
219   delete[] objtime;
220   delete[] xadj;
221   delete[] adjncy;
222   delete[] objwt;
223   delete[] edgewt;
224         delete[] handles;
225   delete[] origmap;
229 int TopoCentLB::findMaxObjs(int *map,int totalobjs,int count)
231         int *max_num = new int[count];
232         int i;
233         int maxobjs=0;
234         
235         for(i=0;i<count;i++)
236                 max_num[i]=0;
237                 
238         for(i=0;i<totalobjs;i++)
239                 max_num[map[i]]++;
240         
241         for(i=0;i<count;i++)
242                 if(max_num[i]>maxobjs)
243                         maxobjs = max_num[i];
244         
245         delete[] max_num;
247         return maxobjs;
250 void TopoCentLB::Heapify(HeapNode *heap, int node, int heapSize)
252   int left = 2*node+1;
253   int right = 2*node+2;
254   int xchange;
255         
256   if (left < heapSize && (heap[left].key > heap[node].key))
257     xchange = left;
258   else 
259                 xchange = node;
260   
261   if (right < heapSize && (heap[right].key > heap[xchange].key))
262     xchange = right;
264   if (xchange != node) {
265     HeapNode tmp;
266     tmp = heap[node];
267     heap[node] = heap[xchange];
268     heap[xchange] = tmp;
269                 heapMapping[heap[node].node]=node;
270                 heapMapping[heap[xchange].node]=xchange;
271     Heapify(heap,xchange,heapSize);
272   }    
276 TopoCentLB::HeapNode TopoCentLB::extractMax(HeapNode *heap,int *heapSize){
278         if(*heapSize < 1)
279                 CmiAbort("Empty Heap passed to extractMin!\n");
281         HeapNode max = heap[0];
282         heap[0] = heap[*heapSize-1];
283         heapMapping[heap[0].node]=0;
284         *heapSize = *heapSize - 1;
285         Heapify(heap,0,*heapSize);
286         return max;
289 void TopoCentLB::BuildHeap(HeapNode *heap,int heapSize){
290         for(int i=heapSize/2; i >= 0; i--)
291                 Heapify(heap,i,heapSize);
294 void TopoCentLB :: increaseKey(HeapNode *heap,int i,double wt){
295         if(wt != -1.00){
296                 #ifdef MAX_EDGE
297                         if(wt>heap[i].key)
298                                 heap[i].key = wt;
299                 #else
300                         heap[i].key += wt;
301                 #endif
302         }
303         int parent = (i-1)/2;
304         
305         if(heap[parent].key >= heap[i].key)
306                 return;
307         else {
308                 HeapNode tmp = heap[parent];
309                 heap[parent] = heap[i];
310                 heap[i] = tmp;
311                 heapMapping[heap[parent].node]=parent;
312                 heapMapping[heap[i].node]=i;
313                 increaseKey(heap,parent,-1.00);
314         }
317 /*This routine implements the algorithm used to produce the partition-processor mapping*/
318 /*The algorithm uses an idea similar to the standard MST algorithm*/
319 void TopoCentLB :: calculateMST(PartGraph *partgraph,LBTopology *topo,int *proc_mapping,int max_comm_part) {
321   int *inHeap;
322   double *keys;
323   int count = partgraph->n_nodes;
324   int i=0,j=0;
326   //Arrays needed for keeping information
327   inHeap = new int[partgraph->n_nodes];
328   keys = new double[partgraph->n_nodes];
330   int *assigned_procs = new int[count];
332   hopCount = new double*[count];
333   for(i=0;i<count;i++){
334     proc_mapping[i]=-1;
335     assigned_procs[i]=0;
336     hopCount[i] = new double[count];
337     for(j=0;j<count;j++)
338       hopCount[i][j] = 0;
339   }
341   //Call a topology routine to fill up hopCount
342   topo->get_pairwise_hop_count(hopCount);
343         
344   int max_neighbors = topo->max_neighbors();
345         
346   HeapNode *heap = new HeapNode[partgraph->n_nodes];
347   heapMapping = new int[partgraph->n_nodes];
348         
349   int heapSize = 0;
351   for(i=0;i<partgraph->n_nodes;i++){
352     heap[i].key = 0.00;
353     heap[i].node = i;
354     keys[i] = 0.00;
355     inHeap[i] = 1;
356     heapMapping[i]=i;
357   }
359   //Assign the max comm partition first
360   heap[max_comm_part].key = 1.00;
361         
362   heapSize = partgraph->n_nodes;
363   BuildHeap(heap,heapSize);
365   int k=0,comm_cnt=0,m=0;
366   int *commParts = new int[partgraph->n_nodes];
367         
368   //srand(count);
370   while(heapSize > 0){
372     /****Phase1: Extracting appropriate partition from heap****/
374     HeapNode max = extractMax(heap,&heapSize);
375     inHeap[max.node] = 0;
377     for(i=0;i<partgraph->n_nodes;i++){
378       commParts[i]=-1;
379       PartGraph::Edge wt = partgraph->edges[max.node][i];
380       if(wt == 0)
381         continue;
382       if(inHeap[i]){
383 #ifdef MAX_EDGE
384         if(wt>keys[i])
385           keys[i]=wt;
386 #else
387         keys[i] += wt;
388 #endif
389         /*This part has been COMMENTED out for optimizing the code: we handle the updation using heapMapping*/
390         /*array instead of searching for node in the heap everytime*/
392         //Update in the heap too
393         //First, find where this node is..in the heap
394         /*for(j=0;j<heapSize;j++)
395           if(heap[j].node == i)
396           break;
397           if(j==heapSize)
398           CmiAbort("Some error in heap...\n");*/
399         increaseKey(heap,heapMapping[i],wt);
400       }
401     }
402                  
403     /*Phase2: ASSIGNING partition to processor*/
404                 
405     //Special case
406     if(heapSize == partgraph->n_nodes-1){ //Assign max comm partition to 0th proc in the topology
407       proc_mapping[max.node]=0;
408       assigned_procs[0]=1;
409       continue;
410     }
411                 
412     m=0;
414     comm_cnt=0;
416     double min_cost=-1;
417     int min_cost_index=-1;
418     double cost=0;
419     int p=0;
420     //int q=0;
422     for(k=0;k<partgraph->n_nodes;k++){
423       if(!inHeap[k] && partgraph->edges[k][max.node]){
424         commParts[comm_cnt]=k;
425         comm_cnt++;
426       }
427     }
429     //Optimized the loop by commenting out the get_hop_count code and getting all the hop counts initially
430     for(m=0;m<count;m++){
431       if(!assigned_procs[m]){
432         cost=0;
433         for(p=0;p<comm_cnt;p++){
434           //if(!hopCount[proc_mapping[commParts[p]]][m])
435           //hopCount[proc_mapping[commParts[p]]][m]=topo->get_hop_count(proc_mapping[commParts[p]],m);
436           cost += hopCount[proc_mapping[commParts[p]]][m]*partgraph->edges[commParts[p]][max.node];
437         }
438         if(min_cost==-1 || cost<min_cost){
439           min_cost=cost;
440           min_cost_index=m;
441         }
442       }
443     }
445     proc_mapping[max.node]=min_cost_index;
446     assigned_procs[min_cost_index]=1;
447   }
449   //clear up memory
450   delete[] inHeap;
451   delete[] keys;
452   delete[] assigned_procs;
453   delete[] heap;
454   delete[] commParts;
458 void TopoCentLB :: work(LDStats *stats)
460   int proc;
461   int i,j;
462   int n_pes = stats->nprocs();
463         
464   if (_lb_args.debug() >= 2) {
465     CkPrintf("In TopoCentLB Strategy...\n");
466   }
467   
468   // Make sure that there is at least one available processor.
469   for (proc = 0; proc < n_pes; proc++) {
470     if (stats->procs[proc].available) {
471       break;
472     }
473   }
475   if (proc == n_pes) {
476     CmiAbort ("TopoCentLB: no available processors!");
477   }
479   
480   removeNonMigratable(stats, n_pes);
481   int *newmap = new int[stats->n_objs];
484   if(make_mapping)
485     computePartitions(stats, n_pes, newmap);
486   else {
487     //mapping taken from previous algo
488     for(i=0;i<stats->n_objs;i++) {
489       newmap[i]=stats->from_proc[i];
490     }
491   }
493   //Debugging Code
494   if(_lb_args.debug() >=2){
495     CkPrintf("Map obtained from partitioning:\n");
496     for(i=0;i<stats->n_objs;i++)
497       CkPrintf(" %d,%d ",i,newmap[i]);
498   }
500   int max_objs = findMaxObjs(newmap,stats->n_objs, n_pes);
501         
502   partgraph = new PartGraph(n_pes, max_objs);
504   //Fill up the partition graph - first fill the nodes and then, the edges
506   for(i=0;i<stats->n_objs;i++)
507     {
508       PartGraph::Node* n = &partgraph->nodes[newmap[i]];
509       n->obj_list[n->num_objs]=i;
510       n->num_objs++;
511     }
513   int *addedComm=new int[n_pes];
514   
515   stats->makeCommHash();
516   
517   int max_comm_part=-1;
518         
519   double max_comm=0;
521   //Try putting random amount of communication on the partition graph edges to see if things work fine
522   //This also checks the running time of the algorithm since number of edges is high than in a practical scenario
523 #ifdef RAND_COMM
524   for(i = 0; i < n_pes; i++) {
525     for(j = i+1; j < n_pes; j++) {
526       int val;
527       if(rand()%5==0)
528         val=0;
529       else
530         val= rand()%1000;
531                                 
532       partgraph->edges[i][j] = val;
533       partgraph->edges[j][i] = val;
534                         
535       partgraph->nodes[i].comm += val;
536       partgraph->nodes[j].comm += val;
537                         
538       if(partgraph->nodes[i].comm > max_comm){
539         max_comm = partgraph->nodes[i].comm;
540         max_comm_part = i;
541       }
542       if(partgraph->nodes[j].comm > max_comm){
543         max_comm = partgraph->nodes[j].comm;
544         max_comm_part = j;
545       }
546     }
547   }
548 #else
549   //Adding communication to the partition graph edges
550   for(i=0;i<stats->n_comm;i++)
551     {
552       //DO I consider other comm too....i.e. to or from a processor
553       LDCommData &cdata = stats->commData[i];
554       if(!cdata.from_proc() && cdata.receiver.get_type() == LD_OBJ_MSG){
555         int senderID = stats->getHash(cdata.sender);
556         int recverID = stats->getHash(cdata.receiver.get_destObj());
557         CmiAssert(senderID < stats->n_objs);
558         CmiAssert(recverID < stats->n_objs);
559                 
560         if(newmap[senderID]==newmap[recverID])
561           continue;
562         
563         if(partgraph->edges[newmap[senderID]][newmap[recverID]] == 0){
564           partgraph->nodes[newmap[senderID]].degree++;
565           partgraph->nodes[newmap[recverID]].degree++;
566         }
567                 
568         partgraph->edges[newmap[senderID]][newmap[recverID]] += cdata.bytes;
569         partgraph->edges[newmap[recverID]][newmap[senderID]] += cdata.bytes;
570                         
571         partgraph->nodes[newmap[senderID]].comm += cdata.bytes;
572         partgraph->nodes[newmap[recverID]].comm += cdata.bytes;
574         //Keeping track of maximum communiacting partition
575         if(partgraph->nodes[newmap[senderID]].comm > max_comm){
576           max_comm = partgraph->nodes[newmap[senderID]].comm;
577           max_comm_part = newmap[senderID];
578         }
579         if(partgraph->nodes[newmap[recverID]].comm > max_comm){
580           max_comm = partgraph->nodes[newmap[recverID]].comm;
581           max_comm_part = newmap[recverID];
582         }
583       }
584       else if(cdata.receiver.get_type() == LD_OBJLIST_MSG) {
585         int nobjs;
586         LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
587         int senderID = stats->getHash(cdata.sender);
588         for(j = 0; j < n_pes; j++)
589           addedComm[j]=0;
590         for (j=0; j<nobjs; j++) {
591           int recverID = stats->getHash(objs[j]);
592           if((senderID == -1)||(recverID == -1)) {
593             if (_lb_args.migObjOnly()) continue;
594             else CkAbort("Error in search\n");
595           }
596                                         
597           if(newmap[senderID]==newmap[recverID])
598             continue;
599         
600           if(partgraph->edges[newmap[senderID]][newmap[recverID]] == 0){
601             partgraph->nodes[newmap[senderID]].degree++;
602             partgraph->nodes[newmap[recverID]].degree++;
603           }
605           //Communication added only once for a message sent to many objects on a single processor
606           if(!addedComm[newmap[recverID]]){
607             partgraph->edges[newmap[senderID]][newmap[recverID]] += cdata.bytes;
608             partgraph->edges[newmap[recverID]][newmap[senderID]] += cdata.bytes;
609         
610             partgraph->nodes[newmap[senderID]].comm += cdata.bytes;
611             partgraph->nodes[newmap[recverID]].comm += cdata.bytes;
613             if(partgraph->nodes[newmap[senderID]].comm > max_comm){
614               max_comm = partgraph->nodes[newmap[senderID]].comm;
615               max_comm_part = newmap[senderID];
616             }
617             if(partgraph->nodes[newmap[recverID]].comm > max_comm){
618               max_comm = partgraph->nodes[newmap[recverID]].comm;
619               max_comm_part = newmap[recverID];
620             }
621             //bytesComm[newmap[senderID]][newmap[recverID]] += cdata.bytes;
622             //bytesComm[newmap[recverID]][newmap[senderID]] += cdata.bytes;
623             addedComm[newmap[recverID]]=1;
624           }
625         }
626       }
628     }
629 #endif
630         
631   int *proc_mapping = new int[n_pes];
632         
633   delete [] addedComm;
634                 
635   LBtopoFn topofn;
637   //Parsing the command line input for getting the processor topology
638   char *lbcopy = strdup(_lbtopo);
639   char *ptr = strchr(lbcopy, ':');
640   if (ptr!=NULL)
641     ptr = strtok(lbcopy, ":");
642   else
643     ptr=lbcopy;
645   topofn = LBTopoLookup(ptr);
646   if (topofn == NULL) {
647     char str[1024];
648     CmiPrintf("TopoCentLB> Fatal error: Unknown topology: %s. Choose from:\n", ptr);
649     printoutTopo();
650     sprintf(str, "TopoCentLB> Fatal error: Unknown topology: %s", ptr);
651     CmiAbort(str);
652   }
653   
654   topo = topofn(n_pes);
656   //Call the core routine to produce the partition processor mapping
657   calculateMST(partgraph,topo,proc_mapping,max_comm_part);
658   //Returned partition graph is a Maximum Spanning Tree -- converted in above function itself
660   //Debugging code: Result of mapping partition graph onto processor graph
661   if (_lb_args.debug()>1) {
662     CkPrintf("Resultant mapping..(partition,processor)\n");
663     for(i = 0; i < n_pes; i++)
664       CkPrintf("%d,%d\n",i,proc_mapping[i]);
665   }
667   //Store the result in the load balancing database
668   int pe;
669   PartGraph::Node* n;
670   for(i = 0; i < n_pes; i++){
671     pe = proc_mapping[i];
672     n = &partgraph->nodes[i];
673     for(j=0;j<n->num_objs;j++){
674       stats->to_proc[n->obj_list[j]] = pe;
675       if (_lb_args.debug()>1) 
676         CkPrintf("[%d] Obj %d migrating from %d to %d\n", CkMyPe(),n->obj_list[j],stats->from_proc[n->obj_list[j]],pe);
677     }
678   }
680   delete[] newmap;
681   delete[] proc_mapping;
682   //Delete hopCount
683   for(i = 0; i < n_pes; i++)
684     delete[] hopCount[i];
686   delete[] hopCount;
687   delete[] heapMapping;
688         
689   delete partgraph;
692 #include "TopoCentLB.def.h"