LRTS Comm Thread Tracing in message receive
[charm.git] / src / ck-core / ckarrayreductionmgr.C
blobb0cf26f815a812cb5917da74e095c22b90e6c9c1
1 #include "charm++.h"
2 #include "ck.h"
3 #include "CkArrayReductionMgr.decl.h"
/** Debug switch for this file: non-zero routes ARPRINT to CkPrintf, zero routes it to noopitar. */
#define ARRREDDEBUG 0

/** printf-shaped sink that accepts any arguments and does nothing with them. */
void noopitar(const char* /*format*/, ...)
{
}

/* ARPRINT is the tracing call used throughout this file; its target is chosen by ARRREDDEBUG. */
#if !ARRREDDEBUG
#define ARPRINT noopitar
#else
#define ARPRINT CkPrintf
#endif
15 void CkArrayReductionMgr::init()
17         //ARPRINT("Array ReductionMgr Constructor called %d\n",thisgroup);
18         redNo=0;
19         size = CkMyNodeSize();
20         count = 0;
21         lockCount = CmiCreateLock();
22         ctorDoneFlag = 1;
23         alreadyStarted = -1;
26 CkArrayReductionMgr::CkArrayReductionMgr(){
27         init();
28         attachedGroup.setZero();
31 CkArrayReductionMgr::CkArrayReductionMgr(int dummy, CkGroupID gid){
32         init();
33         attachedGroup = gid;
36 void CkArrayReductionMgr::flushStates(){
37   if(CkMyRank()== 0){
38     // CmiPrintf("[%d] CkArrayReductionMgr::flushState\n", CkMyPe());
39     redNo=0;
40     count = 0;
41     while (!my_msgs.isEmpty())  delete my_msgs.deq();
42     while (!my_futureMsgs.isEmpty()) delete my_futureMsgs.deq();
43     reductionInfo.redNo = 0;
44     CkNodeReductionMgr::flushStates();
45   }
48 void CkArrayReductionMgr::collectAllMessages(){
49         if(count == size){
50                 ARPRINT("[%d] CollectAll messages  for %d with %d on %p\n",CkMyNode(),redNo,count,this);
51                 CkReductionMsg *result = reduceMessages();
52                 result->redNo = redNo;
53                 /**keep a count of elements that contributed to the original reduction***/
54                 contributeWithCounter(result,result->gcount);
55                 count=0;
56                 redNo++;
57                 int n=my_futureMsgs.length();
58                 for(int i=0;i<n;i++){
59                         CkReductionMsg *elementMesg = my_futureMsgs.deq();
60                         if(elementMesg->getRedNo() == redNo){
61                                 my_msgs.enq(elementMesg);
62                                 count++;
63                                 collectAllMessages();
64                         }else{
65                                 my_futureMsgs.enq(elementMesg);
66                         }
67                 }
68         }
71 void CkArrayReductionMgr::contributeArrayReduction(CkReductionMsg *m){
72         ARPRINT("[%d]Contribute Array Reduction called for RedNo %d group %d \n",CkMyNode(),m->getRedNo(),thisgroup.idx);
73         /** store the contribution untill all procs have contributed. At that point reduce and
74         carry out a reduction among nodegroups*/
75 #if CMK_BIGSIM_CHARM
76          _TRACE_BG_TLINE_END(&(m->log));
77 #endif
78         CmiLock(lockCount);
79         if(m->getRedNo() == redNo){
80                 my_msgs.enq(m);
81                 count++;
82                 collectAllMessages();
83         }else{
84                 //ARPRINT("[%d][%d]Out of sequence messages for %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->getRedNo(),redNo);
85                 my_futureMsgs.enq(m);
86         }
87         CmiUnlock(lockCount);
90 CkReductionMsg *CkArrayReductionMgr::reduceMessages(void){
91 #if CMK_BIGSIM_CHARM
92         _TRACE_BG_END_EXECUTE(1);
93         void* _bgParentLog = NULL;
94         _TRACE_BG_BEGIN_EXECUTE_NOMSG("ArrayReduce", &_bgParentLog, 0);
95 #endif
96         CkReductionMsg *ret=NULL;
98         //Look through the vector for a valid reducer, swapping out placeholder messages
99         CkReduction::reducerType r=CkReduction::invalid;
100         int msgs_gcount=0;//Reduced gcount
101         int msgs_nSources=0;//Reduced nSources
102         int msgs_userFlag=-1;
103         CkCallback msgs_callback;
104         CkCallback msgs_secondaryCallback;
105         int i;
106         int nMsgs=0;
107         CkReductionMsg *m;
108         CkReductionMsg **msgArr=new CkReductionMsg*[my_msgs.length()];
109         bool isMigratableContributor;
111         while(NULL!=(m=my_msgs.deq()))
112         {
114                 msgs_gcount+=m->gcount;
115                 if (m->sourceFlag!=0)
116                 { //This is a real message from an element, not just a placeholder
117                         msgArr[nMsgs++]=m;
118                         msgs_nSources+=m->nSources();
119                         r=m->reducer;
120                         if (!m->callback.isInvalid())
121                                 msgs_callback=m->callback;
122                         if(!m->secondaryCallback.isInvalid())
123                                 msgs_secondaryCallback = m->secondaryCallback;
124                         if (m->userFlag!=-1)
125                                 msgs_userFlag=m->userFlag;
127                         isMigratableContributor=m->isMigratableContributor();
128 #if CMK_BIGSIM_CHARM
129                         _TRACE_BG_ADD_BACKWARD_DEP(m->log);
130 #endif
131                                 
132                 }
133                 else
134                 { //This is just a placeholder message-- replace it
135                         delete m;
136                 }
137         }
139         if (nMsgs==0||r==CkReduction::invalid)
140                 //No valid reducer in the whole vector
141                 ret=CkReductionMsg::buildNew(0,NULL);
142         else
143         {//Use the reducer to reduce the messages
144                 if(nMsgs == 1){
145                         ret = msgArr[0];
146                 }else{
147                         CkReduction::reducerFn f=CkReduction::reducerTable[r];
148         ret=(*f)(nMsgs,msgArr);
149                 }
150                 ret->reducer=r;
151         }
153         //Go back through the vector, deleting old messages
154         for (i=0;i<nMsgs;i++) {
155           if (msgArr[i] != ret) delete msgArr[i];
156     }
157         delete [] msgArr;
159         //Set the message counts
160         ret->redNo=redNo;
161         ret->gcount=msgs_gcount;
162         ret->userFlag=msgs_userFlag;
163         ret->callback=msgs_callback;
164                 ret->secondaryCallback = msgs_secondaryCallback;
165         ret->sourceFlag=msgs_nSources;
166                 ret->setMigratableContributor(isMigratableContributor); 
167         return ret;
170 void CkArrayReductionMgr::pup(PUP::er &p){
171         NodeGroup::pup(p);
172         p(redNo);p(count);
173         p|my_msgs;
174         p|my_futureMsgs;
175         p|attachedGroup;
176         if(p.isUnpacking()) {
177           size = CkMyNodeSize();
178           lockCount = CmiCreateLock();
179         }
182 void CkArrayReductionMgr::setAttachedGroup(CkGroupID groupID){
183         attachedGroup = groupID;
184         ARPRINT("[%d] setAttachedGroup called with attachedGroup %d \n",CkMyNode(),attachedGroup);
185         if (alreadyStarted != -1) {
186                 ((CkNodeReductionMgr *)this)->restartLocalGroupReductions(alreadyStarted);
187                 alreadyStarted = -1;
188         }
192 void CkArrayReductionMgr::startNodeGroupReduction(int number,CkGroupID groupID){
193 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
194     Chare *oldObj =CpvAccess(_currentObj);
195     CpvAccess(_currentObj) = this;
196 #endif
197         ARPRINT("[%d] startNodeGroupReductions for red No %d my group %d attached group %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,this);
198         if(attachedGroup.isZero()){
199                 setAttachedGroup(groupID);
200         }
201         startLocalGroupReductions(number);
202         CkReductionNumberMsg *msg = new CkReductionNumberMsg(number);
203         envelope::setSrcPe((char *)UsrToEnv(msg),CkMyNode());
204         ((CkNodeReductionMgr *)this)->ReductionStarting(msg);
205 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
206     CpvAccess(_currentObj) = oldObj;
207 #endif
210 int CkArrayReductionMgr::startLocalGroupReductions(int number){ 
211         ARPRINT("[%d] startLocalGroupReductions for red No %d my group %d attached group %d number of rednMgrs %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,size,this);
212         if(attachedGroup.isZero()){
213                 alreadyStarted = number;
214                 return 0;
215         }
216         int firstPE = CkNodeFirst(CkMyNode());
217         for(int i=0;i<size;i++){
218                 CProxy_CkReductionMgr reductionMgrProxy(attachedGroup);
219                 reductionMgrProxy[firstPE+i].ReductionStarting(new CkReductionNumberMsg(number));
220         }
221         return 1;
224 int CkArrayReductionMgr::getTotalGCount(){
225         int firstPE = CkNodeFirst(CkMyNode());
226         int totalGCount=0;
227         for(int i=0;i<size;i++){
228                 CProxy_CkReductionMgr reductionMgrProxy(attachedGroup);
229                 CkReductionMgr *mgrPtr = reductionMgrProxy[firstPE+i].ckLocalBranch();
230                 CkAssert(mgrPtr != NULL);
231                 totalGCount += mgrPtr->getGCount();
232         }
233         return totalGCount;
236 CkArrayReductionMgr::~CkArrayReductionMgr() {
237   CmiDestroyLock(lockCount);
240 #include "CkArrayReductionMgr.def.h"