Bug #1421:Running leanmd with error checking enabled in Charm++ triggers assertion...
[charm.git] / src / ck-ldb / CentralLB.C
blob7d67da8d1d33336556ab2aedadf4d1bb132b50cd
2 /**
3  * \addtogroup CkLdb
4 */
5 /*@{*/
7 #include <charm++.h>
8 #include "ck.h"
9 #include "envelope.h"
10 #include "CentralLB.h"
11 #include "LBDBManager.h"
12 #include "LBSimulation.h"
14 #define  DEBUGF(x)       // CmiPrintf x;
15 #define  DEBUG(x)        // x;
17 #if CMK_MEM_CHECKPOINT
18    /* can not handle reduction in inmem FT */
19 #define USE_REDUCTION         0
20 #define USE_LDB_SPANNING_TREE 0
21 #else
22 #define USE_REDUCTION         1
23 #define USE_LDB_SPANNING_TREE 1
24 #endif
26 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
27 extern bool _restartFlag;
28 extern void getGlobalStep(CkGroupID );
29 extern void initMlogLBStep(CkGroupID );
30 extern int globalResumeCount;
31 extern void sendDummyMigrationCounts(int *);
32 #endif
34 #if CMK_GRID_QUEUE_AVAILABLE
35 CpvExtern(void *, CkGridObject);
36 #endif
38 #if CMK_GLOBAL_LOCATION_UPDATE      
39 extern void UpdateLocation(MigrateInfo& migData); 
40 #endif
42 #if CMK_SHRINK_EXPAND
43 extern "C" void charmrun_realloc(char *s);
44 extern char willContinue;
45 extern realloc_state pending_realloc_state;
46 extern char * se_avail_vector;
47 extern "C" int mynewpe;
48 extern char *_shrinkexpand_basedir;
49 int numProcessAfterRestart;
50 int mynewpe=0;
51 #endif
52 CkGroupID loadbalancer;
53 int * lb_ptr;
54 bool load_balancer_created;
56 CreateLBFunc_Def(CentralLB, "CentralLB base class")
58 static int broadcastThreshold = 32;
60 static void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
61                              LBMigrateMsg *, LBInfo &info, int considerComm);
64 void CreateCentralLB()
66   CProxy_CentralLB::ckNew(0);
70 void CentralLB::staticStartLB(void* data)
72   CentralLB *me = (CentralLB*)(data);
73   me->StartLB();
76 void CentralLB::staticMigrated(void* data, LDObjHandle h, int waitBarrier)
78   CentralLB *me = (CentralLB*)(data);
79   me->Migrated(h, waitBarrier);
82 void CentralLB::staticAtSync(void* data)
84   CentralLB *me = (CentralLB*)(data);
85   me->AtSync();
88 void CentralLB::initLB(const CkLBOptions &opt)
90 #if CMK_LBDB_ON
91   lbname = "CentralLB";
92   thisProxy = CProxy_CentralLB(thisgroup);
93   //  CkPrintf("Construct in %d\n",CkMyPe());
94   loadbalancer = thisgroup;
95   // create and turn on by default
96   receiver = theLbdb->
97     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
98   notifier = theLbdb->getLBDB()->
99     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
100   startLbFnHdl = theLbdb->getLBDB()->
101     AddStartLBFn((LDStartLBFn)(staticStartLB),(void*)(this));
103   // CkPrintf("[%d] CentralLB initLB \n",CkMyPe());
104   if (opt.getSeqNo() > 0) turnOff();
106   stats_msg_count = 0;
107   statsMsgsList = NULL;
108   statsData = NULL;
110   storedMigrateMsg = NULL;
111   reduction_started = false;
113   // for future predictor
114   if (_lb_predict) predicted_model = new FutureModel(_lb_predict_window);
115   else predicted_model=0;
116   // register user interface callbacks
117   theLbdb->getLBDB()->SetupPredictor((LDPredictModelFn)(staticPredictorOn),(LDPredictWindowFn)(staticPredictorOnWin),(LDPredictFn)(staticPredictorOff),(LDPredictModelFn)(staticChangePredictor),(void*)(this));
119   myspeed = theLbdb->ProcessorSpeed();
121   migrates_completed = 0;
122   future_migrates_completed = 0;
123   migrates_expected = -1;
124   future_migrates_expected = -1;
125   cur_ld_balancer = _lb_args.central_pe();      // 0 default
126   lbdone = 0;
127   count_msgs=0;
128   statsMsg = NULL;
129   use_thread = false;
131   if (_lb_args.statsOn()) theLbdb->CollectStatsOn();
133   load_balancer_created = true;
134 #endif
135 #ifdef TEMP_LDB
136         logicalCoresPerNode=physicalCoresPerNode=4;
137         logicalCoresPerChip=4;
138         numSockets=1;
139 #endif
143 CentralLB::~CentralLB()
145 #if CMK_LBDB_ON
146   delete [] statsMsgsList;
147   delete statsData;
148   theLbdb = CProxy_LBDatabase(_lbdb).ckLocalBranch();
149   if (theLbdb) {
150     theLbdb->getLBDB()->
151       RemoveNotifyMigrated(notifier);
152     theLbdb->
153       RemoveStartLBFn((LDStartLBFn)(staticStartLB));
154   }
155 #endif
158 void CentralLB::turnOn() 
160 #if CMK_LBDB_ON
161   theLbdb->getLBDB()->
162     TurnOnBarrierReceiver(receiver);
163   theLbdb->getLBDB()->
164     TurnOnNotifyMigrated(notifier);
165   theLbdb->getLBDB()->
166     TurnOnStartLBFn(startLbFnHdl);
167 #endif
170 void CentralLB::turnOff() 
172 #if CMK_LBDB_ON
173   theLbdb->getLBDB()->
174     TurnOffBarrierReceiver(receiver);
175   theLbdb->getLBDB()->
176     TurnOffNotifyMigrated(notifier);
177   theLbdb->getLBDB()->
178     TurnOffStartLBFn(startLbFnHdl);
179 #endif
182 void CentralLB::SetPESpeed(int speed) 
184   myspeed = speed;
187 int CentralLB::GetPESpeed() 
189   return myspeed;
192 void CentralLB::AtSync()
194 #if CMK_LBDB_ON
195   DEBUGF(("[%d] CentralLB AtSync step %d!!!!!\n",CkMyPe(),step()));
196 #if CMK_MEM_CHECKPOINT  
197   CkSetInLdb();
198 #endif
199 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
200         CpvAccess(_currentObj)=this;
201 #endif
203   // if num of processor is only 1, nothing should happen
204   if (!QueryBalanceNow(step()) || CkNumPes() == 1) {
205     MigrationDone(0);
206     return;
207   }
208   if(CmiNodeAlive(CkMyPe())){
209     thisProxy [CkMyPe()].ProcessAtSync();
210   }
211 #endif
214 void CentralLB::ProcessAtSync()
216 #if CMK_LBDB_ON
217   if (reduction_started) return;              // reducton in progress
219   CmiAssert(CmiNodeAlive(CkMyPe()));
220   if (CkMyPe() == cur_ld_balancer) {
221     start_lb_time = CkWallTimer();
222   }
224 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
225         initMlogLBStep(thisgroup);
226 #endif
228   // build message
229   BuildStatsMsg();
231 #if USE_REDUCTION
232     // reduction to get total number of objects and comm
233     // so that processor 0 can pre-allocate load balancing database
234   int counts[2];
235   counts[0] = theLbdb->GetObjDataSz();
236   counts[1] = theLbdb->GetCommDataSz();
238   CkCallback cb;
239   if (concurrent)
240     cb = CkCallback(CkReductionTarget(CentralLB, ReceiveCounts), thisProxy); // every PE receives counts
241   else
242     cb = CkCallback(CkReductionTarget(CentralLB, ReceiveCounts), thisProxy[0]);
243   contribute(2*sizeof(int), counts, CkReduction::sum_int, cb);
244   reduction_started = true;
245 #else
246   SendStats();
247 #endif
248 #endif
251 #if defined(TEMP_LDB)
252 static int  cpufreq_sysfs_write (
253                      const char *setting,int proc
254                      )
256 char path[100];
257 sprintf(path,"/sys/devices/system/cpu/cpu%d/cpufreq/scaling_setspeed",proc);
258                 FILE *fd = fopen (path, "w");
260                 if (!fd) {
261                         printf("PROC#%d ooooooo666 FILE OPEN ERROR file=%s\n",CkMyPe(),path);
262                         return -1;
263                 }
264 //                else CkPrintf("PROC#%d opened freq file=%s\n",proc,path);
266         fseek ( fd , 0 , SEEK_SET );
267         int numw=fprintf (fd, setting);
268         if (numw <= 0) {
270                 fclose (fd);
271                 printf("FILE WRITING ERROR\n");
272                 return 0;
273         }
274 //        else CkPrintf("Freq for Proc#%d set to %s numw=%d\n",proc,setting,numw);
275         fclose(fd);
276         return 1;
280 static int cpufreq_sysfs_read (int proc)
282         FILE *fd;
283         char path[100];
284         int i=proc;
285         sprintf(path,"/sys/devices/system/cpu/cpu%d/cpufreq/scaling_setspeed",i);
287         fd = fopen (path, "r");
289         if (!fd) {
290                 printf("33 FILE OPEN ERROR file=%s\n",path);
291                 return 0;
292         }
293         char val[10];
294         fgets(val,10,fd);
295         int ff=atoi(val);
296         fclose (fd);
298         return ff;
301 float CentralLB::getTemp(int cpu)
303         char val[10];
304         FILE *f;
305                 char path[100];
306                 sprintf(path,"/sys/devices/platform/coretemp.%d/temp1_input",cpu);
307                 f=fopen(path,"r");
308                 if (!f) {
309                         printf("777 FILE OPEN ERROR file=%s\n",path);
310                         exit(0);
311                 }
313         if(f==NULL) {printf("ddddddddddddddddddddddddddd\n");exit(0);}
314         fgets(val,10,f);
315         fclose(f);
316         return atof(val)/1000;
318 #endif
321 // called only on 0 (or every PE if concurrent=true)
322 void CentralLB::ReceiveCounts(int *counts, int n)
324   if (!concurrent) CmiAssert(CkMyPe() == 0);
325   if (statsData == NULL) statsData = new LDStats;
327     // check that only 2 counts are sent
328   CmiAssert(n == 2);
329   int n_objs = counts[0];
330   int n_comm = counts[1];
332     // resize database
333   statsData->objData.resize(n_objs);
334   statsData->from_proc.resize(n_objs);
335   statsData->to_proc.resize(n_objs);
336   statsData->commData.resize(n_comm);
338   DEBUGF(("[%d] ReceiveCounts: n_objs:%d n_comm:%d\n",CkMyPe(), n_objs, n_comm));
339         
340   if (concurrent) SendStats();
341   else thisProxy.SendStats(); // broadcast call to let everybody start to send stats
344 void CentralLB::BuildStatsMsg()
346 #if CMK_LBDB_ON
347   // build and send stats
348   const int osz = theLbdb->GetObjDataSz();
349   const int csz = theLbdb->GetCommDataSz();
351   int npes = CkNumPes();
352   CLBStatsMsg* msg = new CLBStatsMsg(osz, csz);
353   _MEMCHECK(msg);
354   msg->from_pe = CkMyPe();
355 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
356         msg->step = step();
357 #endif
358   //msg->serial = CrnRand();
361   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
362   theLbdb->IdleTime(&msg->idletime);
363   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
365 #if CMK_LB_CPUTIMER
366   theLbdb->GetTime(&msg->total_walltime,&msg->total_cputime,
367                    &msg->idletime, &msg->bg_walltime,&msg->bg_cputime);
368 #else
369   theLbdb->GetTime(&msg->total_walltime,&msg->total_walltime,
370                    &msg->idletime, &msg->bg_walltime,&msg->bg_walltime);
371 #endif
372 #if defined(TEMP_LDB)
373         float mytemp=getTemp(CkMyPe()%physicalCoresPerNode);
374         int freq=cpufreq_sysfs_read (CkMyPe()%logicalCoresPerNode);
375         msg->pe_temp=mytemp;
376         msg->pe_speed=freq;
377 #else
378   msg->pe_speed = myspeed;
379 #endif
381   DEBUGF(("Processor %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n", CkMyPe(),msg->total_walltime,msg->total_cputime,msg->idletime,msg->bg_walltime,msg->bg_cputime));
383   msg->n_objs = osz;
384   theLbdb->GetObjData(msg->objData);
385   msg->n_comm = csz;
386   theLbdb->GetCommData(msg->commData);
387 //  theLbdb->ClearLoads();
388   DEBUGF(("PE %d BuildStatsMsg %d objs, %d comm\n",CkMyPe(),msg->n_objs,msg->n_comm));
390   if(CkMyPe() == cur_ld_balancer) {
391     msg->avail_vector = new char[CkNumPes()];
392     LBDatabaseObj()->get_avail_vector(msg->avail_vector);
393     msg->next_lb = LBDatabaseObj()->new_lbbalancer();
394   }
396   CmiAssert(statsMsg == NULL);
397   statsMsg = msg;
398 #endif
402 // called on every processor
403 void CentralLB::SendStats()
405 #if CMK_LBDB_ON
406   CmiAssert(statsMsg != NULL);
407   reduction_started = false;
409 #if USE_LDB_SPANNING_TREE
410   if(CkNumPes()>1024)
411   {
412     if (CkMyPe() == cur_ld_balancer)
413       thisProxy[CkMyPe()].ReceiveStats(statsMsg);
414     else
415       thisProxy[CkMyPe()].ReceiveStatsViaTree(statsMsg);
416   }
417   else
418 #endif
419   {
420     DEBUGF(("[%d] calling ReceiveStats on step %d \n",CmiMyPe(),step()));
421     thisProxy[cur_ld_balancer].ReceiveStats(statsMsg);
422   }
424   statsMsg = NULL;
426 #ifdef __BIGSIM__
427   BgEndStreaming();
428 #endif
430   {
431   // enfore the barrier to wait until centralLB says no
432   LDOMHandle h;
433   h.id.id.idx = 0;
434   theLbdb->getLBDB()->RegisteringObjects(h);
435   }
436 #endif
439 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
440 extern int donotCountMigration;
441 #endif
443 void CentralLB::Migrated(LDObjHandle h, int waitBarrier)
445 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
446     if(donotCountMigration){
447         return ;
448     }
449 #endif
451 #if CMK_LBDB_ON
452   if (waitBarrier) {
453             migrates_completed++;
454       DEBUGF(("[%d] An object migrated! %d %d\n",CkMyPe(),migrates_completed,migrates_expected));
455     if (migrates_completed == migrates_expected) {
456       MigrationDone(1);
457     }
458   }
459   else {
460     future_migrates_completed ++;
461     DEBUGF(("[%d] An object migrated with no barrier! %d expected: %d\n",CkMyPe(),future_migrates_completed,future_migrates_expected));
462     if (future_migrates_completed == future_migrates_expected)  {
463         CheckMigrationComplete();
464     }
465   }
466 #endif
469 void CentralLB::MissMigrate(int waitForBarrier)
471   LDObjHandle h;
472   Migrated(h, waitForBarrier);
475 // build a complete data from bufferred messages
476 // not used when USE_REDUCTION = 1
477 void CentralLB::buildStats()
479     statsData->nprocs() = stats_msg_count;
480     // allocate space
481     statsData->objData.resize(statsData->n_objs);
482     statsData->from_proc.resize(statsData->n_objs);
483     statsData->to_proc.resize(statsData->n_objs);
484     statsData->commData.resize(statsData->n_comm);
486     int nobj = 0;
487     int ncom = 0;
488     int nmigobj = 0;
489     // copy all data in individule message to this big structure
490     for (int pe=0; pe<CkNumPes(); pe++) {
491        int i;
492        CLBStatsMsg *msg = statsMsgsList[pe];
493        if(msg == NULL) continue;
494        for (i=0; i<msg->n_objs; i++) {
495          statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
496          statsData->objData[nobj] = msg->objData[i];
497          if (msg->objData[i].migratable) nmigobj++;
498          nobj++;
499        }
500        for (i=0; i<msg->n_comm; i++) {
501          statsData->commData[ncom] = msg->commData[i];
502          ncom++;
503        }
504        // free the memory
505        delete msg;
506        statsMsgsList[pe]=0;
507     }
508     statsData->n_migrateobjs = nmigobj;
511 // deposit one processor data at a time, note database is pre-allocated
512 // to have enough space
513 // used when USE_REDUCTION = 1
514 void CentralLB::depositData(CLBStatsMsg *m)
516   int i;
517   if (m == NULL) return;
519   const int pe = m->from_pe;
520   struct ProcStats &procStat = statsData->procs[pe];
521 #if defined(TEMP_LDB)
522         procStat.pe_temp=m->pe_temp;
523         procStat.pe_speed=m->pe_speed;
524 #endif
526   procStat.pe = pe;
527   procStat.total_walltime = m->total_walltime;
528   procStat.idletime = m->idletime;
529   procStat.bg_walltime = m->bg_walltime;
530 #if CMK_LB_CPUTIMER
531   procStat.total_cputime = m->total_cputime;
532   procStat.bg_cputime = m->bg_cputime;
533 #endif
534   procStat.pe_speed = m->pe_speed;
536   //procStat.utilization = 1.0;
537   procStat.available = true;
538   procStat.n_objs = m->n_objs;
540   int &nobj = statsData->n_objs;
541   int &nmigobj = statsData->n_migrateobjs;
542   for (i=0; i<m->n_objs; i++) {
543       statsData->from_proc[nobj] = statsData->to_proc[nobj] = pe;
544       statsData->objData[nobj] = m->objData[i];
545       if (m->objData[i].migratable) nmigobj++;
546       nobj++;
547       CmiAssert(nobj <= statsData->objData.capacity());
548   }
549   int &n_comm = statsData->n_comm;
550   for (i=0; i<m->n_comm; i++) {
551       statsData->commData[n_comm] = m->commData[i];
552       n_comm++;
553       CmiAssert(n_comm <= statsData->commData.capacity());
554   }
555   delete m;
558 void CentralLB::ReceiveStatsFromRoot(CkMarshalledCLBStatsMessage &msg) {
559 #if CMK_LBDB_ON
560   if (CkMyPe() == cur_ld_balancer) return;
561   else ReceiveStats(msg);
562 #endif
565 void CentralLB::ReceiveStats(CkMarshalledCLBStatsMessage &msg)
567 #if CMK_LBDB_ON
568   if (concurrent && (CkMyPe() == cur_ld_balancer)) {
569     thisProxy.ReceiveStatsFromRoot(msg);  // broadcast stats to all other PEs
570   }
572   if (statsMsgsList == NULL) {
573     statsMsgsList = new CLBStatsMsg*[CkNumPes()];
574     CmiAssert(statsMsgsList != NULL);
575     for(int i=0; i < CkNumPes(); i++)
576       statsMsgsList[i] = 0;
577   }
578   if (statsData == NULL) statsData = new LDStats;
580     //  loop through all CLBStatsMsg in the incoming msg
581   int count = msg.getCount();
582   for (int num = 0; num < count; num++) 
583   {
584     CLBStatsMsg *m = msg.getMessage(num);
585     CmiAssert(m!=NULL);
586     const int pe = m->from_pe;
587     DEBUGF(("Stats msg received, %d %d %d %p step %d\n", pe,stats_msg_count,m->n_objs,m,step()));
588 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))     
589 /*      
590  *  if(m->step < step()){
591  *    //TODO: if a processor is redoing an old load balance step..
592  *    //tell it that the step is done and that it should not perform any migrations
593  *      thisProxy[pe].ReceiveDummyMigration();
594  *  }*/
595 #endif
596         
597     if(!CmiNodeAlive(pe)){
598         DEBUGF(("[%d] ReceiveStats called from invalidProcessor %d\n",CkMyPe(),pe));
599         continue;
600     }
601         
602     if (m->avail_vector!=NULL) {
603       LBDatabaseObj()->set_avail_vector(m->avail_vector,  m->next_lb);
604     }
606     if (statsMsgsList[pe] != 0) {
607       CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
608              pe);
609     } else {
610       statsMsgsList[pe] = m;
611 #if USE_REDUCTION
612       depositData(m);
613 #else
614       // store per processor data right away
615       struct ProcStats &procStat = statsData->procs[pe];
616       procStat.pe = pe;
617       procStat.total_walltime = m->total_walltime;
618       procStat.idletime = m->idletime;
619       procStat.bg_walltime = m->bg_walltime;
620 #if CMK_LB_CPUTIMER
621       procStat.total_cputime = m->total_cputime;
622       procStat.bg_cputime = m->bg_cputime;
623 #endif
624       procStat.pe_speed = m->pe_speed;
625       //procStat.utilization = 1.0;
626       procStat.available = true;
627       procStat.n_objs = m->n_objs;
629       statsData->n_objs += m->n_objs;
630       statsData->n_comm += m->n_comm;
631 #if defined(TEMP_LDB)
632                         procStat.pe_temp=m->pe_temp;
633                         procStat.pe_speed=m->pe_speed;
634 #endif
635 #endif
637       stats_msg_count++;
638     }
639   }    // end of for
641   const int clients = CkNumValidPes();
642   DEBUGF(("THIS POINT count = %d, clients = %d\n",stats_msg_count,clients));
644   if (stats_msg_count == clients) {
645         DEBUGF(("[%d] All stats messages received \n",CmiMyPe()));
646     statsData->nprocs() = stats_msg_count;
647     if (use_thread)
648         thisProxy[CkMyPe()].t_LoadBalance();
649     else
650         thisProxy[CkMyPe()].LoadBalance();
651   }
652 #endif
655 /** added by Abhinav for receiving msgs via spanning tree */
656 void CentralLB::ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &msg)
658 #if CMK_LBDB_ON
659         CmiAssert(CkMyPe() != 0);
660         bufMsg.add(msg);         // buffer messages
661         count_msgs++;
662         //CkPrintf("here %d\n", CkMyPe());
663         if (count_msgs == st.numChildren+1) {
664                 if(st.parent == 0)
665                 {
666                         thisProxy[0].ReceiveStats(bufMsg);
667                         //CkPrintf("from %d\n", CkMyPe());
668                 }
669                 else
670                         thisProxy[st.parent].ReceiveStatsViaTree(bufMsg);
671                 count_msgs = 0;
672                 bufMsg.free();
673         } 
674 #endif
677 #if CMK_REPLAYSYSTEM
678 static LDHandle *loadBalancer_pointers;
679 #endif
681 void CentralLB::LoadBalance()
683 #if CMK_LBDB_ON
684   int proc;
685   const int clients = CkNumPes();
687 #if ! USE_REDUCTION
688   // build data
689   buildStats();
690 #else
691   for (proc = 0; proc < clients; proc++) statsMsgsList[proc] = NULL;
692 #endif
694   theLbdb->ResetAdaptive();
695   if (!_lb_args.samePeSpeed()) statsData->normalize_speed();
697   if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer))
698       CmiPrintf("\nCharmLB> %s: PE [%d] step %d starting at %f Memory: %f MB\n",
699                   lbname, cur_ld_balancer, step(), start_lb_time,
700                   CmiMemoryUsage()/(1024.0*1024.0));
702   // if we are in simulation mode read data
703   if (LBSimulation::doSimulation) simulationRead();
705   char *availVector = LBDatabaseObj()->availVector();
706   for(proc = 0; proc < clients; proc++)
707       statsData->procs[proc].available = (bool)availVector[proc];
710   removeCommDataOfDeletedObjs(statsData);
711   preprocess(statsData);
713 //    CkPrintf("Before Calling Strategy\n");
715   if (_lb_args.printSummary()) {
716       LBInfo info(clients);
717         // not take comm data
718       info.getInfo(statsData, clients, 0);
719       LBRealType mLoad, mCpuLoad, totalLoad;
720       info.getSummary(mLoad, mCpuLoad, totalLoad);
721       int nmsgs, nbytes;
722       statsData->computeNonlocalComm(nmsgs, nbytes);
723       CkPrintf("[%d] Load Summary (before LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024);
724 //      if (_lb_args.debug() > 1) {
725 //        for (int i=0; i<statsData->n_objs; i++)
726 //          CmiPrintf("[%d] %.10f %.10f\n", i, statsData->objData[i].minWall, statsData->objData[i].maxWall);
727 //      }
728   }
730 #if CMK_REPLAYSYSTEM
731   if (_replaySystem && !concurrent) {
732     loadBalancer_pointers = (LDHandle*)malloc(CkNumPes()*sizeof(LDHandle));
733     for (int i=0; i<statsData->n_objs; ++i) loadBalancer_pointers[statsData->from_proc[i]] = statsData->objData[i].handle.omhandle.ldb;
734   }
735 #endif
736   
737   storedMigrateMsg = Strategy(statsData);
739   if (!concurrent) ApplyDecision(); // immediately apply the migration decision
740 #endif
743 void CentralLB::ApplyDecision() {
744 #if CMK_LBDB_ON
745   const int clients = CkNumPes();
747   LBMigrateMsg *migrateMsg;
748   if (concurrent) {
749     migrateMsg = createMigrateMsg(statsData);
750     if (_lb_args.debug()) printStrategyStats(migrateMsg);
751   } else {
752     migrateMsg = storedMigrateMsg;
753     storedMigrateMsg = NULL;
754   }
756 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
757         migrateMsg->step = step();
758 #endif
760 #if CMK_REPLAYSYSTEM
761   CpdHandleLBMessage(&migrateMsg);
762   if (_replaySystem && !concurrent) {
763     for (int i=0; i<migrateMsg->n_moves; ++i) migrateMsg->moves[i].obj.omhandle.ldb = loadBalancer_pointers[migrateMsg->moves[i].from_pe];
764     free(loadBalancer_pointers);
765   }
766 #endif
767   
768   LBDatabaseObj()->get_avail_vector(migrateMsg->avail_vector);
769   migrateMsg->next_lb = LBDatabaseObj()->new_lbbalancer();
771   // if this is the step at which we need to dump the database
772   simulationWrite();
774 //  calculate predicted load
775 //  very time consuming though, so only happen when debugging is on
776   if (_lb_args.printSummary()) {
777       LBInfo info(clients);
778         // not take comm data
779       getPredictedLoadWithMsg(statsData, clients, migrateMsg, info, 0);
780       LBRealType mLoad, mCpuLoad, totalLoad;
781       info.getSummary(mLoad, mCpuLoad, totalLoad);
782       int nmsgs, nbytes;
783       statsData->computeNonlocalComm(nmsgs, nbytes);
784       CkPrintf("[%d] Load Summary (after LB): max (with bg load): %f max (obj only): %f average: %f at step %d nonlocal: %d msgs %.2fKB useMem: %.2fKB.\n", CkMyPe(), mLoad, mCpuLoad, totalLoad/clients, step(), nmsgs, 1.0*nbytes/1024, (1.0*useMem())/1024);
785       for (int i=0; i<clients; i++)
786         migrateMsg->expectedLoad[i] = info.peLoads[i];
787   }
789   DEBUGF(("[%d]calling recv migration\n",CkMyPe()));
790 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) 
791     lbDecisionCount++;
792     migrateMsg->lbDecisionCount = lbDecisionCount;
793 #endif
795   envelope *env = UsrToEnv(migrateMsg);
796 #if CMK_SCATTER_LB_RESULTS
797   InitiateScatter(migrateMsg);
798 #else
799   if (1) {
800       // broadcast
801     thisProxy.ReceiveMigration(migrateMsg);
802   }
803   else {
804     // split the migration for each processor
805     for (int p=0; p<CkNumPes(); p++) {
806       LBMigrateMsg *m = extractMigrateMsg(migrateMsg, p);
807       thisProxy[p].ReceiveMigration(m);
808     }
809     delete migrateMsg;
810   }
811 #endif
812   // Zero out data structures for next cycle
813   // CkPrintf("zeroing out data\n");
814   statsData->clear();
815   stats_msg_count=0;
816 #endif
819 void CentralLB::t_LoadBalance()
821     LoadBalance();
824 void CentralLB::InitiateScatter(LBMigrateMsg *msg) {
826   if (CkNumPes() <= broadcastThreshold) {
827     thisProxy.ReceiveMigration(msg);
828     return;
829   }
831   int middlePe = CkNumPes() / 2;
833   // allocate maximum possible size to avoid later copies
834   // the messages will be resized before sending
835   LBScatterMsg *leftMsg = new (middlePe, msg->n_moves)
836     LBScatterMsg(0, middlePe - 1);
837   LBScatterMsg *rightMsg = new (CkNumPes() - middlePe, msg->n_moves)
838     LBScatterMsg(middlePe, CkNumPes() - 1);
840   int *migrateTally = new int[CkNumPes()];
841   memset(migrateTally, 0, CkNumPes() * sizeof(int));
843   for (int i = 0; i < msg->n_moves; i++) {
844     MigrateInfo* item = (MigrateInfo*) &msg->moves[i];
845     migrateTally[item->to_pe]++;
846     if (item->from_pe < middlePe) {
847       leftMsg->moves[leftMsg->numMigrates++] = *item;
848     }
849     else {
850       rightMsg->moves[rightMsg->numMigrates++] = *item;
851     }
852   }
854   memcpy(leftMsg->numMigratesPerPe, migrateTally, middlePe * sizeof(int));
855   memcpy(rightMsg->numMigratesPerPe, &migrateTally[middlePe], (CkNumPes() - middlePe) * sizeof(int));
857   delete [] migrateTally;
859   // shrink the size of the messages
860   envelope *env = UsrToEnv(rightMsg);
861   env->shrinkUsersize((msg->n_moves - rightMsg->numMigrates) * sizeof(MigrateDecision));
863   // left message is not getting sent yet, but better resize it now
864   // before we lose track of its original size
865   env = UsrToEnv(leftMsg);
866   env->shrinkUsersize((msg->n_moves - leftMsg->numMigrates) * sizeof(MigrateDecision));
868   // send out results for right half of PEs first
869   // to overlap communication with computation
870   thisProxy[middlePe].ScatterMigrationResults(rightMsg);
872   delete msg;
873   ScatterMigrationResults(leftMsg);
876 void CentralLB::ScatterMigrationResults(LBScatterMsg *msg) {
878   int finished = false;
879   do {
880     CkAssert(msg->firstPeInSpan == CkMyPe());
881     int numPesInSpan = msg->lastPeInSpan - msg->firstPeInSpan + 1 ;
883     if (numPesInSpan <= broadcastThreshold) {
884       for (int i = msg->firstPeInSpan; i < msg->lastPeInSpan; i++) {
885         // TODO: multicast without allocating new message each time
886         LBScatterMsg *msgCopy = new (numPesInSpan, msg->numMigrates)
887           LBScatterMsg(msg->firstPeInSpan, msg->lastPeInSpan);
888         msgCopy->numMigrates = msg->numMigrates;
889         memcpy(msgCopy->numMigratesPerPe, msg->numMigratesPerPe,
890                numPesInSpan * sizeof(int));
891         memcpy(msgCopy->moves, msg->moves,
892                msg->numMigrates * sizeof(MigrateDecision));
893         thisProxy[i].ReceiveMigration(msgCopy);
894       }
895       // use original message for last send
896       thisProxy[msg->lastPeInSpan].ReceiveMigration(msg);
897       finished = true;
898     }
899     else {
900       int middlePe = (msg->firstPeInSpan + msg->lastPeInSpan + 1) / 2;
901       // reuse received message, taking care not to overwrite needed data
902       LBScatterMsg *leftMsg = msg;
903       int numMigrates = leftMsg->numMigrates;
904       int numPesInRightSpan = leftMsg->lastPeInSpan - middlePe + 1;
905       LBScatterMsg *rightMsg =
906         new (numPesInRightSpan, leftMsg->numMigrates)
907         LBScatterMsg(middlePe, leftMsg->lastPeInSpan);
908       leftMsg->numMigrates = 0;
909       leftMsg->lastPeInSpan = middlePe - 1;
910       for (int i = 0; i < numMigrates; i++) {
911         if (leftMsg->moves[i].fromPe < middlePe) {
912           leftMsg->moves[leftMsg->numMigrates++] = leftMsg->moves[i];
913         }
914         else {
915           rightMsg->moves[rightMsg->numMigrates++] = leftMsg->moves[i];
916         }
917       }
919       memcpy(rightMsg->numMigratesPerPe,
920              &leftMsg->numMigratesPerPe[middlePe - leftMsg->firstPeInSpan],
921              (numPesInRightSpan) * sizeof(int));
923       // shrink the size of the messages
924       envelope *env = UsrToEnv(rightMsg);
925       env->shrinkUsersize((numMigrates - rightMsg->numMigrates)
926                           * sizeof(MigrateDecision));
928       // left message is not getting sent yet, but better resize it now
929       // before we lose track of its original size
930       env = UsrToEnv(leftMsg);
931       env->shrinkUsersize((numMigrates - leftMsg->numMigrates)
932                           * sizeof(MigrateDecision));
934       thisProxy[middlePe].ScatterMigrationResults(rightMsg);
935     }
937   } while (!finished);
941 // test if sender and receiver in a commData is nonmigratable.
942 static bool isMigratable(LDObjData **objData, int *len, int count, const LDCommData &commData)
944 #if CMK_LBDB_ON
945   for (int pe=0 ; pe<count; pe++)
946   {
947     for (int i=0; i<len[pe]; i++)
948       if (LDObjIDEqual(objData[pe][i].objID(), commData.sender.objID()) ||
949           LDObjIDEqual(objData[pe][i].objID(), commData.receiver.get_destObj().objID())) 
950       return false;
951   }
952 #endif
953   return true;
956 // rebuild LDStats and remove all non-migratble objects and related things
957 void CentralLB::removeNonMigratable(LDStats* stats, int count)
959   int i;
961   // check if we have non-migratable objects
962   int have = 0;
963   for (i=0; i<stats->n_objs; i++) 
964   {
965     LDObjData &odata = stats->objData[i];
966     if (!odata.migratable) {
967       have = 1; break;
968     }
969   }
970   if (have == 0) return;
972   CkVec<LDObjData> nonmig;
973   CkVec<int> new_from_proc, new_to_proc;
974   nonmig.resize(stats->n_migrateobjs);
975   new_from_proc.resize(stats->n_migrateobjs);
976   new_to_proc.resize(stats->n_migrateobjs);
977   int n_objs = 0;
978   for (i=0; i<stats->n_objs; i++) 
979   {
980     LDObjData &odata = stats->objData[i];
981     if (odata.migratable) {
982       nonmig[n_objs] = odata;
983       new_from_proc[n_objs] = stats->from_proc[i];
984       new_to_proc[n_objs] = stats->to_proc[i];
985       n_objs ++;
986     }
987     else {
988       stats->procs[stats->from_proc[i]].bg_walltime += odata.wallTime;
989 #if CMK_LB_CPUTIMER
990       stats->procs[stats->from_proc[i]].bg_cputime += odata.cpuTime;
991 #endif
992     }
993   }
994   CmiAssert(stats->n_migrateobjs == n_objs);
996   stats->makeCommHash();
997   
998   CkVec<LDCommData> newCommData;
999   newCommData.resize(stats->n_comm);
1000   int n_comm = 0;
1001   for (i=0; i<stats->n_comm; i++) 
1002   {
1003     LDCommData& cdata = stats->commData[i];
1004     if (!cdata.from_proc()) 
1005     {
1006       int idx = stats->getSendHash(cdata);
1007       CmiAssert(idx != -1);
1008       if (!stats->objData[idx].migratable) continue;
1009     }
1010     switch (cdata.receiver.get_type()) {
1011     case LD_PROC_MSG:
1012       break;
1013     case LD_OBJ_MSG:  {
1014       int idx = stats->getRecvHash(cdata);
1015       if (stats->complete_flag)
1016         CmiAssert(idx != -1);
1017       else if (idx == -1) continue;          // receiver not in this group
1018       if (!stats->objData[idx].migratable) continue;
1019       break;
1020       }
1021     case LD_OBJLIST_MSG:    // object message FIXME add multicast
1022       break;
1023     }
1024     newCommData[n_comm] = cdata;
1025     n_comm ++;
1026   }
1028   if (n_objs != stats->n_objs) CmiPrintf("Removed %d nonmigratable %d comms - n_objs:%d migratable:%d\n", stats->n_objs-n_objs, stats->n_objs, stats->n_migrateobjs, stats->n_comm-n_comm);
1030   // swap to new data
1031   stats->objData = nonmig;
1032   stats->from_proc = new_from_proc;
1033   stats->to_proc = new_to_proc;
1034   stats->n_objs = n_objs;
1036   stats->commData = newCommData;
1037   stats->n_comm = n_comm;
1039   stats->deleteCommHash();
1040   stats->makeCommHash();
1045 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1046 extern int restarted;
1047 #endif
1049 void CentralLB::ReceiveMigration(LBScatterMsg *m) {
1050   if (concurrent) {
1051     if (CkMyPe() == 0) theLbdb->SetStrategyCost(CkWallTimer() - strat_start_time);
1052     // Zero out data structures for next cycle
1053     statsData->clear();
1054     stats_msg_count=0;
1055   }
1056   storedMigrateMsg = NULL;
1057   storedScatterMsg = m;
1058 #if CMK_MEM_CHECKPOINT
1059   CkResetInLdb();
1060 #endif
1061 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1062         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
1063 #else
1064   contribute(CkCallback(CkReductionTarget(CentralLB, ProcessMigrationDecision),
1065               thisProxy));
1066 #endif
1070 void CentralLB::ReceiveMigration(LBMigrateMsg *m)
1072   if (concurrent) {
1073     if (CkMyPe() == 0) theLbdb->SetStrategyCost(CkWallTimer() - strat_start_time);
1074     // Zero out data structures for next cycle
1075     statsData->clear();
1076     stats_msg_count=0;
1077   }
1078   storedMigrateMsg = m;
1079 #if CMK_MEM_CHECKPOINT
1080   CkResetInLdb();
1081 #endif
1082 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1083         restoreParallelRecovery(&resumeAfterRestoreParallelRecovery,(void *)this);
1084 #else
1085   contribute(CkCallback(CkReductionTarget(CentralLB, ProcessReceiveMigration),
1086               thisProxy));
1087 #endif
1090 void CentralLB::ProcessMigrationDecision() {
1091 #if CMK_LBDB_ON
1092   LBScatterMsg *m = storedScatterMsg;
1093   CkAssert(m != NULL);
1095   migrates_expected = m->numMigratesPerPe[CkMyPe() - m->firstPeInSpan];
1096   future_migrates_expected = 0;
1098   for(int i = 0; i < m->numMigrates; i++) {
1099     MigrateDecision& move = m->moves[i];
1100     const int me = CkMyPe();
1101     if (move.fromPe == me) {
1102       DEBUGF(("[%d] migrating object to %d\n", move.fromPe, move.toPe));
1103       // migrate object, in case it is already gone, inform toPe
1104       LDObjHandle objInfo = theLbdb->GetObjHandle(move.dbIndex);
1106       if (theLbdb->Migrate(objInfo,move.toPe) == 0) {
1107         CkAbort("Error: Async arrival not supported in scattering mode\n");
1108       }
1109     }
1110   }
1112   if (migrates_expected == 0 || migrates_completed == migrates_expected) {
1113     MigrationDone(1);
1114   }
1116   delete m;
1117 #endif
1120 void CentralLB::ProcessReceiveMigration()
1122 #if CMK_LBDB_ON
1123         int i;
1124         LBMigrateMsg *m = storedMigrateMsg;
1125         CmiAssert(m!=NULL);
1127 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1128         int *dummyCounts;
1130         DEBUGF(("[%d] Starting ReceiveMigration WITH step %d m->step %d\n",CkMyPe(),step(),m->step));
1131         // CmiPrintf("[%d] Starting ReceiveMigration step %d m->step %d\n",CkMyPe(),step(),m->step);
1132         if(step() > m->step){
1133                 char str[100];
1134                 envelope *env = UsrToEnv(m);
1135                 return;
1136         }
1137         lbDecisionCount = m->lbDecisionCount;
1138 #endif
1140   if (_lb_args.debug() > 1) 
1141     if (CkMyPe()%1024==0) CmiPrintf("[%d] Starting ReceiveMigration step %d at %f\n",CkMyPe(),step(), CmiWallTimer());
1143   for (i=0; i<CkNumPes(); i++) theLbdb->lastLBInfo.expectedLoad[i] = m->expectedLoad[i];
1144   CmiAssert(migrates_expected <= 0 || migrates_completed == migrates_expected);
1145 /*FAULT_EVAC*/
1146   if(!CmiNodeAlive(CkMyPe())){
1147         delete m;
1148         return;
1149   }
1150   migrates_expected = 0;
1151   future_migrates_expected = 0;
1152 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1153         int sending=0;
1154     int dummy=0;
1155         LBDB *_myLBDB = theLbdb->getLBDB();
1156         if(_restartFlag){
1157         dummyCounts = new int[CmiNumPes()];
1158         memset(dummyCounts,0,sizeof(int)*CmiNumPes());
1159     }
1160 #endif
1161   for(i=0; i < m->n_moves; i++) {
1162     MigrateInfo& move = m->moves[i];
1163     const int me = CkMyPe();
1164     if (move.from_pe == me && move.to_pe != me) {
1165       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
1166       // migrate object, in case it is already gone, inform toPe
1167 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1168       if (theLbdb->Migrate(move.obj,move.to_pe) == 0) 
1169          thisProxy[move.to_pe].MissMigrate(!move.async_arrival);
1170 #else
1171             if(_restartFlag){
1172                 DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1173                 theLbdb->Migrate(move.obj,move.to_pe);
1174                 sending++;
1175             }else{
1176                 if(_myLBDB->validObjHandle(move.obj)){
1177                     DEBUG(CmiPrintf("[%d] need to move object from %d to %d \n",CkMyPe(),move.from_pe,move.to_pe));
1178                     theLbdb->Migrate(move.obj,move.to_pe);
1179                     sending++;
1180                 }else{
1181                     DEBUG(CmiPrintf("[%d] dummy move to pe %d detected after restart \n",CmiMyPe(),move.to_pe));
1182                     dummyCounts[move.to_pe]++;
1183                     dummy++;
1184                 }
1185             }
1186 #endif
1187     } else if (move.from_pe != me && move.to_pe == me) {
1188        DEBUGF(("[%d] expecting object from %d\n",move.to_pe,move.from_pe));
1189       if (!move.async_arrival) migrates_expected++;
1190       else future_migrates_expected++;
1191     }
1192     else {
1193 #if CMK_GLOBAL_LOCATION_UPDATE      
1194       UpdateLocation(move); 
1195 #endif
1196     }
1198   }
1199   DEBUGF(("[%d] in ReceiveMigration %d moves expected: %d future expected: %d\n",CkMyPe(),m->n_moves, migrates_expected, future_migrates_expected));
1200   // if (_lb_debug) CkPrintf("[%d] expecting %d objects migrating.\n", CkMyPe(), migrates_expected);
1202 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1203         if(_restartFlag){
1204                 sendDummyMigrationCounts(dummyCounts);
1205                 _restartFlag  =false;
1206         delete []dummyCounts;
1207         }
1208 #endif
1211 #if 0
1212   if (m->n_moves ==0) {
1213     theLbdb->SetLBPeriod(theLbdb->GetLBPeriod()*2);
1214   }
1215 #endif
1216   cur_ld_balancer = m->next_lb;
1217   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
1218       LBDatabaseObj()->set_avail_vector(m->avail_vector, -2);
1219   }
1221   if (migrates_expected == 0 || migrates_completed == migrates_expected)
1222     MigrationDone(1);
1223   delete m;
1225 //      CkEvacuatedElement();
1226 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1227 //  migrates_expected = 0;
1228 //  //  ResumeClients(1);
1229 #endif
1230 #endif
1233 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1234 void CentralLB::ReceiveDummyMigration(int globalDecisionCount){
1235     DEBUGF(("[%d] ReceiveDummyMigration called for step %d with globalDecisionCount %d\n",CkMyPe(),step(),globalDecisionCount));
1236     //TODO: this is gonna be important when a crash happens during checkpoint
1237     //the globalDecisionCount would have to be saved and compared against
1238     //a future recvMigration
1239                 
1240         thisProxy[CkMyPe()].ResumeClients(1);
1242 #endif
1244 // We assume that bit vector would have been aptly set async by either scheduler or charmrun.
1245 void CentralLB::CheckForRealloc(){
1246 #if CMK_SHRINK_EXPAND
1247    if(pending_realloc_state == REALLOC_MSG_RECEIVED) {
1248         pending_realloc_state = REALLOC_IN_PROGRESS; //in progress
1249         CkPrintf("Load balancer invoking charmrun to handle reallocation on pe %d\n", CkMyPe());
1250         double end_lb_time = CkWallTimer();
1251         CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1252             lbname, cur_ld_balancer, step()-1, end_lb_time,     end_lb_time-start_lb_time);
1253         // do checkpoint
1254         CkCallback cb(CkIndex_CentralLB::ResumeFromReallocCheckpoint(), thisProxy[0]);
1255         CkStartCheckpoint(_shrinkexpand_basedir, cb);
1256     }
1257     else{
1258         thisProxy.MigrationDoneImpl(1);
1259     }
1260 #endif
1263 void CentralLB::ResumeFromReallocCheckpoint(){
1264 #if CMK_SHRINK_EXPAND
1265     std::vector<char> avail(se_avail_vector, se_avail_vector + CkNumPes());
1266     free(se_avail_vector);
1267     thisProxy.WillIbekilled(avail, numProcessAfterRestart);
1268 #endif
1273 #if CMK_SHRINK_EXPAND
1274 int GetNewPeNumber(std::vector<char> avail){
1275   int mype = CkMyPe();
1276   int count =0;
1277   for (int i =0; i <mype; i++){
1278     if(avail[i] ==0) count++;
1279   }
1280   return (mype - count);
1282 #endif
1284 void CentralLB::WillIbekilled(std::vector<char> avail, int newnumProcessAfterRestart){
1285 #if CMK_SHRINK_EXPAND
1286  numProcessAfterRestart = newnumProcessAfterRestart;
1287  mynewpe =  GetNewPeNumber(avail);
1288  willContinue = avail[CkMyPe()];
1289  CkCallback cb(CkIndex_CentralLB::StartCleanup(), thisProxy[0]);
1290  contribute(cb);
1291 #endif
1294 void CentralLB::StartCleanup(){
1295 #if CMK_SHRINK_EXPAND
1296                 CkCleanup();
1297 #endif
1299 void CentralLB::MigrationDone(int balancing)
1301 #if CMK_SHRINK_EXPAND
1302    // barrier to check for reallocation
1303     CkCallback cb(CkIndex_CentralLB::CheckForRealloc(), thisProxy[0]);
1304     contribute(cb);
1305         return;
1306 #else
1307     MigrationDoneImpl(balancing);
1308 #endif
1310 void CentralLB::MigrationDoneImpl (int balancing)
1313 #if CMK_LBDB_ON
1314   migrates_completed = 0;
1315   migrates_expected = -1;
1316   // clear load stats
1317   if (balancing) theLbdb->ClearLoads();
1318   // Increment to next step
1319   theLbdb->incStep();
1320         DEBUGF(("[%d] Incrementing Step %d \n",CkMyPe(),step()));
1321   // if sync resume, invoke a barrier
1323 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1324     savedBalancing = balancing;
1325     startLoadBalancingMlog(&resumeCentralLbAfterChkpt,(void *)this);
1326 #endif
1328   LBDatabase::Object()->MigrationDone();    // call registered callbacks
1330   LoadbalanceDone(balancing);        // callback
1331 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1332   // if sync resume invoke a barrier
1333   if (balancing && _lb_args.syncResume()) {
1334     contribute(CkCallback(CkReductionTarget(CentralLB, ResumeClients),
1335                 thisProxy));
1336   }
1337   else{ 
1338     if(CmiNodeAlive(CkMyPe())){
1339         thisProxy [CkMyPe()].ResumeClients(balancing);
1340     }   
1341   }     
1342 #if CMK_GRID_QUEUE_AVAILABLE
1343   CmiGridQueueDeregisterAll ();
1344   CpvAccess(CkGridObject) = NULL;
1345 #endif  // if CMK_GRID_QUEUE_AVAILABLE
1346 #endif  // if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
1347 #endif  // if CMK_LBDB_ON
1350 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1351 void CentralLB::endMigrationDone(int balancing){
1352     DEBUGF(("[%d] CentralLB::endMigrationDone step %d\n",CkMyPe(),step()));
1355   if (balancing && _lb_args.syncResume()) {
1356     contribute(CkCallback(CkReductionTarget(CentralLB, ResumeClients),
1357                 thisProxy));
1358   }
1359   else{
1360     if(CmiNodeAlive(CkMyPe())){
1361     DEBUGF(("[%d] Sending ResumeClients balancing %d \n",CkMyPe(),balancing));
1362     thisProxy [CkMyPe()].ResumeClients(balancing);
1363     }
1364   }
1367 #endif
1369 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1370 void resumeCentralLbAfterChkpt(void *_lb){
1371     CentralLB *lb= (CentralLB *)_lb;
1372     CpvAccess(_currentObj)=lb;
1373     lb->endMigrationDone(lb->savedBalancing);
1375 void resumeAfterRestoreParallelRecovery(void *_lb){
1376     CentralLB *lb= (CentralLB *)_lb;
1377         lb->ProcessReceiveMigration();
1379 #endif
1382 void CentralLB::ResumeClients()
1384   ResumeClients(1);
1387 void CentralLB::ResumeClients(int balancing)
1389 #if CMK_LBDB_ON
1390 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1391     resumeCount++;
1392     globalResumeCount = resumeCount;
1393 #endif
1394   DEBUGF(("[%d] Resuming clients. balancing:%d.\n",CkMyPe(),balancing));
1396   theLbdb->ResumeClients();
1397   if (balancing)  {
1399     CheckMigrationComplete();
1400     if (future_migrates_expected == 0 || 
1401             future_migrates_expected == future_migrates_completed) {
1402       CheckMigrationComplete();
1403     }
1404   }
1405 #endif
1409   migration of objects contains two different kinds:
1410   (1) objects want to make a barrier for migration completion
1411       (waitForBarrier is true)
1412       migrationDone() to finish and resumeClients
1413   (2) objects don't need a barrier
1414   However, next load balancing can only happen when both migrations complete
1415 */ 
1416 void CentralLB::CheckMigrationComplete()
1418 #if CMK_LBDB_ON
1419   lbdone ++;
1420   if (lbdone == 2) {
1421     double end_lb_time = CkWallTimer();
1422     if (_lb_args.debug() && CkMyPe()==0) {
1423       CkPrintf("CharmLB> %s: PE [%d] step %d finished at %f duration %f s\n\n",
1424                 lbname, CkMyPe(), step()-1, end_lb_time,
1425                 end_lb_time-start_lb_time);
1426     }
1428     theLbdb->SetMigrationCost(end_lb_time - start_lb_time);
1430     lbdone = 0;
1431     future_migrates_expected = -1;
1432     future_migrates_completed = 0;
1435     DEBUGF(("[%d] Migration Complete\n", CkMyPe()));
1436     // release local barrier  so that the next load balancer can go
1437     LDOMHandle h;
1438     h.id.id.idx = 0;
1439     theLbdb->getLBDB()->DoneRegisteringObjects(h);
1440     // switch to the next load balancer in the list
1441     // subtle: called from Migrated() may result in Migrated() called in next LB
1442     theLbdb->nextLoadbalancer(seqno);
1443   }
1444 #endif
1447 // Remove edges from commData in LDStats which contains deleted elements
1448 void CentralLB::removeCommDataOfDeletedObjs(LDStats* stats) {
1449   stats->makeCommHash();
1451   CkVec<LDCommData> newCommData;
1452   newCommData.resize(stats->n_comm);
1453   int n_comm = 0;
1454   for (int i=0; i<stats->n_comm; i++) {
1455     LDCommData& cdata = stats->commData[i];
1456     switch (cdata.receiver.get_type()) {
1457       case LD_PROC_MSG:
1458         break;
1459       case LD_OBJ_MSG:  {
1460         if (!cdata.from_proc()) {
1461           int sidx = stats->getSendHash(cdata);
1462           int ridx = stats->getRecvHash(cdata);
1463           if (sidx == -1 || ridx == -1) continue;
1464         }
1465         break;
1466       }
1467       case LD_OBJLIST_MSG:  {
1468         int sidx = stats->getSendHash(cdata);
1469         if (sidx == -1) continue;
1470         int nobjs;
1471         LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
1472         for (int id=0; id<nobjs; id++) {
1473           int idx = stats->getHash(objs[id]);
1474           if (idx == -1)
1475           {
1476             objs[id] = objs[nobjs-1];
1477             id--;
1478             nobjs--;
1479           }
1480         }
1481         if(nobjs == 0) continue;
1482         cdata.receiver.dest.destObjs.len = nobjs;
1483         break;
1484       }
1485     }
1487     stats->commData[n_comm] = cdata;
1488     n_comm++;
1489   }
1491   stats->commData.resize(n_comm);
1492   stats->n_comm = n_comm;
1495 void CentralLB::preprocess(LDStats* stats)
1497   if (_lb_args.ignoreBgLoad())
1498     stats->clearBgLoad();
1500   // Call the predictor for the future
1501   if (_lb_predict) FuturePredictor(statsData);
1504 void CentralLB::printStrategyStats(LBMigrateMsg *msg) {
1505 #if CMK_LBDB_ON
1506   envelope *env = UsrToEnv(msg);
1508   double strat_end_time = CkWallTimer();
1509   double lbdbMemsize = LBDatabase::Object()->useMem()/1000;
1510   CkPrintf("CharmLB> %s: PE [%d] Memory: LBManager: %d KB CentralLB: %d KB\n",
1511         lbname, CkMyPe(), (int)lbdbMemsize, (int)(useMem()/1000));
1512   CkPrintf("CharmLB> %s: PE [%d] #Objects migrating: %d, LBMigrateMsg size: %.2f MB\n", lbname, CkMyPe(), msg->n_moves, env->getTotalsize()/1024.0/1024.0);
1513   CkPrintf("CharmLB> %s: PE [%d] strategy finished at %f duration %f s\n",
1514       lbname, CkMyPe(), strat_end_time, strat_end_time-strat_start_time);
1515 #endif
1518 // default load balancing strategy
1519 LBMigrateMsg* CentralLB::Strategy(LDStats* stats)
1521 #if CMK_LBDB_ON
1522   strat_start_time = CkWallTimer();
1523   if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer))
1524     CkPrintf("CharmLB> %s: PE [%d] strategy starting at %f\n", lbname, cur_ld_balancer, strat_start_time);
1526   work(stats);
1529   if ((_lb_args.debug()>2) && (CkMyPe() == cur_ld_balancer))  {
1530     CkPrintf("CharmLB> Obj Map:\n");
1531     for (int i=0; i<stats->n_objs; i++) CkPrintf("%d ", stats->to_proc[i]);
1532     CkPrintf("\n");
1533   }
1535   if (concurrent) return NULL;  // migrate msg will only be created on PE with best solution
1537   LBMigrateMsg *msg = createMigrateMsg(stats);
1539         /* Extra feature for MetaBalancer
1540   if (_lb_args.metaLbOn()) {
1541     int clients = CkNumPes();
1542     LBInfo info(clients);
1543     getPredictedLoadWithMsg(stats, clients, msg, info, 0);
1544     LBRealType mLoad, mCpuLoad, totalLoad, totalLoadWComm;
1545     info.getSummary(mLoad, mCpuLoad, totalLoad);
1546     theLbdb->UpdateDataAfterLB(mLoad, mCpuLoad, totalLoad/clients);
1547   }
1548         */
1550   double strat_end_time = CkWallTimer();
1551   theLbdb->SetStrategyCost(strat_end_time - strat_start_time);
1553   if (_lb_args.debug() && (CkMyPe() == cur_ld_balancer)) {
1554     printStrategyStats(msg);
1555   }
1556   return msg;
1557 #else
1558   return NULL;
1559 #endif
1562 void CentralLB::changeFreq(int r)
1564         CkAbort("ERROR: changeFreq in CentralLB should never be called!\n");
1567 void CentralLB::changeFreq(int nFreq)
1569 #ifdef TEMP_LDB
1570         //CkPrintf("PROC#%d in changeFreq numProcs=%d\n",CkMyPe(),nFreq);
1571 //  for(int i=0;i<numProcs;i++)
1572   {
1573 //        if(procFreq[i]!=procFreqNew[i])
1574         {
1575               char newfreq[10];
1576               sprintf(newfreq,"%d",nFreq);
1577               cpufreq_sysfs_write(newfreq,CkMyPe()%physicalCoresPerNode);//i%physicalCoresPerNode);
1578 //            CkPrintf("PROC#%d freq changing from %d to %d temp=%f\n",i,procFreq[i],procFreqNew[i],procTemp[i]);
1579         }
1580   }
1581 #else
1582         CmiAbort("You should never call CentralLB::changeFreq without using the flag TEMP_LDB\n");
1583 #endif
1587 void CentralLB::work(LDStats* stats)
1589   // does nothing but print the database
1590   stats->print();
1593 // generate migrate message from stats->from_proc and to_proc
1594 LBMigrateMsg * CentralLB::createMigrateMsg(LDStats* stats)
1596   int i;
1597   CkVec<MigrateInfo*> migrateInfo;
1598   for (i=0; i<stats->n_objs; i++) {
1599     LDObjData &objData = stats->objData[i];
1600     int frompe = stats->from_proc[i];
1601     int tope = stats->to_proc[i];
1602     if (frompe != tope) {
1603       //      CkPrintf("[%d] Obj %d migrating from %d to %d\n",
1604       //         CkMyPe(),obj,pe,dest);
1605       MigrateInfo *migrateMe = new MigrateInfo;
1606       migrateMe->obj = objData.handle;
1607       migrateMe->from_pe = frompe;
1608       migrateMe->to_pe = tope;
1609       migrateMe->async_arrival = objData.asyncArrival;
1610       migrateInfo.insertAtEnd(migrateMe);
1611     }
1612   }
1614   int migrate_count=migrateInfo.length();
1615   LBMigrateMsg* msg = new(migrate_count,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1616   msg->n_moves = migrate_count;
1617   for(i=0; i < migrate_count; i++) {
1618     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
1619     msg->moves[i] = *item;
1620     delete item;
1621     migrateInfo[i] = 0;
1622   }
1623   return msg;
1626 LBMigrateMsg * CentralLB::extractMigrateMsg(LBMigrateMsg *m, int p)
1628   int nmoves = 0;
1629   int nunavail = 0;
1630   int i;
1631   for (i=0; i<m->n_moves; i++) {
1632     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1633     if (item->from_pe == p || item->to_pe == p) nmoves++;
1634   }
1635   for (i=0; i<CkNumPes();i++) {
1636     if (!m->avail_vector[i]) nunavail++;
1637   }
1638   LBMigrateMsg* msg;
1639   if (nunavail) msg = new(nmoves,CkNumPes(),CkNumPes(),0) LBMigrateMsg;
1640   else msg = new(nmoves,0,0,0) LBMigrateMsg;
1641   msg->n_moves = nmoves;
1642   msg->level = m->level;
1643   msg->next_lb = m->next_lb;
1644   for (i=0,nmoves=0; i<m->n_moves; i++) {
1645     MigrateInfo* item = (MigrateInfo*) &m->moves[i];
1646     if (item->from_pe == p || item->to_pe == p) {
1647       msg->moves[nmoves] = *item;
1648       nmoves++;
1649     }
1650   }
1651   // copy processor data
1652   if (nunavail)
1653   for (i=0; i<CkNumPes();i++) {
1654     msg->avail_vector[i] = m->avail_vector[i];
1655     msg->expectedLoad[i] = m->expectedLoad[i];
1656   }
1657   return msg;
1660 void CentralLB::simulationWrite() {
1661   if(step() == LBSimulation::dumpStep)
1662   {
1663     // here we are supposed to dump the database
1664     int dumpFileSize = strlen(LBSimulation::dumpFile) + 4;
1665     char *dumpFileName = (char *)malloc(dumpFileSize);
1666     while (sprintf(dumpFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::dumpStep) >= dumpFileSize) {
1667       free(dumpFileName);
1668       dumpFileSize+=3;
1669       dumpFileName = (char *)malloc(dumpFileSize);
1670     }
1671     writeStatsMsgs(dumpFileName);
1672     free(dumpFileName);
1673     CmiPrintf("LBDump: Dumped the load balancing data at step %d.\n",LBSimulation::dumpStep);
1674     ++LBSimulation::dumpStep;
1675     --LBSimulation::dumpStepSize;
1676     if (LBSimulation::dumpStepSize <= 0) { // prevent stupid step sizes
1677       CmiPrintf("Charm++> Exiting...\n");
1678       CkExit();
1679     }
1680     return;
1681   }
1684 void CentralLB::simulationRead() {
1685   if (concurrent) CkAbort("Error: LB simulation not supported in concurrent mode");
1686   LBSimulation *simResults = NULL, *realResults;
1687   LBMigrateMsg *voidMessage = new (0,0,0,0) LBMigrateMsg();
1688   voidMessage->n_moves=0;
1689   for ( ;LBSimulation::simStepSize > 0; --LBSimulation::simStepSize, ++LBSimulation::simStep) {
1690     // here we are supposed to read the data from the dump database
1691     int simFileSize = strlen(LBSimulation::dumpFile) + 4;
1692     char *simFileName = (char *)malloc(simFileSize);
1693     while (sprintf(simFileName, "%s.%d", LBSimulation::dumpFile, LBSimulation::simStep) >= simFileSize) {
1694       free(simFileName);
1695       simFileSize+=3;
1696       simFileName = (char *)malloc(simFileSize);
1697     }
1698     readStatsMsgs(simFileName);
1700     // allocate simResults (only the first step)
1701     if (simResults == NULL) {
1702       simResults = new LBSimulation(LBSimulation::simProcs);
1703       realResults = new LBSimulation(LBSimulation::simProcs);
1704     }
1705     else {
1706       // should be the same number of procs of the original simulation!
1707       if (!LBSimulation::procsChanged) {
1708         // it means we have a previous step, so in simResults there is data.
1709         // we can now print the real effects of the load balancer during the simulation
1710         // or print the difference between the predicted data and the real one.
1711         realResults->reset();
1712         // reset to_proc of statsData to be equal to from_proc
1713         for (int k=0; k < statsData->n_objs; ++k) statsData->to_proc[k] = statsData->from_proc[k];
1714         findSimResults(statsData, LBSimulation::simProcs, voidMessage, realResults);
1715         simResults->PrintDifferences(realResults,statsData);
1716       }
1717       simResults->reset();
1718     }
1720     // now pass it to the strategy routine
1721     double startT = CkWallTimer();
1722     preprocess(statsData);
1723     CmiPrintf("%s> Strategy starts ... \n", lbname);
1724     LBMigrateMsg* migrateMsg = Strategy(statsData);
1725     CmiPrintf("%s> Strategy took %fs memory usage: CentralLB: %d KB.\n",
1726                lbname, CkWallTimer()-startT, (int)(useMem()/1000));
1728     // now calculate the results of the load balancing simulation
1729     findSimResults(statsData, LBSimulation::simProcs, migrateMsg, simResults);
1731     // now we have the simulation data, so print it and loop
1732     CmiPrintf("Charm++> LBSim: Simulation of load balancing step %d done.\n",LBSimulation::simStep);
1733     // **CWL** Officially recording my disdain here for using ints for bool
1734     if (LBSimulation::showDecisionsOnly) {
1735       simResults->PrintDecisions(migrateMsg, simFileName, 
1736                                  LBSimulation::simProcs);
1737     } else {
1738       simResults->PrintSimulationResults();
1739     }
1741     free(simFileName);
1742     delete migrateMsg;
1743     CmiPrintf("Charm++> LBSim: Passing to the next step\n");
1744   }
1745   // deallocate simResults
1746   delete simResults;
1747   CmiPrintf("Charm++> Exiting...\n");
1748   CkExit();
1751 void CentralLB::readStatsMsgs(const char* filename) 
1753 #if CMK_LBDB_ON
1754   int i;
1755   FILE *f = fopen(filename, "r");
1756   if (f==NULL) {
1757     CmiPrintf("Fatal Error> Cannot open LB Dump file %s!\n", filename);
1758     CmiAbort("");
1759   }
1761   // at this stage, we need to rebuild the statsMsgList and
1762   // statsDataList structures. For that first deallocate the
1763   // old structures
1764   if (statsMsgsList) {
1765     for(i = 0; i < stats_msg_count; i++)
1766       delete statsMsgsList[i];
1767     delete[] statsMsgsList;
1768     statsMsgsList=0;
1769   }
1771   PUP::fromDisk pd(f);
1772   PUP::machineInfo machInfo;
1774   pd((char *)&machInfo, sizeof(machInfo));      // read machine info
1775   PUP::xlater p(machInfo, pd);
1777   if (_lb_args.lbversion() > 1) {
1778     p|_lb_args.lbversion();             // write version number
1779     CkPrintf("LB> File version detected: %d\n", _lb_args.lbversion());
1780     CmiAssert(_lb_args.lbversion() <= LB_FORMAT_VERSION);
1781   }
1782   p|stats_msg_count;
1784   CmiPrintf("readStatsMsgs for %d pes starts ... \n", stats_msg_count);
1785   if (LBSimulation::simProcs == 0) LBSimulation::simProcs = stats_msg_count;
1786   if (LBSimulation::simProcs != stats_msg_count) LBSimulation::procsChanged = true;
1788   // LBSimulation::simProcs must be set
1789   statsData->pup(p);
1791   CmiPrintf("Simulation for %d pes \n", LBSimulation::simProcs);
1792   CmiPrintf("n_obj: %d n_migratble: %d \n", statsData->n_objs, statsData->n_migrateobjs);
1794   // file f is closed in the destructor of PUP::fromDisk
1795   CmiPrintf("ReadStatsMsg from %s completed\n", filename);
1796 #endif
1799 void CentralLB::writeStatsMsgs(const char* filename) 
1801 #if CMK_LBDB_ON
1802   FILE *f = fopen(filename, "w");
1803   if (f==NULL) {
1804     CmiPrintf("Fatal Error> writeStatsMsgs failed to open the output file %s!\n", filename);
1805     CmiAbort("");
1806   }
1808   const PUP::machineInfo &machInfo = PUP::machineInfo::current();
1809   PUP::toDisk p(f);
1810   p((char *)&machInfo, sizeof(machInfo));       // machine info
1812   p|_lb_args.lbversion();               // write version number
1813   p|stats_msg_count;
1814   statsData->pup(p);
1816   fclose(f);
1818   CmiPrintf("WriteStatsMsgs to %s succeed!\n", filename);
1819 #endif
1822 // calculate the predicted wallclock/cpu load for every processors
1823 // considering communication overhead if considerComm is true
1824 void getPredictedLoadWithMsg(BaseLB::LDStats* stats, int count, 
1825                       LBMigrateMsg *msg, LBInfo &info, 
1826                       int considerComm)
1828 #if CMK_LBDB_ON
1829         stats->makeCommHash();
1831         // update to_proc according to migration msgs
1832         for(int i = 0; i < msg->n_moves; i++) {
1833           MigrateInfo &mInfo = msg->moves[i];
1834           int idx = stats->getHash(mInfo.obj.objID(), mInfo.obj.omID());
1835           CmiAssert(idx != -1);
1836           stats->to_proc[idx] = mInfo.to_pe;
1837         }
1839         info.getInfo(stats, count, considerComm);
1840 #endif
1844 void CentralLB::findSimResults(LDStats* stats, int count, LBMigrateMsg* msg, LBSimulation* simResults)
1846     CkAssert(simResults != NULL && count == simResults->numPes);
1847     // estimate the new loads of the processors. As a first approximation, this is the
1848     // sum of the cpu times of the objects on that processor
1849     double startT = CkWallTimer();
1850     getPredictedLoadWithMsg(stats, count, msg, simResults->lbinfo, 1);
1851     CmiPrintf("getPredictedLoad finished in %fs\n", CkWallTimer()-startT);
1854 void CentralLB::pup(PUP::er &p) { 
1855   if (p.isUnpacking())  {
1856     initLB(CkLBOptions(seqno)); 
1857   }
1858   p|reduction_started;
1859   int has_statsMsg=0;
1860   if (p.isPacking()) has_statsMsg = (statsMsg!=NULL);
1861   p|has_statsMsg;
1862   if (has_statsMsg) {
1863     if (p.isUnpacking())
1864       statsMsg = new CLBStatsMsg;
1865     statsMsg->pup(p);
1866   }
1867 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1868   p | lbDecisionCount;
1869   p | resumeCount;
1870 #endif
1871   p | use_thread;
1874 int CentralLB::useMem() { 
1875   return sizeof(CentralLB) + statsData->useMem() + 
1876          CkNumPes() * sizeof(CLBStatsMsg *);
1881   CLBStatsMsg is not a real message now.
1882   CLBStatsMsg is used for all processors to fill in their local load and comm
1883   statistics and send to processor 0
1886 CLBStatsMsg::CLBStatsMsg(int osz, int csz) {
1887   n_objs = osz;
1888   n_comm = csz;
1889   objData = new LDObjData[osz];
1890   commData = new LDCommData[csz];
1891   avail_vector = NULL;
1894 CLBStatsMsg::~CLBStatsMsg() {
1895   delete [] objData;
1896   delete [] commData;
1897   delete [] avail_vector;
1900 void CLBStatsMsg::pup(PUP::er &p) {
1901   int i;
1902   p|from_pe;
1903   p|pe_speed;
1904   p|total_walltime;
1905   p|idletime;
1906 #if defined(TEMP_LDB)
1907         p|pe_temp;
1908 #endif
1910   p|bg_walltime;
1911 #if CMK_LB_CPUTIMER
1912   p|total_cputime;
1913   p|bg_cputime;
1914 #endif
1915 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1916   p | step;
1917 #endif
1918   p|n_objs;
1919   if (p.isUnpacking()) objData = new LDObjData[n_objs];
1920   for (i=0; i<n_objs; i++) p|objData[i];
1921   p|n_comm;
1922   if (p.isUnpacking()) commData = new LDCommData[n_comm];
1923   for (i=0; i<n_comm; i++) p|commData[i];
1925   int has_avail_vector;
1926   if (!p.isUnpacking()) has_avail_vector = (avail_vector != NULL);
1927   p|has_avail_vector;
1928   if (p.isUnpacking()) {
1929     if (has_avail_vector) avail_vector = new char[CkNumPes()];
1930     else avail_vector = NULL;
1931   }
1932   if (has_avail_vector) p(avail_vector, CkNumPes());
1934   p(next_lb);
1937 // CkMarshalledCLBStatsMessage is used in the marshalled parameter in
1938 // the entry function, it is just used to use to pup.
1939 // I don't use CLBStatsMsg directly as marshalled parameter because
1940 // I want the data pointer stored and not to be freed by the Charm++.
1941 void CkMarshalledCLBStatsMessage::free() { 
1942   int count = msgs.size();
1943   for  (int i=0; i<count; i++) {
1944     delete msgs[i];
1945     msgs[i] = NULL;
1946   }
1947   msgs.free();
1950 void CkMarshalledCLBStatsMessage::add(CkMarshalledCLBStatsMessage &m)
1952   int count = m.getCount();
1953   for (int i=0; i<count; i++) add(m.getMessage(i));
1956 void CkMarshalledCLBStatsMessage::pup(PUP::er &p)
1958   int count = msgs.size();
1959   p|count;
1960   for (int i=0; i<count; i++) {
1961     CLBStatsMsg *msg;
1962     if (p.isUnpacking()) msg = new CLBStatsMsg;
1963     else { 
1964       msg = msgs[i]; CmiAssert(msg!=NULL);
1965     }
1966     msg->pup(p);
1967     if (p.isUnpacking()) add(msg);
1968   }
1971 SpanningTree::SpanningTree()
1973         double sq = sqrt(CkNumPes()*4.0-3.0) - 1; // 1 + arity + arity*arity = CkNumPes()
1974         arity = (int)ceil(sq/2);
1975         calcParent(CkMyPe());
1976         calcNumChildren(CkMyPe());
1979 void SpanningTree::calcParent(int n)
1981         parent=-1;
1982         if(n != 0  && arity > 0)
1983                 parent = (n-1)/arity;
1986 void SpanningTree::calcNumChildren(int n)
1988         numChildren = 0;
1989         if (arity == 0) return;
1990         int fullNode=(CkNumPes()-1-arity)/arity;
1991         if(n <= fullNode)
1992                 numChildren = arity;
1993         if(n == fullNode+1)
1994                 numChildren = CkNumPes()-1-(fullNode+1)*arity;
1995         if(n > fullNode+1)
1996                 numChildren = 0;
1999 #include "CentralLB.def.h"
2001 /*@}*/