6 #include "ckevacuation.h"
11 /***********************************************************************************************/
// File-scope evacuation state.
// numValidProcessors: assigned CkNumValidPes()-1 in CkClearAllArrayElements just
// before the evacuation is announced; _ckAckEvac tests it against 0.
// NOTE(review): presumably counts acknowledgements still outstanding — the
// statement that decrements it is not visible here; confirm.
20 int numValidProcessors;
// remainingElements: compared against 0 in several places to decide whether this
// processor is empty; asserted to be non-negative in CkEvacuatedElement.
24 int remainingElements;
// allowMessagesOnly: written by _ckEvacBcast, which stores either -1 or the
// evacuating processor's number (msg->pe) in it.
25 int allowMessagesOnly; /*this processor has started evacuating but not yet complete
26 So allow messages to it but not immigration
30 Called on the other processors to inform them that processor msg->pe is being evacuated
// Handler for the evacuation broadcast sent by CkAnnounceEvac.
//
// msg->pe is the processor being evacuated and msg->remainingElements is the
// value the sender passed to CkAnnounceEvac.
//
// Visible actions, in order:
//   - remainingElements == -1: the wall-clock time is stored in firstRecv.
//   - msg->pe is cleared in _validProcessors and the avail vector is refreshed.
//   - if msg->pe was the serializer, a new one is picked with getNextSerializer().
//   - every location manager is iterated with a CkElementInformHome visitor.
//   - remainingElements == 0: a reply with handler _ckAckEvacIdx is sent back to
//     msg->pe and allowMessagesOnly is set to -1.
//   - allowMessagesOnly is set to msg->pe.
// NOTE(review): the closing braces / else lines are not visible, so the exact
// nesting of these steps (e.g. whether the -1 case returns early, and whether
// the last assignment is the else-branch of the == 0 test) must be confirmed.
32 void _ckEvacBcast(struct evacMsg *msg){
33 if(msg->remainingElements == -1){
34 firstRecv = CmiWallTimer();
37 printf("[%d]<%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
38 fprintf(stderr,"[%d] <%.6f> Processor %d is being evacuated \n",CkMyPe(),CmiWallTimer(),msg->pe);
// Mark the evacuating processor as no longer valid and publish the new vector.
39 CpvAccess(_validProcessors)[msg->pe] = 0;
40 set_avail_vector(CpvAccess(_validProcessors));
// The serializer must not remain on the processor that is going away.
41 if(msg->pe == CpvAccess(serializer)){
42 CpvAccess(serializer) = getNextSerializer();
45 Inform all processors about the current position of
46 the elements that have a home on them.
47 Useful for the case where an element on the crashing
48 processor has migrated away previously
50 int numGroups = CkpvAccess(_groupIDTable)->size();
52 CkElementInformHome inform;
// CKLOCMGR_LOOP is a macro defined elsewhere; here its body runs the visitor
// over the object named `obj`, cast to CkLocMgr.
53 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(inform););
// remainingElements == 0: acknowledge to the evacuating processor.
55 if(msg->remainingElements == 0){
58 // printf("[%d] Last broadcast received at %.6lf in %.6lf \n",CkMyPe(),CmiWallTimer(),CmiWallTimer()-firstRecv);
// `reply` is declared on a line that is not visible here.
59 CmiSetHandler(&reply,_ckAckEvacIdx);
60 CmiSyncSend(msg->pe,sizeof(struct evacMsg),(char *)&reply);
61 allowMessagesOnly = -1;
63 allowMessagesOnly = msg->pe;
71 Acks that all the valid processors have received the
// Handler for the acknowledgement that _ckEvacBcast sends back to the
// evacuating processor (handler index _ckAckEvacIdx).
//
// When numValidProcessors is 0 it refreshes the avail vector and prints the
// time elapsed since evacTime.
// NOTE(review): the update of numValidProcessors that should precede the test
// is not visible here — presumably a decrement per reply; confirm.
74 void _ckAckEvac(struct evacMsg *msg){
76 if(numValidProcessors == 0){
77 set_avail_vector(CpvAccess(_validProcessors));
78 printf("[%d] <%.6f> Reply from all processors took %.6lf s \n",CkMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
79 // CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)CkStopScheduler,0);
// Broadcasts an evacMsg with handler _ckEvacBcastIdx, so that receivers run
// _ckEvacBcast.
//
// remain: stored in msg.remainingElements. The receiver treats -1 and 0
//         specially (see _ckEvacBcast).
// NOTE(review): the declaration of `msg` and the assignment of msg.pe are not
// visible here.
85 void CkAnnounceEvac(int remain){
86 // Tell all the processors
89 msg.remainingElements = remain;
90 CmiSetHandler(&msg,_ckEvacBcastIdx);
91 CmiSyncBroadcast(sizeof(struct evacMsg),(char *)&msg);
// Marks the calling processor as invalid in _validProcessors.
//
// Before that it tests remainingElements > 0 and walks every node group,
// fetching each group's IrrGroup object.
// NOTE(review): the body of the remainingElements test and the call made on
// each `obj` are not visible here; per the existing comment the loop tells the
// reduction managers that this processor is going down.
95 void CkStopScheduler(){
96 if(remainingElements > 0){
100 Tell the reduction managers that this processor is going down now
101 and that they should tell their parents to cut them off
103 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
104 for(int i=0;i<numNodeGroups;i++){
105 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
108 int thisPE = CkMyPe();
109 printf("[%d] Stopping Scheduler \n", thisPE);
110 /*stops putting messages into the scheduler queue*/
111 CpvAccess(_validProcessors)[thisPE]=0;
// Migrates a single array element off this processor.
//
// arg: untyped pointer that is cast to CkLocRec_local*, the element's local
//      location record.
//
// The destination is chosen with getNextPE() from the element's index. After
// the migration, CkEvacuatedElement() is called.
114 void CkEmmigrateElement(void *arg){
115 CkLocRec_local *rec = (CkLocRec_local *)arg;
116 CkLocMgr *mgr = rec->getLocMgr();
117 const CkArrayIndex &idx = rec->getIndex();
118 int targetPE=getNextPE(idx);
119 //set this flag so that load balancer is not informed when
120 //this element migrates
121 rec->AsyncMigrate(true);
122 mgr->emigrate(rec,targetPE);
123 CkEvacuatedElement();
// Called after an element has been migrated away (see CkEmmigrateElement).
//
// Visible steps:
//   - tests whether this processor is already marked invalid, and whether
//     startedEvac is unset (the bodies of both tests are not visible here —
//     presumably early returns; confirm).
//   - runs a CkElementEvacuate visitor over every location manager.
//   - asserts remainingElements >= 0.
//   - remainingElements == 0: prints the time elapsed since evacTime, marks this
//     processor invalid and walks the node groups.
// NOTE(review): where remainingElements is recomputed, and what is done with
// each node group's `obj`, are not visible here.
127 void CkEvacuatedElement(){
128 if(!CpvAccess(_validProcessors)[CkMyPe()]){
131 if(!CkpvAccess(startedEvac)){
135 // Go through all the groups and find the location managers.
136 // For each location manager migrate away all the elements
137 // Recalculate the number of remaining elements
138 int numGroups = CkpvAccess(_groupIDTable)->size();
140 CkElementEvacuate evac;
141 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
143 CmiAssert(remainingElements >= 0);
144 DEBUGC(printf("[%d] remaining elements %d \n",CkMyPe(),remainingElements));
// Nothing left on this processor: take it out of the valid set.
145 if(remainingElements == 0){
146 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
147 CpvAccess(_validProcessors)[CkMyPe()] = 0;
149 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
150 for(int i=0;i<numNodeGroups;i++){
151 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
// Forward declaration with C linkage; the definition appears later in the file.
158 extern "C" void CkClearAllArrayElements();
// Records the evacuation start time in evacTime and begins the evacuation by
// calling CkClearAllArrayElements(). processRaiseEvacFile schedules this
// function as a timed callback.
// NOTE(review): the lines between the signature and the evacTime assignment
// are not visible here.
160 void CkDecideEvacPe(){
165 evacTime = CmiWallTimer();
166 CkClearAllArrayElements();
174 Code for moving off all the array elements on a processor
// Starts evacuating the calling processor.
//
// Visible steps:
//   - prints the start time (evacTime is set by the caller, CkDecideEvacPe)
//     and sets startedEvac to 1.
//   - if this processor is the serializer, hands that role to
//     getNextSerializer().
//   - runs a CkElementEvacuate visitor over every location manager.
//   - walks the node groups (per the existing comment, to make the nodegroup
//     reduction managers start changing their reduction trees).
//   - sets numValidProcessors to CkNumValidPes()-1 and announces the
//     evacuation with the current remainingElements.
//   - remainingElements == 0: prints the elapsed time, marks this processor
//     invalid and walks the node groups again.
// NOTE(review): the declarations of `i` (used by the first node-group loop) and
// `numEvacuated`, and the calls made on each `obj`, are not visible here.
177 void CkClearAllArrayElements(){
184 // evacTime = CmiWallTimer();
185 printf("[%d] <%.6lf> Start Evacuation \n",CkMyPe(),evacTime);
186 CkpvAccess(startedEvac)=1;
187 // Make sure the broadcast serializer changes
188 if(CkMyPe() == CpvAccess(serializer)){
189 CpvAccess(serializer) = getNextSerializer();
191 // CkAnnounceEvac(-1);
193 // Go through all the groups and find the location managers.
194 // For each location manager migrate away all the elements
195 int numGroups = CkpvAccess(_groupIDTable)->size();
197 CkElementEvacuate evac;
198 CKLOCMGR_LOOP(((CkLocMgr*)(obj))->iterate(evac););
201 Tell the nodegroup reduction managers that they need to
202 start changing their reduction trees
204 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
205 for(i=0;i<numNodeGroups;i++){
206 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
210 DEBUGC(printf("[%d] remaining elements %d number Evacuated %d \n",CkMyPe(),remainingElements,numEvacuated));
// One fewer than the number of valid processors; _ckAckEvac tests this
// counter against 0.
211 numValidProcessors = CkNumValidPes()-1;
212 CkAnnounceEvac(remainingElements);
213 if(remainingElements == 0){
215 Tell the nodegroup reduction managers when all the elements have been
218 printf("[%d] Processor empty in %.6lfs \n",CkMyPe(),CmiWallTimer()-evacTime);
219 CpvAccess(_validProcessors)[CkMyPe()] = 0;
220 int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
221 for(int i=0;i<numNodeGroups;i++){
222 IrrGroup *obj = CksvAccess(_nodeGroupTable)->find((CksvAccess(_nodeGroupIDTable))[i]).getObj();
// Thin wrapper that forwards to CkClearAllArrayElements(), which is declared
// extern "C" earlier in the file.
228 void CkClearAllArrayElementsCPP(){
229 CkClearAllArrayElements();
// Visitor callback run for each location while location managers are iterated
// with a CkElementEvacuate object.
//
// loc: the location being visited; its manager, local record and index are
//      read from it.
//
// The destination processor is chosen with getNextPE(). If the record reports
// isAsyncEvacuate(), the element is emigrated at once. The later section lists
// the migratables at the location and emigrates the record when one of them
// reports isAsyncEvacuate().
// NOTE(review): the else line and closing braces are not visible, so it must
// be confirmed that the list scan is the else-branch, and whether the loop
// stops after the first emigrate() call.
232 void CkElementEvacuate::addLocation(CkLocation &loc){
233 CkLocMgr *locMgr = loc.getManager();
234 CkLocRec_local *rec = loc.getLocalRecord();
235 const CkArrayIndex &i = loc.getIndex();
236 int targetPE=getNextPE(i);
237 if(rec->isAsyncEvacuate(){
239 printf("[%d]<%.6lf> START to emigrate array element \n",CkMyPe(),CmiWallTimer());
240 rec->AsyncMigrate(true);
241 locMgr->emigrate(rec,targetPE);
242 printf("[%d]<%.6lf> emigrated array element \n",CkMyPe(),CmiWallTimer());
245 This is in all probability a location containing an ampi, ampiParent and their
246 associated TCharm thread.
248 CkVec<CkMigratable *>list;
249 locMgr->migratableList(rec,list);
250 DEBUGC(printf("[%d] ArrayElement not ready to Evacuate number of migratable %d \n",CkMyPe(),list.size()));
// Note: the loop counter `i` shadows the index reference `i` declared above;
// targetPE was already computed from the index before this loop.
251 for(int i=0;i<list.size();i++){
252 if(list[i]->isAsyncEvacuate()){
253 DEBUGC(printf("[%d] possible TCharm element decides to migrate \n",CkMyPe()));
254 // list[i]->ckMigrate(targetPE);
255 rec->AsyncMigrate(true);
256 locMgr->emigrate(rec,targetPE);
260 // remainingElements++;
261 //inform new home that this element is here
262 // locMgr->informHome(i,CkMyPe());
// Visitor callback run for each location while location managers are iterated
// with a CkElementInformHome object (see _ckEvacBcast).
//
// loc: the location being visited.
//
// Calls informHome() on the location's manager with the element's index and
// the calling processor's number.
266 void CkElementInformHome::addLocation(CkLocation &loc){
267 const CkArrayIndex &i = loc.getIndex();
268 CkLocMgr *locMgr = loc.getManager();
269 locMgr->informHome(i,CkMyPe());
274 Find the homePE of an array element, given an index. Used only on a
275 processor that is being evacuated
// Picks the processor an element with index `i` should move to.
//
// i: the element's array index.
//
// Two paths are visible: one maps the first index word modulo CkNumPes(), the
// other maps a hash of the index modulo CkNumPes(). Both then step forward,
// wrapping around, until they reach a processor that is marked valid and is
// not the caller.
// NOTE(review): the condition choosing between the two paths and the return
// statements are not visible here.
// NOTE(review): if i.data()[0] is a signed value that can be negative, the %
// result is negative and would index _validProcessors out of range — confirm.
// NOTE(review): neither probe loop has a visible exit for the case where no
// other valid processor exists — confirm callers guarantee one.
278 int getNextPE(const CkArrayIndex &i){
280 //Map 1D integer indices in simple round-robin fashion
281 int ans= (i.data()[0])%CkNumPes();
282 while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
283 ans = (ans +1 )%CkNumPes();
287 //Map other indices based on their hash code, mod a big prime.
288 unsigned int hash=(i.hash()+739)%1280107;
289 int ans = (hash % CkNumPes());
290 while(!CpvAccess(_validProcessors)[ans] || ans == CkMyPe()){
291 ans = (ans +1 )%CkNumPes();
300 If it is found that the serializer processor has crashed, decide on a
301 new serializer, should return the same answer on all processors
// Returns the next serializer: the first processor after the current one, in
// wrap-around order, that is marked valid in _validProcessors.
//
// Calls CkAbort if the search wraps all the way round to the current
// serializer. The result depends only on the serializer value and
// _validProcessors.
303 int getNextSerializer(){
304 int currentSerializer = CpvAccess(serializer);
305 int nextSerializer = (currentSerializer+1)%CkNumPes();
307 while(!(CpvAccess(_validProcessors)[nextSerializer])){
308 nextSerializer = (nextSerializer + 1)%CkNumPes();
// Back at the starting point: no other processor was valid.
309 if(nextSerializer == currentSerializer){
310 CkAbort("All processors are invalid ");
313 return nextSerializer;
321 for(int i=0;i<CkNumPes();i++){
322 if(CpvAccess(_validProcessors)[i]){
// Reads a "raiseevac" file and schedules evacuations from it.
//
// raiseEvacFile: path of the file, opened for reading.
//
// Each line is read with fgets (at most 98 characters plus the terminator)
// and parsed as two integers, `pe` and `faultTime`. CkDecideEvacPe is then
// scheduled with CcdCallFnAfter using faultTime*1000 as the delay. If the file
// cannot be opened, a message is printed and the file is ignored.
// NOTE(review): the fopen failure test, the declarations of `line`, `pe` and
// `faultTime`, and any test of `pe` against the calling processor are not
// visible here.
// NOTE(review): the sscanf return value is discarded, so a malformed line
// leaves pe/faultTime with whatever values they already held — confirm
// this is acceptable.
// NOTE(review): no fclose(fp) is visible — confirm the file is closed.
// NOTE(review): the *1000 suggests the delay is in milliseconds and faultTime
// in seconds (the message prints "%ds") — confirm against CcdCallFnAfter.
331 void processRaiseEvacFile(char *raiseEvacFile){
332 FILE *fp = fopen(raiseEvacFile,"r");
334 printf("Could not open raiseevac file %s. Ignoring raiseevac \n",raiseEvacFile);
338 while(fgets(line,99,fp)!=0){
340 sscanf(line,"%d %d",&pe,&faultTime);
342 printf("[%d] Processor to be evacuated after %ds\n",CkMyPe(),faultTime);
343 CcdCallFnAfter((CcdVoidFn)CkDecideEvacPe, 0, faultTime*1000);