LRTS Comm Thread Tracing in message receive
[charm.git] / src / ck-core / ckevacuation.C
blob6f5f478bac8a521e15f296f331af42f323d408fd
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include "charm++.h"
5 #include "ck.h"
6 #include "ckevacuation.h"
8 //#define DEBUGC(x) x
9 #define DEBUGC(x) 
/***********************************************************************************************/
/*
	FAULT_EVAC: moving array elements off a processor that is being evacuated
	and keeping the other processors informed about it.
*/

/* Converse handler indices for the evacuation broadcast and its acknowledgement. */
int _ckEvacBcastIdx;
int _ckAckEvacIdx;

/* Acknowledgements the evacuating processor is still waiting for. */
int numValidProcessors;

/* Wall-clock time at which evacuation of this processor was decided. */
double evacTime;

/* Array elements still left on this processor during an evacuation. */
int remainingElements;

/*
	PE that has started evacuating but has not finished yet: messages may still
	be sent to it, but elements must not immigrate to it. Set to -1 once that
	processor reports it is empty.
*/
int allowMessagesOnly;

/* Receive time of the first (remainingElements == -1) evacuation broadcast; in this file only read by debug output. */
double firstRecv;
30         Called on other processors that msg->pe processor has been evacuated
32 void _ckEvacBcast(struct evacMsg *msg){
33         if(msg->remainingElements == -1){
34                         firstRecv = CmiWallTimer();
35                         return;
36         }
37         printf("[%d]<%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
38         fprintf(stderr,"[%d] <%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
39         CpvAccess(_validProcessors)[msg->pe] = 0;
40         set_avail_vector(CpvAccess(_validProcessors));
41         if(msg->pe == CpvAccess(serializer)){
42                 CpvAccess(serializer) = getNextSerializer();
43         }
44         /*
45                 Inform all processors about the current position of 
46                 the elements that have a home on them.
47                 Useful for the case where an element on the crashing
48                 processor has migrated away previously
49         */
50         int numGroups = CkpvAccess(_groupIDTable)->size();
51         int i;
52         CkElementInformHome inform;
53         CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(inform););
54         
55         if(msg->remainingElements == 0){
56                 struct evacMsg reply;
57                 reply.pe = CkMyPe();
58         //      printf("[%d] Last broadcast received at %.6lf in %.6lf \n",CkMyPe(),CmiWallTimer(),CmiWallTimer()-firstRecv);
59         CmiSetHandler(&reply,_ckAckEvacIdx);
60         CmiSyncSend(msg->pe,sizeof(struct evacMsg),(char *)&reply);
61                 allowMessagesOnly = -1;
62         }else{
63                 allowMessagesOnly = msg->pe;
64         }
71         Acks that all the valid processors have received the 
72         evacuate broadcast
74 void _ckAckEvac(struct evacMsg *msg){
75         numValidProcessors--;
76         if(numValidProcessors == 0){
77                 set_avail_vector(CpvAccess(_validProcessors));
78                 printf("[%d] <%.6f> Reply from all processors took %.6lf s \n",CkMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
79 //              CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)CkStopScheduler,0);
80 //              CkStopScheduler();
81         }
85 void CkAnnounceEvac(int remain){
86         //      Tell all the processors
87         struct evacMsg msg;
88         msg.pe = CkMyPe();
89         msg.remainingElements = remain;
90         CmiSetHandler(&msg,_ckEvacBcastIdx);
91         CmiSyncBroadcast(sizeof(struct evacMsg),(char *)&msg);
95 void CkStopScheduler(){
96         if(remainingElements > 0){
97                 return;
98         }
99                 /*
100                 Tell the reduction managers that this processor is going down now
101                 and that they should tell their parents to cut them off
102         */      
103         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
104         for(int i=0;i<numNodeGroups;i++){
105     IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();     
106                 obj->doneEvacuate();
107         }
108         int thisPE = CkMyPe();
109         printf("[%d] Stopping Scheduler \n", thisPE);
110         /*stops putting messages into the scheduler queue*/
111         CpvAccess(_validProcessors)[thisPE]=0;
114 void CkEmmigrateElement(void *arg){
115         CkLocRec_local *rec = (CkLocRec_local *)arg;
116         CkLocMgr *mgr = rec->getLocMgr();
117         const CkArrayIndex &idx = rec->getIndex();
118         int targetPE=getNextPE(idx);
119         //set this flag so that load balancer is not informed when
120         //this element migrates
121         rec->AsyncMigrate(true);
122         mgr->emigrate(rec,targetPE);
123         CkEvacuatedElement();
124         
127 void CkEvacuatedElement(){
128         if(!CpvAccess(_validProcessors)[CkMyPe()]){
129                 return;
130         }
131         if(!CkpvAccess(startedEvac)){
132                 return;
133         }
134         remainingElements=0;
135         //      Go through all the groups and find the location managers.
136         //      For each location manager migrate away all the elements
137         // Recalculate the number of remaining elements
138         int numGroups = CkpvAccess(_groupIDTable)->size();
139         int i;
140   CkElementEvacuate evac;
141         CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
142         
143         CmiAssert(remainingElements >= 0);
144         DEBUGC(printf("[%d] remaining elements %d \n",CkMyPe(),remainingElements));
145         if(remainingElements == 0){
146                 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
147                 CpvAccess(_validProcessors)[CkMyPe()] = 0;
148                 CkAnnounceEvac(0);
149                 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
150                 for(int i=0;i<numNodeGroups;i++){
151           IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();       
152                         obj->doneEvacuate();
153                 }
154         }       
/*
	Evacuation state of this processor: 0 (zero-initialized) until
	CkDecideEvacPe runs, 1 once evacuation has been decided, and 2 once
	CkClearAllArrayElements has started moving elements off.
*/
int evacuate;
extern "C" {
void CkClearAllArrayElements();
}
160 void CkDecideEvacPe(){
161         if(evacuate > 0){
162                 return;
163         }
164         evacuate = 1;
165         evacTime = CmiWallTimer();
166         CkClearAllArrayElements();
171 int numEvacuated;
174         Code for moving off all the array elements on a processor
176 extern "C"
177 void CkClearAllArrayElements(){
178         if(evacuate != 1){
179                         return;
180         }
181         evacuate=2;
182         remainingElements=0;
183         numEvacuated=0;
184 //      evacTime = CmiWallTimer();
185         printf("[%d] <%.6lf> Start Evacuation \n",CkMyPe(),evacTime);
186         CkpvAccess(startedEvac)=1;
187         //      Make sure the broadcase serializer changes
188         if(CkMyPe() == CpvAccess(serializer)){
189                 CpvAccess(serializer) = getNextSerializer();
190         }
191 //      CkAnnounceEvac(-1);
192         
193         //      Go through all the groups and find the location managers.
194         //      For each location manager migrate away all the elements
195         int numGroups = CkpvAccess(_groupIDTable)->size();
196         int i;
197   CkElementEvacuate evac;
198         CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
200         /*
201                 Tell the nodegroup reduction managers that they need to 
202                 start changing their reduction trees
203         */
204         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
205         for(i=0;i<numNodeGroups;i++){
206     IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();     
207                 obj->evacuate();
208         }
209         
210         DEBUGC(printf("[%d] remaining elements %d number Evacuated %d \n",CkMyPe(),remainingElements,numEvacuated));
211         numValidProcessors = CkNumValidPes()-1;
212         CkAnnounceEvac(remainingElements);
213         if(remainingElements == 0){
214                 /*
215                         Tell the nodegroup reduction managers when all the elements have been
216                         removed
217                 */
218                 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
219                 CpvAccess(_validProcessors)[CkMyPe()] = 0;
220                 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
221                 for(int i=0;i<numNodeGroups;i++){
222           IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();       
223                         obj->doneEvacuate();
224                 }
225         }       
228 void CkClearAllArrayElementsCPP(){
229         CkClearAllArrayElements();
232 void CkElementEvacuate::addLocation(CkLocation &loc){
233         CkLocMgr *locMgr = loc.getManager();
234         CkLocRec_local *rec = loc.getLocalRecord();
235         const CkArrayIndex &i = loc.getIndex();
236         int targetPE=getNextPE(i);
237         if(rec->isAsyncEvacuate()){
238                 numEvacuated++;
239                 printf("[%d]<%.6lf> START to emigrate array element \n",CkMyPe(),CmiWallTimer());
240                 rec->AsyncMigrate(true);
241                 locMgr->emigrate(rec,targetPE);
242                 printf("[%d]<%.6lf> emigrated array element \n",CkMyPe(),CmiWallTimer());
243         }else{
244                 /*
245                         This is in all probability a location containing an ampi, ampiParent and their
246                         associated TCharm thread.
247                 */
248                 CkVec<CkMigratable *>list;
249                 locMgr->migratableList(rec,list);
250                 DEBUGC(printf("[%d] ArrayElement not ready to Evacuate number of migratable %d \n",CkMyPe(),list.size()));
251                 for(int i=0;i<list.size();i++){
252                         if(list[i]->isAsyncEvacuate()){
253                                 DEBUGC(printf("[%d] possible TCharm element decides to migrate \n",CkMyPe()));
254 //                              list[i]->ckMigrate(targetPE);
255                                 rec->AsyncMigrate(true);
256                                 locMgr->emigrate(rec,targetPE);
257                                 numEvacuated++;
258                         }
259                 }
260         //      remainingElements++;
261                 //inform new home that this element is here
262         //      locMgr->informHome(i,CkMyPe());
263         }
266 void CkElementInformHome::addLocation(CkLocation &loc){
267         const CkArrayIndex &i = loc.getIndex();
268         CkLocMgr *locMgr = loc.getManager();
269         locMgr->informHome(i,CkMyPe()); 
274         Find the homePE of an array element,  given an index. Used only on a
275         processor that is being evacuated
278 int getNextPE(const CkArrayIndex &i){
279         if (i.nInts==1) {
280       //Map 1D integer indices in simple round-robin fashion
281       int ans= (i.data()[0])%CkNumPes();
282                         while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
283                                 ans = (ans +1 )%CkNumPes();
284                         }
285                         return ans;
286   }else{
287                 //Map other indices based on their hash code, mod a big prime.
288                         unsigned int hash=(i.hash()+739)%1280107;
289                         int ans = (hash % CkNumPes());
290                         while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
291                                 ans = (ans +1 )%CkNumPes();
292                         }
293                         return ans;
295         }
300         If it is found that the serializer processor has crashed, decide on a 
301         new serializer, should return the same answer on all processors
303 int getNextSerializer(){
304         int currentSerializer = CpvAccess(serializer);
305         int nextSerializer = (currentSerializer+1)%CkNumPes();
307         while(!(CpvAccess(_validProcessors)[nextSerializer])){
308                 nextSerializer = (nextSerializer + 1)%CkNumPes();
309                 if(nextSerializer == currentSerializer){
310                         CkAbort("All processors are invalid ");
311                 }
312         }
313         return nextSerializer;
316 int CkNumValidPes(){
317 #if CMK_BIGSIM_CHARM
318         return CkNumPes();
319 #else
320         int count=0;
321         for(int i=0;i<CkNumPes();i++){
322                 if(CpvAccess(_validProcessors)[i]){
323                         count++;
324                 }
325         }
326         return count;
327 #endif
331 void processRaiseEvacFile(char *raiseEvacFile){
332         FILE *fp = fopen(raiseEvacFile,"r");
333         if(fp == NULL){
334                 printf("Could not open raiseevac file %s. Ignoring raiseevac \n",raiseEvacFile);
335                 return;
336         }
337         char line[100];
338         while(fgets(line,99,fp)!=0){
339                 int pe,faultTime;
340                 sscanf(line,"%d %d",&pe,&faultTime);
341                 if(pe == CkMyPe()){
342                         printf("[%d] Processor to be evacuated after %ds\n",CkMyPe(),faultTime);
343                         CcdCallFnAfter((CcdVoidFn)CkDecideEvacPe, 0, faultTime*1000);
344                 }
345         }
346         fclose(fp);