1 /**************************************************************************
2 ** Greg Koenig (koenig@uiuc.edu)
5 ** This is GridCommLB.C
7 ** GridCommLB is a load balancer for the Charm++ load balancing framework.
8 ** It is designed to work in a Grid computing environment consisting of
9 ** two or more clusters separated by wide-area communication links.
10 ** Communication between objects within a cluster is assumed to be light
11 ** weight (measured in microseconds) while communication between objects
12 ** on different clusters is assumed to be heavy weight (measured in
15 ** The load balancer examines all communications in a computation and
16 ** attempts to spread the objects that communicate to objects on remote
17 ** clusters evenly across the PEs in the local cluster. No objects are
18 ** ever migrated across cluster boundaries; they are simply distributed
19 ** as evenly as possible across the PEs in the cluster in which they were
20 ** originally placed. The idea is that by spreading objects that
21 ** communicate over the wide-area evenly, a relatively small number of
22 ** WAN objects will be mixed with a relatively large number of LAN
23 ** objects, allowing the message-driven characteristics of Charm++ the
24 ** greatest possibility of overlapping the high-cost WAN communication
25 ** with locally-driven work.
27 ** The load balancer secondarily balances on scaled processor load
28 ** (i.e., a processor that is 2x the speed of another processor in
29 ** the local cluster will get 2x the work) as well as the number of
32 ** This load balancer can severely disrupt the object-to-PE mapping by
33 ** causing large numbers of objects to migrate with each load balancing
34 ** invocation. This may be undesirable in some cases. (For example, if
35 ** the vmi-linux "eager protocol" is used, eager channels may be pinned
36 ** between two PEs, and migrating objects that communicate heavily with
37 ** each other onto other PEs could actually slow the computation if they
38 ** no longer communicate with each other over an eager channel.)
41 #include "GridCommLB.decl.h"
43 #include "GridCommLB.h"
46 CreateLBFunc_Def (GridCommLB, "Grid communication load balancer (evenly distribute objects across each cluster)")
50 /**************************************************************************
// Constructor taking load balancer options, which are forwarded to CentralLB.
// Sets the balancer name, prints a creation message, and loads three tuning
// settings: the mode (selects the PE-selection policy in Find_Minimum_PE),
// the background-load flag (consulted in Initialize_PE_Data), and the load
// tolerance (used by Find_Maximum_Object and Find_Minimum_PE).
//
// Each setting is taken from an environment variable and from the
// CK_LDB_GRIDCOMMLB_* constant of the same name; the constant is presumably
// the fallback when the variable is unset (the else branch is not visible
// in this view -- TODO confirm).
53 GridCommLB::GridCommLB (const CkLBOptions &opt) : CentralLB (opt)
58 lbname = (char *) "GridCommLB";
61 CkPrintf ("[%d] GridCommLB created.\n", CkMyPe());
// The single '=' is deliberate: store the getenv() result, then test it for
// non-null.  atoi() does no validation, so non-numeric text yields 0.
64 if (value = getenv ("CK_LDB_GRIDCOMMLB_MODE")) {
65 CK_LDB_GridCommLB_Mode = atoi (value);
67 CK_LDB_GridCommLB_Mode = CK_LDB_GRIDCOMMLB_MODE;
// Background-load flag: tested for non-zero in Initialize_PE_Data.
70 if (value = getenv ("CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD")) {
71 CK_LDB_GridCommLB_Background_Load = atoi (value);
73 CK_LDB_GridCommLB_Background_Load = CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD;
// Load tolerance is parsed with atof(), i.e. it is a floating-point factor
// that is multiplied by a load value where it is used.
76 if (value = getenv ("CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE")) {
77 CK_LDB_GridCommLB_Load_Tolerance = atof (value);
79 CK_LDB_GridCommLB_Load_Tolerance = CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE;
87 /**************************************************************************
// Constructor taking a CkMigrateMessage, which is forwarded to CentralLB.
// Sets the balancer name and loads the same three settings as the
// options-based constructor, using the same environment variables and
// CK_LDB_GRIDCOMMLB_* constants.
// NOTE(review): the settings logic is duplicated verbatim between the two
// constructors; a shared private helper would keep them from drifting apart.
90 GridCommLB::GridCommLB (CkMigrateMessage *msg) : CentralLB (msg)
95 lbname = (char *) "GridCommLB";
// As in the other constructor, '=' inside the condition is an intentional
// assign-then-test of the getenv() result.
97 if (value = getenv ("CK_LDB_GRIDCOMMLB_MODE")) {
98 CK_LDB_GridCommLB_Mode = atoi (value);
100 CK_LDB_GridCommLB_Mode = CK_LDB_GRIDCOMMLB_MODE;
103 if (value = getenv ("CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD")) {
104 CK_LDB_GridCommLB_Background_Load = atoi (value);
106 CK_LDB_GridCommLB_Background_Load = CK_LDB_GRIDCOMMLB_BACKGROUND_LOAD;
109 if (value = getenv ("CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE")) {
110 CK_LDB_GridCommLB_Load_Tolerance = atof (value);
112 CK_LDB_GridCommLB_Load_Tolerance = CK_LDB_GRIDCOMMLB_LOAD_TOLERANCE;
120 /**************************************************************************
121 ** The Charm++ load balancing framework invokes this method to determine
122 ** whether load balancing can be performed at a specified time.
// @param step  step number; among the statements visible here it is used
//              only in the debug message.
// @return      bool answer to the framework's query.
//              NOTE(review): the return statement is not visible in this
//              view -- confirm the value returned.
124 bool GridCommLB::QueryBalanceNow (int step)
// Only reported at debug levels above 2.
126 if (_lb_args.debug() > 2) {
127 CkPrintf ("[%d] GridCommLB is balancing on step %d.\n", CkMyPe(), step);
135 /**************************************************************************
136 ** The vmi-linux machine layer incorporates the idea that PEs are located
137 ** within identifiable clusters. This information can be supplied by the
138 ** user or can be probed automatically by the machine layer. The exposed
139 ** API call CmiGetCluster() returns the integer cluster number for a
140 ** specified PE or -1 if the information is unknown.
142 ** For machine layers other than vmi-linux, simply return the constant 0.
143 ** GridCommLB will assume a single-cluster computation and will balance
144 ** on the scaled processor load and number of LAN messages.
// @param pe  PE number whose cluster is requested.
// @return    integer cluster number for pe.  Callers compare the results for
//            equality to tell LAN from WAN traffic, and
//            Compute_Number_Of_Clusters() checks for negative values.
//            NOTE(review): the body is not visible in this view.
146 int GridCommLB::Get_Cluster (int pe)
153 /**************************************************************************
154 ** Instantiate and initialize the PE_Data[] data structure.
156 ** While doing this...
157 ** - ensure that there is at least one available PE
158 ** - ensure that all PEs are mapped to a cluster
159 ** - determine the maximum cluster number (gives the number of clusters)
160 ** - determine the minimum speed PE (used to compute relative PE speeds)
// Allocates PE_Data[] (Num_PEs entries) and fills one entry per PE from the
// framework statistics.
//
// Of the checks listed in the banner, only the minimum PE speed is computed
// here; the available-PE count and the cluster count are computed by
// Available_PE_Count() and Compute_Number_Of_Clusters().
//
// @param stats  framework statistics; procs[i] supplies availability, speed
//               and background wall time for PE i.
162 void GridCommLB::Initialize_PE_Data (CentralLB::LDStats *stats)
// Allocated with new[]; the matching delete[] is not in this function.
168 PE_Data = new PE_Data_T[Num_PEs];
// Pass 1: copy availability and cluster, zero all counters, and track the
// slowest PE speed seen.
171 for (i = 0; i < Num_PEs; i++) {
172 (&PE_Data[i])->available = stats->procs[i].available;
173 (&PE_Data[i])->cluster = Get_Cluster (i);
174 (&PE_Data[i])->num_objs = 0;
175 (&PE_Data[i])->num_lan_objs = 0;
176 (&PE_Data[i])->num_lan_msgs = 0;
177 (&PE_Data[i])->num_wan_objs = 0;
178 (&PE_Data[i])->num_wan_msgs = 0;
179 (&PE_Data[i])->relative_speed = 0.0;
180 (&PE_Data[i])->scaled_load = 0.0;
182 if (stats->procs[i].pe_speed < min_speed) {
183 min_speed = stats->procs[i].pe_speed;
187 // Compute the relative PE speeds.
188 // Also add background CPU time to each PE's scaled load.
189 for (i = 0; i < Num_PEs; i++) {
// NOTE(review): the cast applies to the quotient, not to an operand.  If
// pe_speed and min_speed are both integral, the division truncates before
// the conversion (e.g. 3/2 -> 1.0) -- confirm the types; casting one operand
// instead would keep the fraction.  A min_speed of 0 would also divide by
// zero here.
190 (&PE_Data[i])->relative_speed = (double) (stats->procs[i].pe_speed / min_speed);
// Background wall time is added as-is, i.e. not divided by relative_speed
// the way object loads are in Assign_Object_To_PE().
191 if (CK_LDB_GridCommLB_Background_Load) {
192 (&PE_Data[i])->scaled_load += stats->procs[i].bg_walltime;
199 /**************************************************************************
// Counts the entries of PE_Data[] whose 'available' flag is set.
// PE_Data[] must already have been filled by Initialize_PE_Data().
//
// @return  number of available PEs (0 when none are available).
202 int GridCommLB::Available_PE_Count ()
204 int available_pe_count;
208 available_pe_count = 0;
209 for (i = 0; i < Num_PEs; i++) {
210 if ((&PE_Data[i])->available) {
211 available_pe_count += 1;
214 return (available_pe_count);
219 /**************************************************************************
// Derives the number of clusters from the cluster numbers stored in
// PE_Data[]: the result is the largest cluster number seen plus one.
// This is a count only if cluster numbers are dense starting at 0; work()
// iterates clusters 0 .. result-1 on that basis.
//
// @return  max cluster number + 1.  work() treats a result below 1 as an
//          incomplete PE-to-cluster map.
222 int GridCommLB::Compute_Number_Of_Clusters ()
229 for (i = 0; i < Num_PEs; i++) {
// A negative cluster number is handled specially.
// NOTE(review): the body of this branch is not visible in this view;
// presumably it returns a value below 1 -- confirm.
230 if ((&PE_Data[i])->cluster < 0) {
234 if ((&PE_Data[i])->cluster > max_cluster) {
235 max_cluster = (&PE_Data[i])->cluster;
238 return (max_cluster + 1);
243 /**************************************************************************
// Allocates Object_Data[] (Num_Objects entries) and fills one entry per
// object from the framework statistics.  The array is released with
// delete[] at the end of work().
//
// @param stats  framework statistics; objData[i] supplies the migratable
//               flag and wall time, from_proc[i] the object's current PE.
246 void GridCommLB::Initialize_Object_Data (CentralLB::LDStats *stats)
251 Object_Data = new Object_Data_T[Num_Objects];
253 for (i = 0; i < Num_Objects; i++) {
254 (&Object_Data[i])->migratable = (&stats->objData[i])->migratable;
// An object's cluster is the cluster of the PE it currently lives on.
255 (&Object_Data[i])->cluster = Get_Cluster (stats->from_proc[i]);
256 (&Object_Data[i])->from_pe = stats->from_proc[i];
// -1 marks "not yet assigned"; Find_Maximum_Object() only considers objects
// whose to_pe is still -1.
257 (&Object_Data[i])->to_pe = -1;
258 (&Object_Data[i])->num_lan_msgs = 0;
259 (&Object_Data[i])->num_wan_msgs = 0;
260 (&Object_Data[i])->load = (&stats->objData[i])->wallTime;
266 /**************************************************************************
// Classifies each recorded communication as LAN (sender and receiver start
// in the same cluster) or WAN (different clusters) and accumulates the
// message counts on the SENDING object's Object_Data entry.  Receivers'
// counters are not updated here.
//
// @param stats  framework statistics; commData[0 .. n_comm) is scanned and
//               getHash() maps object keys to indices into Object_Data[].
269 void GridCommLB::Examine_InterObject_Messages (CentralLB::LDStats *stats)
273 LDCommData *com_data;
280 LDObjKey *recv_objects;
284 for (i = 0; i < stats->n_comm; i++) {
285 com_data = &(stats->commData[i]);
// Case 1: sent by an object (from_proc() is false) to a single object.
286 if ((!com_data->from_proc()) && (com_data->recv_type() == LD_OBJ_MSG)) {
287 send_object = stats->getHash (com_data->sender);
288 recv_object = stats->getHash (com_data->receiver.get_destObj());
// Reject endpoints that do not map to a known object (the statement taken
// in this branch is not visible in this view; presumably the record is
// skipped).
// NOTE(review): Object_Data[] has Num_Objects entries, so valid indices are
// 0 .. Num_Objects-1, yet '> Num_Objects' accepts an index equal to
// Num_Objects.  This looks like an off-by-one ('>=' intended) -- confirm.
290 if ((send_object < 0) || (send_object > Num_Objects) || (recv_object < 0) || (recv_object > Num_Objects)) {
// Clusters are those of the PEs the two objects currently live on.
294 send_pe = (&Object_Data[send_object])->from_pe;
295 recv_pe = (&Object_Data[recv_object])->from_pe;
297 send_cluster = Get_Cluster (send_pe);
298 recv_cluster = Get_Cluster (recv_pe);
300 if (send_cluster == recv_cluster) {
301 (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
303 (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
// Case 2: message to a list of objects.  Unlike case 1, this test reads the
// type via receiver.get_type() and does not check from_proc().
305 } else if (com_data->receiver.get_type() == LD_OBJLIST_MSG) {
306 send_object = stats->getHash (com_data->sender);
// NOTE(review): same suspected off-by-one as above ('>' vs '>=').
308 if ((send_object < 0) || (send_object > Num_Objects)) {
312 send_pe = (&Object_Data[send_object])->from_pe;
313 send_cluster = Get_Cluster (send_pe);
315 recv_objects = com_data->receiver.get_destObjs (num_objects); // (num_objects is passed by reference)
// Each destination is classified separately, and the record's full message
// count is added to the sender once per destination.
317 for (j = 0; j < num_objects; j++) {
318 recv_object = stats->getHash (recv_objects[j]);
// NOTE(review): same suspected off-by-one as above ('>' vs '>=').
320 if ((recv_object < 0) || (recv_object > Num_Objects)) {
324 recv_pe = (&Object_Data[recv_object])->from_pe;
325 recv_cluster = Get_Cluster (recv_pe);
327 if (send_cluster == recv_cluster) {
328 (&Object_Data[send_object])->num_lan_msgs += com_data->messages;
330 (&Object_Data[send_object])->num_wan_msgs += com_data->messages;
339 /**************************************************************************
// Pins every non-migratable object to the PE it is already on, so that its
// load and message counts are charged to that PE before migratable objects
// are placed.
342 void GridCommLB::Map_NonMigratable_Objects_To_PEs ()
347 for (i = 0; i < Num_Objects; i++) {
348 if (!((&Object_Data[i])->migratable)) {
// Reported at debug levels above 1.
349 if (_lb_args.debug() > 1) {
350 CkPrintf ("[%d] GridCommLB identifies object %d as non-migratable.\n", CkMyPe(), i);
// The target is the object's current PE; the PE's 'available' flag is not
// consulted here.
353 Assign_Object_To_PE (i, (&Object_Data[i])->from_pe);
360 /**************************************************************************
// Places unassigned objects of one cluster: the object chosen by
// Find_Maximum_Object() is assigned to the PE chosen by Find_Minimum_PE().
// NOTE(review): the enclosing loop construct is not visible in this view;
// presumably this repeats until one of the two finders returns -1.
//
// @param cluster  cluster number whose objects and PEs are considered.
363 void GridCommLB::Map_Migratable_Objects_To_PEs (int cluster)
370 target_object = Find_Maximum_Object (cluster);
371 target_pe = Find_Minimum_PE (cluster);
// -1 from either finder means "nothing found" (no unassigned object left,
// or no usable PE in this cluster).
373 if ((target_object == -1) || (target_pe == -1)) {
377 Assign_Object_To_PE (target_object, target_pe);
383 /**************************************************************************
384 ** This method locates the maximum WAN object in terms of number of
385 ** messages that traverse a wide-area connection. The search is
386 ** constrained to objects within the specified cluster that have not yet
387 ** been mapped (balanced) to a PE.
389 ** The method returns -1 if no matching object is found.
// Selects the next object to place from the unassigned (to_pe == -1) objects
// of the given cluster.
// NOTE(review): the banner describes a pure "maximum WAN messages" search,
// but the visible code ranks by load first and uses WAN message counts only
// among objects whose load is within a tolerance of the heaviest one.
//
// @param cluster  cluster number to search.
// @return         index into Object_Data[], or a negative index when the
//                 cluster has no unassigned object.
391 int GridCommLB::Find_Maximum_Object (int cluster)
396 int max_wan_msgs_index;
398 double load_tolerance;
407 max_wan_msgs_index = -1;
// Pass 1: find both the heaviest object and the object with the most WAN
// messages among the candidates.
410 for (i = 0; i < Num_Objects; i++) {
411 if (((&Object_Data[i])->cluster == cluster) && ((&Object_Data[i])->to_pe == -1)) {
412 if ((&Object_Data[i])->load > max_load) {
414 max_load = (&Object_Data[i])->load;
416 if ((&Object_Data[i])->num_wan_msgs > max_wan_msgs) {
417 max_wan_msgs_index = i;
418 max_wan_msgs = (&Object_Data[i])->num_wan_msgs;
// No candidate at all: pass the negative index back to the caller.
423 if (max_load_index < 0) {
424 return (max_load_index);
// The heaviest object also has the most WAN messages: it wins outright.
427 if ((&Object_Data[max_load_index])->num_wan_msgs >= (&Object_Data[max_wan_msgs_index])->num_wan_msgs) {
428 return (max_load_index);
// Otherwise allow a window of (heaviest load * tolerance) below the heaviest
// load and prefer, within that window, the object with the most WAN messages.
431 load_tolerance = (&Object_Data[max_load_index])->load * CK_LDB_GridCommLB_Load_Tolerance;
433 max_index = max_load_index;
// Pass 2: compare every other candidate within the window against the
// current best (max_index).
435 for (i = 0; i < Num_Objects; i++) {
436 if (((&Object_Data[i])->cluster == cluster) && ((&Object_Data[i])->to_pe == -1)) {
437 if (i != max_load_index) {
438 if (fabs ((&Object_Data[max_load_index])->load - (&Object_Data[i])->load) <= load_tolerance) {
439 if ((&Object_Data[i])->num_wan_msgs > (&Object_Data[max_index])->num_wan_msgs) {
452 /**************************************************************************
453 ** This method locates the minimum WAN PE in terms of number of objects
454 ** that communicate with objects across a wide-area connection. The search
455 ** is constrained to PEs within the specified cluster.
457 ** In the event of a "tie" (i.e., the number of WAN objects on a candidate
458 ** PE is equal to the minimum number of WAN objects discovered so far) the
459 ** tie is broken by considering the scaled CPU loads on the PEs. The PE
460 ** with the smaller scaled load is the better candidate. In the event of
461 ** a secondary tie, the secondary tie is broken by considering the number
462 ** of LAN objects on the two PEs.
464 ** The method returns -1 if no matching PE is found.
// Selects the PE that should receive the next object, considering only
// available PEs of the given cluster.  The policy depends on
// CK_LDB_GridCommLB_Mode:
//   mode 0 - fewest objects, ties broken by fewest WAN objects, then fewest
//            WAN messages, then smallest scaled load;
//   mode 1 - smallest scaled load, but a PE whose load is within a tolerance
//            of the smallest is preferred if it has fewer WAN messages;
//   other  - reports a bad mode at debug levels above 0.
// NOTE(review): the banner's tie-break order (WAN objects, scaled load, LAN
// objects) does not match the mode-0 comparisons visible below; one of the
// two is stale -- confirm.
//
// @param cluster  cluster number to search.
466 int GridCommLB::Find_Minimum_PE (int cluster)
468 if (CK_LDB_GridCommLB_Mode == 0) {
477 for (i = 0; i < Num_PEs; i++) {
478 if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
// Primary key: number of objects already on the PE.
479 if ((&PE_Data[i])->num_objs < min_objs) {
481 min_objs = (&PE_Data[i])->num_objs;
// Tie on object count: fewer WAN objects wins.
482 } else if (((&PE_Data[i])->num_objs == min_objs) &&
483 ((&PE_Data[i])->num_wan_objs < (&PE_Data[min_index])->num_wan_objs)) {
// Tie on WAN objects as well: fewer WAN messages wins.
485 } else if (((&PE_Data[i])->num_objs == min_objs) &&
486 ((&PE_Data[i])->num_wan_objs == (&PE_Data[min_index])->num_wan_objs) &&
487 ((&PE_Data[i])->num_wan_msgs < (&PE_Data[min_index])->num_wan_msgs)) {
// Tie on WAN messages as well: smaller scaled load wins.
489 } else if (((&PE_Data[i])->num_objs == min_objs) &&
490 ((&PE_Data[i])->num_wan_objs == (&PE_Data[min_index])->num_wan_objs) &&
491 ((&PE_Data[i])->num_wan_msgs == (&PE_Data[min_index])->num_wan_msgs) &&
492 ((&PE_Data[i])->scaled_load < (&PE_Data[min_index])->scaled_load)) {
499 } else if (CK_LDB_GridCommLB_Mode == 1) {
502 double min_scaled_load;
503 int min_wan_msgs_index;
505 double load_tolerance;
512 min_scaled_load = MAXDOUBLE;
514 min_wan_msgs_index = -1;
515 min_wan_msgs = MAXINT;
// Pass 1: find both the least-loaded PE and the PE with the fewest WAN
// messages.
517 for (i = 0; i < Num_PEs; i++) {
518 if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
519 if ((&PE_Data[i])->scaled_load < min_scaled_load) {
521 min_scaled_load = (&PE_Data[i])->scaled_load;
523 if ((&PE_Data[i])->num_wan_msgs < min_wan_msgs) {
524 min_wan_msgs_index = i;
525 min_wan_msgs = (&PE_Data[i])->num_wan_msgs;
530 // If no PE at all was found, return a -1.
531 if (min_load_index < 0) {
532 return (min_load_index);
535 // If the number of WAN messages on the lightest loaded PE happens to match the minimum number
536 // of WAN messages overall, we win because this target PE is overall the minimum PE in terms
537 // of both load *and* WAN messages.
538 if ((&PE_Data[min_load_index])->num_wan_msgs <= (&PE_Data[min_wan_msgs_index])->num_wan_msgs) {
539 return (min_load_index);
542 // Otherwise, we now search for PEs that have loads +/- our tolerance. If any PE has a load
543 // within our tolerance, check its number of WAN messages. The one of these that has the
544 // fewest WAN messages is probably the best candidate for placing the next object onto.
// The window is proportional to the smallest scaled load, so when that load
// is 0 the window is 0 and only PEs with an exactly equal load qualify.
546 load_tolerance = (&PE_Data[min_load_index])->scaled_load * CK_LDB_GridCommLB_Load_Tolerance;
548 min_index = min_load_index;
// Pass 2: compare every other PE within the window against the current best
// (min_index).
550 for (i = 0; i < Num_PEs; i++) {
551 if (((&PE_Data[i])->available) && ((&PE_Data[i])->cluster == cluster)) {
552 if (i != min_load_index) {
553 if (fabs ((&PE_Data[i])->scaled_load - (&PE_Data[min_load_index])->scaled_load) <= load_tolerance) {
554 if ((&PE_Data[i])->num_wan_msgs < (&PE_Data[min_index])->num_wan_msgs) {
// Any other mode value is reported, at debug levels above 0 only.
// NOTE(review): the value returned on this path is not visible in this view.
564 if (_lb_args.debug() > 0) {
565 CkPrintf ("[%d] GridCommLB was told to use bad mode (%d).\n", CkMyPe(), CK_LDB_GridCommLB_Mode);
573 /**************************************************************************
574 ** This method assigns target_object to target_pe. The data structure
575 ** entry for target_pe is updated appropriately with measurements from
576 ** target_object. This updated information is considered when placing
577 ** successive objects onto PEs.
// Records the placement of one object and charges its statistics to the PE.
//
// @param target_object  index into Object_Data[]; not range-checked here.
// @param target_pe      index into PE_Data[]; not range-checked here, so
//                       callers must not pass the -1 "not found" value.
579 void GridCommLB::Assign_Object_To_PE (int target_object, int target_pe)
581 (&Object_Data[target_object])->to_pe = target_pe;
583 (&PE_Data[target_pe])->num_objs += 1;
// An object counts as a LAN object if it sends at least one LAN message and
// as a WAN object if it sends at least one WAN message; it can be both.
585 if ((&Object_Data[target_object])->num_lan_msgs > 0) {
586 (&PE_Data[target_pe])->num_lan_objs += 1;
587 (&PE_Data[target_pe])->num_lan_msgs += (&Object_Data[target_object])->num_lan_msgs;
590 if ((&Object_Data[target_object])->num_wan_msgs > 0) {
591 (&PE_Data[target_pe])->num_wan_objs += 1;
592 (&PE_Data[target_pe])->num_wan_msgs += (&Object_Data[target_object])->num_wan_msgs;
// The object's load is divided by the PE's relative speed, so the same
// object adds less scaled load to a faster PE.  A relative_speed of 0 would
// make this a division by zero.
595 (&PE_Data[target_pe])->scaled_load += (&Object_Data[target_object])->load / (&PE_Data[target_pe])->relative_speed;
600 /**************************************************************************
601 ** The Charm++ load balancing framework invokes this method to cause the
602 ** load balancer to migrate objects to "better" PEs.
// Load balancing entry point.  Builds PE_Data[] and Object_Data[] from the
// statistics, classifies communication as LAN or WAN, pins non-migratable
// objects, places migratable objects cluster by cluster, and writes the
// result into stats->to_proc[].
//
// @param stats  framework statistics (read), with to_proc[] as the output.
604 void GridCommLB::work (LDStats *stats)
609 if (_lb_args.debug() > 0) {
610 CkPrintf ("[%d] GridCommLB is working (mode=%d, background load=%d, load tolerance=%f).\n", CkMyPe(), CK_LDB_GridCommLB_Mode, CK_LDB_GridCommLB_Background_Load, CK_LDB_GridCommLB_Load_Tolerance);
613 // Since this load balancer looks at communications data, it must initialize the CommHash.
614 stats->makeCommHash ();
616 // Initialize object variables for the number of PEs and number of objects.
617 Num_PEs = stats->nprocs();
618 Num_Objects = stats->n_objs;
620 if (_lb_args.debug() > 0) {
621 CkPrintf ("[%d] GridCommLB is examining %d PEs and %d objects.\n", CkMyPe(), Num_PEs, Num_Objects);
624 // Initialize the PE_Data[] data structure.
625 Initialize_PE_Data (stats);
627 // If at least one available PE does not exist, return from load balancing.
// NOTE(review): PE_Data[] has already been allocated at this point; the
// cleanup on this early-exit path is not visible in this view -- confirm it
// is released.
628 if (Available_PE_Count() < 1) {
629 if (_lb_args.debug() > 0) {
630 CkPrintf ("[%d] GridCommLB finds no available PEs -- no balancing done.\n", CkMyPe());
638 // Determine the number of clusters.
639 // If any PE is not mapped to a cluster, return from load balancing.
640 Num_Clusters = Compute_Number_Of_Clusters ();
641 if (Num_Clusters < 1) {
642 if (_lb_args.debug() > 0) {
643 CkPrintf ("[%d] GridCommLB finds incomplete PE cluster map -- no balancing done.\n", CkMyPe());
651 if (_lb_args.debug() > 0) {
652 CkPrintf ("[%d] GridCommLB finds %d clusters.\n", CkMyPe(), Num_Clusters);
655 // Initialize the Object_Data[] data structure.
656 Initialize_Object_Data (stats);
658 // Examine all object-to-object messages for intra-cluster and inter-cluster communications.
659 Examine_InterObject_Messages (stats);
661 // Map non-migratable objects to PEs.
662 Map_NonMigratable_Objects_To_PEs ();
664 // Map migratable objects to PEs in each cluster.
665 for (i = 0; i < Num_Clusters; i++) {
666 Map_Migratable_Objects_To_PEs (i);
669 // Make the assignment of objects to PEs in the load balancer framework.
// NOTE(review): to_pe is copied unconditionally.  An object that was never
// assigned (for example, its cluster has no available PE, so
// Find_Minimum_PE() returned -1) still has to_pe == -1 here -- confirm the
// framework tolerates that, or fall back to from_proc[i].
670 for (i = 0; i < Num_Objects; i++) {
671 stats->to_proc[i] = (&Object_Data[i])->to_pe;
// Above debug level 2 every object is reported, including those that stay
// put; above level 1 only objects whose PE actually changes are reported.
673 if (_lb_args.debug() > 2) {
674 CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
675 } else if (_lb_args.debug() > 1) {
676 if (stats->to_proc[i] != stats->from_proc[i]) {
677 CkPrintf ("[%d] GridCommLB migrates object %d from PE %d to PE %d.\n", CkMyPe(), i, stats->from_proc[i], stats->to_proc[i]);
// Releases the array allocated by Initialize_Object_Data().
// NOTE(review): the release of PE_Data[] is not visible in this view.
683 delete [] Object_Data;
687 #include "GridCommLB.def.h"