From 7472e69b220f3bc07c7069599788d5bc736649dd Mon Sep 17 00:00:00 2001 From: raghavendrak Date: Fri, 26 Apr 2019 13:31:54 -0500 Subject: [PATCH] Feature #1987: Take advantage of streamable reductions inside CkMulticast Change-Id: Ie60ea5ebd28d67177071c1cda222ad62701d6d69 --- src/ck-core/XArraySectionReducer.h | 30 ++++++++++++++++++++++-------- src/ck-core/ckmulticast.C | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/src/ck-core/XArraySectionReducer.h b/src/ck-core/XArraySectionReducer.h index 9961e09f1b..2e358ec94d 100644 --- a/src/ck-core/XArraySectionReducer.h +++ b/src/ck-core/XArraySectionReducer.h @@ -34,10 +34,22 @@ class XArraySectionReducer /// Each subsection reduction message needs to be passed in here void acceptSectionContribution(CkReductionMsg *msg) { - msgMap[msg->redNo].push_back(msg); - numReceivedMap[msg->redNo]++; - if (numReceivedMap[msg->redNo] >= numSubSections) - finalReducer(msg->redNo); + int redNo = msg->redNo; + msgMap[redNo].push_back(msg); + // Check if partialReduction is possible (streamable) across subsections + if (msgMap[redNo].size() > 1 && CkReduction::reducerTable()[msg->reducer].streamable) { + CkReduction::reducerFn f = CkReduction::reducerTable()[msg->reducer].fn; + CkReductionMsg *intermediateReducedMsg = (*f)(msgMap[redNo].size(), msgMap[redNo].data()); + msgMap[redNo].pop_back(); + // Only the partially reduced message should be remaining in the msgs vector after partialReduction + CkAssert(intermediateReducedMsg == msgMap[redNo][0]); + // Copy the reducer in the newly created message which will be used in the finalReducer() + intermediateReducedMsg->reducer = msg->reducer; + delete msg; + } + numReceivedMap[redNo]++; + if (numReceivedMap[redNo] >= numSubSections) + finalReducer(redNo); } private: @@ -47,17 +59,19 @@ class XArraySectionReducer // Get a handle on the reduction function for this message CkReduction::reducerFn f = CkReduction::reducerTable()[ msgMap[redNo][0]->reducer ].fn; // Perform an extra reduction step on all the subsection reduction msgs - CkReductionMsg *finalMsg = (*f)(numSubSections, msgMap[redNo].data()); + CkReductionMsg *finalMsg = (*f)(msgMap[redNo].size(), msgMap[redNo].data()); // Send the final reduced msg to the client if (finalCB == nullptr) msgMap[redNo][0]->callback.send(finalMsg); else finalCB->send(finalMsg); // Delete the subsection redn msgs, accounting for any msg reuse - for (int i=0; i < numSubSections; i++) - if (msgMap[redNo][i] != finalMsg) delete msgMap[redNo][i]; + auto& msgs = msgMap[redNo]; + for (auto *msg:msgs) { + if (msg != finalMsg) delete msg; + } // Reset the msg list and counters for the corresponding redNo - memset( msgMap[redNo].data(), 0, numSubSections*sizeof(CkReductionMsg*) ); + msgMap.erase(redNo); numReceivedMap[redNo] = 0; } diff --git a/src/ck-core/ckmulticast.C b/src/ck-core/ckmulticast.C index 4a54f37424..ba54175688 100644 --- a/src/ck-core/ckmulticast.C +++ b/src/ck-core/ckmulticast.C @@ -1628,6 +1628,44 @@ void CkMulticastMgr::recvRedMsg(CkReductionMsg *msg) mixTreeUp = 0; } + // If reduceFragment is not being called now, check if partialReduction is possible (streamable) + if (!currentTreeUp && !mixTreeUp && redInfo.msgs[index].size() > 1 && CkReduction::reducerTable()[msg->reducer].streamable) { + reductionMsgs& rmsgs = redInfo.msgs[index]; + CkReduction::reducerType reducer = rmsgs[0]->reducer; + CkReduction::reducerFn f= CkReduction::reducerTable()[msg->reducer].fn; + CkAssert(f != NULL); + + int oldRedNo = redInfo.redNo; + int nFrags = rmsgs[0]->nFrags; + int fragNo = rmsgs[0]->fragNo; + int userFlag = rmsgs[0]->userFlag; + CkSectionInfo oldId = rmsgs[0]->sid; + CkCallback msg_cb; + int8_t rebuilt = 0; + if (msg->rebuilt) rebuilt = 1; + if (!msg->callback.isInvalid()) msg_cb = msg->callback; + // Perform the actual reduction (streaming) + CkReductionMsg *newmsg = (*f)(rmsgs.size(), rmsgs.data()); +#if CMK_MESSAGE_LOGGING + envelope *env = UsrToEnv(newmsg); + env->flags = env->flags | CK_REDUCTION_MSG_MLOG; +#endif + newmsg->redNo = oldRedNo; + newmsg->nFrags = nFrags; + newmsg->fragNo = fragNo; + newmsg->userFlag = userFlag; + newmsg->reducer = reducer; + if (rebuilt) newmsg->rebuilt = 1; + if (!msg_cb.isInvalid()) newmsg->callback = msg_cb; + newmsg->gcount = redInfo.gcount[index]; + newmsg->sid = oldId; + // Remove the current message that was pushed + rmsgs.pop_back(); + delete msg; + // Only the partially reduced message should be remaining in the msgs vector after partialReduction + CkAssert(rmsgs.size() == 1); + } + //------------------------------------------------------------------------- /// If this fragment can be reduced, or if I am the root and have received all fragments from all elements if (currentTreeUp || mixTreeUp) -- 2.11.4.GIT