3 #include "CkArrayReductionMgr.decl.h"
6 void noopitar(const char*, ...)
10 #define ARPRINT CkPrintf
12 #define ARPRINT noopitar
15 void CkArrayReductionMgr::init()
17 //ARPRINT("Array ReductionMgr Constructor called %d\n",thisgroup);
19 size = CkMyNodeSize();
21 lockCount = CmiCreateLock();
26 CkArrayReductionMgr::CkArrayReductionMgr(){
28 attachedGroup.setZero();
31 CkArrayReductionMgr::CkArrayReductionMgr(int dummy, CkGroupID gid){
36 void CkArrayReductionMgr::flushStates(){
38 // CmiPrintf("[%d] CkArrayReductionMgr::flushState\n", CkMyPe());
41 while (!my_msgs.isEmpty()) delete my_msgs.deq();
42 while (!my_futureMsgs.isEmpty()) delete my_futureMsgs.deq();
43 reductionInfo.redNo = 0;
44 CkNodeReductionMgr::flushStates();
48 void CkArrayReductionMgr::collectAllMessages(){
50 ARPRINT("[%d] CollectAll messages for %d with %d on %p\n",CkMyNode(),redNo,count,this);
51 CkReductionMsg *result = reduceMessages();
52 result->redNo = redNo;
53 /**keep a count of elements that contributed to the original reduction***/
54 contributeWithCounter(result,result->gcount);
57 int n=my_futureMsgs.length();
59 CkReductionMsg *elementMesg = my_futureMsgs.deq();
60 if(elementMesg->getRedNo() == redNo){
61 my_msgs.enq(elementMesg);
65 my_futureMsgs.enq(elementMesg);
71 void CkArrayReductionMgr::contributeArrayReduction(CkReductionMsg *m){
72 ARPRINT("[%d]Contribute Array Reduction called for RedNo %d group %d \n",CkMyNode(),m->getRedNo(),thisgroup.idx);
73 /** store the contribution untill all procs have contributed. At that point reduce and
74 carry out a reduction among nodegroups*/
76 _TRACE_BG_TLINE_END(&(m->log));
79 if(m->getRedNo() == redNo){
84 //ARPRINT("[%d][%d]Out of sequence messages for %d Present redNo %d \n",CkMyNode(),CkMyPe(),m->getRedNo(),redNo);
90 CkReductionMsg *CkArrayReductionMgr::reduceMessages(void){
92 _TRACE_BG_END_EXECUTE(1);
93 void* _bgParentLog = NULL;
94 _TRACE_BG_BEGIN_EXECUTE_NOMSG("ArrayReduce", &_bgParentLog, 0);
96 CkReductionMsg *ret=NULL;
98 //Look through the vector for a valid reducer, swapping out placeholder messages
99 CkReduction::reducerType r=CkReduction::invalid;
100 int msgs_gcount=0;//Reduced gcount
101 int msgs_nSources=0;//Reduced nSources
102 int msgs_userFlag=-1;
103 CkCallback msgs_callback;
104 CkCallback msgs_secondaryCallback;
108 CkReductionMsg **msgArr=new CkReductionMsg*[my_msgs.length()];
109 bool isMigratableContributor;
111 while(NULL!=(m=my_msgs.deq()))
114 msgs_gcount+=m->gcount;
115 if (m->sourceFlag!=0)
116 { //This is a real message from an element, not just a placeholder
118 msgs_nSources+=m->nSources();
120 if (!m->callback.isInvalid())
121 msgs_callback=m->callback;
122 if(!m->secondaryCallback.isInvalid())
123 msgs_secondaryCallback = m->secondaryCallback;
125 msgs_userFlag=m->userFlag;
127 isMigratableContributor=m->isMigratableContributor();
129 _TRACE_BG_ADD_BACKWARD_DEP(m->log);
134 { //This is just a placeholder message-- replace it
139 if (nMsgs==0||r==CkReduction::invalid)
140 //No valid reducer in the whole vector
141 ret=CkReductionMsg::buildNew(0,NULL);
143 {//Use the reducer to reduce the messages
147 CkReduction::reducerFn f=CkReduction::reducerTable[r];
148 ret=(*f)(nMsgs,msgArr);
153 //Go back through the vector, deleting old messages
154 for (i=0;i<nMsgs;i++) {
155 if (msgArr[i] != ret) delete msgArr[i];
159 //Set the message counts
161 ret->gcount=msgs_gcount;
162 ret->userFlag=msgs_userFlag;
163 ret->callback=msgs_callback;
164 ret->secondaryCallback = msgs_secondaryCallback;
165 ret->sourceFlag=msgs_nSources;
166 ret->setMigratableContributor(isMigratableContributor);
170 void CkArrayReductionMgr::pup(PUP::er &p){
176 if(p.isUnpacking()) {
177 size = CkMyNodeSize();
178 lockCount = CmiCreateLock();
182 void CkArrayReductionMgr::setAttachedGroup(CkGroupID groupID){
183 attachedGroup = groupID;
184 ARPRINT("[%d] setAttachedGroup called with attachedGroup %d \n",CkMyNode(),attachedGroup);
185 if (alreadyStarted != -1) {
186 ((CkNodeReductionMgr *)this)->restartLocalGroupReductions(alreadyStarted);
192 void CkArrayReductionMgr::startNodeGroupReduction(int number,CkGroupID groupID){
193 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
194 Chare *oldObj =CpvAccess(_currentObj);
195 CpvAccess(_currentObj) = this;
197 ARPRINT("[%d] startNodeGroupReductions for red No %d my group %d attached group %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,this);
198 if(attachedGroup.isZero()){
199 setAttachedGroup(groupID);
201 startLocalGroupReductions(number);
202 CkReductionNumberMsg *msg = new CkReductionNumberMsg(number);
203 envelope::setSrcPe((char *)UsrToEnv(msg),CkMyNode());
204 ((CkNodeReductionMgr *)this)->ReductionStarting(msg);
205 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
206 CpvAccess(_currentObj) = oldObj;
210 int CkArrayReductionMgr::startLocalGroupReductions(int number){
211 ARPRINT("[%d] startLocalGroupReductions for red No %d my group %d attached group %d number of rednMgrs %d on %p \n",CkMyNode(),number,thisgroup.idx, attachedGroup.idx,size,this);
212 if(attachedGroup.isZero()){
213 alreadyStarted = number;
216 int firstPE = CkNodeFirst(CkMyNode());
217 for(int i=0;i<size;i++){
218 CProxy_CkReductionMgr reductionMgrProxy(attachedGroup);
219 reductionMgrProxy[firstPE+i].ReductionStarting(new CkReductionNumberMsg(number));
224 int CkArrayReductionMgr::getTotalGCount(){
225 int firstPE = CkNodeFirst(CkMyNode());
227 for(int i=0;i<size;i++){
228 CProxy_CkReductionMgr reductionMgrProxy(attachedGroup);
229 CkReductionMgr *mgrPtr = reductionMgrProxy[firstPE+i].ckLocalBranch();
230 CkAssert(mgrPtr != NULL);
231 totalGCount += mgrPtr->getGCount();
236 CkArrayReductionMgr::~CkArrayReductionMgr() {
237 CmiDestroyLock(lockCount);
240 #include "CkArrayReductionMgr.def.h"