3 // #include "NodeMulticast.h"
4 // #include "converse.h"
6 // #define MAX_BUF_SIZE 165000
7 // #define MAX_SENDS_PER_BATCH 16
8 // #define MULTICAST_DELAY 5
10 // static NodeMulticast *nm_mgr;
12 // static void call_doneInserting(void *ptr,double curWallTime){
13 // NodeMulticast *mgr = (NodeMulticast *)ptr;
14 // mgr->doneInserting();
17 // static void* NodeMulticastHandler(void *msg){
18 // ComlibPrintf("In Node MulticastHandler\n");
19 // nm_mgr->recvHandler(msg);
23 // static void* NodeMulticastCallbackHandler(void *msg){
24 // ComlibPrintf("[%d]:In Node MulticastCallbackHandler\n", CkMyPe());
25 // register envelope *env = (envelope *)msg;
26 // CkUnpackMessage(&env);
27 // //nm_mgr->getCallback().send(EnvToUsr(env));
29 // //nm_mgr->getHandler()(env);
33 // //Handles multicast by sending only one message to each node and making
34 // //it multicast locally
35 // void NodeMulticast::setDestinationArray(CkArrayID a, int nelem,
36 // CkArrayIndex **idx, int ep){
41 // if(getenv("RMS_NODES") != NULL)
42 // pes_per_node = CkNumPes()/atoi(getenv("RMS_NODES"));
48 // numNodes = CkNumPes()/pes_per_node;
49 // numCurDestPes = CkNumPes();
51 // nodeMap = new int[numNodes];
53 // ComlibPrintf("In SetDestinationArray %d, %d, %d, %d\n", numNodes,
54 // pes_per_node, nelements, ep);
56 // indexVec = new CkVec<CkArrayIndex> [CkNumPes()];
58 // for(int count = 0; count < nelements; count++) {
59 // ComlibPrintf("Before lastKnown %d\n", count);
60 // int dest_proc = CkArrayID::CkLocalBranch(a)->lastKnown(*idx[count]);
61 // ComlibPrintf("After lastKnown %d\n", dest_proc);
62 // nodeMap[dest_proc/pes_per_node] = 1;
64 // indexVec[dest_proc].insertAtEnd(*idx[count]);
67 // ComlibPrintf("After SetDestinationArray\n");
70 // void NodeMulticast::setPeList(int npes, int *pelist, ComlibMulticastHandler handler){
71 // mode = PROCESSOR_MODE;
74 // //if(getenv("RMS_NODES") != NULL)
75 // //pes_per_node = CkNumPes()/atoi(getenv("RMS_NODES"));
78 // this->handler = (long)handler;
80 // numNodes = CkNumPes()/pes_per_node;
81 // numCurDestPes = npes;
84 // nodeMap = new int[numNodes];
87 // this->pelist = new int[npes];
88 // memcpy(this->pelist, pelist, npes * sizeof(int));
90 // ComlibPrintf("In setPeList %d, %d, %d\n", numNodes,
91 // pes_per_node, npes);
93 // for(int count = 0; count < npes; count++)
94 // nodeMap[pelist[count]/pes_per_node] = 1;
96 // ComlibPrintf("After setPeList\n");
100 // void NodeMulticast::recvHandler(void *msg) {
101 // register envelope* env = (envelope *)msg;
102 // void *charm_msg = (void *)EnvToUsr(env);
105 // ComlibPrintf("In receive Handler\n");
106 // if(mode == ARRAY_MODE) {
107 // env->setArrayMgr(mAid);
108 // env->getsetArrayEp()=entryPoint;
109 // env->getsetArrayHops()=0;
110 // CkUnpackMessage(&env);
112 // for(int count = 0; count < pes_per_node; count ++){
113 // int dest_pe = (CkMyPe()/pes_per_node) * pes_per_node + count;
114 // int size = indexVec[dest_pe].size();
116 // ComlibPrintf("[%d], %d elements to send to %d of size %d\n", CkMyPe(), size, dest_pe, env->getTotalsize());
118 // CkArrayIndex * idx_arr = indexVec[dest_pe].getVec();
119 // for(int itr = 0; itr < size; itr ++) {
120 // void *newcharmmsg = CkCopyMsg(&charm_msg);
121 // envelope* newenv = UsrToEnv(newcharmmsg);
122 // CProxyElement_ArrayBase ap(mAid, idx_arr[itr]);
123 // newenv->getsetArrayIndex()=idx_arr[itr];
124 // ap.ckSend((CkArrayMessage *)newcharmmsg, entryPoint);
129 // CkUnpackMessage(&env);
130 // for(int count = 0; count < pes_per_node; count++)
131 // if(validRank[count]){
132 // void *newcharmmsg;
135 // if(count < pes_per_node - 1) {
136 // newcharmmsg = CkCopyMsg(&charm_msg);
137 // newenv = UsrToEnv(newcharmmsg);
140 // newcharmmsg = charm_msg;
141 // newenv = UsrToEnv(newcharmmsg);
144 // CmiSetHandler(newenv, NodeMulticastCallbackHandlerId);
145 // ComlibPrintf("[%d] In receive Handler (proc mode), sending message to %d at handler %d\n",
146 // CkMyPe(), (CkMyPe()/pes_per_node) * pes_per_node
147 // + count, NodeMulticastCallbackHandlerId);
149 // CkPackMessage(&newenv);
150 // CmiSyncSendAndFree((CkMyPe()/pes_per_node) *pes_per_node + count,
151 // newenv->getTotalsize(), (char *)newenv);
154 // ComlibPrintf("[%d] CmiFree (Code) (%x)\n", CkMyPe(),
155 // (long) msg - 2*sizeof(int));
159 // void NodeMulticast::insertMessage(CharmMessageHolder *cmsg){
161 // ComlibPrintf("In insertMessage \n");
162 // envelope *env = UsrToEnv(cmsg->getCharmMessage());
164 // CmiSetHandler(env, NodeMulticastHandlerId);
165 // messageBuf->enq(cmsg);
168 // void NodeMulticast::doneInserting(){
169 // CharmMessageHolder *cmsg;
171 // register envelope *env;
173 // ComlibPrintf("NodeMulticast :: doneInserting\n");
175 // if(messageBuf->length() > 1) {
176 // //CkPrintf("NodeMulticast :: doneInserting length > 1\n");
179 // int *sizes, msg_count;
181 // msgComps = new char*[messageBuf->length()];
182 // sizes = new int[messageBuf->length()];
184 // while (!messageBuf->isEmpty()) {
185 // cmsg = messageBuf->deq();
186 // msg = cmsg->getCharmMessage();
187 // env = UsrToEnv(msg);
188 // sizes[msg_count] = env->getTotalsize();
189 // msgComps[msg_count] = (char *)env;
195 // for(int count = 0; count < numNodes; count++)
196 // if(nodeMap[count])
197 // CmiMultipleSend(count * pes_per_node + myRank, msg_count,
200 // delete [] msgComps;
204 // else if (messageBuf->length() == 1){
205 // static int prevCount = 0;
207 // ComlibPrintf("Sending Node Multicast\n");
208 // cmsg = messageBuf->deq();
209 // msg = cmsg->getCharmMessage();
210 // env = UsrToEnv(msg);
212 // if(mode == ARRAY_MODE)
213 // env->getsetArraySrcPe()=CkMyPe();
214 // CkPackMessage(&env);
216 // CmiSetHandler(env, NodeMulticastHandlerId);
217 // ComlibPrintf("After set handler\n");
219 // //CmiPrintf("cursedtpes = %d, %d\n", cmsg->npes, numCurDestPes);
221 // if((mode != ARRAY_MODE) && cmsg->npes < numCurDestPes) {
222 // numCurDestPes = cmsg->npes;
223 // for(count = 0; count < numNodes; count++)
224 // nodeMap[count] = 0;
226 // for(count = 0; count < cmsg->npes; count++)
227 // nodeMap[(cmsg->pelist[count])/pes_per_node] = 1;
230 // for(count = prevCount; count < numNodes; count++) {
231 // //int dest_node = count;
232 // int dest_node = (count + (CkMyPe()/pes_per_node))%numNodes;
233 // if(nodeMap[dest_node]) {
234 // void *newcharmmsg;
237 // if(count < numNodes - 1) {
238 // newcharmmsg = CkCopyMsg((void **)&msg);
239 // newenv = UsrToEnv(newcharmmsg);
242 // newcharmmsg = msg;
243 // newenv = UsrToEnv(newcharmmsg);
246 // ComlibPrintf("[%d]In cmisyncsend to %d\n", CkMyPe(),
247 // dest_node * pes_per_node + myRank);
248 // #if CMK_PERSISTENT_COMM
249 // if(env->getTotalsize() < MAX_BUF_SIZE)
250 // CmiUsePersistentHandle(&persistentHandlerArray[dest_node],1);
252 // CkPackMessage(&newenv);
253 // CmiSyncSendAndFree(dest_node * pes_per_node + myRank,
254 // newenv->getTotalsize(), (char *)newenv);
255 // #if CMK_PERSISTENT_COMM
256 // if(env->getTotalsize() < MAX_BUF_SIZE)
257 // CmiUsePersistentHandle(NULL, 0);
261 // if((prevCount % MAX_SENDS_PER_BATCH == 0) &&
262 // (prevCount != numNodes)) {
263 // CcdCallFnAfterOnPE((CcdVoidFn)call_doneInserting, (void *)this,
264 // MULTICAST_DELAY, CkMyPe());
270 // ComlibPrintf("[%d] CmiFree (Code) (%x)\n", CkMyPe(), (char *)env - 2*sizeof(int));
276 // void NodeMulticast::pup(PUP::er &p){
278 // CharmStrategy::pup(p);
286 // p | numCurDestPes;
289 // if(p.isUnpacking()) {
290 // nodeMap = new int[numNodes];
292 // if(mode == ARRAY_MODE) {
293 // typedef CkVec<CkArrayIndex> CkVecArrayIndex;
294 // CkVecArrayIndex *vec = new CkVecArrayIndex[CkNumPes()];
298 // if(mode == PROCESSOR_MODE)
299 // pelist = new int[npes];
304 // p(nodeMap, numNodes);
306 // if(mode == PROCESSOR_MODE)
309 // if(mode == ARRAY_MODE)
310 // for(int count = 0; count < CkNumPes(); count++)
311 // p | indexVec[count];
313 // if(p.isUnpacking()) {
314 // messageBuf = new CkQ <CharmMessageHolder *>;
315 // myRank = CkMyPe() % pes_per_node;
317 // NodeMulticastHandlerId = CkRegisterHandler((CmiHandler)NodeMulticastHandler);
318 // NodeMulticastCallbackHandlerId = CkRegisterHandler
319 // ((CmiHandler)NodeMulticastCallbackHandler);
323 // //validRank[0] = validRank[1] = validRank[2] = validRank[3] = 0;
324 // memset(validRank, 0, MAX_PES_PER_NODE * sizeof(int));
325 // for(int count = 0; count < npes; count ++){
326 // if(CkMyPe()/pes_per_node == pelist[count] / pes_per_node)
327 // validRank[pelist[count] % pes_per_node] = 1;
330 // #if CMK_PERSISTENT_COMM
331 // persistentHandlerArray = new PersistentHandle[numNodes];
332 // for(int count = 0; count < numNodes; count ++)
333 // //if(nodeMap[count])
334 // persistentHandlerArray[count] = CmiCreatePersistent
335 // (count * pes_per_node + myRank, MAX_BUF_SIZE);
340 // //PUPable_def(NodeMulticast);