1 /**************************************************************************
2 ** Amit Sharma (asharma6@uiuc.edu)
5 ** This is a topology-aware load balancer.
6 ** It maps objects to processors according to the network topology connecting the processors, trying to place heavily communicating partitions on processors that are few hops apart.
7 ****************************************************************************/
11 #include "TopoCentLB.decl.h"
12 #include "TopoCentLB.h"
// Cost-model constants: per-message startup cost and per-byte transfer cost (seconds),
// aliasing the framework defaults PER_MESSAGE_SEND_OVERHEAD_DEFAULT / PER_BYTE_SEND_OVERHEAD_DEFAULT.
// NOTE(review): alpha, beta and DEG_THRES are not referenced by any routine in this file as shown --
// confirm they are still needed. Lower-case macro names such as `alpha`/`beta` also rewrite any
// identically named identifier that appears after this point in the translation unit.
14 #define alpha PER_MESSAGE_SEND_OVERHEAD_DEFAULT /*Startup time per message, seconds*/
15 #define beta PER_BYTE_SEND_OVERHEAD_DEFAULT /*Long-message time per byte, seconds*/
16 #define DEG_THRES 0.50
// Compile-time switch; presumably selects (in work()) whether the object->partition map comes from
// computePartitions() (METIS) rather than each object's current processor. With 0 the METIS path is
// presumably disabled -- confirm against the preprocessor conditionals in work().
20 #define make_mapping 0
// Defined elsewhere; when nonzero the constructor suppresses its "created" banner.
22 extern int quietModeRequested;
// Presumably registers TopoCentLB with the load-balancer factory under this description string.
24 CreateLBFunc_Def(TopoCentLB,"Balance objects based on the network topology")
27 /*static void lbinit (void)
29 LBRegisterBalancer ("TopoCentLB",
32 "Balance objects based on the network topology");
// Constructor: forwards the load-balancer options to the base class, records the strategy name in
// `lbname`, and on PE 0 only (unless quiet mode was requested) prints a one-line creation banner.
36 TopoCentLB::TopoCentLB(const CkLBOptions &opt) : CBase_TopoCentLB (opt)
38 lbname = "TopoCentLB";
39 if (CkMyPe () == 0 && !quietModeRequested) {
40 CkPrintf("CharmLB> TopoCentLB created.\n");
// QueryBalanceNow: receives the current load-balancing step number; presumably consulted by the
// framework to decide whether to rebalance at this step -- confirm the policy against its body.
45 bool TopoCentLB::QueryBalanceNow (int _step)
// Destructor.
// NOTE(review): per-strategy-call buffers (inHeap, keys, hopCount, heapMapping) are allocated in
// calculateMST(), and work() releases the hopCount rows and heapMapping; confirm every such buffer
// is released exactly once and none is expected to be freed here.
50 TopoCentLB::~TopoCentLB(){
54 /*This routine partitions the task graph minimizing the communication and balancing the object load on all partitions*/
55 /*It uses METIS library to accomplish that*/
//
// computePartitions: split the objects into `count` partitions with METIS.
//   stats  - load-balancing database; reads n_objs, objData[] (migratable, wallTime, handle),
//            from_proc[], procs[].pe_speed, n_comm and commData[].
//   count  - number of partitions requested (work() passes the number of PEs).
//   newmap - output array of length stats->n_objs: partition id chosen for each object.
// Steps: (1) vertex weight of each object = wallTime * pe_speed of its current PE, scaled to an
// integer so the heaviest object gets 1000; (2) message counts between object pairs accumulated
// symmetrically into a dense numobjs x numobjs matrix; (3) that matrix converted to CSR form
// (xadj / adjncy / edgewt); (4) METIS_PartGraphKway or METIS_PartGraphRecursive fills newmap.
56 void TopoCentLB::computePartitions(CentralLB::LDStats *stats,int count,int *newmap)
59 int numobjs = stats->n_objs;
62 // allocate space for the computing data
63 double *objtime = new double[numobjs];
64 int *objwt = new int[numobjs];
65 int *origmap = new int[numobjs];
66 LDObjHandle *handles = new LDObjHandle[numobjs];
68 for(i=0;i<numobjs;i++) {
74 //Prepare compute loads for METIS library
// Object load normalized by the speed of the PE it currently runs on. Any non-migratable object
// aborts the run (work() calls removeNonMigratable() before calling this routine).
// NOTE(review): the abort text below names "MetisLB" and contains typos; it is runtime output, so
// it is left unchanged here.
75 for (i=0; i<stats->n_objs; i++) {
76 LDObjData &odata = stats->objData[i];
77 if (!odata.migratable)
78 CmiAbort("MetisLB doesnot dupport nonmigratable object.\n");
79 int frompe = stats->from_proc[i];
81 objtime[i] = odata.wallTime*stats->procs[frompe].pe_speed;
82 handles[i] = odata.handle;
85 // to convert the weights on vertices to integers
// NOTE(review): objtime[0] is read unconditionally (out of bounds when numobjs == 0), and `ratio`
// becomes infinite when every objtime is 0 (0 * inf = NaN; converting NaN to int is undefined).
// Confirm callers / surrounding checks exclude both cases.
86 double max_objtime = objtime[0];
87 for(i=0; i<numobjs; i++) {
88 if(max_objtime < objtime[i])
89 max_objtime = objtime[i];
93 double ratio = 1000.0/max_objtime;
94 for(i=0; i<numobjs; i++) {
95 objwt[i] = (int)(objtime[i]*ratio);
// Dense object-pair message-count matrix (presumably zero-filled by the inner loop below).
// NOTE(review): O(numobjs^2) memory and time; a sparse adjacency structure would scale better
// for large object counts.
101 int **comm = new int*[numobjs];
102 for (i=0; i<numobjs; i++) {
103 comm[i] = new int[numobjs];
104 for (j=0; j<numobjs; j++) {
109 //Prepare communication for METIS library
// Counted records: object-to-object LD_OBJ_MSG records whose sender is not a processor, and every
// LD_OBJLIST_MSG (multicast) record, once per destination object. Each adds cdata.messages to both
// comm[s][r] and comm[r][s], keeping the matrix symmetric.
110 const int csz = stats->n_comm;
111 for(i=0; i<csz; i++) {
112 LDCommData &cdata = stats->commData[i];
113 //if(cdata.from_proc() || cdata.receiver.get_type() != LD_OBJ_MSG)
// NOTE(review): here getHash() results are only asserted to be < numobjs; the multicast branch
// below treats -1 as "not found", so a -1 here would index comm[-1]. Confirm both endpoints of a
// point-to-point object message are always present in the hash.
115 if(!cdata.from_proc() && cdata.receiver.get_type() == LD_OBJ_MSG){
116 int senderID = stats->getHash(cdata.sender);
117 int recverID = stats->getHash(cdata.receiver.get_destObj());
118 CmiAssert(senderID < numobjs);
119 CmiAssert(recverID < numobjs);
120 comm[senderID][recverID] += cdata.messages;
121 comm[recverID][senderID] += cdata.messages;
122 //Use bytes or messages -- do i include messages for objlist too...??
124 else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
125 //CkPrintf("in objlist..\n");
127 LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
128 int senderID = stats->getHash(cdata.sender);
// Unknown endpoints are skipped when only migratable objects are tracked, otherwise fatal.
129 for (j=0; j<nobjs; j++) {
130 int recverID = stats->getHash(objs[j]);
131 if((senderID == -1)||(recverID == -1)) {
132 if (_lb_args.migObjOnly()) continue;
133 else CkAbort("Error in search\n");
135 comm[senderID][recverID] += cdata.messages;
136 comm[recverID][senderID] += cdata.messages;
141 // ignore messages sent from an object to itself
142 for (i=0; i<numobjs; i++)
145 // construct the graph in CSR format
// Neighbours of object i occupy adjncy[xadj[i] .. xadj[i+1]); only nonzero comm entries become
// edges, and edge weights are the message counts integer-divided by `factor`.
146 int *xadj = new int[numobjs+1];
148 for(i=0;i<numobjs;i++) {
149 for(j=0;j<numobjs;j++) {
154 int *adjncy = new int[numedges];
155 int *edgewt = new int[numedges];
159 for (i=0; i<numobjs; i++) {
160 for (j=0; j<numobjs; j++) {
161 if (comm[i][j] != 0) {
162 adjncy[count4all] = j;
163 edgewt[count4all++] = comm[i][j]/factor;
166 xadj[i+1] = count4all;
170 int wgtflag = 3; // Weights both on vertices and edges
171 int numflag = 0; // C Style numbering
177 CkPrintf("error: Number of Pe less than 1!");
// NOTE(review): BUG -- this loop iterates with `m` but indexes with `i`, so newmap is never filled
// and newmap[i]/origmap[i] are accessed with whatever `i` holds from earlier loops (likely numobjs,
// one past the end). The body should read `newmap[m] = origmap[m];`. origmap is presumably
// initialized to the objects' current PEs earlier in this routine -- confirm.
179 else if (count == 1) {
180 for(m=0;m<numobjs;m++)
181 newmap[i] = origmap[i];
// General case: METIS fills newmap (one partition id per object) from the CSR graph built above.
186 METIS_PartGraphKway(&numobjs, xadj, adjncy, objwt, edgewt,
187 &wgtflag, &numflag, &count, options,
190 METIS_PartGraphRecursive(&numobjs, xadj, adjncy, objwt, edgewt,
191 &wgtflag, &numflag, &count, options,
194 METIS_PartGraphRecursive(&numobjs, xadj, adjncy, objwt, edgewt,
195 &wgtflag, &numflag, &count, options,
200 //Debugging code: Checking load on each partition
// Sums the integer vertex weights per partition and prints them with the total and average.
201 if(_lb_args.debug() >=2){
203 int *chkwt = new int[count];
206 for(i=0;i<numobjs;i++){
207 chkwt[newmap[i]] += objwt[i];
211 CkPrintf("%d -- %d\n",i,chkwt[i]);
212 CkPrintf("Totalwt of all partitions after call to METIS:%d, Avg is %d\n",total,total/count);
215 //Clean up all the variables allocated in this routine
216 for(i=0;i<numobjs;i++)
// findMaxObjs: compute the largest number of objects assigned to any single partition.
//   map       - partition id per object (length totalobjs), presumably each in [0, count).
//   totalobjs - number of entries in map.
//   count     - number of partitions.
// work() uses the result to size each PartGraph node's object list.
// NOTE(review): `new int[count]` does not zero max_num; confirm it is cleared before the tally.
229 int TopoCentLB::findMaxObjs(int *map,int totalobjs,int count)
231 int *max_num = new int[count];
// Tally objects per partition.
238 for(i=0;i<totalobjs;i++)
// Keep the largest per-partition tally in maxobjs.
242 if(max_num[i]>maxobjs)
243 maxobjs = max_num[i];
// Heapify: restore the max-heap property for the subtree rooted at slot `node` of an array-backed
// binary heap (children of slot k at 2k+1 and 2k+2): if a child has a larger key, swap the node
// with its larger child and recurse into that child's slot.
//   heap     - heap storage; heapSize - number of valid entries.
// The member heapMapping[partition] holds each partition's current heap slot and is updated on
// every swap, which lets calculateMST() call increaseKey() without searching the heap.
250 void TopoCentLB::Heapify(HeapNode *heap, int node, int heapSize)
253 int right = 2*node+2;
256 if (left < heapSize && (heap[left].key > heap[node].key))
261 if (right < heapSize && (heap[right].key > heap[xchange].key))
264 if (xchange != node) {
267 heap[node] = heap[xchange];
// Keep the partition -> heap-slot index in sync with the swap.
269 heapMapping[heap[node].node]=node;
270 heapMapping[heap[xchange].node]=xchange;
271 Heapify(heap,xchange,heapSize);
// extractMax: remove and return the root (largest-key) entry of the max-heap. The last entry is
// moved to the root, its heapMapping slot set to 0, *heapSize decremented and the root re-heapified.
// Aborts when handed an empty heap (presumably *heapSize == 0).
// NOTE(review): the abort text says "extractMin" although this extracts the maximum; it is runtime
// output, so it is left unchanged here.
276 TopoCentLB::HeapNode TopoCentLB::extractMax(HeapNode *heap,int *heapSize){
279 CmiAbort("Empty Heap passed to extractMin!\n");
281 HeapNode max = heap[0];
282 heap[0] = heap[*heapSize-1];
283 heapMapping[heap[0].node]=0;
284 *heapSize = *heapSize - 1;
285 Heapify(heap,0,*heapSize);
// BuildHeap: arrange heap[0..heapSize) into a max-heap bottom-up by calling Heapify on every slot
// from heapSize/2 down to the root. Slot heapSize/2 is always a leaf (its children would be at
// index >= heapSize), so starting there rather than at heapSize/2-1 only adds one no-op call;
// heapSize == 0 is likewise a no-op.
289 void TopoCentLB::BuildHeap(HeapNode *heap,int heapSize){
290 for(int i=heapSize/2; i >= 0; i--)
291 Heapify(heap,i,heapSize);
// increaseKey: update the key of heap slot i using `wt`, then sift the entry up by swapping it with
// its parent while the parent's key is smaller, keeping heapMapping in sync after each swap.
//   wt - weight applied to slot i; the recursive call passes -1.00, which presumably means
//        "key already updated, only sift up" -- confirm against the lines that apply wt.
// At the root, (i-1)/2 evaluates to 0 in C++ (truncation toward zero), so the `>=` test below
// compares the root with itself and stops the recursion.
294 void TopoCentLB :: increaseKey(HeapNode *heap,int i,double wt){
303 int parent = (i-1)/2;
305 if(heap[parent].key >= heap[i].key)
308 HeapNode tmp = heap[parent];
309 heap[parent] = heap[i];
311 heapMapping[heap[parent].node]=parent;
312 heapMapping[heap[i].node]=i;
313 increaseKey(heap,parent,-1.00);
317 /*This routine implements the algorithm used to produce the partition-processor mapping*/
318 /*The algorithm uses an idea similar to the standard MST algorithm*/
//
// calculateMST: assign every partition (node of partgraph) to a distinct processor of `topo`.
// Partitions are visited in a Prim-like order using a max-heap keyed on communication with
// already-visited partitions:
//   Phase 1 - extract the highest-key partition, mark it out of the heap (inHeap = 0) and update the
//             keys of other partitions via increaseKey() using the edge weight to the extracted one.
//   Phase 2 - place it: the first extracted partition goes to processor 0; each later one goes to
//             the unassigned processor m minimizing
//             sum over placed neighbours p of hopCount[proc_mapping[p]][m] * edges[p][this].
//   partgraph     - partition graph; edges[a][b] is the communication between partitions a and b.
//   topo          - processor topology; fills the pairwise hop-count table.
//   proc_mapping  - output, indexed by partition: processor assigned to that partition.
//   max_comm_part - partition with the largest total communication; its key is set to 1.00 so it
//                   is extracted first (assuming the other keys start below 1.00).
// hopCount is count x count with count = partgraph->n_nodes, so this assumes the topology has exactly
// as many processors as there are partitions (work() passes n_pes for both).
// Cost: up to n_nodes placements, each scanning count processors x comm_cnt neighbours -> O(n^3).
319 void TopoCentLB :: calculateMST(PartGraph *partgraph,LBTopology *topo,int *proc_mapping,int max_comm_part) {
323 int count = partgraph->n_nodes;
326 //Arrays needed for keeping information
327 inHeap = new int[partgraph->n_nodes];
328 keys = new double[partgraph->n_nodes];
330 int *assigned_procs = new int[count];
// Pairwise hop-count table; work() frees its rows after this routine returns.
332 hopCount = new double*[count];
333 for(i=0;i<count;i++){
336 hopCount[i] = new double[count];
341 //Call a topology routine to fill up hopCount
342 topo->get_pairwise_hop_count(hopCount);
// NOTE(review): max_neighbors does not appear to be used by the mapping below -- confirm.
344 int max_neighbors = topo->max_neighbors();
// heapMapping[partition] = current heap slot of that partition (maintained by the heap helpers);
// work() frees it after this routine returns.
346 HeapNode *heap = new HeapNode[partgraph->n_nodes];
347 heapMapping = new int[partgraph->n_nodes];
351 for(i=0;i<partgraph->n_nodes;i++){
359 //Assign the max comm partition first
// NOTE(review): work() starts max_comm_part at -1 and only updates it when some partition's
// communication exceeds max_comm; if that never happens this writes heap[-1]. It also relies on
// heap[k] holding partition k before BuildHeap. Confirm 0 <= max_comm_part < n_nodes here.
360 heap[max_comm_part].key = 1.00;
362 heapSize = partgraph->n_nodes;
363 BuildHeap(heap,heapSize);
365 int k=0,comm_cnt=0,m=0;
366 int *commParts = new int[partgraph->n_nodes];
372 /****Phase1: Extracting appropriate partition from heap****/
374 HeapNode max = extractMax(heap,&heapSize);
375 inHeap[max.node] = 0;
// Update heap keys of other partitions using their edge weight to the extracted partition
// (presumably only partitions still in the heap and with nonzero communication).
377 for(i=0;i<partgraph->n_nodes;i++){
379 PartGraph::Edge wt = partgraph->edges[max.node][i];
389 /*This part has been COMMENTED out for optimizing the code: we handle the updation using heapMapping*/
390 /*array instead of searching for node in the heap everytime*/
392 //Update in the heap too
393 //First, find where this node is..in the heap
394 /*for(j=0;j<heapSize;j++)
395 if(heap[j].node == i)
398 CmiAbort("Some error in heap...\n");*/
399 increaseKey(heap,heapMapping[i],wt);
403 /*Phase2: ASSIGNING partition to processor*/
// The first extraction (heap shrank by exactly one) is the seed partition and goes to processor 0.
406 if(heapSize == partgraph->n_nodes-1){ //Assign max comm partition to 0th proc in the topology
407 proc_mapping[max.node]=0;
417 int min_cost_index=-1;
// Collect already-placed partitions (out of the heap) that communicate with this one.
422 for(k=0;k<partgraph->n_nodes;k++){
423 if(!inHeap[k] && partgraph->edges[k][max.node]){
424 commParts[comm_cnt]=k;
429 //Optimized the loop by commenting out the get_hop_count code and getting all the hop counts initially
// Pick the free processor with the smallest hop-weighted communication cost; min_cost == -1 marks
// "no candidate yet", and ties keep the lowest-numbered processor.
430 for(m=0;m<count;m++){
431 if(!assigned_procs[m]){
433 for(p=0;p<comm_cnt;p++){
434 //if(!hopCount[proc_mapping[commParts[p]]][m])
435 //hopCount[proc_mapping[commParts[p]]][m]=topo->get_hop_count(proc_mapping[commParts[p]],m);
436 cost += hopCount[proc_mapping[commParts[p]]][m]*partgraph->edges[commParts[p]][max.node];
438 if(min_cost==-1 || cost<min_cost){
445 proc_mapping[max.node]=min_cost_index;
446 assigned_procs[min_cost_index]=1;
452 delete[] assigned_procs;
// work: strategy entry point; fills stats->to_proc for every object.
//   1. abort if no processor is marked available;
//   2. drop non-migratable objects (removeNonMigratable);
//   3. build an object->partition map `newmap` -- computePartitions() (METIS) or, on the other path,
//      each object's current processor (the make_mapping switch presumably selects between them);
//   4. build a PartGraph with one node per PE: each node lists its objects, and edges accumulate the
//      bytes exchanged between objects that sit in different partitions;
//   5. parse the topology name from _lbtopo, instantiate that topology, and map partitions to
//      processors with calculateMST();
//   6. send every object of partition i to processor proc_mapping[i].
458 void TopoCentLB :: work(LDStats *stats)
462 int n_pes = stats->nprocs();
464 if (_lb_args.debug() >= 2) {
465 CkPrintf("In TopoCentLB Strategy...\n");
468 // Make sure that there is at least one available processor.
469 for (proc = 0; proc < n_pes; proc++) {
470 if (stats->procs[proc].available) {
476 CmiAbort ("TopoCentLB: no available processors!");
480 removeNonMigratable(stats, n_pes);
481 int *newmap = new int[stats->n_objs];
485 computePartitions(stats, n_pes, newmap);
487 //mapping taken from previous algo
488 for(i=0;i<stats->n_objs;i++) {
489 newmap[i]=stats->from_proc[i];
494 if(_lb_args.debug() >=2){
495 CkPrintf("Map obtained from partitioning:\n");
496 for(i=0;i<stats->n_objs;i++)
497 CkPrintf(" %d,%d ",i,newmap[i]);
// Largest partition population; used as the capacity of each node's object list.
500 int max_objs = findMaxObjs(newmap,stats->n_objs, n_pes);
502 partgraph = new PartGraph(n_pes, max_objs);
504 //Fill up the partition graph - first fill the nodes and then, the edges
// Append object i to its partition's list (num_objs is presumably incremented right after).
506 for(i=0;i<stats->n_objs;i++)
508 PartGraph::Node* n = &partgraph->nodes[newmap[i]];
509 n->obj_list[n->num_objs]=i;
// Per-multicast flags: which destination partitions have already been charged for the message.
513 int *addedComm=new int[n_pes];
515 stats->makeCommHash();
// NOTE(review): stays -1 unless some partition's communication exceeds the initial max_comm;
// calculateMST() then indexes heap[max_comm_part]. Confirm a valid partition is always chosen.
517 int max_comm_part=-1;
521 //Try putting random amount of communication on the partition graph edges to see if things work fine
522 //This also checks the running time of the algorithm since number of edges is high than in a practical scenario
// NOTE(review): synthetic-communication test block; presumably guarded by a test/debug switch --
// confirm it is disabled in production, since it adds fabricated weights to every edge.
524 for(i = 0; i < n_pes; i++) {
525 for(j = i+1; j < n_pes; j++) {
532 partgraph->edges[i][j] = val;
533 partgraph->edges[j][i] = val;
535 partgraph->nodes[i].comm += val;
536 partgraph->nodes[j].comm += val;
538 if(partgraph->nodes[i].comm > max_comm){
539 max_comm = partgraph->nodes[i].comm;
542 if(partgraph->nodes[j].comm > max_comm){
543 max_comm = partgraph->nodes[j].comm;
549 //Adding communication to the partition graph edges
// Only inter-partition traffic is recorded: bytes go onto the symmetric edge and onto both nodes'
// totals, degree counts distinct neighbouring partitions, and max_comm/max_comm_part track the
// most communicating partition.
550 for(i=0;i<stats->n_comm;i++)
552 //DO I consider other comm too....i.e. to or from a processor
553 LDCommData &cdata = stats->commData[i];
// NOTE(review): as in computePartitions(), getHash() is only asserted to be < n_objs here; a -1
// result would index newmap[-1]. Confirm point-to-point endpoints are always found.
554 if(!cdata.from_proc() && cdata.receiver.get_type() == LD_OBJ_MSG){
555 int senderID = stats->getHash(cdata.sender);
556 int recverID = stats->getHash(cdata.receiver.get_destObj());
557 CmiAssert(senderID < stats->n_objs);
558 CmiAssert(recverID < stats->n_objs);
560 if(newmap[senderID]==newmap[recverID])
563 if(partgraph->edges[newmap[senderID]][newmap[recverID]] == 0){
564 partgraph->nodes[newmap[senderID]].degree++;
565 partgraph->nodes[newmap[recverID]].degree++;
568 partgraph->edges[newmap[senderID]][newmap[recverID]] += cdata.bytes;
569 partgraph->edges[newmap[recverID]][newmap[senderID]] += cdata.bytes;
571 partgraph->nodes[newmap[senderID]].comm += cdata.bytes;
572 partgraph->nodes[newmap[recverID]].comm += cdata.bytes;
574 //Keeping track of maximum communiacting partition
575 if(partgraph->nodes[newmap[senderID]].comm > max_comm){
576 max_comm = partgraph->nodes[newmap[senderID]].comm;
577 max_comm_part = newmap[senderID];
579 if(partgraph->nodes[newmap[recverID]].comm > max_comm){
580 max_comm = partgraph->nodes[newmap[recverID]].comm;
581 max_comm_part = newmap[recverID];
// Multicast: charge the message bytes once per destination partition, not once per object.
584 else if(cdata.receiver.get_type() == LD_OBJLIST_MSG) {
586 LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
587 int senderID = stats->getHash(cdata.sender);
// Presumably clears addedComm for this message.
588 for(j = 0; j < n_pes; j++)
590 for (j=0; j<nobjs; j++) {
591 int recverID = stats->getHash(objs[j]);
592 if((senderID == -1)||(recverID == -1)) {
593 if (_lb_args.migObjOnly()) continue;
594 else CkAbort("Error in search\n");
597 if(newmap[senderID]==newmap[recverID])
600 if(partgraph->edges[newmap[senderID]][newmap[recverID]] == 0){
601 partgraph->nodes[newmap[senderID]].degree++;
602 partgraph->nodes[newmap[recverID]].degree++;
605 //Communication added only once for a message sent to many objects on a single processor
606 if(!addedComm[newmap[recverID]]){
607 partgraph->edges[newmap[senderID]][newmap[recverID]] += cdata.bytes;
608 partgraph->edges[newmap[recverID]][newmap[senderID]] += cdata.bytes;
610 partgraph->nodes[newmap[senderID]].comm += cdata.bytes;
611 partgraph->nodes[newmap[recverID]].comm += cdata.bytes;
613 if(partgraph->nodes[newmap[senderID]].comm > max_comm){
614 max_comm = partgraph->nodes[newmap[senderID]].comm;
615 max_comm_part = newmap[senderID];
617 if(partgraph->nodes[newmap[recverID]].comm > max_comm){
618 max_comm = partgraph->nodes[newmap[recverID]].comm;
619 max_comm_part = newmap[recverID];
621 //bytesComm[newmap[senderID]][newmap[recverID]] += cdata.bytes;
622 //bytesComm[newmap[recverID]][newmap[senderID]] += cdata.bytes;
623 addedComm[newmap[recverID]]=1;
631 int *proc_mapping = new int[n_pes];
637 //Parsing the command line input for getting the processor topology
// The topology name is the part of _lbtopo before the first ':' (if any).
// NOTE(review): lbcopy comes from strdup() and must be released with free(); confirm it is.
638 char *lbcopy = strdup(_lbtopo);
639 char *ptr = strchr(lbcopy, ':');
641 ptr = strtok(lbcopy, ":");
645 topofn = LBTopoLookup(ptr);
// Unknown topology name: report the choices and abort.
// NOTE(review): the user-supplied name is formatted into `str` with sprintf; if `str` is a
// fixed-size buffer, snprintf(str, sizeof str, ...) would rule out an overflow.
646 if (topofn == NULL) {
648 CmiPrintf("TopoCentLB> Fatal error: Unknown topology: %s. Choose from:\n", ptr);
650 sprintf(str, "TopoCentLB> Fatal error: Unknown topology: %s", ptr);
654 topo = topofn(n_pes);
656 //Call the core routine to produce the partition processor mapping
657 calculateMST(partgraph,topo,proc_mapping,max_comm_part);
658 //Returned partition graph is a Maximum Spanning Tree -- converted in above function itself
660 //Debugging code: Result of mapping partition graph onto processor graph
661 if (_lb_args.debug()>1) {
662 CkPrintf("Resultant mapping..(partition,processor)\n");
663 for(i = 0; i < n_pes; i++)
664 CkPrintf("%d,%d\n",i,proc_mapping[i]);
667 //Store the result in the load balancing database
// Every object of partition i is sent to processor proc_mapping[i].
670 for(i = 0; i < n_pes; i++){
671 pe = proc_mapping[i];
672 n = &partgraph->nodes[i];
673 for(j=0;j<n->num_objs;j++){
674 stats->to_proc[n->obj_list[j]] = pe;
675 if (_lb_args.debug()>1)
676 CkPrintf("[%d] Obj %d migrating from %d to %d\n", CkMyPe(),n->obj_list[j],stats->from_proc[n->obj_list[j]],pe);
681 delete[] proc_mapping;
// hopCount rows and heapMapping were allocated inside calculateMST().
683 for(i = 0; i < n_pes; i++)
684 delete[] hopCount[i];
687 delete[] heapMapping;
692 #include "TopoCentLB.def.h"