2 @addtogroup ComlibCharmStrategy
6 MulticastStrategy and its
7 derivatives multicast messages to a section of array elements
8 created on the fly. The section is invoked by calling a
9 section proxy. These strategies can also multicast to a subset
10 of processors for groups.
12 These strategies are non-bracketed. When the first request is
13 made, a route is dynamically built on the section. The route
14 information is stored in
17 - Heavily revised by Filippo Gioachin 2/2006
22 #include "MulticastStrategy.h"
24 CkpvExtern(CkGroupID, cmgrID);
// Default constructor: chains to the Strategy and CharmStrategy base
// constructors, logs via ComlibPrintf, and tags this strategy as an
// ARRAY_STRATEGY through setType().
27 MulticastStrategy::MulticastStrategy()
28 : Strategy(), CharmStrategy() {
30 ComlibPrintf("MulticastStrategy constructor\n");
// Disabled call kept from an earlier revision; no destination array is set here.
31 //ainfo.setDestinationArray(aid);
32 setType(ARRAY_STRATEGY);
35 //Destroy all old built routes
// Destructor. Logs, tests getLearner() against NULL, then iterates over every
// entry of the section hash table sec_ht, viewing each stored value as a
// ComlibSectionHashObject pointer.
// NOTE(review): the statement guarded by the getLearner() test, the
// declaration of data, and whatever is done with each obj are not visible in
// this excerpt; presumably the learner and each hash object are deleted --
// confirm against the full file.
36 MulticastStrategy::~MulticastStrategy() {
38 ComlibPrintf("MulticastStrategy destructor\n");
40 if(getLearner() != NULL)
43 CkHashtableIterator *ht_iterator = sec_ht.iterator();
44 ht_iterator->seekStart();
45 while(ht_iterator->hasNext()){
// The value returned by next() is treated as a pointer to the stored slot;
// dereferencing it gives the ComlibSectionHashObject pointer kept in sec_ht.
47 data = (void **)ht_iterator->next();
48 ComlibSectionHashObject *obj = (ComlibSectionHashObject *) (* data);
// Free helper that operates on a section-multicast message holder.
// @param cmsg  holder whose dest_proc must equal IS_SECTION_MULTICAST
//              (enforced with CkAssert).
// The visible part only logs, asserts, and obtains three views of the same
// message: the user pointer, its envelope, and a ComlibMulticastMsg pointer.
// NOTE(review): the rest of the body is not visible in this excerpt, so what
// is actually rewritten cannot be confirmed from here.
55 void rewritePEs(CharmMessageHolder *cmsg){
56 ComlibPrintf("[%d] rewritePEs insertMessage \n",CkMyPe());
58 CkAssert(cmsg->dest_proc == IS_SECTION_MULTICAST);
60 void *m = cmsg->getCharmMessage();
61 envelope *env = UsrToEnv(m);
63 ComlibMulticastMsg *msg = (ComlibMulticastMsg *)m;
// Entry point for sending a message through this strategy.
// @param cmsg  holder of the outgoing message. The multicast path is taken
//              only when cmsg->dest_proc == IS_SECTION_MULTICAST and
//              cmsg->sec_id is non-NULL.
// Visible flow:
//   1. Look up the section in sec_ht under the key (CkMyPe(), section id).
//   2. If a usable cached entry exists, multicast along the cached route.
//   3. Otherwise build a new multicast message carrying the section
//      description, register the section with insertSectionID(), and
//      multicast along the freshly built route.
// NOTE(review): several lines (closing braces, guards, returns) are not
// visible in this excerpt; the CkAbort at the very end presumably sits in the
// else branch of the first test -- confirm against the full file.
68 void MulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
70 ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n", CkMyPe());
71 // ComlibPrintf("[%d] Comlib Section Multicast: insertMessage \n", CkMyPe());
74 ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
77 if(cmsg->dest_proc == IS_SECTION_MULTICAST && cmsg->sec_id != NULL) {
78 ComlibPrintf("[%d] Comlib Section Multicast: looking up cur_sec_id\n",CkMyPe());
80 CkSectionID *sid = cmsg->sec_id;
82 // This is a sanity check if we only use a tiny chare array
83 // if(sid->_nElems > 4 || sid->_nElems<0){
84 // CkPrintf("[%d] Warning!!!!!!!!!!! Section ID in message seems to be screwed up. cmg=%p sid=%p sid->_nElems=%d\n", CkMyPe(), cmsg, sid, (int)sid->_nElems);
87 int cur_sec_id = sid->getSectionID();
// NOTE(review): the condition guarding the old-section path is not visible
// here; presumably it tests that cur_sec_id denotes an already used section.
90 sinfo.processOldSectionMessage(cmsg);
91 ComlibPrintf("Array section id was %d, but now is %d\n", cur_sec_id, sid->getSectionID());
// processOldSectionMessage() must leave the section id untouched.
92 CkAssert(cur_sec_id == sid->getSectionID());
94 ComlibPrintf("[%d] Comlib Section Multicast: insertMessage: cookiePE=%d\n",CkMyPe(),sid->_cookie.get_pe());
// Cached routes are keyed by (this PE, section id).
95 ComlibSectionHashKey key(CkMyPe(), cur_sec_id);
96 ComlibSectionHashObject *obj = sec_ht.get(key);
99 //CkAbort("Cannot Find Section\n");
100 /* The object can be NULL for various reasons:
101 * 1) the user reassociated the section proxy with a different
102 * multicast strategy, in which case the new one has no idea about
103 * the previous usage of the proxy, but the proxy has the cur_sec_id
104 * set by the previous strategy
105 * 2) the proxy migrated to another processor, in which case the
106 * cur_sec_id is non null, but the CkMyPe changed, so the hashed
107 * object could not be found (Filippo: I'm not sure if the id will
108 * be reset upon migration, so if this case is possible)
112 /* In the following if, the check (CkMyPe == sid->_cookie.pe) helps identifying situations
113 * where the proxy has migrated from one processor to another. In this situation, the
114 * destination processor might find an "obj", created by somebody else. This "obj"
115 * is accepted only if the current processor is equal to the processor in which the
116 * cookie ID was defined. */
// Fast path: a cached, non-stale route created on this PE exists, so the
// original message is delivered locally and then forwarded to remote PEs.
117 if (obj != NULL && CkMyPe() == sid->_cookie.get_pe() && !obj->isOld) {
118 envelope *env = UsrToEnv(cmsg->getCharmMessage());
119 localMulticast(env, obj, (CkMcastBaseMsg*)cmsg->getCharmMessage());
120 remoteMulticast(env, obj);
128 // reaching here means the message was not sent as old, either because
129 // it is the first for this section or the existing section is old.
130 ComlibPrintf("[%d] MulticastStrategy, creating a new multicast path\n", CkMyPe());
132 //New sec id, so send it along with the message
133 ComlibMulticastMsg *newmsg = sinfo.getNewMulticastMessage(cmsg, needSorting(), getInstance());
136 ComlibSectionHashObject *obj = NULL;
138 // CkAssert(newmsg!=NULL); // Previously the following code was just not called in this case
141 // Add the section to the hashtable, so we can use it in the future
142 ComlibPrintf("[%d] calling insertSectionID\n", CkMyPe());
// NOTE(review): newmsg is dereferenced here, while the remoteMulticast call
// further down is guarded by newmsg != NULL; confirm that a guard on a line
// not shown in this excerpt protects this dereference.
143 ComlibSectionHashObject *obj_inserted = insertSectionID(sid, newmsg->nPes, newmsg->indicesCount);
145 envelope *newenv = UsrToEnv(newmsg);
146 CkPackMessage(&newenv);
// Re-read the entry through the hash table and check that it is the very
// object insertSectionID() returned.
148 ComlibSectionHashKey key(CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
150 obj = sec_ht.get(key);
151 ComlibPrintf("[%d] looking up key sid->_cookie.sInfo.cInfo.id=%d. Found obj=%p\n", CkMyPe(), (int)sid->_cookie.info.sInfo.cInfo.id, obj);
152 CkAssert(obj_inserted == obj);
157 CkPrintf("[%d] WARNING: Cannot Find ComlibRectSectionHashObject object in hash table sec_ht!\n", CkMyPe());
158 CkAbort("Cannot Find object. sec_ht.get(key)==NULL");
159 // If the number of array elements is fewer than the number of PEs, this happens frequently
// Local delivery uses the original user message; remote PEs receive newmsg,
// which was built above to carry the new section information.
162 char *msg = cmsg->getCharmMessage();
163 localMulticast(UsrToEnv(msg), obj, (CkMcastBaseMsg*)msg);
166 if (newmsg != NULL) {
167 remoteMulticast(UsrToEnv(newmsg), obj);
175 CkAbort("Section multicast cannot be used without a section proxy");
// Registers a section on the source PE: builds a ComlibSectionHashObject for
// sid and stores it in sec_ht under the key (CkMyPe(), section cookie id).
// @param sid     section descriptor; its cookie supplies the id and array id,
//                and its element list supplies the candidate local indices
// @param npes    forwarded unchanged to createObjectOnSrcPe()
// @param pelist  forwarded unchanged to createObjectOnSrcPe()
// @return pointer to a ComlibSectionHashObject. NOTE(review): the return
//         statement is not visible in this excerpt; insertMessage() asserts
//         that the returned pointer equals sec_ht.get(key), so presumably the
//         newly stored obj is returned -- confirm.
180 ComlibSectionHashObject * MulticastStrategy::insertSectionID(CkSectionID *sid, int npes, ComlibMulticastIndexCount* pelist) {
182 ComlibPrintf("[%d] MulticastStrategy:insertSectionID\n",CkMyPe());
183 ComlibPrintf("[%d] MulticastStrategy:insertSectionID sid->_cookie.sInfo.cInfo.id=%d \n",CkMyPe(), (int)sid->_cookie.info.sInfo.cInfo.id);
185 // double StartTime = CmiWallTimer();
187 ComlibSectionHashKey key(CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
// Check for an entry already stored under this key.
// NOTE(review): the branch that handles an existing entry is not visible;
// the log text below suggests the old object is deleted -- confirm.
189 ComlibSectionHashObject *obj = NULL;
190 obj = sec_ht.get(key);
193 ComlibPrintf("MulticastStrategy:insertSectionID: Deleting old object on proc %d for id %d\n",
194 CkMyPe(), sid->_cookie.info.sInfo.cInfo.id);
198 ComlibPrintf("[%d] Creating new ComlibSectionHashObject in insertSectionID\n", CkMyPe());
199 obj = new ComlibSectionHashObject();
200 CkArrayID aid(sid->_cookie.get_aid());
// Fill obj->indices from the section element list via sinfo.getLocalIndices().
201 sinfo.getLocalIndices(sid->_nElems, sid->_elems, aid, obj->indices);
// Let createObjectOnSrcPe() complete obj from npes/pelist, then store it.
203 createObjectOnSrcPe(obj, npes, pelist);
204 sec_ht.put(key) = obj;
205 ComlibPrintf("[%d] Inserting object %p into sec_ht\n", CkMyPe(), obj);
206 ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
210 // traceUserBracketEvent( 2204, StartTime, CmiWallTimer());
// Declaration of CmiReference, defined elsewhere; localMulticast() calls it on
// the envelope of the copied message.
// NOTE(review): presumably this bumps the message reference count -- confirm
// against the runtime headers.
214 extern void CmiReference(void *);
216 //Send the multicast message to the local array elements. The message is
217 //copied and sent if elements exist.
// @param env   envelope of the multicast message to deliver
// @param obj   cached section entry; obj->indices lists the destination
//              elements and obj->msg records the envelope of the delivered copy
// @param base  section base message, used here only to build the
//              delivery-error reply
// NOTE(review): several lines of this body (the declaration of msg1, the
// body of the obj->msg test, and the conditions around the copy and the error
// reply) are not visible in this excerpt.
218 void MulticastStrategy::localMulticast(envelope *env,
219 ComlibSectionHashObject *obj,
220 CkMcastBaseMsg *base) {
222 // double StartTime = CmiWallTimer();
224 int nIndices = obj->indices.size();
// An envelope cached by an earlier call is handled first (body not shown).
226 if(obj->msg != NULL) {
231 ComlibPrintf("[%d] localMulticast nIndices=%d\n", CkMyPe(), nIndices);
234 void *msg = EnvToUsr(env);
// Deliver a copy made with CkCopyMsg rather than the original, call
// CmiReference on the envelope of the copy, and remember that envelope in
// obj->msg.
237 msg1 = CkCopyMsg(&msg);
239 CmiReference(UsrToEnv(msg1));
240 obj->msg = (void *)UsrToEnv(msg1);
242 int reply = ComlibArrayInfo::localMulticast(&(obj->indices), UsrToEnv(msg1));
244 // some of the objects were not local, get the update!
// Build an error message from base and send it to the PE recorded as the
// source in env, routed through comlib_handler to this strategy instance.
245 CkMcastBaseMsg *errorMsg = sinfo.getNewDeliveryErrorMsg(base);
246 envelope *errorEnv = UsrToEnv(errorMsg);
247 CmiSetHandler(errorEnv, CkpvAccess(comlib_handler));
248 ((CmiMsgHeaderExt *) errorEnv)->stratid = getInstance();
249 CmiSyncSendAndFree(env->getSrcPe(), errorEnv->getTotalsize(), (char*)errorEnv);
253 // traceUserBracketEvent( 2200, StartTime, CmiWallTimer());
258 //Calls default multicast scheme to send the messages. It could
259 //also call a converse lower level strategy to do the multicast.
260 //For example pipelined multicast
// @param env  envelope to send; handed to CmiSyncListSendAndFree, so callers
//             must not use env after this call (a comment in
//             handleNewMulticastMessage notes that this function can delete it)
// @param obj  cached section entry supplying the destination list
//             (obj->pelist, obj->npes entries)
// NOTE(review): the lines between reading pelist and setting the handler are
// not visible in this excerpt; presumably they handle an empty destination
// list -- confirm.
261 void MulticastStrategy::remoteMulticast(envelope *env,
262 ComlibSectionHashObject *obj) {
264 // double StartTime = CmiWallTimer();
266 int npes = obj->npes;
267 int *pelist = obj->pelist;
274 //CmiSetHandler(env, handlerId);
// Route the message through comlib_handler on the receivers and stamp it
// with this strategy instance id.
275 CmiSetHandler(env, CkpvAccess(comlib_handler));
277 ((CmiMsgHeaderExt *) env)->stratid = getInstance();
279 //Collect Multicast Statistics
280 RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
283 //Sending a remote multicast
285 ComlibPrintf("[%d] remoteMulticast Sending to %d PEs: \n", CkMyPe(), npes);
// Debug listing of every destination PE.
286 for(int i=0;i<npes;i++){
287 ComlibPrintf("[%d] %d\n", CkMyPe(), pelist[i]);
290 CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
291 //CmiSyncBroadcastAndFree(env->getTotalsize(), (char*)env);
293 // traceUserBracketEvent( 2201, StartTime, CmiWallTimer());
// Pack/unpack routine for this strategy.
// @param p  PUP::er passed on to the base class
// The visible part only delegates to CharmStrategy::pup().
// NOTE(review): any further statements of this body are not visible in this
// excerpt.
297 void MulticastStrategy::pup(PUP::er &p){
299 CharmStrategy::pup(p);
// Receive-side entry point: interprets msg as an envelope and dispatches on
// the status stored in the section cookie of the contained CkMcastBaseMsg.
// @param msg  incoming message, cast to envelope*
// Dispatch, as far as visible:
//   COMLIB_MULTICAST_NEW_SECTION   -> handleNewMulticastMessage(env)
//   COMLIB_MULTICAST_SECTION_ERROR -> look up the cached section (to be
//                                     marked old, per the comment below)
//   COMLIB_MULTICAST_OLD_SECTION   -> deliver along the cached route
// Aborts when the magic field is not _SECTION_MAGIC.
// NOTE(review): the tests guarding the remaining CkAbort calls and the
// statement that marks a section old are not visible in this excerpt.
303 void MulticastStrategy::handleMessage(void *msg){
306 // double StartTime = CmiWallTimer();
308 envelope *env = (envelope *)msg;
309 RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
311 //Section multicast base message
312 CkMcastBaseMsg *cbmsg = (CkMcastBaseMsg *)EnvToUsr(env);
313 if (cbmsg->magic != _SECTION_MAGIC) CkAbort("MulticastStrategy received bad message! Did you forget to inherit from CkMcastBaseMsg?\n");
315 int status = cbmsg->_cookie.info.sInfo.cInfo.status;
316 ComlibPrintf("[%d] In handleMulticastMessage %d\n", CkMyPe(), status);
318 if(status == COMLIB_MULTICAST_NEW_SECTION)
319 handleNewMulticastMessage(env);
320 else if (status == COMLIB_MULTICAST_SECTION_ERROR) {
321 // some objects were not on the correct processor, mark the section as
322 // old. next time we try to use it, a new one will be generated with the
323 // updated information in the location manager (since the wrong delivery
324 // updated it indirectly).
// Received sections are keyed by (PE stored in the cookie, section id).
325 ComlibSectionHashKey key(cbmsg->_cookie.get_pe(),
326 cbmsg->_cookie.info.sInfo.cInfo.id);
328 ComlibSectionHashObject *obj;
329 obj = sec_ht.get(key);
332 CkAbort("Destination indices is NULL\n");
334 // mark the section as old
336 } else if (status == COMLIB_MULTICAST_OLD_SECTION) {
337 //status == COMLIB_MULTICAST_OLD_SECTION, use the cached section id
338 ComlibSectionHashKey key(cbmsg->_cookie.get_pe(),
339 cbmsg->_cookie.info.sInfo.cInfo.id);
341 ComlibSectionHashObject *obj;
342 obj = sec_ht.get(key);
345 CkAbort("Destination indices is NULL\n");
// Deliver locally first, then forward env to the remote PEs of the route.
347 localMulticast(env, obj, cbmsg);
348 remoteMulticast(env, obj);
350 CkAbort("Multicast message status is zero\n");
353 // traceUserBracketEvent( 2202, StartTime, CmiWallTimer());
// Handles the first message of a section on a receiving PE: unpacks the
// section description, builds a ComlibSectionHashObject for it, stores the
// object in sec_ht, and then delivers and forwards the message.
// @param env  envelope of a message whose status is
//             COMLIB_MULTICAST_NEW_SECTION (see the dispatch in handleMessage)
// NOTE(review): the declarations of localElems, newenv and idx_list, and the
// body of the old_obj test, are not visible in this excerpt.
358 void MulticastStrategy::handleNewMulticastMessage(envelope *env) {
360 // double StartTime = CmiWallTimer();
// NOTE(review): the same log line is emitted twice in a row below.
362 ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
363 ComlibPrintf("%d : In handleNewMulticastMessage\n", CkMyPe());
365 CkUnpackMessage(&env);
369 CkArrayIndex *local_idx_list;
371 // Extract the list of elements to be delivered locally
372 sinfo.unpack(env, localElems, local_idx_list, newenv);
374 ComlibMulticastMsg *cbmsg = (ComlibMulticastMsg *)EnvToUsr(env);
// Key is (PE stored in the cookie, section id), matching handleMessage().
375 ComlibSectionHashKey key(cbmsg->_cookie.get_pe(),
376 cbmsg->_cookie.info.sInfo.cInfo.id);
378 ComlibSectionHashObject *old_obj = NULL;
// An entry already stored under this key is handled before a new one is
// inserted (body not shown; presumably the old object is deleted -- confirm).
380 old_obj = sec_ht.get(key);
381 if(old_obj != NULL) {
// NOTE(review): no use or delete[] of idx_list_array is visible in this
// excerpt; these three lines may sit inside a disabled region of the full
// file -- confirm, otherwise the array would leak.
386 CkArrayIndex *idx_list_array = new CkArrayIndex[idx_list.size()];
387 for(int count = 0; count < idx_list.size(); count++)
388 idx_list_array[count] = idx_list[count];
391 ComlibPrintf("[%d] Creating new ComlibSectionHashObject in handleNewMulticastMessage\n", CkMyPe());
392 ComlibSectionHashObject *new_obj = new ComlibSectionHashObject();
// Copy the localElems unpacked indices into the new entry.
393 new_obj->indices.resize(0);
394 for (int i=0; i<localElems; ++i) new_obj->indices.insertAtEnd(local_idx_list[i]);
// Let createObjectOnIntermediatePe() complete new_obj from the PE/index-count
// data carried in the message and the PE stored in the cookie.
396 createObjectOnIntermediatePe(new_obj, cbmsg->nPes, cbmsg->indicesCount, cbmsg->_cookie.get_pe());
398 ComlibPrintf("[%d] Inserting object into sec_ht\n", CkMyPe());
399 ComlibPrintf("[%d] sec_ht.numObjects() =%d\n", CkMyPe(), sec_ht.numObjects());
401 sec_ht.put(key) = new_obj;
// Final step: newenv (produced by sinfo.unpack above) is delivered locally,
// then the original env is forwarded to the remote PEs.
404 /* local multicast must come before remote multicast because the second can delete
405 * the passed env parameter, and cbmsg is part of env!
407 // traceUserBracketEvent( 2203, StartTime, CmiWallTimer());
409 localMulticast(newenv, new_obj, cbmsg); //local multicast always copies
410 remoteMulticast(env, new_obj);