2 Parallel Programming Lab, University of Illinois at Urbana-Champaign
3 Orion Sky Lawlor, 3/29/2000, olawlor@acm.org
5 A reduction takes some sort of inputs (contributions)
6 from some set of objects (contributors) scattered across
7 all PE's and combines (reduces) all the contributions
8 onto one PE. This library provides several different
9 kinds of combination routines (reducers), and all the
10 support framework for calling them.
12 Included here are the classes:
13 -CkReduction, which gives the user-visible names as
14 an enumeration for the reducer functions, and maintains the
15 reducer table. (don't instantiate these)
16 -CkReductionMgr, a Chare Group which actually runs a
17 reduction over a dynamic set (allowing insertions, deletions, and
18 migrations) of contributors scattered across all PE's. It can
19 handle several overlapping reductions, but they will complete
20 serially. It carries out the reduction among all elements on a processor.
22 -CkReductionMsg, the message carrying reduction data
23 used by the reduction manager.
24 -CkNodeReductionMgr, a Chare Node Group runs reductions
25 on node groups. It is used by the CkReductionMgr to carry out
26 the reduction among different nodes.
28 In the reduction manager, there are several counters used:
29 -reductionMgr::redNo is a sequential reduction count, starting
30 at zero for the first reduction. It is incremented each time a reduction completes.
32 -contributorInfo::redNo is the direct analog for contributors--
33 it starts at zero and is incremented at each contribution. Hence
34 contributorInfo::redNo leads the local reductionMgr::redNo.
35 -lcount is the number of contributors on this PE. When
36 an element migrates away, lcount decreases. lcount is also the number
37 of contributions to wait for before reducing and sending up.
38 -gcount is the net birth-death contributor count on this PE.
39 When a contributor migrates away, gcount stays the same. Unlike lcount,
40 gcount can go negative (if, e.g., a contributor migrates in and then dies).
42 We need a separate gcount because for a short time, a migrant
43 is local to no PE. To make sure we get its contribution, node zero
44 compares its number of received contributions to gcount summed over all PE's
45 (this count is piggybacked with the reduction data in CkReductionMsg).
46 If we haven't gotten a contribution from all living contributors, node zero
47 waits for the migrant contributions to straggle in.
53 #include "pathHistory.h"
// Debug-print macros for the reduction manager. Call sites pass a
// parenthesised printf-style argument list, e.g. DEBR((AA fmt AB, args)),
// so that each macro can expand either to CkPrintf or to nothing.
// NOTE(review): the same names are defined twice below (verbose group, then
// quiet group); an #else between the groups and a closing #endif are
// presumably required -- confirm they are present.
55 #if CMK_DEBUG_REDUCTIONS
57 // Reduction manager internal information:
58 #define DEBR(x) CkPrintf x
// AA is the format-string prefix and AB the matching argument list
// (PE, node, redNo, nRemote, nContrib, group index).
59 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
60 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
// Node-level counterparts of DEBR / AA / AB.
62 #define DEBN(x) CkPrintf x
63 #define AAN "Red Node%d "
64 #define ABN ,CkMyNode()
66 // For status and data messages from the builtin reducer functions.
67 #define RED_DEB(x) //CkPrintf x
68 #define DEBREVAC(x) CkPrintf x
69 #define DEB_TUPLE(x) CkPrintf x
71 //No debugging info-- empty defines
72 #define DEBR(x) // CkPrintf x
// NOTE(review): DEBRMLOG still expands to CkPrintf in this quiet group --
// confirm that is intended.
73 #define DEBRMLOG(x) CkPrintf x
76 #define DEBN(x) //CkPrintf x
77 #define RED_DEB(x) //CkPrintf x
78 #define DEBREVAC(x) //CkPrintf x
79 #define DEB_TUPLE(x) //CkPrintf x
// Fallback value for INT_MAX (2^31 - 1).
// NOTE(review): presumably guarded so that it does not clash with the
// definition from <climits> -- confirm.
83 #define INT_MAX 2147483647
// Defined elsewhere; Group::Group() aborts when this flag is set.
86 extern bool _inrestart;
// Default constructor: records the current PE as thisIndex and registers the
// group branch itself as a reduction contributor by stamping and creating
// reductionInfo between creatingContributors() and doneCreatingContributors().
// Aborts if _inrestart is set, because a group rebuilt during restart must go
// through the CkMigrateMessage constructor instead.
88 Group::Group():thisIndex(CkMyPe())
90 if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
92 creatingContributors();
93 contributorStamped(&reductionInfo);
94 contributorCreated(&reductionInfo);
95 doneCreatingContributors();
// Migration constructor: forwards msg to the CkReductionMgr migration
// constructor, records the current PE as thisIndex, and then performs the same
// contributor registration sequence as the default constructor.
98 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg),thisIndex(CkMyPe())
100 creatingContributors();
101 contributorStamped(&reductionInfo);
102 contributorCreated(&reductionInfo);
103 doneCreatingContributors();
// Expand the CK_* helper macros for Group. Judging by the macro names these
// generate the contribute, reduction-client and barrier methods -- TODO confirm
// against the macro definitions. The manager argument is the group branch
// itself cast to CkReductionMgr (or, for the proxy, the local branch looked up
// through CkLocalBranch(_ck_gid)).
106 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
107 ((CkReductionMgr *)this),
109 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
111 CK_BARRIER_CONTRIBUTE_METHODS_DEF(Group,
112 ((CkReductionMgr *)this),
// Default constructor: the body is empty, so no state is set up here.
117 CkGroupInitCallback::CkGroupInitCallback(void) {}
119 The callback is just used to tell the caller that this group
120 has been constructed. (Now they can safely call CkLocalBranch)
122 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
129 The callback is just used to tell the caller that this group
130 is constructed and ready to process other calls.
132 CkGroupReadyCallback::CkGroupReadyCallback(void)
// Drains the buffered callback messages held in _msgs: the queue length is
// sampled once into n and messages are then dequeued one at a time.
// NOTE(review): presumably each dequeued message is handed to callMeBack --
// confirm.
137 CkGroupReadyCallback::callBuffered(void)
139 int n = _msgs.length();
142 CkGroupCallbackMsg *msg = _msgs.deq();
148 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
// Wraps a client function (fn_) and its opaque parameter (param_) in a
// CkCallback that invokes callbackCfn with this bundle as the parameter.
158 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
159 :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
// Trampoline used by the callback above: recovers the bundle and the reduction
// message from the two void pointers and calls the stored client function with
// (param, data size, data pointer).
// NOTE(review): releasing the message is not visible in these lines -- confirm
// who owns m after the client function returns.
160 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
162 CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
163 CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
164 b->fn(b->param,m->getSize(),m->getData());
168 ///////////////// Reduction Manager //////////////////
170 One CkReductionMgr runs a non-overlapping set of reductions.
171 It collects messages from all local contributors, then sends
172 the reduced message up the reduction tree to node zero, where
173 it is passed to the user's client function.
// Default constructor. The visible initialisation binds thisProxy to this
// group, clears startRequested, zeroes the immigrant/emigrant recovery-object
// counters (message-logging fault-tolerance builds only), leaves child start
// notification enabled, and resets the barrier counters.
// NOTE(review): the trace line passes the this pointer to a %d conversion;
// %p is the matching conversion for a pointer.
176 CkReductionMgr::CkReductionMgr()
178 thisProxy(thisgroup),
190 startRequested=false;
195 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
196 numImmigrantRecObjs = 0;
197 numEmigrantRecObjs = 0;
199 disableNotifyChildrenStart = false;
203 barrier_nContrib=barrier_nRemote=0;
205 DEBR((AA "In reductionMgr constructor at %d \n" AB,this));
// Migration constructor: forwards m to CkGroupInitCallback, starts with
// isDestroying false, clears startRequested, resets the barrier counters and,
// in message-logging fault-tolerance builds, zeroes the immigrant/emigrant
// recovery-object counters.
// NOTE(review): the trace line passes the this pointer to a %d conversion;
// %p is the matching conversion for a pointer.
208 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
209 , isDestroying(false)
216 startRequested=false;
221 DEBR((AA "In reductionMgr migratable constructor at %d \n" AB,this));
225 barrier_nContrib=barrier_nRemote=0;
227 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
228 numImmigrantRecObjs = 0;
229 numEmigrantRecObjs = 0;
234 CkReductionMgr::~CkReductionMgr()
// Discards pending reduction state: clears startRequested and deletes every
// message still queued in msgs, futureMsgs, futureRemoteMsgs and finalMsgs.
238 void CkReductionMgr::flushStates()
240 // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
245 startRequested=false;
// Each loop pops and frees messages until its queue reports empty.
249 while (!msgs.isEmpty()) { delete msgs.deq(); }
250 while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
251 while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
252 while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
258 //////////// Reduction Manager Client API /////////////
260 //Add the given client function. Overwrites any previous client.
// A misuse from a PE other than zero is reported with CkError (a warning), not
// an abort.
// NOTE(review): presumably the callback is stored in storedCallback, which
// finishReduction sends to when a message carries no callback of its own --
// confirm, and confirm who frees cb.
261 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
263 DEBR((AA "Setting reductionClient in ReductionMgr groupid %d\n" AB,thisgroup.idx));
266 CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
270 ///////////////////////////// Contributor ////////////////////////
271 //Contributors keep a copy of this structure:
273 /*Contributor migration support:
275 void contributorInfo::pup(PUP::er &p)
280 ////////////////////// Contributor list maintenance: /////////////////
281 //These just set and clear the "creating" flag to prevent
282 // reductions from finishing early because not all elements
283 // have been created.
// Opens a batch of contributor creations.
// NOTE(review): per the description above this presumably sets creating to
// true, which startReduction and finishReduction test -- confirm.
284 void CkReductionMgr::creatingContributors(void)
286 DEBR((AA "Creating contributors...\n" AB));
// Closes a batch of contributor creations. If a start request was postponed
// while creating (startRequested), the current reduction is started now.
289 void CkReductionMgr::doneCreatingContributors(void)
291 DEBR((AA "Done creating contributors...\n" AB));
294 if (startRequested) startReduction(redNo,CkMyPe());
298 //A new contributor will be created
// Assigns the first reduction number the new contributor will take part in.
// The two assignments to ci->redNo are alternatives: redNo+1 together with a
// compensating gcount decrement on the current reduction, or redNo itself.
// NOTE(review): presumably selected by whether a reduction is in progress --
// confirm.
299 void CkReductionMgr::contributorStamped(contributorInfo *ci)
301 DEBR((AA "Contributor %p stamped\n" AB,ci));
302 //There is another contributor
306 ci->redNo=redNo+1;//Created *during* reduction => contribute to *next* reduction
307 adj(redNo).gcount--;//He'll wrongly be counted in the global count at end
309 ci->redNo=redNo;//Created *before* reduction => contribute to *that* reduction
312 //A new contributor was actually created
// For every reduction from the manager's current redNo up to (but excluding)
// the contributor's own redNo, the expected local count is lowered by one,
// because this contributor will not take part in those reductions here.
313 void CkReductionMgr::contributorCreated(contributorInfo *ci)
315 DEBR((AA "Contributor %p created in grp %d\n" AB,ci,thisgroup.idx));
316 //We've got another contributor
318 //He may not need to contribute to some of our reductions:
319 for (int r=redNo;r<ci->redNo;r++)
320 adj(r).lcount--;//He won't be contributing to r here
324 /*Don't expect any more contributions from this one.
325 This is rather horrifying because we now have to make
326 sure the global element count accurately reflects all the
327 contributions the element made before it died-- these may stretch
328 far into the future. The adj() vector is what saves us here.
// Handles the death of a contributor. Returns immediately while restarting
// from a crash (CMK_MEM_CHECKPOINT builds) or while the manager itself is
// being destroyed. Otherwise the per-reduction adjustments are patched so the
// counts stay consistent for reductions the contributor was behind on
// (ci->redNo < redNo) or ahead on (ci->redNo > redNo).
330 void CkReductionMgr::contributorDied(contributorInfo *ci)
332 #if CMK_MEM_CHECKPOINT
333 // ignore from listener if it is during restart from crash
334 if (CkInRestarting()) return;
337 if (isDestroying) return;
339 DEBR((AA "Contributor %p(%d) died\n" AB,ci,ci->redNo));
340 //We lost a contributor
344 {//Must have been migrating during reductions-- root is waiting for his
345 // contribution, which will never come.
346 DEBR((AA "Dying guy %p must have been migrating-- he's at #%d!\n" AB,ci,ci->redNo));
// One MigrantDied message is sent to PE 0 for each reduction that was missed.
347 for (int r=ci->redNo;r<redNo;r++)
348 thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
351 //Add to the global count for all his future messages (wherever they are)
353 for (r=redNo;r<ci->redNo;r++)
354 {//He already contributed to this reduction, but won't show up in global count.
355 DEBR((AA "Dead guy %p left contribution for #%d\n" AB,ci,r));
360 //He's already contributed to several reductions here
361 for (r=redNo;r<ci->redNo;r++)
362 adj(r).lcount++;//He'll be contributing to r here
364 // Check whether the death of this contributor made this pe go barren at this redNo
366 if (ci->redNo <= redNo) {
372 //Migrating away (note that global count doesn't change)
// Drops the local contributor count and, for every reduction the departing
// contributor has already contributed to here (redNo up to ci->redNo), raises
// the expected local count again so those contributions stay accounted for.
373 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
375 DEBR((AA "Contributor %p(%d) migrating away\n" AB,ci,ci->redNo));
376 lcount--;//We lost a local
377 //He's already contributed to several reductions here
378 for (int r=redNo;r<ci->redNo;r++)
379 adj(r).lcount++;//He'll be contributing to r here
381 // Check whether this made this pe go barren at redNo
382 if (ci->redNo <= redNo) {
388 //Migrating in (note that global count doesn't change)
// Raises the local contributor count and, for every reduction the arrival has
// already contributed to elsewhere (redNo up to ci->redNo), lowers the expected
// local count. Note that lcount is incremented before the restart check, so it
// is bumped even when the rest of the function is skipped during a restart.
389 void CkReductionMgr::contributorArriving(contributorInfo *ci)
391 DEBR((AA "Contributor %p(%d) migrating in\n" AB,ci,ci->redNo));
392 lcount++;//We gained a local
393 #if CMK_MEM_CHECKPOINT
394 // ignore from listener if it is during restart from crash
395 // because the ci may be old.
396 if (CkInRestarting()) return;
398 //He has already contributed (elsewhere) to several reductions:
399 for (int r=redNo;r<ci->redNo;r++)
400 adj(r).lcount--;//He won't be contributing to r here
402 // Check if the arrival of a new contributor makes this PE become active again
403 if (ci->redNo == redNo) {
408 //Contribute-- the given msg can contain any data. The reducerType
409 // field of the message must be valid.
410 // Each contributor must contribute exactly once to each reduction.
// The message is stamped with the contributor's current reduction number
// (which is then advanced) and marked as a single contribution.
// In message-logging fault-tolerance builds, a contribution from an immigrant
// recovery object is forwarded to its source PE via contributeViaMessage with
// the logging-bypass flag set; around the local path the current-object
// pointer is temporarily switched to this manager and restored afterwards.
411 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
414 _TRACE_BG_TLINE_END(&(m->log));
416 DEBR((AA "Contributor %p contributed for %d in grp %d ismigratable %d \n" AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
418 m->redNo=ci->redNo++;
419 m->sourceFlag=-1;//A single contribution
422 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
424 // if object is an immigrant recovery object, we send the contribution to the source PE
425 if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
427 // turning on the message-logging bypass flag
428 envelope *env = UsrToEnv(m);
429 env->flags = env->flags | CK_BYPASS_DET_MLOG;
430 thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
434 Chare *oldObj = CpvAccess(_currentObj);
435 CpvAccess(_currentObj) = this;
437 // adding contribution
440 CpvAccess(_currentObj) = oldObj;
// Receives a contribution that contribute() forwarded from another PE on
// behalf of an immigrant recovery object. The logging-bypass flag set by the
// sender is cleared before the contribution is added. Builds without message
// logging get an empty stub instead.
// NOTE(review): the stub does not release m -- confirm it is never invoked in
// those builds.
446 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
447 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
448 //if(CkMyPe() == 2) CkPrintf("[%d] ---> Contributing Via Message\n",CkMyPe());
450 // turning off bypassing flag
451 envelope *env = UsrToEnv(m);
452 env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
454 // adding contribution
458 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
// Re-evaluates whether this PE is inactive for the current reduction. The
// inactiveList map (kid id -> first reduction number from which that kid is
// inactive) is scanned for kids inactive at or before redNo. When every kid is
// inactive and there are no local contributors, the parent is informed; the
// else-branch handles a PE that was marked inactive but no longer qualifies.
// NOTE(review): the preprocessor guard on the second line presumably disables
// this logic for fault-tolerance builds -- confirm.
461 void CkReductionMgr::checkIsActive() {
462 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
466 // Check the number of kids in the inactivelist before or at this redNo
467 std::map<int, int>::iterator it;
469 for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
470 if (it->second <= redNo) {
471 DEBR((AA "Kid %d is inactive from redNo %d\n" AB, it->first, it->second));
475 DEBR((AA "CheckIsActive redNo %d, kids %d(inactive %d), lcount %d\n" AB, redNo,
476 numKids, c_inactive, lcount));
478 if(numKids == c_inactive && lcount == 0) {
480 informParentInactive();
483 } else if(is_inactive) {
489 * Add the child to the inactiveList
// Records kid id as inactive starting from reduction red_no. If that reduction
// is already in progress here, the kid is sent ReductionStarting directly. The
// kid is inserted into inactiveList only when it has no entry yet. When red_no
// is the current reduction, this PE's own activity is presumably re-checked --
// TODO confirm.
491 void CkReductionMgr::checkAndAddToInactiveList(int id, int red_no) {
492 // If there is already a reduction in progress corresponding to red_no, then
493 // the time to call ReductionStarting is past so explicitly invoke
494 // ReductionStarting on the kid
495 if (inProgress && redNo == red_no) {
496 thisProxy[id].ReductionStarting(new CkReductionNumberMsg(red_no));
499 std::map<int, int>::iterator it;
500 it = inactiveList.find(id);
501 if (it == inactiveList.end()) {
502 inactiveList.insert(std::pair<int, int>(id, red_no));
506 // If the red_no is redNo, then check whether this makes this PE inactive
507 if (redNo == red_no) {
513 * This is invoked when a real contribution is received from the kid for a given red_no.
// Looks up kid id in inactiveList and erases its entry when the kid was marked
// inactive at or before red_no. A kid that is not in the list is handled by
// the first branch.
516 void CkReductionMgr::checkAndRemoveFromInactiveList(int id, int red_no) {
517 std::map<int, int>::iterator it;
518 it = inactiveList.find(id);
519 if (it == inactiveList.end()) {
522 if (it->second <= red_no) {
523 inactiveList.erase(it);
524 DEBR((AA "Parent removing kid %d from inactivelist red_no %d\n" AB,
529 // Inform parent that I am inactive
// Sends AddToInactiveList to the tree parent, carrying this PE's number and
// the current reduction number.
530 void CkReductionMgr::informParentInactive() {
532 DEBR((AA "Inform parent to add to inactivelist red_no %d\n" AB, redNo));
533 thisProxy[treeParent()].AddToInactiveList(
534 new CkReductionInactiveMsg(CkMyPe(), redNo));
539 * Send ReductionStarting message to all the inactive kids which are inactive
540 * for the specified red_no
// Two variants selected at compile time. In fault-tolerance / checkpoint
// builds every tree kid is sent ReductionStarting. Otherwise only kids whose
// inactiveList entry is at or before red_no are notified.
// NOTE(review): the all-kids variant sends redNo while the inactive-kids
// variant sends the red_no argument -- confirm the difference is intended.
542 void CkReductionMgr::sendReductionStartingToKids(int red_no) {
543 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
544 for (int k=0;k<treeKids();k++)
546 DEBR((AA "Asking child PE %d to start #%d\n" AB,kids[k],redNo));
547 thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
550 std::map<int, int>::iterator it;
551 for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
552 if (it->second <= red_no) {
553 DEBR((AA "Parent sending reductionstarting to inactive kid %d\n" AB,
555 thisProxy[it->first].ReductionStarting(new CkReductionNumberMsg(red_no));
562 //////////// Reduction Manager Remote Entry Points /////////////
563 //Sent down the reduction tree (used by barren PEs)
// Three cases on the requested reduction number m->num:
//  - it is the current reduction and none is in progress: start it now,
//    passing along the sender PE taken from the message envelope;
//  - it lies in the future: remember the highest requested number in
//    maxStartRequest (finishReduction consults it later);
//  - otherwise the request is late and is ignored.
564 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
567 //CkPrintf("!!!!!!!!!!!!!!!!!!!!!!1Who called me ???? %d \n",m->num);
571 DEBR((AA " Group ReductionStarting called for redNo %d\n" AB,m->num));
572 int srcPE = (UsrToEnv(m))->getSrcPe();
573 if (isPresent(m->num) && !inProgress)
575 DEBR((AA "Starting reduction #%d at parent's request\n" AB,m->num));
576 startReduction(m->num,srcPE);
578 } else if (isFuture(m->num)){
579 // CkPrintf("[%d] arrays Mesg No %d redNo %d \n",CkMyPe(),m->num,redNo);
580 DEBR((AA "Asked to startfuture Reduction %d \n" AB,m->num));
581 if(maxStartRequest < m->num){
582 maxStartRequest = m->num;
584 // CkAbort("My reduction tree parent somehow got ahead of me! in arrays\n");
588 DEBR((AA "Ignoring parent's late request to start #%d\n" AB,m->num));
592 //Sent to root of reduction tree with reduction contribution
593 // of migrants that missed the main reduction.
// addContribution forwards late contributions here via thisProxy[0].
// NOTE(review): how the late message is merged into the pending result is not
// visible in these lines -- confirm.
594 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
597 _TRACE_BG_TLINE_END(&(m->log));
602 //A late migrating contributor will never contribute to this reduction
// Must run on PE 0 and refer to a reduction that has not completed yet;
// otherwise the program aborts. The global contributor count expected for
// reduction m->num is lowered by one.
603 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
605 if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
606 DEBR((AA "Migrant died before contributing to #%d\n" AB,m->num));
607 // CkPrintf("[%d,%d]Migrant Died called\n",CkMyNode(),CkMyPe());
608 adj(m->num).gcount--;//He won't be contributing to this one.
613 //////////// Reduction Manager State /////////////
// Starts reduction `number` if it is the current one and nothing blocks it.
// Requests for future or past reductions return silently (the aborts are
// commented out), as does a request for a reduction already in progress. While
// contributors are still being created the start is postponed. Once started,
// kids are notified unless disableNotifyChildrenStart is set.
// NOTE(review): srcPE is not used in the lines visible here.
614 void CkReductionMgr::startReduction(int number,int srcPE)
616 if (isFuture(number)){ /*CkAbort("Can't start reductions out of order!\n");*/ return;}
617 if (isPast(number)) {/*CkAbort("Can't restart reduction that's already finished!\n");*/return;}
619 DEBR((AA "This reduction is already in progress\n" AB));
620 return;//This reduction already started
622 if (creating) //Don't start yet-- we're creating elements
624 DEBR((AA "Postponing start request #%d until we're done creating\n" AB,redNo));
629 //If none of these cases, we need to start the reduction--
630 DEBR((AA "Starting reduction #%d %d %d \n" AB,redNo,completedRedNo,number));
637 if(!CmiNodeAlive(CkMyPe())){
641 // TODO: This is currently gotten from the command-line...but it could
642 // probably just be determined at runtime. I don't know if the CL option is
643 // even documented anywhere.
644 if(disableNotifyChildrenStart) return;
646 //Sent start requests to our kids (in case they don't already know)
647 sendReductionStartingToKids(redNo);
650 /*Handle a message from one element for the reduction*/
// Dispatches on the reduction number carried by the message:
//  - past: forwarded to PE 0 as a late migrant contribution (message-logging
//    fault-tolerance builds abort instead);
//  - future: treated as an early contribution for a later reduction;
//  - current: the reduction is started if necessary and the contribution is
//    handled as an ordinary one.
// NOTE(review): the trace line in the ordinary branch passes the this pointer
// to a %d conversion; %p is the matching conversion for a pointer.
651 void CkReductionMgr::addContribution(CkReductionMsg *m)
653 if (isPast(m->redNo))
655 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
656 CmiAbort("this version should not have late migrations");
658 //We've moved on-- forward late contribution straight to root
659 DEBR((AA "Migrant gives late contribution for #%d!\n" AB,m->redNo));
660 // if (!hasParent()) //Root moved on too soon-- should never happen
661 // CkAbort("Late reduction contribution received at root!\n");
662 thisProxy[0].LateMigrantMsg(m);
665 else if (isFuture(m->redNo)) {//An early contribution-- add to future Q
666 DEBR((AA "Contributor gives early contribution-- for #%d\n" AB,m->redNo));
668 } else {// An ordinary contribution
669 DEBR((AA "Recv'd local contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
670 // CkPrintf("[%d] Local Contribution for %d in Mesg %d at %.6f\n",CkMyPe(),redNo,m->redNo,CmiWallTimer());
671 startReduction(m->redNo,CkMyPe());
678 /**function checks if it has got all contributions that it is supposed to
679 get at this processor. If it is done it sends the reduced result to the local
// Tries to complete the current reduction on this PE.
//  1. Nothing happens unless a reduction is in progress and contributor
//     creation has finished.
//  2. If local or remote contributions are still missing, the function
//     returns, except that a streamable reducer with more than one queued
//     message is flagged for a partial reduction.
//  3. The queued messages are combined with reduceMessages().
//  4. A PE with a parent adds its own gcount terms and sends the result up
//     the tree; the root compares the total element count with the number of
//     sources and, when they match, delivers the result to the callback in the
//     message or else to storedCallback.
//  5. Housekeeping: the adjustment vector is shifted down one slot, queued
//     future local and remote messages are replayed, and a start request that
//     arrived early (maxStartRequest) is honoured.
681 void CkReductionMgr::finishReduction(void)
683 /*CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d (!inProgress) | creating) %d at %.6f\n",CkMyPe(),redNo, nContrib,(!inProgress) | creating,CmiWallTimer());*/
684 DEBR((AA "in finishReduction (inProgress=%d) in grp %d\n" AB,inProgress,thisgroup.idx));
685 if ((!inProgress) || creating){
686 DEBR((AA "Either not in Progress or creating\n" AB));
690 bool partialReduction = false;
692 //CkPrintf("[%d]finishReduction called for redNo %d with nContrib %d at %.6f\n",CkMyPe(),redNo, nContrib,CmiWallTimer());
// Message-logging builds correct the expected local count for immigrant and
// emigrant recovery objects; other builds use lcount plus its adjustment.
693 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
694 if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
695 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
696 partialReduction = true;
699 DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
700 return; //Need more local messages
704 if (nContrib<(lcount+adj(redNo).lcount)){
705 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
706 partialReduction = true;
709 DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
710 return; //Need more local messages
715 if (nRemote<treeKids()) {
716 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
717 partialReduction = true;
720 DEBR((AA "Need more remote messages %d %d\n" AB,nRemote,treeKids()));
721 return; //Need more remote messages
726 DEBR((AA "Reducing data... %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
727 CkReductionMsg *result=reduceMessages();
730 if (partialReduction) {
736 {//Pass data up tree to parent
737 DEBR((AA "Passing reduced data up to parent node %d.\n" AB,treeParent()));
738 DEBR((AA "Message gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
// NOTE(review): both preprocessor branches below add the same expression, so
// the conditional currently makes no difference here.
739 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
740 result->gcount+=gcount+adj(redNo).gcount;
742 result->gcount+=gcount+adj(redNo).gcount;
744 thisProxy[treeParent()].RecvMsg(result);
747 {//We are root-- pass data to client
748 DEBR((AA "Final gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
749 int totalElements=result->gcount+gcount+adj(redNo).gcount;
750 if (totalElements>result->nSources())
752 DEBR((AA "Only got %d of %d contributions (c'mon, migrators!)\n" AB,result->nSources(),totalElements));
754 return; // Wait for migrants to contribute
755 } else if (totalElements<result->nSources()) {
756 DEBR((AA "Got %d of %d contributions\n" AB,result->nSources(),totalElements));
757 #if !defined(_FAULT_CAUSAL_)
758 CkAbort("ERROR! Too many contributions at root!\n");
761 DEBR((AA "Passing result to client function\n" AB));
762 CkSetRefNum(result, result->getUserFlag());
// A callback carried by the message takes precedence over storedCallback.
763 if (!result->callback.isInvalid())
764 result->callback.send(result);
765 else if (!storedCallback.isInvalid())
766 storedCallback.send(result);
768 CkAbort("No reduction client!\n"
769 "You must register a client with either SetReductionClient or during contribute.\n");
773 //House Keeping Operations will have to check later what needs to be changed
775 // Check after every reduction contribution whether this makes the PE inactive
776 // starting this redNo
778 //Shift the count adjustment vector down one slot (to match new redNo)
781 for (i=1;i<(int)(adjVec.length());i++){
782 adjVec[i-1]=adjVec[i];
787 startRequested=false;
790 //Look through the future queue for messages we can now handle
// The queue length is sampled once, so messages re-queued during the replay
// are not revisited in the same pass.
791 int n=futureMsgs.length();
794 CkReductionMsg *m=futureMsgs.deq();
795 if (m!=NULL) //One of these addContributions may have finished us.
796 addContribution(m);//<- if *still* early, puts it back in the queue
798 n=futureRemoteMsgs.length();
801 CkReductionMsg *m=futureRemoteMsgs.deq();
803 RecvMsg(m);//<- if *still* early, puts it back in the queue
807 if(maxStartRequest >= redNo){
808 startReduction(redNo,CkMyPe());
815 //Sent up the reduction tree with reduced data
// Handles a reduced message from a tree kid:
//  - current reduction: a message with at least one real source removes the
//    sender from the inactive list, and the reduction is started if needed;
//  - future reduction: the message is queued in futureRemoteMsgs;
//  - past reduction: fatal, the program aborts.
816 void CkReductionMgr::RecvMsg(CkReductionMsg *m)
819 _TRACE_BG_TLINE_END(&m->log);
821 if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
822 DEBR((AA "Recv'd remote contribution %d for #%d\n" AB,nRemote,m->redNo));
823 // If the remote contribution is real, then check whether we can remove the
824 // child from the inactiveList if it is in the list
825 if (m->nSources() > 0) {
826 checkAndRemoveFromInactiveList(m->fromPE, m->redNo);
828 startReduction(m->redNo, CkMyPe());
833 else if (isFuture(m->redNo)) {
834 DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
835 futureRemoteMsgs.enq(m);
837 else CkAbort("Recv'd late remote contribution!\n");
// Entry method invoked by a kid (see informParentInactive) to report that it
// is inactive from reduction m->redno onward; the kid is recorded through
// checkAndAddToInactiveList.
// NOTE(review): id is presumably read from the message on a line not shown --
// confirm.
840 void CkReductionMgr::AddToInactiveList(CkReductionInactiveMsg *m) {
842 int last_redno = m->redno;
845 DEBR((AA "Parent add kid %d to inactive list from redno %d\n" AB,
847 checkAndAddToInactiveList(id, last_redno);
850 if (last_redno <= redNo) {
855 //////////// Reduction Manager Utilities /////////////
857 //Return the countAdjustment struct for the given redNo:
// The reduction number is converted to an offset from completedRedNo; a
// negative offset (a reduction already completed) aborts. The vector grows on
// demand with default-constructed entries, so the returned reference is always
// to an existing slot.
858 countAdjustment &CkReductionMgr::adj(int number)
860 number-=completedRedNo;
862 if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
863 //Pad the adjustment vector with zeros until it's at least number long
864 while ((int)(adjVec.length())<=number)
865 adjVec.push_back(countAdjustment());
866 return adjVec[number];
869 //Combine (& free) the current message vector msgs.
// Drains msgs, accumulating gcount from every message and nSources from real
// (non-placeholder) ones, and collects the real messages into msgArr. The
// collected messages are then combined with the reducer function looked up in
// the reducer table, or an empty message is built when there is nothing valid
// to reduce. The result is stamped with the accumulated counts, user flag,
// callback, migratable flag and this PE as the sender.
// NOTE(review): isMigratableContributor is declared without an initialiser and
// is only assigned inside the real-message branch, yet it is read
// unconditionally near the end -- confirm it cannot be read unset.
// NOTE(review): msgArr comes from new[]; the matching delete[] is not visible
// in these lines -- confirm.
870 CkReductionMsg *CkReductionMgr::reduceMessages(void)
873 _TRACE_BG_END_EXECUTE(1);
874 void* _bgParentLog = NULL;
875 _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
877 CkReductionMsg *ret=NULL;
879 //Look through the vector for a valid reducer, swapping out placeholder messages
880 CkReduction::reducerType r=CkReduction::invalid;
881 int msgs_gcount=0;//Reduced gcount
882 int msgs_nSources=0;//Reduced nSources
883 CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
884 CkCallback msgs_callback;
887 CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
889 bool isMigratableContributor;
891 // Copy message queue into msgArr, skipping placeholders:
892 while (NULL!=(m=msgs.deq()))
894 msgs_gcount+=m->gcount;
895 if (m->sourceFlag!=0)
896 { //This is a real message from an element, not just a placeholder
897 msgs_nSources+=m->nSources();
899 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
902 // for "nop" reducer type, only need to accept one message
903 if (nMsgs == 0 || m->reducer != CkReduction::nop) {
906 if (!m->callback.isInvalid()){
907 #if CMK_ERROR_CHECKING
908 if(nMsgs > 1 && !(msgs_callback == m->callback))
909 CkAbort("mis-matched client callbacks in reduction messages\n");
911 msgs_callback=m->callback;
// A user flag of (CMK_REFNUM_TYPE)-1 is treated as unset and does not
// overwrite a flag taken from an earlier message.
913 if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
914 msgs_userFlag=m->userFlag;
915 isMigratableContributor=m->isMigratableContributor();
918 #if CMK_ERROR_CHECKING
919 if(!(msgs_callback == m->callback))
920 CkAbort("mis-matched client callbacks in reduction messages\n");
926 { //This is just a placeholder message-- forget it
931 if (nMsgs==0||r==CkReduction::invalid)
932 //No valid reducer in the whole vector
933 ret=CkReductionMsg::buildNew(0,NULL);
935 {//Use the reducer to reduce the messages
936 //if there is only one msg to be reduced just return that message
938 msgArr[0]->reducer != CkReduction::set &&
939 msgArr[0]->reducer != CkReduction::tuple) {
942 if (msgArr[0]->reducer == CkReduction::nop) {
943 // nMsgs > 1 indicates that reduction type is not nop
944 // this means any data with reducer type nop was submitted
945 // only so that counts would agree, and can be removed
947 msgArr[0] = msgArr[nMsgs - 1];
950 CkReduction::reducerFn f=CkReduction::reducerTable()[r].fn;
951 ret=(*f)(nMsgs,msgArr);
958 #if USE_CRITICAL_PATH_HEADER_ARRAY
960 #if CRITICAL_PATH_DEBUG > 3
961 CkPrintf("combining critical path information from messages in CkReductionMgr::reduceMessages\n");
964 MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
965 path.updateMax(UsrToEnv(ret));
966 // Combine the critical paths from all the reduction messages
967 for (i=0;i<nMsgs;i++){
969 // CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
970 path.updateMax(UsrToEnv(msgArr[i]));
975 #if CRITICAL_PATH_DEBUG > 3
976 CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
979 PathHistoryTableEntry tableEntry(path);
980 tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
984 //Go back through the vector, deleting old messages
// The result may be one of the input messages, so it is skipped here.
985 for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
988 //Set the message counts
990 ret->gcount=msgs_gcount;
991 ret->userFlag=msgs_userFlag;
992 ret->callback=msgs_callback;
993 ret->sourceFlag=msgs_nSources;
994 ret->setMigratableContributor(isMigratableContributor);
995 ret->fromPE = CkMyPe();
996 DEBR((AA "Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
1002 //Checkpointing utilities
1003 //pack-unpack method for CkReductionMsg
1004 //if packing pack the message and then unpack and return it
1005 //if unpacking allocate memory for it read it off disk and then unpack
// Serialises the manager state. The base class is pupped first, followed by
// the progress flags and contribution counters. A stored callback that wraps a
// CkReductionClientBundle is special-cased: on unpack a fresh bundle is
// allocated and installed as the callback parameter, otherwise the existing
// bundle pointer is reused. After unpacking, thisProxy is rebound to this
// group and the reduction tree is rebuilt.
1007 void CkReductionMgr::pup(PUP::er &p)
1009 //We do not store the client function pointer or the client function parameter,
1010 //it is the responsibility of the programmer to correctly restore these
1011 CkGroupInitCallback::pup(p);
1014 p(inProgress); p(creating); p(startRequested);
1015 p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
1022 // handle CkReductionClientBundle
1023 if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn)
1025 CkReductionClientBundle *bd;
1026 if (p.isUnpacking())
1027 bd = new CkReductionClientBundle;
1029 bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
1031 if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
1034 // subtle --- Gengbin
1035 // Group : CkReductionMgr
1036 // CkArray: CkReductionMgr
1037 // lcount/gcount in Group is set in Group constructor
1038 // lcount/gcount in CkArray is not, it is set when array elements are created
1039 // we can not pup because inserting array elems will add the counters again
1044 // // printf("[%d] nodeProxy nodeGroup %d pupped in group %d \n",CkMyPe(),(nodeProxy.ckGetGroupID()).idx,thisgroup.idx);
1045 if(p.isUnpacking()){
1046 thisProxy = thisgroup;
// NOTE(review): only the BINOMIAL_TREE variant of the tree setup is visible
// here; the alternatives are presumably selected by the same conditional.
1048 #ifdef BINOMIAL_TREE
1049 init_BinomialTree();
1053 is_inactive = false;
1057 DEBR(("[%d,%d] pupping _____________ gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
// Builds a TREE_WID-ary reduction tree over nodes, with the first PE of each
// node acting for the node. A PE that is not the first on a multi-PE node gets
// that first PE as its parent. Otherwise the parent is the first PE of node
// (myNode-1)/TREE_WID, or -1 on node 0. Kids are the first PEs of up to
// TREE_WID child nodes, plus every other PE on this node.
1060 void CkReductionMgr::init_BinaryTree(){
1061 if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
1062 parent = CkNodeFirst(CkMyNode());
1065 int parentNode = (CkMyNode()-1)/TREE_WID;
1066 parent = CkMyNode() > 0 ? CkNodeFirst(parentNode) : -1;
1067 // Add nodes that are my children
1068 int firstKid = CkMyNode()*TREE_WID+1;
// Clamp the child-node count to the range [0, TREE_WID].
1069 numKids=CkNumNodes()-firstKid;
1070 if (numKids > TREE_WID) numKids = TREE_WID;
1071 if (numKids < 0) numKids = 0;
1072 for (int i = 0; i < numKids; i++) {
1073 kids.push_back(CkNodeFirst(firstKid+i));
1074 newKids.push_back(CkNodeFirst(firstKid+i));
1077 // Add PEs on my node, which are also my children
1078 numKids += CkNodeSize(CkMyNode())-1;
1079 for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
1080 kids.push_back(CkMyPe()+i);
1081 newKids.push_back(CkMyPe()+i);
// Builds the reduction tree from the precomputed spanning tree in _topoTree,
// aborting if it has not been calculated. A PE that is not the first on a
// multi-PE node gets that first PE as its parent. Otherwise the parent and
// kids are the first PEs of the nodes named by the spanning tree, plus every
// other PE on this node as an additional kid.
1086 void CkReductionMgr::init_TopoTree() {
1087 if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
1088 parent = CkNodeFirst(CkMyNode());
1091 if (_topoTree == NULL) CkAbort("CkReductionMgr:: topo tree has not been calculated\n");
1093 CmiSpanningTreeInfo &t = *_topoTree;
// A parent of -1 in the spanning tree leaves `parent` as it was.
1094 if (t.parent != -1) parent = CkNodeFirst(t.parent);
1096 numKids = t.child_count;
1097 for (int i=0; i < numKids; i++) {
1098 int child = CkNodeFirst(t.children[i]);
1099 kids.push_back(child);
1100 newKids.push_back(child);
1103 // Add PEs on my node, which are also my children
1104 numKids += CkNodeSize(CkMyNode())-1;
1105 for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
1106 kids.push_back(CkMyPe()+i);
1107 newKids.push_back(CkMyPe()+i);
// Builds a binomial reduction tree. A PE that is not the first on a multi-PE
// node gets that first PE as its parent. Otherwise depth is
// ceil(log2(number of nodes)), upperSize is 2^depth, and each participant is
// given a reversed label (upperSize - firstPE - 1). Parent and kid labels are
// obtained by adding or subtracting powers of two and are mapped back with the
// same reversal.
// NOTE(review): depth is derived from CkNumNodes() while labels use
// CkNodeFirst() and the kid bound uses CkNumPes() -- confirm these agree when
// nodes hold more than one PE.
1112 void CkReductionMgr::init_BinomialTree(){
1113 if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
1114 parent = CkNodeFirst(CkMyNode());
1117 int depth = (int)ceil((log((double)CkNumNodes())/log((double)2)));
1118 upperSize = (unsigned) 1 << depth;
1119 label = upperSize-CkNodeFirst(CkMyNode())-1;
1130 parent = label + (1<<count);
1131 parent = upperSize - 1 - parent;
1135 for(int i=0;i<count;i++){
1136 temp = label - (1<<i);
1137 temp = upperSize - 1 - temp;
// Only labels that map to an existing PE become kids.
1138 if(temp <= CkNumPes()-1){
1139 kids.push_back(temp);
// Reduction-tree accessors.
// NOTE(review): the bodies of treeRoot, treeParent and treeKids are not
// visible here; presumably they return the root PE and the parent / numKids
// members set by the init_*Tree functions -- confirm.
1150 int CkReductionMgr::treeRoot(void)
// True on every PE except the tree root.
1154 bool CkReductionMgr::hasParent(void) //Root Node
1156 return (bool)(CkMyPe()!=treeRoot());
1158 int CkReductionMgr::treeParent(void) //My parent Node
1162 int CkReductionMgr::treeKids(void)//Number of children in tree
1168 // simple "stateless" barrier
1169 // no state checkpointed, for FT purpose
1170 // require no overlapping barriers
// Local barrier contribution: a valid callback carried by the message replaces
// barrier_storedCallback.
// NOTE(review): counting the contribution and calling finishBarrier are
// presumably done on lines not shown -- confirm.
1171 void CkReductionMgr::barrier(CkReductionMsg *m)
1175 if(!m->callback.isInvalid())
1176 barrier_storedCallback=m->callback;
// Complete the barrier on this PE once enough local and remote messages have arrived:
// build an empty result message and either forward it to the tree parent or, at the
// root, deliver it to the barrier's callback.
1181 void CkReductionMgr::finishBarrier(void)
// Still waiting on local contributors (lcount) ...
1183 if(barrier_nContrib<lcount){//need more local message
1184 DEBR(("[%d] current contrib:%d,lcount:%d\n",CkMyPe(),barrier_nContrib,lcount));
// ... or on tree children.
1187 if(barrier_nRemote<treeKids()){//need more remote messages
1188 DEBR(("[%d] current remote:%d,kids:%d\n",CkMyPe(),barrier_nRemote,treeKids()));
// Zero-byte result carrying the accumulated callback, source count and gcount.
1191 CkReductionMsg * result = CkReductionMsg::buildNew(0,NULL);
1192 result->callback=barrier_storedCallback;
1193 result->sourceFlag=barrier_nSource;
1194 result->gcount=barrier_gCount;
// Forwarding case: fold this PE's own gcount in and send to the parent.
1197 DEBR(("[%d]send to parent:%d\n",CkMyPe(),treeParent()));
1198 result->gcount+=gcount;
1199 thisProxy[treeParent()].Barrier_RecvMsg(result);
// Root case: the total element count must not be smaller than the number of sources.
1202 int totalElements=result->gcount+gcount;
1203 DEBR(("[%d]root,totalElements:%d,source:%d\n",CkMyPe(),totalElements,result->nSources()));
1204 if(totalElements<result->nSources()){
1205 CkAbort("ERROR! Too many contributions at barrier root\n");
1207 CkSetRefNum(result,result->getUserFlag());
// Prefer the callback in the message, then the stored one; abort when neither is valid.
1208 if(!result->callback.isInvalid())
1209 result->callback.send(result);
1210 else if(!barrier_storedCallback.isInvalid())
1211 barrier_storedCallback.send(result);
1213 CkAbort("No reduction client!\n");
// Reset the per-barrier message counters.
1215 barrier_nRemote=barrier_nContrib=0;
// Receive a barrier message m sent by another PE (finishBarrier sends these to the
// tree parent) and fold it into the barrier state.
1220 void CkReductionMgr::Barrier_RecvMsg(CkReductionMsg *m)
// Accumulate the sender's element count and number of sources.
1223 barrier_gCount+=m->gcount;
1224 barrier_nSource+=m->nSources();
// A valid callback carried by m replaces the stored barrier callback.
1225 if(!m->callback.isInvalid())
1226 barrier_storedCallback=m->callback;
1232 /////////////////////////////////////////////////////////////////////////
1234 ////////////////////////////////
1235 //CkReductionMsg support
1237 //ReductionMessage default private constructor-- does nothing
1238 CkReductionMsg::CkReductionMsg(){}
1239 CkReductionMsg::~CkReductionMsg(){}
1241 //This define gives the distance from the start of the CkReductionMsg
1242 // object to the start of the user data area (just below last object field)
// NOTE(review): subtracting sizeof(double) presumably accounts for a trailing
// double-sized storage field that the user data overlays; confirm against the class.
1243 #define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
1245 //"Constructor"-- builds and returns a new CkReductionMsg.
1246 // the "data" array you specify will be copied into this object.
// Parameters:
//   NdataSize - number of data bytes; stored in dataSize.
//   srcData   - bytes to copy in; may be NULL, in which case nothing is copied.
//   reducer   - reducer type recorded in the message.
//   buf       - optional existing message to reuse instead of allocating a new one.
1247 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
1248 CkReduction::reducerType reducer, CkReductionMsg *buf)
// Reuse buf when given; otherwise allocate a fresh message.
1252 CkReductionMsg *ret = buf ? buf : new(len,0) CkReductionMsg();
1254 ret->dataSize=NdataSize;
// The copy is skipped when buf is reused: its data is left as it is.
1255 if (srcData!=NULL && !buf)
1256 memcpy(ret->data,srcData,NdataSize);
// Header fields are reset on both the fresh and the reused message.
1257 ret->userFlag=(CMK_REFNUM_TYPE)-1;
1258 ret->reducer=reducer;
1260 ret->sourceFlag=-1000;
1262 ret->migratableContributor = true;
1263 #if CMK_BIGSIM_CHARM
1269 // Charm kernel message runtime support:
// Message allocator: reserves ARM_DATASTART header bytes plus *sz data bytes
// and points the message's data field at its in-message storage.
1271 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits)
// Note that the `size` argument does not appear in the visible computation.
1273 int totalsize=ARM_DATASTART+(*sz);
1274 DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
1275 CkReductionMsg *ret = (CkReductionMsg *)
1276 CkAllocMsg(msgnum,totalsize,priobits);
1277 ret->data=(void *)(&ret->dataStorage);
1278 return (void *) ret;
// Pack hook for message `in`. Only the debug trace of the header fields is
// visible in this excerpt.
1282 CkReductionMsg::pack(CkReductionMsg* in)
1284 DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
1285 //CkPrintf("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize);
// Unpack hook: reinterprets the buffer as a CkReductionMsg in place and
// re-points its data field at the in-message storage.
1290 CkReductionMsg* CkReductionMsg::unpack(void *in)
1292 CkReductionMsg *ret = (CkReductionMsg *)in;
1293 DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
1294 //CkPrintf("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize);
// The data pointer is rebuilt from the object's own address on unpacking.
1295 ret->data=(void *)(&ret->dataStorage);
1300 /////////////////////////////////////////////////////////////////////////////////////
1301 ///////////////// Builtin Reducer Functions //////////////
1302 /* A simple reducer, like sum_int, looks like this:
1303 CkReductionMsg *sum_int(int nMsg,CkReductionMsg **msg)
1306 for (i=0;i<nMsg;i++)
1307 ret+=*(int *)(msg[i]->getData());
1308 return CkReductionMsg::buildNew(sizeof(int),(void *)&ret);
1311 To keep the code small and easy to change, the implementations below
1312 are built with preprocessor macros.
1315 //////////////// simple reducers ///////////////////
1316 /*A define used to quickly and tersely construct simple reductions.
1317 The basic idea is to use the first message's data array as
1318 (pre-initialized!) scratch space for folding in the other messages.
// Reducer installed at index 0 of the reducer table: always aborts, so that a
// reducer index left at zero is reported instead of silently used.
1321 static CkReductionMsg *invalid_reducer(int nMsg,CkReductionMsg **msg)
1323 CkAbort("Called the invalid reducer type 0. This probably\n"
1324 "means you forgot to initialize your custom reducer index.\n");
// No-op reducer: ignores the contributed data and returns a zero-length message,
// reusing msg[0] as the buffer.
1328 static CkReductionMsg *nop(int nMsg,CkReductionMsg **msg)
1330 return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
// SIMPLE_REDUCTION(name,dataType,typeStr,loop) defines a static reducer `name`
// over arrays of dataType:
//   - the element count is taken from msg[0]'s length;
//   - msg[0]'s data serves as the accumulator `ret`, messages 1..nMsg-1 supply `value`;
//   - typeStr is the printf conversion used to trace each value via RED_DEB;
//   - the result is built on top of msg[0] (passed to buildNew as the reuse buffer).
// NOTE(review): the line that expands `loop` is not visible in this excerpt;
// presumably it runs once per element inside the inner for-loop.
1333 #define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
1334 static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
1336 RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
1338 int nElem=msg[0]->getLength()/sizeof(dataType);\
1339 dataType *ret=(dataType *)(msg[0]->getData());\
1340 for (m=1;m<nMsg;m++)\
1342 dataType *value=(dataType *)(msg[m]->getData());\
1343 for (i=0;i<nElem;i++)\
1345 RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
1349 RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
1350 return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
//Use this macro for reductions that have the same type for all inputs.
// It instantiates SIMPLE_REDUCTION once per builtin numeric type, producing
// nameBase_char, nameBase_short, ..., nameBase_double. The string argument is
// the printf conversion used when tracing values of that type, so it must be
// a complete conversion specification matching the type ("%hd" for short --
// a bare "%h" is only a length modifier and is not a valid conversion).
#define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
  SIMPLE_REDUCTION(nameBase##_char,char,"%c",loop) \
  SIMPLE_REDUCTION(nameBase##_short,short,"%hd",loop) \
  SIMPLE_REDUCTION(nameBase##_int,int,"%d",loop) \
  SIMPLE_REDUCTION(nameBase##_long,long,"%ld",loop) \
  SIMPLE_REDUCTION(nameBase##_long_long,long long,"%lld",loop) \
  SIMPLE_REDUCTION(nameBase##_uchar,unsigned char,"%c",loop) \
  SIMPLE_REDUCTION(nameBase##_ushort,unsigned short,"%hu",loop) \
  SIMPLE_REDUCTION(nameBase##_uint,unsigned int,"%u",loop) \
  SIMPLE_REDUCTION(nameBase##_ulong,unsigned long,"%lu",loop) \
  SIMPLE_REDUCTION(nameBase##_ulong_long,unsigned long long,"%llu",loop) \
  SIMPLE_REDUCTION(nameBase##_float,float,"%f",loop) \
  SIMPLE_REDUCTION(nameBase##_double,double,"%f",loop)
// Each SIMPLE_POLYMORPH_REDUCTION below generates one reducer per builtin numeric
// type (e.g. sum_char ... sum_double); ret[i] accumulates, value[i] is the incoming element.
1368 //Compute the sum of the numbers passed by each element.
1369 SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)
1371 //Compute the product of the numbers passed by each element.
1372 SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)
1374 //Compute the largest number passed by any element.
1375 SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
1377 //Compute the smallest number passed by any element.
1378 SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
// logical_and is the older spelling of logical_and_int; both are registered in the
// reducer table, where logical_and is marked deprecated.
// NOTE(review): for the two int variants the statement that folds value[i] into
// ret[i] is not visible in this excerpt.
1381 //Compute the logical AND of the integers passed by each element.
1382 // The resulting integer will be zero if any source integer is zero; else 1.
1383 SIMPLE_REDUCTION(logical_and,int,"%d",
1386 ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1389 //Compute the logical AND of the integers passed by each element.
1390 // The resulting integer will be zero if any source integer is zero; else 1.
1391 SIMPLE_REDUCTION(logical_and_int,int,"%d",
1394 ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1397 //Compute the logical AND of the bools passed by each element.
1398 // The resulting bool will be false if any source bool is false; else true.
1399 SIMPLE_REDUCTION(logical_and_bool,bool,"%d",
1400 if (!value[i]) ret[i]=false;
// logical_or is the older spelling of logical_or_int; both are registered in the
// reducer table, where logical_or is marked deprecated.
// NOTE(review): for the two int variants the statement that folds value[i] into
// ret[i] is not visible in this excerpt.
1403 //Compute the logical OR of the integers passed by each element.
1404 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1405 SIMPLE_REDUCTION(logical_or,int,"%d",
1408 ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1411 //Compute the logical OR of the integers passed by each element.
1412 // The resulting integer will be 1 if any source integer is nonzero; else 0.
1413 SIMPLE_REDUCTION(logical_or_int,int,"%d",
1416 ret[i]=!!ret[i];//Make sure ret[i] is 0 or 1
1419 //Compute the logical OR of the bools passed by each element.
1420 // The resulting bool will be true if any source bool is true; else false.
1421 SIMPLE_REDUCTION(logical_or_bool,bool,"%d",
1422 if (value[i]) ret[i]=true;
1425 //Compute the logical XOR of the integers passed by each element.
1426 // The resulting integer will be 1 if an odd number of source integers is nonzero; else 0.
1427 SIMPLE_REDUCTION(logical_xor_int,int,"%d",
// Both operands are normalized with ! first, so any nonzero value counts as "true".
1428 ret[i] = (!ret[i] != !value[i]);
1431 //Compute the logical XOR of the bools passed by each element.
1432 // The resulting bool will be true if an odd number of source bools is true; else false.
1433 SIMPLE_REDUCTION(logical_xor_bool,bool,"%d",
1434 ret[i] = (ret[i] != value[i]);
// Bitwise reducers: each element of the result is the bitwise AND / OR / XOR of the
// corresponding elements of all contributions. The unsuffixed names use int, like
// their _int counterparts; the reducer table marks bitvec_and and bitvec_or as
// deprecated in favor of the _int names.
1437 SIMPLE_REDUCTION(bitvec_and,int,"%d",ret[i]&=value[i];)
1438 SIMPLE_REDUCTION(bitvec_and_int,int,"%d",ret[i]&=value[i];)
1439 SIMPLE_REDUCTION(bitvec_and_bool,bool,"%d",ret[i]&=value[i];)
1441 SIMPLE_REDUCTION(bitvec_or,int,"%d",ret[i]|=value[i];)
1442 SIMPLE_REDUCTION(bitvec_or_int,int,"%d",ret[i]|=value[i];)
1443 SIMPLE_REDUCTION(bitvec_or_bool,bool,"%d",ret[i]|=value[i];)
1445 SIMPLE_REDUCTION(bitvec_xor,int,"%d",ret[i]^=value[i];)
1446 SIMPLE_REDUCTION(bitvec_xor_int,int,"%d",ret[i]^=value[i];)
1447 SIMPLE_REDUCTION(bitvec_xor_bool,bool,"%d",ret[i]^=value[i];)
1449 //Select one random message to pass on
1450 static CkReductionMsg *random(int nMsg,CkReductionMsg **msg) {
1451 int idx = (int)(CrnDrand()*(nMsg-1) + 0.5);
1452 return CkReductionMsg::buildNew(msg[idx]->getLength(),
1453 (void *)msg[idx]->getData(),
1454 CkReduction::random, msg[idx]);
1457 /////////////// concat ////////////////
1459 This reducer simply appends the data it receives from each element,
1460 without any housekeeping data to separate them.
// Concatenation reducer: the result's data is the raw bytes of msg[0..nMsg-1]
// placed one after another, with no separators.
// NOTE(review): the declarations of i/retSize, the advance of `cur` after each
// copy, and the return statement are not visible in this excerpt.
1462 static CkReductionMsg *concat(int nMsg,CkReductionMsg **msg)
1464 RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
1465 //Figure out how big a message we'll need
1467 for (i=0;i<nMsg;i++)
1468 retSize+=msg[i]->getSize();
1470 RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
1472 //Allocate a new message
// srcData is NULL, so buildNew copies nothing; the data is filled in below.
1473 CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1475 //Copy the source message data into the return message
1476 char *cur=(char *)(ret->getData());
1477 for (i=0;i<nMsg;i++) {
1478 int messageBytes=msg[i]->getSize();
1479 memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1482 RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
1486 /////////////// set ////////////////
1488 This reducer appends the data it receives from each element
1489 along with some housekeeping data indicating contribution boundaries.
1490 The message data is thus a list of reduction_set_element structures
1491 terminated by a dummy reduction_set_element with a sourceElement of -1.
//Alignment granularity, in bytes, used when laying out reduction_set_elements.
static const int alignSize=sizeof(double);

//Round x up to the nearest multiple of alignSize.
static int SET_ALIGN(int x)
{
  const int mask=alignSize-1;
  return (x+mask)&~mask;
}

//Size in bytes of a reduction_set_element carrying dataSize bytes of payload:
// an int header plus the payload, rounded up to the alignment.
static int SET_SIZE(int dataSize)
{
  return SET_ALIGN(sizeof(int)+dataSize);
}
1502 //This returns a pointer to the next reduction_set_element in the list
// The step is SET_SIZE(cur->dataSize) bytes past cur. No end-of-list check is made here.
1503 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
1505 char *next=((char *)cur)+SET_SIZE(cur->dataSize);
1506 return (CkReduction::setElement *)next;
1509 //Combine the data passed by each element into a list of reduction_set_elements.
1510 // Each element may contribute arbitrary data (with arbitrary length).
// Two kinds of input are handled: messages straight from a contributor
// (isFromUser()) are wrapped in a new set element, while messages that are already
// a set (composite) are copied over without their terminator. One terminator
// (dataSize == -1) is written at the end.
// NOTE(review): the declarations of i/retSize, the advance of `cur` after wrapping a
// user message, and the return statement are not visible in this excerpt.
1511 static CkReductionMsg *set(int nMsg,CkReductionMsg **msg)
1513 RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
1514 //Figure out how big a message we'll need
1516 for (i=0;i<nMsg;i++) {
1517 if (!msg[i]->isFromUser())
1518 //This message is composite-- it will just be copied over (less terminating -1)
1519 retSize+=(msg[i]->getSize()-sizeof(int));
1520 else //This is a message from an element-- it will be wrapped in a reduction_set_element
1521 retSize+=SET_SIZE(msg[i]->getSize());
1523 retSize+=sizeof(int);//Leave room for terminating -1.
1525 RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
1527 //Allocate a new message
1528 CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
1530 //Copy the source message data into the return message
1531 CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
1532 for (i=0;i<nMsg;i++)
1533 if (!msg[i]->isFromUser())
1534 {//This message is composite-- just copy it over (less terminating -1)
1535 int messageBytes=msg[i]->getSize()-sizeof(int);
1536 RED_DEB(("|\tc msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1537 memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
1538 cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
1540 else //This is a message from an element-- wrap it in a reduction_set_element
1542 RED_DEB(("|\tu msg[%d] is %d bytes\n",i,msg[i]->getSize()));
1543 cur->dataSize=msg[i]->getSize();
1544 memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
1547 cur->dataSize=-1;//Add a terminating -1.
1548 RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
1552 //Utility routine: get the next reduction_set_element in the list
1553 // if there is one, or return NULL if there are none.
1554 //To get all the elements, just keep feeding this procedure's output back to
1555 // its input until it returns NULL.
1556 CkReduction::setElement *CkReduction::setElement::next(void)
// The element after this one is the terminator when its dataSize is -1.
1558 CkReduction::setElement *n=SET_NEXT(this);
1559 if (n->dataSize==-1)
1560 return NULL;//This is the end of the list
1562 return n;//This is just another element
1566 ///////// statisticsElement
// Construct a statistics element from a single sample; the mean starts at that sample.
// NOTE(review): the initializers of the remaining members are not visible in this excerpt.
1568 CkReduction::statisticsElement::statisticsElement(double initialValue)
1570 , mean(initialValue)
1574 // statistics reducer
1575 // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
1576 // Chan, Tony F.; Golub, Gene H.; LeVeque, Randall J. (1979),
1577 // "Updating Formulae and a Pairwise Algorithm for Computing Sample Variances." (PDF),
1578 // Technical Report STAN-CS-79-773, Department of Computer Science, Stanford University.
// Merges arrays of statisticsElement (count, mean, m2) element by element.
// msg[0]'s data is the accumulator and the result is built on top of msg[0].
1579 static CkReductionMsg* statistics(int nMsgs, CkReductionMsg** msg)
1581 int nElem = msg[0]->getLength() / sizeof(CkReduction::statisticsElement);
1582 CkReduction::statisticsElement* ret = (CkReduction::statisticsElement*)(msg[0]->getData());
1583 for (int m = 1; m < nMsgs; m++)
1585 CkReduction::statisticsElement* value = (CkReduction::statisticsElement*)(msg[m]->getData());
1586 for (int i = 0; i < nElem; i++)
// a_count keeps the accumulator's count from before the merge; ret[i].count becomes
// the combined count and is the divisor below.
// NOTE(review): if both counts are zero the divisions are 0/0 -- confirm that
// contributions always carry count >= 1.
1588 double a_count = ret[i].count;
1589 ret[i].count += value[i].count;
1590 double delta = value[i].mean - ret[i].mean;
1591 ret[i].mean += delta * value[i].count / ret[i].count;
1592 ret[i].m2 += value[i].m2 + delta * delta * value[i].count * a_count / ret[i].count;
1595 return CkReductionMsg::buildNew(
1596 nElem*sizeof(CkReduction::statisticsElement),
1598 CkReduction::invalid,
1602 ///////// tupleElement
// Default constructor: the reducer starts out as CkReduction::invalid.
// NOTE(review): the other member initializers are not visible in this excerpt.
1604 CkReduction::tupleElement::tupleElement()
1607 , reducer(CkReduction::invalid)
// Construct from a caller-supplied buffer: records the size and the pointer.
// No copy of the bytes is made in the visible initializers.
// NOTE(review): the initializers for reducer/owns_data are not visible in this excerpt.
1610 CkReduction::tupleElement::tupleElement(size_t dataSize_, void* data_, CkReduction::reducerType reducer_)
1611 : dataSize(dataSize_)
1612 , data((char*)data_)
// Move constructor: takes over rhs_move's size, data pointer, reducer and ownership
// flag, then resets rhs_move so it no longer claims ownership.
// NOTE(review): the line resetting rhs_move.data is not visible in this excerpt.
1617 CkReduction::tupleElement::tupleElement(CkReduction::tupleElement&& rhs_move)
1618 : dataSize(rhs_move.dataSize)
1619 , data(rhs_move.data)
1620 , reducer(rhs_move.reducer)
1621 , owns_data(rhs_move.owns_data)
1623 rhs_move.dataSize = 0;
1625 rhs_move.reducer = CkReduction::invalid;
1626 rhs_move.owns_data = false;
// Move assignment: takes over rhs_move's fields and resets rhs_move.
// NOTE(review): the lines before the first assignment are not visible in this
// excerpt; verify that previously owned data is released there.
1628 CkReduction::tupleElement& CkReduction::tupleElement::operator=(CkReduction::tupleElement&& rhs_move)
1632 dataSize = rhs_move.dataSize;
1633 data = rhs_move.data;
1634 reducer = rhs_move.reducer;
1635 owns_data = rhs_move.owns_data;
1636 rhs_move.dataSize = 0;
1638 rhs_move.reducer = CkReduction::invalid;
1639 rhs_move.owns_data = false;
// Destructor; its body is not visible in this excerpt.
1642 CkReduction::tupleElement::~tupleElement()
// Pack/unpack one tuple element through the PUP framework.
// NOTE(review): several statements (e.g. the pup of dataSize and of `temp`) are
// not visible in this excerpt.
1648 void CkReduction::tupleElement::pup(PUP::er &p) {
1650 // TODO - it might be better to pack these raw, then we don't have to
1651 // transform & copy them out on unpacking, we could just use the message's
// When unpacking, a fresh buffer of dataSize bytes is allocated to receive the data.
1653 if (p.isUnpacking()) {
1654 data = new char[dataSize];
1657 PUParray(p, data, dataSize);
// The reducer type travels as an int (`temp`) and is cast back on unpacking.
1658 if (p.isUnpacking()){
1661 reducer=(CkReduction::reducerType)temp;
1663 int temp=(int)reducer;
// Build a tuple reduction message from num_reductions tuple elements.
// The elements are pupped twice: once through `ps` to learn the size needed, and
// once through a PUP::toMem packer writing directly into the new message's data.
// NOTE(review): the declaration of `ps`, any count header, and the return
// statement are not visible in this excerpt.
1668 CkReductionMsg* CkReductionMsg::buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions)
1672 PUParray(ps, reductions, num_reductions);
1674 CkReductionMsg* msg = CkReductionMsg::buildNew(ps.size(), NULL, CkReduction::tuple);
1675 PUP::toMem p(msg->data);
1677 PUParray(p, reductions, num_reductions);
// Both passes must have produced the same number of bytes.
1678 if (p.size() != ps.size()) CmiAbort("Size mismatch packing CkReduction::tupleElement::tupleToBuffer\n");
// Decode this message's data into an array of tuple elements.
//   out_reductions - receives a new[]-allocated array; tupleReduction releases it with delete[].
//   num_reductions - receives the element count read from the start of the data.
1682 void CkReductionMsg::toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions)
1684 PUP::fromMem p(this->getData());
1685 p|(*num_reductions);
1686 *out_reductions = new CkReduction::tupleElement[*num_reductions];
1687 PUParray(p, *out_reductions, *num_reductions);
// Tuple reducer: every input message carries the same number of tuple elements.
// For each element position, the registered reducer for that position is run
// across all messages and the results are packed into one tuple message.
// NOTE(review): braces, some #endif lines and the return statement are not visible
// in this excerpt.
1691 CkReductionMsg* CkReduction::tupleReduction(int num_messages, CkReductionMsg** messages)
// Step 1: decode every message into its own array of tuple elements.
1693 CkReduction::tupleElement** tuple_data = new CkReduction::tupleElement*[num_messages];
1694 int num_reductions = 0;
1695 for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1697 int itr_num_reductions = 0;
1698 messages[message_idx]->toTuple(&tuple_data[message_idx], &itr_num_reductions);
1700 // each message must submit the same reductions
// num_reductions == 0 doubles as "not yet set".
1701 if (num_reductions == 0)
1702 num_reductions = itr_num_reductions;
1703 else if (num_reductions != itr_num_reductions)
1704 CmiAbort("num_reductions mismatch in CkReduction::tupleReduction");
1707 DEB_TUPLE(("tupleReduction {\n num_messages=%d,\n num_reductions=%d,\n length=%d\n",
1708 num_messages, num_reductions, messages[0]->getLength()));
// Step 2: scratch storage -- one result element per reduction, and one fake
// CkReductionMsg header per (reduction, message) pair.
1710 CkReduction::tupleElement* return_data = new CkReduction::tupleElement[num_reductions];
1711 // using a raw buffer to avoid CkReductionMsg constructor/destructor, we want to manage
1712 // the inner memory of these temps ourselves to avoid unneeded copies
1713 char* simulated_messages_buffer = new char[sizeof(CkReductionMsg) * num_reductions * num_messages];
1714 CkReductionMsg** simulated_messages = new CkReductionMsg*[num_messages];
1716 // imagine the given data in a 2D layout where the messages are rows and reductions are columns
1717 // here we grab each column and run that reduction
1719 std::vector<CkReductionMsg *> msgs_to_delete;
1720 for (int reduction_idx = 0; reduction_idx < num_reductions; ++reduction_idx)
1722 DEB_TUPLE((" reduction_idx=%d {\n", reduction_idx));
1723 CkReduction::reducerType reducerType = CkReduction::invalid;
1724 for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1726 CkReduction::tupleElement* reductions = (CkReduction::tupleElement*)(tuple_data[message_idx]);
1727 CkReduction::tupleElement& element = reductions[reduction_idx];
1728 DEB_TUPLE((" msg %d, sf=%d, length=%d : { dataSize=%d, data=%p, reducer=%d },\n",
1729 message_idx, messages[message_idx]->sourceFlag, messages[message_idx]->getLength(), element.dataSize, element.data, element.reducer));
// The reducer of the last message examined is the one used for this column.
1731 reducerType = element.reducer;
// Each (reduction, message) pair has its own header slot in the raw buffer. The
// header points at the element's data (no copy) and inherits the bookkeeping
// fields of the real message.
1733 size_t sim_idx = (reduction_idx * num_messages + message_idx) * sizeof(CkReductionMsg);
1734 CkReductionMsg& simulated_message = *(CkReductionMsg*)&simulated_messages_buffer[sim_idx];
1735 simulated_message.dataSize = element.dataSize;
1736 simulated_message.data = element.data;
1737 simulated_message.reducer = element.reducer;
1738 simulated_message.sourceFlag = messages[message_idx]->sourceFlag;
1739 simulated_message.userFlag = messages[message_idx]->userFlag;
1740 simulated_message.gcount = messages[message_idx]->gcount;
1741 simulated_message.migratableContributor = messages[message_idx]->migratableContributor;
1742 #if CMK_BIGSIM_CHARM
1743 simulated_message.log = NULL;
1745 simulated_messages[message_idx] = &simulated_message;
1748 // run the reduction and copy the result back to our data structure
1749 const auto& reducerFp = CkReduction::reducerTable()[reducerType].fn;
1750 CkReductionMsg* result = reducerFp(num_messages, simulated_messages);
1751 DEB_TUPLE((" result_len=%d\n },\n", result->getLength()));
// The result element references the reducer's output data rather than copying it.
1752 return_data[reduction_idx] = CkReduction::tupleElement(result->getLength(), result->getData(), reducerType);
1753 // reducers are allowed to reuse the zeroth message's memory, so it is not safe to delete this
1754 // all the time, and, even if it is, deletion must be deferred until after processing is complete.
1755 if (result != simulated_messages[0]) {
1756 msgs_to_delete.push_back(result);
// Step 3: pack all per-column results into the outgoing tuple message.
1760 CkReductionMsg* retval = CkReductionMsg::buildFromTuple(return_data, num_reductions);
1761 DEB_TUPLE(("} tupleReduction msg_size=%d\n", retval->getSize()));
// Step 4: release the scratch storage and any result messages the reducers allocated.
1763 for (int message_idx = 0; message_idx < num_messages; ++message_idx)
1764 delete[] tuple_data[message_idx];
1765 delete[] tuple_data;
1766 delete[] return_data;
1767 delete[] simulated_messages_buffer;
1768 // note that although this is a 2d array, we don't need to delete the inner objects,
1769 // their memory is tracked in simulated_messages_buffer
1770 delete[] simulated_messages;
1771 for (int i = 0; i < msgs_to_delete.size(); ++i)
1772 delete msgs_to_delete[i];
1778 /////////////////// Reduction Function Table /////////////////////
// Empty constructor body; per the trailing note it is a private dummy.
1779 CkReduction::CkReduction() {} //Dummy private constructor
1781 //Add the given reducer to the list. Returns the new reducer's
1782 // reducerType. Must be called in the same order on every node.
//   fn         - reducer function to register.
//   streamable - stored alongside fn in the table entry.
1783 CkReduction::reducerType CkReduction::addReducer(reducerFn fn, bool streamable)
// Registration is only allowed from rank 0.
1785 CkAssert(CmiMyRank() == 0);
// The new reducer's type is its position in the table, i.e. the size before the append.
1786 reducerType index = (reducerType)reducerTable().size();
1787 reducerTable().push_back(reducerStruct(fn, streamable));
1792 /*Reducer table: maps reducerTypes to reducerStructs.
1793 It's indexed by reducerType, so the order in this table
1794 must *exactly* match the reducerType enum declaration.
1795 The names don't have to match, but it helps.
// Build the table of builtin reducers. Entries are appended in reducerType order,
// so the position of each push_back is significant. The bool in each entry is the
// reducer's "streamable" flag.
1797 std::vector<CkReduction::reducerStruct> CkReduction::initReducerTable()
1799 std::vector<CkReduction::reducerStruct> vec;
// Index 0 is the invalid reducer, which aborts when called.
1801 vec.push_back(CkReduction::reducerStruct(::invalid_reducer, true));
1802 vec.push_back(CkReduction::reducerStruct(::nop, true));
1803 //Compute the sum of the numbers passed by each element.
1804 vec.push_back(CkReduction::reducerStruct(::sum_char, true));
1805 vec.push_back(CkReduction::reducerStruct(::sum_short, true));
1806 vec.push_back(CkReduction::reducerStruct(::sum_int, true));
1807 vec.push_back(CkReduction::reducerStruct(::sum_long, true));
1808 vec.push_back(CkReduction::reducerStruct(::sum_long_long, true));
1809 vec.push_back(CkReduction::reducerStruct(::sum_uchar, true));
1810 vec.push_back(CkReduction::reducerStruct(::sum_ushort, true));
1811 vec.push_back(CkReduction::reducerStruct(::sum_uint, true));
1812 vec.push_back(CkReduction::reducerStruct(::sum_ulong, true));
1813 vec.push_back(CkReduction::reducerStruct(::sum_ulong_long, true));
1814 vec.push_back(CkReduction::reducerStruct(::sum_float, true));
1815 vec.push_back(CkReduction::reducerStruct(::sum_double, true));
1817 //Compute the product of the numbers passed by each element.
1818 vec.push_back(CkReduction::reducerStruct(::product_char, true));
1819 vec.push_back(CkReduction::reducerStruct(::product_short, true));
1820 vec.push_back(CkReduction::reducerStruct(::product_int, true));
1821 vec.push_back(CkReduction::reducerStruct(::product_long, true));
1822 vec.push_back(CkReduction::reducerStruct(::product_long_long, true));
1823 vec.push_back(CkReduction::reducerStruct(::product_uchar, true));
1824 vec.push_back(CkReduction::reducerStruct(::product_ushort, true));
1825 vec.push_back(CkReduction::reducerStruct(::product_uint, true));
1826 vec.push_back(CkReduction::reducerStruct(::product_ulong, true));
1827 vec.push_back(CkReduction::reducerStruct(::product_ulong_long, true));
1828 vec.push_back(CkReduction::reducerStruct(::product_float, true));
1829 vec.push_back(CkReduction::reducerStruct(::product_double, true));
1831 //Compute the largest number passed by any element.
1832 vec.push_back(CkReduction::reducerStruct(::max_char, true));
1833 vec.push_back(CkReduction::reducerStruct(::max_short, true));
1834 vec.push_back(CkReduction::reducerStruct(::max_int, true));
1835 vec.push_back(CkReduction::reducerStruct(::max_long, true));
1836 vec.push_back(CkReduction::reducerStruct(::max_long_long, true));
1837 vec.push_back(CkReduction::reducerStruct(::max_uchar, true));
1838 vec.push_back(CkReduction::reducerStruct(::max_ushort, true));
1839 vec.push_back(CkReduction::reducerStruct(::max_uint, true));
1840 vec.push_back(CkReduction::reducerStruct(::max_ulong, true));
1841 vec.push_back(CkReduction::reducerStruct(::max_ulong_long, true));
1842 vec.push_back(CkReduction::reducerStruct(::max_float, true));
1843 vec.push_back(CkReduction::reducerStruct(::max_double, true));
1845 //Compute the smallest number passed by any element.
1846 vec.push_back(CkReduction::reducerStruct(::min_char, true));
1847 vec.push_back(CkReduction::reducerStruct(::min_short, true));
1848 vec.push_back(CkReduction::reducerStruct(::min_int, true));
1849 vec.push_back(CkReduction::reducerStruct(::min_long, true));
1850 vec.push_back(CkReduction::reducerStruct(::min_long_long, true));
1851 vec.push_back(CkReduction::reducerStruct(::min_uchar, true));
1852 vec.push_back(CkReduction::reducerStruct(::min_ushort, true));
1853 vec.push_back(CkReduction::reducerStruct(::min_uint, true));
1854 vec.push_back(CkReduction::reducerStruct(::min_ulong, true));
1855 vec.push_back(CkReduction::reducerStruct(::min_ulong_long, true));
1856 vec.push_back(CkReduction::reducerStruct(::min_float, true));
1857 vec.push_back(CkReduction::reducerStruct(::min_double, true));
1859 //Compute the logical AND of the values passed by each element.
1860 // The resulting value will be zero if any source value is zero.
1861 // logical_and deprecated in favor of logical_and_int
1862 vec.push_back(CkReduction::reducerStruct(::logical_and, true));
1863 vec.push_back(CkReduction::reducerStruct(::logical_and_int, true));
1864 vec.push_back(CkReduction::reducerStruct(::logical_and_bool, true));
1866 //Compute the logical OR of the values passed by each element.
1867 // The resulting value will be 1 if any source value is nonzero.
1868 // logical_or deprecated in favor of logical_or_int
1869 vec.push_back(CkReduction::reducerStruct(::logical_or, true));
1870 vec.push_back(CkReduction::reducerStruct(::logical_or_int, true));
1871 vec.push_back(CkReduction::reducerStruct(::logical_or_bool, true));
1873 //Compute the logical XOR of the values passed by each element.
1874 // The resulting value will be 1 if an odd number of source values is nonzero.
1875 vec.push_back(CkReduction::reducerStruct(::logical_xor_int, true));
1876 vec.push_back(CkReduction::reducerStruct(::logical_xor_bool, true));
1878 // Compute the logical bitvector AND of the values passed by each element.
1879 // bitvec_and deprecated in favor of bitvec_and_int
1880 vec.push_back(CkReduction::reducerStruct(::bitvec_and, true));
1881 vec.push_back(CkReduction::reducerStruct(::bitvec_and_int, true));
1882 vec.push_back(CkReduction::reducerStruct(::bitvec_and_bool, true));
1884 // Compute the logical bitvector OR of the values passed by each element.
1885 // bitvec_or deprecated in favor of bitvec_or_int
1886 vec.push_back(CkReduction::reducerStruct(::bitvec_or, true));
1887 vec.push_back(CkReduction::reducerStruct(::bitvec_or_int, true));
1888 vec.push_back(CkReduction::reducerStruct(::bitvec_or_bool, true));
1890 // Compute the logical bitvector XOR of the values passed by each element.
1891 vec.push_back(CkReduction::reducerStruct(::bitvec_xor, true));
1892 vec.push_back(CkReduction::reducerStruct(::bitvec_xor_int, true));
1893 vec.push_back(CkReduction::reducerStruct(::bitvec_xor_bool, true));
1895 // Select one of the messages at random to pass on
1896 vec.push_back(CkReduction::reducerStruct(::random, true));
1898 //Concatenate the (arbitrary) data passed by each element
1899 // This reduction is marked as unstreamable because of the n^2
1900 // work required to stream it
1901 vec.push_back(CkReduction::reducerStruct(::concat, false));
1903 //Combine the data passed by each element into a list of setElements.
1904 // Each element may contribute arbitrary data (with arbitrary length).
1905 // This reduction is marked as unstreamable because of the n^2
1906 // work required to stream it
1907 vec.push_back(CkReduction::reducerStruct(::set, false));
1909 // Computes a count, mean, and variance for the contributed values
1910 vec.push_back(CkReduction::reducerStruct(::statistics, true));
1912 // Allows multiple reductions to be done in the same message
1913 vec.push_back(CkReduction::reducerStruct(CkReduction::tupleReduction, false));
1918 /* Wraps accesses to the reducerTable to prevent the static initialization
// Accessor for the reducer table. The table is a function-local static, so it is
// built by initReducerTable() the first time this function is called.
1920 std::vector<CkReduction::reducerStruct>& CkReduction::reducerTable()
1922 static std::vector<CkReduction::reducerStruct> table = initReducerTable();
1933 /********** Code added by Sayantan *********************/
1934 /** Locking is a big problem in the nodegroup code for smp.
1935 So a few assumptions have had to be made. There is one lock
1936 called lockEverything. It protects all the data structures
1937 of the nodegroup reduction mgr. I tried grabbing it separately
1938 for each datastructure, modifying it and then releasing it and
1939 then grabbing it again, for the next change.
1940 That doesn't really help because the interleaved execution of
1941 different threads makes the state of the reduction manager
1944 1. Grab lockEverything before calling finishreduction or startReduction
1946 2. lockEverything is grabbed only in entry methods reductionStarting
1947 or RecvMesg or addcontribution.
1950 /**nodegroup reduction manager . Most of it is similar to the guy above***/
// Constructor: thisIndex is this node's number, and the per-object node lock is created.
1951 NodeGroup::NodeGroup(void):thisIndex(CkMyNode()) {
1952 __nodelock=CmiCreateLock();
// With message logging enabled, the object ID is tagged as a node group on this node.
1953 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1954 mlogData->objID.type = TypeNodeGroup;
1955 mlogData->objID.data.group.onPE = CkMyNode();
// Destructor: releases the node lock created by the constructor and sets the
// _destroyingNodeGroup flag.
1959 NodeGroup::~NodeGroup() {
1960 CmiDestroyLock(__nodelock);
1961 CkpvAccess(_destroyingNodeGroup) = true;
// Pack/unpack: delegates to the CkNodeReductionMgr base.
// NOTE(review): any further statements are not visible in this excerpt.
1963 void NodeGroup::pup(PUP::er &p)
1965 CkNodeReductionMgr::pup(p);
1969 //CK_REDUCTION_CLIENT_DEF(CProxy_NodeGroup,(CkNodeReductionMgr *)CkLocalBranch(_ck_gid));
1971 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
1972 DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
1973 ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
1974 //ckLocalNodeBranch()->ckSetReductionClient(cb);
// Instantiates the contribute-method definitions for NodeGroup, using this object
// (viewed as a CkNodeReductionMgr) as the manager and reductionInfo as its
// contributor record. NOTE(review): the meaning of the final `false` argument is
// defined by the macro, which is not visible here.
1977 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
1978 ((CkNodeReductionMgr *)this),
1979 reductionInfo,false)
1981 /* this contribute also adds up the count across all messages it receives.
1982 Useful for summing up number of array elements who have contributed ****/
1983 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
1984 {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
1988 //#define BINOMIAL_TREE
// Constructor for the node-level reduction manager.
// NOTE(review): many initialization statements are not visible in this excerpt.
1990 CkNodeReductionMgr::CkNodeReductionMgr()//Constructor
1991 : thisProxy(thisgroup)
// Tree shape is chosen at compile time; the non-binomial alternative is not visible here.
1993 #ifdef BINOMIAL_TREE
1994 init_BinomialTree();
1998 storedCallback=NULL;
2002 startRequested=false;
// gcount starts at the number of nodes.
2003 gcount=CkNumNodes();
// Lock protecting the manager's data structures.
2006 lockEverything = CmiCreateLock();
// NOTE(review): `this` is a pointer but is printed with %d in this trace.
2011 DEBR((AA "In NodereductionMgr constructor at %d \n" AB,this));
2016 maxModificationRedNo = INT_MAX;
2018 additionalGCount = newAdditionalGCount = 0;
// Destructor: releases the lock created in the constructor.
2021 CkNodeReductionMgr::~CkNodeReductionMgr()
2023 CmiDestroyLock(lockEverything);
// Reset the manager's reduction state and discard all queued messages.
// NOTE(review): several reset statements are not visible in this excerpt.
2026 void CkNodeReductionMgr::flushStates()
// Only rank 0 performs the reset.
2028 if(CkMyRank() == 0){
2029 // CmiPrintf("[%d] CkNodeReductionMgr::flushState\n", CkMyPe());
2033 startRequested=false;
2034 gcount=CkNumNodes();
// Drain every queue, deleting the messages it holds.
2040 while (!msgs.isEmpty()) { delete msgs.deq(); }
2041 while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
2042 while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
2043 while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
2047 //////////// Reduction Manager Client API /////////////
2049 //Add the given client function. Overwrites any previous client.
// cb: callback to register as the reduction client.
// Visible behavior: logs whether cb is valid, emits a warning that this should
// only be called from processor zero, and deletes the previously stored
// callback.
// NOTE(review): the condition guarding the warning and the assignment of cb to
// storedCallback are on lines missing from this listing.
2050 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
2052 DEBR((AA "Setting reductionClient in NodeReductionMgr %d at %d\n" AB,cb,this));
2053 if(cb->isInvalid()){
2054 DEBR((AA "Invalid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2056 DEBR((AA "Valid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
2060 CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
// Free the old client before it is replaced.
2061 delete storedCallback;
// Local contribution entry point.
// ci: per-contributor bookkeeping; its redNo is post-incremented here.
// m:  the contribution; stamped with the contributor's reduction number and
//     marked as a single contribution (sourceFlag = -1).
// The hand-off of m to the rest of the reduction machinery is on a line that
// is missing from this listing.
2067 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
// Under _FAULT_MLOG_ / _FAULT_CAUSAL_: temporarily make this object the
// current object, remembering the previous one so it can be restored below.
2069 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2070 Chare *oldObj =CpvAccess(_currentObj);
2071 CpvAccess(_currentObj) = this;
2075 m->redNo=ci->redNo++;
2076 m->sourceFlag=-1;//A single contribution
2078 DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
// Restore the previously current object.
2081 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2082 CpvAccess(_currentObj) = oldObj;
// Variant of contribute() that also carries a count.
// ci:    per-contributor bookkeeping; its redNo is post-incremented here.
// m:     the contribution; stamped with the contributor's reduction number.
// count: NOTE(review): its use is on lines missing from this listing, as is
//        the hand-off of m between the two debug messages below.
2087 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
2089 #if CMK_BIGSIM_CHARM
2090 _TRACE_BG_TLINE_END(&m->log);
// Under _FAULT_MLOG_ / _FAULT_CAUSAL_: temporarily make this object the
// current object, remembering the previous one so it can be restored below.
2092 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2093 Chare *oldObj =CpvAccess(_currentObj);
2094 CpvAccess(_currentObj) = this;
2097 m->redNo=ci->redNo++;
2099 DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
2101 DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
// Restore the previously current object.
2103 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2104 CpvAccess(_currentObj) = oldObj;
2109 //////////// Reduction Manager Remote Entry Points /////////////
// Core handling of a reduction message m that arrived from another node.
// Also called from finishReduction when replaying futureRemoteMsgs.
// Visible cases (the guarding condition of the first case and several
// statements are on lines missing from this listing):
//  - node blocked (per the debug message): park m in bufferedRemoteMsgs;
//  - m->redNo is the current reduction: start that reduction;
//  - m->redNo is a future reduction: queue m in futureRemoteMsgs;
//  - otherwise the message is late: print diagnostics and abort.
2111 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
2112 DEBR(("[%d,%d] doRecvMsg called for %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2117 DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
2118 bufferedRemoteMsgs.enq(m);
2122 if (isPresent(m->redNo)) { //Is a regular, in-order reduction message
2123 //DEBR((AA "Recv'd remote contribution %d for #%d at %d\n" AB,nRemote,m->redNo,this));
2124 startReduction(m->redNo,CkMyNode());
2130 if (isFuture(m->redNo)) {
2131 // DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
2132 futureRemoteMsgs.enq(m);
// Neither current nor future: a late remote contribution is fatal.
2134 CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);
2135 CkAbort("Recv'd late remote contribution!\n");
2138 DEBR(("[%d,%d]]]]] doRecvMsg called for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2141 //Sent up the reduction tree with reduced data
// Entry point invoked on a parent node with a child's reduced message m
// (finishReduction sends results via thisProxy[treeParent()].RecvMsg).
// The processing is bracketed by lockEverything; the statement executed while
// the lock is held is on a line missing from this listing.
2142 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
2144 #if CMK_BIGSIM_CHARM
2145 _TRACE_BG_TLINE_END(&m->log);
// Non-SMP builds with immediate messages: if `interrupt` is set, adjust the
// quiescence-detection counter by -1 and delay this immediate message. What
// follows CmiDelayImmediate() is not shown in this listing.
2147 #ifndef CMK_CPV_IS_SMP
2148 #if CMK_IMMEDIATE_MSG
2149 if(interrupt == true){
2150 //CkPrintf("$$$$$$$$$How did i wake up in the middle of someone else's entry method ?\n");
2151 CpvAccess(_qd)->process(-1);
2152 CmiDelayImmediate();
2158 CmiLock(lockEverything);
2159 DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
2161 CmiUnlock(lockEverything);
2163 DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
// Starts reduction `number` on this node if it is not already running.
// number:  reduction number to start; aborts if it is a future or a past one.
// srcNode: only used in the debug message in the visible code.
// Returns early when the reduction is already in progress (that check's
// condition is on a missing line). If elements are still being created, it
// records the request in startRequested and postpones the start.
// The statements that actually start the reduction are not in this listing.
2166 void CkNodeReductionMgr::startReduction(int number,int srcNode)
2168 if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
2169 if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
2171 DEBR((AA "This Node reduction is already in progress\n" AB));
2172 return;//This reduction already started
2174 if (creating) //Don't start yet-- we're creating elements
2176 DEBR((AA " Node Postponing start request #%d until we're done creating\n" AB,redNo));
2177 startRequested=true;
2181 //If none of these cases, we need to start the reduction--
2182 DEBR((AA "Starting Node reduction #%d on %p srcNode %d\n" AB,redNo,this,srcNode));
// Core handling of a local contribution m. Called by addContribution with
// lockEverything held, and by finishReduction / clearBlockedMsgs when queued
// messages are replayed.
// Visible cases (the blocked-node condition and several statements are on
// lines missing from this listing):
//  - node blocked (per the debug message): park m in bufferedMsgs;
//  - m->redNo is a future reduction: early contribution for the future queue;
//  - otherwise: ordinary contribution, start reduction m->redNo.
2186 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
2191 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2192 bufferedMsgs.enq(m);
2196 if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2197 DEBR((AA "Contributor gives early node contribution-- for #%d\n" AB,m->redNo));
2199 } else {// An ordinary contribution
2200 DEBR((AA "Recv'd local node contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2201 // CmiPrintf("[%d,%d] Redcv'd Local Contribution for redNo %d number %d at %0.6f \n",CkMyNode(),CkMyPe(),m->redNo,nContrib+1,CkWallTimer());
2202 startReduction(m->redNo,CkMyNode());
2209 //Handle a message from one element for the reduction
// Locked wrapper: runs doAddContribution(m) while holding lockEverything.
2210 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
2213 CmiLock(lockEverything);
2214 doAddContribution(m);
2215 CmiUnlock(lockEverything);
// Handles a contribution m delivered on behalf of a late migrant, holding
// lockEverything for the duration.
// Visible cases (the blocked-node condition, the early return after the first
// unlock, and the ordinary-contribution statements are on lines missing from
// this listing):
//  - node blocked (per the debug message): park m in bufferedMsgs and unlock;
//  - m->redNo is a future reduction: queue m in futureLateMigrantMsgs;
//  - otherwise: treated as an ordinary contribution.
2219 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
2220 CmiLock(lockEverything);
2225 DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
2226 bufferedMsgs.enq(m);
2227 CmiUnlock(lockEverything);
2231 if (isFuture(m->redNo)) {//An early contribution-- add to future Q
2232 DEBR((AA "Latemigrant gives early node contribution-- for #%d\n" AB,m->redNo));
2233 // CkPrintf("[%d,%d] NodeGroup %d> Latemigrant gives early node contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2234 futureLateMigrantMsgs.enq(m);
2235 } else {// An ordinary contribution
2236 DEBR((AA "Recv'd late migrant contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
2237 // CkPrintf("[%d,%d] NodeGroup %d> Latemigrant contribution %d in redNo %d\n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo);
2241 CmiUnlock(lockEverything);
2248 /** check if the nodegroup reduction is finished at this node. In that case send it
2249 up the reduction tree **/
// Visible flow (most braces, some conditions and several statements are on
// lines missing from this listing):
//  1. Return unless a reduction is in progress and no elements are being
//     created.
//  2. If local (nContrib < lcount) or remote (nRemote < treeKids())
//     contributions are still missing, either flag a partial reduction (more
//     than one queued message and a streamable reducer) or return and wait.
//  3. Abort if more remote messages arrived than this node has children.
//  4. Combine the queued messages with reduceMessages().
//  5. Either forward the result to the parent node's RecvMsg or, at the root
//     of the tree, deliver it through the message's callback or the stored
//     callback (aborting if neither exists).
//  6. Replay messages queued for later reductions from futureMsgs,
//     futureRemoteMsgs and futureLateMigrantMsgs.
2251 void CkNodeReductionMgr::finishReduction(void)
2253 DEBR((AA "in Nodegrp finishReduction %d treeKids %d \n" AB,inProgress,treeKids()));
2254 /***Check if reduction is finished in the next few ifs***/
2255 if ((!inProgress) || creating){
2256 DEBR((AA "Either not in Progress or creating\n" AB));
2260 bool partialReduction = false;
// Missing local contributions: stream a partial result if possible, else wait.
2262 if (nContrib<(lcount)){
2263 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
2264 partialReduction = true;
2267 DEBR((AA "Nodegrp Need more local messages %d %d\n" AB,nContrib,(lcount)));
2268 return;//Need more local messages
// Missing remote contributions: same choice as for local ones.
2271 if (nRemote<treeKids()){
2272 if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
2273 partialReduction = true;
2276 DEBR((AA "Nodegrp Need more Remote messages %d %d\n" AB,nRemote,treeKids()));
2277 return;//Need more remote messages
2280 if (nRemote>treeKids()){
2283 CkAbort("Nodegrp Excess remote reduction message received!\n");
2286 DEBR((AA "Reducing node data...\n" AB));
2288 /**reduce all messages received at this node **/
2289 CkReductionMsg *result=reduceMessages();
// NOTE(review): the handling of a partial result is on lines missing from
// this listing.
2291 if (partialReduction) {
2297 {//Pass data up tree to parent
2298 if(CmiNodeAlive(CkMyNode()) || killed == false){
2299 DEBR((AA "Passing reduced data up to parent node %d. \n" AB,treeParent()));
2300 DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
2304 result->gcount += additionalGCount;//if u have replaced some node add its gcount to ur count
2305 thisProxy[treeParent()].RecvMsg(result);
// For migratable contributors the total gcount (including additionalGCount)
// is compared with the number of sources; on a mismatch the debug message
// reports the reduction as not done yet. What happens next is not shown here.
2311 if(result->isMigratableContributor() && result->gcount+additionalGCount != result->sourceFlag){
2312 DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
2316 result->gcount += additionalGCount;
2317 /** if the reduction is finished and I am the root of the reduction tree
2318 then call the reductionhandler and other stuff ***/
2321 DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
// Use the user flag as the result's reference number before delivery.
2322 CkSetRefNum(result, result->getUserFlag());
// Delivery order: callback carried by the message first, stored client
// callback second, otherwise abort.
2323 if (!result->callback.isInvalid()){
2324 DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
2325 result->callback.send(result);
2327 else if (storedCallback!=NULL){
2328 DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
2329 storedCallback->send(result);
2332 DEBR((AA "Invalid Callback \n" AB));
2333 CkAbort("No reduction client!\n"
2334 "You must register a client with either SetReductionClient or during contribute.\n");
2338 // DEBR((AA "Reduction %d finished in group!\n" AB,redNo));
2339 //CkPrintf("[%d,%d]Reduction %d finished with %d\n",CkMyNode(),CkMyPe(),redNo,nContrib);
2344 startRequested=false;
2347 //Look through the future queue for messages we can now handle
// The queue length is captured up front, so messages that doAddContribution
// re-enqueues are not revisited in the same pass (loop header not shown).
2348 int n=futureMsgs.length();
2354 CkReductionMsg *m=futureMsgs.deq();
2357 if (m!=NULL){ //One of these addContributions may have finished us.
2358 DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
2359 doAddContribution(m);//<- if *still* early, puts it back in the queue
// Same replay for early messages received from other nodes.
2365 n=futureRemoteMsgs.length();
2372 CkReductionMsg *m=futureRemoteMsgs.deq();
2376 doRecvMsg(m);//<- if *still* early, puts it back in the queue
// Late-migrant messages: those matching the current redNo are handled (on
// lines not shown); the enqueue below presumably puts the others back.
2379 n = futureLateMigrantMsgs.length();
2381 CkReductionMsg *m = futureLateMigrantMsgs.deq();
2383 if(m->redNo == redNo){
2386 futureLateMigrantMsgs.enq(m);
2392 //////////// Reduction Manager Utilities /////////////
// Builds a TREE_WID-ary reduction tree over node numbers.
// parent is (CkMyNode()-1)/TREE_WID; the children are the consecutive node
// numbers starting at CkMyNode()*TREE_WID+1, limited to at most TREE_WID and
// to nodes that exist (fewer than CkNumNodes()). Each child is recorded in
// both kids and newKids.
2394 void CkNodeReductionMgr::init_BinaryTree(){
2395 parent = (CkMyNode()-1)/TREE_WID;
2396 int firstkid = CkMyNode()*TREE_WID+1;
// Clamp the child count to the range [0, TREE_WID].
2397 numKids=CkNumNodes()-firstkid;
2398 if (numKids>TREE_WID) numKids=TREE_WID;
2399 if (numKids<0) numKids=0;
2401 for(int i=0;i<numKids;i++){
2402 kids.push_back(firstkid+i);
2403 newKids.push_back(firstkid+i);
// Builds the reduction tree from the precomputed spanning tree _topoTree.
// Aborts if _topoTree is NULL. The child count and child list are copied from
// the tree info into numKids, kids and newKids.
// NOTE(review): one line between the reference binding and the numKids
// assignment is missing from this listing (presumably the parent assignment).
2407 void CkNodeReductionMgr::init_TopoTree() {
2408 if (_topoTree == NULL) CkAbort("CkNodeReductionMgr:: topo tree has not been calculated\n");
2409 CmiSpanningTreeInfo &t = *_topoTree;
2411 numKids = t.child_count;
2412 for (int i=0; i < numKids; i++) {
2413 kids.push_back(t.children[i]);
2414 newKids.push_back(t.children[i]);
// Builds a binomial reduction tree.
// depth is ceil(log2(CkNumNodes())), upperSize is 2^depth, and each node gets
// a reversed label: upperSize - CkMyNode() - 1. Labels are mapped back to node
// numbers with upperSize - 1 - label.
// NOTE(review): the computation of `count`, the declaration of `temp` and the
// conditions around the parent assignment are on lines missing from this
// listing.
2418 void CkNodeReductionMgr::init_BinomialTree(){
2419 int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
2420 /*upperSize = (unsigned )pow((double)2,depth);*/
2421 upperSize = (unsigned) 1 << depth;
2422 label = upperSize-CkMyNode()-1;
// Parent: label + 2^count, converted from label space back to a node number.
2433 /*parent = label + rint(pow((double)2,count));*/
2434 parent = label + (1<<count);
2435 parent = upperSize -1 -parent;
// Children: label - 2^i for each i < count, converted back to a node number
// and kept only when that node exists.
2439 for(int i=0;i<count;i++){
2440 /*temp = label - rint(pow((double)2,i));*/
2441 temp = label - (1<<i);
2442 temp = upperSize-1-temp;
2443 if(temp <= CkNumNodes()-1){
2444 // kids[numKids] = temp;
2445 kids.push_back(temp);
// Presumably returns the node number of the reduction-tree root (hasParent
// compares CkMyNode() against it). The body is not present in this listing.
2456 int CkNodeReductionMgr::treeRoot(void)
// Returns true when this node is not the tree root, false when it is.
2460 bool CkNodeReductionMgr::hasParent(void) //Root Node
2462 return (bool)(CkMyNode()!=treeRoot());
// Accessor for this node's parent in the reduction tree (per the original
// trailing comment). The body is not present in this listing.
2464 int CkNodeReductionMgr::treeParent(void) //My parent Node
// Returns CkMyNode()*TREE_WID+1, the same first-child formula that
// init_BinaryTree uses.
2469 int CkNodeReductionMgr::firstKid(void) //My first child Node
2471 return CkMyNode()*TREE_WID+1;
// Number of child nodes of this node in the reduction tree.
// NOTE(review): the return statements are on lines missing from this listing.
// The commented-out code below computes a child count from firstKid() and
// TREE_WID.
2473 int CkNodeReductionMgr::treeKids(void)//Number of children in tree
2475 #ifdef BINOMIAL_TREE
2478 /* int nKids=CkNumNodes()-firstKid();
2479 if (nKids>TREE_WID) nKids=TREE_WID;
2480 if (nKids<0) nKids=0;
2486 //Combine (& free) the current message vector msgs.
// Drains msgs, summing gcount over every message. For real contributions
// (sourceFlag != 0) it also accumulates the source count and picks up the
// callback, user flag and migratable flag. The collected messages are handed
// to the reducer function from CkReduction::reducerTable(); when there is no
// message or no valid reducer, an empty message is built instead. Input
// messages other than the returned one are deleted, and the accumulated
// counts, flag and callback are written into the result.
// Returns the combined message (the return statement is on a line missing from
// this listing, as are the declarations of m, i and nMsgs and several braces).
// NOTE(review): isMigratableContributor has no initializer and is assigned
// only in the real-contribution branch, yet it is read when the result is
// finalized. If every message is a placeholder it looks uninitialized; confirm.
2487 CkReductionMsg *CkNodeReductionMgr::reduceMessages(void)
2489 #if CMK_BIGSIM_CHARM
2490 _TRACE_BG_END_EXECUTE(1);
2491 void* _bgParentLog = NULL;
2492 _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
2494 CkReductionMsg *ret=NULL;
2496 //Look through the vector for a valid reducer, swapping out placeholder messages
2497 CkReduction::reducerType r=CkReduction::invalid;
2498 int msgs_gcount=0;//Reduced gcount
2499 int msgs_nSources=0;//Reduced nSources
// (CMK_REFNUM_TYPE)-1 serves as the "no user flag set" sentinel below.
2500 CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
2501 CkCallback msgs_callback;
// Scratch array sized for every queued message; filled while draining msgs.
2505 CkReductionMsg **msgArr=new CkReductionMsg*[msgs.length()];
2506 bool isMigratableContributor;
2509 while(NULL!=(m=msgs.deq()))
2511 DEBR((AA "***** gcount=%d; sourceFlag=%d ismigratable %d \n" AB,m->gcount,m->nSources(),m->isMigratableContributor()));
2512 msgs_gcount+=m->gcount;
2513 if (m->sourceFlag!=0)
2514 { //This is a real message from an element, not just a placeholder
2515 msgs_nSources+=m->nSources();
2516 #if CMK_BIGSIM_CHARM
2517 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
// The first message, or any message whose reducer is not nop, takes this path.
2520 if (nMsgs == 0 || m->reducer != CkReduction::nop) {
2523 if (!m->callback.isInvalid()){
// With error checking on, all valid callbacks must agree.
2524 #if CMK_ERROR_CHECKING
2525 if(nMsgs > 1 && !(msgs_callback == m->callback))
2526 CkAbort("mis-matched client callbacks in reduction messages\n");
2528 msgs_callback=m->callback;
2530 if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
2531 msgs_userFlag=m->userFlag;
2532 isMigratableContributor= m->isMigratableContributor();
2535 #if CMK_ERROR_CHECKING
2536 if(!(msgs_callback == m->callback))
2537 CkAbort("mis-matched client callbacks in reduction messages\n");
2543 { //This is just a placeholder message-- replace it
2548 if (nMsgs==0||r==CkReduction::invalid)
2549 //No valid reducer in the whole vector
2550 ret=CkReductionMsg::buildNew(0,NULL);
2552 {//Use the reducer to reduce the messages
2554 msgArr[0]->reducer != CkReduction::set &&
2555 msgArr[0]->reducer != CkReduction::tuple) {
2558 if (msgArr[0]->reducer == CkReduction::nop) {
2559 // nMsgs > 1 indicates that reduction type is not nop
2560 // this means any data with reducer type nop was submitted
2561 // only so that counts would agree, and can be removed
2563 msgArr[0] = msgArr[nMsgs - 1];
// Look up the reducer function for type r and apply it to the collected array.
2566 CkReduction::reducerFn f=CkReduction::reducerTable()[r].fn;
2567 ret=(*f)(nMsgs,msgArr);
2573 #if USE_CRITICAL_PATH_HEADER_ARRAY
2574 #if CRITICAL_PATH_DEBUG > 3
2575 CkPrintf("[%d] combining critical path information from messages in CkNodeReductionMgr::reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
2577 MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
2578 path.updateMax(UsrToEnv(ret));
2579 // Combine the critical paths from all the reduction messages into the header for the new result
2580 for (i=0;i<nMsgs;i++){
2581 if (msgArr[i]!=ret){
2582 // CkPrintf("[%d] other path = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2583 path.updateMax(UsrToEnv(msgArr[i]));
2585 // CkPrintf("[%d] other path is ret = %lf\n", CkMyPe(), UsrToEnv(msgArr[i])->pathHistory.getTime() );
2588 #if CRITICAL_PATH_DEBUG > 3
2589 CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
2595 //Go back through the vector, deleting old messages
// The reducer may return one of its inputs, so that one must not be deleted.
2596 for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
2598 //Set the message counts
2600 ret->gcount=msgs_gcount;
2601 ret->userFlag=msgs_userFlag;
2602 ret->callback=msgs_callback;
2603 ret->sourceFlag=msgs_nSources;
2604 ret->setMigratableContributor(isMigratableContributor);
2605 DEBR((AA "Node Reduced gcount=%d; sourceFlag=%d\n" AB,ret->gcount,ret->sourceFlag));
2606 #if CMK_BIGSIM_CHARM
2607 _TRACE_BG_TLINE_END(&ret->log);
// Packs/unpacks the reduction manager's state through p.
// Serialized in the visible lines: inProgress, creating, startRequested,
// nContrib, nRemote, futureLateMigrantMsgs, newAdditionalGCount and
// maxModificationRedNo. Other fields are handled on lines missing from this
// listing.
// On unpacking, state that is not serialized is rebuilt: gcount, thisProxy, a
// fresh lockEverything, and the reduction tree.
2613 void CkNodeReductionMgr::pup(PUP::er &p)
2615 //We do not store the client function pointer or the client function parameter,
2616 //it is the responsibility of the programmer to correctly restore these
2619 p(inProgress); p(creating); p(startRequested);
2621 p(nContrib); p(nRemote);
2626 p|futureLateMigrantMsgs;
2629 p|newAdditionalGCount;
2630 if(p.isUnpacking()) {
2631 gcount=CkNumNodes();
2632 thisProxy = thisgroup;
2633 lockEverything = CmiCreateLock();
2634 #ifdef BINOMIAL_TREE
2635 init_BinomialTree();
2641 p | maxModificationRedNo;
// Outside the _FAULT_MLOG_ / _FAULT_CAUSAL_ builds: record whether a stored
// callback exists and allocate a fresh CkCallback when unpacking. The lines
// that pup the flag and the callback itself are missing from this listing.
2643 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2644 bool isnull = (storedCallback == NULL);
2647 if (p.isUnpacking()) {
2648 storedCallback = new CkCallback;
2658 Evacuate - is called when this processor realizes it might crash. In that case, it tries to change
2659 the reduction tree. It also needs to decide a reduction number after which it shall use the new
2662 void CkNodeReductionMgr::evacuate(){
2663 DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
2664 if(treeKids() == 0){
2666 if the node going down is a leaf
2669 DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
2671 Need to ask parent for the reduction number that it has seen.
2672 Since it is a leaf, the tree does not need to be rewired.
2673 We reuse the oldparent type of tree modification message to get
2674 the parent to block and tell us about the highest reduction number it has seen.
// Leaf case: send the parent a LEAFPARENT request whose second word is this
// node's total gcount. The declaration of `data` and the assignment of data[0]
// are on lines missing from this listing.
2679 data[1]=getTotalGCount()+additionalGCount;
2680 thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
2681 newParent = treeParent();
2683 DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
2686 It is not a leaf. It needs to rewire the tree around itself.
2687 It also needs to decide on a reduction No after which the new tree will be used
2688 Till it decides on the new tree and the redNo at which it becomes valid,
2689 all received messages will be buffered
// Internal-node case: the first child takes this node's place in the tree.
2691 newParent = kids[0];
2692 for(int i=numKids-1;i>=0;i--){
2696 Ask everybody for the highest reduction number they have seen and
2697 also tell them about the new tree
2700 Tell parent about its new child;
// OLDPARENT payload: {this node, replacement child}.
2702 int oldParentData[2];
2703 oldParentData[0] = CkMyNode();
2704 oldParentData[1] = newParent;
2705 thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
2708 Tell the other children about their new parent
// OLDCHILDREN payload: {new parent}; sent to every child except kids[0].
2710 int childrenData=newParent;
2711 for(int i=1;i<numKids;i++){
2712 thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
2716 Tell newParent (1st child) about its new children,
2717 the current node and its children except the newParent
// NEWPARENT payload layout: [0] = this node, [1..numKids-1] = the other
// children, [numKids] = this node's parent, [numKids+1] = total gcount.
// NOTE(review): no delete[] for newParentData is visible in this listing;
// confirm whether the buffer is released after the send.
2719 int *newParentData = new int[numKids+2];
2720 for(int i=1;i<numKids;i++){
2721 newParentData[i] = kids[i];
2723 newParentData[0] = CkMyNode();
2724 newParentData[numKids] = parent;
2725 newParentData[numKids+1] = getTotalGCount()+additionalGCount;
2726 thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData);
// Prepare to collect collectMaxRedNo replies: reset the reply counter and
// seed the running maximum with this node's own findMaxRedNo().
2728 readyDeletion = false;
2730 numModificationReplies = 0;
2731 tempModificationRedNo = findMaxRedNo();
2735 Depending on the code, use the data to change the tree
2736 1. OLDPARENT : replace the old child with a new one
2737 2. OLDCHILDREN: replace the parent
2738 3. NEWPARENT: add the children and change the parent
2739 4. LEAFPARENT: delete the old child
// Applies a tree-modification request to the pending tree (newKids, newParent,
// newAdditionalGCount) and then reports findMaxRedNo() back through
// collectMaxRedNo.
// code: which modification to apply; size: number of ints in data.
// NOTE(review): the switch and its case labels, and the definition of
// `sender`, are on lines missing from this listing. The per-case notes below
// are inferred from the payloads evacuate() builds; confirm against the file.
2742 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
2743 DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
2746 readyDeletion = false;
2747 newAdditionalGCount = additionalGCount;
// Presumably OLDPARENT: replace child data[0] with data[1] in newKids.
2750 for(int i=0;i<numKids;i++){
2751 if(newKids[i] == data[0]){
2752 newKids[i] = data[1];
// Presumably OLDCHILDREN: adopt data[0] as the new parent.
2760 newParent = data[0];
// Presumably NEWPARENT: the first size-2 entries become new children, then
// come the new parent and the gcount to take over.
2764 for(int i=0;i<size-2;i++){
2765 newKids.push_back(data[i]);
2767 newParent = data[size-2];
2768 newAdditionalGCount += data[size-1];
// Presumably LEAFPARENT: locate child data[0] (the action taken on a match is
// not shown) and take over its gcount from data[1].
2772 for(int i=0;i<numKids;i++){
2773 if(newKids[i] == data[0]){
2780 newAdditionalGCount += data[1];
// Reply with the highest reduction number this node has seen.
2784 int maxRedNo = findMaxRedNo();
2786 thisProxy[sender].collectMaxRedNo(maxRedNo);
// Collects one reply to a tree-modification request.
// maxRedNo: highest reduction number reported by the replying node.
// Keeps the running maximum in tempModificationRedNo. Once numKids+1 replies
// have arrived, the maximum becomes maxModificationRedNo and is sent to the
// parent and every child through unblockNode.
2789 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
2791 Find out the maximum redNo that has been seen by
2794 numModificationReplies++;
2795 if(maxRedNo > tempModificationRedNo){
2796 tempModificationRedNo = maxRedNo;
2798 if(numModificationReplies == numKids+1){
2799 maxModificationRedNo = tempModificationRedNo;
2801 when all the affected nodes have replied, tell them the maximum.
2802 Unblock yourself. deal with the buffered messages local and remote
// A value of -1 is reported as "no reductions started yet".
2804 if(maxModificationRedNo == -1){
2805 printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
2807 DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
2809 thisProxy[parent].unblockNode(maxModificationRedNo);
2810 for(int i=0;i<numKids;i++){
2811 thisProxy[kids[i]].unblockNode(maxModificationRedNo);
// Records the agreed reduction number maxRedNo in maxModificationRedNo.
// NOTE(review): the remaining statements (presumably unblocking the node and
// replaying buffered messages) are on lines missing from this listing.
2819 void CkNodeReductionMgr::unblockNode(int maxRedNo){
2820 maxModificationRedNo = maxRedNo;
// Replays messages that were buffered while the node was blocked.
// Each queue's length is captured before its loop, so only the messages
// present at that moment are dequeued. Local messages go through
// doAddContribution.
// NOTE(review): the handling of each dequeued remote message is on a line
// missing from this listing.
2827 void CkNodeReductionMgr::clearBlockedMsgs(){
2828 int len = bufferedMsgs.length();
2829 for(int i=0;i<len;i++){
2830 CkReductionMsg *m = bufferedMsgs.deq();
2831 doAddContribution(m);
2833 len = bufferedRemoteMsgs.length();
2834 for(int i=0;i<len;i++){
2835 CkReductionMsg *m = bufferedRemoteMsgs.deq();
2841 if the reduction number exceeds the maxModificationRedNo, change the tree
2842 to become the new one
// Switches to the pending tree once redNo has passed maxModificationRedNo.
// When that happens it resets maxModificationRedNo to INT_MAX, refreshes
// numKids from kids.size(), sets readyDeletion, and adopts
// newAdditionalGCount.
// NOTE(review): two lines right after the condition are missing from this
// listing (presumably the copy of newParent / newKids into the live tree).
// Otherwise, if a modification is still pending, it logs the delay and calls
// startReduction for the current redNo.
2845 void CkNodeReductionMgr::updateTree(){
2846 if(redNo > maxModificationRedNo){
2849 maxModificationRedNo = INT_MAX;
2850 numKids = kids.size();
2851 readyDeletion = true;
2852 additionalGCount = newAdditionalGCount;
2853 DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
2854 for(int i=0;i<(int)(newKids.size());i++){
2855 DEBREVAC(("%d ",newKids[i]));
2858 // startReduction(redNo,CkMyNode());
2860 if(maxModificationRedNo != INT_MAX){
2861 DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
2862 startReduction(redNo,CkMyNode());
// Final step of evacuation for this node.
// Visible flow: drain futureMsgs (messages for the current reduction are
// handled on lines not shown), reduce what is queued and send the result to
// the parent's RecvMsg, then ask the parent(s) to drop this node via
// DeleteChild / DeleteNewChild.
// NOTE(review): the conditions selecting between the DeleteChild and
// DeleteNewChild calls are on lines missing from this listing.
2869 void CkNodeReductionMgr::doneEvacuate(){
2870 DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
2873 It used to be a leaf
2874 Then as soon as future messages have been emptied you can
2875 send the parent a message telling them that they are not going
2876 to receive anymore messages from this child
2878 DEBR(("[%d] At the end of evacuation emptying future messages %d \n",CkMyNode(),futureMsgs.length()));
// Keep sweeping until futureMsgs is empty; each sweep visits the messages that
// were queued when the sweep began.
2879 while(futureMsgs.length() != 0){
2880 int n = futureMsgs.length();
2881 for(int i=0;i<n;i++){
2882 CkReductionMsg *m = futureMsgs.deq();
2883 if(isPresent(m->redNo)){
2889 CkReductionMsg *result = reduceMessages();
2890 thisProxy[treeParent()].RecvMsg(result);
2893 DEBR(("[%d] Asking parent %d to remove myself from list \n",CkMyNode(),treeParent()));
2894 thisProxy[treeParent()].DeleteChild(CkMyNode());
2897 thisProxy[treeParent()].DeleteChild(CkMyNode());
2899 thisProxy[newParent].DeleteNewChild(CkMyNode());
// Removes deletedChild from the current child list.
// Scans kids for the matching entry (the removal statement is on a line
// missing from this listing) and then refreshes numKids from kids.length().
2904 void CkNodeReductionMgr::DeleteChild(int deletedChild){
2905 DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
2906 for(int i=0;i<numKids;i++){
2907 if(kids[i] == deletedChild){
2912 numKids = kids.length();
// Removes deletedChild from the pending child list newKids.
// Scans newKids for the matching entry (the removal statement is on a line
// missing from this listing), then logs the remaining pending children.
2916 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
2917 for(int i=0;i<(int)(newKids.length());i++){
2918 if(newKids[i] == deletedChild){
2923 DEBREVAC(("[%d]%d> Deleting new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
2924 for(int i=0;i<(int)(newKids.size());i++){
2925 DEBREVAC(("%d ",newKids[i]));
// Finds the highest reduction number this node has seen by scanning
// futureRemoteMsgs for the largest redNo.
// NOTE(review): the declaration and initial value of `max`, the adjustment
// made when the current reduction has received no contributions, and the
// return statement are on lines missing from this listing.
2931 int CkNodeReductionMgr::findMaxRedNo(){
2933 for(int i=0;i<futureRemoteMsgs.length();i++){
2934 if(futureRemoteMsgs[i]->redNo > max){
2935 max = futureRemoteMsgs[i]->redNo;
2939 if redNo is max (that is no future message) and the current reduction has not started
2940 then tree can be changed before the reduction redNo can be started
// No future messages and nothing queued for the current reduction.
2942 if(redNo == max && msgs.length() == 0){
2943 DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
2949 #include "CkReduction.def.h"