3 #include "sockRoutines.h"
4 #include "sockRoutines.c"
7 #include "ccs-server.h"
8 #include "ccs-server.c"
23 #include <sys/bproc.h>
30 #include <unordered_map>
39 /*Win32 has screwy names for the standard UNIX calls:*/
40 #define getcwd _getcwd
41 #define strdup _strdup
42 #define unlink _unlink
44 #define fdopen _fdopen
45 #define ftruncate _chsize
49 #include <sys/timeb.h>
52 #define SIGBUS -1 /*These signals don't exist in Win32*/
57 #include <pwd.h> /*getpwuid, struct passwd*/
62 #define PRINT(a) (arg_quiet ? 1 : printf a)
64 #if CMK_SSH_NOT_NEEDED /*No SSH-- use daemon to start node-programs*/
67 #else /*Use SSH to start node-programs*/
69 #if CMK_SSH_IS_A_COMMAND
77 /*#define DEBUGF(x) printf x*/
81 #define MAXPATHLEN 1024
84 static const int MAX_NUM_RETRIES = 3;
88 /*Hierarchical-start routines*/
89 static int mynodes_start; /* To keep a global node numbering */
93 static double ftTimer;
95 static double start_timer;
97 static double GetClock(void)
102 return (tv.time * 1.0 + tv.millitm * 1.0E-3);
105 int ok = gettimeofday(&tv, NULL);
107 perror("gettimeofday");
110 return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
114 static int probefile(const char *path)
116 FILE *f = fopen(path, "r");
123 static const char *mylogin(void)
126 static char name[100] = {'d', 'u', 'n', 'n', 'o', 0};
127 unsigned int len = 100;
128 GetUserName(name, (LPDWORD) &len);
131 struct passwd *self = getpwuid(getuid());
137 sprintf(cmd, "id -u -n");
140 if (fscanf(p, "%63s", uname) != 1) {
141 fprintf(stderr, "charmrun> fscanf() failed!\n");
145 return strdup(uname);
152 return self->pw_name;
156 /**************************************************************************
158 * Pathfix : alters a path according to a set of rewrite rules
160 *************************************************************************/
162 typedef struct s_pathfixlist {
165 struct s_pathfixlist *next;
168 static pathfixlist pathfix_append(char *s1, char *s2, pathfixlist l)
170 pathfixlist pf = (pathfixlist) malloc(sizeof(s_pathfixlist));
177 static char *pathfix(const char *path, pathfixlist fixes)
179 char buffer[MAXPATHLEN];
180 char buf2[MAXPATHLEN];
181 strcpy(buffer, path);
185 for (pathfixlist l = fixes; l; l = l->next) {
186 int len = strlen(l->s1);
187 char *offs = strstr(buffer, l->s1);
190 sprintf(buf2, "%s%s%s", buffer, l->s2, offs + len);
191 strcpy(buffer, buf2);
196 return strdup(buffer);
199 static char *pathextfix(const char *path, pathfixlist fixes, char *ext)
201 char *newpath = pathfix(path, fixes);
204 char *ret = (char *) malloc(strlen(newpath) + strlen(ext) + 2);
205 strcpy(ret, newpath);
211 /****************************************************************************
213 * Miscellaneous minor routines.
215 ****************************************************************************/
217 static int is_quote(char c) { return (c == '\'' || c == '"'); } /* nonzero iff c is a single or double quote character */
219 static void zap_newline(char *s)
221 char *p = s + strlen(s) - 1;
224 /* in case of DOS ^m */
230 /* get substring from lo to hi, remove quote chars */
231 static char *substr(const char *lo, const char *hi)
235 if (is_quote(*(hi - 1)))
238 char *res = (char *) malloc(1 + len);
239 memcpy(res, lo, len);
244 static int subeqs(const char *lo, const char *hi, const char *str)
246 int len = strlen(str);
249 if (memcmp(lo, str, len))
254 /* advance pointer over blank characters */
255 static const char *skipblanks(const char *p)
257 while ((*p == ' ') || (*p == '\t'))
262 /* advance pointer over nonblank characters and a quoted string */
263 static const char *skipstuff(const char *p)
266 if (*p && (*p == '\'' || *p == '"')) {
271 while (*p && *p != quote)
274 fprintf(stderr, "ERROR> Unmatched quote in nodelist file.\n");
279 while ((*p) && (*p != ' ') && (*p != '\t'))
284 static char *cstring_join(const std::vector<const char *> & vec, const char *separator)
286 const size_t separator_length = strlen(separator);
288 for (const char *p : vec)
289 length += strlen(p) + separator_length;
291 char * const str = (char *)malloc(length + 1);
295 for (int i = 1; i < vec.size(); ++i)
297 strcat(str, separator);
305 static const char *getenv_ssh()
307 char *e = getenv("CONV_RSH");
308 return e ? e : SSH_CMD;
313 static char *getenv_display()
315 static char result[100], ipBuf[200];
317 char *e = getenv("DISPLAY");
320 char *p = strrchr(e, ':');
323 if ((e[0] == ':') || (strncmp(e, "unix:", 5) == 0)) {
324 sprintf(result, "%s:%s", skt_print_ip(ipBuf, skt_my_ip()), p + 1);
329 static char *getenv_display_no_tamper()
331 static char result[100], ipBuf[200];
333 char *e = getenv("DISPLAY");
336 char *p = strrchr(e, ':');
345 static unsigned int server_port;
346 static char server_addr[1024]; /* IP address or hostname of charmrun*/
347 static SOCKET server_fd;
348 /*****************************************************************************
350 * PPARAM - obtaining "program parameters" from the user. *
352 *****************************************************************************/
354 typedef struct s_ppdef {
360 } where; /*Where to store result*/
361 const char *lname; /*Argument name on command line*/
363 char type; /*One of i, r, s, f.*/
364 bool initFlag; // true while the cell still holds its default value; false once a user-supplied parameter has been stored
365 struct s_ppdef *next; /*Next definition in the ppdefs list*/
370 static int pparam_pos;
371 static const char **pparam_argv;
372 static char pparam_optc = '-';
373 static char pparam_error[100];
375 static ppdef pparam_find(const char *lname)
378 for (def = ppdefs; def; def = def->next)
379 if (strcmp(def->lname, lname) == 0)
384 static ppdef pparam_cell(const char *lname)
386 ppdef def = pparam_find(lname);
389 def = (ppdef) malloc(sizeof(s_ppdef));
392 def->doc = "(undocumented)";
394 def->initFlag = true;
399 static void pparam_int(int *where, int defValue, const char *arg, const char *doc)
401 ppdef def = pparam_cell(arg);
403 def->where.i = where;
409 static void pparam_flag(int *where, int defValue, const char *arg, const char *doc)
411 ppdef def = pparam_cell(arg);
413 def->where.f = where;
419 static void pparam_real(double *where, double defValue, const char *arg,
422 ppdef def = pparam_cell(arg);
424 def->where.r = where;
430 static void pparam_str(const char **where, const char *defValue, const char *arg,
433 ppdef def = pparam_cell(arg);
435 def->where.s = where;
441 static int pparam_setdef(ppdef def, const char *value)
445 def->initFlag = false;
447 fprintf(stderr, "Option \'%s\' is used more than once. Please remove duplicate arguments for this option\n", def->lname);
453 *def->where.i = strtol(value, &p, 10);
458 *def->where.r = strtod(value, &p);
463 /* Parse input string and convert a literal "\n" into '\n'. */
464 *def->where.s = (char *) calloc(strlen(value) + 1, sizeof(char));
465 char *parsed_value = (char *) *def->where.s;
466 for (int i = 0, j = 0; i < strlen(value); i++) {
467 if (i + 1 < strlen(value)) {
468 if (value[i] == '\\' && value[i + 1] == 'n') {
469 parsed_value[j++] = '\n';
474 parsed_value[j++] = value[i];
479 *def->where.f = strtol(value, &p, 10);
487 static int pparam_set(char *lname, char *value)
489 ppdef def = pparam_cell(lname);
490 return pparam_setdef(def, value);
493 static const char *pparam_getdef(ppdef def)
495 static char result[100];
498 sprintf(result, "%d", *def->where.i);
501 sprintf(result, "%f", *def->where.r);
504 return *def->where.s ? *def->where.s : "";
506 sprintf(result, "%d", *def->where.f);
512 static void pparam_printdocs()
514 int maxname = 0, maxdoc = 0;
515 for (ppdef def = ppdefs; def; def = def->next) {
517 len = strlen(def->lname);
520 len = strlen(def->doc);
524 fprintf(stderr, "\n");
525 fprintf(stderr, "Charmrun Command-line Parameters:\n");
526 for (ppdef def = ppdefs; def; def = def->next) {
527 fprintf(stderr, " %c%c%-*s ", pparam_optc, pparam_optc, maxname,
529 fprintf(stderr, " %-*s [%s]\n", maxdoc, def->doc, pparam_getdef(def));
531 fprintf(stderr, "\n");
534 static void pparam_delarg(int i)
536 for (int j = i; pparam_argv[j]; j++)
537 pparam_argv[j] = pparam_argv[j + 1];
540 static int pparam_countargs(const char **argv)
543 for (argc = 0; argv[argc]; argc++)
548 static int pparam_parseopt()
550 const char *opt = pparam_argv[pparam_pos];
551 /* handle ++ by skipping to end */
552 if ((opt[1] == '+') && (opt[2] == 0)) {
553 pparam_delarg(pparam_pos);
554 while (pparam_argv[pparam_pos])
558 /* handle + by itself - an error */
560 sprintf(pparam_error, "Illegal option +\n");
563 /* look up option definition */
566 def = pparam_find(opt + 2);
570 if (strlen(opt) <= 2 || !isalpha(opt[2])) {
572 def = pparam_find(name);
577 sprintf(pparam_error, "Option %s not recognized.", opt);
580 /*Unrecognized + option-- skip it.*/
585 /* handle flag-options */
586 if ((def->type == 'f') && (opt[1] != '+') && (opt[2])) {
587 sprintf(pparam_error, "Option %s should not include a value", opt);
590 if (def->type == 'f') {
592 pparam_delarg(pparam_pos);
595 /* handle non-flag options */
596 if ((opt[1] == '+') || (opt[2] == 0)) {
597 pparam_delarg(pparam_pos);
598 opt = pparam_argv[pparam_pos];
601 if ((opt == 0) || (opt[0] == 0)) {
602 sprintf(pparam_error, "%s must be followed by a value.", opt);
605 int ok = pparam_setdef(def, opt);
606 pparam_delarg(pparam_pos);
608 sprintf(pparam_error, "Illegal value for %s", opt);
614 static int pparam_parsecmd(char optchr, const char **argv)
618 pparam_optc = optchr;
621 const char *opt = pparam_argv[pparam_pos];
624 if (opt[0] != optchr)
626 else if (pparam_parseopt() < 0)
633 static char **dupargv(const char **argv)
641 for (argc = 0; argv[argc] != NULL; argc++)
643 char **copy = (char **) malloc((argc + 2) * sizeof(char *));
648 for (argc = 0; argv[argc] != NULL; argc++) {
649 int len = strlen(argv[argc]);
650 copy[argc] = (char *)malloc(sizeof(char) * (len + 1));
651 strcpy(copy[argc], argv[argc]);
659 /****************************************************************************
663 * The following module computes a whole bunch of miscellaneous values, which
664 * are all constant throughout the program. Naturally, this includes the
665 * value of the command-line arguments.
667 *****************************************************************************/
669 #define MAX_LINE_LENGTH 1000
671 static const char **arg_argv;
674 static int arg_requested_pes;
675 static int arg_requested_nodes;
676 static int arg_requested_numhosts;
678 static int arg_timeout;
679 static int arg_timelimit;
680 static int arg_verbose;
681 static const char *arg_nodelist;
682 static const char *arg_nodegroup;
683 static const char *arg_runscript; /* script to run the node-program with */
684 static const char *arg_charmrunip;
686 static int arg_debug;
687 static int arg_debug_no_pause;
688 static int arg_debug_no_xrdb;
689 static int arg_charmdebug;
691 arg_debug_commands; /* commands that are provided by a ++debug-commands
692 flag. These are passed into gdb. */
694 static int arg_quiet; /* omit charmrun standard output */
695 static int arg_local; /* start node programs directly by exec on localhost */
696 static int arg_batch_spawn; /* control starting node programs, several at a time */
697 static int arg_scalable_start;
700 static int arg_hierarchical_start;
701 static int arg_child_charmrun;
703 static int arg_help; /* print help message */
704 static int arg_ppn; /* pes per node */
705 static int arg_usehostname;
707 #if CMK_SHRINK_EXPAND
708 static char **saved_argv;
709 static int saved_argc;
710 static int arg_realloc_pes;
711 static int arg_old_pes;
712 static int arg_shrinkexpand;
713 static int arg_charmrun_port;
714 static const char *arg_shrinkexpand_basedir;
718 static int arg_maxssh;
719 static const char *arg_shell;
720 static int arg_in_xterm;
721 static const char *arg_debugger;
722 static const char *arg_xterm;
723 static const char *arg_display;
724 static int arg_ssh_display;
725 static const char *arg_mylogin;
727 static int arg_mpiexec;
728 static int arg_mpiexec_no_n;
729 static int arg_no_va_rand;
731 static const char *arg_nodeprog_a;
732 static const char *arg_nodeprog_r;
733 static char *arg_currdir_a;
734 static char *arg_currdir_r;
736 static int arg_server;
737 static int arg_server_port = 0;
738 static const char *arg_server_auth = NULL;
739 static int replay_single = 0;
742 static int arg_startpe;
743 static int arg_endpe;
744 static int arg_singlemaster;
745 static int arg_skipmaster;
748 struct TopologyRequest
750 int host, socket, core, pu;
763 return (host > 0) + (socket > 0) + (core > 0) + (pu > 0);
781 TopologyRequest proc_per;
782 TopologyRequest onewth_per;
785 static void arg_init(int argc, const char **argv)
787 static char buf[1024];
790 #if CMK_CHARMRUN_LOCAL
791 local_def = 1; /*++local is the default*/
794 pparam_int(&arg_requested_pes, 0, "p", "Number of PEs to create");
795 pparam_int(&arg_requested_numhosts, 0, "numHosts", "Number of hosts to use from nodelist file");
797 pparam_int(&arg_requested_nodes, 0, "n", "Number of processes to create");
798 pparam_int(&arg_requested_nodes, 0, "np", "Number of processes to create");
800 pparam_int(&arg_timeout, 60, "timeout",
801 "Seconds to wait per host connection");
802 pparam_int(&arg_timelimit, -1, "timelimit",
803 "Seconds to wait for program to complete");
804 pparam_flag(&arg_verbose, 0, "verbose", "Print diagnostic messages");
805 pparam_flag(&arg_quiet, 0, "quiet", "Omit non-error runtime messages");
806 pparam_str(&arg_nodelist, 0, "nodelist", "File containing list of physical nodes");
807 pparam_str(&arg_nodegroup, "main", "nodegroup",
808 "Which group of physical nodes to use");
810 #if CMK_CCS_AVAILABLE
811 pparam_flag(&arg_server, 0, "server", "Enable client-server (CCS) mode");
812 pparam_int(&arg_server_port, 0, "server-port",
813 "Port to listen for CCS requests");
814 pparam_str(&arg_server_auth, 0, "server-auth", "CCS Authentication file");
816 pparam_flag(&arg_local, local_def, "local",
817 "Start node programs locally without daemon");
818 pparam_int(&arg_batch_spawn, 0, "batch", "Launch connections to this many "
819 "node programs at a time, avoiding "
820 "overloading charmrun PE");
822 pparam_flag(&arg_scalable_start, 1, "scalable-start", "Enable scalable start");
825 pparam_flag(&arg_hierarchical_start, 0, "hierarchical-start",
826 "hierarchical start");
827 pparam_flag(&arg_child_charmrun, 0, "child-charmrun", "child charmrun");
829 #if CMK_SHRINK_EXPAND
830 pparam_int(&arg_realloc_pes, 1, "newp", "New number of processes to create");
831 pparam_int(&arg_old_pes, 1, "oldp", "Old number of processes to create");
832 pparam_flag(&arg_shrinkexpand, 0, "shrinkexpand", "Enable shrink/expand support");
833 pparam_int(&arg_charmrun_port, 0, "charmrun_port", "Make charmrun listen on this port");
835 pparam_flag(&arg_usehostname, 0, "usehostname",
836 "Send nodes our symbolic hostname instead of IP address");
837 pparam_str(&arg_charmrunip, 0, "useip",
838 "Use IP address provided for charmrun IP");
839 pparam_flag(&arg_mpiexec, 0, "mpiexec", "Use mpiexec to start jobs");
840 pparam_flag(&arg_mpiexec_no_n, 0, "mpiexec-no-n", "Use mpiexec to start jobs without -n procs");
842 pparam_flag(&arg_debug, 0, "debug",
843 "Run each node under gdb in an xterm window");
844 pparam_flag(&arg_debug_no_pause, 0, "debug-no-pause",
845 "Like debug, except doesn't pause at beginning");
846 pparam_str(&arg_debug_commands, 0, "debug-commands",
847 "Commands to be run inside gdb at startup");
848 pparam_flag(&arg_debug_no_xrdb, 0, "no-xrdb", "Don't check xrdb");
850 /* When the ++charmdebug flag is used, charmrun listens on its stdin for
851 commands and forwards them to the gdb info program (a child) or to the
852 processor gdbs. Stderr is redirected to stdout, so the two streams
853 are mixed together. The channel for stderr is reused to forward the replies
854 of gdb back to the Java debugger. */
856 pparam_flag(&arg_charmdebug, 0, "charmdebug",
857 "Used only when charmrun is started by charmdebug");
860 pparam_int(&arg_maxssh, 16, "maxssh",
861 "Maximum number of ssh's to run at a time");
862 pparam_str(&arg_shell, 0, "remote-shell",
863 "Which remote shell to use (default $CONV_RSH or " SSH_CMD ")");
864 pparam_str(&arg_debugger, 0, "debugger", "Which debugger to use");
865 pparam_str(&arg_display, 0, "display", "X Display for xterm");
866 pparam_flag(&arg_ssh_display, 0, "ssh-display",
867 "Use own X Display for each ssh session");
868 pparam_flag(&arg_in_xterm, 0, "in-xterm", "Run each node in an xterm window");
869 pparam_str(&arg_xterm, 0, "xterm", "Which xterm to use");
872 /* options for Scyld */
873 pparam_int(&arg_startpe, 0, "startpe", "First PE to start job(SCYLD)");
874 pparam_int(&arg_endpe, 1000000, "endpe", "Last PE to start job(SCYLD)");
875 pparam_flag(&arg_singlemaster, 0, "singlemaster",
876 "Only assign one process to master node(SCYLD)");
877 pparam_flag(&arg_skipmaster, 0, "skipmaster",
878 "Do not assign any process to master node(SCYLD)");
879 if (arg_skipmaster && arg_singlemaster) {
880 PRINT(("Charmrun> 'singlemaster' is ignored due to 'skipmaster'. \n"));
881 arg_singlemaster = 0;
883 pparam_flag(&arg_debug, 0, "debug", "Turn on more verbose debug prints");
885 pparam_str(&arg_runscript, 0, "runscript", "Script to run node-program with");
886 pparam_flag(&arg_help, 0, "help", "Print help messages");
887 pparam_int(&arg_ppn, 0, "ppn", "Number of PEs per Charm++ node (=OS process)");
888 pparam_flag(&arg_no_va_rand, 0, "no-va-randomization",
889 "Disables randomization of the virtual address space");
891 // Process Binding Parameters
892 pparam_int(&proc_per.host, 0,
893 "processPerHost", "assign N processes per host");
894 pparam_int(&proc_per.socket, 0,
895 "processPerSocket", "assign N processes per socket");
896 pparam_int(&proc_per.core, 0,
897 "processPerCore", "assign N processes per core");
898 pparam_int(&proc_per.pu, 0,
899 "processPerPU", "assign N processes per PU");
901 // Worker Thread Binding Parameters
902 pparam_flag(&onewth_per.host, 0,
903 "oneWthPerHost", "assign one worker thread per host");
904 pparam_flag(&onewth_per.socket, 0,
905 "oneWthPerSocket", "assign one worker thread per socket");
906 pparam_flag(&onewth_per.core, 0,
907 "oneWthPerCore", "assign one worker thread per core");
908 pparam_flag(&onewth_per.pu, 0,
909 "oneWthPerPU", "assign one worker thread per PU");
911 pparam_flag(&auto_provision, 0, "auto-provision", "fully utilize available resources");
912 pparam_flag(&auto_provision, 0, "autoProvision", "fully utilize available resources");
915 arg_argv = (const char **)dupargv(argv);
918 #if CMK_SHRINK_EXPAND
919 /* move it to a function */
921 saved_argv = (char **) malloc(sizeof(char *) * (saved_argc));
922 for (int i = 0; i < saved_argc; i++) {
923 // MACHSTATE1(2,"Parameters %s",Cmi_argvcopy[i]);
924 saved_argv[i] = (char *) argv[i];
928 if (pparam_parsecmd('+', argv) < 0) {
929 fprintf(stderr, "ERROR> syntax: %s\n", pparam_error);
939 if ( arg_mpiexec_no_n ) arg_mpiexec = arg_mpiexec_no_n;
941 #if CMK_SHRINK_EXPAND
942 if (arg_shrinkexpand) {
943 arg_requested_pes = arg_realloc_pes;
944 printf("\n \nCharmrun> %d Reallocated pes\n \n", arg_requested_pes);
949 if (!arg_hierarchical_start || arg_child_charmrun)
952 (argv) + 1; /*Skip over charmrun (0) here and program name (1) later*/
953 arg_argc = pparam_countargs(arg_argv);
955 fprintf(stderr, "ERROR> You must specify a node-program.\n");
961 if (!arg_hierarchical_start || arg_child_charmrun) {
962 // Removing nodeprogram from the list
966 // Removing charmrun from parameters
970 arg_argv[arg_argc++] = "++child-charmrun";
971 arg_argv[arg_argc] = NULL;
978 if (arg_server_port || arg_server_auth)
981 if (arg_verbose) arg_quiet = 0;
983 if (arg_debug || arg_debug_no_pause || arg_in_xterm) {
984 fprintf(stderr, "Charmrun> scalable start disabled under ++debug and ++in-xterm:\n"
985 "NOTE: will make an SSH connection per process launched,"
986 " instead of per physical node.\n");
987 arg_scalable_start = 0;
990 if (arg_debug || arg_debug_no_pause)
992 /*Pass ++debug along to program (used by machine.C)*/
993 arg_argv[arg_argc++] = "++debug";
996 /* pass ++quiet to program */
997 if (arg_quiet) arg_argv[arg_argc++] = "++quiet";
999 /* Check for +replay-detail to know we have to load only one single processor
1001 for (int i = 0; argv[i]; i++) {
1002 if (0 == strcmp(argv[i], "+replay-detail")) {
1004 arg_requested_pes = 1;
1011 "Warning> ++local cannot be used in bproc version, ignored!\n");
1017 /* Find the current value of the CONV_RSH variable */
1020 arg_shell = "mpiexec";
1022 arg_shell = getenv_ssh();
1025 #if !defined(_WIN32)
1026 /* Find the current value of the DISPLAY variable */
1028 arg_display = getenv_display_no_tamper();
1031 if ((arg_debug || arg_debug_no_pause || arg_in_xterm) && (arg_display == 0)) {
1032 fprintf(stderr, "ERROR> DISPLAY must be set to use debugging mode\n");
1035 if (arg_debug || arg_debug_no_pause)
1036 arg_timeout = 8 * 60 * 60; /* Wait 8 hours for ++debug */
1038 /* default debugger is gdb */
1041 arg_debugger = "lldb";
1043 arg_debugger = "gdb";
1045 /* default xterm is xterm */
1047 arg_xterm = "xterm";
1049 arg_mylogin = mylogin();
1052 /* find the current directory, absolute version */
1053 if (getcwd(buf, 1023) == NULL) {
1054 fprintf(stderr, "charmrun> getcwd() failed!\n");
1057 arg_currdir_a = strdup(buf);
1059 /* find the node-program, absolute version */
1060 arg_nodeprog_r = argv[1];
1062 if (arg_nodeprog_r[0] == '-' || arg_nodeprog_r[0] == '+') {
1063 /*If it starts with - or +, it ain't a node program.
1064 Chances are, the user screwed up and passed some
1065 unknown flag to charmrun*/
1066 fprintf(stderr, "Charmrun does not recognize the flag '%s'.\n", arg_nodeprog_r);
1067 if (arg_nodeprog_r[0] == '+')
1068 fprintf(stderr, "Charm++'s flags need to be placed *after* the program name.\n");
1074 if (argv[1][1] == ':' ||
1075 argv[1][0] == '\\' && argv[1][1] == '\\') { /*E.g.: "C:\foo\bar.exe" or "\\host\foo\bar.exe"*/
1077 if (argv[1][0] == '/') { /*E.g.: "/foo/bar"*/
1079 /*Absolute path to node-program*/
1080 arg_nodeprog_a = argv[1];
1082 sprintf(buf, "%s%s%s", arg_currdir_a, DIRSEP, arg_nodeprog_r);
1083 arg_nodeprog_a = strdup(buf);
1085 if (arg_scalable_start) {
1086 PRINT(("Charmrun> scalable start enabled. \n"));
1090 if (arg_hierarchical_start) {
1091 PRINT(("Charmrun> Hierarchical scalable start enabled. \n"));
1092 if (arg_debug || arg_debug_no_pause) {
1093 fprintf(stderr, "Charmrun> Error: ++hierarchical-start does not support "
1094 "debugging mode. \n");
1098 fprintf(stderr, "Charmrun> Warning: you have enabled verbose output with "
1099 "Hierarchical startup, you may get inconsistent verbose "
1100 "outputs. \n++hierarchial-start does not support verbose "
1104 } else if (arg_child_charmrun) {
1107 "Charmrun> Error: ++child-charmrun is not a user-specified flag. \n");
1112 const int proc_active = proc_per.active();
1113 const int onewth_active = onewth_per.active();
1114 if (proc_active || onewth_active || auto_provision)
1116 if (arg_requested_pes != 0)
1118 fprintf(stderr, "Charmrun> Error: +p cannot be used with ++(process|oneWth)Per* or ++auto-provision.\n");
1122 if (proc_active && arg_requested_nodes > 0)
1124 fprintf(stderr, "Charmrun> Error: +n/++np cannot be used with ++processPer* or ++auto-provision.\n");
1128 if (proc_active && arg_mpiexec)
1130 fprintf(stderr, "Charmrun> Error: ++mpiexec and ++processPer* cannot be used together.\n");
1134 if (proc_active + (auto_provision > 0) > 1)
1136 fprintf(stderr, "Charmrun> Error: Only one of ++processPer(Host|Socket|Core|PU) or ++auto-provision is allowed.\n");
1141 if (onewth_active + (arg_ppn > 0) + (auto_provision > 0) > 1)
1143 fprintf(stderr, "Charmrun> Error: Only one of ++oneWthPer(Host|Socket|Core|PU), ++ppn, or ++auto-provision is allowed.\n");
1147 using Unit = typename TopologyRequest::Unit;
1149 const Unit proc_unit = proc_per.unit();
1150 const Unit onewth_unit = onewth_per.unit();
1152 if ((onewth_unit == Unit::Host && (proc_unit == Unit::Socket || proc_unit == Unit::Core || proc_unit == Unit::PU)) ||
1153 (onewth_unit == Unit::Socket && (proc_unit == Unit::Core || proc_unit == Unit::PU)) ||
1154 (onewth_unit == Unit::Core && proc_unit == Unit::PU))
1156 fprintf(stderr, "Charmrun> Error: Cannot request processes on a smaller unit than that requested for worker threads.\n");
1160 if ((onewth_unit == Unit::Host && proc_unit == Unit::Host && proc_per.host > 1) ||
1161 (onewth_unit == Unit::Socket && proc_unit == Unit::Socket && proc_per.socket > 1) ||
1162 (onewth_unit == Unit::Core && proc_unit == Unit::Core && proc_per.core > 1) ||
1163 (onewth_unit == Unit::PU && proc_unit == Unit::PU && proc_per.pu > 1))
1165 fprintf(stderr, "Charmrun> Error: Cannot request more processes than worker threads per unit.\n");
1173 if (arg_requested_pes > 0 && arg_requested_nodes > 0 && arg_ppn > 0 && arg_ppn * arg_requested_nodes != arg_requested_pes)
1175 fprintf(stderr, "Charmrun> Error: ++np times ++ppn does not equal +p.\n");
1179 if (arg_requested_pes > 0 && arg_ppn > 0 && arg_requested_pes % arg_ppn != 0)
1181 if (arg_ppn > arg_requested_pes)
1183 arg_ppn = arg_requested_pes;
1184 fprintf(stderr, "Charmrun> Warning: forced ++ppn = +p = %d\n", arg_ppn);
1188 fprintf(stderr, "Charmrun> Error: ++ppn (number of PEs per node) does not divide +p (number of PEs).\n");
1193 if (arg_requested_pes > 0 && arg_requested_nodes > 0 && arg_requested_pes != arg_requested_nodes)
1195 fprintf(stderr, "Charmrun> Error: +p and ++np do not agree.\n");
1202 if (arg_ppn > 1 || onewth_active)
1204 fprintf(stderr, "Charmrun> Error: ++oneWthPer(Host|Socket|Core|PU) and ++ppn are only available in SMP mode.\n");
1212 proc_per.socket = 1;
1218 else if (arg_requested_pes <= 0 && arg_requested_nodes <= 0 && arg_ppn <= 0 && !proc_active && !onewth_active)
1220 PRINT(("Charmrun> No provisioning arguments specified. Running with a single PE.\n"
1221 " Use ++auto-provision to fully subscribe resources or +p1 to silence this message.\n"));
1225 /****************************************************************************
1227 * NODETAB: The nodes file and nodes table.
1229 ****************************************************************************/
1231 static int portOk = 1;
1232 static const char *nodetab_tempName = NULL;
1233 static char *nodetab_file_find()
1235 char buffer[MAXPATHLEN];
1237 /* Find a nodes-file as specified by ++nodelist */
1239 const char *path = arg_nodelist;
1240 if (probefile(path))
1241 return strdup(path);
1242 fprintf(stderr, "ERROR> No such nodelist file %s\n", path);
1245 /* Find a nodes-file as specified by getenv("NODELIST") */
1246 if (getenv("NODELIST")) {
1247 char *path = getenv("NODELIST");
1248 if (path && probefile(path))
1249 return strdup(path);
1250 // cppcheck-suppress nullPointer
1251 fprintf(stderr, "ERROR> Cannot find nodelist file %s\n", path);
1254 /* Find a nodes-file by looking under 'nodelist' in the current directory */
1255 if (probefile("./nodelist"))
1256 return strdup("./nodelist");
1259 nodetab_tempName = strdup(buffer);
1261 if (getenv("HOME")) {
1262 sprintf(buffer, "%s/.nodelist", getenv("HOME"));
1265 if (!probefile(buffer)) {
1266 /*Create a simple nodelist in the user's home*/
1267 FILE *f = fopen(buffer, "w");
1269 fprintf(stderr, "ERROR> Cannot create a 'nodelist' file.\n");
1272 fprintf(f, "group main\nhost localhost\n");
1275 return strdup(buffer);
1280 static skt_ip_t resolve(const char *name);
1282 double speed = 1.0; /*Relative speed of each CPU*/
1284 const char *name = "SET_H->NAME"; /*Host DNS name*/
1286 const char *shell = arg_shell; /*Ssh to use*/
1287 const char *debugger = arg_debugger; /*Debugger to use*/
1288 const char *xterm = arg_xterm; /*Xterm to use*/
1289 const char *login = arg_mylogin; /*User login name to use*/
1290 const char *passwd = "*"; /*User login password*/
1291 const char *setup = "*"; /*Commands to execute on login*/
1293 char *ext = nullptr; /* Command suffix */
1294 pathfixlist pathfixes = nullptr;
1296 skt_ip_t ip = _skt_invalid_ip; /*IP address of host*/
1297 int cpus = 1; /* # of physical CPUs*/
1298 int nice = -100; /* process priority */
1299 // int forks = 0; /* number of processes to fork on remote node */
1306 bool crashed = false;
1310 skt_ip_t nodetab_host::resolve(const char *name)
1312 skt_ip_t ip = skt_innode_lookup_ip(name);
1313 if (skt_ip_match(ip, _skt_invalid_ip)) {
1315 /* only the master node is used */
1316 if (!(1 <= arg_requested_pes && atoi(name) == -1))
1319 fprintf(stderr, "ERROR> Cannot obtain IP address of %s\n", name);
1327 struct nodetab_process
1330 ChInfiAddr *qpList = nullptr; /* An array of queue pair identifiers */
1331 ChInfiAddr *qpData = nullptr;
1337 nodetab_host * host;
1338 int rank = 0; /*Rank of this CPU*/
1341 SOCKET req_client = -1; /*TCP request sockets for each node*/
1342 // ^ aka ctrlfd /*Connection to control port*/
1343 int dataport = -1; /*UDP port number*/
1345 SOCKET charmrun_fds = -1;
1360 friend bool operator< (const nodetab_process &, const nodetab_process &);
1363 bool operator< (const nodetab_process & a, const nodetab_process & b)
1365 const int a_hostno = a.host->hostno, b_hostno = b.host->hostno;
1366 return a_hostno < b_hostno || (a_hostno == b_hostno && a.nodeno < b.nodeno);
1369 static std::vector<nodetab_host *> host_table;
1371 static std::vector<nodetab_host *> my_host_table;
1373 # define my_host_table host_table
1375 static std::vector<nodetab_process> my_process_table;
1376 static std::vector<nodetab_process *> pe_to_process_map;
1378 static const char *nodetab_args(const char *args, nodetab_host *h)
1382 const char *b1 = skipblanks(args), *e1 = skipstuff(b1);
1383 const char *b2 = skipblanks(e1), *e2 = skipstuff(b2);
1386 b1++; /*Skip over "++" on parameters*/
1388 if (subeqs(b1, e1, "speed"))
1389 h->speed = atof(b2);
1390 else if (subeqs(b1, e1, "cpus"))
1392 else if (subeqs(b1, e1, "pathfix"))
1394 const char *b3 = skipblanks(e2), *e3 = skipstuff(b3);
1395 args = skipblanks(e3);
1397 pathfix_append(substr(b2, e2), substr(b3, e3), h->pathfixes);
1398 e2 = e3; /* for the skipblanks at the end */
1400 else if (subeqs(b1, e1, "ext"))
1401 h->ext = substr(b2, e2);
1402 else if (subeqs(b1, e1, "nice"))
1405 else if (subeqs(b1, e1, "login"))
1406 h->login = substr(b2, e2);
1407 else if (subeqs(b1, e1, "passwd"))
1408 h->passwd = substr(b2, e2);
1409 else if (subeqs(b1, e1, "setup"))
1410 h->setup = strdup(b2);
1411 else if (subeqs(b1, e1, "shell"))
1412 h->shell = substr(b2, e2);
1413 else if (subeqs(b1, e1, "debugger"))
1414 h->debugger = substr(b2, e2);
1415 else if (subeqs(b1, e1, "xterm"))
1416 h->xterm = substr(b2, e2);
1421 args = skipblanks(e2);
1427 /* setup nodetab as localhost only */
1428 static void nodetab_init_for_local()
1431 static const char hostname[] = "127.0.0.1";
1432 nodetab_host * h = new nodetab_host{};
1433 h->name = hostname; // should strdup if leaks are fixed
1434 h->ip = nodetab_host::resolve(hostname);
1435 host_table.push_back(h);
1439 /* Sets the parent field of hosts to point to their parent charmrun. The root
1440 * charmrun will create children for all hosts which are parent of at least one
1442 static int branchfactor;
1443 static int nodes_per_child;
1444 static void nodetab_init_hierarchical_start(void)
1447 branchfactor = ceil(sqrt(nodetab_rank0_size));
1448 nodes_per_child = round(nodetab_rank0_size * 1.0 / branchfactor);
1452 static void nodetab_init_with_nodelist()
1455 /* Open the NODES_FILE. */
1456 char *nodesfile = nodetab_file_find();
1458 printf("Charmrun> using %s as nodesfile\n", nodesfile);
1461 if (!(f = fopen(nodesfile, "r"))) {
1462 fprintf(stderr, "ERROR> Cannot read %s: %s\n", nodesfile, strerror(errno));
1467 nodetab_host global, group;
1468 int rightgroup = (strcmp(arg_nodegroup, "main") == 0);
1470 /* Store the previous host so we can make sure we aren't mixing localhost and
1472 char *prevHostName = NULL;
1473 char input_line[MAX_LINE_LENGTH];
1474 std::unordered_map<std::string, nodetab_host *> temp_hosts;
1478 while (fgets(input_line, sizeof(input_line) - 1, f) != 0) {
1479 if (input_line[0] == '#')
1481 zap_newline(input_line);
1482 if (!nodetab_args(input_line, &global)) {
1483 /*An option line-- also add options to current group*/
1484 nodetab_args(input_line, &group);
1485 } else { /*Not an option line*/
1486 const char *b1 = skipblanks(input_line), *e1 = skipstuff(b1);
1487 const char *b2 = skipblanks(e1), *e2 = skipstuff(b2);
1488 const char *b3 = skipblanks(e2);
1489 if (subeqs(b1, e1, "host")) {
1491 /* check if we have a previous host, if it's different than our
1492 * current host, and if one of them is localhost */
1493 if (prevHostName && strcmp(b2, prevHostName) &&
1494 (!strcmp(b2, "localhost") ||
1495 !strcmp(prevHostName, "localhost"))) {
1496 fprintf(stderr, "ERROR> Mixing localhost with other hostnames will "
1497 "lead to connection failures.\n");
1498 fprintf(stderr, "ERROR> The problematic line in group %s is: %s\n",
1499 arg_nodegroup, input_line);
1503 const std::string hostname = substr(b2, e2);
1504 auto host_iter = temp_hosts.find(hostname);
1505 if (host_iter != temp_hosts.end())
1507 nodetab_host *host = (*host_iter).second;
1508 nodetab_args(b3, host);
1512 nodetab_host *host = new nodetab_host{group};
1513 host->name = strdup(hostname.c_str());
1514 host->ip = nodetab_host::resolve(hostname.c_str());
1515 host->hostno = hostno++;
1516 temp_hosts.insert({hostname, host});
1517 nodetab_args(b3, host);
1521 prevHostName = strdup(b2);
1523 } else if (subeqs(b1, e1, "group")) {
1525 nodetab_args(b3, &group);
1526 rightgroup = subeqs(b2, e2, arg_nodegroup);
1527 } else if (b1 != b3) {
1528 fprintf(stderr, "ERROR> unrecognized command in nodesfile:\n");
1529 fprintf(stderr, "ERROR> %s\n", input_line);
1537 if (nodetab_tempName != NULL)
1538 unlink(nodetab_tempName);
1540 const size_t temp_hosts_size = temp_hosts.size();
1541 if (temp_hosts_size == 0) {
1542 fprintf(stderr, "ERROR> No hosts in group %s\n", arg_nodegroup);
1546 host_table.resize(temp_hosts_size);
1547 for (const auto & h_pair : temp_hosts)
1549 nodetab_host * h = h_pair.second;
1550 host_table[h->hostno] = h;
1554 static void nodetab_init()
1556 /* if arg_local is set, ignore the nodelist file */
1557 if (arg_local || arg_mpiexec)
1558 nodetab_init_for_local();
1560 nodetab_init_with_nodelist();
1563 /****************************************************************************
1567 * The global list of node PEs, IPs, and port numbers.
1568 * Stored in ChMachineInt_t format so the table can easily be sent
1569 * back to the nodes.
1571 ****************************************************************************/
/* Copy the fields of one ChSingleNodeinfo message into process entry p.
 *
 * in : message payload; its integer fields are decoded with ChMessageInt().
 * p  : process-table entry that receives the decoded values. */
1573 static void nodeinfo_add(const ChSingleNodeinfo *in, nodetab_process & p)
1575 const int node = ChMessageInt(in->nodeNo);
/* A node number that differs from p.nodeno only produces a warning on stderr. */
1576 if (node != p.nodeno)
1577 fprintf(stderr, "Charmrun> Warning: Process #%d received ChSingleNodeInfo #%d\n", p.nodeno, node);
/* Counts reported by the node (PUs, cores, sockets). */
1580 p.num_pus = ChMessageInt(in->num_pus);
1581 p.num_cores = ChMessageInt(in->num_cores);
1582 p.num_sockets = ChMessageInt(in->num_sockets);
1585 static void nodeinfo_populate(nodetab_process & p)
1587 ChNodeinfo & i = p.info;
1588 const int node = p.nodeno;
1590 i.nodeno = ChMessageInt_new(node);
1591 i.nPE = ChMessageInt_new(p.PEs);
1592 i.nProcessesInPhysNode = ChMessageInt_new(p.host->processes);
1595 p.host->ip = i.IP; /* get IP */
1599 #if !CMK_USE_IBVERBS
1600 unsigned int dataport = ChMessageInt(i.dataport);
1601 if (0 == dataport) {
1602 fprintf(stderr, "Node %d could not initialize network!\n", node);
1605 p.dataport = dataport;
1608 skt_print_ip(ips, i.IP);
1609 printf("Charmrun> client %d connected (IP=%s data_port=%d)\n", node, ips,
1615 /****************************************************************************
1619 * You can use this module to read the standard input. It supports
1620 * one odd function, input_scanf_chars, which is what makes it useful.
1621 * if you use this module, you may not read stdin yourself.
1623 * void input_init(void)
1624 * char *input_gets(void)
1625 * char *input_scanf_chars(char *fmt)
1627 ****************************************************************************/
1629 static char *input_buffer;
/* Read one more line from stdin and append it to input_buffer, growing the
 * buffer with realloc(). */
1631 static void input_extend()
/* A NULL input_buffer is treated as an empty string. */
1634 int len = input_buffer ? strlen(input_buffer) : 0;
/* fgets() returning 0 means end-of-file or a read error on stdin. */
1636 if (fgets(line, 1023, stdin) == 0) {
1637 fprintf(stderr, "end-of-file on stdin");
/* New size: existing text + the new line + terminating NUL.  The result goes
 * into a temporary so the old pointer is not lost if realloc() fails. */
1640 char *new_input_buffer = (char *) realloc(input_buffer, len + strlen(line) + 1);
1641 if (new_input_buffer == NULL) {
1642 // could not realloc
1644 fprintf(stderr, "Charmrun: Realloc failed");
1647 input_buffer = new_input_buffer;
/* Append the new line directly after the existing text. */
1650 strcpy(input_buffer + len, line);
1653 static void input_init() { input_buffer = strdup(""); }
/* Split input_buffer at offset nchars.
 *
 * nchars : number of leading characters to take from input_buffer.
 * res is built by substr() from the first nchars characters; a second
 * substr() call covers everything from that offset to the end of the buffer. */
1655 static char *input_extract(int nchars)
/* leading part: [input_buffer, input_buffer + nchars) */
1657 char *res = substr(input_buffer, input_buffer + nchars);
/* remaining part: from offset nchars up to the terminating NUL */
1659 substr(input_buffer + nchars, input_buffer + strlen(input_buffer));
1665 static char *input_gets()
1669 p = strchr(input_buffer, '\n');
1674 int len = p - input_buffer;
1675 char *res = input_extract(len + 1);
1680 /*FIXME: I am terrified by this routine. OSL 9/8/00*/
1681 static char *input_scanf_chars(char *fmt)
1690 strcpy(tmp, "/tmp/fnordXXXXXX");
1691 if (mkstemp(tmp) == -1) {
1692 fprintf(stderr, "charmrun> mkstemp() failed!\n");
1696 char *tmp = tmpnam(NULL); /*This was once /tmp/fnord*/
1699 fd = open(tmp, O_RDWR | O_CREAT | O_TRUNC, 0664);
1701 fprintf(stderr, "cannot open temp file /tmp/fnord");
1704 file = fdopen(fd, "r+");
1709 int len = strlen(input_buffer);
1711 fwrite(input_buffer, len, 1, file);
1714 if (ftruncate(fd, len)) {
1715 fprintf(stderr, "charmrun> ftruncate() failed!\n");
1718 if (fscanf(file, fmt, buf, buf, buf, buf, buf, buf, buf, buf, buf, buf, buf,
1719 buf, buf, buf, buf, buf, buf, buf) <= 0) {
1720 fprintf(stderr, "charmrun> fscanf() failed!\n");
1728 return input_extract(pos);
1731 /***************************************************************************
1733 Charmrun forwards CCS requests on to the node-programs' control
1735 ***************************************************************************/
1737 #if CMK_CCS_AVAILABLE
1739 /*The Ccs Server socket became active--
1740 rec'v the message and respond to the request,
1741 by forwarding the request to the appropriate node.
1743 static void req_ccs_connect(void)
1746 ChMessageHeader ch; /*Make a charmrun header*/
1747 CcsImplHeader hdr; /*Ccs internal header*/
1749 void *reqData; /*CCS request data*/
1750 if (0 == CcsServer_recvRequest(&h.hdr, &reqData))
1751 return; /*Malformed request*/
1752 int pe = ChMessageInt(h.hdr.pe);
1753 int reqBytes = ChMessageInt(h.hdr.len);
1756 /*Treat -1 as broadcast and sent to 0 as root of the spanning tree*/
1759 const int pe_count = pe_to_process_map.size();
1760 if ((pe <= -pe_count || pe >= pe_count) && 0 == replay_single) {
1761 /*Treat out of bound values as errors. Helps detecting bugs*/
1762 /* But when virtualized with Bigemulator, we can have more pes than nodetabs */
1763 /* TODO: We should somehow check boundaries also for bigemulator... */
1764 #if !CMK_BIGSIM_CHARM
1765 if (pe == -pe_count)
1766 fprintf(stderr, "Invalid processor index in CCS request: are you trying "
1767 "to do a broadcast instead?");
1769 fprintf(stderr, "Invalid processor index in CCS request.");
1770 CcsServer_sendReply(&h.hdr, 0, 0);
1774 } else if (pe < -1) {
1775 /*Treat negative values as multicast to a number of processors specified by
1777 The pes to multicast to follows sits at the beginning of reqData*/
1778 reqBytes -= pe * sizeof(ChMessageInt_t);
1779 pe = ChMessageInt(*(ChMessageInt_t *) reqData);
1782 if (!check_stdio_header(&h.hdr)) {
1785 #if LOOPBACK /*Immediately reply "there's nothing!" (for performance \
1787 CcsServer_sendReply(&h.hdr, 0, 0);
1790 #if CMK_BIGSIM_CHARM
1791 destpe = destpe % pe_count;
1795 /*Fill out the charmrun header & forward the CCS request*/
1796 ChMessageHeader_new("req_fw", sizeof(h.hdr) + reqBytes, &h.ch);
1798 const void *bufs[3];
1801 lens[0] = sizeof(h);
1804 const SOCKET ctrlfd = pe_to_process_map[pe]->req_client;
1805 skt_sendV(ctrlfd, 2, bufs, lens);
1813 Forward the CCS reply (if any) from this client back to the
1814 original network requestor, on the original request socket.
1816 static int req_ccs_reply_fw(ChMessage *msg, SOCKET srcFd)
1818 int len = msg->len; /* bytes of data remaining to receive */
1820 /* First pull down the CCS header sent by the client. */
1822 skt_recvN(srcFd, &hdr, sizeof(hdr));
1825 #define m (4 * 1024) /* packets of message to recv/send at once */
1826 if (len < m || hdr.attr.auth) { /* short or authenticated message: grab the
1827 whole thing first */
1828 void *data = malloc(len);
1829 skt_recvN(srcFd, data, len);
1830 CcsServer_sendReply(&hdr, len, data);
1832 } else { /* long messages: packetize (for pipelined sending; a 2x bandwidth
1834 ChMessageInt_t outLen;
1835 int destFd; /* destination for data */
1836 skt_abortFn old = skt_set_abort(reply_abortFn);
1839 destFd = ChMessageInt(hdr.replyFd);
1840 outLen = ChMessageInt_new(len);
1841 skt_sendN(destFd, &outLen, sizeof(outLen)); /* first comes the length */
1847 skt_recvN(srcFd, buf, r);
1848 if (0 == destErrs) /* don't keep sending to dead clients, but *do* clean
1850 destErrs |= skt_sendN(destFd, buf, r);
1862 static int req_ccs_reply_fw(ChMessage *msg, SOCKET srcFd) {}
1863 #endif /*CMK_CCS_AVAILABLE*/
1865 /****************************************************************************
1869 * The request servicer accepts connections on a TCP port. The client
1870 * sends a sequence of commands (each is one line). It then closes the
1871 * connection. The server must then contact the client, sending replies.
1873 ****************************************************************************/
1874 /** Macro to switch on the case when charmrun stays up even if
1875 one of the processor crashes*/
1876 /*#define __FAULT__*/
1878 static int req_ending = 0;
1880 /* socket and std streams for the gdb info program */
1881 static int gdb_info_pid = 0;
1882 static int gdb_info_std[3];
1883 static FILE *gdb_stream = NULL;
1886 #define REQ_FAILED -1
1889 static int req_reply(SOCKET fd, const char *type, const char *data, int dataLen);
/* Reply on fd through req_reply() and then exchange a SOCKET value on the
 * same descriptor.
 *
 * fd, type, data, dataLen : passed unchanged to req_reply(). */
1890 static int req_reply_child(SOCKET fd, const char *type, const char *data, int dataLen)
1893 int status = req_reply(fd, type, data, dataLen);
/* Anything other than REQ_OK is handled separately from the exchange below. */
1894 if (status != REQ_OK)
/* Receive one SOCKET-sized value from fd ... */
1897 skt_recvN(fd, (char *) &clientFd, sizeof(SOCKET));
/* ... and send the same value back on fd.
 * NOTE(review): the size is spelled sizeof(fd) here but sizeof(SOCKET) above;
 * fd is declared SOCKET so the two agree, but one spelling would be clearer. */
1898 skt_sendN(fd, (const char *) &clientFd, sizeof(fd));
1903 * @brief This is the only place where charmrun talks back to anyone.
1905 static int req_reply(SOCKET fd, const char *type, const char *data, int dataLen)
1907 ChMessageHeader msg;
1908 if (fd == INVALID_SOCKET)
1910 ChMessageHeader_new(type, dataLen, &msg);
1911 skt_sendN(fd, (const char *) &msg, sizeof(msg));
1912 skt_sendN(fd, data, dataLen);
1916 static void kill_all_compute_nodes(const char *msg, size_t msgSize)
1918 ChMessageHeader hdr;
1919 ChMessageHeader_new("die", msgSize, &hdr);
1920 for (const nodetab_process & p : my_process_table)
1922 skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
1923 skt_sendN(p.req_client, msg, msgSize);
1927 static void kill_all_compute_nodes(const char *msg)
1929 return kill_all_compute_nodes(msg, strlen(msg)+1);
/* Overload that takes the message size as a template argument and forwards
 * to the (msg, size) overload.
 *
 * NOTE(review): a parameter written "const char msg[msgSize]" is adjusted to
 * "const char *", so msgSize cannot be deduced from the argument; callers
 * would have to spell it explicitly (kill_all_compute_nodes<N>(...)).  A plain
 * call with a string literal resolves to the const char * overload instead.
 * Confirm whether deduction via "const char (&msg)[msgSize]" was intended. */
1932 template <size_t msgSize>
1933 static inline void kill_all_compute_nodes(const char msg[msgSize])
1935 return kill_all_compute_nodes(msg, msgSize);
1938 /* Request handlers:
1939 When a client asks us to do something, these are the
1940 routines that actually respond to the request.
1942 /*Stash this new node's control and data ports.
1944 static int req_handle_initnode(ChMessage *msg, nodetab_process & p)
1946 if (msg->len != sizeof(ChSingleNodeinfo)) {
1947 fprintf(stderr, "Charmrun: Bad initnode data length. Aborting\n");
1948 fprintf(stderr, "Charmrun: possibly because: %s.\n", msg->data);
1952 nodeinfo_add((ChSingleNodeinfo *) msg->data, p);
1956 #if CMK_USE_IBVERBS || CMK_USE_IBUD
1957 static int req_handle_qplist(ChMessage *msg, nodetab_process & p)
1960 const int my_process_count = my_process_table.size();
1961 int qpListSize = (my_process_count-1) * sizeof(ChInfiAddr);
1963 if (msg->len != qpListSize)
1965 fprintf(stderr, "Charmrun: Bad qplist data length. Aborting.\n");
1969 p.qpList = (ChInfiAddr *) malloc(qpListSize);
1970 memcpy(p.qpList, msg->data, qpListSize);
1972 if (msg->len != sizeof(ChInfiAddr))
1974 fprintf(stderr, "Charmrun: Bad qplist data length. Aborting.\n");
1978 p.qp = *(ChInfiAddr *)msg->data;
1979 printf("Charmrun> client %d lid=%d qpn=%i psn=%i\n", node,
1980 ChMessageInt(p.qp.lid), ChMessageInt(p.qp.qpn),
1981 ChMessageInt(p.qp.psn));
1988 * @brief Gets the array of node numbers, IPs, and ports. This is used by the
1990 * to talk to one another.
1992 static void req_send_initnodetab_internal(const nodetab_process & destination, int count, int msgSize)
1994 ChMessageHeader hdr;
1995 ChMessageInt_t nNodes = ChMessageInt_new(count);
1996 ChMessageInt_t nodeno = ChMessageInt_new(destination.nodeno);
1997 ChMessageHeader_new("initnodetab", msgSize, &hdr);
1998 const SOCKET fd = destination.req_client;
1999 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
2000 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
2001 skt_sendN(fd, (const char *) &nodeno, sizeof(nodeno));
2002 for (const nodetab_process & p : my_process_table)
2003 skt_sendN(fd, (const char *) &p.info, sizeof(ChNodeinfo));
2006 static void req_send_initnodetab(const nodetab_process & destination)
2008 const int my_process_count = my_process_table.size();
2009 int msgSize = sizeof(ChMessageInt_t) * ChInitNodetabFields +
2010 sizeof(ChNodeinfo) * my_process_count;
2011 req_send_initnodetab_internal(destination, my_process_count, msgSize);
2015 /* Used for fault tolerance with hierarchical start */
2016 static int req_send_initnodetab1(SOCKET fd)
2018 const int my_process_count = my_process_table.size();
2019 ChMessageHeader hdr;
2020 ChMessageInt_t nNodes = ChMessageInt_new(my_process_count);
2021 ChMessageHeader_new("initnttab", sizeof(ChMessageInt_t) +
2022 sizeof(ChNodeinfo) * my_process_count,
2024 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
2025 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
2026 for (const nodetab_process & p : my_process_table)
2027 skt_sendN(fd, (const char *) p.info, sizeof(ChNodeinfo));
2031 /*Get the array of node numbers, IPs, and ports.
2032 This is used by the node-programs to talk to one another.
2034 static int parent_charmrun_fd = -1;
2035 static int req_handle_initnodedistribution(ChMessage *msg, const nodetab_process & p)
2037 const int nodetab_rank0_size = nodetab_rank0_table.size();
2039 nodes_per_child; /* rounding should help in better load distribution*/
2040 int rank0_start = nodetab_rank0_table[client * nodes_per_child];
2042 if (client == branchfactor - 1) {
2043 nodes_to_fork = nodetab_rank0_table.size() - client * nodes_per_child;
2044 rank0_finish = nodetab_rank0_size;
2047 nodetab_rank0_table[client * nodes_per_child + nodes_to_fork];
2049 ChMessageInt_t *nodemsg = (ChMessageInt_t *) malloc(
2050 (rank0_finish - rank0_start) * sizeof(ChMessageInt_t));
2051 for (int k = 0; k < rank0_finish - rank0_start; k++)
2052 nodemsg[k] = ChMessageInt_new(nodetab_rank0_table[rank0_start + k]);
2053 ChMessageHeader hdr;
2054 ChMessageInt_t nNodes = ChMessageInt_new(rank0_finish - rank0_start);
2055 ChMessageInt_t nTotalNodes = ChMessageInt_new(nodetab_rank0_size);
2056 ChMessageHeader_new("initnodetab",
2057 sizeof(ChMessageInt_t) * 2 +
2058 sizeof(ChMessageInt_t) * (rank0_finish - rank0_start),
2060 const SOCKET fd = p.charmrun_fds;
2061 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
2062 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
2063 skt_sendN(fd, (const char *) &nTotalNodes, sizeof(nTotalNodes));
2064 skt_sendN(fd, (const char *) nodemsg,
2065 (rank0_finish - rank0_start) * sizeof(ChMessageInt_t));
2070 static std::vector<ChSingleNodeinfo> myNodesInfo;
2071 static int send_myNodeInfo_to_parent()
2073 const int nodetab_rank0_size = nodetab_rank0_table.size();
2074 ChMessageHeader hdr;
2075 ChMessageInt_t nNodes = ChMessageInt_new(nodetab_rank0_size);
2076 ChMessageHeader_new("initnodetab",
2077 sizeof(ChMessageInt_t) +
2078 sizeof(ChSingleNodeinfo) * nodetab_rank0_size,
2080 skt_sendN(parent_charmrun_fd, (const char *) &hdr, sizeof(hdr));
2081 skt_sendN(parent_charmrun_fd, (const char *) &nNodes, sizeof(nNodes));
2082 skt_sendN(parent_charmrun_fd, (const char *) myNodesInfo.data(),
2083 sizeof(ChSingleNodeinfo) * myNodesInfo.size());
2087 static void forward_nodetab_to_children()
2089 /*it just needs to receive and copy the nodetab info if required and send it
2090 * as it is to its nodes */
2091 if (!skt_select1(parent_charmrun_fd, 1200 * 1000)) {
2095 ChMessage_recv(parent_charmrun_fd, &msg);
2097 ChMessageInt_t *nodelistmsg = (ChMessageInt_t *) msg.data;
2098 int nodetab_Nodes = ChMessageInt(nodelistmsg[0]);
2099 for (const nodetab_process & p : my_process_table)
2101 SOCKET fd = p.req_client;
2102 ChMessageHeader hdr;
2103 ChMessageInt_t nNodes = ChMessageInt_new(nodetab_Nodes);
2104 ChMessageHeader_new("initnodetab", sizeof(ChMessageInt_t) +
2105 sizeof(ChNodeinfo) * nodetab_Nodes,
2107 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
2108 skt_sendN(fd, (const char *) &nNodes, sizeof(nNodes));
2109 skt_sendN(fd, (const char *) (nodelistmsg + 1),
2110 sizeof(ChNodeinfo) * nodetab_Nodes);
2113 /*Parent Charmrun receives the nodetab from child and processes it. msg contain
2114 * array of ChSingleNodeInfo*/
2115 static void receive_nodeset_from_child(ChMessage *msg, SOCKET fd)
2117 ChMessageInt_t *n32 = (ChMessageInt_t *) msg->data;
2118 int numOfNodes = ChMessageInt(n32[0]);
2119 ChSingleNodeinfo *childNodeInfo = (ChSingleNodeinfo *) (n32 + 1);
2120 for (int k = 0; k < numOfNodes; k++)
2121 nodeinfo_add(childNodeInfo + k, my_process_table[childNodeInfo[k].nodeNo]);
2124 static void set_sockets_list(ChMessage *msg, SOCKET fd)
2126 ChMessageInt_t *n32 = (ChMessageInt_t *) msg->data;
2127 int node_start = ChMessageInt(n32[0]);
2129 charmrun_fds[node_start / nodes_per_child] = fd;
2132 /* Check this return code from "printf". */
2133 static void checkPrintfError(int err)
2136 static int warned = 0;
2138 perror("charmrun WARNING> error in printf");
2144 static int req_handle_print(ChMessage *msg, SOCKET fd)
2146 checkPrintfError(printf("%s", msg->data));
2147 checkPrintfError(fflush(stdout));
2148 write_stdio_duplicate(msg->data);
2152 static int req_handle_printerr(ChMessage *msg, SOCKET fd)
2154 fprintf(stderr, "%s", msg->data);
2156 write_stdio_duplicate(msg->data);
2160 static int req_handle_printsyn(ChMessage *msg, SOCKET fd)
2162 checkPrintfError(printf("%s", msg->data));
2163 checkPrintfError(fflush(stdout));
2164 write_stdio_duplicate(msg->data);
2166 if (arg_hierarchical_start)
2167 req_reply_child(fd, "printdone", "", 1);
2170 req_reply(fd, "printdone", "", 1);
2174 static int req_handle_printerrsyn(ChMessage *msg, SOCKET fd)
2176 fprintf(stderr, "%s", msg->data);
2178 write_stdio_duplicate(msg->data);
2180 if (arg_hierarchical_start)
2181 req_reply_child(fd, "printdone", "", 1);
2184 req_reply(fd, "printdone", "", 1);
2188 static int _exitcode = 0;
2190 static int req_handle_ending(ChMessage *msg, SOCKET fd)
2195 int exitcode = atoi(msg->data);
2197 _exitcode = exitcode;
2200 #if CMK_SHRINK_EXPAND
2201 // When using shrink-expand, only PE 0 will send an "ending" request.
2202 #elif (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2203 if (req_ending == my_process_table.size())
2205 if (req_ending == arg_requested_pes)
2208 #if CMK_SHRINK_EXPAND
2210 ChMessage_new("realloc_ack", 0, &ackmsg);
2211 for (const nodetab_process & p : my_process_table)
2212 ChMessage_send(p.req_client, &ackmsg);
2215 for (const nodetab_process & p : my_process_table)
2216 skt_close(p.req_client);
2218 printf("Charmrun> Graceful exit with exit code %d.\n", _exitcode);
2224 static int req_handle_barrier(ChMessage *msg, SOCKET fd)
2226 static int barrier_count = 0;
2227 static int barrier_phase = 0;
2230 if (barrier_count == arg_requested_pes)
2232 if (barrier_count == my_process_table.size())
2237 for (const nodetab_process & p : my_process_table)
2238 if (REQ_OK != req_reply(p.req_client, "barrier", "", 1)) {
2239 fprintf(stderr, "req_handle_barrier socket error: %d\n", p.nodeno);
2246 static int req_handle_barrier0(ChMessage *msg, SOCKET fd)
2248 static int count = 0;
2250 int pe = atoi(msg->data);
2255 if (count == arg_requested_pes)
2257 if (count == my_host_table.size())
2260 req_reply(fd0, "barrier0", "", 1); /* only send to node 0 */
2266 static void req_handle_abort(ChMessage *msg, SOCKET fd)
2268 /*fprintf(stderr,"req_handle_abort called \n");*/
2270 fprintf(stderr, "Aborting!\n");
2272 fprintf(stderr, "%s\n", msg->data);
2276 static int req_handle_scanf(ChMessage *msg, SOCKET fd)
2278 char *fmt = msg->data;
2279 fmt[msg->len - 1] = 0;
2280 char *res = input_scanf_chars(fmt);
2288 if (arg_hierarchical_start)
2289 req_reply_child(fd, "scanf-data", res, strlen(res) + 1);
2292 req_reply(fd, "scanf-data", res, strlen(res) + 1);
2297 #if CMK_SHRINK_EXPAND
2298 static int req_handle_realloc(ChMessage *msg, SOCKET fd)
2300 printf("Charmrun> Realloc request received\n");
2302 /* Exec to clear and restart everything, just preserve contents of
2304 int restart_idx = -1, newp_idx = -1, oldp_idx = -1, shrink_expand_idx= -1, charmrun_idx = -1;
2305 int additional_args = 10;
2306 for (int i = 0; i < saved_argc; ++i) {
2307 if (strcmp(saved_argv[i], "+restart") == 0) {
2309 additional_args -= 2;
2311 if(strcmp(saved_argv[i], "++newp") == 0)
2314 additional_args -= 2;
2316 if(strcmp(saved_argv[i], "++oldp") == 0)
2319 additional_args -= 2;
2321 if(strcmp(saved_argv[i], "++shrinkexpand") == 0)
2323 shrink_expand_idx = i;
2324 additional_args -= 1;
2326 if(strcmp(saved_argv[i], "++charmrun_port") == 0)
2329 additional_args -= 2;
2333 #if defined __APPLE__
2334 const char *dir = "/tmp";
2336 const char *dir = "/dev/shm";
2338 for (int i = 0; i < saved_argc; ++i) {
2339 if (strcmp(saved_argv[i], "+shrinkexpand_basedir") == 0) {
2340 dir = saved_argv[i+1];
2345 const char **ret = (const char **) malloc(sizeof(char *) * (saved_argc + additional_args));
2347 int newP = ChMessageInt(*(ChMessageInt_t *)msg->data);
2348 int oldP = arg_requested_pes;
2349 printf("Charmrun> newp = %d oldP = %d \n \n \n", newP, oldP);
2351 for (int i = 0; i < saved_argc; i++) {
2352 ret[i] = saved_argv[i];
2357 char sp_buffer[50]; // newP buffer
2358 sprintf(sp_buffer, "%d", newP);
2360 char sp_buffer1[50]; // oldP buffer
2361 sprintf(sp_buffer1, "%d", oldP);
2363 char sp_buffer2[6]; // charmrun port
2364 sprintf(sp_buffer2, "%d", server_port);
2366 /* Check that shrink expand parameters don't already exist */
2370 ret[saved_argc + index++] = "++newp";
2371 ret[saved_argc + index++] = sp_buffer;
2374 ret[newp_idx + 1] = sp_buffer;
2378 ret[saved_argc + index++] = "++oldp";
2379 ret[saved_argc + index++] = sp_buffer1;
2382 ret[oldp_idx + 1] = sp_buffer1;
2384 if(shrink_expand_idx == -1)
2386 ret[saved_argc + index++] = "++shrinkexpand";
2389 if(charmrun_idx == -1)
2391 ret[saved_argc + index++] = "++charmrun_port";
2392 ret[saved_argc + index++] = sp_buffer2;
2395 ret[charmrun_idx + 1] = sp_buffer2;
2397 if (restart_idx == -1) {
2398 ret[saved_argc + index++] = "+restart";
2399 ret[saved_argc + index++] = dir;
2400 ret[saved_argc + index++] = NULL;
2402 ret[restart_idx + 1] = dir;
2403 ret[saved_argc + index++] = NULL;
2407 ChMessage_new("realloc_ack", 0, &ackmsg);
2408 for (const nodetab_process & p : my_process_table)
2409 ChMessage_send(p.req_client, &ackmsg);
2411 skt_close(server_fd);
2412 skt_close(CcsServer_fd());
2413 execv(ret[0], (char **)ret);
2414 printf("Should not be here\n");
2422 static void restart_node(nodetab_process &);
2423 static void reconnect_crashed_client(nodetab_process &);
2424 static void announce_crash(const nodetab_process &);
2426 static const nodetab_process * _last_crash; /* last crashed process */
2428 static nodetab_process * _crash_charmrun_process; /* last restart socket */
2429 static int crashed_pe_id;
2430 static int restarted_pe_id;
2432 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2433 static int numCrashes = 0; /*number of crashes*/
2434 static SOCKET last_crashed_fd = -1;
2438 * @brief Handles an ACK after a crash. Once it has received all the pending
2439 * acks, it sends the nodetab
2440 * table to the crashed node.
2442 static int req_handle_crashack(ChMessage *msg, SOCKET fd)
2444 static int count = 0;
2447 if (arg_hierarchical_start) {
2448 if (count == nodetab_rank0_table.size() - 1) {
2449 /* only after everybody else update its nodetab, can this
2450 restarted process continue */
2451 PRINT(("Charmrun> continue node: %d\n", _last_crash->nodeno));
2452 req_send_initnodetab1(_crash_charmrun_process->req_client);
2453 _last_crash = nullptr;
2455 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2456 last_crashed_fd = -1;
2464 if (count == my_process_table.size() - 1) {
2465 // only after everybody else update its nodetab, can this restarted process
2467 PRINT(("Charmrun> continue node: %d\n", _last_crash->nodeno));
2468 req_send_initnodetab(*_last_crash);
2469 _last_crash = nullptr;
2471 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2472 last_crashed_fd = -1;
2479 /* send initnode to root*/
/* Rewrite the node number carried in msg to be relative to mynodes_start and
 * remember fd as that node's request socket.
 *
 * msg : payload is interpreted as a ChSingleNodeinfo and modified in place.
 * fd  : socket stored in the matching my_process_table entry. */
2480 static int set_crashed_socket_id(ChMessage *msg, SOCKET fd)
2482 ChSingleNodeinfo *nodeInfo = (ChSingleNodeinfo *) msg->data;
2483 int nt = ChMessageInt(nodeInfo->nodeNo) - mynodes_start; // TODO: relative nodeNo
/* Store the relative number back into the message in wire format. */
2484 nodeInfo->nodeNo = ChMessageInt_new(nt);
2485 /* Required for CCS */
2486 /*Nodetable index for this node*/
/* NOTE(review): nt is used as an index without a range check -- confirm that
 * callers guarantee 0 <= nt < my_process_table.size(). */
2487 my_process_table[nt].req_client = fd; // TODO: nodeno as index
2491 /* Receives new dataport of restarted process and resends nodetable to
/* Handle the "initnode" request sent by a process that reconnects after a
 * crash.
 *
 * msg : payload is read as a ChSingleNodeinfo.
 * p   : process entry the request arrived on; its req_client is the socket
 *       used below. */
2493 static int req_handle_crash(ChMessage *msg, nodetab_process & p)
2495 const SOCKET fd = p.req_client;
/* Two extra integers (old and new PE) are read from the socket and used as
 * indices into nodetab_table. */
2497 ChMessageInt_t oldpe, newpe;
/* NOTE(review): these are receive buffers, yet they are cast to
 * "const char *"; other skt_recvN calls in this file cast to "char *". */
2498 skt_recvN(fd, (const char *) &oldpe, sizeof(oldpe));
2499 skt_recvN(fd, (const char *) &newpe, sizeof(newpe));
/* Overwrite the old PE's table entry with a copy of the new PE's entry. */
2500 *nodetab_table[ChMessageInt(oldpe)] = *nodetab_table[ChMessageInt(newpe)];
/* Record the node information exactly as for a first-time "initnode". */
2502 int status = req_handle_initnode(msg, p);
2503 _crash_charmrun_process = &p;
2505 fprintf(stderr, "Root charmrun : Socket %d failed: %s\n", fd,
2506 _crash_charmrun_process->host->name);
2508 ChSingleNodeinfo *nodeInfo = (ChSingleNodeinfo *) msg->data;
2509 int crashed_node = ChMessageInt(nodeInfo->nodeNo);
/* NOTE(review): _last_crash is declared "const nodetab_process *" but is
 * assigned an int here; this looks like it should be the address of the
 * matching process-table entry -- confirm. */
2510 _last_crash = crashed_node;
2518 /* Already processed, so send*/
/* Every process in my_process_table gets the node table again. */
2519 for (const nodetab_process & p2 : my_process_table)
2520 req_send_initnodetab(p2);
2522 /*Announce crash to all child charmruns*/
2530 static void error_in_req_serve_client(nodetab_process & p)
2532 const SOCKET fd = p.req_client;
2533 fprintf(stderr, "Socket %d failed \n", fd);
2536 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
2540 /** should also send a message to all the other processors telling them that
2541 * this guy has crashed*/
2542 /*announce_crash(p);*/
2545 fprintf(stderr, "charmrun says process %d failed (on host %s)\n", p.nodeno, p.host->name);
2546 /** after the crashed processor has been recreated
2547 it connects to charmrun. That data must now be filled
2548 into my_process_table and the nodetab_table*/
2550 reconnect_crashed_client(p);
2551 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2557 static int req_handler_dispatch(ChMessage *msg, nodetab_process & p)
2559 const int replyFd = p.req_client;
2560 char *cmd = msg->header.type;
2561 DEBUGF(("Got request '%s'\n", cmd, replyFd));
2562 #if CMK_CCS_AVAILABLE /* CCS *doesn't* want data yet, for faster forwarding */
2563 if (strcmp(cmd, "reply_fw") == 0)
2564 return req_ccs_reply_fw(msg, replyFd);
2567 /* grab request data */
2568 int recv_status = ChMessageData_recv(replyFd, msg);
2571 if (!arg_hierarchical_start)
2573 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2574 if (recv_status < 0) {
2575 if (replyFd == last_crashed_fd) {
2578 DEBUGF(("recv_status %d on socket %d \n", recv_status, replyFd));
2579 error_in_req_serve_client(p);
2582 if (recv_status < 0) {
2583 error_in_req_serve_client(p);
2589 if (strcmp(cmd, "ping") == 0)
2591 else if (strcmp(cmd, "print") == 0)
2592 return req_handle_print(msg, replyFd);
2593 else if (strcmp(cmd, "printerr") == 0)
2594 return req_handle_printerr(msg, replyFd);
2595 else if (strcmp(cmd, "printsyn") == 0)
2596 return req_handle_printsyn(msg, replyFd);
2597 else if (strcmp(cmd, "printerrsyn") == 0)
2598 return req_handle_printerrsyn(msg, replyFd);
2599 else if (strcmp(cmd, "scanf") == 0)
2600 return req_handle_scanf(msg, replyFd);
2601 else if (strcmp(cmd, "barrier") == 0)
2602 return req_handle_barrier(msg, replyFd);
2603 else if (strcmp(cmd, "barrier0") == 0)
2604 return req_handle_barrier0(msg, replyFd);
2605 else if (strcmp(cmd, "ending") == 0)
2606 return req_handle_ending(msg, replyFd);
2607 else if (strcmp(cmd, "abort") == 0) {
2608 req_handle_abort(msg, replyFd);
2612 else if (strcmp(cmd, "crash_ack") == 0)
2613 return req_handle_crashack(msg, replyFd);
2615 else if (strcmp(cmd, "initnode") == 0)
2616 return req_handle_crash(msg, p);
2619 #if CMK_SHRINK_EXPAND
2620 else if (strcmp(cmd, "realloc") == 0)
2621 return req_handle_realloc(msg, replyFd);
2625 fprintf(stderr, "Charmrun> Bad control socket request '%s'\n", cmd);
2633 static void req_serve_client(nodetab_process & p)
2635 DEBUGF(("Getting message from client...\n"));
2638 int recv_status = ChMessageHeader_recv(p.req_client, &msg);
2641 if (!arg_hierarchical_start && recv_status < 0)
2642 error_in_req_serve_client(p);
2644 if (recv_status < 0) {
2645 error_in_req_serve_client(p);
2651 DEBUGF(("Message is '%s'\n", msg.header.type));
2652 int status = req_handler_dispatch(&msg, p);
2657 fprintf(stderr, "Charmrun> Error processing control socket request %s\n",
2662 ChMessage_free(&msg);
2666 static void req_forward_root(nodetab_process & p)
2668 const SOCKET fd = p.req_client;
2670 int recv_status = ChMessage_recv(fd, &msg);
2672 char *cmd = msg.header.type;
2675 if (recv_status < 0) {
2676 error_in_req_serve_client(p);
2680 /*called from reconnect_crashed_client */
2681 if (strcmp(cmd, "initnode") == 0) {
2682 set_crashed_socket_id(&msg, fd);
2686 int status = REQ_OK;
2687 if (strcmp(cmd, "ping") != 0) {
2688 status = req_reply(parent_charmrun_fd, cmd, msg.data,
2689 ChMessageInt(msg.header.len));
2691 if (strcmp(cmd, "scanf") == 0 || strcmp(cmd, "printsyn") == 0 ||
2692 strcmp(cmd, "printerrsyn") == 0)
2693 skt_sendN(parent_charmrun_fd, (const char *) &fd, sizeof(fd));
2696 if (strcmp(cmd, "initnode") == 0) {
2697 ChMessageInt_t oldpe = ChMessageInt_new(crashed_pe_id);
2698 ChMessageInt_t newpe = ChMessageInt_new(restarted_pe_id);
2699 skt_sendN(parent_charmrun_fd, (const char *) &oldpe, sizeof(oldpe));
2700 skt_sendN(parent_charmrun_fd, (const char *) &newpe, sizeof(newpe));
2712 ChMessage_free(&msg);
/* Receive one message from the parent charmrun (parent_charmrun_fd) and pass
 * it on to the local processes, choosing the recipients by message type. */
2715 static void req_forward_client()
2718 int recv_status = ChMessage_recv(parent_charmrun_fd, &msg);
/* A failed receive from the parent closes every local request socket. */
2719 if (recv_status < 0) {
2721 for (const nodetab_process & p : my_process_table)
2722 skt_close(p.req_client);
/* The message type string selects the forwarding rule below. */
2726 char *cmd = msg.header.type;
/* "barrier": replied to every local process. */
2728 if (strcmp(cmd, "barrier") == 0) {
2729 for (const nodetab_process & p : my_process_table)
2730 if (REQ_OK != req_reply(p.req_client, cmd, msg.data,
2731 ChMessageInt(msg.header.len))) {
/* "initnodetab": bumps current_restart_phase and is forwarded to every local
 * process, both only while no crash is recorded (_last_crash == nullptr). */
2737 if (strcmp(cmd, "initnodetab") == 0) {
2738 if (_last_crash == nullptr)
2739 current_restart_phase++;
2741 for (const nodetab_process & p : my_process_table)
2742 if (_last_crash == nullptr)
2743 if (REQ_OK != req_reply(p.req_client, cmd, msg.data,
2744 ChMessageInt(msg.header.len))) {
/* "crashnode": forwarded under the same _last_crash == nullptr condition. */
2750 if (strcmp(cmd, "crashnode") == 0) {
2751 for (const nodetab_process & p : my_process_table)
2752 if (_last_crash == nullptr)
2753 if (REQ_OK != req_reply(p.req_client, cmd, msg.data,
2754 ChMessageInt(msg.header.len))) {
/* "initnttab": re-sent as "initnodetab" to the socket taken from _last_crash.
 * NOTE(review): _last_crash is set to nullptr immediately before it is
 * dereferenced in the req_reply() call, which is a null-pointer dereference
 * as written; the reset presumably belongs after the reply -- confirm. */
2759 if (strcmp(cmd, "initnttab") == 0) {
2760 _last_crash = nullptr;
2761 if (REQ_OK != req_reply(_last_crash->req_client, "initnodetab",
2762 msg.data, ChMessageInt(msg.header.len))) {
2772 /* CCS forward request */
/* "req_fw": the destination socket is looked up from the PE named in the CCS
 * header at the start of the payload. */
2773 if (strcmp(cmd, "req_fw") == 0) {
2774 CcsImplHeader *hdr = (CcsImplHeader *) msg.data;
2775 int pe = ChMessageInt(hdr->pe);
2776 fd = nodetab_table[pe]->ctrlfd;
/* "barrier0": goes to entry 0 of nodetab_table. */
2777 } else if (strcmp(cmd, "barrier0") == 0) {
2778 fd = nodetab_table[0]->ctrlfd;
/* Otherwise the destination socket value is read from the parent. */
2780 skt_recvN(parent_charmrun_fd, (char *) &fd, sizeof(SOCKET));
2782 int status = req_reply(fd, cmd, msg.data, ChMessageInt(msg.header.len));
2791 ChMessage_free(&msg);
// Socket abort handler; socket_error_in_poll installs it through skt_set_abort.
// The parameters follow the handler signature (socket, code, message).
2796 static int ignore_socket_errors(SOCKET skt, int c, const char *m)
2797 { /*Abandon on further socket errors during error shutdown*/
// Finds the entry of process_table whose req_client equals the given socket and
// returns a reference to it. When no entry matches, an error is printed to
// stderr (the handling after the message is presumably fatal -- confirm).
2805 static nodetab_process & get_process_for_socket(std::vector<nodetab_process> & process_table, SOCKET req_client)
2807 nodetab_process * ptr = nullptr;
// Linear scan over the table.
2808 for (nodetab_process & p : process_table)
2810 if (p.req_client == req_client)
2818 fprintf(stderr, "Charmrun> get_process_for_socket: unknown socket\n");
2822 nodetab_process & p = *ptr;
// Abort handler used while polling request sockets (installed with
// skt_set_abort by req_poll and req_poll_hierarchical).
// skt is the failing socket, msg the error text; code is not used in the
// visible statements.
2826 /*A socket went bad somewhere! Immediately disconnect,
2827 which kills everybody.
2829 static int socket_error_in_poll(SOCKET skt, int code, const char *msg)
2831 /*commenting it for fault tolerance*/
// Further socket errors raised during shutdown go to ignore_socket_errors.
2833 skt_set_abort(ignore_socket_errors);
// NOTE(review): req_poll calls this handler with skt == -1; looking that value
// up here would take the unknown socket path of get_process_for_socket --
// confirm both are not compiled into the same configuration.
2836 const nodetab_process & p = get_process_for_socket(my_process_table, skt);
2837 fprintf(stderr, "Charmrun> error on request socket to node %d '%s'--\n"
2839 p.nodeno, p.host->name, msg);
// Close the request socket of every process.
2843 for (const nodetab_process & p : my_process_table)
2844 skt_close(p.req_client);
// Record the current time in the fault-tolerance timer.
2847 ftTimer = GetClock();
2851 #if CMK_USE_POLL /*poll() version*/
// CMK_PIPE_DECL declares the pollfd array pointer (static), a fill counter
// reached through nFds, and the poll timeout; the array is sized from maxn.
// NOTE(review): the malloc result is not checked in the visible lines.
2852 #define CMK_PIPE_DECL(maxn, delayMs) \
2853 static struct pollfd *fds = NULL; \
2855 int *nFds = &nFds_sto; \
2856 int pollDelayMs = delayMs; \
2858 fds = (struct pollfd *) malloc((maxn) * sizeof(struct pollfd));
// CMK_PIPE_SUB / CMK_PIPE_PARAM spell the poll state as call arguments and as
// a parameter list; CMK_PIPE_CALL performs the wait.
2859 #define CMK_PIPE_SUB fds, nFds
2860 #define CMK_PIPE_CALL() \
2861 poll(fds, *nFds, pollDelayMs); \
2864 #define CMK_PIPE_PARAM struct pollfd *fds, int *nFds
2865 #define CMK_PIPE_ADDREAD(rd_fd) \
2867 fds[*nFds].fd = rd_fd; \
2868 fds[*nFds].events = POLLIN; \
2871 #define CMK_PIPE_ADDWRITE(wr_fd) \
2873 fds[*nFds].fd = wr_fd; \
2874 fds[*nFds].events = POLLOUT; \
// In the poll version the CHECK macros ignore their argument: each use tests
// the next pollfd slot and advances the counter, so checks must be issued in
// exactly the same order as the matching ADD calls.
// NOTE(review): the expansions are not parenthesized, so using them inside a
// larger expression can bind differently than intended.
2877 #define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents &POLLIN
2878 #define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents &POLLOUT
2880 #else /*select() version*/
// select version: two fd_set objects plus a timeval built from delayMs
// (milliseconds split into seconds and microseconds).
2882 #define CMK_PIPE_DECL(maxn, delayMs) \
2883 fd_set rfds_sto, wfds_sto; \
2885 fd_set *rfds = &rfds_sto, *wfds = &wfds_sto; \
2886 struct timeval tmo; \
2889 tmo.tv_sec = delayMs / 1000; \
2890 tmo.tv_usec = 1000 * (delayMs % 1000);
2891 #define CMK_PIPE_SUB rfds, wfds
// NOTE(review): only rfds is handed to select; wfds is never passed, so
// descriptors registered with CMK_PIPE_ADDWRITE are not waited on.
2892 #define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, 0, 0, &tmo)
2894 #define CMK_PIPE_PARAM fd_set *rfds, fd_set *wfds
2895 #define CMK_PIPE_ADDREAD(rd_fd) \
2897 assert(nFds < FD_SETSIZE); \
2898 FD_SET(rd_fd, rfds); \
// Here the CHECK macros test the named descriptor, so order does not matter.
2901 #define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd, wfds)
2902 #define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd, rfds)
2903 #define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd, wfds)
2907 Wait for incoming requests on all client sockets,
2908 and the CCS socket (if present).
// One polling round: registers the descriptors, waits once through
// CMK_PIPE_CALL, then services whatever became readable.
2910 static void req_poll()
// Room for every client socket plus 5 extra descriptors; wait delay 1000 ms.
2912 CMK_PIPE_DECL(my_process_table.size() + 5, 1000);
// Registration order: client sockets, CCS server socket, then (charmdebug only)
// stdin and the two gdb info output descriptors. The CHECKREAD calls further
// down follow the same order.
2913 for (const nodetab_process & p : my_process_table)
2914 CMK_PIPE_ADDREAD(p.req_client);
2915 if (CcsServer_fd() != INVALID_SOCKET)
2916 CMK_PIPE_ADDREAD(CcsServer_fd());
2917 if (arg_charmdebug) {
2918 CMK_PIPE_ADDREAD(0);
2919 CMK_PIPE_ADDREAD(gdb_info_std[1]);
2920 CMK_PIPE_ADDREAD(gdb_info_std[2]);
2923 skt_set_abort(socket_error_in_poll);
2925 DEBUGF(("Req_poll: Calling select...\n"));
2926 int status = CMK_PIPE_CALL();
2927 DEBUGF(("Req_poll: Select returned %d...\n", status));
2930 return; /*Nothing to do-- timeout*/
// EINTR and EAGAIN are tested separately from the hard failure reported below
// (presumably they just end this round -- confirm).
2933 if (errno == EINTR || errno == EAGAIN)
// Hard failure: reported with -1 in place of a socket.
2937 socket_error_in_poll(-1, 1359, "Node program terminated unexpectedly!\n");
// Serve each readable client; readcount starts at 10 and the loop repeats while
// more data is immediately available and readcount is still positive.
2939 for (nodetab_process & p : my_process_table)
2941 const SOCKET req_client = p.req_client;
2942 if (CMK_PIPE_CHECKREAD(req_client)) {
2943 int readcount = 10; /*number of successive reads we serve per socket*/
2944 /*This client is ready to read*/
2946 req_serve_client(p);
2948 } while (1 == skt_select1(req_client, 0) && readcount > 0);
2952 if (CcsServer_fd() != INVALID_SOCKET)
2953 if (CMK_PIPE_CHECKREAD(CcsServer_fd())) {
2954 DEBUGF(("Activity on CCS server port...\n"));
2958 if (arg_charmdebug) {
// Debugger command arriving on stdin: the first 5 bytes select the command.
2960 if (CMK_PIPE_CHECKREAD(0)) {
2961 int indata = read(0, buf, 5);
2964 fprintf(stderr, "Error reading command (%s)\n", buf);
2965 if (strncmp(buf, "info:", 5) == 0) {
2966 /* Found info command, forward data to gdb info program */
2969 // printf("Command to be forwarded\n");
// NOTE(review): read returns 0 at end of input, which this condition does not
// treat as a stop; confirm the loop cannot spin once stdin is closed.
2970 while (read(0, &c, 1) != -1) {
// A line is flushed to gdb at newline or once 2045 bytes are buffered.
2972 if (c == '\n' || num >= 2045) {
2973 if (write(gdb_info_std[0], buf, num) != num) {
2974 fprintf(stderr, "charmrun> writing info command to gdb failed!\n");
2982 // printf("Command from charmdebug: %d(%s)\n",indata,buf);
2984 /* All streams from gdb are forwarded to the stderr stream through the FILE
2985 gdb_stream which has been duplicated from stderr */
2986 /* NOTE: gdb_info_std[2] must be flushed before gdb_info_std[1] because the
2987 latter contains the string "(gdb) " ending the synchronization. Also the
2988 std[1] should be read with the else statement. It will not work without.
2990 if (CMK_PIPE_CHECKREAD(gdb_info_std[2])) {
2991 int indata = read(gdb_info_std[2], buf, 100);
2992 /*printf("read data from gdb info stderr %d\n",indata);*/
2995 // printf("printing %s\n",buf);
2997 // fprintf(gdb_stream,"%s",buf);
3000 } else if (CMK_PIPE_CHECKREAD(gdb_info_std[1])) {
3001 int indata = read(gdb_info_std[1], buf, 100);
3002 /*printf("read data from gdb info stdout %d\n",indata);*/
3005 // printf("printing %s\n",buf);
// NOTE(review): printing with %s needs buf to be NUL terminated after the read
// of up to 100 bytes; the terminating store is not visible here -- confirm.
3007 fprintf(gdb_stream, "%s", buf);
// Variant of the polling round that always uses select and additionally
// watches the socket to the parent charmrun when running as a child charmrun.
3015 static void req_poll_hierarchical()
3017 skt_set_abort(socket_error_in_poll);
// Build the read set: client sockets, CCS server socket, gdb descriptors
// (charmdebug only) and, for a child charmrun, the parent socket.
// NOTE(review): descriptors are added with FD_SET without a visible check
// against FD_SETSIZE.
3023 FD_ZERO(&rfds); /* clears set of file descriptor */
3024 for (const nodetab_process & p : my_process_table)
3025 FD_SET(p.req_client, &rfds); /* adds client sockets to rfds set*/
3026 if (CcsServer_fd() != INVALID_SOCKET)
3027 FD_SET(CcsServer_fd(), &rfds);
3028 if (arg_charmdebug) {
3030 FD_SET(gdb_info_std[1], &rfds);
3031 FD_SET(gdb_info_std[2], &rfds);
3034 if (arg_child_charmrun)
3035 FD_SET(parent_charmrun_fd, &rfds); /* adds client sockets to rfds set*/
3036 DEBUGF(("Req_poll: Calling select...\n"));
3037 int status = select(FD_SETSIZE, &rfds, 0, 0,
3038 &tmo); /* FD_SETSIZE is the maximum number of file
3039 descriptors that a fd_set object can hold
3040 information about, select returns number of
3042 DEBUGF(("Req_poll: Select returned %d...\n", status));
3045 return; /*Nothing to do-- timeout*/
// NOTE(review): this still names req_clients[0], while every other socket in
// this function comes from my_process_table -- confirm req_clients exists in
// the configuration that compiles this code.
3049 socket_error_in_poll(req_clients[0], 1359, "Node program terminated unexpectedly!\n");
// Readable client sockets: a child charmrun forwards the request towards the
// root, otherwise the request is served locally. readcount starts at 10.
3051 for (nodetab_process & p : my_process_table)
3053 const SOCKET req_client = p.req_client;
3054 if (FD_ISSET(req_client, &rfds)) {
3055 int readcount = 10; /*number of successive reads we serve per socket*/
3056 /*This client is ready to read*/
3058 if (arg_child_charmrun)
3059 req_forward_root(p);
3061 req_serve_client(p);
3063 } while (1 == skt_select1(req_client, 0) && readcount > 0);
3066 if (arg_child_charmrun)
3067 // Forward from root to clients
3068 if (FD_ISSET(parent_charmrun_fd, &rfds)) {
3069 int readcount = 10; /*number of successive reads we serve per socket*/
3071 req_forward_client();
3073 } while (1 == skt_select1(parent_charmrun_fd, 0) && readcount > 0);
3076 /*Wait to receive responses and Forward responses */
3077 if (CcsServer_fd() != INVALID_SOCKET)
3078 if (FD_ISSET(CcsServer_fd(), &rfds)) {
3079 DEBUGF(("Activity on CCS server port...\n"));
3083 if (arg_charmdebug) {
// Debugger command arriving on stdin: the first 5 bytes select the command.
3085 if (FD_ISSET(0, &rfds)) {
3086 int indata = read(0, buf, 5);
3089 fprintf(stderr, "Error reading command (%s)\n", buf);
3090 if (strncmp(buf, "info:", 5) == 0) {
3091 /* Found info command, forward data to gdb info program */
3094 // printf("Command to be forwarded\n");
// NOTE(review): read returns 0 at end of input, which this condition does not
// treat as a stop; confirm the loop cannot spin once stdin is closed.
3095 while (read(0, &c, 1) != -1) {
3097 if (c == '\n' || num >= 2045) {
3098 if (write(gdb_info_std[0], buf, num) != num) {
3099 fprintf(stderr, "charmrun> writing info command to gdb failed!\n");
3107 // printf("Command from charmdebug: %d(%s)\n",indata,buf);
3109 /* All streams from gdb are forwarded to the stderr stream through the FILE
3110 gdb_stream which has been duplicated from stderr */
3111 /* NOTE: gdb_info_std[2] must be flushed before gdb_info_std[1] because the
3112 latter contains the string "(gdb) " ending the synchronization. Also the
3113 std[1] should be read with the else statement. It will not work without.
3115 if (FD_ISSET(gdb_info_std[2], &rfds)) {
3116 int indata = read(gdb_info_std[2], buf, 100);
3117 /*printf("read data from gdb info stderr %d\n",indata);*/
3120 // printf("printing %s\n",buf);
3122 // fprintf(gdb_stream,"%s",buf);
3125 } else if (FD_ISSET(gdb_info_std[1], &rfds)) {
3126 int indata = read(gdb_info_std[1], buf, 100);
3127 /*printf("read data from gdb info stdout %d\n",indata);*/
3130 // printf("printing %s\n",buf);
// NOTE(review): printing with %s needs buf to be NUL terminated after the read
// of up to 100 bytes; the terminating store is not visible here -- confirm.
3132 fprintf(gdb_stream, "%s", buf);
// State for hierarchical start.
// parent_charmrun_IP, parent_charmrun_port and parent_charmrun_pid are filled
// by parse_netstart from the NETSTART environment variable.
3141 static skt_ip_t parent_charmrun_IP;
3142 static int parent_charmrun_port;
3143 static int parent_charmrun_pid;
// dataport and dataskt are set by init_mynodes through skt_server.
3144 static unsigned int dataport;
3145 static SOCKET dataskt;
// Compared against 1 by the connect routines when deciding whether the root
// charmrun should accept new connections or reuse existing sockets.
3146 static int charmrun_phase = 0;
// Reports a failed client connection: prints the host name of process p and
// the message msg to stderr. An existing FIXME in
// errorcheck_one_client_connect states that this routine calls exit(1).
3149 static int client_connect_problem(const nodetab_process & p, const char *msg)
3150 { /*Called when something goes wrong during a client connect*/
3151 fprintf(stderr, "Charmrun> error attaching to node '%s':\n%s\n", p.host->name, msg);
3156 static int client_connect_problem_skt(SOCKET skt, int code, const char *msg)
3157 { /* Passed to skt_set_abort */
3158 const nodetab_process & p = get_process_for_socket(my_process_table, skt);
3159 return client_connect_problem(p, msg);
// Call sequence visible below: announce the wait, wait up to arg_timeout
// seconds for server_fd to become readable, accept, report a failed accept,
// disable Nagle on the new socket and bump the static connection counter.
// numClientsConnected is function-static, so it counts across all calls.
3162 /** Waits for and accepts one client connection on server_fd; the declared return type is SOCKET, not a 0/1 flag **/
3163 static SOCKET errorcheck_one_client_connect(void)
3165 static int numClientsConnected = 0;
3166 unsigned int clientPort; /*These are actually ignored*/
3169 printf("Charmrun> Waiting for %d-th client to connect.\n", numClientsConnected);
3171 if (0 == skt_select1(server_fd, arg_timeout * 1000))
3173 fprintf(stderr, "Charmrun> Timeout waiting for node-program to connect\n");
3177 const SOCKET req_client = skt_accept(server_fd, &clientIP, &clientPort);
3179 /* FIXME: will this ever be triggered? It seems the skt_abort handler here is
3180 * 'client_connect_problem', which calls exit(1), so we'd exit
3182 if (req_client == SOCKET_ERROR)
3184 fprintf(stderr, "Charmrun> Failure in node accept\n");
3189 fprintf(stderr, "Charmrun> Warning: errorcheck_one_client_connect: socket < 0\n");
3192 skt_tcp_no_nagle(req_client);
3194 ++numClientsConnected;
// Finds the entry of process_table whose nodeno equals the argument and returns
// a reference to it. An unknown node number is reported on stderr (the handling
// after the message is presumably fatal -- confirm).
3199 static nodetab_process & get_process_for_nodeno(std::vector<nodetab_process> & process_table, int nodeno)
3201 nodetab_process * ptr = nullptr;
// Linear scan over the table.
3202 for (nodetab_process & p : process_table)
3204 if (p.nodeno == nodeno)
3212 fprintf(stderr, "Charmrun> get_process_for_nodeno: unknown nodeno %d\n", nodeno);
3216 nodetab_process & p = *ptr;
// Sanity check only; compiled out when NDEBUG is defined.
3217 assert(p.nodeno == nodeno);
// Reads one message from the request socket of p and hands it to
// req_handle_initnode. The wait is bounded by arg_timeout seconds; a timeout is
// reported through client_connect_problem.
3225 read_initnode_one_client(nodetab_process & p)
3227 if (!skt_select1(p.req_client, arg_timeout * 1000))
3228 client_connect_problem(p, "Timeout on IP request");
3231 ChMessage_recv(p.req_client, &msg);
3232 req_handle_initnode(&msg, p);
3233 ChMessage_free(&msg);
3236 #if CMK_IBVERBS_FAST_START
// Receives the partial-init message from req_client, reads the node number
// from its payload and stores the socket in the matching process entry.
3237 static void req_one_client_partinit_skt(std::vector<nodetab_process> & process_table, const SOCKET req_client)
3239 if (!skt_select1(req_client, arg_timeout * 1000))
// NOTE(review): req_client is a SOCKET printed with %d; confirm that SOCKET is
// int-sized on every supported platform.
3241 fprintf(stderr, "Charmrun> Timeout on partial init request, socket %d\n", req_client);
3245 ChMessage partStartMsg;
3246 ChMessage_recv(req_client, &partStartMsg);
// NOTE(review): the message type is validated only by assert, which disappears
// when NDEBUG is defined.
3247 assert(strncmp(partStartMsg.header.type, "partinit", 8) == 0);
// The payload starts with the node number of the sender.
3248 int nodeNo = ChMessageInt(*(ChMessageInt_t *) partStartMsg.data);
3249 ChMessage_free(&partStartMsg);
3251 nodetab_process & p = get_process_for_nodeno(process_table, nodeNo);
3252 p.req_client = req_client;
// Runs the partial-init exchange for one client. In phase 1 of a hierarchical
// start on the root charmrun the socket already stored in process_table[index]
// is used; the other visible path accepts a fresh connection first.
3255 static void req_one_client_partinit(std::vector<nodetab_process> & process_table, int index)
3258 if (arg_hierarchical_start && !arg_child_charmrun && charmrun_phase == 1)
3260 nodetab_process & p = process_table[index];
3261 req_one_client_partinit_skt(process_table, p.req_client);
3266 const SOCKET req_client = errorcheck_one_client_connect();
3267 req_one_client_partinit_skt(process_table, req_client);
3273 /* To keep a global node numbering */
3274 static void add_singlenodeinfo_to_mynodeinfo(ChMessage *msg, SOCKET ctrlfd)
3276 /*add to myNodesInfo */
3277 ChSingleNodeinfo *nodeInfo = (ChSingleNodeinfo *) msg->data;
3279 /* need to change nodeNo */
3280 ChMessageInt_t nodeNo = ChMessageInt_new(
3281 nodetab_rank0_table[ChMessageInt(nodeInfo->nodeNo) - mynodes_start]);
3282 myNodesInfo.push_back({nodeNo, nodeInfo->info});
3284 /* Required for CCS */
3285 int nt = ChMessageInt(nodeInfo->nodeNo) - mynodes_start; // TODO: relative nodeNo
3286 nodeInfo->nodeNo = ChMessageInt_new(nt);
3287 my_process_table[nt].req_client = ctrlfd; // TODO: nodeno as index
// Connects the clients with indices in [start, end): accepted sockets wait in
// open_sockets until their first message arrives, and the loop runs until
// end - start clients have finished.
3291 static void req_set_client_connect(std::vector<nodetab_process> & process_table, int start, int end)
3293 int curclientend, curclientstart = start;
// Sockets that are connected but have not delivered their first message yet.
3295 std::queue<SOCKET> open_sockets;
// With IBVERBS but without fast start, all connections are accepted up front,
// except in phase 1 on the root charmrun of a hierarchical start.
3298 #if CMK_USE_IBVERBS && !CMK_IBVERBS_FAST_START
3300 if (!(arg_hierarchical_start && !arg_child_charmrun && charmrun_phase == 1))
3303 for (int i = start; i < end; i++)
3304 open_sockets.push(errorcheck_one_client_connect());
3308 curclientend = start;
3312 while (finished < end - start)
3314 /* check server socket for messages */
3315 #if !CMK_USE_IBVERBS || CMK_IBVERBS_FAST_START
// Accept while nothing has been accepted yet or the server socket is readable
// within 1 ms.
3316 while (curclientstart == curclientend || skt_select1(server_fd, 1) != 0) {
3318 if (!(arg_hierarchical_start && !arg_child_charmrun && charmrun_phase == 1))
3320 open_sockets.push(errorcheck_one_client_connect());
3325 /* check appropriate clients for messages */
3326 while (!open_sockets.empty())
3328 const SOCKET req_client = open_sockets.front();
// Readable within 1 ms: receive the message and bind the socket to the process
// named by the node number in the payload.
3331 if (skt_select1(req_client, 1) != 0)
3333 ChMessage_recv(req_client, &msg);
3335 int nodeNo = ChMessageInt(((ChSingleNodeinfo *)msg.data)->nodeNo);
3336 nodetab_process & p = get_process_for_nodeno(process_table, nodeNo);
3337 p.req_client = req_client;
// Hierarchical start dispatches on the charmrun level and phase; the plain
// case goes to req_handle_initnode.
3340 if (arg_hierarchical_start)
3342 if (!arg_child_charmrun) {
3343 if (charmrun_phase == 1)
3344 receive_nodeset_from_child(&msg, req_client);
3346 set_sockets_list(&msg, req_client);
3347 // here we need to decide based upon the phase
3348 } else /* hier-start with 2nd level*/
3349 add_singlenodeinfo_to_mynodeinfo(&msg, req_client);
3353 req_handle_initnode(&msg, p);
// A socket with no data yet goes back on the queue (presumably the else branch
// of the readiness test above -- confirm).
3359 open_sockets.push(req_client);
3364 ChMessage_free(&msg);
3367 static void send_clients_nodeinfo()
3369 const int my_process_count = my_process_table.size();
3370 int msgSize = sizeof(ChMessageInt_t) * ChInitNodetabFields +
3371 sizeof(ChNodeinfo) * my_process_count;
3373 for (const nodetab_process & p : my_process_table)
3375 const SOCKET fd = p.req_client;
3376 req_send_initnodetab_internal(p, my_process_count, msgSize);
3380 #if CMK_USE_IBVERBS || CMK_USE_IBUD
// Collects the queue-pair list message from every process and passes each one
// to req_handle_qplist.
3381 static void receive_qplist()
3383 #if CMK_USE_IBVERBS && !CMK_IBVERBS_FAST_START
3384 /* a barrier to make sure infiniband device gets initialized */
// Barrier, only with more than one process: read one message from every
// process, then reply barrier to all of them.
3385 if (my_process_table.size() > 1)
3388 for (const nodetab_process & p : my_process_table)
3390 ChMessage_recv(p.req_client, &msg);
3391 ChMessage_free(&msg);
// NOTE(review): the result of req_reply is not checked here.
3393 for (const nodetab_process & p : my_process_table)
3394 req_reply(p.req_client, "barrier", "", 1);
3398 for (nodetab_process & p : my_process_table)
// NOTE(review): fd is not used by any visible statement of this loop.
3400 const SOCKET fd = p.req_client;
3401 if (!skt_select1(p.req_client, arg_timeout * 1000))
3402 client_connect_problem(p, "Timeout on IP request");
3405 ChMessage_recv(p.req_client, &msg);
3407 req_handle_qplist(&msg, p);
3409 ChMessage_free(&msg);
// Redistributes queue-pair entries: entries from the qpList of each process p1
// are copied into the qpData arrays of the other processes, in the slot indexed
// by the node number of p1.
3415 /* Each node has sent the qpn data for all the qpns it has created
3416 This data needs to be sent to all the other nodes
3417 This needs to be done for all nodes
3419 static void exchange_qpdata_clients()
3421 const int my_process_count = my_process_table.size();
// One qpData array per process, with my_process_count entries.
// NOTE(review): the malloc result is not checked in the visible lines.
3423 for (nodetab_process & p : my_process_table)
3425 (ChInfiAddr *) malloc(sizeof(ChInfiAddr) * my_process_count);
3427 for (nodetab_process & p1 : my_process_table)
// NOTE(review): proc indexes arrays of my_process_count entries, which assumes
// node numbers lie in [0, my_process_count) -- confirm.
3429 const int proc = p1.nodeno;
3431 for (nodetab_process & p2 : my_process_table)
// Copy the entry and stamp it with the node number of its origin.
3435 ChInfiAddr & ia = p2.qpData[proc] = p1.qpList[count];
3436 ia.nodeno = ChMessageInt_new(proc);
3437 // printf("Charmrun> nt %d proc %d lid 0x%x qpn
3440 // 0x%x\n",nt,proc,ChMessageInt(p2.qpData[proc].lid),ChMessageInt(p2.qpData[proc].qpn),ChMessageInt(p2.qpData[proc].psn));
// The source list pointer is cleared once its entries are distributed.
3445 p1.qpList = nullptr;
3450 #if CMK_USE_IBVERBS || CMK_USE_IBUD
// Sends a qpdata message to every process: a header announcing qpDataSize
// bytes followed by the queue-pair payload.
3451 static void send_clients_qpdata()
3453 const int my_process_count = my_process_table.size();
// Payload size: one ChInfiAddr per process.
3454 int qpDataSize = sizeof(ChInfiAddr) * my_process_count;
3456 for (const nodetab_process & p : my_process_table)
3458 const SOCKET fd = p.req_client;
3459 ChMessageHeader hdr;
3460 ChMessageHeader_new("qpdata", qpDataSize, &hdr);
3461 skt_sendN(fd, (const char *) &hdr, sizeof(hdr));
// Two payload forms are visible: the qpData array of the receiver, or the qp
// member of every process (presumably alternatives selected by the
// preprocessor -- confirm). Both add up to qpDataSize bytes.
3463 skt_sendN(fd, (const char *) p.qpData, qpDataSize);
3465 for (const nodetab_process & p2 : my_process_table)
3466 skt_sendN(fd, (const char *) &p2.qp, sizeof(ChInfiAddr));
// Scratch timeval shared by the timing macros below.
3472 static struct timeval tim;
// getthetime(x) stores the current time in seconds, with a fractional part,
// into x.
// NOTE(review): the macro expands to two statements without a do/while
// wrapper, so it is unsafe as the body of an unbraced if; the only uses visible
// in this file section are commented out.
3473 #define getthetime(x) \
3474 gettimeofday(&tim, NULL); \
3475 x = tim.tv_sec + (tim.tv_usec / 1000000.0);
3476 #define getthetime1(x) \
3477 gettimeofday(&tim, NULL); \
// Forward declarations; both functions are defined after req_client_connect,
// which calls them.
3480 static void req_add_phase2_processes(std::vector<nodetab_process> &);
3481 static void req_all_clients_connected();
3483 /*Wait for all the clients to connect to our server port*/
// With CMK_IBVERBS_FAST_START each client first completes the partial-init
// exchange and then its initnode message is read; the other visible path
// connects the whole table through req_set_client_connect.
3484 static void req_client_connect_table(std::vector<nodetab_process> & process_table)
3486 #if CMK_IBVERBS_FAST_START
3487 for (int c = 0, c_end = process_table.size(); c < c_end; ++c)
3488 req_one_client_partinit(process_table, c);
3489 for (nodetab_process & p : process_table)
3490 read_initnode_one_client(p);
3492 req_set_client_connect(process_table, 0, process_table.size());
// Derives the number of processes from the old-style launch arguments
// (requested PEs, requested nodes, ppn). An argument counts as active when it
// is greater than zero.
3496 static int get_old_style_process_count()
3498 const int p = arg_requested_pes;
3499 const int np = arg_requested_nodes;
3500 const int ppn = arg_ppn;
3502 const bool p_active = (p > 0);
3503 const bool np_active = (np > 0);
3504 const bool ppn_active = (ppn > 0);
// Visible case: PEs divided by ppn, rounded up, when ppn is active; otherwise
// the PE count itself.
3509 return ppn_active ? (p + ppn - 1) / ppn : p;
// Processes per host as computed by req_construct_phase2_processes; read later
// by req_all_clients_connected.
3514 static int calculated_processes_per_host;
// Computes how many processes are needed beyond the ones already in
// my_process_table (one per active host) and appends an entry for each of them
// to phase2_processes.
// NOTE(review): active_host_count is used as a divisor and my_process_table[0]
// is read, so the table is assumed to be non-empty -- confirm.
3516 static void req_construct_phase2_processes(std::vector<nodetab_process> & phase2_processes)
3518 const int active_host_count = my_process_table.size(); // phase1_process_count
3520 int total_processes;
// Topology-based request: every host must report the same PU, core and socket
// counts as the first one.
3522 if (proc_per.active())
3524 const nodetab_process & p0 = my_process_table[0];
3526 for (nodetab_process & p : my_process_table)
3528 if (p.num_pus != p0.num_pus ||
3529 p.num_cores != p0.num_cores ||
3530 p.num_sockets != p0.num_sockets)
3532 fprintf(stderr, "Charmrun> Error: Detected system topology is heterogeneous, please use old-style launch options.\n");
3537 using Unit = typename TopologyRequest::Unit;
// Processes per host: the requested count per unit times the number of such
// units on a host.
3540 const Unit proc_unit = proc_per.unit();
3544 num_processes = proc_per.host;
3547 num_processes = proc_per.socket * p0.num_sockets;
3550 num_processes = proc_per.core * p0.num_cores;
3553 num_processes = proc_per.pu * p0.num_pus;
3560 calculated_processes_per_host = num_processes;
// An explicit node request overrides the per-host product.
3561 total_processes = arg_requested_nodes <= 0 ? num_processes * active_host_count : arg_requested_nodes;
// Old-style request: total from the launch arguments, per-host count rounded up.
3565 total_processes = get_old_style_process_count();
3566 calculated_processes_per_host = (total_processes + active_host_count - 1) / active_host_count;
3569 const int num_new_processes = total_processes - active_host_count;
3570 const int new_processes_per_host = (num_new_processes + active_host_count - 1) / active_host_count;
// Each existing process gets the first node number for its forked siblings,
// and the process counter of its host restarts at 1.
3572 for (nodetab_process & p : my_process_table)
3574 p.forkstart = active_host_count + p.nodeno * new_processes_per_host;
3575 p.host->processes = 1;
// New processes are handed out round-robin over the existing ones; each copy
// takes the next node number in the fork range of its source.
// NOTE(review): req_client_connect also passes my_process_table itself as
// phase2_processes; in that case push_back can reallocate the vector src
// refers to, and src is read again below -- confirm that path is safe.
3578 for (int i = 0; i < num_new_processes; ++i)
3580 nodetab_process & src = my_process_table[i % active_host_count];
3581 phase2_processes.push_back(src);
3583 nodetab_process & p = phase2_processes.back();
3584 p.nodeno = src.forkstart + (src.host->processes++ - 1);
// Forward declarations of the launch helpers called by req_client_connect.
3588 static void start_nodes_local(std::vector<nodetab_process> &);
3589 static void start_nodes_ssh(std::vector<nodetab_process> &);
3590 static void finish_nodes(std::vector<nodetab_process> &);
// Waits for the already launched clients to connect, starts and connects any
// phase-two processes, and finishes through req_all_clients_connected.
3592 static void req_client_connect(void)
3594 skt_set_abort(client_connect_problem_skt);
// First visible variant: phase-two entries are constructed directly into
// my_process_table before connecting (presumably selected by a condition that
// is not visible here -- confirm).
3598 req_construct_phase2_processes(my_process_table);
3599 req_client_connect_table(my_process_table);
3600 req_all_clients_connected();
// Second variant: connect phase one, then build the phase-two list separately.
3604 req_client_connect_table(my_process_table);
3606 std::vector<nodetab_process> phase2_processes;
3607 req_construct_phase2_processes(phase2_processes);
3609 if (phase2_processes.size() > 0)
// Without scalable start charmrun launches the extra processes itself, through
// ssh or locally.
3611 if (!arg_scalable_start)
3615 #if CMK_SHRINK_EXPAND
3616 if (!arg_shrinkexpand || (arg_requested_pes > arg_old_pes))
3619 assert(!arg_mpiexec);
3620 start_nodes_ssh(phase2_processes);
3623 finish_nodes(phase2_processes);
3628 start_nodes_local(phase2_processes);
// Each connected process is sent a nodefork message carrying the number of
// additional processes on its host and its forkstart value.
3634 for (const nodetab_process & p : my_process_table)
3636 int numforks = p.host->processes - 1;
3640 ChMessageHeader hdr;
3641 ChMessageInt_t mydata[ChInitNodeforkFields] =
3643 ChMessageInt_new(numforks),
3644 ChMessageInt_new(p.forkstart),
3646 ChMessageHeader_new("nodefork", sizeof(mydata), &hdr);
3647 skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
3648 skt_sendN(p.req_client, (const char *) mydata, sizeof(mydata));
// Wait for the phase-two processes, merge them into the main table, finish.
3652 req_client_connect_table(phase2_processes);
3655 req_add_phase2_processes(phase2_processes);
3656 req_all_clients_connected();
3659 static void req_add_phase2_processes(std::vector<nodetab_process> & phase2_processes)
3661 // add phase-two processes to main table
3662 my_process_table.insert(my_process_table.end(), phase2_processes.begin(), phase2_processes.end());
// Final startup step once every client has connected: works out the number of
// PEs per process (ppn), sorts and renumbers the process table, fills the
// PE-to-process map and sends the node tables to the clients.
3665 static void req_all_clients_connected()
3670 printf("Charmrun> All clients connected.\n");
// Topology-based thread request: threads per host come from the socket, core
// or PU count of the first process.
// NOTE(review): my_process_table[0] is read, so the table must be non-empty.
3675 if (onewth_per.active())
3677 using Unit = typename TopologyRequest::Unit;
3679 const nodetab_process & p0 = my_process_table[0];
3681 int threads_per_host;
3682 const Unit onewth_unit = onewth_per.unit();
3683 switch (onewth_unit)
3686 threads_per_host = p0.num_sockets;
3689 threads_per_host = p0.num_cores;
3692 threads_per_host = p0.num_pus;
3697 threads_per_host = 1;
3701 // account for comm thread, except when space is unavailable
3702 // assumes that proc_per.xyz == 1, which is enforced in this situation during arg checking
3703 if (threads_per_host > calculated_processes_per_host && threads_per_host + calculated_processes_per_host > p0.num_pus)
3704 threads_per_host -= calculated_processes_per_host;
3706 if (threads_per_host == 0)
3707 threads_per_host = 1;
// The threads must divide evenly among the processes of a host.
3709 if (threads_per_host < calculated_processes_per_host || threads_per_host % calculated_processes_per_host != 0)
3711 fprintf(stderr, "Charmrun> Error: Invalid request for %d PEs among %d processes per host.\n",
3712 threads_per_host, calculated_processes_per_host);
3713 kill_all_compute_nodes("Invalid provisioning request");
3717 ppn = threads_per_host / calculated_processes_per_host;
// Old-style request: integer division of requested PEs by requested nodes.
3723 else if (arg_requested_pes > 0 && arg_requested_nodes > 0)
3724 ppn = arg_requested_pes / arg_requested_nodes;
3728 // sort them so that node number locality implies physical locality
3729 std::stable_sort(my_process_table.begin(), my_process_table.end());
3732 for (nodetab_process & p : my_process_table)
3734 // assign new numbering
3737 // inform the node of any SMP threads to spawn
3740 // record each PE's process for our own purposes
// NOTE(review): the map stores addresses of my_process_table elements, so the
// table must not be resized or reordered afterwards.
3741 for (int j = 0; j < ppn; ++j)
3742 pe_to_process_map.push_back(&p);
3745 for (nodetab_process & p : my_process_table)
3746 nodeinfo_populate(p);
3749 if (arg_hierarchical_start) {
3750 /* first we need to send data to parent charmrun and then send the nodeinfo
3752 send_myNodeInfo_to_parent();
3753 /*then receive from root */
3754 forward_nodetab_to_children();
// Non-hierarchical path: send the node table, then the queue-pair data when an
// InfiniBand layer is compiled in.
3760 send_clients_nodeinfo();
3761 #if CMK_USE_IBVERBS || CMK_USE_IBUD
3764 exchange_qpdata_clients();
3766 send_clients_qpdata();
3771 printf("Charmrun> IP tables sent.\n");
3774 /*Wait for all the clients to connect to our server port, then collect and send
3775 * nodetable to all */
// Connect routine that runs two rounds of req_set_client_connect over
// my_process_table, the second one to receive the node tables from the child
// charmruns.
3777 static void req_charmrun_connect(void)
3779 // double t1, t2, t3, t4;
3781 skt_set_abort(client_connect_problem_skt);
// First round: connect everything in my_process_table.
3783 #if CMK_IBVERBS_FAST_START
3784 for (int c = 0, c_end = my_process_table.size(); c < c_end; ++c)
3785 req_one_client_partinit(my_process_table, c);
3786 for (nodetab_process & p : my_process_table)
3787 read_initnode_one_client(p);
3789 // if(!arg_child_charmrun) getthetime(t1);
3791 req_set_client_connect(my_process_table, 0, my_process_table.size());
3792 // if(!arg_child_charmrun) getthetime(t2); /* also need to process
3793 // received nodesets JIT */
3796 // TODO: two-phase, if applicable (?)
3801 printf("Charmrun> All clients connected.\n");
3803 // TODO: nodeinfo_populate?
3805 #if CMK_USE_IBVERBS || CMK_USE_IBUD
3806 send_clients_nodeinfo();
3809 exchange_qpdata_clients();
3811 send_clients_qpdata();
// Every process is passed to req_handle_initnodedistribution with a NULL message.
3813 for (const nodetab_process & p : my_process_table)
3814 // add flag to check what level charmrun it is and what phase
3815 req_handle_initnodedistribution(NULL, p);
3819 /* Now receive the nodetab from child charmruns*/
// Second round over the same table, followed by sending the node table.
3822 skt_set_abort(client_connect_problem_skt);
3824 req_set_client_connect(my_process_table, 0, my_process_table.size());
3826 send_clients_nodeinfo();
3828 // if(!arg_child_charmrun) getthetime(t4);
3831 printf("Charmrun> IP tables sent.\n");
3832 // if(!arg_child_charmrun) printf("Time for charmruns connect= %f , sending
3833 // nodes to fire= %f, node clients connected= %f n ", t2-t1, t3-t2, t4-t3);
// Forward declarations of the helpers used by the batched launch routine below.
3840 static void start_one_node_ssh(nodetab_process & p);
3841 static void finish_set_nodes(std::vector<nodetab_process> &, int start, int stop);
// Launches the processes of process_table in batches of arg_batch_spawn and
// connects each batch before starting the next one.
// NOTE(review): clientstart advances by batch each round, so a batch size of 0
// would never reach process_count -- confirm argument validation elsewhere.
3843 static void req_client_start_and_connect_table(std::vector<nodetab_process> & process_table)
3845 int batch = arg_batch_spawn; /* fire several at a time */
3846 const int process_count = process_table.size();
3847 int clientstart = 0;
// Current batch is [clientstart, clientend), clamped to the table size.
3850 int clientend = clientstart + batch;
3851 if (clientend > process_count)
3852 clientend = process_count;
3854 for (int c = clientstart; c < clientend; ++c)
3855 start_one_node_ssh(process_table[c]);
3858 /* ssh x11 forwarding will make sure ssh exit */
3859 if (!arg_ssh_display)
3861 finish_set_nodes(process_table, clientstart, clientend);
// Connect the batch: the partial-init exchange under fast start, the generic
// connect loop in the other visible path.
3863 #if CMK_IBVERBS_FAST_START
3864 for (int c = clientstart; c < clientend; ++c)
3865 req_one_client_partinit(process_table, c);
3867 req_set_client_connect(process_table, clientstart, clientend);
3870 clientstart = clientend;
3872 while (clientstart < process_count);
// Under fast start the initnode messages are read after all batches are up.
3874 #if CMK_IBVERBS_FAST_START
3875 for (nodetab_process & p : process_table)
3876 read_initnode_one_client(p);
3880 static void req_client_start_and_connect(void)
3882 skt_set_abort(client_connect_problem_skt);
3884 req_client_start_and_connect_table(my_process_table);
3886 std::vector<nodetab_process> phase2_processes;
3887 req_construct_phase2_processes(phase2_processes);
3888 if (phase2_processes.size() > 0)
3889 req_client_start_and_connect_table(phase2_processes);
3891 req_add_phase2_processes(phase2_processes);
3892 req_all_clients_connected();
3897 /*Start the server socket the clients will connect to.*/
// Chooses the address string advertised to clients (server_addr), opens the
// server socket and, when enabled, creates the CCS server.
3898 static void req_start_server(void)
3900 skt_ip_t ip = skt_innode_my_ip();
3902 #if CMK_SHRINK_EXPAND
3903 if (arg_shrinkexpand) { // Need port information
3904 char *ns = getenv("NETSTART");
3905 if (ns != 0) { /*Read values set by Charmrun*/
3906 int node_num, old_charmrun_pid, port;
// NOTE(review): this is a 1,024,000 byte stack array filled through an
// unbounded %s conversion from an environment variable; a field width in the
// format would bound the write.
3907 char old_charmrun_name[1024 * 1000];
3908 int nread = sscanf(ns, "%d%s%d%d%d", &node_num, old_charmrun_name,
3909 &server_port, &old_charmrun_pid, &port);
3911 fprintf(stderr, "Error parsing NETSTART '%s'\n", ns);
// Address selection, in order: loopback for local runs, the address stored in
// arg_charmrunip, the CHARMRUN_IP environment variable, loopback when no IP is
// known, the host name, and finally the printed IP.
// NOTE(review): the strcpy calls copy user-controlled strings into server_addr
// without a length check.
3918 /* local execution, use localhost always */
3919 strcpy(server_addr, "127.0.0.1");
3920 else if (arg_charmrunip != NULL)
3921 /* user specify the IP at +useip */
3922 strcpy(server_addr, arg_charmrunip);
3923 else if ((arg_charmrunip = getenv("CHARMRUN_IP")) != NULL)
3924 /* user specify the env */
3925 strcpy(server_addr, arg_charmrunip);
3926 else if (skt_ip_match(ip, _skt_invalid_ip)) {
3927 fprintf(stderr, "Charmrun> Warning-- cannot find IP address for your hostname. "
3928 "Using loopback.\n");
3929 strcpy(server_addr, "127.0.0.1");
3930 } else if (arg_usehostname || skt_ip_match(ip, skt_lookup_ip("127.0.0.1")))
3931 /*Use symbolic host name as charmrun address*/
3932 gethostname(server_addr, sizeof(server_addr));
3934 skt_print_ip(server_addr, ip);
3936 #if CMK_SHRINK_EXPAND
3937 server_port = arg_charmrun_port;
// skt_server receives the port by address, so it can update server_port.
3941 server_fd = skt_server(&server_port);
3944 printf("Charmrun> Charmrun = %s, port = %d\n", server_addr, server_port);
3947 #if CMK_CCS_AVAILABLE
// The CCS server is created unless this is a child charmrun of a hierarchical
// start; the condition reduces to (!arg_hierarchical_start || !arg_child_charmrun).
3949 if (!arg_hierarchical_start ||
3950 (arg_hierarchical_start && !arg_child_charmrun))
3952 if (arg_server == 1)
3953 CcsServer_new(NULL, &arg_server_port, arg_server_auth);
3958 /* Function copied from machine.C file */
// Parses the NETSTART environment variable into mynodes_start, the parent
// charmrun host name, port and pid, and resolves the host name into
// parent_charmrun_IP.
3959 static void parse_netstart(void)
3961 char *ns = getenv("NETSTART");
3962 if (ns != 0) { /*Read values set by Charmrun*/
// NOTE(review): this is a 1,024,000 byte stack array filled through an
// unbounded %s conversion from an environment variable.
3964 char parent_charmrun_name[1024 * 1000];
3965 int nread = sscanf(ns, "%d%s%d%d%d", &mynodes_start, parent_charmrun_name,
3966 &parent_charmrun_port, &parent_charmrun_pid, &port);
// NOTE(review): the name is resolved before the parse error message below, so
// the lookup may run on a buffer that sscanf did not fill -- confirm ordering.
3967 parent_charmrun_IP = skt_lookup_ip(parent_charmrun_name);
3970 fprintf(stderr, "Error parsing NETSTART '%s'\n", ns);
// NOTE(review): this condition uses bitwise | where other conditionals in this
// file use ||; the result is the same for 0/1 values.
3974 #if CMK_USE_IBVERBS | CMK_USE_IBUD
3975 char *cmi_num_nodes = getenv("CmiNumNodes");
3976 if (cmi_num_nodes != NULL) {
3977 sscanf(cmi_num_nodes, "%d", &_Cmi_numnodes);
3982 static int hstart_total_hosts;
3983 /* Receive nodes for which I am responsible*/
3984 static void my_nodetab_store(ChMessage *msg)
3986 ChMessageInt_t * nodelistmsg = (ChMessageInt_t *) msg->data;
3987 const int hstart_hosts_size = ChMessageInt(nodelistmsg[0]);
3988 hstart_total_hosts = ChMessageInt(nodelistmsg[1]);
3989 my_host_table.reserve(hstart_hosts_size);
3990 ChMessageInt_t * hstart_hosts = nodelistmsg + 2;
3991 for (int k = 0; k < hstart_hosts_size; k++)
3992 my_host_table.push(host_table[ChMessageInt(hstart_hosts[k])]);
3995 /* In hierarchical startup, this function is used by child charmrun to obtain
3996 * the list of nodes for which it is responsible */
3997 static void nodelist_obtain(void)
// First visible variant: builds a queue-pair list sized for the other
// (_Cmi_numnodes - 1) nodes and sends it along with the initnode message.
// NOTE(review): the malloc result is used without a cast or a null check, and
// me is not declared in the visible lines -- confirm this branch still
// compiles as C++.
4002 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
4003 me.info.qpList = malloc(qpListSize);
4004 copyInfiAddr(me.info.qpList);
4005 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
4006 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
4007 free(me.info.qpList);
// Second variant: send an initnodetab request whose payload is the value of
// mynodes_start.
4011 ChMessageHeader hdr;
4012 ChMessageInt_t node_start = ChMessageInt_new(mynodes_start);
4013 ChMessageHeader_new("initnodetab", sizeof(ChMessageInt_t), &hdr);
4014 skt_sendN(parent_charmrun_fd, (const char *) &hdr, sizeof(hdr));
4015 skt_sendN(parent_charmrun_fd, (const char *) &node_start, sizeof(node_start));
4016 #endif // CMK_USE_IBVERBS
4018 ChMessage nodelistmsg; /* info about all nodes*/
4019 /*Contact charmrun for machine info.*/
4020 /*We get the other node addresses from a message sent
4021 back via the charmrun control port.*/
// Wait for the reply from the parent; the timeout argument is 1200 * 1000.
4022 if (!skt_select1(parent_charmrun_fd, 1200 * 1000)) {
4025 ChMessage_recv(parent_charmrun_fd, &nodelistmsg);
// The reply is turned into my_host_table by my_nodetab_store.
4027 my_nodetab_store(&nodelistmsg);
4028 ChMessage_free(&nodelistmsg);
// Child charmrun setup: when a valid parent IP is known, opens the data server
// socket and connects to the parent charmrun; otherwise the parent socket is
// set to -1.
4031 static void init_mynodes(void)
4034 if (!skt_ip_match(parent_charmrun_IP, _skt_invalid_ip)) {
// skt_server receives dataport by address, so it can update the port.
4035 dataskt = skt_server(&dataport);
4036 parent_charmrun_fd =
4037 skt_connect(parent_charmrun_IP, parent_charmrun_port, 1800);
4039 parent_charmrun_fd = -1;
4046 /****************************************************************************
4050 ****************************************************************************/
4051 static void start_nodes_daemon(std::vector<nodetab_process> &);
4052 static void start_nodes_mpiexec();
4054 static void start_next_level_charmruns(void);
4057 static void nodetab_init_for_scyld(void);
4058 static void start_nodes_scyld(void);
4060 static void kill_nodes(void);
4061 static void open_gdb_info(void);
4062 static void read_global_segments_size(void);
/* Idle callback handed to skt_set_idle() in main(); it only calls sleep(0). */
4064 static void fast_idleFn(void) { sleep(0); }
4066 static char **main_envp;
/* Entry point of charmrun.
 *
 * In the order visible here, main():
 *   1. installs fast_idleFn as the socket idle callback and parses the
 *      command line with arg_init();
 *   2. records start_timer and builds host_table (Scyld variant:
 *      nodetab_init_for_scyld()), complaining when arg_requested_numhosts
 *      exceeds host_table.size() and otherwise resizing the table to it;
 *   3. for a child charmrun calls init_mynodes(), else copies host_table to
 *      my_host_table;
 *   4. sizes my_process_table from my_host_table (bounded by
 *      arg_requested_nodes or get_old_style_process_count()) and assigns
 *      each process its host's hostno as nodeno;
 *   5. launches node programs through one of start_nodes_daemon /
 *      start_nodes_scyld / start_next_level_charmruns / start_nodes_ssh /
 *      start_nodes_mpiexec / start_nodes_local;
 *   6. waits for them via finish_nodes() and the req_*_connect() calls;
 *   7. enters request-service mode, and when arg_timelimit is not -1
 *      compares elapsed wall-clock time against it, killing all compute
 *      nodes and closing each req_client socket once it is reached.
 *
 * NOTE(review): most #if/#else lines, braces and several statements of this
 * function are missing from the excerpt, so the exact branch selecting each
 * launcher cannot be confirmed from here.
 * NOTE(review): "arg_requested_numhosts > host_table.size()" looks like a
 * signed/unsigned comparison -- confirm the type of arg_requested_numhosts.
 */
4068 int main(int argc, const char **argv, char **envp)
4072 skt_set_idle(fast_idleFn);
4073 /* CrnSrand((int) time(0)); */
4076 /* Compute the values of all constants */
4077 arg_init(argc, argv);
4079 printf("Charmrun> charmrun started...\n");
4081 start_timer = GetClock();
4083 /* check scyld configuration */
4087 nodetab_init_for_scyld();
4089 /* Initialize the node-table by reading nodesfile */
4093 if (arg_requested_numhosts > 0)
4095 if (arg_requested_numhosts > host_table.size())
4097 fprintf(stderr, "Charmrun> Error: ++numHosts exceeds available host pool.\n");
4101 host_table.resize(arg_requested_numhosts);
4107 for (const nodetab_host * h : host_table)
4109 skt_print_ip(ips, h->ip);
4110 printf("Charmrun> added host \"%s\", IP:%s\n", h->name, ips);
4115 if (arg_hierarchical_start)
4116 nodetab_init_hierarchical_start();
4119 /* Start the server port */
4122 /* Initialize the IO module */
4126 /* Hierarchical startup*/
4127 if (arg_child_charmrun) {
4128 init_mynodes(); /* contacts root charmrun and gets list of nodes to start*/
4131 my_host_table = host_table;
4134 const int my_host_count = my_host_table.size();
4135 const int my_initial_process_count = proc_per.active()
4136 ? (arg_requested_nodes > 0 ? std::min(my_host_count, arg_requested_nodes) : my_host_count)
4137 : std::min(my_host_count, get_old_style_process_count());
4138 my_process_table.resize(my_initial_process_count);
4139 for (int i = 0; i < my_initial_process_count; ++i)
4141 nodetab_host * h = my_host_table[i];
4142 nodetab_process & p = my_process_table[i];
4144 p.nodeno = h->hostno;
4147 /* start the node processes */
4148 if (0 != getenv("CONV_DAEMON"))
4149 start_nodes_daemon(my_process_table);
4152 start_nodes_scyld();
4155 PRINT(("Charmrun> IBVERBS version of charmrun\n"));
4159 /* Hierarchical-startup*/
4160 if (arg_hierarchical_start) {
4162 if (!arg_child_charmrun) {
4163 start_next_level_charmruns();
4165 if (!arg_batch_spawn)
4166 start_nodes_ssh(my_process_table);
4168 req_client_start_and_connect();
4171 start_nodes_local(my_process_table);
4180 if (!arg_batch_spawn) {
4181 #if CMK_SHRINK_EXPAND
4182 // modified rsh in shrink expand, need to launch only new ones,
4183 // preserve some info between new and old
4184 if (!arg_shrinkexpand || (arg_requested_pes > arg_old_pes))
4188 start_nodes_mpiexec();
4190 start_nodes_ssh(my_process_table);
4193 req_client_start_and_connect();
4195 start_nodes_local(my_process_table);
4199 if (arg_charmdebug) {
4200 #if defined(_WIN32) || CMK_BPROC
4201 /* Gdb stream (and charmdebug) currently valid only with ssh subsystem */
4203 "Charmdebug is supported currently only with the ssh subsystem\n");
4206 /* Open an additional connection to node 0 with a gdb to grab info */
4207 PRINT(("opening connection with node 0 for info gdb\n"));
4208 read_global_segments_size();
4210 gdb_stream = fdopen(dup(2), "a");
4216 printf("Charmrun> node programs all started\n");
4218 /* Wait for all clients to connect */
4220 /* Hierarchical startup*/
4221 if (arg_hierarchical_start) {
4223 if (!arg_batch_spawn || (!arg_child_charmrun))
4224 finish_nodes(my_process_table);
4227 if (!arg_child_charmrun)
4228 req_charmrun_connect();
4229 else if (!arg_batch_spawn)
4230 req_client_connect();
4237 if (!arg_batch_spawn)
4238 finish_nodes(my_process_table);
4240 if (!arg_batch_spawn)
4241 req_client_connect();
4247 printf("Charmrun> node programs all connected\n");
4249 PRINT(("Charmrun> started all node programs in %.3f seconds.\n",
4250 GetClock() - start_timer));
4252 /* enter request-service mode */
4254 if (arg_hierarchical_start)
4256 req_poll_hierarchical();
4260 if (arg_timelimit == -1)
4267 time_t start = time(NULL);
4271 time_t end = time(NULL);
4272 double elapsed = difftime(end, start);
4273 if (elapsed >= arg_timelimit)
4275 fprintf(stderr, "Charmrun> Error: Time limit reached\n");
4277 kill_all_compute_nodes("Time limit reached");
4278 for (const nodetab_process & p : my_process_table)
4279 skt_close(p.req_client);
4287 /*This little snippet creates a NETSTART
4288 environment variable entry for the given node #.
4289 It uses the idiotic "return reference to static buffer"
4290 string return idiom.
/* Formats the NETSTART value for node `node` into a function-local static
 * buffer of 1536 bytes.
 *
 * Two formats are visible: one that starts with the literal shell variable
 * "$CmiMyNode" and one that starts with the numeric `node`; both append
 * server_addr, server_port, the low 15 bits of getpid() and `port`.
 * Because the buffer is static, each call overwrites the previous result;
 * callers must consume or copy the string before calling again.
 * NOTE(review): the condition choosing between the two formats, the
 * definition of `port` and the return statement are not visible here.
 */
4292 static char *create_netstart(int node)
4294 static char dest[1536];
4297 sprintf(dest, "$CmiMyNode %s %d %d %d", server_addr, server_port,
4298 getpid() & 0x7FFF, port);
4300 sprintf(dest, "%d %s %d %d %d", node, server_addr, server_port,
4301 getpid() & 0x7FFF, port);
4305 /* The remainder of charmrun is only concerned with starting all
4306 the node-programs, also known as charmrun clients. We have to
4307 start nodetab_rank0_table.size() processes on the remote machines.
4310 /*Ask the converse daemon running on each machine to start the node-programs.*/
/* Starts one node program per entry of process_table by talking to the
 * converse daemon on each host.
 *
 * The user arguments in arg_argv are joined (space separated) into argBuffer
 * once.  For every process the per-host working directory and program path
 * are computed with pathfix()/pathextfix() and copied into the shared `task`
 * structure, NETSTART is written into task.env, and, when h->nice is not
 * -100, " +nice <n>" is appended to the arguments.  The task structure and
 * argument string are then sent to DAEMON_IP_PORT on the host and a single
 * status byte is read back; any status other than 'G' is reported on stderr
 * via daemon_status2msg().
 *
 * NOTE(review): strcat() into the 5000-byte argBuffer and strcpy()/sprintf()
 * into the task fields have no visible bound checks -- confirm that lengths
 * are validated elsewhere.
 * NOTE(review): the declarations of `task` and `argBuf`, the first half of
 * the socket test that ends in "INVALID_SOCKET)", and the action taken after
 * an error report are not visible in this excerpt.
 */
4311 static void start_nodes_daemon(std::vector<nodetab_process> & process_table)
4313 /*Set the parts of the task structure that will be the same for all nodes*/
4314 /*Figure out the command line arguments (same for all PEs)*/
4315 char argBuffer[5000] = { '\0' }; /*Buffer to hold assembled program arguments*/
4316 for (int i = 0; arg_argv[i]; i++) {
4318 printf("Charmrun> packing arg: %s\n", arg_argv[i]);
4319 strcat(argBuffer, " ");
4320 strcat(argBuffer, arg_argv[i]);
4324 task.magic = ChMessageInt_new(DAEMON_MAGIC);
4326 /*Start up the user program, by sending a message
4327 to PE 0 on each node.*/
4328 for (const nodetab_process & p : process_table)
4330 const nodetab_host * h = p.host;
4332 char *arg_currdir_r = pathfix(arg_currdir_a, h->pathfixes);
4333 strcpy(task.cwd, arg_currdir_r);
4334 free(arg_currdir_r);
4335 char *arg_nodeprog_r = pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
4336 strcpy(task.pgm, arg_nodeprog_r);
4339 printf("Charmrun> Starting node program %d on '%s' as %s.\n", p.nodeno,
4340 h->name, arg_nodeprog_r);
4341 free(arg_nodeprog_r);
4342 sprintf(task.env, "NETSTART=%s", create_netstart(p.nodeno));
4344 char nodeArgBuffer[5120]; /*Buffer to hold assembled program arguments*/
4346 if (h->nice != -100) {
4348 printf("Charmrun> +nice %d\n", h->nice);
4349 sprintf(nodeArgBuffer, "%s +nice %d", argBuffer, h->nice);
4350 argBuf = nodeArgBuffer;
4353 task.argLength = ChMessageInt_new(strlen(argBuf));
4355 /*Send request out to remote node*/
4356 char statusCode = 'N'; /*Default error code-- network problem*/
4357 int fd = skt_connect(h->ip, DAEMON_IP_PORT, 30);
4359 INVALID_SOCKET) { /*Contact! Ask the daemon to start the program*/
4360 skt_sendN(fd, (const char *) &task, sizeof(task));
4361 skt_sendN(fd, (const char *) argBuf, strlen(argBuf));
4362 skt_recvN(fd, &statusCode, sizeof(char));
4364 if (statusCode != 'G') { /*Something went wrong--*/
4365 fprintf(stderr, "Error '%c' starting remote node program on %s--\n%s\n",
4366 statusCode, h->name, daemon_status2msg(statusCode));
4368 } else if (arg_verbose)
4369 printf("Charmrun> Node program %d started.\n", p.nodeno);
4374 /*Sadly, interprocess communication on Win32 is quite
4375 different, so we can't use Ssh on win32 yet.
4376 Fall back to the daemon.*/
/* Stand-ins for the ssh launcher family in the build that has no ssh
 * support: start_nodes_ssh() forwards to start_nodes_daemon(), and the
 * remaining functions are intentionally empty so that callers compile and
 * run unchanged. */
4377 static void start_nodes_ssh(std::vector<nodetab_process> & process_table) { start_nodes_daemon(process_table); }
4378 static void finish_nodes(std::vector<nodetab_process> & process_table) {}
4379 static void start_one_node_ssh(nodetab_process & p) {}
4380 static void start_nodes_mpiexec() {}
4382 static void finish_set_nodes(std::vector<nodetab_process> & process_table, int start, int stop) {}
/* Appends a Win32 environment block to `dest`.
 *
 * `dest` must already hold one NUL-terminated string; the strings of oldEnv
 * are copied after it one by one, each keeping its own terminator, and an
 * extra '\0' is written at the end so the result is doubly terminated.
 * oldEnv is released with FreeEnvironmentStrings() before returning.
 * No size of `dest` is passed in, so the caller is responsible for
 * providing a buffer large enough for the whole block.
 * NOTE(review): the declaration of `src` (presumably initialised from
 * oldEnv) is not visible in this excerpt.
 */
4384 static void envCat(char *dest, LPTSTR oldEnv)
4387 dest += strlen(dest); // Advance to end of dest
4388 dest++; // Advance past terminating NULL character
4389 while ((*src) != '\0') {
4390 int adv = strlen(src) + 1; // Length of newly-copied string plus NULL
4391 strcpy(dest, src); // Copy another environment string
4392 dest += adv; // Advance past newly-copied string and NULL
4393 src += adv; // Ditto for src
4395 *dest = '\0'; // Paste on final terminating NULL character
4396 FreeEnvironmentStrings(oldEnv);
4399 /* Simple version of charmrun that avoids sshd and charmd: */
4400 /* it spawns the node program on the local machine only, using exec. */
/* Win32 local launcher: starts every process of process_table on this
 * machine with CreateProcess().
 *
 * The command line is pparam_argv[1] followed by the remaining pparam_argv
 * entries separated by spaces.  For each process an environment block is
 * built from "NETSTART=<create_netstart(nodeno)>" plus the current system
 * environment (envCat), and the program is created in the working directory
 * "." with handle inheritance disabled.  A failed CreateProcess() is
 * reported together with GetLastError().
 *
 * NOTE(review): the length check compares against 10000 with ">", while
 * cmdLine is exactly 10000 bytes and also has to hold separators and the
 * terminator -- this looks off by at least one; confirm against the hidden
 * lines.
 * NOTE(review): "Error code = %ld" is given an int (`error`); the format
 * specifier and argument type appear to disagree.
 * NOTE(review): the preprocessor lines selecting one of the three creation
 * flag combinations, the definition of `args`, and the error-exit paths are
 * not visible in this excerpt.
 */
4401 static void start_nodes_local(std::vector<nodetab_process> & process_table)
4403 char cmdLine[10000]; /*Program command line, including executable name*/
4404 /*Command line too long.*/
4406 if (strlen(pparam_argv[1])+strlen(args) > 10000)
4409 strcpy(cmdLine, pparam_argv[1]);
4410 const char **param = pparam_argv + 2;
4412 strcat(cmdLine, " ");
4413 strcat(cmdLine, *param);
4417 PROCESS_INFORMATION pi; /* process Information for the process spawned */
4418 char environment[10000]; /*Doubly-null terminated environment strings*/
4419 for (nodetab_process & p : process_table)
4421 STARTUPINFO si = {0}; /* startup info for the process spawned */
4423 sprintf(environment, "NETSTART=%s", create_netstart(p.nodeno));
4424 /*Paste all system environment strings */
4425 envCat(environment, GetEnvironmentStrings());
4427 /* Initialise the security attributes for the process
4431 printf("Charmrun> start %d node program on localhost.\n", p.nodeno);
4434 ret = CreateProcess(NULL, /* application name */
4435 cmdLine, /* command line */
4436 NULL, /*&sa,*/ /* process SA */
4437 NULL, /*&sa,*/ /* thread SA */
4438 FALSE, /* inherit flag */
4440 // don't disable console output on rank 0 process (need to be able to see python syntax errors, etc)
4441 CREATE_NEW_PROCESS_GROUP | (p.nodeno == 0 ? 0 : DETACHED_PROCESS),
4443 CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS,
4445 CREATE_NEW_PROCESS_GROUP | CREATE_NEW_CONSOLE,
4447 /* creation flags */
4448 environment, /* environment block */
4449 ".", /* working directory */
4450 &si, /* startup info */
4454 /*Something went wrong! Look up the Windows error code*/
4456 int error=GetLastError();
4457 char statusCode=daemon_err2status(error);
4458 fprintf(logfile,"******************* ERROR *****************\n"
4459 "Error in creating process!\n"
4460 "Error code = %ld-- %s\n\n\n", error,
4461 daemon_status2msg(statusCode));
4464 int error = GetLastError();
4465 fprintf(stderr, "startProcess failed to start process \"%s\" with status: %d\n",
4466 pparam_argv[1], error);
/* Reports whether BProc node `node` is up.
 *
 * Two API generations are handled: with CMK_BPROC_VERSION < 4 the result of
 * bproc_nodestatus(node) is compared with bproc_node_up; otherwise the
 * status is fetched as a string (at most 128 bytes) and compared with "up".
 * Node -1 (the master) is always treated as up in the string-based variant.
 * Both variants contain a printf of the node status.
 * NOTE(review): the `status` variable, the condition guarding the printf
 * calls, and the return statement are not visible in this excerpt.
 */
4474 static int bproc_nodeisup(int node)
4477 #if CMK_BPROC_VERSION < 4
4478 if (bproc_nodestatus(node) == bproc_node_up)
4481 printf("Charmrun> node %d status: %s\n", node, status ? "up" : "down");
4483 char nodestatus[128];
4484 if (node == -1) { /* master node is always up */
4485 strcpy(nodestatus, "up");
4488 if (bproc_nodestatus(node, nodestatus, 128)) {
4489 if (strcmp(nodestatus, "up") == 0)
4493 printf("Charmrun> node %d status: %s\n", node, nodestatus);
4498 /* ++ppn is now supported in both the SMP and non-SMP versions:
4499 in SMP, ++ppn specifies the number of threads on each node;
4500 in non-SMP, ++ppn specifies the number of processes on each node. */
/* Builds host_table for a Scyld/BProc cluster.
 *
 * Candidate node numbers run from -1 (the master) up to, but not including,
 * maxNodes, where maxNodes is bproc_numnodes() + 1 capped at arg_endpe + 1.
 * Nodes that are down, slave nodes below arg_startpe, and the master when
 * arg_skipmaster is set are skipped.  Every remaining node becomes a new
 * nodetab_host whose name is the decimal node number, whose ip comes from
 * nodetab_host::resolve() and whose hostno is the next consecutive index.
 * An empty table is reported as "no slave node available!".
 * NOTE(review): the declarations of `hostname` and `hostno`, the statements
 * that follow the first two skip tests, and the action after the error
 * message are not visible in this excerpt.
 */
4501 static void nodetab_init_for_scyld()
4503 int maxNodes = bproc_numnodes() + 1;
4504 if (arg_endpe < maxNodes)
4505 maxNodes = arg_endpe + 1;
4507 /* check which slave node is available from frompe to endpe */
4509 for (int i = -1; i < maxNodes; i++) {
4511 if (!bproc_nodeisup(i))
4513 if (i != -1 && i < arg_startpe)
4515 if (i == -1 && arg_skipmaster)
4516 continue; /* skip master node -1 */
4517 sprintf(hostname, "%d", i);
4518 nodetab_host * h = new nodetab_host{};
4519 h->name = strdup(hostname);
4520 h->ip = nodetab_host::resolve(hostname);
4521 h->hostno = hostno++;
4522 host_table.push_back(h);
4525 const int hosts_size = host_table.size();
4526 if (hosts_size == 0) {
4527 fprintf(stderr, "Charmrun> no slave node available!\n");
4531 printf("Charmrun> There are %d slave nodes available.\n",
4532 hosts_size - (arg_skipmaster ? 0 : 1));
/* Scyld/BProc launcher: starts the node program for every entry of
 * my_process_table.
 *
 * The host name is parsed with atoi() to recover the BProc node number, and
 * a one-entry environment ("NETSTART=...", 256-byte heap buffer) is
 * prepared.  Node -1 (the master) is started with execve(); any other node
 * with bproc_execmove().  Outside debug mode /dev/null is opened,
 * presumably to replace the standard descriptors (the dup2 calls are not
 * visible here).
 *
 * NOTE(review): "if (fd = open(...))" tests the assigned descriptor for
 * non-zero, so a failed open (-1) is treated as success; the intended test
 * is probably "!= -1" -- confirm.
 * NOTE(review): sprintf() into the 256-byte envp[0] has no visible bound
 * check, and the fork() that presumably surrounds the exec calls is not
 * visible in this excerpt.
 */
4535 static void start_nodes_scyld(void)
4537 char *envp[2] = { (char *) malloc(256), NULL };
4538 for (const nodetab_process & p : my_process_table)
4540 const nodetab_host * h = p.host;
4541 const int bproc_nodeno = atoi(h->name);
4544 printf("Charmrun> start node program on slave node: %d.\n", bproc_nodeno);
4545 sprintf(envp[0], "NETSTART=%s", create_netstart(p.nodeno));
4551 int fd, fd1 = dup(1);
4552 if (!(arg_debug || arg_debug_no_pause)) { /* debug mode */
4553 if (fd = open("/dev/null", O_RDWR)) {
4559 if (bproc_nodeno == -1) {
4560 int status = execve(pparam_argv[1], pparam_argv + 1, envp);
4562 fprintf(stderr, "execve failed to start process \"%s\" with status: %d\n",
4563 pparam_argv[1], status);
4565 int status = bproc_execmove(bproc_nodeno, pparam_argv[1], pparam_argv + 1, envp);
4567 fprintf(stderr, "bproc_execmove failed to start remote process \"%s\" with "
4569 pparam_argv[1], status);
/* Intentionally empty in this build variant: there is nothing to wait for. */
4577 static void finish_nodes(std::vector<nodetab_process> & process_table) {}
4580 /*Unix systems can use Ssh normally*/
4581 /********** SSH-ONLY CODE *****************************************/
4585 /* this starts all the node programs. It executes fully in the background. */
4587 /****************************************************************************/
4588 #include <sys/wait.h>
4590 extern char **environ;
/* Removes from the process environment every entry that begins with
 * doomedEnv (prefix match via strncmp, so pass e.g. "DISPLAY=").
 * The list is compacted in place and re-terminated with NULL.
 * NOTE(review): the declarations of the read/write cursors `ie` and `oe`
 * (presumably both starting at `environ`) and the copy step inside the loop
 * are not visible in this excerpt.
 */
4591 static void removeEnv(const char *doomedEnv)
4592 { /*Remove a value from the environment list*/
4595 while (*ie != NULL) {
4596 if (0 != strncmp(*ie, doomedEnv, strlen(doomedEnv)))
4600 *oe = NULL; /*NULL-terminate list*/
/* Forks a remote shell for process p and feeds it startScript on stdin.
 *
 * The argument vector starts with the words of h->shell, followed by the
 * host name, "-X" when arg_ssh_display is set, "-l <login>", three "-o"
 * options that disable keyboard-interactive and password authentication
 * and host authentication for localhost, and finally the remote command
 * "/bin/bash -f".
 * In the child the script file is opened, unlinked immediately, and
 * duplicated onto descriptor 0 before execvp(); descriptors 3..1023 are
 * iterated over (presumably closed -- the loop body is not visible).  If
 * execvp() returns, an error naming the shell program is printed.
 * NOTE(review): the fork() call, the splitting loop over h->shell, the
 * verbose guards and the return value (presumably the child's pid, as
 * callers store it in ssh_pid) are not visible in this excerpt.
 */
4603 static int ssh_fork(const nodetab_process & p, const char *startScript)
4605 const nodetab_host * h = p.host;
4607 std::vector<const char *> sshargv;
4609 const char *s = h->shell;
4610 const char *e = skipstuff(s);
4612 sshargv.push_back(substr(s, e));
4617 sshargv.push_back(h->name);
4618 if (arg_ssh_display)
4619 sshargv.push_back("-X");
4620 sshargv.push_back("-l");
4621 sshargv.push_back(h->login);
4622 sshargv.push_back("-o");
4623 sshargv.push_back("KbdInteractiveAuthentication=no");
4624 sshargv.push_back("-o");
4625 sshargv.push_back("PasswordAuthentication=no");
4626 sshargv.push_back("-o");
4627 sshargv.push_back("NoHostAuthenticationForLocalhost=yes");
4628 sshargv.push_back("/bin/bash -f");
4629 sshargv.push_back((const char *) NULL);
4632 std::string cmd_str = sshargv[0];
4633 for (int n = 1; n < sshargv.size()-1; ++n)
4634 cmd_str += " " + std::string(sshargv[n]);
4635 printf("Charmrun> Starting %s\n", cmd_str.c_str());
4640 perror("ERROR> starting remote shell");
4643 if (pid == 0) { /*Child process*/
4644 int fdScript = open(startScript, O_RDONLY);
4645 /**/ unlink(startScript); /**/
4646 dup2(fdScript, 0); /*Open script as standard input*/
4647 // removeEnv("DISPLAY="); /*No DISPLAY disables ssh's slow X11 forwarding*/
4648 for (int i = 3; i < 1024; i++)
4650 execvp(sshargv[0], const_cast<char **>(&sshargv[0]));
4651 fprintf(stderr, "Charmrun> Couldn't find remote shell program '%s'!\n",
4656 printf("Charmrun> remote shell (%s:%d) started\n", h->name, p.nodeno);
/* Writes argv entries to f, each preceded by a single space.
 * NOTE(review): the loop over argv is not visible in this excerpt; it
 * presumably runs until the NULL terminator.  Arguments are written without
 * shell quoting. */
4660 static void fprint_arg(FILE *f, const char **argv)
4663 fprintf(f, " %s", *argv);
/* Emits two script lines to f: a call to the script's "Find" shell function
 * for `program`, and an assignment of the resulting $loc to the shell
 * variable named `dest`. */
4667 static void ssh_Find(FILE *f, const char *program, const char *dest)
4669 fprintf(f, "Find %s\n", program);
4670 fprintf(f, "%s=$loc\n", dest);
/* Writes to f the /bin/sh script that the remote shell executes to start
 * process p; argv holds the program arguments passed on via fprint_arg().
 *
 * Sections of the generated script, in the order visible here:
 *   - helper shell functions Echo, Exit and Find;
 *   - sourcing of $HOME/.charmrunrc and an optional DISPLAY export;
 *   - exports of NETMAGIC, CmiMyNode, NETSTART, CmiMyNodeSize, CmiMyForks,
 *     the CmiProcessPer* / CmiOneWthPer* topology hints and CmiNumNodes
 *     (the rank and node count may instead be read from MPI/PMI/SLURM
 *     environment variables);
 *   - a few runtime-tuning exports and an extended PATH;
 *   - lookup of xterm, xrdb and the debugger when a debug/xterm option is
 *     set, with an X-server reachability test;
 *   - checks that the node program is executable and that the working
 *     directory can be entered, then the host's setup command unless it is
 *     "*";
 *   - the launch itself: under gdb/idb, lldb or dbx inside an xterm, inside
 *     a plain xterm, or directly (optionally through "setarch -R" and with
 *     "+nice");
 *   - reporting of startup errors that were written to
 *     /tmp/charmrun_err.$$, followed by "Exit 0".
 *
 * NOTE(review): many lines of this function (conditionals, parts of string
 * literals, braces, the declaration of `netstart`) are missing from the
 * excerpt, so the exact script text cannot be reconstructed from here.
 * NOTE(review): paths and arguments are interpolated into the script
 * without shell escaping.  Only arg_currdir_r is visibly freed at the end;
 * confirm that arg_nodeprog_r is released as well.
 */
4672 static void ssh_script(FILE *f, const nodetab_process & p, const char **argv)
4674 const nodetab_host * h = p.host;
4675 const char *dbg = h->debugger;
4676 const char *host = h->name;
4677 const int nodeno = p.nodeno;
4680 fprintf(f, "#!/bin/sh\n");
4682 fprintf(f, /*Echo: prints out status message*/
4684 " echo 'Charmrun remote shell(%s.%d)>' $*\n"
4687 fprintf(f, /*Exit: exits with return code*/
4689 " if [ $1 -ne 0 ]\n"
4691 " Echo Exiting with error code $1\n"
4693 #if CMK_SSH_KILL /*End by killing ourselves*/
4694 " sleep 5\n" /*Delay until any error messages are flushed*/
4696 #else /*Exit normally*/
4700 fprintf(f, /*Find: locates a binary program in PATH, sets loc*/
4703 " for dir in `echo $PATH | sed -e 's/:/ /g'`\n"
4705 " test -f \"$dir/$1\" && loc=\"$dir/$1\"\n"
4707 " if [ \"x$loc\" = x ]\n"
4709 " Echo $1 not found in your PATH \"($PATH)\"--\n"
4710 " Echo set your path in your ~/.charmrunrc\n"
4716 fprintf(f, "Echo 'remote responding...'\n");
4718 fprintf(f, "test -f \"$HOME/.charmrunrc\" && . \"$HOME/.charmrunrc\"\n");
4719 /* let's leave DISPLAY untouched and rely on X11 forwarding,
4720 changing DISPLAY to charmrun does not always work if X11 forwarding
4723 if (arg_display && !arg_ssh_display)
4724 fprintf(f, "DISPLAY='%s';export DISPLAY\n", arg_display);
4727 if (arg_child_charmrun)
4728 fprintf(f, "NETMAGIC=\"%d\";export NETMAGIC\n",
4729 parent_charmrun_pid & 0x7FFF);
4732 fprintf(f, "NETMAGIC=\"%d\";export NETMAGIC\n", getpid() & 0x7FFF);
4735 fprintf(f, "CmiMyNode=$OMPI_COMM_WORLD_RANK\n");
4736 fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$MPIRUN_RANK\n");
4737 fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$PMI_RANK\n");
4738 fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$PMI_ID\n");
4739 fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$MP_CHILD\n");
4740 fprintf(f, "test -z \"$CmiMyNode\" && CmiMyNode=$SLURM_PROCID\n");
4741 fprintf(f, "test -z \"$CmiMyNode\" && (Echo Could not detect rank from "
4742 "environment ; Exit 1)\n");
4743 fprintf(f, "export CmiMyNode\n");
4746 else if (arg_hierarchical_start && arg_child_charmrun)
4747 fprintf(f, "CmiMyNode='%d'; export CmiMyNode\n", mynodes_start + nodeno);
4750 fprintf(f, "CmiMyNode='%d'; export CmiMyNode\n", nodeno);
4754 if (arg_hierarchical_start && arg_child_charmrun)
4755 netstart = create_netstart(mynodes_start + nodeno);
4758 netstart = create_netstart(nodeno);
4759 fprintf(f, "NETSTART=\"%s\";export NETSTART\n", netstart);
4761 fprintf(f, "CmiMyNodeSize='%d'; export CmiMyNodeSize\n", h->cpus);
4763 fprintf(f, "CmiMyForks='%d'; export CmiMyForks\n", 0);
4765 // cpu affinity hints
4766 using Unit = typename TopologyRequest::Unit;
4767 switch (proc_per.unit())
4770 fprintf(f, "CmiProcessPerHost='%d'; export CmiProcessPerHost\n", proc_per.host);
4773 fprintf(f, "CmiProcessPerSocket='%d'; export CmiProcessPerSocket\n", proc_per.socket);
4776 fprintf(f, "CmiProcessPerCore='%d'; export CmiProcessPerCore\n", proc_per.core);
4779 fprintf(f, "CmiProcessPerPU='%d'; export CmiProcessPerPU\n", proc_per.pu);
4785 switch (onewth_per.unit())
4788 fprintf(f, "CmiOneWthPerHost='%d'; export CmiOneWthPerHost\n", 1);
4791 fprintf(f, "CmiOneWthPerSocket='%d'; export CmiOneWthPerSocket\n", 1);
4794 fprintf(f, "CmiOneWthPerCore='%d'; export CmiOneWthPerCore\n", 1);
4797 fprintf(f, "CmiOneWthPerPU='%d'; export CmiOneWthPerPU\n", 1);
4805 fprintf(f, "CmiNumNodes=$OMPI_COMM_WORLD_SIZE\n");
4806 fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$MPIRUN_NPROCS\n");
4807 fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$PMI_SIZE\n");
4808 fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$MP_PROCS\n");
4809 fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$SLURM_NTASKS\n");
4810 fprintf(f, "test -z \"$CmiNumNodes\" && CmiNumNodes=$SLURM_NPROCS\n");
4811 fprintf(f, "test -z \"$CmiNumNodes\" && (Echo Could not detect node count "
4812 "from environment ; Exit 1)\n");
4813 fprintf(f, "export CmiNumNodes\n");
4816 else if (arg_hierarchical_start && arg_child_charmrun)
4817 fprintf(f, "CmiNumNodes='%d'; export CmiNumNodes\n", hstart_total_hosts);
4821 fprintf(f, "CmiNumNodes='%d'; export CmiNumNodes\n", (int)my_process_table.size());
4824 fprintf(f, "GFORTRAN_UNBUFFERED_ALL=YES; export GFORTRAN_UNBUFFERED_ALL\n");
4827 fprintf(f, "MX_MONOTHREAD=1; export MX_MONOTHREAD\n");
4828 /*fprintf(f,"MX_RCACHE=1; export MX_RCACHE\n");*/
4830 #if CMK_AIX && CMK_SMP
4831 fprintf(f, "MALLOCMULTIHEAP=1; export MALLOCMULTIHEAP\n");
4835 printf("Charmrun> Sending \"%s\" to client %d.\n", netstart, nodeno);
4838 "PATH=\"$PATH:/bin:/usr/bin:/usr/X/bin:/usr/X11/bin:/usr/local/bin:"
4839 "/usr/X11R6/bin:/usr/openwin/bin\"\n");
4841 /* find the node-program */
4842 char *arg_nodeprog_r = pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
4844 /* find the current directory, relative version */
4845 char *arg_currdir_r = pathfix(arg_currdir_a, h->pathfixes);
4848 printf("Charmrun> find the node program \"%s\" at \"%s\" for %d.\n",
4849 arg_nodeprog_r, arg_currdir_r, nodeno);
4851 if (arg_debug || arg_debug_no_pause || arg_in_xterm) {
4852 ssh_Find(f, h->xterm, "F_XTERM");
4853 if (!arg_ssh_display && !arg_debug_no_xrdb)
4854 ssh_Find(f, "xrdb", "F_XRDB");
4856 fprintf(f, "Echo 'using xterm' $F_XTERM\n");
4859 if (arg_debug || arg_debug_no_pause) { /*Look through PATH for debugger*/
4860 ssh_Find(f, dbg, "F_DBG");
4862 fprintf(f, "Echo 'using debugger' $F_DBG\n");
4865 if (!arg_ssh_display && !arg_debug_no_xrdb &&
4866 (arg_debug || arg_debug_no_pause || arg_in_xterm)) {
4867 /* if (arg_debug || arg_debug_no_pause || arg_in_xterm) {*/
4868 fprintf(f, "$F_XRDB -query > /dev/null\n");
4869 fprintf(f, "if test $? != 0\nthen\n");
4870 fprintf(f, " Echo 'Cannot contact X Server '$DISPLAY'. You probably'\n");
4871 fprintf(f, " Echo 'need to run xhost to authorize connections.'\n");
4872 fprintf(f, " Echo '(See manual for xhost for security issues)'\n");
4873 fprintf(f, " Echo 'Or try ++batch 1 ++ssh-display to rely on SSH X11 "
4875 fprintf(f, " Exit 1\n");
4879 fprintf(f, "if test ! -x \"%s\"\nthen\n", arg_nodeprog_r);
4880 fprintf(f, " Echo 'Cannot locate this node-program: %s'\n", arg_nodeprog_r);
4881 fprintf(f, " Exit 1\n");
4884 fprintf(f, "cd \"%s\"\n", arg_currdir_r);
4885 fprintf(f, "if test $? = 1\nthen\n");
4886 fprintf(f, " Echo 'Cannot propagate this current directory:'\n");
4887 fprintf(f, " Echo '%s'\n", arg_currdir_r);
4888 fprintf(f, " Exit 1\n");
4891 if (strcmp(h->setup, "*")) {
4892 fprintf(f, "%s\n", h->setup);
4893 fprintf(f, "if test $? = 1\nthen\n");
4894 fprintf(f, " Echo 'this initialization command failed:'\n");
4895 fprintf(f, " Echo '\"%s\"'\n", h->setup);
4896 fprintf(f, " Echo 'edit your nodes file to fix it.'\n");
4897 fprintf(f, " Exit 1\n");
4901 fprintf(f, "rm -f /tmp/charmrun_err.$$\n");
4903 fprintf(f, "Echo 'starting node-program...'\n");
4904 /* This is the start of the run-nodeprogram script */
4907 if (arg_debug || arg_debug_no_pause) {
4908 if (strcmp(dbg, "gdb") == 0 || strcmp(dbg, "idb") == 0) {
4909 fprintf(f, "cat > /tmp/charmrun_gdb.$$ << END_OF_SCRIPT\n");
4910 if (strcmp(dbg, "idb") == 0) {
4911 fprintf(f, "set \\$cmdset=\"gdb\"\n");
4913 fprintf(f, "shell /bin/rm -f /tmp/charmrun_gdb.$$\n");
4914 fprintf(f, "handle SIGPIPE nostop noprint\n");
4915 fprintf(f, "handle SIGWINCH nostop noprint\n");
4916 fprintf(f, "handle SIGWAITING nostop noprint\n");
4917 if (arg_debug_commands)
4918 fprintf(f, "%s\n", arg_debug_commands);
4919 fprintf(f, "set args");
4920 fprint_arg(f, argv);
4922 if (arg_debug_no_pause)
4923 fprintf(f, "run\n");
4924 fprintf(f, "END_OF_SCRIPT\n");
4926 fprintf(f, "\"%s\" ", arg_runscript);
4927 fprintf(f, "$F_XTERM");
4928 fprintf(f, " -title 'Node %d (%s)' ", nodeno, h->name);
4929 if (strcmp(dbg, "idb") == 0)
4930 fprintf(f, " -e $F_DBG \"%s\" -c /tmp/charmrun_gdb.$$ \n", arg_nodeprog_r);
4932 fprintf(f, " -e $F_DBG \"%s\" -x /tmp/charmrun_gdb.$$ \n", arg_nodeprog_r);
4933 } else if (strcmp(dbg, "lldb") == 0) {
4934 fprintf(f, "cat > /tmp/charmrun_lldb.$$ << END_OF_SCRIPT\n");
4935 fprintf(f, "platform shell -- /bin/rm -f /tmp/charmrun_lldb.$$\n");
4936 // must launch before configuring signals, or else:
4937 // "error: No current process; cannot handle signals until you have a valid process."
4938 // use -s to stop at the entry point
4939 fprintf(f, "process launch -X true -s --");
4940 fprint_arg(f, argv);
4942 fprintf(f, "process handle -s false -n false SIGPIPE SIGWINCH\n");
4943 if (arg_debug_commands)
4944 fprintf(f, "%s\n", arg_debug_commands);
4945 if (arg_debug_no_pause)
4946 fprintf(f, "continue\n");
4948 fprintf(f, "# Use \"continue\" or \"c\" to begin execution.\n");
4949 fprintf(f, "END_OF_SCRIPT\n");
4951 fprintf(f, "\"%s\" ", arg_runscript);
4952 fprintf(f, "$F_XTERM");
4953 fprintf(f, " -title 'Node %d (%s)' ", nodeno, h->name);
4954 fprintf(f, " -e $F_DBG \"%s\" -s /tmp/charmrun_lldb.$$ \n", arg_nodeprog_r);
4955 } else if (strcmp(dbg, "dbx") == 0) {
4956 fprintf(f, "cat > /tmp/charmrun_dbx.$$ << END_OF_SCRIPT\n");
4957 fprintf(f, "sh /bin/rm -f /tmp/charmrun_dbx.$$\n");
4958 fprintf(f, "dbxenv suppress_startup_message 5.0\n");
4959 fprintf(f, "ignore SIGPOLL\n");
4960 fprintf(f, "ignore SIGPIPE\n");
4961 fprintf(f, "ignore SIGWINCH\n");
4962 fprintf(f, "ignore SIGWAITING\n");
4963 if (arg_debug_commands)
4964 fprintf(f, "%s\n", arg_debug_commands);
4965 fprintf(f, "END_OF_SCRIPT\n");
4967 fprintf(f, "\"%s\" ", arg_runscript);
4968 fprintf(f, "$F_XTERM");
4969 fprintf(f, " -title 'Node %d (%s)' ", nodeno, h->name);
4970 fprintf(f, " -e $F_DBG %s ", arg_debug_no_pause ? "-r" : "");
4972 fprintf(f, "-c \'runargs ");
4973 fprint_arg(f, argv);
4976 fprintf(f, "-s/tmp/charmrun_dbx.$$ %s", arg_nodeprog_r);
4977 if (arg_debug_no_pause)
4978 fprint_arg(f, argv);
4981 fprintf(stderr, "Unknown debugger: %s.\n Exiting.\n", h->debugger);
4983 } else if (arg_in_xterm) {
4985 printf("Charmrun> node %d: xterm is %s\n", nodeno, h->xterm);
4986 fprintf(f, "cat > /tmp/charmrun_inx.$$ << END_OF_SCRIPT\n");
4987 fprintf(f, "#!/bin/sh\n");
4988 fprintf(f, "/bin/rm -f /tmp/charmrun_inx.$$\n");
4989 fprintf(f, "%s", arg_nodeprog_r);
4990 fprint_arg(f, argv);
4992 fprintf(f, "echo 'program exited with code '\\$?\n");
4993 fprintf(f, "read eoln\n");
4994 fprintf(f, "END_OF_SCRIPT\n");
4995 fprintf(f, "chmod 700 /tmp/charmrun_inx.$$\n");
4997 fprintf(f, "\"%s\" ", arg_runscript);
4998 fprintf(f, "$F_XTERM -title 'Node %d (%s)' ", nodeno, h->name);
4999 fprintf(f, " -sl 5000");
5000 fprintf(f, " -e /tmp/charmrun_inx.$$\n");
5003 fprintf(f, "\"%s\" ", arg_runscript);
5004 if (arg_no_va_rand) {
5006 printf("Charmrun> setarch -R is used.\n");
5007 fprintf(f, "setarch `uname -m` -R ");
5009 fprintf(f, "\"%s\" ", arg_nodeprog_r);
5010 fprint_arg(f, argv);
5011 if (h->nice != -100) {
5013 printf("Charmrun> nice -n %d\n", h->nice);
5014 fprintf(f, " +nice %d ", h->nice);
5016 fprintf(f, "\nres=$?\n");
5017 /* If shared libraries fail to load, the program dies without
5018 calling charmrun back. Since we *have* to close down stdin/out/err,
5019 we have to smuggle this failure information out via a file,
5020 /tmp/charmrun_err.<pid> */
5021 fprintf(f, "if [ $res -eq 127 ]\n"
5023 " ( \n" /* Re-run, spitting out errors from a subshell: */
5026 " ) > /tmp/charmrun_err.$$ 2>&1 \n"
5028 arg_nodeprog_r, arg_nodeprog_r);
5031 /* End the node-program subshell. To minimize the number
5032 of open ports on the front-end, we must close down ssh;
5033 to do this, we have to close stdin, stdout, stderr, and
5034 run the subshell in the background. */
5036 fprintf(f, " < /dev/null 1> /dev/null 2> /dev/null");
5042 fprintf(f, "Echo 'remote shell phase successful.'\n");
5043 fprintf(f, /* Check for startup errors: */
5045 "if [ -r /tmp/charmrun_err.$$ ]\n"
5047 " cat /tmp/charmrun_err.$$ \n"
5048 " rm -f /tmp/charmrun_err.$$ \n"
5051 fprintf(f, "Exit 0\n");
5052 free(arg_currdir_r);
5055 /* use the command "size" to get information about the position of the ".data"
5056 and ".bss" segments inside the program memory */
/* Runs "size -A <node-program>" on the first host (host_table[0]) through
 * that host's remote shell and waits for the child to finish.
 *
 * The remote command is built in a heap buffer of strlen(path) + 9 bytes,
 * which is exactly "size -A " (8 characters) plus the path plus the
 * terminator.  The child process calls execvp() on the shell; if that
 * returns, an error naming the shell program is printed.
 * NOTE(review): how the child's output is captured (descriptor
 * redirection) and whether `tmp` / arg_nodeprog_r are freed is not visible
 * in this excerpt.  h->shell is pushed as a single argv entry here, unlike
 * ssh_fork() which splits it into words -- confirm this is intended.
 */
5057 static void read_global_segments_size()
5059 nodetab_host const * h = host_table[0];
5061 /* find the node-program */
5063 pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
5065 std::vector<const char *> sshargv;
5066 sshargv.push_back(h->shell);
5067 sshargv.push_back(h->name);
5068 sshargv.push_back("-l");
5069 sshargv.push_back(h->login);
5070 char *tmp = (char *) malloc(sizeof(char) * 9 + strlen(arg_nodeprog_r));
5071 sprintf(tmp, "size -A %s", arg_nodeprog_r);
5072 sshargv.push_back(tmp);
5073 sshargv.push_back((const char *) NULL);
5075 int childPid = fork();
5077 perror("ERROR> getting the size of the global variables segments");
5079 } else if (childPid == 0) {
5082 /*printf("executing: \"%s\" \"%s\" \"%s\" \"%s\"
5083 * \"%s\"\n",sshargv[0],sshargv[1],sshargv[2],sshargv[3],sshargv[4]);*/
5084 execvp(sshargv[0], const_cast<char **>(&sshargv[0]));
5085 fprintf(stderr, "Charmrun> Couldn't find remote shell program '%s'!\n",
5089 /* else we are in the parent */
5091 waitpid(childPid, NULL, 0);
5095 /* open a ssh connection with processor 0 and open a gdb session for info */
/* Starts "gdb -q <node-program>" on the first host (host_table[0]) through
 * that host's remote shell and keeps pipes to it.
 *
 * Three pipes (fdin, fdout, fderr) are created before fork().  The child's
 * pid is stored in gdb_info_pid; the child iterates over descriptors
 * 3..1023 (presumably closing them -- the loop body is not visible) and
 * calls execvp() on the shell.  In the parent, gdb_info_std[0..2] receive
 * the write end of fdin and the read ends of fdout and fderr.
 * The command buffer is strlen(path) + 8 bytes: "gdb -q " (7 characters)
 * plus the path plus the terminator.
 * NOTE(review): the dup2/close calls that connect the pipes in the child
 * and the actions taken after a pipe() or fork() failure are not visible
 * in this excerpt.
 */
5096 static void open_gdb_info()
5098 nodetab_host const * h = host_table[0];
5100 /* find the node-program */
5102 pathextfix(arg_nodeprog_a, h->pathfixes, h->ext);
5104 std::vector<const char *> sshargv;
5105 sshargv.push_back(h->shell);
5106 sshargv.push_back(h->name);
5107 sshargv.push_back("-l");
5108 sshargv.push_back(h->login);
5109 char *tmp = (char *) malloc(sizeof(char) * 8 + strlen(arg_nodeprog_r));
5110 sprintf(tmp, "gdb -q %s", arg_nodeprog_r);
5111 sshargv.push_back(tmp);
5112 sshargv.push_back((const char *) NULL);
5117 if (pipe(fdin) == -1) {
5118 fprintf(stderr, "charmrun> pipe() failed!\n");
5121 if (pipe(fdout) == -1) {
5122 fprintf(stderr, "charmrun> pipe() failed!\n");
5125 if (pipe(fderr) == -1) {
5126 fprintf(stderr, "charmrun> pipe() failed!\n");
5130 gdb_info_pid = fork();
5131 if (gdb_info_pid < 0) {
5132 perror("ERROR> starting info gdb");
5134 } else if (gdb_info_pid == 0) {
5139 PRINT(("executing: \"%s\" \"%s\" \"%s\" \"%s\" \"%s\"\n", sshargv[0],
5140 sshargv[1], sshargv[2], sshargv[3], sshargv[4]));
5144 for (int i = 3; i < 1024; i++)
5146 execvp(sshargv[0], const_cast<char **>(&sshargv[0]));
5147 fprintf(stderr, "Charmrun> Couldn't find remote shell program '%s'!\n",
5151 /* else we are in the parent */
5153 gdb_info_std[0] = fdin[1];
5154 gdb_info_std[1] = fdout[0];
5155 gdb_info_std[2] = fderr[0];
/* Root-side step of hierarchical startup: launches child charmrun
 * processes over ssh.
 *
 * arg_nodeprog_a is first replaced by "<directory of node program>" +
 * DIRSEP + "charmrun", so the start script launches charmrun instead of
 * the user program.  For each entry of my_process_table a start script is
 * written to /tmp/charmrun.<pid>.<procno> (falling back to the current
 * directory), filled in by ssh_script() and executed via ssh_fork().
 *
 * NOTE(review): this block looks unfinished -- it contains a bare "TODO;"
 * statement, the loop variable is declared as a pointer
 * (nodetab_process * p) but is used with "p." and passed where a reference
 * is expected, and `client` / `nodes_per_child` have no visible
 * declaration.  It presumably does not compile as shown; confirm.
 * NOTE(review): strrchr() may return NULL when arg_nodeprog_a contains no
 * '/', and sprintf() into the 1024-byte buf has no visible bound check.
 */
5161 static void start_next_level_charmruns()
5164 const char *nodeprog_name = strrchr(arg_nodeprog_a, '/');
5165 static char buf[1024];
5166 sprintf(buf, "%.*s%s%s", (int)(nodeprog_name-arg_nodeprog_a), arg_nodeprog_a, DIRSEP, "charmrun");
5167 arg_nodeprog_a = strdup(buf);
5171 // while (nextIndex < branchfactor) {
5172 for (nodetab_process * p : my_process_table)
5174 TODO; // need to do something more detailed with branchfactor and nodes_per_child
5177 char startScript[200];
5178 sprintf(startScript, "/tmp/charmrun.%d.%d", getpid(), p.procno);
5179 f = fopen(startScript, "w");
5181 /* now try current directory */
5182 sprintf(startScript, "charmrun.%d.%d", getpid(), p.procno);
5183 f = fopen(startScript, "w");
5185 fprintf(stderr, "Charmrun> Can not write file %s!\n", startScript);
5189 ssh_script(f, p, arg_argv);
5192 p.ssh_pid = ssh_fork(p, startScript);
5193 client += nodes_per_child;
/* Starts a single process over ssh.
 *
 * A start script is created as /tmp/charmrun.<pid>.<nodeno>; a second
 * attempt uses the same file name in the current directory.  The script is
 * filled in by ssh_script() and handed to ssh_fork(), whose result is
 * stored in p.ssh_pid for later waiting.
 * NOTE(review): the NULL tests on `f`, the fclose() before ssh_fork(), and
 * the action taken after the "Can not write file" message are not visible
 * in this excerpt.
 */
5199 static void start_one_node_ssh(nodetab_process & p)
5201 const nodetab_host * h = p.host;
5203 char startScript[200];
5204 sprintf(startScript, "/tmp/charmrun.%d.%d", getpid(), p.nodeno);
5205 FILE *f = fopen(startScript, "w");
5207 /* now try current directory */
5208 sprintf(startScript, "charmrun.%d.%d", getpid(), p.nodeno);
5209 f = fopen(startScript, "w");
5211 fprintf(stderr, "Charmrun> Can not write file %s!\n", startScript);
5215 ssh_script(f, p, arg_argv);
5218 p.ssh_pid = ssh_fork(p, startScript);
/* Starts every process of process_table over ssh by calling
 * start_one_node_ssh() on each entry in order. */
5221 static void start_nodes_ssh(std::vector<nodetab_process> & process_table)
5223 for (nodetab_process & p : process_table)
5225 start_one_node_ssh(p);
5229 /* For mpiexec: call mpiexec just once to start the program on all nodes. */
/* Forks one launcher process (mpiexec) that runs startScript for all
 * processes at once.
 *
 * The argument vector starts with the words of h->shell; unless
 * arg_mpiexec_no_n is set, "-n <process count>" is added, where the count
 * comes from get_old_style_process_count(); startScript is the final
 * argument.  The child iterates over descriptors 3..1023 (presumably
 * closing them -- the loop body is not visible) and calls execvp(); if
 * that returns, an error naming the mpiexec program is printed.
 * Unlike ssh_fork(), the script is passed as an argument rather than on
 * stdin, and the unlink() of the script is commented out.
 * NOTE(review): the fork() call, the declaration of `npes` and the return
 * value are not visible in this excerpt.
 */
5230 static int ssh_fork_one(nodetab_process & p, const char *startScript)
5232 nodetab_host const * h = p.host;
5234 std::vector<const char *> sshargv;
5236 const char *s = h->shell;
5237 const char *e = skipstuff(s);
5239 sshargv.push_back(substr(s, e));
5244 const int processes = get_old_style_process_count();
5247 if ( ! arg_mpiexec_no_n ) {
5248 sshargv.push_back("-n");
5249 sprintf(npes, "%d", processes);
5250 sshargv.push_back(npes);
5252 sshargv.push_back((char *) startScript);
5253 sshargv.push_back((const char *) NULL);
5255 printf("Charmrun> Starting %s %s \n", h->shell, startScript);
5259 perror("ERROR> starting mpiexec");
5262 if (pid == 0) { /*Child process*/
5263 /* unlink(startScript); */
5264 // removeEnv("DISPLAY="); /*No DISPLAY disables ssh's slow X11 forwarding*/
5265 for (int i = 3; i < 1024; i++)
5267 execvp(sshargv[0], const_cast<char *const *>(&sshargv[0]));
5268 fprintf(stderr, "Charmrun> Couldn't find mpiexec program '%s'!\n",
5273 printf("Charmrun> mpiexec started\n");
/*
 * mpiexec launch path: writes one start script named ./charmrun.<pid>,
 * fills it in for the first entry of my_process_table, and starts the
 * launcher once through ssh_fork_one().
 */
5277 static void start_nodes_mpiexec()
5279 char startScript[200];
5280 sprintf(startScript, "./charmrun.%d", getpid());
5281 FILE *f = fopen(startScript, "w");
// Make the script executable by the owner and readable by group and others.
// NOTE(review): chmod() runs immediately after fopen(), before the result of
// fopen() is examined, and its own return value is not checked -- confirm
// this ordering is intended.
5282 chmod(startScript, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IROTH);
5284 /* now try current directory */
// NOTE(review): this retry formats exactly the same "./charmrun.<pid>" path
// as the first attempt, so it targets the same file again.
5285 sprintf(startScript, "./charmrun.%d", getpid());
5286 f = fopen(startScript, "w");
5288 fprintf(stderr, "Charmrun> Can not write file %s!\n", startScript);
// Only the first process entry is used to generate the script.
5293 nodetab_process & p = my_process_table[0];
5295 ssh_script(f, p, arg_argv);
// The return value of ssh_fork_one() is not stored here.
5298 ssh_fork_one(p, startScript);
5299 /* all ssh_pid remain zero: skip finish_nodes */
/*
 * Waits for the remote-shell launchers of process_table[start .. stop-1].
 *
 * An entry whose ssh_pid is non-zero is still pending. A launcher that exits
 * with status 0 is marked finished by clearing ssh_pid. A non-zero status is
 * reported on stderr and the launch may be retried with start_one_node_ssh(),
 * at most MAX_NUM_RETRIES times per entry.
 *
 * process_table - table holding the entries to wait for.
 * start, stop   - half-open index range [start, stop) into process_table.
 */
5302 static void finish_set_nodes(std::vector<nodetab_process> & process_table, int start, int stop)
// One retry counter per entry in the range, all starting at zero; the
// counter for table index i lives at num_retries[i - start].
5304 std::vector<int> num_retries(stop - start, 0);
5308 for (int i = start; i < stop; i++) { /* check all nodes */
5309 nodetab_process & p = process_table[i];
5310 const nodetab_host * h = p.host;
5311 if (p.ssh_pid != 0) {
5312 done = 0; /* we are not finished yet */
// With options == 0 this waitpid() blocks until the child changes state.
// NOTE(review): the return value of waitpid() is not checked here.
5314 waitpid(p.ssh_pid, &status, 0); /* check if the process is finished */
5315 if (WIFEXITED(status)) {
5316 if (!WEXITSTATUS(status)) { /* good */
5317 p.ssh_pid = 0; /* process is finished */
5320 "Charmrun> Error %d returned from remote shell (%s:%d)\n",
5321 WEXITSTATUS(status), h->name, p.nodeno);
// Exit status 255 is handled differently from other failures.
// NOTE(review): ssh documents 255 as its own error status, so presumably
// only that case is treated as a retryable connection failure -- confirm.
5323 if (WEXITSTATUS(status) != 255)
// The counter is incremented before the comparison, so the attempt number
// printed below starts at 1.
5326 if (++num_retries[i - start] <= MAX_NUM_RETRIES) {
5327 fprintf(stderr, "Charmrun> Reconnection attempt %d of %d\n",
5328 num_retries[i - start], MAX_NUM_RETRIES);
5329 start_one_node_ssh(p);
5333 "Charmrun> Too many reconnection attempts; bailing out\n");
/*
 * Waits for launched remote shells by delegating to finish_set_nodes().
 *
 * When arg_hierarchical_start is set and this is not a child charmrun
 * (arg_child_charmrun is false), only entries [0, branchfactor) are waited
 * on. The other call covers the whole table, [0, process_table.size()).
 */
5343 static void finish_nodes(std::vector<nodetab_process> & process_table)
5346 if (arg_hierarchical_start && !arg_child_charmrun)
5347 finish_set_nodes(process_table, 0, branchfactor);
5350 finish_set_nodes(process_table, 0, process_table.size());
/*
 * Shutdown pass over my_process_table: for the remote-shell launchers
 * recorded in the table it prints which one is being waited for and reaps
 * it with waitpid(), so that no zombie child is left behind.
 */
5353 static void kill_nodes()
5355 /*Now wait for all the ssh'es to finish*/
5356 for (nodetab_process & p : my_process_table)
5358 const nodetab_host * h = p.host;
5361 printf("Charmrun> waiting for remote shell (%s:%d), pid %d\n", h->name,
5362 p.nodeno, p.ssh_pid);
// Blocking wait (options == 0); the collected status is stored in `status`.
5364 waitpid(p.ssh_pid, &status, 0); /*<- no zombies*/
5370 /* find the absolute path for an executable in the path */
/*
 * Splits a private copy of the PATH environment variable at ':' and, for
 * each directory, builds "<dir>/<target>" in a heap buffer and tests it
 * with probefile(). The candidate buffer is allocated with malloc(), so a
 * returned path would be owned by the caller.
 *
 * target - file name to look for in the PATH directories.
 */
5371 static char *find_abs_path(const char *target)
// NOTE(review): getenv() returns NULL when PATH is unset and the result goes
// straight into strdup() on the next line -- confirm PATH is always set.
5373 char *thepath=getenv("PATH");
// strtok() writes into its argument, hence the duplicate of PATH.
5374 char *path=strdup(thepath);
// NOTE(review): strtok() keeps static state and is not reentrant.
5375 char *subpath=strtok(path,":");
// Room for one directory, '/', the target name and the terminating NUL.
// NOTE(review): this is only sufficient if every PATH entry is at most
// PATH_MAX characters long; the copies below are unbounded.
5376 char *abspath=(char*) malloc(PATH_MAX + strlen(target) + 2);
5377 while(subpath!=NULL) {
5378 strcpy(abspath,subpath);
5379 strcat(abspath,"/");
5380 strcat(abspath,target);
// probefile() is defined near the top of this file and tries to open the
// candidate for reading.
5381 if(probefile(abspath)){
// Advance to the next ':'-separated PATH entry.
5385 subpath=strtok(NULL,":");
5392 /* simple version of charmrun that avoids the sshd or charmd, */
5393 /* it spawns the node program just on the local machine using exec. */
/*
 * Local launch path: starts the node program for the entries of
 * `process_table` on this machine with execve(), without ssh.
 *
 * The function:
 *   1. builds an environment array from main_envp with extra slots for
 *      NETSTART, CmiNumNodes and optional topology hints;
 *   2. when arg_debug, arg_debug_no_pause or arg_in_xterm is set, builds a
 *      command line that runs the program inside an xterm, optionally under
 *      the debugger named by arg_debugger;
 *   3. for each process, rewrites the NETSTART and CmiNumNodes slots and
 *      executes the command with that environment.
 */
5394 static void start_nodes_local(std::vector<nodetab_process> & process_table)
5396 char ** env = main_envp;
5398 /* copy environ and expanded to hold NETSTART and CmiNumNodes */
// Count the entries of the inherited, NULL-terminated environment.
5400 for (envc = 0; env[envc]; envc++)
5403 #if CMK_AIX && CMK_SMP
// Each active topology request contributes its active() value to the
// number of additional environment slots.
5406 const int proc_active = proc_per.active();
5407 extra += proc_active;
5409 const int onewth_active = onewth_per.active();
5410 extra += onewth_active;
// Pointer slots: envc inherited entries, 2 for NETSTART and CmiNumNodes,
// `extra` optional entries, and 1 more.
// NOTE(review): the last slot is presumably the terminating NULL -- confirm.
5413 char **envp = (char **) malloc((envc + 2 + extra + 1) * sizeof(void *));
5414 for (int i = 0; i < envc; i++)
// Two 256-byte buffers that are overwritten per process further down with
// "NETSTART=..." and "CmiNumNodes=...".
5416 envp[envc] = (char *) malloc(256);
5417 envp[envc + 1] = (char *) malloc(256);
5419 #if CMK_AIX && CMK_SMP
5420 envp[envc + n] = (char *) malloc(256);
5421 sprintf(envp[envc + n], "MALLOCMULTIHEAP=1");
5424 // cpu affinity hints
5425 using Unit = typename TopologyRequest::Unit;
// Processes-per-unit hint: the variable name depends on proc_per.unit() and
// the value is the matching field of proc_per.
5428 envp[envc + n] = (char *) malloc(256);
5429 switch (proc_per.unit())
5432 sprintf(envp[envc + n], "CmiProcessPerHost=%d", proc_per.host);
5435 sprintf(envp[envc + n], "CmiProcessPerSocket=%d", proc_per.socket);
5438 sprintf(envp[envc + n], "CmiProcessPerCore=%d", proc_per.core);
5441 sprintf(envp[envc + n], "CmiProcessPerPU=%d", proc_per.pu);
// One-worker-thread-per-unit hint: the variable name depends on
// onewth_per.unit(); the value written is always 1.
5451 envp[envc + n] = (char *) malloc(256);
5452 switch (onewth_per.unit())
5455 sprintf(envp[envc + n], "CmiOneWthPerHost=%d", 1);
5458 sprintf(envp[envc + n], "CmiOneWthPerSocket=%d", 1);
5461 sprintf(envp[envc + n], "CmiOneWthPerCore=%d", 1);
5464 sprintf(envp[envc + n], "CmiOneWthPerPU=%d", 1);
5474 /* insert xterm gdb in front of command line and pass args to gdb */
// dparamv holds strdup()'ed arguments for the xterm command line.
5476 std::vector<char *> dparamv;
5477 if (arg_debug || arg_debug_no_pause || arg_in_xterm)
// Resolve the xterm program named by arg_xterm through PATH.
5479 char *abs_xterm=find_abs_path(arg_xterm);
5482 fprintf(stderr, "Charmrun> cannot find xterm for debugging, please add it to your path\n");
// xterm arguments: window title taken from pparam_argv[1], then "-e"
// followed by the command to run inside the terminal.
5486 dparamv.push_back(strdup(abs_xterm));
5487 dparamv.push_back(strdup("-title"));
5488 dparamv.push_back(strdup(pparam_argv[1]));
5489 dparamv.push_back(strdup("-e"));
// cparamv collects the words of the command that runs inside the xterm.
5491 std::vector<const char *> cparamv;
5492 if (arg_debug || arg_debug_no_pause)
// lldb takes "-o" for commands and "--" before the program arguments;
// any other debugger is given "-ex" and "--args".
5494 const bool isLLDB = strcmp(arg_debugger, "lldb") == 0;
5495 const char *commandflag = isLLDB ? "-o" : "-ex";
5496 const char *argsflag = isLLDB ? "--" : "--args";
5498 cparamv.push_back(arg_debugger);
// With arg_debug_no_pause the debugger is also given the command "r".
5500 if (arg_debug_no_pause)
5502 cparamv.push_back(commandflag);
5503 cparamv.push_back("r");
5506 cparamv.push_back(argsflag);
// Append the program and its arguments, starting at pparam_argv[1].
5509 for (int i = 1; pparam_argv[i] != nullptr; ++i)
5510 cparamv.push_back(pparam_argv[i]);
// Without a debugger, append shell text that prints the exit code and then
// waits for a line of input.
5512 if (!(arg_debug || arg_debug_no_pause))
5513 cparamv.push_back("; echo \"program exited with code $?\" ; read eoln");
// The whole inner command becomes one space-joined string argument.
5515 dparamv.push_back(cstring_join(cparamv, " "));
5519 printf("Charmrun> xterm args:");
5520 for (const char *p : dparamv)
5525 // null terminate your argv or face the wrath of undefined behavior
5526 dparamv.push_back(nullptr);
5528 dparamp = dparamv.data();
// Alternative assignment of dparamp: run the program directly, using
// pparam_argv from index 1 onward as the argument vector.
5532 dparamp = (char **)(pparam_argv+1);
5535 for (const nodetab_process & p : process_table)
5538 printf("Charmrun> start %d node program on localhost.\n", p.nodeno);
// Per-process environment values.
// NOTE(review): sprintf() into a 256-byte buffer with a string of unchecked
// length from create_netstart() -- confirm the bound.
5539 sprintf(envp[envc], "NETSTART=%s", create_netstart(p.nodeno));
// NOTE(review): CmiNumNodes is always written as 0 here -- confirm intent.
5540 sprintf(envp[envc + 1], "CmiNumNodes=%d", 0);
5544 fprintf(stderr, "fork failed: %s\n", strerror(errno));
// fd2 is a duplicate of descriptor 2 (stderr) taken before /dev/null is opened.
5548 int fd, fd2 = dup(2);
5550 // don't disable initial output on rank 0 process (need to be able to see python syntax errors, etc)
5551 if ((p.nodeno != 0) && (-1 != (fd = open("/dev/null", O_RDWR)))) {
5553 if (-1 != (fd = open("/dev/null", O_RDWR))) {
// execve() only returns on failure; the message below reports that case.
5559 int status = execve(dparamp[0],
5560 const_cast<char *const *>(dparamp), envp);
5562 fprintf(stderr, "execve failed to start process \"%s\": %s\n",
5563 dparamp[0], strerror(errno));
// Walk the xterm argument strings, then the environment slots added by this
// function (indices envc .. envc + n - 1).
// NOTE(review): presumably to free them -- confirm.
5568 for (char *p : dparamv)
5570 for (int i = envc, i_end = envc + n; i < i_end; ++i)
// Restart counter: restart_node() pre-increments it and passes the new value
// as the argument that follows "+restartaftercrash".
5577 static int current_restart_phase = 1;
5580 * @brief Relaunches a program on the crashed node.
/*
 * Relaunches process `p` after a crash.
 *
 * Steps visible in this function:
 *   - write a start script at /tmp/charmrun.<pid>.<nodeno>;
 *   - build restart_argv: a copy of arg_argv followed by
 *     "+restartaftercrash", the new restart phase number,
 *     "+restartisomalloc" and a terminating NULL;
 *   - mark p's current host as crashed and assign p a host from host_table,
 *     advancing a static cursor that wraps around at host_count;
 *   - launch with ssh_script() and ssh_fork(), wait until the launcher has
 *     exited, and report a non-zero exit status.
 */
5582 static void restart_node(nodetab_process & p)
5584 char startScript[200];
5585 /** write the startScript file to be sent**/
// Only the /tmp location is formatted here; unlike start_one_node_ssh()
// there is no second path for the current directory.
5586 sprintf(startScript, "/tmp/charmrun.%d.%d", getpid(), p.nodeno);
5587 FILE *f = fopen(startScript, "w");
5589 /** add an argument to the argv of the new process
5590 so that the restarting processor knows that it
5591 is a restarting processor */
// First pass over arg_argv (NULL-terminated).
5593 while (arg_argv[i] != NULL) {
// i + 4 slots: the i original arguments, three added arguments, and the
// terminating NULL.
5596 const char **restart_argv = (const char **) malloc(sizeof(char *) * (i + 4));
// Second pass: copy the original argument pointers (not the strings).
5598 while (arg_argv[i] != NULL) {
5599 restart_argv[i] = arg_argv[i];
5602 restart_argv[i] = "+restartaftercrash";
// The phase counter is incremented before it is formatted.
5605 sprintf(phase_str, "%d", ++current_restart_phase);
5606 restart_argv[i + 1] = phase_str;
5607 restart_argv[i + 2] = "+restartisomalloc";
5608 restart_argv[i + 3] = NULL;
5610 /** change the nodetable entry of the crashed
5611 processor to connect it to a new one**/
// Static cursor into host_table; it keeps its value between calls.
5612 static int next_replacement_host = 0;
5613 p.host->crashed = true;
5614 const int host_count = host_table.size();
5616 // Disabled since ft does not distinguish host faults from process faults
// Skip hosts flagged as crashed, wrapping around the table; hosts_checked
// counts the hosts examined so a fully crashed table can be detected.
5617 int hosts_checked = 0;
5618 while (host_table[next_replacement_host]->crashed)
5620 ++next_replacement_host;
5621 next_replacement_host %= host_count;
5622 if (++hosts_checked == host_count)
5624 fprintf(stderr, "Charmrun> All hosts crashed, aborting.\n");
// Move the process to the selected host and advance the cursor so the next
// restart starts from the following host.
5629 p.host = host_table[next_replacement_host];
5630 ++next_replacement_host;
5631 next_replacement_host %= host_count;
5633 ssh_script(f, p, restart_argv);
5635 /**start the new processor */
5636 int restart_ssh_pid = ssh_fork(p, startScript);
5637 /**wait for the reply from the new process*/
5639 if (arg_debug_no_pause || arg_debug)
// Repeat the blocking wait until the launcher is reported as exited.
5643 waitpid(restart_ssh_pid, &status, 0);
5644 } while (!WIFEXITED(status));
5645 if (WEXITSTATUS(status) != 0) {
5647 "Charmrun> Error %d returned from new attempted remote shell \n",
5648 WEXITSTATUS(status));
// Elapsed time is measured against ftTimer, a file-level timestamp.
5652 PRINT(("Charmrun finished launching new process in %fs\n",
5653 GetClock() - ftTimer));
5657 * @brief Reconnects a crashed node. It waits for the I-tuple from the just
5658 * relaunched program. It also:
5659 * i) Broadcast the nodetabtable to every other node.
5660 * ii) Announces the crash to every other node.
/*
 * Accepts the connection from a relaunched process and re-registers it.
 *
 * Steps visible in this function:
 *   - wait on server_fd with skt_select1(), accept the connection and
 *     disable Nagle on it, then wait for the first request on the new socket;
 *   - with arg_hierarchical_start, forward to the root via req_forward_root()
 *     and record the crash in _last_crash;
 *   - receive a ChSingleNodeinfo message, store the socket in
 *     crashed.req_client, and update node info with nodeinfo_add() and
 *     nodeinfo_populate();
 *   - call req_send_initnodetab() for processes in my_process_table and
 *     announce the crash with announce_crash().
 *
 * crashed - table entry of the process that crashed and was relaunched.
 */
5662 static void reconnect_crashed_client(nodetab_process & crashed)
// skt_select1() returning 0 is treated as a timeout on the listening socket.
5664 if (0 == skt_select1(server_fd, arg_timeout * 1000)) {
5665 client_connect_problem(crashed, "Timeout waiting for restarted node-program to connect");
5669 unsigned int clientPort;
5671 const SOCKET req_client = skt_accept(server_fd, &clientIP, &clientPort);
5673 if (req_client == SOCKET_ERROR) {
5674 client_connect_problem(crashed, "Failure in restarted node accept");
5677 skt_tcp_no_nagle(req_client);
// Same timeout value, now waiting for data on the accepted connection.
5680 if (!skt_select1(req_client, arg_timeout * 1000)) {
5681 client_connect_problem(crashed, "Timeout on IP request for restarted processor");
5685 if (arg_hierarchical_start) {
5686 req_forward_root(crashed);
// _last_crash already being set means an earlier crash is still recorded.
5687 if (_last_crash != nullptr) {
5688 fprintf(stderr, "ERROR> Charmrun detected multiple crashes.\n");
5692 _last_crash = &crashed;
5696 ChMessage_recv(req_client, &msg);
// The payload must be exactly one ChSingleNodeinfo.
// NOTE(review): msg.data is printed with %s, which assumes it is
// NUL-terminated text in the error case -- confirm.
5697 if (msg.len != sizeof(ChSingleNodeinfo)) {
5698 fprintf(stderr, "Charmrun: Bad initnode data length. Aborting\n");
5699 fprintf(stderr, "Charmrun: possibly because: %s.\n", msg.data);
5701 fprintf(stderr, "crashed_node %d reconnected fd %d \n",
5702 crashed.nodeno, req_client);
5704 /** update the nodetab entry corresponding to
5705 this node, skip the restarted one */
5706 ChSingleNodeinfo *in = (ChSingleNodeinfo *) msg.data;
// From here on the crashed entry uses the newly accepted socket.
5708 crashed.req_client = req_client;
5710 nodeinfo_add(in, crashed);
5712 nodeinfo_populate(crashed);
// NOTE(review): per the comment above, the restarted process is presumably
// excluded from this loop -- confirm.
5714 for (const nodetab_process & p : my_process_table)
5716 req_send_initnodetab(p);
5718 /* tell every one there is a crash */
5719 announce_crash(crashed);
// Same single-outstanding-crash guard as in the hierarchical branch.
5720 if (_last_crash != nullptr) {
5721 fprintf(stderr, "ERROR> Charmrun detected multiple crashes.\n");
5724 _last_crash = &crashed;
5725 /*holds the restarted process until I got ack back from
5726 everyone in req_handle_crashack
5727 now the restarted one can only continue until
5728 req_handle_crashack calls req_send_initnodetab(socket_index)
5729 req_send_initnodetab(req_clients[socket_index]); */
5730 ChMessage_free(&msg);
5735 * @brief Sends a message announcing the crash to every other node. This message
5737 * trigger fault tolerance methods.
/*
 * Sends a "crashnode" message to processes in my_process_table. The message
 * is a ChMessageHeader followed by one ChMessageInt_t holding the node
 * number of the crashed process.
 * NOTE(review): presumably the crashed process itself is skipped -- confirm.
 *
 * crashed - table entry of the process that crashed.
 */
5739 static void announce_crash(const nodetab_process & crashed)
5741 ChMessageHeader hdr;
// Payload: the crashed node number as a ChMessageInt_t.
5742 ChMessageInt_t crashNo = ChMessageInt_new(crashed.nodeno);
// Header with type "crashnode" and a payload length of one ChMessageInt_t.
5743 ChMessageHeader_new("crashnode", sizeof(ChMessageInt_t), &hdr);
5744 for (const nodetab_process & p : my_process_table)
// Header first, then the payload, on the recipient's request socket.
5748 skt_sendN(p.req_client, (const char *) &hdr, sizeof(hdr));
5749 skt_sendN(p.req_client, (const char *) &crashNo,
5750 sizeof(ChMessageInt_t));
5757 #endif /*CMK_USE_SSH*/