Enable topology (or physical node) aware spanning tree for array reductions
[charm.git] / src / ck-core / ckreduction.C
blobd4ef31f7b2c637c94dbe521c5ee0a748b9bb88dc
1 /*
2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE.  This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
12 Included here are the classes:
13   -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16   -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's.  It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on
21 a processor.
22   -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24    -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.   
28 In the reduction manager, there are several counters used:
29   -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction.  When a reduction completes, it increments
31 redNo.
32   -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution.  Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35   -lcount is the number of contributors on this PE.  When
36 an element migrates away, lcount decreases.  lcount is also the number
37 of contributions to wait for before reducing and sending up.
38   -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same.  Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE.  To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
50 #include "charm++.h"
51 #include "ck.h"
53 #include "pathHistory.h"
55 #if CMK_DEBUG_REDUCTIONS
56 //Debugging messages:
57 // Reduction mananger internal information:
58 #define DEBR(x) CkPrintf x
59 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
60 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
62 #define DEBN(x) CkPrintf x
63 #define AAN "Red Node%d "
64 #define ABN ,CkMyNode()
66 // For status and data messages from the builtin reducer functions.
67 #define RED_DEB(x) //CkPrintf x
68 #define DEBREVAC(x) CkPrintf x
69 #define DEB_TUPLE(x) CkPrintf x
70 #else
71 //No debugging info-- empty defines
72 #define DEBR(x) // CkPrintf x
73 #define DEBRMLOG(x) CkPrintf x
74 #define AA
75 #define AB
76 #define DEBN(x) //CkPrintf x
77 #define RED_DEB(x) //CkPrintf x
78 #define DEBREVAC(x) //CkPrintf x
79 #define DEB_TUPLE(x) //CkPrintf x
80 #endif
82 #ifndef INT_MAX
83 #define INT_MAX 2147483647
84 #endif
86 extern bool _inrestart;
88 Group::Group():thisIndex(CkMyPe())
90         if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
92         creatingContributors();
93         contributorStamped(&reductionInfo);
94         contributorCreated(&reductionInfo);
95         doneCreatingContributors();
98 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg),thisIndex(CkMyPe())
100         creatingContributors();
101         contributorStamped(&reductionInfo);
102         contributorCreated(&reductionInfo);
103         doneCreatingContributors();
106 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
107                                     ((CkReductionMgr *)this),
108                                     reductionInfo,false)
109 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
111 CK_BARRIER_CONTRIBUTE_METHODS_DEF(Group,
112                                    ((CkReductionMgr *)this),
113                                    reductionInfo,false)
117 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 The callback is just used to tell the caller that this group
120 has been constructed.  (Now they can safely call CkLocalBranch)
122 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
124   m->call();
125   delete m;
129 The callback is just used to tell the caller that this group
130 is constructed and ready to process other calls.
132 CkGroupReadyCallback::CkGroupReadyCallback(void)
134   _isReady = false;
136 void
137 CkGroupReadyCallback::callBuffered(void)
139   int n = _msgs.length();
140   for(int i=0;i<n;i++)
141   {
142     CkGroupCallbackMsg *msg = _msgs.deq();
143     msg->call();
144     delete msg;
145   }
147 void
148 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
150   if(_isReady) {
151     msg->call();
152     delete msg;
153   } else {
154     _msgs.enq(msg);
155   }
158 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
159         :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
160 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162         CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
163         CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
164         b->fn(b->param,m->getSize(),m->getData());
165         delete m;
168 ///////////////// Reduction Manager //////////////////
170 One CkReductionMgr runs a non-overlapping set of reductions.
171 It collects messages from all local contributors, then sends
172 the reduced message up the reduction tree to node zero, where
173 they're passed to the user's client function.
176 CkReductionMgr::CkReductionMgr()
177   :
178   thisProxy(thisgroup),
179   isDestroying(false)
181 #ifdef BINOMIAL_TREE
182   init_BinomialTree();
183 #else
184   init_TopoTree();
185 #endif
186   redNo=0;
187   completedRedNo = -1;
188   inProgress=false;
189   creating=false;
190   startRequested=false;
191   gcount=lcount=0;
192   nContrib=nRemote=0;
193   is_inactive = false;
194   maxStartRequest=0;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196         numImmigrantRecObjs = 0;
197         numEmigrantRecObjs = 0;
198 #endif
199   disableNotifyChildrenStart = false;
201   barrier_gCount=0;
202   barrier_nSource=0;
203   barrier_nContrib=barrier_nRemote=0;
205   DEBR((AA "In reductionMgr constructor at %d \n" AB,this));
208 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
209                                                     , isDestroying(false)
211   numKids = -1;
212   redNo=0;
213   completedRedNo = -1;
214   inProgress=false;
215   creating=false;
216   startRequested=false;
217   gcount=lcount=0;
218   nContrib=nRemote=0;
219   is_inactive = false;
220   maxStartRequest=0;
221   DEBR((AA "In reductionMgr migratable constructor at %d \n" AB,this));
223   barrier_gCount=0;
224   barrier_nSource=0;
225   barrier_nContrib=barrier_nRemote=0;
227 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
228   numImmigrantRecObjs = 0;
229   numEmigrantRecObjs = 0;
230 #endif
234 CkReductionMgr::~CkReductionMgr()
238 void CkReductionMgr::flushStates()
240   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
241   redNo=0;
242   completedRedNo = -1;
243   inProgress=false;
244   creating=false;
245   startRequested=false;
246   nContrib=nRemote=0;
247   maxStartRequest=0;
249   while (!msgs.isEmpty()) { delete msgs.deq(); }
250   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
251   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
252   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
254   adjVec.length()=0;
258 //////////// Reduction Manager Client API /////////////
260 //Add the given client function.  Overwrites any previous client.
261 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
263   DEBR((AA "Setting reductionClient in ReductionMgr groupid %d\n" AB,thisgroup.idx));
265   if (CkMyPe()!=0)
266           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
267   storedCallback=*cb;
270 ///////////////////////////// Contributor ////////////////////////
271 //Contributors keep a copy of this structure:
273 /*Contributor migration support:
275 void contributorInfo::pup(PUP::er &p)
277   p(redNo);
280 ////////////////////// Contributor list maintainance: /////////////////
281 //These just set and clear the "creating" flag to prevent
282 // reductions from finishing early because not all elements
283 // have been created.
284 void CkReductionMgr::creatingContributors(void)
286   DEBR((AA "Creating contributors...\n" AB));
287   creating=true;
289 void CkReductionMgr::doneCreatingContributors(void)
291   DEBR((AA "Done creating contributors...\n" AB));
292   creating=false;
293   checkIsActive();
294   if (startRequested) startReduction(redNo,CkMyPe());
295   finishReduction();
298 //A new contributor will be created
299 void CkReductionMgr::contributorStamped(contributorInfo *ci)
301   DEBR((AA "Contributor %p stamped\n" AB,ci));
302   //There is another contributor
303   gcount++;
304   if (inProgress)
305   {
306     ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
307     adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
308   } else
309     ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
312 //A new contributor was actually created
313 void CkReductionMgr::contributorCreated(contributorInfo *ci)
315   DEBR((AA "Contributor %p created in grp %d\n" AB,ci,thisgroup.idx));
316   //We've got another contributor
317   lcount++;
318   //He may not need to contribute to some of our reductions:
319   for (int r=redNo;r<ci->redNo;r++)
320     adj(r).lcount--;//He won't be contributing to r here
321   checkIsActive();
324 /*Don't expect any more contributions from this one.
325 This is rather horrifying because we now have to make
326 sure the global element count accurately reflects all the
327 contributions the element made before it died-- these may stretch
328 far into the future.  The adj() vector is what saves us here.
330 void CkReductionMgr::contributorDied(contributorInfo *ci)
332 #if CMK_MEM_CHECKPOINT
333   // ignore from listener if it is during restart from crash
334   if (CkInRestarting()) return;
335 #endif
337   if (isDestroying) return;
339   DEBR((AA "Contributor %p(%d) died\n" AB,ci,ci->redNo));
340   //We lost a contributor
341   gcount--;
343   if (ci->redNo<redNo)
344   {//Must have been migrating during reductions-- root is waiting for his
345   // contribution, which will never come.
346     DEBR((AA "Dying guy %p must have been migrating-- he's at #%d!\n" AB,ci,ci->redNo));
347     for (int r=ci->redNo;r<redNo;r++)
348       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
349   }
351   //Add to the global count for all his future messages (wherever they are)
352   int r;
353   for (r=redNo;r<ci->redNo;r++)
354   {//He already contributed to this reduction, but won't show up in global count.
355     DEBR((AA "Dead guy %p left contribution for #%d\n" AB,ci,r));
356     adj(r).gcount++;
357   }
359   lcount--;
360   //He's already contributed to several reductions here
361   for (r=redNo;r<ci->redNo;r++)
362     adj(r).lcount++;//He'll be contributing to r here
364   // Check whether the death of this contributor made this pe go barren at this
365   // redNo
366   if (ci->redNo <= redNo) {
367     checkIsActive();
368   }
369   finishReduction();
372 //Migrating away (note that global count doesn't change)
373 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
375   DEBR((AA "Contributor %p(%d) migrating away\n" AB,ci,ci->redNo));
376   lcount--;//We lost a local
377   //He's already contributed to several reductions here
378   for (int r=redNo;r<ci->redNo;r++)
379     adj(r).lcount++;//He'll be contributing to r here
381   // Check whether this made this pe go barren at redNo
382   if (ci->redNo <= redNo) {
383     checkIsActive();
384   }
385   finishReduction();
388 //Migrating in (note that global count doesn't change)
389 void CkReductionMgr::contributorArriving(contributorInfo *ci)
391   DEBR((AA "Contributor %p(%d) migrating in\n" AB,ci,ci->redNo));
392   lcount++;//We gained a local
393 #if CMK_MEM_CHECKPOINT
394   // ignore from listener if it is during restart from crash
395   // because the ci may be old.
396   if (CkInRestarting()) return;
397 #endif
398   //He has already contributed (elsewhere) to several reductions:
399   for (int r=redNo;r<ci->redNo;r++)
400     adj(r).lcount--;//He won't be contributing to r here
402   // Check if the arrival of a new contributor makes this PE become active again
403   if (ci->redNo == redNo) {
404     checkIsActive();
405   }
408 //Contribute-- the given msg can contain any data.  The reducerType
409 // field of the message must be valid.
410 // Each contributor must contribute exactly once to the each reduction.
411 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
413 #if CMK_BIGSIM_CHARM
414   _TRACE_BG_TLINE_END(&(m->log));
415 #endif
416   DEBR((AA "Contributor %p contributed for %d in grp %d ismigratable %d \n" AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
417   //m->ci=ci;
418   m->redNo=ci->redNo++;
419   m->sourceFlag=-1;//A single contribution
420   m->gcount=0;
422 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
424         // if object is an immigrant recovery object, we send the contribution to the source PE
425         if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
426                 
427                 // turning on the message-logging bypass flag
428                 envelope *env = UsrToEnv(m);
429                 env->flags = env->flags | CK_BYPASS_DET_MLOG;
430         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
431                 return;
432         }
434     Chare *oldObj = CpvAccess(_currentObj);
435     CpvAccess(_currentObj) = this;
437         // adding contribution
438         addContribution(m);
440     CpvAccess(_currentObj) = oldObj;
441 #else
442   addContribution(m);
443 #endif
446 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
447 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
448         //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
449         
450         // turning off bypassing flag
451         envelope *env = UsrToEnv(m);
452         env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
454         // adding contribution
455     addContribution(m);
457 #else
458 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
459 #endif
461 void CkReductionMgr::checkIsActive() {
462 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
463   return;
464 #endif
466   // Check the number of kids in the inactivelist before or at this redNo
467   std::map<int, int>::iterator it;
468   int c_inactive = 0;
469   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
470     if (it->second <= redNo) {
471       DEBR((AA "Kid %d is inactive from redNo %d\n" AB, it->first, it->second));
472       c_inactive++;
473     }
474   }
475   DEBR((AA "CheckIsActive redNo %d, kids %d(inactive %d), lcount %d\n" AB, redNo,
476     numKids, c_inactive, lcount));
478   if(numKids == c_inactive && lcount == 0) {
479     if(!is_inactive) {
480       informParentInactive();
481     }
482     is_inactive = true;
483   } else if(is_inactive) {
484     is_inactive = false;
485   }
489 * Add to the child to the inactiveList
491 void CkReductionMgr::checkAndAddToInactiveList(int id, int red_no) {
492   // If there is already a reduction in progress corresponding to red_no, then
493   // the time to call ReductionStarting is past so explicitly invoke
494   // ReductionStarting on the kid
495   if (inProgress && redNo == red_no) {
496     thisProxy[id].ReductionStarting(new CkReductionNumberMsg(red_no));
497   }
499   std::map<int, int>::iterator it;
500   it = inactiveList.find(id);
501   if (it == inactiveList.end()) {
502     inactiveList.insert(std::pair<int, int>(id, red_no));
503   } else {
504     it->second = red_no;
505   }
506   // If the red_no is redNo, then check whether this makes this PE inactive
507   if (redNo == red_no) {
508     checkIsActive();
509   }
513 * This is invoked when a real contribution is received from the kid for a
514 * particular red_no
516 void CkReductionMgr::checkAndRemoveFromInactiveList(int id, int red_no) {
517   std::map<int, int>::iterator it;
518   it = inactiveList.find(id);
519   if (it == inactiveList.end()) {
520     return;
521   }
522   if (it->second <= red_no) {
523     inactiveList.erase(it);
524     DEBR((AA "Parent removing kid %d from inactivelist red_no %d\n" AB,
525       id, red_no));
526   }
529 // Inform parent that I am inactive
530 void CkReductionMgr::informParentInactive() {
531   if (hasParent()) {
532     DEBR((AA "Inform parent to add to inactivelist red_no %d\n" AB, redNo));
533     thisProxy[treeParent()].AddToInactiveList(
534       new CkReductionInactiveMsg(CkMyPe(), redNo));
535   }
539 *  Send ReductionStarting message to all the inactive kids which are inactive
540 *  for the specified red_no
542 void CkReductionMgr::sendReductionStartingToKids(int red_no) {
543 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
544   for (int k=0;k<treeKids();k++)
545   {
546     DEBR((AA "Asking child PE %d to start #%d\n" AB,kids[k],redNo));
547     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
548   }
549 #else
550   std::map<int, int>::iterator it;
551   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
552     if (it->second <= red_no) {
553       DEBR((AA "Parent sending reductionstarting to inactive kid %d\n" AB,
554         it->first));
555       thisProxy[it->first].ReductionStarting(new CkReductionNumberMsg(red_no));
556     }
557   }
558 #endif
562 //////////// Reduction Manager Remote Entry Points /////////////
563 //Sent down the reduction tree (used by barren PEs)
564 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
566  if(CkMyPe()==0){
567         //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
568         //delete m;
569         //return;
571  DEBR((AA " Group ReductionStarting called for redNo %d\n" AB,m->num));
572  int srcPE = (UsrToEnv(m))->getSrcPe();
573   if (isPresent(m->num) && !inProgress)
574   {
575     DEBR((AA "Starting reduction #%d at parent's request\n" AB,m->num));
576     startReduction(m->num,srcPE);
577     finishReduction();
578   } else if (isFuture(m->num)){
579 //   CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
580           DEBR((AA "Asked to startfuture Reduction %d \n" AB,m->num));
581           if(maxStartRequest < m->num){
582                   maxStartRequest = m->num;
583           }
584  //   CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
585           
586     }
587   else //is Past
588     DEBR((AA "Ignoring parent's late request to start #%d\n" AB,m->num));
589   delete m;
592 //Sent to root of reduction tree with reduction contribution
593 // of migrants that missed the main reduction.
594 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
596 #if CMK_BIGSIM_CHARM
597   _TRACE_BG_TLINE_END(&(m->log));
598 #endif
599   addContribution(m);
602 //A late migrating contributor will never contribute to this reduction
603 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
605   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
606   DEBR((AA "Migrant died before contributing to #%d\n" AB,m->num));
607  // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());                         
608   adj(m->num).gcount--;//He won't be contributing to this one.
609   finishReduction();
610   delete m;
613 //////////// Reduction Manager State /////////////
614 void CkReductionMgr::startReduction(int number,int srcPE)
616   if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
617   if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
618   if (inProgress){
619         DEBR((AA "This reduction is already in progress\n" AB));
620         return;//This reduction already started
621   }
622   if (creating) //Don't start yet-- we're creating elements
623   {
624     DEBR((AA "Postponing start request #%d until we're done creating\n" AB,redNo));
625     startRequested=true;
626     return;
627   }
629 //If none of these cases, we need to start the reduction--
630   DEBR((AA "Starting reduction #%d  %d %d \n" AB,redNo,completedRedNo,number));
631   inProgress=true;
634         /*
635                 FAULT_EVAC
636         */
637   if(!CmiNodeAlive(CkMyPe())){
638         return;
639   }
641   // TODO: This is currently gotten from the command-line...but it could
642   // probably just be determined at runtime. I don't know if the CL option is
643   // even documented anywhere.
644   if(disableNotifyChildrenStart) return;
646   //Sent start requests to our kids (in case they don't already know)
647   sendReductionStartingToKids(redNo);
648 }       
650 /*Handle a message from one element for the reduction*/
651 void CkReductionMgr::addContribution(CkReductionMsg *m)
653   if (isPast(m->redNo))
654   {
655 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
656         CmiAbort("this version should not have late migrations");
657 #else
658         //We've moved on-- forward late contribution straight to root
659     DEBR((AA "Migrant gives late contribution for #%d!\n" AB,m->redNo));
660         // if (!hasParent()) //Root moved on too soon-- should never happen
661         //   CkAbort("Late reduction contribution received at root!\n");
662     thisProxy[0].LateMigrantMsg(m);
663 #endif
664   }
665   else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
666     DEBR((AA "Contributor gives early contribution-- for #%d\n" AB,m->redNo));
667     futureMsgs.enq(m);
668   } else {// An ordinary contribution
669     DEBR((AA "Recv'd local contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
670    // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
671     startReduction(m->redNo,CkMyPe());
672     msgs.enq(m);
673     nContrib++;
674     finishReduction();
675   }
678 /**function checks if it has got all contributions that it is supposed to
679 get at this processor. If it is done it sends the reduced result to the local
680 nodegroup */
681 void CkReductionMgr::finishReduction(void)
683   /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
684   DEBR((AA "in finishReduction (inProgress=%d) in grp %d\n" AB,inProgress,thisgroup.idx));
685   if ((!inProgress) || creating){
686         DEBR((AA "Either not in Progress or creating\n" AB));
687         return;
688   }
690   bool partialReduction = false;
692   //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
693 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
694         if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
695           if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
696             partialReduction = true;
697           }
698           else {
699             DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
700             return; //Need more local messages
701           }
702         }
703 #else
704   if (nContrib<(lcount+adj(redNo).lcount)){
705          if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
706            partialReduction = true;
707          }
708          else {
709            DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
710            return; //Need more local messages
711          }
712   }
713 #endif
715   if (nRemote<treeKids()) {
716     if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
717       partialReduction = true;
718     }
719     else {
720       DEBR((AA "Need more remote messages %d %d\n" AB,nRemote,treeKids()));
721       return; //Need more remote messages
722     }
723   }
724         
726   DEBR((AA "Reducing data... %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
727   CkReductionMsg *result=reduceMessages();
728   result->redNo=redNo;
730   if (partialReduction) {
731     msgs.enq(result);
732     return;
733   }
735   if (hasParent())
736   {//Pass data up tree to parent
737     DEBR((AA "Passing reduced data up to parent node %d.\n" AB,treeParent()));
738     DEBR((AA "Message gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
739 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
740     result->gcount+=gcount+adj(redNo).gcount;
741 #else
742     result->gcount+=gcount+adj(redNo).gcount;
743 #endif
744     thisProxy[treeParent()].RecvMsg(result);
745   }
746   else 
747   {//We are root-- pass data to client
748     DEBR((AA "Final gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
749     int totalElements=result->gcount+gcount+adj(redNo).gcount;
750     if (totalElements>result->nSources()) 
751     {
752       DEBR((AA "Only got %d of %d contributions (c'mon, migrators!)\n" AB,result->nSources(),totalElements));
753       msgs.enq(result);
754       return; // Wait for migrants to contribute
755     } else if (totalElements<result->nSources()) {
756       DEBR((AA "Got %d of %d contributions\n" AB,result->nSources(),totalElements));
757 #if !defined(_FAULT_CAUSAL_)
758       CkAbort("ERROR! Too many contributions at root!\n");
759 #endif
760     }
761     DEBR((AA "Passing result to client function\n" AB));
762     CkSetRefNum(result, result->getUserFlag());
763     if (!result->callback.isInvalid())
764             result->callback.send(result);
765     else if (!storedCallback.isInvalid())
766             storedCallback.send(result);
767     else
768             CkAbort("No reduction client!\n"
769                     "You must register a client with either SetReductionClient or during contribute.\n");
770   }
773   //House Keeping Operations will have to check later what needs to be changed
774   redNo++;
775   // Check after every reduction contribution whether this makes the PE inactive
776   // starting this redNo
777   checkIsActive();
778   //Shift the count adjustment vector down one slot (to match new redNo)
779   int i;
780   completedRedNo++;
781   for (i=1;i<(int)(adjVec.length());i++){
782     adjVec[i-1]=adjVec[i];
783   }
784   adjVec.length()--;
786   inProgress=false;
787   startRequested=false;
788   nRemote=nContrib=0;
790   //Look through the future queue for messages we can now handle
791   int n=futureMsgs.length();
792   for (i=0;i<n;i++)
793   {
794     CkReductionMsg *m=futureMsgs.deq();
795     if (m!=NULL) //One of these addContributions may have finished us.
796       addContribution(m);//<- if *still* early, puts it back in the queue
797   }
798   n=futureRemoteMsgs.length();
799   for (i=0;i<n;i++)
800   {
801     CkReductionMsg *m=futureRemoteMsgs.deq();
802     if (m!=NULL) {
803       RecvMsg(m);//<- if *still* early, puts it back in the queue
804     }
805   }
807   if(maxStartRequest >= redNo){
808           startReduction(redNo,CkMyPe());
809           finishReduction();
810   }
815 //Sent up the reduction tree with reduced data
816   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
818 #if CMK_BIGSIM_CHARM
819   _TRACE_BG_TLINE_END(&m->log);
820 #endif
821   if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
822     DEBR((AA "Recv'd remote contribution %d for #%d\n" AB,nRemote,m->redNo));
823     // If the remote contribution is real, then check whether we can remove the
824     // child from the inactiveList if it is in the list
825     if (m->nSources() > 0) {
826       checkAndRemoveFromInactiveList(m->fromPE, m->redNo);
827     }
828     startReduction(m->redNo, CkMyPe());
829     msgs.enq(m);
830     nRemote++;
831     finishReduction();
832   }
833   else if (isFuture(m->redNo)) {
834     DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
835     futureRemoteMsgs.enq(m);
836   } 
837   else CkAbort("Recv'd late remote contribution!\n");
840 void CkReductionMgr::AddToInactiveList(CkReductionInactiveMsg *m) {
841   int id = m->id;
842   int last_redno = m->redno;
843   delete m;
845   DEBR((AA "Parent add kid %d to inactive list from redno %d\n" AB,
846     id, last_redno));
847   checkAndAddToInactiveList(id, last_redno);
849   finishReduction();
850   if (last_redno <= redNo) {
851     checkIsActive();
852   }
855 //////////// Reduction Manager Utilities /////////////
857 //Return the countAdjustment struct for the given redNo:
858 countAdjustment &CkReductionMgr::adj(int number)
860   number-=completedRedNo;
861   number--;
862   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
863   //Pad the adjustment vector with zeros until it's at least number long
864   while ((int)(adjVec.length())<=number)
865     adjVec.push_back(countAdjustment());
866   return adjVec[number];
869 //Combine (& free) the current message vector msgs.
870 CkReductionMsg *CkReductionMgr::reduceMessages(void)
872 #if CMK_BIGSIM_CHARM
873   _TRACE_BG_END_EXECUTE(1);
874   void* _bgParentLog = NULL;
875   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
876 #endif
877   CkReductionMsg *ret=NULL;
879   //Look through the vector for a valid reducer, swapping out placeholder messages
880   CkReduction::reducerType r=CkReduction::invalid;
881   int msgs_gcount=0;//Reduced gcount
882   int msgs_nSources=0;//Reduced nSources
883   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
884   CkCallback msgs_callback;
885   int i;
886   int nMsgs=0;
887   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
888   CkReductionMsg *m;
889   bool isMigratableContributor;
891   // Copy message queue into msgArr, skipping placeholders:
892   while (NULL!=(m=msgs.deq()))
893   {
894     msgs_gcount+=m->gcount;
895     if (m->sourceFlag!=0)
896     { //This is a real message from an element, not just a placeholder
897       msgs_nSources+=m->nSources();
898 #if CMK_BIGSIM_CHARM
899       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
900 #endif
902       // for "nop" reducer type, only need to accept one message
903       if (nMsgs == 0 || m->reducer != CkReduction::nop) {
904         msgArr[nMsgs++]=m;
905         r=m->reducer;
906         if (!m->callback.isInvalid()){
907 #if CMK_ERROR_CHECKING
908           if(nMsgs > 1 && !(msgs_callback == m->callback))
909             CkAbort("mis-matched client callbacks in reduction messages\n");
910 #endif
911           msgs_callback=m->callback;
912         }
913         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
914           msgs_userFlag=m->userFlag;
915         isMigratableContributor=m->isMigratableContributor();
916       }
917       else {
918 #if CMK_ERROR_CHECKING
919         if(!(msgs_callback == m->callback))
920           CkAbort("mis-matched client callbacks in reduction messages\n");
921 #endif  
922         delete m;
923       }
924     }
925     else
926     { //This is just a placeholder message-- forget it
927       delete m;
928     }
929   }
931   if (nMsgs==0||r==CkReduction::invalid)
932   //No valid reducer in the whole vector
933     ret=CkReductionMsg::buildNew(0,NULL);
934   else
935   {//Use the reducer to reduce the messages
936                 //if there is only one msg to be reduced just return that message
937     if(nMsgs == 1 &&
938        msgArr[0]->reducer != CkReduction::set &&
939        msgArr[0]->reducer != CkReduction::tuple) {
940       ret = msgArr[0];
941     }else{
942       if (msgArr[0]->reducer == CkReduction::nop) {
943         // nMsgs > 1 indicates that reduction type is not nop
944         // this means any data with reducer type nop was submitted
945         // only so that counts would agree, and can be removed
946         delete msgArr[0];
947         msgArr[0] = msgArr[nMsgs - 1];
948         nMsgs--;
949       }
950       CkReduction::reducerFn f=CkReduction::reducerTable()[r].fn;
951       ret=(*f)(nMsgs,msgArr);
952     }
953     ret->reducer=r;
954   }
958 #if USE_CRITICAL_PATH_HEADER_ARRAY
960 #if CRITICAL_PATH_DEBUG > 3
961   CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
962 #endif
964   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
965   path.updateMax(UsrToEnv(ret));
966   // Combine the critical paths from all the reduction messages
967   for (i=0;i<nMsgs;i++){
968     if (msgArr[i]!=ret){
969       //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
970       path.updateMax(UsrToEnv(msgArr[i]));
971     }
972   }
973   
975 #if CRITICAL_PATH_DEBUG > 3
976   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
977 #endif
978   
979   PathHistoryTableEntry tableEntry(path);
980   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
981   
982 #endif
984         //Go back through the vector, deleting old messages
985   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
986   delete [] msgArr;
988   //Set the message counts
989   ret->redNo=redNo;
990   ret->gcount=msgs_gcount;
991   ret->userFlag=msgs_userFlag;
992   ret->callback=msgs_callback;
993   ret->sourceFlag=msgs_nSources;
994         ret->setMigratableContributor(isMigratableContributor);
995   ret->fromPE = CkMyPe();
996   DEBR((AA "Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
998   return ret;
1002 //Checkpointing utilities
1003 //pack-unpack method for CkReductionMsg
1004 //if packing pack the message and then unpack and return it
1005 //if unpacking allocate memory for it read it off disk and then unapck
1006 //and return it
1007 void CkReductionMgr::pup(PUP::er &p)
1009 //We do not store the client function pointer or the client function parameter,
1010 //it is the responsibility of the programmer to correctly restore these
1011   CkGroupInitCallback::pup(p);
1012   p(redNo);
1013   p(completedRedNo);
1014   p(inProgress); p(creating); p(startRequested);
1015   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
1016   p|msgs;
1017   p|futureMsgs;
1018   p|futureRemoteMsgs;
1019   p|finalMsgs;
1020   p|adjVec;
1021   p|storedCallback;
1022     // handle CkReductionClientBundle
1023   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
1024   {
1025     CkReductionClientBundle *bd;
1026     if (p.isUnpacking()) 
1027       bd = new CkReductionClientBundle;
1028     else
1029       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1030     p|*bd;
1031     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1032   }
1034   // subtle --- Gengbin
1035   // Group : CkReductionMgr
1036   // CkArray: CkReductionMgr
1037   // lcount/gcount in Group is set in Group constructor
1038   // lcount/gcount in CkArray is not, it is set when array elements are created
1039   // we can not pup because inserting array elems will add the counters again
1040 //  p|lcount;
1041 //  p|gcount;
1042 //  p|lcount;
1043 //  //  p|gcount;
1044 //  //  printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1045   if(p.isUnpacking()){
1046     thisProxy = thisgroup;
1047     maxStartRequest=0;
1048 #ifdef BINOMIAL_TREE
1049     init_BinomialTree();
1050 #else
1051     init_TopoTree();
1052 #endif
1053     is_inactive = false;
1054     checkIsActive();
1055   }
1057   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
1060 void CkReductionMgr::init_BinaryTree(){
1061   if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
1062     parent = CkNodeFirst(CkMyNode());
1063     numKids = 0;
1064   } else {
1065     int parentNode = (CkMyNode()-1)/TREE_WID;
1066     parent = CkMyNode() > 0 ? CkNodeFirst(parentNode) : -1;
1067     // Add nodes that are my children
1068     int firstKid = CkMyNode()*TREE_WID+1;
1069     numKids=CkNumNodes()-firstKid;
1070     if (numKids > TREE_WID) numKids = TREE_WID;
1071     if (numKids < 0) numKids = 0;
1072     for (int i = 0; i < numKids; i++) {
1073       kids.push_back(CkNodeFirst(firstKid+i));
1074       newKids.push_back(CkNodeFirst(firstKid+i));
1075     }
1077     // Add PEs on my node, which are also my children
1078     numKids += CkNodeSize(CkMyNode())-1;
1079     for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
1080       kids.push_back(CkMyPe()+i);
1081       newKids.push_back(CkMyPe()+i);
1082     }
1083   }
1086 void CkReductionMgr::init_TopoTree() {
1087   if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
1088     parent = CkNodeFirst(CkMyNode());
1089     numKids = 0;
1090   } else {
1091     if (_topoTree == NULL) CkAbort("CkReductionMgr:: topo tree has not been calculated\n");
1093     CmiSpanningTreeInfo &t = *_topoTree;
1094     if (t.parent != -1) parent = CkNodeFirst(t.parent);
1095     else parent = -1;
1096     numKids = t.child_count;
1097     for (int i=0; i < numKids; i++) {
1098       int child = CkNodeFirst(t.children[i]);
1099       kids.push_back(child);
1100       newKids.push_back(child);
1101     }
1103     // Add PEs on my node, which are also my children
1104     numKids += CkNodeSize(CkMyNode())-1;
1105     for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
1106       kids.push_back(CkMyPe()+i);
1107       newKids.push_back(CkMyPe()+i);
1108     }
1109   }
1112 void CkReductionMgr::init_BinomialTree(){
1113   if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
1114     parent = CkNodeFirst(CkMyNode());
1115     numKids = 0;
1116   } else {
1117     int depth = (int)ceil((log((double)CkNumNodes())/log((double)2)));
1118     upperSize = (unsigned) 1 << depth;
1119     label = upperSize-CkNodeFirst(CkMyNode())-1;
1120     int p=label;
1121     int count=0;
1122     while( p > 0){
1123       if(p % 2 == 0)
1124         break;
1125       else{
1126         p = p/2;
1127         count++;
1128       }
1129     }
1130     parent = label + (1<<count);
1131     parent = upperSize - 1 - parent;
1132     int temp;
1133     if(count != 0){
1134       numKids = 0;
1135       for(int i=0;i<count;i++){
1136         temp = label - (1<<i);
1137         temp = upperSize - 1 - temp;
1138         if(temp <= CkNumPes()-1){
1139           kids.push_back(temp);
1140           numKids++;
1141         }
1142       }
1143     }else{
1144       numKids = 0;
1145     }
1146   }
1150 int CkReductionMgr::treeRoot(void)
1152   return 0;
1154 bool CkReductionMgr::hasParent(void) //Root Node
1156   return (bool)(CkMyPe()!=treeRoot());
1158 int CkReductionMgr::treeParent(void) //My parent Node
1160   return parent;
1162 int CkReductionMgr::treeKids(void)//Number of children in tree
1164   return numKids;
1168 //                simple "stateless" barrier
1169 //                no state checkpointed, for FT purpose
1170 //                require no overlapping barriers
1171 void CkReductionMgr::barrier(CkReductionMsg *m)
1173   barrier_nContrib++;
1174   barrier_nSource++;
1175   if(!m->callback.isInvalid())
1176       barrier_storedCallback=m->callback;
1177   finishBarrier();
1178   delete m;
1181 void CkReductionMgr::finishBarrier(void)
1183        if(barrier_nContrib<lcount){//need more local message
1184                DEBR(("[%d] current contrib:%d,lcount:%d\n",CkMyPe(),barrier_nContrib,lcount));
1185                return;
1186        }
1187        if(barrier_nRemote<treeKids()){//need more remote messages
1188                DEBR(("[%d] current remote:%d,kids:%d\n",CkMyPe(),barrier_nRemote,treeKids()));
1189                return;
1190        }
1191        CkReductionMsg * result = CkReductionMsg::buildNew(0,NULL);
1192        result->callback=barrier_storedCallback;
1193        result->sourceFlag=barrier_nSource;
1194        result->gcount=barrier_gCount;
1195        if(hasParent())
1196        {
1197                DEBR(("[%d]send to parent:%d\n",CkMyPe(),treeParent()));
1198                result->gcount+=gcount;
1199                thisProxy[treeParent()].Barrier_RecvMsg(result);
1200        }
1201        else{
1202                int totalElements=result->gcount+gcount;
1203                DEBR(("[%d]root,totalElements:%d,source:%d\n",CkMyPe(),totalElements,result->nSources()));
1204                if(totalElements<result->nSources()){
1205                        CkAbort("ERROR! Too many contributions at barrier root\n");
1206                }
1207                CkSetRefNum(result,result->getUserFlag());
1208                if(!result->callback.isInvalid())
1209                        result->callback.send(result);
1210                else if(!barrier_storedCallback.isInvalid())
1211                                barrier_storedCallback.send(result);
1212                else 
1213                        CkAbort("No reduction client!\n");
1214        }
1215        barrier_nRemote=barrier_nContrib=0;
1216        barrier_gCount=0;
1217        barrier_nSource=0;
1220 void CkReductionMgr::Barrier_RecvMsg(CkReductionMsg *m)
1222        barrier_nRemote++;
1223        barrier_gCount+=m->gcount;
1224        barrier_nSource+=m->nSources();
1225        if(!m->callback.isInvalid())
1226                barrier_storedCallback=m->callback;
1227        finishBarrier();
1232 /////////////////////////////////////////////////////////////////////////
1234 ////////////////////////////////
1235 //CkReductionMsg support
1237 //ReductionMessage default private constructor-- does nothing
1238 CkReductionMsg::CkReductionMsg(){}
1239 CkReductionMsg::~CkReductionMsg(){}
1241 //This define gives the distance from the start of the CkReductionMsg
1242 // object to the start of the user data area (just below last object field)
1243 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1245 //"Constructor"-- builds and returns a new CkReductionMsg.
1246 //  the "data" array you specify will be copied into this object.
1247 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1248     CkReduction::reducerType reducer, CkReductionMsg *buf)
1250   int len[1];
1251   len[0]=NdataSize;
1252   CkReductionMsg *ret = buf ? buf : new(len,0) CkReductionMsg();
1254   ret->dataSize=NdataSize;
1255   if (srcData!=NULL && !buf)
1256     memcpy(ret->data,srcData,NdataSize);
1257   ret->userFlag=(CMK_REFNUM_TYPE)-1;
1258   ret->reducer=reducer;
1259   //ret->ci=NULL;
1260   ret->sourceFlag=-1000;
1261   ret->gcount=0;
1262   ret->migratableContributor = true;
1263 #if CMK_BIGSIM_CHARM
1264   ret->log = NULL;
1265 #endif
1266   return ret;
1269 // Charm kernel message runtime support:
1270 void *
1271 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
1273   int totalsize=ARM_DATASTART+(*sz);
1274   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1275   CkReductionMsg *ret = (CkReductionMsg *)
1276     CkAllocMsg(msgnum,totalsize,priobits);
1277   ret->data=(void *)(&ret->dataStorage);
1278   return (void *) ret;
1281 void *
1282 CkReductionMsg::pack(CkReductionMsg* in)
1284   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1285   //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
1286   in->data = NULL;
1287   return (void*) in;
1290 CkReductionMsg* CkReductionMsg::unpack(void *in)
1292   CkReductionMsg *ret = (CkReductionMsg *)in;
1293   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1294   //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
1295   ret->data=(void *)(&ret->dataStorage);
1296   return ret;
1300 /////////////////////////////////////////////////////////////////////////////////////
1301 ///////////////// Builtin Reducer Functions //////////////
1302 /* A simple reducer, like sum_int, looks like this:
1303 CkReductionMsg *sum_int(int nMsg,CkReductionMessage **msg)
1305   int i,ret=0;
1306   for (i=0;i<nMsg;i++)
1307     ret+=*(int *)(msg[i]->getData());
1308   return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1311 To keep the code small and easy to change, the implementations below
1312 are built with preprocessor macros.
1315 //////////////// simple reducers ///////////////////
1316 /*A define used to quickly and tersely construct simple reductions.
1317 The basic idea is to use the first message's data array as
1318 (pre-initialized!) scratch space for folding in the other messages.
1319  */
1321 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1323         CkAbort("Called the invalid reducer type 0.  This probably\n"
1324                 "means you forgot to initialize your custom reducer index.\n");
1325         return NULL;
1328 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1330   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
1333 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1334 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1336   RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1337   int m,i;\
1338   int nElem=msg[0]->getLength()/sizeof(dataType);\
1339   dataType *ret=(dataType *)(msg[0]->getData());\
1340   for (m=1;m<nMsg;m++)\
1341   {\
1342     dataType *value=(dataType *)(msg[m]->getData());\
1343     for (i=0;i<nElem;i++)\
1344     {\
1345       RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
1346       loop\
1347     }\
1348   }\
1349   RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1350   return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
1353 //Use this macro for reductions that have the same type for all inputs
1354 #define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
1355   SIMPLE_REDUCTION(nameBase##_char,char,"%c",loop) \
1356   SIMPLE_REDUCTION(nameBase##_short,short,"%h",loop) \
1357   SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
1358   SIMPLE_REDUCTION(nameBase##_long,long,"%ld",loop) \
1359   SIMPLE_REDUCTION(nameBase##_long_long,long long,"%lld",loop) \
1360   SIMPLE_REDUCTION(nameBase##_uchar,unsigned char,"%c",loop) \
1361   SIMPLE_REDUCTION(nameBase##_ushort,unsigned short,"%hu",loop) \
1362   SIMPLE_REDUCTION(nameBase##_uint,unsigned int,"%u",loop) \
1363   SIMPLE_REDUCTION(nameBase##_ulong,unsigned long,"%lu",loop) \
1364   SIMPLE_REDUCTION(nameBase##_ulong_long,unsigned long long,"%llu",loop) \
1365   SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
1366   SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
1368 //Compute the sum the numbers passed by each element.
1369 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1371 //Compute the product of the numbers passed by each element.
1372 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1374 //Compute the largest number passed by any element.
1375 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1377 //Compute the smallest integer passed by any element.
1378 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
1381 //Compute the logical AND of the integers passed by each element.
1382 // The resulting integer will be zero if any source integer is zero; else 1.
1383 SIMPLE_REDUCTION(logical_and,int,"%d",
1384         if (value[i]==0)
1385      ret[i]=0;
1386   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1389 //Compute the logical AND of the integers passed by each element.
1390 // The resulting integer will be zero if any source integer is zero; else 1.
1391 SIMPLE_REDUCTION(logical_and_int,int,"%d",
1392         if (value[i]==0)
1393      ret[i]=0;
1394   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1397 //Compute the logical AND of the bools passed by each element.
1398 // The resulting bool will be false if any source bool is false; else true.
1399 SIMPLE_REDUCTION(logical_and_bool,bool,"%d",
1400   if (!value[i]) ret[i]=false;
1403 //Compute the logical OR of the integers passed by each element.
1404 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1405 SIMPLE_REDUCTION(logical_or,int,"%d",
1406   if (value[i]!=0)
1407            ret[i]=1;
1408   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1411 //Compute the logical OR of the integers passed by each element.
1412 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1413 SIMPLE_REDUCTION(logical_or_int,int,"%d",
1414   if (value[i]!=0)
1415            ret[i]=1;
1416   ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1419 //Compute the logical OR of the bools passed by each element.
1420 // The resulting bool will be true if any source bool is true; else false.
1421 SIMPLE_REDUCTION(logical_or_bool,bool,"%d",
1422   if (value[i]) ret[i]=true;
1425 //Compute the logical XOR of the integers passed by each element.
1426 // The resulting integer will be 1 if an odd number of source integers is nonzero; else 0.
1427 SIMPLE_REDUCTION(logical_xor_int,int,"%d",
1428   ret[i] = (!ret[i] != !value[i]);
1431 //Compute the logical XOR of the bools passed by each element.
1432 // The resulting bool will be true if an odd number of source bools is true; else false.
1433 SIMPLE_REDUCTION(logical_xor_bool,bool,"%d",
1434   ret[i] = (ret[i] != value[i]);
1437 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1438 SIMPLE_REDUCTION(bitvec_and_int,int,"%d",ret[i]&=value[i];)
1439 SIMPLE_REDUCTION(bitvec_and_bool,bool,"%d",ret[i]&=value[i];)
1441 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1442 SIMPLE_REDUCTION(bitvec_or_int,int,"%d",ret[i]|=value[i];)
1443 SIMPLE_REDUCTION(bitvec_or_bool,bool,"%d",ret[i]|=value[i];)
1445 SIMPLE_REDUCTION(bitvec_xor,int,"%d",ret[i]^=value[i];)
1446 SIMPLE_REDUCTION(bitvec_xor_int,int,"%d",ret[i]^=value[i];)
1447 SIMPLE_REDUCTION(bitvec_xor_bool,bool,"%d",ret[i]^=value[i];)
1449 //Select one random message to pass on
1450 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1451   int idx = (int)(CrnDrand()*(nMsg-1) + 0.5);
1452   return CkReductionMsg::buildNew(msg[idx]->getLength(),
1453                                   (void *)msg[idx]->getData(),
1454                                   CkReduction::random, msg[idx]);
1457 /////////////// concat ////////////////
1459 This reducer simply appends the data it recieves from each element,
1460 without any housekeeping data to separate them.
1462 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1464   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1465   //Figure out how big a message we'll need
1466   int i,retSize=0;
1467   for (i=0;i<nMsg;i++)
1468       retSize+=msg[i]->getSize();
1470   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1472   //Allocate a new message
1473   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1475   //Copy the source message data into the return message
1476   char *cur=(char *)(ret->getData());
1477   for (i=0;i<nMsg;i++) {
1478     int messageBytes=msg[i]->getSize();
1479     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1480     cur+=messageBytes;
1481   }
1482   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1483   return ret;
1486 /////////////// set ////////////////
1488 This reducer appends the data it recieves from each element
1489 along with some housekeeping data indicating contribution boundaries.
1490 The message data is thus a list of reduction_set_element structures
1491 terminated by a dummy reduction_set_element with a sourceElement of -1.
1494 //This rounds an integer up to the nearest multiple of sizeof(double)
1495 static const int alignSize=sizeof(double);
1496 static int SET_ALIGN(int x) {return ~(alignSize-1)&((x)+alignSize-1);}
1498 //This gives the size (in bytes) of a reduction_set_element
1499 static int SET_SIZE(int dataSize)
1500 {return SET_ALIGN(sizeof(int)+dataSize);}
1502 //This returns a pointer to the next reduction_set_element in the list
1503 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1505   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1506   return (CkReduction::setElement *)next;
1509 //Combine the data passed by each element into an list of reduction_set_elements.
1510 // Each element may contribute arbitrary data (with arbitrary length).
1511 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1513   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1514   //Figure out how big a message we'll need
1515   int i,retSize=0;
1516   for (i=0;i<nMsg;i++) {
1517     if (!msg[i]->isFromUser())
1518     //This message is composite-- it will just be copied over (less terminating -1)
1519       retSize+=(msg[i]->getSize()-sizeof(int));
1520     else //This is a message from an element-- it will be wrapped in a reduction_set_element
1521       retSize+=SET_SIZE(msg[i]->getSize());
1522   }
1523   retSize+=sizeof(int);//Leave room for terminating -1.
1525   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1527   //Allocate a new message
1528   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1530   //Copy the source message data into the return message
1531   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1532   for (i=0;i<nMsg;i++)
1533     if (!msg[i]->isFromUser())
1534     {//This message is composite-- just copy it over (less terminating -1)
1535                         int messageBytes=msg[i]->getSize()-sizeof(int);
1536                         RED_DEB(("|\tc msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1537                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1538                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1539     }
1540     else //This is a message from an element-- wrap it in a reduction_set_element
1541     {
1542       RED_DEB(("|\tu msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1543       cur->dataSize=msg[i]->getSize();
1544       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1545       cur=SET_NEXT(cur);
1546     }
1547   cur->dataSize=-1;//Add a terminating -1.
1548   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1549   return ret;
1552 //Utility routine: get the next reduction_set_element in the list
1553 // if there is one, or return NULL if there are none.
1554 //To get all the elements, just keep feeding this procedure's output back to
1555 // its input until it returns NULL.
1556 CkReduction::setElement *CkReduction::setElement::next(void)
1558   CkReduction::setElement *n=SET_NEXT(this);
1559   if (n->dataSize==-1)
1560     return NULL;//This is the end of the list
1561   else
1562     return n;//This is just another element
1566 ///////// statisticsElement
1568 CkReduction::statisticsElement::statisticsElement(double initialValue)
1569   : count(1)
1570   , mean(initialValue)
1571   , m2(0.0)
1574 // statistics reducer
1575 // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
1576 // Chan, Tony F.; Golub, Gene H.; LeVeque, Randall J. (1979),
1577 // "Updating Formulae and a Pairwise Algorithm for Computing Sample Variances." (PDF),
1578 // Technical Report STAN-CS-79-773, Department of Computer Science, Stanford University.
1579 static CkReductionMsg* statistics(int nMsgs, CkReductionMsg** msg)
1581   int nElem = msg[0]->getLength() / sizeof(CkReduction::statisticsElement);
1582   CkReduction::statisticsElement* ret = (CkReduction::statisticsElement*)(msg[0]->getData());
1583   for (int m = 1; m < nMsgs; m++)
1584   {
1585     CkReduction::statisticsElement* value = (CkReduction::statisticsElement*)(msg[m]->getData());
1586     for (int i = 0; i < nElem; i++)
1587     {
1588       double a_count = ret[i].count;
1589       ret[i].count += value[i].count;
1590       double delta = value[i].mean - ret[i].mean;
1591       ret[i].mean += delta * value[i].count / ret[i].count;
1592       ret[i].m2 += value[i].m2 + delta * delta * value[i].count * a_count / ret[i].count;
1593     }
1594   }
1595   return CkReductionMsg::buildNew(
1596     nElem*sizeof(CkReduction::statisticsElement),
1597     (void *)ret,
1598     CkReduction::invalid,
1599     msg[0]);
1602 ///////// tupleElement
1604 CkReduction::tupleElement::tupleElement()
1605   : dataSize(0)
1606   , data(NULL)
1607   , reducer(CkReduction::invalid)
1608   , owns_data(false)
1610 CkReduction::tupleElement::tupleElement(size_t dataSize_, void* data_, CkReduction::reducerType reducer_)
1611   : dataSize(dataSize_)
1612   , data((char*)data_)
1613   , reducer(reducer_)
1614   , owns_data(false)
1617 CkReduction::tupleElement::tupleElement(CkReduction::tupleElement&& rhs_move)
1618   : dataSize(rhs_move.dataSize)
1619   , data(rhs_move.data)
1620   , reducer(rhs_move.reducer)
1621   , owns_data(rhs_move.owns_data)
1623   rhs_move.dataSize = 0;
1624   rhs_move.data = 0;
1625   rhs_move.reducer = CkReduction::invalid;
1626   rhs_move.owns_data = false;
1628 CkReduction::tupleElement& CkReduction::tupleElement::operator=(CkReduction::tupleElement&& rhs_move)
1630   if (owns_data)
1631     delete[] data;
1632   dataSize = rhs_move.dataSize;
1633   data = rhs_move.data;
1634   reducer = rhs_move.reducer;
1635   owns_data = rhs_move.owns_data;
1636   rhs_move.dataSize = 0;
1637   rhs_move.data = 0;
1638   rhs_move.reducer = CkReduction::invalid;
1639   rhs_move.owns_data = false;
1640   return *this;
1642 CkReduction::tupleElement::~tupleElement()
1644   if (owns_data)
1645     delete[] data;
1648 void CkReduction::tupleElement::pup(PUP::er &p) {
1649   p|dataSize;
1650   // TODO - it might be better to pack these raw, then we don't have to
1651   //  transform & copy them out on unpacking, we could just use the message's
1652   //  memory directly
1653   if (p.isUnpacking()) {
1654     data = new char[dataSize];
1655     owns_data = true;
1656   }
1657   PUParray(p, data, dataSize);
1658   if (p.isUnpacking()){
1659     int temp;
1660     p|temp;
1661     reducer=(CkReduction::reducerType)temp;
1662   } else {
1663     int temp=(int)reducer;
1664     p|temp;
1665   }
1668 CkReductionMsg* CkReductionMsg::buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions)
1670   PUP::sizer ps;
1671   ps|num_reductions;
1672   PUParray(ps, reductions, num_reductions);
1674   CkReductionMsg* msg = CkReductionMsg::buildNew(ps.size(), NULL, CkReduction::tuple);
1675   PUP::toMem p(msg->data);
1676   p|num_reductions;
1677   PUParray(p, reductions, num_reductions);
1678   if (p.size() != ps.size()) CmiAbort("Size mismatch packing CkReduction::tupleElement::tupleToBuffer\n");
1679   return msg;
1682 void CkReductionMsg::toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions)
1684   PUP::fromMem p(this->getData());
1685   p|(*num_reductions);
1686   *out_reductions = new CkReduction::tupleElement[*num_reductions];
1687   PUParray(p, *out_reductions, *num_reductions);
1690 // tuple reducer
1691 CkReductionMsg* CkReduction::tupleReduction(int num_messages, CkReductionMsg** messages)
1693   CkReduction::tupleElement** tuple_data = new CkReduction::tupleElement*[num_messages];
1694   int num_reductions = 0;
1695   for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1696   {
1697     int itr_num_reductions = 0;
1698     messages[message_idx]->toTuple(&tuple_data[message_idx], &itr_num_reductions);
1700     // each message must submit the same reductions
1701     if (num_reductions == 0)
1702       num_reductions = itr_num_reductions;
1703     else if (num_reductions != itr_num_reductions)
1704       CmiAbort("num_reductions mismatch in CkReduction::tupleReduction");
1705   }
1707   DEB_TUPLE(("tupleReduction {\n  num_messages=%d,\n  num_reductions=%d,\n  length=%d\n",
1708            num_messages, num_reductions, messages[0]->getLength()));
1710   CkReduction::tupleElement* return_data = new CkReduction::tupleElement[num_reductions];
1711   // using a raw buffer to avoid CkReductionMsg constructor/destructor, we want to manage
1712   //  the inner memory of these temps ourselves to avoid unneeded copies
1713   char* simulated_messages_buffer = new char[sizeof(CkReductionMsg) * num_reductions * num_messages];
1714   CkReductionMsg** simulated_messages = new CkReductionMsg*[num_messages];
1716   // imagine the given data in a 2D layout where the messages are rows and reductions are columns
1717   // here we grab each column and run that reduction
1719   std::vector<CkReductionMsg *> msgs_to_delete;
1720   for (int reduction_idx = 0; reduction_idx < num_reductions; ++reduction_idx)
1721   {
1722     DEB_TUPLE(("  reduction_idx=%d {\n", reduction_idx));
1723     CkReduction::reducerType reducerType = CkReduction::invalid;
1724     for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1725     {
1726       CkReduction::tupleElement* reductions = (CkReduction::tupleElement*)(tuple_data[message_idx]);
1727       CkReduction::tupleElement& element = reductions[reduction_idx];
1728       DEB_TUPLE(("    msg %d, sf=%d, length=%d : { dataSize=%d, data=%p, reducer=%d },\n",
1729                  message_idx, messages[message_idx]->sourceFlag, messages[message_idx]->getLength(), element.dataSize, element.data, element.reducer));
1731       reducerType = element.reducer;
1733       size_t sim_idx = (reduction_idx * num_messages + message_idx) * sizeof(CkReductionMsg);
1734       CkReductionMsg& simulated_message = *(CkReductionMsg*)&simulated_messages_buffer[sim_idx];
1735       simulated_message.dataSize = element.dataSize;
1736       simulated_message.data = element.data;
1737       simulated_message.reducer = element.reducer;
1738       simulated_message.sourceFlag = messages[message_idx]->sourceFlag;
1739       simulated_message.userFlag = messages[message_idx]->userFlag;
1740       simulated_message.gcount = messages[message_idx]->gcount;
1741       simulated_message.migratableContributor = messages[message_idx]->migratableContributor;
1742 #if CMK_BIGSIM_CHARM
1743       simulated_message.log = NULL;
1744 #endif
1745       simulated_messages[message_idx] = &simulated_message;
1746     }
1748     // run the reduction and copy the result back to our data structure
1749     const auto& reducerFp = CkReduction::reducerTable()[reducerType].fn;
1750     CkReductionMsg* result = reducerFp(num_messages, simulated_messages);
1751     DEB_TUPLE(("    result_len=%d\n  },\n", result->getLength()));
1752     return_data[reduction_idx] = CkReduction::tupleElement(result->getLength(), result->getData(), reducerType);
1753     // reducers are allowed to reuse the zeroth message's memory, so it is not safe to delete this
1754     // all the time, and, even if it is, deletion must be deferred until after processing is complete.
1755     if (result != simulated_messages[0]) {
1756       msgs_to_delete.push_back(result);
1757     }
1758   }
1760   CkReductionMsg* retval = CkReductionMsg::buildFromTuple(return_data, num_reductions);
1761   DEB_TUPLE(("} tupleReduction msg_size=%d\n", retval->getSize()));
1763   for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1764     delete[] tuple_data[message_idx];
1765   delete[] tuple_data;
1766   delete[] return_data;
1767   delete[] simulated_messages_buffer;
1768   // note that although this is a 2d array, we don't need to delete the inner objects,
1769   //  their memory is tracked in simulated_messages_buffer
1770   delete[] simulated_messages;
1771   for (int i = 0; i < msgs_to_delete.size(); ++i)
1772     delete msgs_to_delete[i];
1773   return retval;
1778 /////////////////// Reduction Function Table /////////////////////
1779 CkReduction::CkReduction() {} //Dummy private constructor
1781 //Add the given reducer to the list.  Returns the new reducer's
1782 // reducerType.  Must be called in the same order on every node.
1783 CkReduction::reducerType CkReduction::addReducer(reducerFn fn, bool streamable)
1785   CkAssert(CmiMyRank() == 0);
1786   reducerType index = (reducerType)reducerTable().size();
1787   reducerTable().push_back(reducerStruct(fn, streamable));
1788   return index;
1792 /*Reducer table: maps reducerTypes to reducerStructs.
1793 It's indexed by reducerType, so the order in this table
1794 must *exactly* match the reducerType enum declaration.
1795 The names don't have to match, but it helps.
1797 std::vector<CkReduction::reducerStruct> CkReduction::initReducerTable()
1799   std::vector<CkReduction::reducerStruct> vec;
1801   vec.push_back(CkReduction::reducerStruct(::invalid_reducer, true));
1802   vec.push_back(CkReduction::reducerStruct(::nop, true));
1803   //Compute the sum the numbers passed by each element.
1804   vec.push_back(CkReduction::reducerStruct(::sum_char, true));
1805   vec.push_back(CkReduction::reducerStruct(::sum_short, true));
1806   vec.push_back(CkReduction::reducerStruct(::sum_int, true));
1807   vec.push_back(CkReduction::reducerStruct(::sum_long, true));
1808   vec.push_back(CkReduction::reducerStruct(::sum_long_long, true));
1809   vec.push_back(CkReduction::reducerStruct(::sum_uchar, true));
1810   vec.push_back(CkReduction::reducerStruct(::sum_ushort, true));
1811   vec.push_back(CkReduction::reducerStruct(::sum_uint, true));
1812   vec.push_back(CkReduction::reducerStruct(::sum_ulong, true));
1813   vec.push_back(CkReduction::reducerStruct(::sum_ulong_long, true));
1814   vec.push_back(CkReduction::reducerStruct(::sum_float, true));
1815   vec.push_back(CkReduction::reducerStruct(::sum_double, true));
1817   //Compute the product the numbers passed by each element.
1818   vec.push_back(CkReduction::reducerStruct(::product_char, true));
1819   vec.push_back(CkReduction::reducerStruct(::product_short, true));
1820   vec.push_back(CkReduction::reducerStruct(::product_int, true));
1821   vec.push_back(CkReduction::reducerStruct(::product_long, true));
1822   vec.push_back(CkReduction::reducerStruct(::product_long_long, true));
1823   vec.push_back(CkReduction::reducerStruct(::product_uchar, true));
1824   vec.push_back(CkReduction::reducerStruct(::product_ushort, true));
1825   vec.push_back(CkReduction::reducerStruct(::product_uint, true));
1826   vec.push_back(CkReduction::reducerStruct(::product_ulong, true));
1827   vec.push_back(CkReduction::reducerStruct(::product_ulong_long, true));
1828   vec.push_back(CkReduction::reducerStruct(::product_float, true));
1829   vec.push_back(CkReduction::reducerStruct(::product_double, true));
1831   //Compute the largest number passed by any element.
1832   vec.push_back(CkReduction::reducerStruct(::max_char, true));
1833   vec.push_back(CkReduction::reducerStruct(::max_short, true));
1834   vec.push_back(CkReduction::reducerStruct(::max_int, true));
1835   vec.push_back(CkReduction::reducerStruct(::max_long, true));
1836   vec.push_back(CkReduction::reducerStruct(::max_long_long, true));
1837   vec.push_back(CkReduction::reducerStruct(::max_uchar, true));
1838   vec.push_back(CkReduction::reducerStruct(::max_ushort, true));
1839   vec.push_back(CkReduction::reducerStruct(::max_uint, true));
1840   vec.push_back(CkReduction::reducerStruct(::max_ulong, true));
1841   vec.push_back(CkReduction::reducerStruct(::max_ulong_long, true));
1842   vec.push_back(CkReduction::reducerStruct(::max_float, true));
1843   vec.push_back(CkReduction::reducerStruct(::max_double, true));
1845   //Compute the smallest number passed by any element.
1846   vec.push_back(CkReduction::reducerStruct(::min_char, true));
1847   vec.push_back(CkReduction::reducerStruct(::min_short, true));
1848   vec.push_back(CkReduction::reducerStruct(::min_int, true));
1849   vec.push_back(CkReduction::reducerStruct(::min_long, true));
1850   vec.push_back(CkReduction::reducerStruct(::min_long_long, true));
1851   vec.push_back(CkReduction::reducerStruct(::min_uchar, true));
1852   vec.push_back(CkReduction::reducerStruct(::min_ushort, true));
1853   vec.push_back(CkReduction::reducerStruct(::min_uint, true));
1854   vec.push_back(CkReduction::reducerStruct(::min_ulong, true));
1855   vec.push_back(CkReduction::reducerStruct(::min_ulong_long, true));
1856   vec.push_back(CkReduction::reducerStruct(::min_float, true));
1857   vec.push_back(CkReduction::reducerStruct(::min_double, true));
1859   //Compute the logical AND of the values passed by each element.
1860   // The resulting value will be zero if any source value is zero.
1861     // logical_and deprecated in favor of logical_and_int
1862   vec.push_back(CkReduction::reducerStruct(::logical_and, true));
1863   vec.push_back(CkReduction::reducerStruct(::logical_and_int, true));
1864   vec.push_back(CkReduction::reducerStruct(::logical_and_bool, true));
1866   //Compute the logical OR of the values passed by each element.
1867   // The resulting value will be 1 if any source value is nonzero.
1868     // logical_or deprecated in favor of logical_or_int
1869   vec.push_back(CkReduction::reducerStruct(::logical_or, true));
1870   vec.push_back(CkReduction::reducerStruct(::logical_or_int, true));
1871   vec.push_back(CkReduction::reducerStruct(::logical_or_bool, true));
1873   //Compute the logical XOR of the values passed by each element.
1874   // The resulting value will be 1 if an odd number of source values is nonzero.
1875   vec.push_back(CkReduction::reducerStruct(::logical_xor_int, true));
1876   vec.push_back(CkReduction::reducerStruct(::logical_xor_bool, true));
1878   // Compute the logical bitvector AND of the values passed by each element.
1879     // bitvec_and deprecated in favor of bitvec_and_int
1880   vec.push_back(CkReduction::reducerStruct(::bitvec_and, true));
1881   vec.push_back(CkReduction::reducerStruct(::bitvec_and_int, true));
1882   vec.push_back(CkReduction::reducerStruct(::bitvec_and_bool, true));
1884   // Compute the logical bitvector OR of the values passed by each element.
1885     // bitvec_or deprecated in favor of bitvec_or_int
1886   vec.push_back(CkReduction::reducerStruct(::bitvec_or, true));
1887   vec.push_back(CkReduction::reducerStruct(::bitvec_or_int, true));
1888   vec.push_back(CkReduction::reducerStruct(::bitvec_or_bool, true));
1890   // Compute the logical bitvector XOR of the values passed by each element.
1891   vec.push_back(CkReduction::reducerStruct(::bitvec_xor, true));
1892   vec.push_back(CkReduction::reducerStruct(::bitvec_xor_int, true));
1893   vec.push_back(CkReduction::reducerStruct(::bitvec_xor_bool, true));
1895   // Select one of the messages at random to pass on
1896   vec.push_back(CkReduction::reducerStruct(::random, true));
1898   //Concatenate the (arbitrary) data passed by each element
1899   // This reduction is marked as unstreamable because of the n^2
1900   // work required to stream it
1901   vec.push_back(CkReduction::reducerStruct(::concat, false));
1903   //Combine the data passed by each element into an list of setElements.
1904   // Each element may contribute arbitrary data (with arbitrary length).
1905   // This reduction is marked as unstreamable because of the n^2
1906   // work required to stream it
1907   vec.push_back(CkReduction::reducerStruct(::set, false));
1909   // Computes a count, mean, and variance for the contributed values
1910   vec.push_back(CkReduction::reducerStruct(::statistics, true));
1912   // Allows multiple reductions to be done in the same message
1913   vec.push_back(CkReduction::reducerStruct(CkReduction::tupleReduction, false));
1915   return vec;
1918 /* Wraps accesses to the reducerTable to prevent the static initialization
1919 order fiasco */
1920 std::vector<CkReduction::reducerStruct>& CkReduction::reducerTable()
1922   static std::vector<CkReduction::reducerStruct> table = initReducerTable();
1923   return table;
1933 /********** Code added by Sayantan *********************/
1934 /** Locking is a big problem in the nodegroup code for smp.
1935  So a few assumptions have had to be made. There is one lock
1936  called lockEverything. It protects all the data structures 
1937  of the nodegroup reduction mgr. I tried grabbing it separately 
1938  for each datastructure, modifying it and then releasing it and
1939  then grabbing it again, for the next change.
1940  That doesn't really help because the interleaved execution of 
1941  different threads makes the state of the reduction manager 
1942  inconsistent. 
1944  1. Grab lockEverything before calling finishreduction or startReduction
1945     or doRecvMsg
1946  2. lockEverything is grabbed only in entry methods reductionStarting
1947     or RecvMesg or  addcontribution.
1948  ****/
1950 /**nodegroup reduction manager . Most of it is similar to the guy above***/
1951 NodeGroup::NodeGroup(void):thisIndex(CkMyNode()) {
1952   __nodelock=CmiCreateLock();
1953 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1954     mlogData->objID.type = TypeNodeGroup;
1955     mlogData->objID.data.group.onPE = CkMyNode();
1956 #endif
1959 NodeGroup::~NodeGroup() {
1960   CmiDestroyLock(__nodelock);
1961   CkpvAccess(_destroyingNodeGroup) = true;
1963 void NodeGroup::pup(PUP::er &p)
1965   CkNodeReductionMgr::pup(p);
1966   p|reductionInfo;
1969 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1971 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1972   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1973  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1974   //ckLocalNodeBranch()->ckSetReductionClient(cb);
1977 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1978                                     ((CkNodeReductionMgr *)this),
1979                                     reductionInfo,false)
1981 /* this contribute also adds up the count across all messages it receives.
1982   Useful for summing up number of array elements who have contributed ****/ 
1983 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1984         {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1988 //#define BINOMIAL_TREE
1990 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1991   : thisProxy(thisgroup)
1993 #ifdef BINOMIAL_TREE
1994   init_BinomialTree();
1995 #else
1996   init_TopoTree();
1997 #endif
1998   storedCallback=NULL;
1999   redNo=0;
2000   inProgress=false;
2001   
2002   startRequested=false;
2003   gcount=CkNumNodes();
2004   lcount=1;
2005   nContrib=nRemote=0;
2006   lockEverything = CmiCreateLock();
2009   creating=false;
2010   interrupt = false;
2011   DEBR((AA "In NodereductionMgr constructor at %d \n" AB,this));
2012         /*
2013                 FAULT_EVAC
2014         */
2015         blocked = false;
2016         maxModificationRedNo = INT_MAX;
2017         killed=false;
2018         additionalGCount = newAdditionalGCount = 0;
2021 CkNodeReductionMgr::~CkNodeReductionMgr()
2023   CmiDestroyLock(lockEverything);
2026 void CkNodeReductionMgr::flushStates()
2028  if(CkMyRank() == 0){
2029   // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
2030   redNo=0;
2031   inProgress=false;
2033   startRequested=false;
2034   gcount=CkNumNodes();
2035   lcount=1;
2036   nContrib=nRemote=0;
2038   creating=false;
2039   interrupt = false;
2040   while (!msgs.isEmpty()) { delete msgs.deq(); }
2041   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
2042   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
2043   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
2044   }
2047 //////////// Reduction Manager Client API /////////////
2049 //Add the given client function.  Overwrites any previous client.
2050 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
2052   DEBR((AA "Setting reductionClient in NodeReductionMgr %d at %d\n" AB,cb,this));
2053   if(cb->isInvalid()){
2054         DEBR((AA "Invalid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2055   }else{
2056         DEBR((AA "Valid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2057   }
2059   if (CkMyNode()!=0)
2060           CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
2061   delete storedCallback;
2062   storedCallback=cb;
2067 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
2069 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2070     Chare *oldObj =CpvAccess(_currentObj);
2071     CpvAccess(_currentObj) = this;
2072 #endif
2074   //m->ci=ci;
2075   m->redNo=ci->redNo++;
2076   m->sourceFlag=-1;//A single contribution
2077   m->gcount=0;
2078   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
2079   addContribution(m);
2081 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2082     CpvAccess(_currentObj) = oldObj;
2083 #endif
2087 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
2089 #if CMK_BIGSIM_CHARM
2090   _TRACE_BG_TLINE_END(&m->log);
2091 #endif
2092 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2093     Chare *oldObj =CpvAccess(_currentObj);
2094     CpvAccess(_currentObj) = this;
2095 #endif
2096   //m->ci=ci;
2097   m->redNo=ci->redNo++;
2098   m->gcount=count;
2099   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2100   addContribution(m);
2101   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2103 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2104     CpvAccess(_currentObj) = oldObj;
2105 #endif
2109 //////////// Reduction Manager Remote Entry Points /////////////
2111 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
2112         DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2113         /*
2114                 FAULT_EVAC
2115         */
2116         if(blocked){
2117                 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
2118                 bufferedRemoteMsgs.enq(m);
2119                 return;
2120         }
2121         
2122         if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
2123             //DEBR((AA "Recv'd remote contribution %d for #%d at %d\n" AB,nRemote,m->redNo,this));
2124             startReduction(m->redNo,CkMyNode());
2125             msgs.enq(m);
2126             nRemote++;
2127             finishReduction();
2128         }
2129         else {
2130             if (isFuture(m->redNo)) {
2131                    // DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
2132                     futureRemoteMsgs.enq(m);
2133             }else{
2134                    CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
2135                    CkAbort("Recv'd late remote contribution!\n");
2136             }
2137         }
2138         DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2141 //Sent up the reduction tree with reduced data
2142 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
2144 #if CMK_BIGSIM_CHARM
2145   _TRACE_BG_TLINE_END(&m->log);
2146 #endif
2147 #ifndef CMK_CPV_IS_SMP
2148 #if CMK_IMMEDIATE_MSG
2149         if(interrupt == true){
2150                 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
2151                 CpvAccess(_qd)->process(-1);
2152                 CmiDelayImmediate();
2153                 return;
2154         }
2155 #endif  
2156 #endif
2157    interrupt = true;
2158    CmiLock(lockEverything);   
2159    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2160    doRecvMsg(m);
2161    CmiUnlock(lockEverything);    
2162    interrupt = false;
2163    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2166 void CkNodeReductionMgr::startReduction(int number,int srcNode)
2168         if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
2169         if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
2170         if (inProgress){
2171                 DEBR((AA "This Node reduction is already in progress\n" AB));
2172                 return;//This reduction already started
2173         }
2174         if (creating) //Don't start yet-- we're creating elements
2175         {
2176                 DEBR((AA " Node Postponing start request #%d until we're done creating\n" AB,redNo));
2177                 startRequested=true;
2178                 return;
2179         }
2180         
2181         //If none of these cases, we need to start the reduction--
2182         DEBR((AA "Starting Node reduction #%d on %p srcNode %d\n" AB,redNo,this,srcNode));
2183         inProgress=true;
2186 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
2187         /*
2188                 FAULT_EVAC
2189         */
2190         if(blocked){
2191                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2192                 bufferedMsgs.enq(m);
2193                 return;
2194         }
2195         
2196         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2197                 DEBR((AA "Contributor gives early node contribution-- for #%d\n" AB,m->redNo));
2198                 futureMsgs.enq(m);
2199         } else {// An ordinary contribution
2200                 DEBR((AA "Recv'd local node contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2201                 //    CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
2202                 startReduction(m->redNo,CkMyNode());
2203                 msgs.enq(m);
2204                 nContrib++;
2205                 finishReduction();
2206         }
2209 //Handle a message from one element for the reduction
2210 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
2212   interrupt = true;
2213   CmiLock(lockEverything);
2214   doAddContribution(m);
2215   CmiUnlock(lockEverything);
2216   interrupt = false;
2219 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
2220         CmiLock(lockEverything);   
2221         /*
2222                 FAULT_EVAC
2223         */
2224         if(blocked){
2225                 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2226                 bufferedMsgs.enq(m);
2227                 CmiUnlock(lockEverything);   
2228                 return;
2229         }
2230         
2231         if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2232                 DEBR((AA "Latemigrant gives early node contribution-- for #%d\n" AB,m->redNo));
2233 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2234                 futureLateMigrantMsgs.enq(m);
2235         } else {// An ordinary contribution
2236                 DEBR((AA "Recv'd late migrant contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2237 //              CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2238                 msgs.enq(m);
2239                 finishReduction();
2240         }
2241         CmiUnlock(lockEverything);   
2248 /** check if the nodegroup reduction is finished at this node. In that case send it
2249 up the reduction tree **/
2251 void CkNodeReductionMgr::finishReduction(void)
2253   DEBR((AA "in Nodegrp finishReduction %d treeKids %d \n" AB,inProgress,treeKids()));
2254   /***Check if reduction is finished in the next few ifs***/
2255   if ((!inProgress) || creating){
2256         DEBR((AA "Either not in Progress or creating\n" AB));
2257         return;
2258   }
2260   bool partialReduction = false;
2262   if (nContrib<(lcount)){
2263     if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
2264       partialReduction = true;
2265     }
2266     else {
2267       DEBR((AA "Nodegrp Need more local messages %d %d\n" AB,nContrib,(lcount)));
2268       return;//Need more local messages
2269     }
2270   }
2271   if (nRemote<treeKids()){
2272     if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
2273       partialReduction = true;
2274     }
2275     else {
2276       DEBR((AA "Nodegrp Need more Remote messages %d %d\n" AB,nRemote,treeKids()));
2277       return;//Need more remote messages
2278     }
2279   }
2280   if (nRemote>treeKids()){
2282           interrupt = false;
2283            CkAbort("Nodegrp Excess remote reduction message received!\n");
2284   }
2286   DEBR((AA "Reducing node data...\n" AB));
2288   /**reduce all messages received at this node **/
2289   CkReductionMsg *result=reduceMessages();
2291   if (partialReduction) {
2292     msgs.enq(result);
2293     return;
2294   }
2296   if (hasParent())
2297   {//Pass data up tree to parent
2298         if(CmiNodeAlive(CkMyNode()) || killed == false){
2299         DEBR((AA "Passing reduced data up to parent node %d. \n" AB,treeParent()));
2300         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
2301                 /*
2302                         FAULT_EVAC
2303                 */
2304                         result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2305             thisProxy[treeParent()].RecvMsg(result);
2306         }
2308   }
2309   else
2310   {
2311                 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2312                   DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2313                         msgs.enq(result);
2314                         return;
2315                 }
2316                 result->gcount += additionalGCount;
2317           /** if the reduction is finished and I am the root of the reduction tree
2318           then call the reductionhandler and other stuff ***/
2319                 
2321                 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
2322     CkSetRefNum(result, result->getUserFlag());
2323     if (!result->callback.isInvalid()){
2324       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2325             result->callback.send(result);
2326     }
2327     else if (storedCallback!=NULL){
2328       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2329             storedCallback->send(result);
2330     }
2331     else{
2332                 DEBR((AA "Invalid Callback \n" AB));
2333             CkAbort("No reduction client!\n"
2334                     "You must register a client with either SetReductionClient or during contribute.\n");
2335                 }
2336   }
2338   // DEBR((AA "Reduction %d finished in group!\n" AB,redNo));
2339   //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2340   redNo++;
2341         updateTree();
2342   int i;
2343   inProgress=false;
2344   startRequested=false;
2345   nRemote=nContrib=0;
2347   //Look through the future queue for messages we can now handle
2348   int n=futureMsgs.length();
2350   for (i=0;i<n;i++)
2351   {
2352     interrupt = true;
2354     CkReductionMsg *m=futureMsgs.deq();
2356     interrupt = false;
2357     if (m!=NULL){ //One of these addContributions may have finished us.
2358       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2359       doAddContribution(m);//<- if *still* early, puts it back in the queue
2360     }
2361   }
2363   interrupt = true;
2365   n=futureRemoteMsgs.length();
2367   interrupt = false;
2368   for (i=0;i<n;i++)
2369   {
2370     interrupt = true;
2372     CkReductionMsg *m=futureRemoteMsgs.deq();
2374     interrupt = false;
2375     if (m!=NULL)
2376       doRecvMsg(m);//<- if *still* early, puts it back in the queue
2377   }
2378   
2379   n = futureLateMigrantMsgs.length();
2380   for(i=0;i<n;i++){
2381     CkReductionMsg *m = futureLateMigrantMsgs.deq();
2382     if(m != NULL){
2383       if(m->redNo == redNo){
2384         msgs.enq(m);
2385       }else{
2386         futureLateMigrantMsgs.enq(m);
2387       }
2388     }
2389   }
2392 //////////// Reduction Manager Utilities /////////////
2394 void CkNodeReductionMgr::init_BinaryTree(){
2395         parent = (CkMyNode()-1)/TREE_WID;
2396         int firstkid = CkMyNode()*TREE_WID+1;
2397         numKids=CkNumNodes()-firstkid;
2398   if (numKids>TREE_WID) numKids=TREE_WID;
2399   if (numKids<0) numKids=0;
2401         for(int i=0;i<numKids;i++){
2402                 kids.push_back(firstkid+i);
2403                 newKids.push_back(firstkid+i);
2404         }
2407 void CkNodeReductionMgr::init_TopoTree() {
2408   if (_topoTree == NULL) CkAbort("CkNodeReductionMgr:: topo tree has not been calculated\n");
2409   CmiSpanningTreeInfo &t = *_topoTree;
2410   parent = t.parent;
2411   numKids = t.child_count;
2412   for (int i=0; i < numKids; i++) {
2413     kids.push_back(t.children[i]);
2414     newKids.push_back(t.children[i]);
2415   }
2418 void CkNodeReductionMgr::init_BinomialTree(){
2419         int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2420         /*upperSize = (unsigned )pow((double)2,depth);*/
2421         upperSize = (unsigned) 1 << depth;
2422         label = upperSize-CkMyNode()-1;
2423         int p=label;
2424         int count=0;
2425         while( p > 0){
2426                 if(p % 2 == 0)
2427                         break;
2428                 else{
2429                         p = p/2;
2430                         count++;
2431                 }
2432         }
2433         /*parent = label + rint(pow((double)2,count));*/
2434         parent = label + (1<<count);
2435         parent = upperSize -1 -parent;
2436         int temp;
2437         if(count != 0){
2438                 numKids = 0;
2439                 for(int i=0;i<count;i++){
2440                         /*temp = label - rint(pow((double)2,i));*/
2441                         temp = label - (1<<i);
2442                         temp = upperSize-1-temp;
2443                         if(temp <= CkNumNodes()-1){
2444                 //              kids[numKids] = temp;
2445                                 kids.push_back(temp);
2446                                 numKids++;
2447                         }
2448                 }
2449         }else{
2450                 numKids = 0;
2451         //      kids = NULL;
2452         }
2456 int CkNodeReductionMgr::treeRoot(void)
2458   return 0;
2460 bool CkNodeReductionMgr::hasParent(void) //Root Node
2462   return (bool)(CkMyNode()!=treeRoot());
2464 int CkNodeReductionMgr::treeParent(void) //My parent Node
2466   return parent;
2469 int CkNodeReductionMgr::firstKid(void) //My first child Node
2471   return CkMyNode()*TREE_WID+1;
2473 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2475 #ifdef BINOMIAL_TREE
2476         return numKids;
2477 #else
2478 /*  int nKids=CkNumNodes()-firstKid();
2479   if (nKids>TREE_WID) nKids=TREE_WID;
2480   if (nKids<0) nKids=0;
2481   return nKids;*/
2482         return numKids;
2483 #endif
2486 //Combine (& free) the current message vector msgs.
2487 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2489 #if CMK_BIGSIM_CHARM
2490   _TRACE_BG_END_EXECUTE(1);
2491   void* _bgParentLog = NULL;
2492   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2493 #endif
2494   CkReductionMsg *ret=NULL;
2496   //Look through the vector for a valid reducer, swapping out placeholder messages
2497   CkReduction::reducerType r=CkReduction::invalid;
2498   int msgs_gcount=0;//Reduced gcount
2499   int msgs_nSources=0;//Reduced nSources
2500   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
2501   CkCallback msgs_callback;
2502   int i;
2503   int nMsgs=0;
2504   CkReductionMsg *m;
2505   CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2506   bool isMigratableContributor;
2507         
2509   while(NULL!=(m=msgs.deq()))
2510   {
2511     DEBR((AA "***** gcount=%d; sourceFlag=%d ismigratable %d \n" AB,m->gcount,m->nSources(),m->isMigratableContributor()));       
2512     msgs_gcount+=m->gcount;
2513     if (m->sourceFlag!=0)
2514     { //This is a real message from an element, not just a placeholder
2515       msgs_nSources+=m->nSources();
2516 #if CMK_BIGSIM_CHARM
2517       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
2518 #endif
2520       if (nMsgs == 0 || m->reducer != CkReduction::nop) {
2521         msgArr[nMsgs++]=m;
2522         r=m->reducer;
2523         if (!m->callback.isInvalid()){
2524 #if CMK_ERROR_CHECKING
2525           if(nMsgs > 1 && !(msgs_callback == m->callback))
2526             CkAbort("mis-matched client callbacks in reduction messages\n");
2527 #endif  
2528           msgs_callback=m->callback;
2529         }
2530         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
2531           msgs_userFlag=m->userFlag;
2532         isMigratableContributor= m->isMigratableContributor();
2533       }
2534       else {
2535 #if CMK_ERROR_CHECKING
2536         if(!(msgs_callback == m->callback))
2537           CkAbort("mis-matched client callbacks in reduction messages\n");
2538 #endif  
2539         delete m;
2540       }
2541     }
2542     else
2543     { //This is just a placeholder message-- replace it
2544       delete m;
2545     }
2546   }
2548   if (nMsgs==0||r==CkReduction::invalid)
2549   //No valid reducer in the whole vector
2550     ret=CkReductionMsg::buildNew(0,NULL);
2551   else
2552   {//Use the reducer to reduce the messages
2553     if(nMsgs == 1 &&
2554       msgArr[0]->reducer != CkReduction::set &&
2555       msgArr[0]->reducer != CkReduction::tuple) {
2556       ret = msgArr[0];
2557     }else{
2558       if (msgArr[0]->reducer == CkReduction::nop) {
2559         // nMsgs > 1 indicates that reduction type is not nop
2560         // this means any data with reducer type nop was submitted
2561         // only so that counts would agree, and can be removed
2562         delete msgArr[0];
2563         msgArr[0] = msgArr[nMsgs - 1];
2564         nMsgs--;
2565       }
2566       CkReduction::reducerFn f=CkReduction::reducerTable()[r].fn;
2567       ret=(*f)(nMsgs,msgArr);
2568     }
2569     ret->reducer=r;
2570   }
2572         
2573 #if USE_CRITICAL_PATH_HEADER_ARRAY
2574 #if CRITICAL_PATH_DEBUG > 3
2575         CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2576 #endif
2577         MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2578         path.updateMax(UsrToEnv(ret));
2579         // Combine the critical paths from all the reduction messages into the header for the new result
2580         for (i=0;i<nMsgs;i++){
2581           if (msgArr[i]!=ret){
2582             //      CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2583             path.updateMax(UsrToEnv(msgArr[i]));
2584           } else {
2585             //      CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2586           }
2587         }
2588 #if CRITICAL_PATH_DEBUG > 3
2589         CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2590 #endif
2592 #endif
2595         //Go back through the vector, deleting old messages
2596   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2597   delete [] msgArr;
2598   //Set the message counts
2599   ret->redNo=redNo;
2600   ret->gcount=msgs_gcount;
2601   ret->userFlag=msgs_userFlag;
2602   ret->callback=msgs_callback;
2603   ret->sourceFlag=msgs_nSources;
2604   ret->setMigratableContributor(isMigratableContributor);
2605   DEBR((AA "Node Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
2606 #if CMK_BIGSIM_CHARM
2607   _TRACE_BG_TLINE_END(&ret->log);
2608 #endif
2610   return ret;
2613 void CkNodeReductionMgr::pup(PUP::er &p)
2615 //We do not store the client function pointer or the client function parameter,
2616 //it is the responsibility of the programmer to correctly restore these
2617   IrrGroup::pup(p);
2618   p(redNo);
2619   p(inProgress); p(creating); p(startRequested);
2620   p(lcount);
2621   p(nContrib); p(nRemote);
2622   p(interrupt);
2623   p|msgs;
2624   p|futureMsgs;
2625   p|futureRemoteMsgs;
2626   p|futureLateMigrantMsgs;
2627   p|parent;
2628   p|additionalGCount;
2629   p|newAdditionalGCount;
2630   if(p.isUnpacking()) {
2631     gcount=CkNumNodes();
2632     thisProxy = thisgroup;
2633     lockEverything = CmiCreateLock();
2634 #ifdef BINOMIAL_TREE
2635     init_BinomialTree();
2636 #else
2637     init_TopoTree();
2638 #endif          
2639   }
2640   p | blocked;
2641   p | maxModificationRedNo;
2643 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2644   bool isnull = (storedCallback == NULL);
2645   p | isnull;
2646   if (!isnull) {
2647     if (p.isUnpacking()) {
2648       storedCallback = new CkCallback;
2649     }
2650     p|*storedCallback;
2651   }
2652 #endif
2657         FAULT_EVAC
2658         Evacuate - is called when this processor realizes it might crash. In that case, it tries to change 
2659         the reduction tree. It also needs to decide a reduction number after which it shall use the new 
2660         reduction tree. 
2662 void CkNodeReductionMgr::evacuate(){
2663         DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2664         if(treeKids() == 0){
2665         /*
2666                 if the node going down is a leaf
2667         */
2668                 oldleaf=true;
2669                 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2670                 /*
2671                         Need to ask parent for the reduction number that it has seen. 
2672                         Since it is a leaf, the tree does not need to be rewired. 
2673                         We reuse the oldparent type of tree modification message to get 
2674                         the parent to block and tell us about the highest reduction number it has seen.
2675                         
2676                 */
2677                 int data[2];
2678                 data[0]=CkMyNode();
2679                 data[1]=getTotalGCount()+additionalGCount;
2680                 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2681                 newParent = treeParent();
2682         }else{
2683                 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2684                 oldleaf= false;
2685         /*
2686                 It is not a leaf. It needs to rewire the tree around itself.
2687                 It also needs to decide on a reduction No after which the new tree will be used
2688                 Till it decides on the new tree and the redNo at which it becomes valid,
2689                 all received messages will be buffered
2690         */
2691                 newParent = kids[0];
2692                 for(int i=numKids-1;i>=0;i--){
2693                         newKids.remove(i);
2694                 }
2695                 /*
2696                         Ask everybody for the highest reduction number they have seen and
2697                         also tell them about the new tree
2698                 */
2699                 /*
2700                         Tell parent about its new child;
2701                 */
2702                 int oldParentData[2];
2703                 oldParentData[0] = CkMyNode();
2704                 oldParentData[1] = newParent;
2705                 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2707                 /*
2708                         Tell the other children about their new parent
2709                 */
2710                 int childrenData=newParent;
2711                 for(int i=1;i<numKids;i++){
2712                         thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2713                 }
2714                 
2715                 /*
2716                         Tell newParent (1st child) about its new children,
2717                         the current node and its children except the newParent
2718                 */
2719                 int *newParentData = new int[numKids+2];
2720                 for(int i=1;i<numKids;i++){
2721                         newParentData[i] = kids[i];
2722                 }
2723                 newParentData[0] = CkMyNode();
2724                 newParentData[numKids] = parent;
2725                 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2726                 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
2727         }
2728         readyDeletion = false;
2729         blocked = true;
2730         numModificationReplies = 0;
2731         tempModificationRedNo = findMaxRedNo();
2735         Depending on the code, use the data to change the tree
2736         1. OLDPARENT : replace the old child with a new one
2737         2. OLDCHILDREN: replace the parent
2738         3. NEWPARENT:  add the children and change the parent
2739         4. LEAFPARENT: delete the old child
2742 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2743         DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2744         int sender;
2745         newKids = kids;
2746         readyDeletion = false;
2747         newAdditionalGCount = additionalGCount;
2748         switch(code){
2749                 case OLDPARENT: 
2750                         for(int i=0;i<numKids;i++){
2751                                 if(newKids[i] == data[0]){
2752                                         newKids[i] = data[1];
2753                                         break;
2754                                 }
2755                         }
2756                         sender = data[0];
2757                         newParent = parent;
2758                         break;
2759                 case OLDCHILDREN:
2760                         newParent = data[0];
2761                         sender = parent;
2762                         break;
2763                 case NEWPARENT:
2764                         for(int i=0;i<size-2;i++){
2765                                 newKids.push_back(data[i]);
2766                         }
2767                         newParent = data[size-2];
2768                         newAdditionalGCount += data[size-1];
2769                         sender = parent;
2770                         break;
2771                 case LEAFPARENT:
2772                         for(int i=0;i<numKids;i++){
2773                                 if(newKids[i] == data[0]){
2774                                         newKids.remove(i);
2775                                         break;
2776                                 }
2777                         }
2778                         sender = data[0];
2779                         newParent = parent;
2780                         newAdditionalGCount += data[1];
2781                         break;
2782         };
2783         blocked = true;
2784         int maxRedNo = findMaxRedNo();
2785         
2786         thisProxy[sender].collectMaxRedNo(maxRedNo);
2789 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2790         /*
2791                 Find out the maximum redNo that has been seen by 
2792                 the affected nodes
2793         */
2794         numModificationReplies++;
2795         if(maxRedNo > tempModificationRedNo){
2796                 tempModificationRedNo = maxRedNo;
2797         }
2798         if(numModificationReplies == numKids+1){
2799                 maxModificationRedNo = tempModificationRedNo;
2800                 /*
2801                         when all the affected nodes have replied, tell them the maximum.
2802                         Unblock yourself. deal with the buffered messages local and remote
2803                 */
2804                 if(maxModificationRedNo == -1){
2805                         printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2806                 }else{
2807                         DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2808                 }
2809                 thisProxy[parent].unblockNode(maxModificationRedNo);
2810                 for(int i=0;i<numKids;i++){
2811                         thisProxy[kids[i]].unblockNode(maxModificationRedNo);
2812                 }
2813                 blocked = false;
2814                 updateTree();
2815                 clearBlockedMsgs();
2816         }
2819 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2820         maxModificationRedNo = maxRedNo;
2821         updateTree();
2822         blocked = false;
2823         clearBlockedMsgs();
2827 void CkNodeReductionMgr::clearBlockedMsgs(){
2828         int len = bufferedMsgs.length();
2829         for(int i=0;i<len;i++){
2830                 CkReductionMsg *m = bufferedMsgs.deq();
2831                 doAddContribution(m);
2832         }
2833         len = bufferedRemoteMsgs.length();
2834         for(int i=0;i<len;i++){
2835                 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2836                 doRecvMsg(m);
2837         }
2841         if the reduction number exceeds the maxModificationRedNo, change the tree
2842         to become the new one
2845 void CkNodeReductionMgr::updateTree(){
2846         if(redNo > maxModificationRedNo){
2847                 parent = newParent;
2848                 kids = newKids;
2849                 maxModificationRedNo = INT_MAX;
2850                 numKids = kids.size();
2851                 readyDeletion = true;
2852                 additionalGCount = newAdditionalGCount;
2853                 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2854                 for(int i=0;i<(int)(newKids.size());i++){
2855                         DEBREVAC(("%d ",newKids[i]));
2856                 }
2857                 DEBREVAC(("\n"));
2858         //      startReduction(redNo,CkMyNode());
2859         }else{
2860                 if(maxModificationRedNo != INT_MAX){
2861                         DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2862                         startReduction(redNo,CkMyNode());
2863                         finishReduction();
2864                 }       
2865         }
2869 void CkNodeReductionMgr::doneEvacuate(){
2870         DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2871 /*      if(oldleaf){
2872                 
2873                         It used to be a leaf
2874                         Then as soon as future messages have been emptied you can 
2875                         send the parent a message telling them that they are not going
2876                         to receive anymore messages from this child
2877                 
2878                 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
2879                 while(futureMsgs.length() != 0){
2880                         int n = futureMsgs.length();
2881                         for(int i=0;i<n;i++){
2882                                 CkReductionMsg *m = futureMsgs.deq();
2883                                 if(isPresent(m->redNo)){
2884                                         msgs.enq(m);
2885                                 }else{
2886                                         futureMsgs.enq(m);
2887                                 }
2888                         }
2889                         CkReductionMsg *result = reduceMessages();
2890                         thisProxy[treeParent()].RecvMsg(result);
2891                         redNo++;
2892                 }
2893                 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2894                 thisProxy[treeParent()].DeleteChild(CkMyNode());
2895         }else{*/
2896                 if(readyDeletion){
2897                         thisProxy[treeParent()].DeleteChild(CkMyNode());
2898                 }else{
2899                         thisProxy[newParent].DeleteNewChild(CkMyNode());
2900                 }
2901 //      }
2904 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2905         DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2906         for(int i=0;i<numKids;i++){
2907                 if(kids[i] == deletedChild){
2908                         kids.remove(i);
2909                         break;
2910                 }
2911         }
2912         numKids = kids.length();
2913         finishReduction();
2916 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2917         for(int i=0;i<(int)(newKids.length());i++){
2918                 if(newKids[i] == deletedChild){
2919                         newKids.remove(i);
2920                         break;
2921                 }
2922         }
2923         DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2924         for(int i=0;i<(int)(newKids.size());i++){
2925                 DEBREVAC(("%d ",newKids[i]));
2926         }
2927         DEBREVAC(("\n"));
2928         finishReduction();
2931 int CkNodeReductionMgr::findMaxRedNo(){
2932         int max = redNo;
2933         for(int i=0;i<futureRemoteMsgs.length();i++){
2934                 if(futureRemoteMsgs[i]->redNo  > max){
2935                         max = futureRemoteMsgs[i]->redNo;
2936                 }
2937         }
2938         /*
2939                 if redNo is max (that is no future message) and the current reduction has not started
2940                 then tree can be changed before the reduction redNo can be started
2941         */ 
2942         if(redNo == max && msgs.length() == 0){
2943                 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2944                 max--;
2945         }
2946         return max;
2949 #include "CkReduction.def.h"