Fix platform-specific build errors introduced by 6292e029 and 014e434e
[charm.git] / src / util / spanningTree.C
blob2af2961faa794a5c384da5700a78c5f0352f1d8f
1 /**
2  * Author: jjgalvez@illinois.edu (Juan Galvez)
3  * Uses recursive bisect/trisect functionality similar to what was in
4  * src/util/treeStrategy_3dTorus_minHops.h
5  */
6 #include "spanningTree.h"
7 #include "TopoManager.h"
8 #include <algorithm>
9 #include <limits.h>
11 #if CMK_USING_XLC
12   #include <tr1/unordered_map>
13   typedef std::tr1::unordered_map<int,int> intMap;
14 #else
15   #include <unordered_map>
16   typedef std::unordered_map<int,int> intMap;
17 #endif
18 #include <bitset>
19 #define DIM_SET_SIZE 32     // bitset size
21 #define _DEBUG_SPANNING_TREE_ 0
23 #if _DEBUG_SPANNING_TREE_
24 #include <sstream>
25 #endif
27 template <typename Iterator>
28 class ST_RecursivePartition<Iterator>::PhyNode {
29 public:
30   PhyNode(int id, int pe, TopoManager *tmgr) : id(id), pe(pe) {
31     if (tmgr->haveTopologyInfo()) tmgr->rankToCoordinates(pe, coords);
32   }
33   inline void addNode(int n) { nodes.push_back(n); }
34   inline int size() const { return nodes.size(); }
35   inline int getNode(int i) const {
36     CkAssert(i >= 0 && i < nodes.size());
37     return nodes[i];
38   }
39   /// distance to other phynode
40   inline int distance(const PhyNode &o, TopoManager *tmgr) const {
41     return tmgr->getHopsBetweenRanks(pe, o.pe);
42   }
44 #if _DEBUG_SPANNING_TREE_
45   void print() {
46     CkPrintf("phynode %d, pe=%d, coords=", id, pe);
47     for (int i=0; i < coords.size(); i++) CkPrintf("%d ", coords[i]);
48     CkPrintf(", nodes: ");
49     for (int i=0; i < nodes.size(); i++) CkPrintf("%d ", nodes[i]);
50     CkPrintf("\n");
51   }
52 #endif
54   int id;
55   int pe; /// a pe in physical node (doesn't matter which one it is)
56   std::vector<int> nodes;  /// (charm)nodes in this phynode
57   std::vector<int> coords; /// coordinates of this phynode
60 template <typename Iterator>
61 class ST_RecursivePartition<Iterator>::PhyNodeCompare {
62 public:
63   PhyNodeCompare(int dim): dim(dim) {}
64   inline bool operator()(const typename ST_RecursivePartition::PhyNode *a,
65                          const typename ST_RecursivePartition::PhyNode *b) const {
66     if (a->coords[dim] == b->coords[dim]) return (a->id < b->id);
67     else return (a->coords[dim] < b->coords[dim]);
68   }
69 private:
70   const int dim;
73 // ----------------- ST_RecursivePartition -----------------
75 template <typename Iterator>
76 ST_RecursivePartition<Iterator>::ST_RecursivePartition(bool nodeTree, bool preSorted)
77   : nodeTree(nodeTree), preSorted(preSorted)
79   tmgr = TopoManager::getTopoManager();
80   for (int i=0; i < tmgr->getNumDims(); i++) {
81     if (tmgr->getDimSize(i) > DIM_SET_SIZE)
82       CkAbort("ST_RecursivePartition:: Increase bitset size to match size of largest topology dimension");
83   }
84 #if _DEBUG_SPANNING_TREE_
85   if (CkMyNode() == 0) {
86     CkPrintf("TopoManager reports topoinfo=%d, %d dims, dim sizes: ", tmgr->haveTopologyInfo(), tmgr->getNumDims());
87     for (int i=0; i < tmgr->getNumDims(); i++) CkPrintf("%d ", tmgr->getDimSize(i));
88     CkPrintf("\n");
89   }
90 #endif
93 template <typename Iterator>
94 int ST_RecursivePartition<Iterator>::buildSpanningTree(Iterator start, Iterator end,
95                                                        unsigned int maxBranches)
97   children.clear();
98   const int numNodes = std::distance(start, end);
99   if (numNodes == 0) CkAbort("Error: requested spanning tree but no nodes\n");
100   else if (numNodes == 1) return 0;
102 #if _DEBUG_SPANNING_TREE_
103   CkPrintf("[%d] ST_RecursivePartition:: Root is %d, being requested %d children, Num nodes incl root is %d\n",
104            CkMyNode(), *start, maxBranches, numNodes);
105 #endif
107   // group nodes into phynodes
108   std::vector<ST_RecursivePartition<Iterator>::PhyNode> phynodes;
109   initPhyNodes(start, end, phynodes);
110   std::vector<ST_RecursivePartition<Iterator>::PhyNode*> pphynodes(phynodes.size());
111   for (int i=0; i < phynodes.size(); i++) pphynodes[i] = &phynodes[i];
113   // build the spanning tree of physical nodes
114   build(pphynodes, start, maxBranches);
116 #if _DEBUG_SPANNING_TREE_
117   // print this node and children
118   for (int i=0; i < children.size()-1; i++) {
119     std::ostringstream oss;
120     for (Iterator j=children[i]; j != children[i+1]; j++) {
121       if (j == children[i]) oss << "[" << CkMyNode() << "] subtree " << *j << ": ";
122       else oss << *j << " ";
123     }
124     CkPrintf("%s\n", oss.str().c_str());
125   }
126 #endif
127   return (children.size() - 1);
130 template <typename Iterator>
131 void ST_RecursivePartition<Iterator>::initPhyNodes(Iterator start, Iterator end,
132                                                    std::vector<PhyNode> &phynodes) const
134 #if _DEBUG_SPANNING_TREE_
135   int rootPhyNodeId;
136   if (nodeTree) rootPhyNodeId = CmiPhysicalNodeID(CmiNodeFirst(*start));
137   else rootPhyNodeId = CmiPhysicalNodeID(*start);   // contains pes
138   CkPrintf("[%d] Root phynode is %d\n", CkMyNode(), rootPhyNodeId);
139 #endif
141   const int numNodes = std::distance(start, end);
142   phynodes.reserve(std::min(CmiNumPhysicalNodes(), numNodes));
143   intMap phyNodeMap;
144   int last = -1;
145   for (Iterator i=start; i != end; i++) {
146     int n = *i;
147     int pe = n;
148     if (nodeTree) pe = CmiNodeFirst(n);
149     int phyNodeId = CmiPhysicalNodeID(pe);
150 #if _DEBUG_SPANNING_TREE_
151     if (phyNodeId == rootPhyNodeId) CkPrintf("[%d] Node %d is in root phynode\n", CkMyNode(), n);
152 #endif
153     PhyNode *phyNode; // phynode of node n
154     if (preSorted) {
155       if (phyNodeId != last) {
156         phynodes.push_back(PhyNode(phyNodeId,pe,tmgr));
157         last = phyNodeId;
158       }
159       phyNode = &(phynodes.back());
160     } else {
161       intMap::iterator it = phyNodeMap.find(phyNodeId);
162       if (it == phyNodeMap.end()) {
163         phynodes.push_back(PhyNode(phyNodeId,pe,tmgr));
164         phyNodeMap[phyNodeId] = int(phynodes.size()-1);
165         phyNode = &(phynodes.back());
166       } else {
167         phyNode = &(phynodes[it->second]);
168       }
169     }
170     phyNode->addNode(n);
171   }
173 #if _DEBUG_SPANNING_TREE_
174   CkPrintf("%d physical nodes:\n", int(phynodes.size()));
175   for (int i=0; i < phynodes.size(); i++) phynodes[i].print();
176 #endif
177 #if XE6_TOPOLOGY
178   translateCoordinates(phynodes);
179 #endif
182 template <typename Iterator>
183 void ST_RecursivePartition<Iterator>::build(std::vector<PhyNode*> &phyNodes,
184                                             Iterator start,
185                                             unsigned int maxBranches)
187   ST_RecursivePartition<Iterator>::PhyNode *rootPhyNode = phyNodes[0];
188   children.reserve(rootPhyNode->size() + maxBranches); // reserve for max number of children
190   Iterator pos = start+1;
191   // make each node in same phynode as root a direct child
192   for (int i=1; i < rootPhyNode->size(); i++) {
193     *pos = rootPhyNode->getNode(i);
194     children.push_back(pos++);
195   }
197   // TODO another option, don't know if better, is if
198   // I'm the root node of a phynode (and phynodes.size() > 1), only have one other node
199   // in my phynode as direct child, and have that child direct-send to every other
200   // node in the phynode. This would be an easy change.
202   if (phyNodes.size() == 1) {
203     children.push_back(pos);
204     return;
205   }
207   // this will partition the nodes in phyNodes, by reorganizing the list.
208   // phyNodeChildren will point to where each partition starts
209   std::vector<int> phyNodeChildren;
210   phyNodeChildren.reserve(maxBranches+1);
211   partition(phyNodes, 1, phyNodes.size(), maxBranches, phyNodeChildren);
212   phyNodeChildren.push_back(phyNodes.size());
213   if (tmgr->haveTopologyInfo())
214     // choose root phynode in each subtree (closest one to top-level root phynode), put at beginning
215     chooseSubtreeRoots(phyNodes, phyNodeChildren);
217   // store result as subtrees of nodes
218   for (int i=0; i < phyNodeChildren.size() - 1; i++) {
219     children.push_back(pos);
220     for (int j=phyNodeChildren[i]; j < phyNodeChildren[i+1]; j++) {  // for each phynode in subtree
221       for (int k=0; k < phyNodes[j]->size(); k++) {    // for each node in phynode
222         *pos = phyNodes[j]->getNode(k);
223         pos++;
224       }
225     }
226   }
227   children.push_back(pos);
231  * phyNodes is list of phyNodes, grouped by subtrees (rootPhyNode in position 0)
232  * phyNodeChildren contains the indices (in phyNodes) of first node of each subtree
233  */
234 template <typename Iterator>
235 void ST_RecursivePartition<Iterator>::chooseSubtreeRoots(std::vector<PhyNode*> &phyNodes,
236                                                          std::vector<int> &children) const
238   for (int i=0; i < children.size() - 1; i++) { // for each subtree
239     int start = children[i];  // subtree start
240     int minDistance = INT_MAX;
241     int closestIdx = -1;
242     for (int j=start; j < children[i+1]; j++) { // for each phynode in subtree
243       int d = phyNodes[0]->distance(*phyNodes[j], tmgr);
244       if (d < minDistance) {
245         minDistance = d;
246         closestIdx = j;
247       }
248     }
249 #if _DEBUG_SPANNING_TREE_
250     if (CkMyNode() == 0) CkPrintf("Subtree %d, closest phynode to root is %d, distance=%d\n", i, phyNodes[closestIdx]->id, minDistance);
251 #endif
252     // make closest one the root
253     std::swap(phyNodes[start], phyNodes[closestIdx]);
254   }
257 /// recursive partitioning of phynodes into numPartitions
258 template <typename Iterator>
259 void ST_RecursivePartition<Iterator>::partition(std::vector<PhyNode*> &nodes,
260                                       int start, int end, int numPartitions,
261                                       std::vector<int> &children) const
263 #if _DEBUG_SPANNING_TREE_
264     CkPrintf("Partitioning into at most %d parts, phynodes [", numPartitions);
265     for (int i=start; i < end; i++) CkPrintf("%d ", nodes[i]->id);
266     CkPrintf("]\n");
267 #endif
268   int numNodes = end - start;
269   if ((numPartitions > 1) && (numNodes > 1)) {
270     // further partitioning is needed and there are nodes left to partition
271     if (numPartitions % 3 == 0) trisect(nodes, start, end, numPartitions, children);
272     else bisect(nodes, start, end, numPartitions, children);
273   } else if ((numPartitions >= 1) && (numNodes >= 1)) {
274     // just register the remaining node(s) as a sub-tree
275     children.push_back(start);
276   } else if (numNodes == 0) {
277     // there are no nodes left, do nothing
278   } else if ((numNodes >= 0) && (numPartitions == 0)) {
279     // if there are nodes remaining but no partitions to put them in
280     CkAbort("\nThere are nodes left but no remaining partitions to put them in.");
281   } else {
282     // fall through case. Should never get here unless something is wrong
283     CkAbort("\nPartitioning fell through to the default case (which it never should). Check the logic in this routine.");
284   }
287 template <typename Iterator>
288 void ST_RecursivePartition<Iterator>::bisect(std::vector<PhyNode*> &nodes,
289                                    int start, int end, int numPartitions,
290                                    std::vector<int> &children) const
292   const int numNodes = end - start;
293   int median = start + (numNodes / 2);
294   if (tmgr->haveTopologyInfo()) {
295     // Find the dimension along which to bisect the bounding box
296     int maxSpreadDim = maxSpreadDimension(nodes,start,end);
297     // Bisect the vertex list at the median element
298     typename std::vector<PhyNode*>::iterator itr = nodes.begin();
299     std::nth_element(itr+start, itr+median, itr+end, ST_RecursivePartition::PhyNodeCompare(maxSpreadDim));
300 #if _DEBUG_SPANNING_TREE_
301     CkPrintf("Bisecting, maxSpreadDim=%d\n", maxSpreadDim);
302 #endif
303   }
304   // Partition the two pieces further
305   int numLeft = numPartitions/2;
306   partition(nodes, start, median, numLeft, children);
307   partition(nodes, median, end, numPartitions - numLeft, children);
310 template <typename Iterator>
311 void ST_RecursivePartition<Iterator>::trisect(std::vector<PhyNode*> &nodes,
312                                    int start, int end, int numPartitions,
313                                    std::vector<int> &children) const
315   const int numNodes = end - start;
316   /// Pin the location of the 1/3 and 2/3 elements
317   int oneThird = start + (numNodes / 3);
318   int twoThird = oneThird + (numNodes / 3);
319   if (tmgr->haveTopologyInfo()) {
320     int maxSpreadDim = maxSpreadDimension(nodes,start,end);
321     typename std::vector<PhyNode*>::iterator itr = nodes.begin();
322     std::nth_element(itr+start,    itr+oneThird, itr+end, ST_RecursivePartition::PhyNodeCompare(maxSpreadDim));
323     std::nth_element(itr+oneThird, itr+twoThird, itr+end, ST_RecursivePartition::PhyNodeCompare(maxSpreadDim));
324 #if _DEBUG_SPANNING_TREE_
325     CkPrintf("Trisecting, maxSpreadDim=%d\n", maxSpreadDim);
326 #endif
327   }
328   /// Partition the three pieces further
329   int numLeft = numPartitions/3;
330   partition(nodes, start,    oneThird, numLeft, children);
331   partition(nodes, oneThird, twoThird, numLeft, children);
332   partition(nodes, twoThird, end,      numLeft, children);
335 template <typename Iterator>
336 int ST_RecursivePartition<Iterator>::maxSpreadDimension(std::vector<PhyNode*> &nodes,
337                                                         int start, int end) const
339   const int nDims = tmgr->getNumDims();
340   if (!tmgr->haveTopologyInfo() || (nDims <= 1)) return 0;
342   std::vector<std::bitset<DIM_SET_SIZE> > used(nDims);
343   for (int i=start; i < end; i++) {
344     PhyNode *n = nodes[i];
345     for (int j=0; j < nDims; j++) used[j].set(n->coords[j]);
346   }
347   int max_spread = -1;
348   int max_spread_dim = -1;
349   for (int i=0; i < nDims; i++) {
350     int c(used[i].count());
351     if (c > max_spread) {
352       max_spread = c;
353       max_spread_dim = i;
354     }
355   }
356   return max_spread_dim;
359 #if XE6_TOPOLOGY
361 inline static int modulo(int k, int n) { return ((k %= n) < 0) ? k+n : k; }
364  * Translate coordinates of phynodes such that the max spread in each dimension
365  * is equal (or closer) to the number of coordinates actually used in that dimension.
366  * In other words, "moves" all phynodes by some translation offset so that their bounding box
367  * includes minimum number of coordinates, which implies that adjacent nodes won't go through torus edges.
368  * Works for any number of dimensions (N-d torus)
369  */
370 template <typename Iterator>
371 void ST_RecursivePartition<Iterator>::translateCoordinates(std::vector<PhyNode> &nodes) const
373   const int nDims = tmgr->getNumDims();
374   std::vector<std::bitset<DIM_SET_SIZE> > usedCoordinates(nDims);
375   std::vector<int> max_coord(nDims, -1);
376   std::vector<int> min_coord(nDims, INT_MAX);
377   std::vector<int> maxSpread(nDims);
378   std::vector<int> gapCenter(nDims, -1);
379   std::vector<int> dimSizes(nDims);
380   for (int i=0; i < nDims; i++) dimSizes[i] = tmgr->getDimSize(i);
382   for (int i=0; i < nodes.size(); i++) {
383     PhyNode &n = nodes[i];
384     for (int j=0; j < nDims; j++) {
385       int c = n.coords[j];
386       usedCoordinates[j].set(c);
387       if (c > max_coord[j]) max_coord[j] = c;
388       if (c < min_coord[j]) min_coord[j] = c;
389     }
390   }
391   for (int i=0; i < nDims; i++) {
392     maxSpread[i] = max_coord[i] - min_coord[i] + 1; // store max spread of each dimension
393     int sum = 0, nbUnusedCoords = 0;
394     for (int j=0; j < dimSizes[i]; j++) {
395       if (!usedCoordinates[i].test(j)) {  // j coordinate not used by any element
396         sum += j;
397         nbUnusedCoords++;
398       }
399     }
400     if (nbUnusedCoords > 0) gapCenter[i] = sum / nbUnusedCoords;
401   }
403 #if _DEBUG_SPANNING_TREE_
404   if (CkMyNode() == 0) {
405     CkPrintf("Used coordinates in each dimension:\n");
406     for (int i=0; i < nDims; i++) {
407       CkPrintf("%d: ", i);
408       for (int j=0; j < dimSizes[i]; j++) if (usedCoordinates[i].test(j)) CkPrintf("%d ", j);
409       CkPrintf(", %d\n", int(usedCoordinates[i].count()));
410     }
411     CkPrintf("Max,min coord in each dimension:\n");
412     for (int i=0; i < nDims; i++) CkPrintf("%d: %d %d\n", i, max_coord[i], min_coord[i]);
413     CkPrintf("Gap center for each dimension:\n");
414     for (int i=0; i < nDims; i++) CkPrintf("%d: %d\n", i, gapCenter[i]);
415     CkPrintf("Max spread for each dimension:\n");
416     for (int i=0; i < nDims; i++) CkPrintf("%d: %d\n", i, maxSpread[i]);
417   }
418 #endif
420   for (int i=0; i < nDims; i++) { // find best translation offset for each dimension
421     if (maxSpread[i] == int(usedCoordinates[i].count())) continue; // nothing to correct for this dimension
423 #if _DEBUG_SPANNING_TREE_
424     if (CkMyNode() == 0) CkPrintf("Going to attempt to correct coordinates on dimension %d\n", i);
425 #endif
427     // choose direction of unused coordinates to finish faster
428     int direction = 1;  // go "right"
429     if (gapCenter[i] < dimSizes[i]/2) direction = -1;   // go "left"
430 #if _DEBUG_SPANNING_TREE_
431     if (CkMyNode() == 0) CkPrintf("Chose direction %d\n", direction);
432 #endif
434     // we're going to attempt to minimize the max spread in dimension i
435     int bestMaxSpread = maxSpread[i];
436     int bestOffset=0;
437     for (int m=0; ; m++) {
438       // apply offset of 'm' in 'direction' to all nodes
439       int max_coord = -1;
440       int min_coord = INT_MAX;
441       for (int j=0; j < nodes.size(); j++) {
442         int &x = nodes[j].coords[i];
443         x += direction;
444         if (x >= dimSizes[i]) x = 0;
445         else if (x < 0) x = dimSizes[i] - 1;
446         if (x > max_coord) max_coord = x;
447         if (x < min_coord) min_coord = x;
448       }
449       // evaluate max spread with new offset
450       int maxSpread_new = max_coord - min_coord + 1;
451 #if _DEBUG_SPANNING_TREE_
452       if (CkMyNode() == 0) CkPrintf("%d %d\n", m, maxSpread_new);
453 #endif
454       if (maxSpread_new == int(usedCoordinates[i].count())) {
455 #if _DEBUG_SPANNING_TREE_
456         if (CkMyNode() == 0) CkPrintf("FIXED after %d movements\n", m+1);
457 #endif
458         break;
459       } else if (maxSpread_new < bestMaxSpread) {
460         bestMaxSpread = maxSpread_new;
461         bestOffset = m;
462       }
463       if (m == dimSizes[i] - 2) {
464         // did max number of possible movements (another movement would return us to original
465         // coordinates/offset), exit loop
466         if (maxSpread_new > bestMaxSpread) {
467           for (int j=0; j < nodes.size(); j++) {
468             // roll back to bestOffset
469             int &x = nodes[j].coords[i];
470             x += ((m - bestOffset)*-direction);
471             x = modulo(x, dimSizes[i]);
472           }
473         }
474 #if _DEBUG_SPANNING_TREE_
475         if ((CkMyNode() == 0) && (bestMaxSpread < maxSpread[i])) CkPrintf("Improved to %d max spread\n", bestMaxSpread);
476 #endif
477         break;   // we're done correcting in this dimension
478       }
479     }
480   }
482 #endif
484 template class ST_RecursivePartition<int*>;
485 template class ST_RecursivePartition<std::vector<int>::iterator>;
487 // ------------------------------------------------------------------
488 typedef std::vector<int>::iterator TreeIterator;
490 void getNeighborsTopoTree_R(TreeIterator start, TreeIterator end, int myElem, int prevLvlParent,
491                             bool nodeTree, unsigned int bfactor, CmiSpanningTreeInfo &t)
493   ST_RecursivePartition<TreeIterator> tb(nodeTree, prevLvlParent != -1);
494   int numSubtrees = tb.buildSpanningTree(start, end, std::min(bfactor, (unsigned int)std::distance(start,end)-1));
495   if (myElem == *start) {
496     // I am the root of this subtree so we're done (collect my children and return)
497     t.parent = prevLvlParent;
498     if (numSubtrees > 0) t.children = (int*)malloc(sizeof(int)*numSubtrees);
499     t.child_count = numSubtrees;
500     for (int i=0; i < numSubtrees; i++) t.children[i] = *tb.begin(i);
501     return;
502   }
503   // find in which subtree myElem is in and recursively continue on only that subtree
504   for (int i=0; i < numSubtrees; i++) {
505     TreeIterator subtreeStart = tb.begin(i), subtreeEnd = tb.end(i);
506     TreeIterator f = std::find(subtreeStart, subtreeEnd, myElem);
507     if (f != subtreeEnd) {
508       getNeighborsTopoTree_R(subtreeStart, subtreeEnd, myElem, *start, nodeTree, bfactor, t);
509       break;
510     }
511   }
515  * Obtain parent and children of 'myNode' in tree rooted at 'rootNode', using
516  * ST_RecursivePartition algorithm. Spanning tree assumed to cover all nodes.
517  */
518 void getNodeNeighborsTopoTree(int rootNode, int myNode, CmiSpanningTreeInfo &t, unsigned int bfactor)
520   std::vector<int> nodes;
521   nodes.reserve(CkNumNodes());
522   nodes.push_back(rootNode);
523   for (int i=0; i < CkNumNodes(); i++) {
524     if (i == rootNode) continue;
525     nodes.push_back(i);
526   }
527   getNeighborsTopoTree_R(nodes.begin(), nodes.end(), myNode, -1, true, bfactor, t);
530 /// same as above but for processors
531 void getProcNeighborsTopoTree(int rootPE, int myPE, CmiSpanningTreeInfo &t, unsigned int bfactor)
533   std::vector<int> pes;
534   pes.reserve(CkNumPes());
535   pes.push_back(rootPE);
536   for (int i=0; i < CkNumPes(); i++) {
537     if (i == rootPE) continue;
538     pes.push_back(i);
539   }
540   getNeighborsTopoTree_R(pes.begin(), pes.end(), myPE, -1, false, bfactor, t);
543 #if CMK_USING_XLC
544   typedef std::tr1::unordered_map<int,CmiSpanningTreeInfo*> TreeInfoMap;
545 #else
546   typedef std::unordered_map<int,CmiSpanningTreeInfo*> TreeInfoMap;
547 #endif
549 static TreeInfoMap trees;
550 CmiNodeLock _treeLock;
552 CmiSpanningTreeInfo *ST_RecursivePartition_getTreeInfo(int root) {
553   if (trees.size() == 0) {
554     _treeLock = CmiCreateLock();
555 #if CMK_ERROR_CHECKING
556     if (CkMyRank() != 0) CkAbort("First call to getTreeInfo has to be by rank 0");
557 #endif
558   }
559   CmiLock(_treeLock);
560   TreeInfoMap::iterator it = trees.find(root);
561   if (it != trees.end()) {
562     CmiSpanningTreeInfo *t = it->second;
563     CmiUnlock(_treeLock);
564     return t;
565   } else {
566     CmiSpanningTreeInfo *t = new CmiSpanningTreeInfo;
567     t->children = NULL;
568     trees[root] = t;
569     getNodeNeighborsTopoTree(root, CkMyNode(), *t, 4);
570     CmiUnlock(_treeLock);
571     return t;
572   }
575 void get_topo_tree_nbs(int root, int *parent, int *child_count, int **children) {
576   CmiSpanningTreeInfo *t = ST_RecursivePartition_getTreeInfo(root);
577   *parent = t->parent;
578   *child_count = t->child_count;
579   *children = t->children;