Shrink/Expand: Fix C++ error and incorrect serialization of variable for network...
[charm.git] / src / arch / netlrts / machine.c
blobdb9cd04ce26ec109d5f5d77fa1900bfbb6845227
2 /** @file
3 * Basic NET-LRTS implementation of Converse machine layer
4 * @ingroup NET
5 */
7 /** @defgroup NET
8 * NET implementation of machine layer, ethernet in particular
9 * @ingroup Machine
11 * THE DATAGRAM STREAM
13 * Messages are sent using UDP datagrams. The sender allocates a
14 * struct for each datagram to be sent. These structs stick around
15 * until slightly after the datagram is acknowledged.
17 * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
18 * Each node has an OtherNode struct for every other node in the
19 * system. The OtherNode struct contains:
21 * send_queue (all datagram-structs not yet transmitted)
22 * send_window (all datagram-structs transmitted but not ack'd)
24 * When an acknowledgement comes in, all packets in the send-window
25 * are either marked as acknowledged or pushed back into the send
26 * queue for retransmission.
28 * THE OUTGOING MESSAGE
30 * When you send or broadcast a message, the first thing the system
31 * does is create an OutgoingMsg struct to represent the
32 * operation. The OutgoingMsg contains a very direct expression
33 * of what you want to do:
35 * OutgoingMsg:
37 * size --- size of message in bytes
38 * data --- pointer to the buffer containing the message
39 * src --- processor which sent the message
40 * dst --- destination processor (-1=broadcast, -2=broadcast all)
41 * freemode --- see below.
42 * refcount --- see below.
44 * The OutgoingMsg is kept around until the transmission is done, then
45 * it is garbage collected --- the refcount and freemode fields are
46 * to assist garbage collection.
48 * The freemode indicates which kind of buffer-management policy was
49 * used (sync, async, or freeing). The sync policy is handled
50 * superficially by immediately converting sync sends into freeing
51 * sends. Thus, the freemode can either be 'A' (async) or 'F'
52 * (freeing). If the freemode is 'F', then garbage collection
53 * involves freeing the data and the OutgoingMsg structure itself. If
54 * the freemode is 'A', then the only cleanup is to change the
55 * freemode to 'X', a condition which is then detectable by
56 * CmiAsyncMsgSent. In this case, the actual freeing of the
57 * OutgoingMsg is done by CmiReleaseCommHandle.
59 * When the transmission is initiated, the system computes how many
60 * datagrams need to be sent, total. This number is stored in the
61 * refcount field. Each time a datagram is delivered, the refcount
62 * is decremented; when it reaches zero, cleanup is performed. There
63 * are two exceptions to this rule. Exception 1: if the OutgoingMsg
64 * is a send (not a broadcast) and can be performed with shared
65 * memory, the entire datagram system is bypassed, the message is
66 * simply delivered and freed, not using the refcount mechanism at
67 * all. Exception 2: If the message is a broadcast, then part of the
68 * broadcast that can be done via shared memory is performed prior to
69 * initiating the datagram/refcount system.
71 * DATAGRAM FORMATS AND MESSAGE FORMATS
73 * Datagrams have this format:
75 * srcpe (16 bits) --- source processor number.
76 * magic ( 8 bits) --- magic number to make sure DG is good.
77 * dstrank ( 8 bits) --- destination processor rank.
78 * seqno (32 bits) --- packet sequence number.
79 * data (XX byte) --- user data.
81 * The only reason the srcpe is in there is because the receiver needs
82 * to know which receive window to use. The dstrank field is needed
83 * because transmission is node-to-node. Once the message is
84 * assembled by the node, it must be delivered to the appropriate PE.
85 * The dstrank field is used to encode certain special-case scenarios.
86 * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
87 * and should be delivered to all processors in the node. If the dstrank
88 * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
89 * which case the srcpe is the number of the acknowledger, the seqno is
90 * always zero, and the user data is a list of the seqno's being
91 * acknowledged. There may be other dstrank codes for special functions.
93 * To send a message, one chops it up into datagrams and stores those
94 * datagrams in a send-queue. These outgoing datagrams aren't stored
95 * in the explicit format shown above. Instead, they are stored as
96 * ImplicitDgrams, which contain the datagram header and a pointer to
97 * the user data (which is in the user message buffer, which is in the
98 * OutgoingMsg). At transmission time these are combined together.
100 * The combination of the datagram header with the user's data is
101 * performed right in the user's message buffer. Note that the
102 * datagram header is exactly 64 bits. One simply overwrites 64 bits
103 * of the user's message with a datagram header, sends the datagram
104 * straight from the user's message buffer, then restores the user's
105 * buffer to its original state. There is a small problem with the
106 * first datagram of the message: one needs 64 bits of space to store
107 * the datagram header. To make sure this space is there, we added a
108 * 64-bit unused space to the front of the Cmi message header. In
109 * addition to this, we also add 32 bits to the Cmi message header
110 * to make room for a length-field, making it possible to identify
111 * message boundaries.
113 * CONCURRENCY CONTROL
115 * This has changed recently.
117 * EFFICIENCY NOTES
119 * The sender-side does little copying. The async and freeing send
120 * routines do no copying at all. The sync send routines copy the
121 * message, then use the freeing-send routines. The other alternative
122 * is to not copy the message, and use the async send mechanism
123 * combined with a blocking wait. Blocking wait seems like a bad
124 * idea, since it could take a VERY long time to get all those
125 * datagrams out the door.
127 * The receiver side, unfortunately, must copy. To avoid copying,
128 * it would have to receive directly into a preallocated message buffer.
129 * Unfortunately, this can't work: there's no way to know how much
130 * memory to preallocate, and there's no way to know which datagram
131 * is coming next. Thus, we receive into fixed-size (large) datagram
132 * buffers. These are then inspected, and the messages extracted from
133 * them.
135 * Note that we are allocating a large number of structs: OutgoingMsg's,
136 * ImplicitDgrams, ExplicitDgrams. By design, each of these structs
137 * is a fixed-size structure. Thus, we can do memory allocation by
138 * simply keeping a linked-list of unused structs around. The only
139 * place where expensive memory allocation is performed is in the
140 * sync routines.
142 * Since the datagrams from one node to another are fully ordered,
143 * there is slightly more ordering than is needed: in theory, the
144 * datagrams of one message don't need to be ordered relative to the
145 * datagrams of another. This was done to simplify the sequencing
146 * mechanisms: implementing a fully-ordered stream is much simpler
147 * than a partially-ordered one. It also makes it possible to
148 * modularize, layering the message transmitter on top of the
149 * datagram-sequencer. In other words, it was just easier this way.
150 * Hopefully, this won't cause serious degradation: LAN's rarely get
151 * datagrams out of order anyway.
153 * A potential efficiency problem is the lack of message-combining.
154 * One datagram could conceivably contain several messages. This
155 * might be more efficient, it's not clear how much overhead is
156 * involved in sending a short datagram. Message-combining isn't
157 * really ``integrated'' into the design of this software, but you
158 * could fudge it as follows. Whenever you pull a short datagram from
159 * the send-queue, check the next one to see if it's also a short
160 * datagram. If so, pack them together into a ``combined'' datagram.
161 * At the receive side, simply check for ``combined'' datagrams, and
162 * treat them as if they were simply two datagrams. This would
163 * require extra copying. I have no idea if this would be worthwhile.
165 *****************************************************************************/
168 * @addtogroup NET
169 * @{
172 /*****************************************************************************
174 * Include Files
176 ****************************************************************************/
178 #define _GNU_SOURCE 1
179 #include <stdarg.h> /*<- was <varargs.h>*/
181 #define CMK_USE_PRINTF_HACK 0
182 #if CMK_USE_PRINTF_HACK
/*HACK: turn printf into CmiPrintf, by just defining our own
external symbol "printf".  This may be more trouble than it's worth,
since the only advantage is that it works properly with +syncprint.

This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
because they don't call printf.  Has to be defined up here because we probably
haven't properly guessed this compiler's prototype for "printf".
*/
static void InternalPrintf(const char *f, va_list l);

/**
 * Replacement for the C library printf: forwards the format string and
 * argument list to InternalPrintf.
 *
 * @param fmt printf-style format string.
 * @return always 10; InternalPrintf returns nothing, so the real number of
 *         characters written is not available here.
 */
int printf(const char *fmt, ...) {
  va_list p;
  va_start(p, fmt);
  InternalPrintf(fmt,p);
  va_end(p);
  return 10;
}
199 #endif
202 #include "converse.h"
203 #include "memory-isomalloc.h"
205 #include <stdio.h>
206 #include <stdlib.h>
207 #include <ctype.h>
208 #include <fcntl.h>
209 #include <errno.h>
210 #include <setjmp.h>
211 #include <signal.h>
212 #include <string.h>
213 #include <unistd.h>
215 /* define machine debug */
216 #include "machine.h"
217 static int Cmi_charmrun_pid;
218 static int Cmi_charmrun_fd = -1;
220 CMI_EXTERNC_VARIABLE int quietMode;
222 /******************* Producer-Consumer Queues ************************/
223 #include "pcqueue.h"
225 #include "machine-smp.h"
227 #include "machine-lrts.h"
228 #include "machine-common-core.c"
230 #if CMK_USE_KQUEUE
231 #include <sys/event.h>
232 int _kq = -1;
233 #endif
235 #if CMK_USE_POLL
236 #include <poll.h>
237 #endif
239 #include "conv-ccs.h"
240 #include "ccs-server.h"
241 #include "sockRoutines.h"
243 #if defined(_WIN32)
244 /*For windows systems:*/
245 # include <windows.h>
246 # include <wincon.h>
247 # include <sys/types.h>
248 # include <sys/timeb.h>
249 # define fdopen _fdopen
250 # define SIGBUS -1 /*These signals don't exist in Win32*/
251 # define SIGKILL -1
252 # define SIGQUIT -1
253 /*# define SIGTERM -1*/ /* VC++ ver 8 now has SIGTERM */
255 #else /*UNIX*/
256 # include <pwd.h>
257 # include <unistd.h>
258 # include <fcntl.h>
259 # include <sys/file.h>
260 #endif
262 #if CMK_PERSISTENT_COMM
263 #include "machine-persistent.c"
264 #endif
266 #define PRINTBUFSIZE 16384
268 #ifdef __ONESIDED_IMPL
269 #ifdef __ONESIDED_NO_HARDWARE
270 int putSrcHandler;
271 int putDestHandler;
272 int getSrcHandler;
273 int getDestHandler;
274 #include "conv-onesided.c"
275 #endif
276 #endif
278 #if CMK_ONESIDED_IMPL
279 #include "machine-rdma.h"
280 #include "machine-onesided.h"
281 #endif
283 #if CMK_SHRINK_EXPAND
284 extern void resumeAfterRealloc(void);
285 extern char willContinue;
286 extern int mynewpe;
287 extern int numProcessAfterRestart;
288 CcsDelayedReply shrinkExpandreplyToken;
289 extern char *_shrinkexpand_basedir;
290 #endif
292 static void CommunicationServerNet(int withDelayMs, int where);
293 //static void CommunicationServer(int withDelayMs);
295 void CmiHandleImmediate(void);
296 extern int CmemInsideMem(void);
297 extern void CmemCallWhenMemAvail(void);
299 static unsigned int dataport=0;
300 static SOCKET dataskt;
302 extern void TokenUpdatePeriodic(void);
303 extern void getAvailSysMem(void);
305 static int Lrts_numNodes;
306 static int Lrts_myNode;
308 /****************************************************************************
310 * Handling Errors
312 * Errors should be handled by printing a message on stderr and
313 * calling exit(1). Nothing should be sent to charmrun, no attempt at
314 * communication should be made. The other processes will notice the
315 * abnormal termination and will deal with it.
317 * Rationale: if an error triggers an attempt to send a message,
318 * the attempt to send a message is likely to trigger another error,
319 * leading to an infinite loop and a process that spins instead of
320 * shutting down.
322 *****************************************************************************/
324 static int machine_initiated_shutdown=0;
325 static int already_in_signal_handler=0;
327 static void CmiDestroyLocks(void);
329 CMI_EXTERNC
330 void EmergencyExit(void);
331 void MachineExit(void);
333 static void machine_exit(int status)
335 MACHSTATE(3," machine_exit");
336 machine_initiated_shutdown=1;
338 CmiDestroyLocks(); /* destroy locks to prevent dead locking */
339 EmergencyExit();
341 MachineExit();
342 exit(status);
345 static void charmrun_abort(const char*);
/**
 * Report a fatal error through charmrun_abort and then shut this
 * process down with exit status 1.
 *
 * @param msg text passed on to charmrun_abort.
 */
static void KillEveryone(const char *msg)
{
  printf("XYZ FOOBAR exit\n");

  charmrun_abort(msg);
  machine_exit(1);
}
/**
 * Abort with a numeric error code: formats "[pe] XXX Fatal error #n",
 * reports it through charmrun_abort and exits with status 1.
 *
 * @param n error code embedded in the message.
 */
static void KillEveryoneCode(int n)
{
  char _s[100];
  /* snprintf bounds the write to the buffer regardless of the values printed */
  snprintf(_s, sizeof(_s), "[%d] XXX Fatal error #%d\n", CmiMyPe(), n);
  charmrun_abort(_s);
  machine_exit(1);
}
362 CpvCExtern(int, freezeModeFlag);
364 static int Cmi_truecrash;
366 static void KillOnAllSigs(int sigNo)
368 const char *sig="unknown signal";
369 const char *suggestion="";
370 if (machine_initiated_shutdown ||
371 already_in_signal_handler)
372 machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
373 already_in_signal_handler=1;
375 #if CMK_CCS_AVAILABLE
376 if (CpvAccess(cmiArgDebugFlag)) {
377 int reply = 0;
378 CpdNotify(CPD_SIGNAL,sigNo);
379 #if ! CMK_BIGSIM_CHARM
380 CcsSendReplyNoError(4,&reply);/*Send an empty reply if not*/
381 CpvAccess(freezeModeFlag) = 1;
382 CpdFreezeModeScheduler();
383 #else
384 CpdFreeze();
385 #endif
387 #endif
390 if (sigNo==SIGSEGV) {
391 sig="segmentation violation";
392 suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).";
394 if (sigNo==SIGFPE) {
395 sig="floating point exception";
396 suggestion="Check for integer or floating-point division by zero.";
398 if (sigNo==SIGBUS) {
399 sig="bus error";
400 suggestion="Check for misaligned reads or writes to memory.";
402 if (sigNo==SIGILL) {
403 sig="illegal instruction";
404 suggestion="Check for calls to uninitialized function pointers.";
406 if (sigNo==SIGKILL) sig="caught signal KILL";
407 if (sigNo==SIGQUIT) sig="caught signal QUIT";
408 if (sigNo==SIGTERM) sig="caught signal TERM";
409 MACHSTATE1(5," Caught signal %s ",sig);
410 /*ifdef this part*/
411 #ifdef __FAULT__
412 if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
413 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
414 } else
415 #endif
417 Cmi_truecrash = 0;
418 CmiAbortHelper("Caught Signal", sig, suggestion, 0, 1);
422 static void machine_atexit_check(void)
424 if (!machine_initiated_shutdown)
425 CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
426 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
427 fgetc(stdin);
428 #endif
431 #if !defined(_WIN32)
432 static void HandleUserSignals(int signum)
434 int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
435 CcdRaiseCondition(condnum);
437 #endif
439 /*****************************************************************************
441 * Utility routines for network machine interface.
443 *****************************************************************************/
/**
 Horrific #defines to hide the differences between select() and poll().

 CMK_PIPE_DECL(delayMs)   declares the local state for one wait (delay in
                          integer milliseconds).
 CMK_PIPE_SUB             the arguments to pass that state to a helper whose
                          parameter list is CMK_PIPE_PARAM.
 CMK_PIPE_ADDREAD/WRITE   register a descriptor before the wait.
 CMK_PIPE_CALL()          perform the wait.
 CMK_PIPE_CHECKREAD/WRITE test a descriptor after the wait.

 In the poll() version the CHECK macros consume one fds[] slot per use, so
 they must be invoked in the same order as the matching ADD macros.
 CMK_PIPE_CALL() there is deliberately two statements (the wait, then the
 slot counter reset) so that "x = CMK_PIPE_CALL();" captures poll()'s result.

 The timeout is split into whole seconds plus a sub-second remainder so that
 delays of 1000 ms or more still produce a valid timeval/timespec.
 */
#if CMK_USE_POLL /*poll() version*/
# define CMK_PIPE_DECL(delayMs) \
        struct pollfd fds[10]; \
        int nFds_sto=0; int *nFds=&nFds_sto; \
        int pollDelayMs=(delayMs);
# define CMK_PIPE_SUB fds,nFds
# define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0

# define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
# define CMK_PIPE_ADDREAD(rd_fd) \
        do {fds[*nFds].fd=(rd_fd); fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
        do {fds[*nFds].fd=(wr_fd); fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (fds[(*nFds)++].revents&POLLIN)
# define CMK_PIPE_CHECKWRITE(wr_fd) (fds[(*nFds)++].revents&POLLOUT)

#elif CMK_USE_KQUEUE /* kqueue version */

# define CMK_PIPE_DECL(delayMs) \
        if (_kq == -1) _kq = kqueue(); \
        struct kevent ke_sto; \
        struct kevent* ke = &ke_sto; \
        struct timespec tmo; \
        tmo.tv_sec = (delayMs)/1000; \
        tmo.tv_nsec = ((delayMs)%1000)*1000000L;
# define CMK_PIPE_SUB ke
# define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)

# define CMK_PIPE_PARAM struct kevent* ke
# define CMK_PIPE_ADDREAD(rd_fd) \
        do { EV_SET(ke, (rd_fd), EVFILT_READ, EV_ADD, 0, 10, NULL); \
             kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
        do { EV_SET(ke, (wr_fd), EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
             kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == (rd_fd) && ke->filter == EVFILT_READ)
# define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == (wr_fd) && ke->filter == EVFILT_WRITE)

#else /*select() version*/

# define CMK_PIPE_DECL(delayMs) \
        fd_set rfds_sto,wfds_sto;\
        fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
        FD_ZERO(rfds); FD_ZERO(wfds); \
        tmo.tv_sec=(delayMs)/1000; tmo.tv_usec=1000*((delayMs)%1000);
# define CMK_PIPE_SUB rfds,wfds
# define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)

# define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
# define CMK_PIPE_ADDREAD(rd_fd) FD_SET((rd_fd),rfds)
# define CMK_PIPE_ADDWRITE(wr_fd) FD_SET((wr_fd),wfds)
# define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET((rd_fd),rfds)
# define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET((wr_fd),wfds)
#endif
/**
 * Called after the socket wait reports failure.  On UNIX any errno other
 * than EINTR is fatal; on Win32 the failure is ignored.
 */
static void CMK_PIPE_CHECKERR(void) {
#if defined(_WIN32)
  /* Win32 socket seems to randomly return inexplicable errors
     here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK.
     (A disabled diagnostic used to print WSAGetLastError() here.) */
#else /*UNIX machine*/
  if (errno != EINTR) {
    KillEveryone("Socket error in CheckSocketsReady!\n");
  }
#endif
}
516 static void CmiStdoutFlush(void);
517 static int CmiStdoutNeedsService(void);
518 static void CmiStdoutService(void);
519 static void CmiStdoutAdd(CMK_PIPE_PARAM);
520 static void CmiStdoutCheck(CMK_PIPE_PARAM);
/**
 * Read the system clock.
 *
 * @return seconds as a double; built from gettimeofday() on UNIX
 *         (microsecond field) or _ftime() on Win32 (millisecond field).
 *         A gettimeofday failure is fatal (KillEveryoneCode).
 */
double GetClock(void)
{
#if defined(_WIN32)
  struct _timeb now;
  double whole, frac;

  _ftime(&now);
  whole = now.time * 1.0;
  frac = now.millitm * 1.0E-3;
  return (whole + frac);
#else
  struct timeval now;
  double whole, frac;

  if (gettimeofday(&now, NULL) < 0) {
    perror("gettimeofday");
    KillEveryoneCode(9343112);
  }
  whole = now.tv_sec * 1.0;
  frac = now.tv_usec * 1.0E-6;
  return (whole + frac);
#endif
}
538 /***********************************************************************
540 * Abort function:
542 ************************************************************************/
544 static int already_aborting=0;
545 void LrtsAbort(const char *message)
547 if (already_aborting) machine_exit(1);
548 already_aborting=1;
549 MACHSTATE1(5,"CmiAbort(%s)",message);
551 /*Send off any remaining prints*/
552 CmiStdoutFlush();
554 if(Cmi_truecrash) {
555 printf("CHARM++ FATAL ERROR: %s\n", message);
556 volatile int* ptr = NULL;
557 *ptr = 0; /*Write to null, causing bus error*/
558 } else {
559 charmrun_abort(message);
560 machine_exit(1);
565 /******************************************************************************
567 * CmiEnableAsyncIO
569 * The net and tcp versions use a bunch of unix processes talking to each
570 * other via file descriptors. We need for a signal SIGIO to be generated
571 * each time a message arrives, making it possible to write a signal
572 * handler to handle the messages. The vast majority of unixes can,
573 * in fact, do this. However, there isn't any standard for how this is
574 * supposed to be done, so each version of UNIX has a different set of
575 * calls to turn this signal on. So, there is like one version here for
576 * every major brand of UNIX.
578 *****************************************************************************/
#if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
#include <fcntl.h>
/**
 * Ask the kernel to signal this process when fd becomes ready: makes this
 * process the owner of fd, then turns on O_ASYNC.
 *
 * O_ASYNC is OR-ed into the descriptor's current status flags; passing it
 * alone to F_SETFL would clear every other settable flag (such as
 * O_NONBLOCK) already on the descriptor.
 *
 * Any fcntl failure is reported with CmiError and the process exits.
 *
 * @param fd descriptor to configure.
 */
void CmiEnableAsyncIO(int fd)
{
  int flags;

  if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
    CmiError("setting socket owner: %s\n", strerror(errno)) ;
    exit(1);
  }
  flags = fcntl(fd, F_GETFL, 0);
  if ( flags < 0 || fcntl(fd, F_SETFL, flags | O_ASYNC) < 0 ) {
    CmiError("setting socket async: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/** No-op when this platform's async-IO method is not configured. */
void CmiEnableAsyncIO(int fd) { }
#endif
597 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
#if !defined(_WIN32)
/**
 * Put fd into non-blocking mode.
 *
 * O_NONBLOCK is OR-ed into the descriptor's current status flags so that
 * flags set earlier (for example O_ASYNC) survive; F_SETFL takes exactly
 * one integer argument.
 *
 * Any fcntl failure is reported with CmiError and the process exits.
 *
 * @param fd descriptor to configure.
 */
void CmiEnableNonblockingIO(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);

  if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/** No-op on Win32. */
void CmiEnableNonblockingIO(int fd) { }
#endif
611 /******************************************************************************
613 * Configuration Data
615 * This data is all read in from the NETSTART variable (provided by the
616 * charmrun) and from the command-line arguments. Once read in, it is never
617 * modified.
619 *****************************************************************************/
621 static skt_ip_t Cmi_self_IP;
622 static skt_ip_t Cmi_charmrun_IP; /*Address of charmrun machine*/
623 static int Cmi_charmrun_port;
624 /* Magic number to be used for sanity check in message header */
625 static int Cmi_net_magic;
627 static int Cmi_netpoll;
628 static int Cmi_asyncio;
629 static int Cmi_idlepoll;
630 static int Cmi_syncprint;
631 static int Cmi_print_stats = 0;
633 #if CMK_SHRINK_EXPAND
634 int Cmi_isOldProcess = 0; // means this process was already there
635 static int Cmi_mynewpe = 0;
636 static int Cmi_oldpe = 0;
637 static int Cmi_newnumnodes = 0;
638 int Cmi_myoldpe = 0;
639 #endif
641 extern int CmiMyLocalRank;
643 #if ! defined(_WIN32)
644 /* parse forks only used in non-smp mode */
645 static void parse_forks(void) {
646 char *forkstr;
647 int nread;
648 int forks;
649 int i,pid;
650 forkstr=getenv("CmiMyForks");
651 if(forkstr!=0) { /* charmrun */
652 nread = sscanf(forkstr,"%d",&forks);
653 /* CmiMyLocalRank is used for setting default cpu affinity */
654 CmiMyLocalRank = 0;
655 for(i=1;i<=forks;i++) { /* by default forks = 0 */
656 pid=fork();
657 if(pid<0) CmiAbort("Fork returned an error");
658 if(pid==0) { /* forked process */
659 /* reset mynode,pe & exit loop */
660 CmiMyLocalRank = i;
661 Lrts_myNode+=i;
662 #if ! CMK_SMP
663 _Cmi_mype+=i;
664 #endif
665 break;
670 #endif
672 static void parse_magic(void)
674 char* nm;
675 int nread;
676 nm = getenv("NETMAGIC");
677 if (nm!=0)
678 {/*Read values set by Charmrun*/
679 nread = sscanf(nm, "%d",&Cmi_net_magic);
683 static void parse_netstart(void)
685 char *ns;
686 int nread;
687 int port;
688 ns = getenv("NETSTART");
689 if (ns!=0)
690 {/*Read values set by Charmrun*/
691 char Cmi_charmrun_name[1024];
692 nread = sscanf(ns, "%d%s%d%d%d",
693 &Lrts_myNode,
694 Cmi_charmrun_name, &Cmi_charmrun_port,
695 &Cmi_charmrun_pid, &port);
696 Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
698 if (nread!=5) {
699 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
700 exit(1);
702 #if CMK_SHRINK_EXPAND
703 if (Cmi_isOldProcess) {
704 Cmi_myoldpe = Lrts_myNode;
706 #endif
707 if (getenv("CmiLocal") != NULL) { /* ++local */
708 /* CmiMyLocalRank is used for setting default cpu affinity */
709 CmiMyLocalRank = Lrts_myNode;
712 } else
713 {/*No charmrun-- set flag values for standalone operation*/
714 Lrts_myNode=0;
715 Cmi_charmrun_IP=_skt_invalid_ip;
716 Cmi_charmrun_port=0;
717 Cmi_charmrun_pid=0;
718 dataport = -1;
719 #if CMK_USE_PXSHM
720 CmiAbort("pxshm must be run with charmrun");
721 #endif
725 static void extract_common_args(char **argv)
727 if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
728 Cmi_print_stats = 1;
729 #if CMK_SHRINK_EXPAND
730 //Realloc specific args
731 CmiGetArgIntDesc(argv,"+mynewpe",&Cmi_mynewpe,"New PE after realloc");
732 CmiGetArgIntDesc(argv,"+myoldpe",&Cmi_oldpe,"New PE after realloc");
733 CmiGetArgIntDesc(argv,"+newnumpes",&Cmi_newnumnodes,"New num PEs after realloc");
734 #endif
738 /******************************************************************************
740 * Packet Performance Logging
742 * This module is designed to give a detailed log of the packets and their
743 * acknowledgements, for performance tuning. It can be disabled.
745 *****************************************************************************/
747 #define LOGGING 0
749 #if LOGGING
751 typedef struct logent {
752 double time;
753 int seqno;
754 int srcpe;
755 int dstpe;
756 int kind;
757 } *logent;
760 logent log;
761 int log_pos;
762 int log_wrap;
764 static void log_init(void)
766 log = (logent)malloc(50000 * sizeof(struct logent));
767 _MEMCHECK(log);
768 log_pos = 0;
769 log_wrap = 0;
772 static void log_done(void)
774 char logname[100]; FILE *f; int i, size;
775 sprintf(logname, "log.%d", Lrts_myNode);
776 f = fopen(logname, "w");
777 if (f==0) KillEveryone("fopen problem");
778 if (log_wrap) size = 50000; else size=log_pos;
779 for (i=0; i<size; i++) {
780 logent ent = log+i;
781 fprintf(f, "%1.4f %d %c %d %d\n",
782 ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
784 fclose(f);
787 void printLog(void)
789 char logname[100]; FILE *f; int i, j, size;
790 static int logged = 0;
791 if (logged)
792 return;
793 logged = 1;
794 CmiPrintf("Logging: %d\n", Lrts_myNode);
795 sprintf(logname, "log.%d", Lrts_myNode);
796 f = fopen(logname, "w");
797 if (f==0) KillEveryone("fopen problem");
798 for (i = 5000; i; i--)
800 /*for (i=0; i<size; i++) */
801 j = log_pos - i;
802 if (j < 0)
804 if (log_wrap)
805 j = 5000 + j;
806 else
807 j = 0;
810 logent ent = log+j;
811 fprintf(f, "%1.4f %d %c %d %d\n",
812 ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
815 fclose(f);
816 CmiPrintf("Done Logging: %d\n", Lrts_myNode);
819 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
821 #endif
823 #if !LOGGING
825 #define log_init() /*empty*/
826 #define log_done() /*empty*/
827 #define printLog() /*empty*/
828 #define LOG(t,s,k,d,q) /*empty*/
830 #endif
832 /******************************************************************************
834 * Node state
836 *****************************************************************************/
839 static CmiNodeLock Cmi_comm_var_mutex;
840 static CmiNodeLock Cmi_scanf_mutex;
841 static double Cmi_clock;
842 static double Cmi_check_delay = 3.0;
844 /******************************************************************************
846 * OS Threads
847 * SMP implementation moved to machine-smp.c
848 *****************************************************************************/
849 int* inProgress;
/** Mechanism to prevent dual locking when comm-layer functions, including prints,
 * are called recursively. (UN)LOCK_IF_AVAILABLE is used before and after a code piece
 * which is guaranteed not to make any-recursive locking calls. (UN)LOCK_AND_(UN)SET
 * is used before and after a code piece that may make recursive locking calls.
 *
 * inProgress[] holds one counter per rank; the comm lock is only taken or
 * released when the calling rank's counter is zero.  LOCK_AND_SET and
 * UNLOCK_AND_UNSET require an int variable named acqLock in the caller's scope.
 *
 * Each macro expands to one do { } while(0) statement and must be followed
 * by a semicolon, so it is safe as the body of an unbraced if/else.
 */

#define LOCK_IF_AVAILABLE() \
  do { \
    if(!inProgress[CmiMyRank()]) { \
      CmiCommLock(); \
    } \
  } while(0)

#define UNLOCK_IF_AVAILABLE() \
  do { \
    if(!inProgress[CmiMyRank()]) { \
      CmiCommUnlock(); \
    } \
  } while(0)

#define LOCK_AND_SET() \
  do { \
    if(!inProgress[CmiMyRank()]) { \
      CmiCommLock(); \
      acqLock = 1; \
    } \
    inProgress[CmiMyRank()] += 1; \
  } while(0)

#define UNLOCK_AND_UNSET() \
  do { \
    if(acqLock) { \
      CmiCommUnlock(); \
      acqLock = 0; \
    } \
    inProgress[CmiMyRank()] -= 1; \
  } while(0)
881 /************************ No kernel SMP threads ***************/
882 //XX
883 #if CMK_SHARED_VARS_UNAVAILABLE
885 static volatile int memflag=0;
886 void CmiMemLockNet(void) { memflag++; }
887 void CmiMemUnlockNet(void) { memflag--; }
889 static volatile int comm_flag=0;
890 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
891 #ifndef MACHLOCK_DEBUG
892 # define CmiCommLock() (comm_flag=1)
893 # define CmiCommUnlock() (comm_flag=0)
894 #else /* Error-checking flag locks */
895 void CmiCommLock(void) {
896 MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
897 comm_flag=1;
899 void CmiCommUnlock(void) {
900 MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
901 comm_flag=0;
903 #endif
905 static void CommunicationInterrupt(int ignored)
907 MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
908 if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0))
909 { /* Already busy inside malloc, comm, or immediate messages */
910 MACHSTATE(5,"--SKIPPING SIGIO--");
911 return;
913 MACHSTATE1(2,"--BEGIN SIGIO comm_mutex_isLocked: %d--", comm_flag)
915 /*Make sure any malloc's we do in here are NOT migratable:*/
916 CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
917 /* _Cmi_myrank=1; */
918 CommunicationServerNet(0, COMM_SERVER_FROM_INTERRUPT); /* from interrupt */
919 //CommunicationServer(0); /* from interrupt */
920 /* _Cmi_myrank=0; */
921 CmiIsomallocBlockListActivate(oldList);
923 MACHSTATE(2,"--END SIGIO--")
926 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)(int));
928 static void CmiDestroyLocks(void)
930 comm_flag = 0;
931 memflag = 0;
934 #endif
936 /*Add a message to this processor's receive queue
937 Must be called while holding comm. lock
940 extern double evacTime;
942 /***************************************************************
943 Communication with charmrun:
944 We can send (ctrl_sendone) and receive (ctrl_getone)
945 messages on a TCP socket connected to charmrun.
946 This is used for printfs, CCS, etc; and also for
947 killing ourselves if charmrun dies.
950 /*This flag prevents simultaneous outgoing
951 messages on the charmrun socket. It is protected
952 by the commlock.*/
953 static int Cmi_charmrun_fd_sendflag=0;
955 /* ctrl_sendone */
956 static int sendone_abort_fn(SOCKET skt,int code,const char *msg) {
957 fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
958 machine_exit(1);
959 return -1;
962 static void ctrl_sendone_nolock(const char *type,
963 const char *data1,int dataLen1,
964 const char *data2,int dataLen2)
966 const void *bufs[3]; int lens[3]; int nBuffers=0;
967 ChMessageHeader hdr;
968 skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
969 MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
970 if (Cmi_charmrun_fd==-1)
971 charmrun_abort("ctrl_sendone called in standalone!\n");
972 Cmi_charmrun_fd_sendflag=1;
973 ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
974 bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
975 if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
976 if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
977 skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
978 Cmi_charmrun_fd_sendflag=0;
979 skt_set_abort(oldAbort);
980 MACHSTATE(2,"} ctrl_sendone_nolock");
/**
 * Same as ctrl_sendone_nolock, but bracketed by LOCK_IF_AVAILABLE /
 * UNLOCK_IF_AVAILABLE.
 *
 * @param type     message type string placed in the header.
 * @param data1    first payload piece.
 * @param dataLen1 length of data1 in bytes.
 * @param data2    second payload piece.
 * @param dataLen2 length of data2 in bytes.
 */
static void ctrl_sendone_locking(const char *type,
                                 const char *data1,int dataLen1,
                                 const char *data2,int dataLen2)
{
  LOCK_IF_AVAILABLE();

  ctrl_sendone_nolock(type, data1, dataLen1, data2, dataLen2);

  UNLOCK_IF_AVAILABLE();
}
/* Optional memory-usage tracing. MEMORYUSAGE_OUTPUT defaults to 0 (off).
   When enabled, memoryusage_output bumps memoryusage_counter and, on PE 0,
   prints the PE number, GetClock() and CmiMemoryUsage(); memoryusage_isOutput
   is true whenever the counter is a multiple of MEMORYUSAGE_OUTPUT_FREQ. */
992 #ifndef MEMORYUSAGE_OUTPUT
993 #define MEMORYUSAGE_OUTPUT 0
994 #endif
995 #if MEMORYUSAGE_OUTPUT
996 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
997 static int memoryusage_counter;
998 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
999 #define memoryusage_output {\
1000 memoryusage_counter++;\
1001 if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1002 #endif
/* Periodic liveness check of charmrun. The int argument is unused.
   When GetClock() has advanced more than Cmi_check_delay past the last check,
   a "ping" control message is sent to charmrun -- unless the comm lock is
   already taken (CmiCommLockOrElse) or a send to charmrun is in progress
   (Cmi_charmrun_fd_sendflag), in which case the function returns early.
   CmiStdoutFlush() is also called from here. With MEMORYUSAGE_OUTPUT enabled
   the body is additionally gated by memoryusage_isOutput. */
1004 /* if charmrun dies, we finish */
1005 static void pingCharmrun(int ignored)
1007 static double Cmi_check_last;
1009 #if MEMORYUSAGE_OUTPUT
1010 memoryusage_output;
1011 if(memoryusage_isOutput){
1012 memoryusage_counter = 0;
1013 #else
1015 #endif
1017 double clock=GetClock();
1018 if (clock > Cmi_check_last + Cmi_check_delay) {
1019 MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1020 Cmi_check_last = clock;
1021 CmiCommLockOrElse(return;); /*Already busy doing communication*/
1022 if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1023 LOCK_IF_AVAILABLE();
1024 ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1025 UNLOCK_IF_AVAILABLE();
1027 CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1031 /* periodic charm ping, for netpoll */
1032 static void pingCharmrunPeriodic(void *ignored)
1034 pingCharmrun(0);
1035 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1038 static int ignore_further_errors(SOCKET skt,int c,const char *msg) {machine_exit(2);return -1;}
1039 static void charmrun_abort(const char *s)
1041 if (Cmi_charmrun_fd==-1) {/*Standalone*/
1042 fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1043 CmiPrintStackTrace(0);
1044 abort();
1045 } else {
1046 char msgBuf[80];
1047 skt_set_abort(ignore_further_errors);
1048 if (CmiNumPartitions() == 1) {
1049 sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1051 else
1053 sprintf(msgBuf,"Fatal error on Partition %d PE %d> ", CmiMyPartition(), CmiMyPe());
1055 ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
#if CMK_SHRINK_EXPAND
/* Send a "realloc" control message to charmrun carrying the string s
   (terminating NUL included). Does not take the comm lock. */
void charmrun_realloc(const char *s)
{
  const int lenWithNul = strlen(s) + 1;
  ctrl_sendone_nolock("realloc", s, lenWithNul, NULL, 0);
}
#endif
1066 /* ctrl_getone */
1068 #ifdef __FAULT__
1069 #include "machine-recover.c"
1070 #endif
1072 static void node_addresses_store(ChMessage *msg);
1074 static int barrierReceived = 0;
/* Receive one control message from charmrun on Cmi_charmrun_fd and act on
   its header type:
     "die"         -- print the payload, call log_done / ConverseCommonExit,
                      then machine_exit(0)
     "req_fw"      -- (CCS builds) convert the CCS request to a converse
                      message and push it onto node-local PE 0's queue
     "crashnode",
     "initnodetab" -- (__FAULT__ builds) crash handling / node-table refresh
     "barrier"     -- set barrierReceived to 1
     "barrier0"    -- set barrierReceived to 2
     anything else -- charmrun_abort, then machine_exit(1)
   The message is released with ChMessage_free afterwards. MACHLOCK_ASSERT
   indicates the caller is expected to hold the comm lock. */
1076 static void ctrl_getone(void)
1078 ChMessage msg;
1079 MACHSTATE(2,"ctrl_getone")
1080 MACHLOCK_ASSERT(comm_mutex_isLocked,"ctrl_getone")
1081 ChMessage_recv(Cmi_charmrun_fd,&msg);
1082 MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1084 if (strcmp(msg.header.type,"die")==0) {
1085 MACHSTATE(2,"ctrl_getone bye bye")
1086 fprintf(stderr,"aborting: %s\n",msg.data);
1087 log_done();
1088 ConverseCommonExit();
1089 machine_exit(0);
1091 #if CMK_CCS_AVAILABLE
1092 else if (strcmp(msg.header.type, "req_fw")==0) {
1093 CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1094 /*Sadly, I *can't* do a:
1095 CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1096 here, because I can't send converse messages in the
1097 communication thread. I *can* poke this message into
1098 any convenient processor's queue, though: (OSL, 9/14/2000)
1100 int pe=0;/*<- node-local processor number. Any one will do.*/
1101 void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1102 MACHSTATE(2,"Incoming CCS request");
1103 if (cmsg!=NULL)
1105 if(CmiNumPes() == 1 && CmiNumPartitions() == 1)
1106 ccsRunning = 1;
1107 CmiPushPE(pe,cmsg);
1110 #endif
1111 #ifdef __FAULT__
1112 else if(strcmp(msg.header.type,"crashnode")==0) {
1113 crash_node_handle(&msg);
1115 else if(strcmp(msg.header.type,"initnodetab")==0) {
1116 /** A processor crashed and got recreated. So charmrun sent
1117 across the whole nodetable data to update this processor*/
1118 node_addresses_store(&msg);
1119 // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1121 #endif
1122 else if(strcmp(msg.header.type,"barrier")==0) {
1123 barrierReceived = 1;
1125 else if(strcmp(msg.header.type,"barrier0")==0) {
1126 barrierReceived = 2;
1128 else {
1129 /* We do not use KillEveryOne here because it calls CmiMyPe(),
1130 * which is not available to the communication thread on an SMP version.
1132 /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
1133 charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1134 machine_exit(1);
1137 MACHSTATE(2,"ctrl_getone done")
1138 ChMessage_free(&msg);
#if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
/* Deliver CCS reply data for the request described by hdr.
   The header and the repLen bytes at repData are sent to charmrun as a
   single "reply_fw" control message; charmrun forwards it to the CCS server. */
void CcsImpl_reply(CcsImplHeader *hdr, int repLen, const void *repData)
{
  const char *hdrBytes = (const char *)hdr;
  const char *replyBytes = (const char *)repData;

  MACHSTATE(2,"Outgoing CCS reply");
  ctrl_sendone_locking("reply_fw", hdrBytes, sizeof(CcsImplHeader),
                       replyBytes, repLen);
  MACHSTATE(1,"Outgoing CCS reply away");
}
#endif
1153 /*****************************************************************************
1155 * CmiPrintf, CmiError, CmiScanf
1157 *****************************************************************************/
1158 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1159 static void InternalPrintf(const char *f, va_list l)
1161 ChMessage replymsg;
1162 char *buffer = (char *)CmiTmpAlloc(PRINTBUFSIZE);
1163 CmiStdoutFlush();
1164 vsprintf(buffer, f, l);
1165 if(Cmi_syncprint) {
1166 LOCK_IF_AVAILABLE();
1167 ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1168 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1169 ChMessage_free(&replymsg);
1170 UNLOCK_IF_AVAILABLE();
1171 } else {
1172 ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1174 InternalWriteToTerminal(0,buffer,strlen(buffer));
1175 CmiTmpFree(buffer);
1178 static void InternalError(const char *f, va_list l)
1180 ChMessage replymsg;
1181 char *buffer = (char *)CmiTmpAlloc(PRINTBUFSIZE);
1182 CmiStdoutFlush();
1183 vsprintf(buffer, f, l);
1184 if(Cmi_syncprint) {
1185 ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1186 LOCK_IF_AVAILABLE();
1187 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1188 ChMessage_free(&replymsg);
1189 UNLOCK_IF_AVAILABLE();
1190 } else {
1191 ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1193 InternalWriteToTerminal(1,buffer,strlen(buffer));
1194 CmiTmpFree(buffer);
1197 static int InternalScanf(char *fmt, va_list l)
1199 ChMessage replymsg;
1200 char *ptr[20];
1201 char *p; int nargs, i;
1202 nargs=0;
1203 p=fmt;
1204 while (*p) {
1205 if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1206 if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1207 if (p[0]=='%') { nargs++; p++; continue; }
1208 if (*p=='\n') *p=' '; p++;
1210 if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1211 for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1212 CmiLock(Cmi_scanf_mutex);
1213 if (Cmi_charmrun_fd!=-1)
1214 {/*Send charmrun the format string*/
1215 ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1216 /*Wait for the reply (characters to scan) from charmrun*/
1217 LOCK_IF_AVAILABLE();
1218 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1219 i = sscanf((char*)replymsg.data, fmt,
1220 ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1221 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1222 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1223 ChMessage_free(&replymsg);
1224 UNLOCK_IF_AVAILABLE();
1225 } else
1226 {/*Just do the scanf normally*/
1227 i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1228 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1229 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1231 CmiUnlock(Cmi_scanf_mutex);
1232 return i;
1234 #if CMK_CMIPRINTF_IS_A_BUILTIN
1236 /*New stdarg.h declarations*/
1237 void CmiPrintf(const char *fmt, ...)
1239 if (quietMode) return;
1240 CpdSystemEnter();
1242 va_list p; va_start(p, fmt);
1243 if (Cmi_charmrun_fd!=-1 && _writeToStdout)
1244 InternalPrintf(fmt, p);
1245 else
1246 vfprintf(stdout,fmt,p);
1247 va_end(p);
1249 CpdSystemExit();
1252 void CmiError(const char *fmt, ...)
1254 CpdSystemEnter();
1256 va_list p; va_start (p, fmt);
1257 if (Cmi_charmrun_fd!=-1)
1258 InternalError(fmt, p);
1259 else
1260 vfprintf(stderr,fmt,p);
1261 va_end(p);
1263 CpdSystemExit();
/* scanf-style input; returns whatever InternalScanf returns. */
int CmiScanf(const char *fmt, ...)
{
  va_list args;
  int nConverted;

  CpdSystemEnter();
  va_start(args, fmt);
  nConverted = InternalScanf((char *)fmt, args);
  va_end(args);
  CpdSystemExit();
  return nConverted;
}
1279 #endif
1281 /***************************************************************************
1282 * Output redirection:
1283 * When people don't use CkPrintf, like above, we'd still like to be able
1284 * to collect their output. Thus we make a pipe and dup2 it to stdout,
1285 * which lets us read the characters sent to stdout at our leisure.
1286 ***************************************************************************/
1288 /*Can read from stdout or stderr using these fd's*/
1289 static int readStdout[2];
1290 static int writeStdout[2]; /*The original stdout/stderr sockets*/
1291 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1292 #define readStdoutBufLen (16*1024)
1293 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1294 static int servicingStdout;
/* Set up stdout/stderr redirection. Returns immediately in standalone mode
   (Cmi_charmrun_fd == -1). On non-Windows systems: stdio buffering is turned
   off, and for each of fd 1 and fd 2 the original descriptor is saved with
   dup() into writeStdout[i], a UNIX socketpair is created, one end is kept in
   readStdout[i] and the other is dup2()'d over the original descriptor. With
   CMK_SHARED_VARS_UNAVAILABLE and Cmi_asyncio set, async I/O is enabled on the
   write end. On Windows, read/write are stubbed out with macros instead. */
1296 /*Initialization-- should only be called once per node*/
1297 static void CmiStdoutInit(void) {
1298 int i;
1299 if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1301 /*There's some way to do this same thing in windows, but I don't know how*/
1302 #if !defined(_WIN32)
1303 /*Prevent buffering in stdio library:*/
1304 setbuf(stdout,NULL); setbuf(stderr,NULL);
1306 /*Reopen stdout and stderr fd's as new pipes:*/
1307 for (i=0;i<2;i++) {
1308 int pair[2];
1309 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1311 /*First, save a copy of the original stdout*/
1312 writeStdout[i]=dup(srcFd);
1313 #if 0
1314 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1315 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1316 #else
1317 /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1318 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair))
1319 {perror("building stdio redirection socketpair"); exit(1);}
1320 #endif
1321 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1322 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1323 //if (-1==dup2(srcFd,pair[1])) {perror("dup2 redirection pipe"); exit(1);}
1325 #if 0 /*Keep writes from blocking. This just drops excess output, which is bad.*/
1326 CmiEnableNonblockingIO(srcFd);
1327 #endif
1328 #if CMK_SHARED_VARS_UNAVAILABLE
1329 if (Cmi_asyncio)
1331 /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1332 CmiEnableAsyncIO(pair[1]);
1334 #endif
1336 #else
1337 /*Windows system-- just fake reads for now*/
1338 # ifndef read
1339 # define read(x,y,z) 0
1340 # endif
1341 # ifndef write
1342 # define write(x,y,z) z
1343 # endif
1344 #endif
1347 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1348 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1350 if (write(writeStdout[isStdErr],str,len) != len) {
1351 CmiAbort("Writing to terminal failed!");
1356 Service this particular stdout pipe.
1357 Must hold comm. lock.
1359 static void CmiStdoutServiceOne(int i) {
1360 int nBytes;
1361 const static char *cmdName[2]={"print","printerr"};
1362 servicingStdout=1;
1363 while(1) {
1364 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1365 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1366 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1367 if (nBytes<=0) break; /*Nothing to send*/
1369 /*Send these bytes off to charmrun*/
1370 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1371 nBytes++; /*Include zero-terminator in message to charmrun*/
1373 if (nBytes>=readStdoutBufLen-100)
1374 { /*We must have filled up our output pipe-- most output libraries
1375 don't handle this well (e.g., glibc printf just drops the line).*/
1377 tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1378 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1379 nBytes--; /*Remove terminator from user's data*/
1380 tooMuchLen=strlen(tooMuchWarn)+1;
1382 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1383 tooMuchWarn,tooMuchLen);
1385 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1387 servicingStdout=0;
1388 serviceStdout[i]=0; /*This pipe is now serviced*/
1391 /*Service all stdout pipes, whether it looks like they need it
1392 or not. Used when you aren't sure if select() has been called recently.
1393 Must hold comm. lock.
1395 static void CmiStdoutServiceAll(void) {
1396 int i;
1397 for (i=0;i<2;i++) {
1398 if (readStdout[i]==0) continue; /*Pipe not open*/
1399 CmiStdoutServiceOne(i);
/* Service any outstanding stdout pipes; currently this simply services
   all of them. Must hold comm. lock.
*/
static void CmiStdoutService(void)
{
  CmiStdoutServiceAll();
}
1410 /*Add our pipes to the pile for select() or poll().
1411 Both can be called with or without the comm. lock.
1413 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1414 int i;
1415 for (i=0;i<2;i++) {
1416 if (readStdout[i]==0) continue; /*Pipe not open*/
1417 CMK_PIPE_ADDREAD(readStdout[i]);
1420 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1421 int i;
1422 for (i=0;i<2;i++) {
1423 if (readStdout[i]==0) continue; /*Pipe not open*/
1424 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1427 static int CmiStdoutNeedsService(void) {
1428 return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1431 /*Called every few milliseconds to flush the stdout pipes*/
1432 static void CmiStdoutFlush(void) {
1433 if (servicingStdout) return; /* might be called by SIGALRM */
1434 CmiCommLockOrElse( return; )
1435 LOCK_IF_AVAILABLE();
1436 CmiStdoutServiceAll();
1437 UNLOCK_IF_AVAILABLE();
1440 /***************************************************************************
1441 * Message Delivery:
1443 ***************************************************************************/
1445 #include "machine-dgram.c"
1447 static void open_charmrun_socket(void)
1449 #if CMK_USE_TCP
1450 dataskt=skt_server(&dataport);
1451 #else
1452 dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
1453 #endif
1454 MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
1455 Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
1456 MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
1457 skt_tcp_no_nagle(Cmi_charmrun_fd);
1461 /*****************************************************************************
1463 * node_addresses
1465 * These two functions fill the node-table.
1468 * This node, like all others, first sends its own address to charmrun
1469 * using this command:
1471 * Type: nodeinfo
1472 * Data: Big-endian 4-byte ints
1473 * <my-node #><Dataport>
1475 * When charmrun has all the addresses, he sends this table to me:
1477 * Type: nodes
1478 * Data: Big-endian 4-byte ints
1479 * <number of nodes n>
1480 * <#PEs><IP><Dataport> Node 0
1481 * <#PEs><IP><Dataport> Node 1
1482 * ...
1483 * <#PEs><IP><Dataport> Node n-1
1485 *****************************************************************************/
1487 static void send_singlenodeinfo(void)
1489 /* Contact charmrun for machine info. */
1490 ChSingleNodeinfo me;
1491 memset(&me, 0, sizeof(me));
1493 me.nodeNo = ChMessageInt_new(Lrts_myNode);
1494 me.num_pus = ChMessageInt_new(CmiHwlocTopologyLocal.num_pus);
1495 me.num_cores = ChMessageInt_new(CmiHwlocTopologyLocal.num_cores);
1496 me.num_sockets = ChMessageInt_new(CmiHwlocTopologyLocal.num_sockets);
1498 /* The nPE fields are set by charmrun--these values don't matter.
1499 Set IP in case it is mpiexec mode where charmrun does not have IP yet */
1500 me.info.nPE = ChMessageInt_new(0);
1501 /* me.info.IP = _skt_invalid_ip; */
1502 me.info.IP = skt_innode_my_ip();
1503 me.info.dataport = ChMessageInt_new(dataport);
1505 /* Send our node info. to charmrun.
1506 CommLock hasn't been initialized yet--
1507 use non-locking version */
1508 ctrl_sendone_nolock("initnode", (const char *)&me, sizeof(me), NULL, 0);
1509 MACHSTATE1(5, "send initnode - dataport:%d", dataport);
1511 MACHSTATE(3, "initnode sent");
/* Obtain the node table and hand it to node_addresses_store.
   Standalone (Cmi_charmrun_fd == -1): a one-node "initnodetab" message is
   fabricated locally. Otherwise this node's info is sent to charmrun and a
   reply is awaited (skt_select1 timeout argument 1200*1000; CmiAbort on
   timeout). A "nodefork" reply (non-Windows) makes the process fork
   phase2_forks children, each of which reconnects to charmrun under node id
   start_id + i and sends its own info; the next message is then awaited.
   An "initnodetab" message sets Lrts_myNode and
   _Cmi_myphysnode_numprocesses before being stored; a "die" message ends
   the process with _Exit(1). */
1514 /*Note: node_addresses_obtain is called before starting
1515 threads, so no locks are needed (or valid!)*/
1516 static void node_addresses_obtain(char **argv)
1518 ChMessage nodetabmsg; /* info about all nodes*/
1519 MACHSTATE(3,"node_addresses_obtain { ");
1520 if (Cmi_charmrun_fd==-1)
1521 {/*Standalone-- fake a single-node nodetab message*/
1522 ChMessageInt_t *n32;
1523 ChNodeinfo *nodeInfo;
1524 ChMessage_new("initnodetab", sizeof(ChMessageInt_t)*ChInitNodetabFields+sizeof(ChNodeinfo), &nodetabmsg);
1525 n32 = (ChMessageInt_t *)nodetabmsg.data;
1526 nodeInfo = (ChNodeinfo *)(nodetabmsg.data + sizeof(ChMessageInt_t)*ChInitNodetabFields);
1528 n32[0] = ChMessageInt_new(1);
1529 n32[1] = ChMessageInt_new(Lrts_myNode);
1530 nodeInfo->nPE = ChMessageInt_new(_Cmi_mynodesize);
1531 nodeInfo->dataport = ChMessageInt_new(0);
1532 nodeInfo->IP = _skt_invalid_ip;
1534 else
1536 send_singlenodeinfo();
1538 /* We get the other node addresses from a message sent
1539 back via the charmrun control port. */
1540 if (!skt_select1(Cmi_charmrun_fd, 1200*1000))
1541 CmiAbort("Timeout waiting for nodetab!\n");
1543 MACHSTATE(2,"recv initnode {");
1544 ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1546 if (strcmp("nodefork", nodetabmsg.header.type) == 0)
1548 #ifndef _WIN32
1549 int i;
1551 assert(sizeof(ChMessageInt_t)*ChInitNodeforkFields == (size_t)nodetabmsg.len);
1552 ChMessageInt_t *n32 = (ChMessageInt_t *) nodetabmsg.data;
1553 const int phase2_forks = ChMessageInt(n32[0]);
1554 const int start_id = ChMessageInt(n32[1]);
1556 ChMessage_free(&nodetabmsg);
1558 for (i = 0; i < phase2_forks; ++i)
1560 const int pid = fork();
1561 if (pid < 0)
1562 CmiAbort("fork failed");
1563 else if (pid == 0)
1565 skt_close(Cmi_charmrun_fd);
1566 dataport = 0;
1567 Lrts_myNode = start_id + i;
1568 open_charmrun_socket();
1569 send_singlenodeinfo();
1570 break;
1573 #endif
1575 if (!skt_select1(Cmi_charmrun_fd, 1200*1000))
1576 CmiAbort("Timeout waiting for nodetab!\n");
1578 ChMessage_recv(Cmi_charmrun_fd, &nodetabmsg);
1581 MACHSTATE(2,"} recv initnode");
1584 if (strcmp("initnodetab", nodetabmsg.header.type) == 0)
1586 ChMessageInt_t *n32 = (ChMessageInt_t *) nodetabmsg.data;
1587 ChNodeinfo *d = (ChNodeinfo *) (n32+ChInitNodetabFields);
1588 Lrts_myNode = ChMessageInt(n32[1]);
1589 _Cmi_myphysnode_numprocesses = ChMessageInt(d[Lrts_myNode].nProcessesInPhysNode);
1591 node_addresses_store(&nodetabmsg);
1592 ChMessage_free(&nodetabmsg);
1594 else if (strcmp("die", nodetabmsg.header.type) == 0)
1596 _Exit(1);
1599 MACHSTATE(3,"} node_addresses_obtain ");
1603 /***********************************************************************
1604 * DeliverOutgoingMessage()
1606 * This function takes care of delivery of outgoing messages from the
1607 * sender end. Broadcast messages are divided into sets of messages that
1608 * are bound to the local node, and to remote nodes. For local
1609 * transmission, the messages are directly pushed into the recv
1610 * queues. For non-local transmission, the function DeliverViaNetwork()
1611 * is called
1612 ***********************************************************************/
1613 int DeliverOutgoingMessage(OutgoingMsg ogm)
1615 int i, rank, dst; OtherNode node;
1617 int network = 1;
1619 dst = ogm->dst;
1621 //printf("deliver outgoing message, dest: %d \n", dst);
1622 #if CMK_ERROR_CHECKING
1623 if (dst<0 || dst>=CmiNumPesGlobal())
1624 CmiAbort("Send to out-of-bounds processor!");
1625 #endif
1626 node = nodes_by_pe[dst];
1627 rank = dst - node->nodestart;
1628 int acqLock = 0;
1629 if (node->nodestart != Cmi_nodestartGlobal) {
1630 DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1632 #if CMK_MULTICORE
1633 network = 0;
1634 #endif
1635 return network;
1639 * Set up an OutgoingMsg structure for this message.
1641 static OutgoingMsg PrepareOutgoing(int pe,int size,int freemode,char *data) {
1642 OutgoingMsg ogm;
1643 MallocOutgoingMsg(ogm);
1644 MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1645 ogm->size = size;
1646 ogm->data = data;
1647 ogm->src = CmiMyPeGlobal();
1648 ogm->dst = pe;
1649 ogm->freemode = freemode;
1650 ogm->refcount = 0;
1651 return ogm;
1655 /******************************************************************************
1657 * CmiGeneralSend
1659 * Description: This is a generic message sending routine. All the
1660 * converse message send functions are implemented in terms of this
1661 * function. (By setting appropriate flags (eg freemode) that tell
1662 * CmiGeneralSend() how exactly to handle the particular case of
1663 * message send)
1665 *****************************************************************************/
1667 //CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
1668 CmiCommHandle LrtsSendFunc(int destNode, int pe, int size, char *data, int freemode)
1670 int sendonnetwork;
1671 OutgoingMsg ogm;
1672 MACHSTATE(1,"CmiGeneralSend {");
1674 CMI_MSG_SIZE(data) = size;
1676 ogm=PrepareOutgoing(pe,size,'F',data);
1678 sendonnetwork = DeliverOutgoingMessage(ogm);
1679 MACHSTATE(1,"} LrtsSend");
1680 return (CmiCommHandle)ogm;
1684 /******************************************************************************
1686 * Comm Handle manipulation.
1688 *****************************************************************************/
1690 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1692 /*****************************************************************************
1694 * NET version List-Cast and Multicast Code
1696 ****************************************************************************/
/* Send msg (len bytes) to each of the npes processors listed in pes.
   A reference is taken on msg before every CmiSyncSendAndFree, so the
   caller's own reference is left in place. */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int *dest;
  int *const end = pes + npes;

  for (dest = pes; dest < end; dest++) {
    CmiReference(msg);
    CmiSyncSendAndFree(*dest, len, msg);
  }
}
1707 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1709 CmiError("ListSend not implemented.");
1710 return (CmiCommHandle) 0;
/*
  because in all net versions, the message buffer after CmiSyncSendAndFree
  returns is not changed, we can use memory reference trick to avoid
  memory copying here: each send takes its own reference on msg, and the
  caller's reference is dropped once all sends have been issued.
*/
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  int *dest;
  int *const end = pes + npes;

  for (dest = pes; dest < end; dest++) {
    CmiReference(msg);
    CmiSyncSendAndFree(*dest, len, msg);
  }
  CmiFree(msg);
}
1728 #endif
/* LrtsDrainResources and LrtsPostNonLocal are intentionally empty in this
   machine layer. */
1730 void LrtsDrainResources(void){}
1731 void LrtsPostNonLocal(void) {}
1733 /* Network progress function is used to poll the network for
1734 messages. This flushes receive buffers on some implementations*/
/* Runs one CommunicationServerNet(0, COMM_SERVER_FROM_SMP) pass. On SMP
   builds without CMK_MULTICORE the call is made only when CmiMyRank() equals
   CmiMyNodeSize(). Compiled only if CMK_MACHINE_PROGRESS_DEFINED. */
1736 #if CMK_MACHINE_PROGRESS_DEFINED
1737 void CmiMachineProgressImpl(void){
1738 #if CMK_SMP && !CMK_MULTICORE
1739 if (CmiMyRank() == CmiMyNodeSize())
1740 #endif
1741 CommunicationServerNet(0, COMM_SERVER_FROM_SMP);
1743 #endif
/* Runs one CommunicationServerNet pass with a zero first argument, tagged
   COMM_SERVER_FROM_SMP on SMP builds and COMM_SERVER_FROM_WORKER otherwise.
   The whileidle argument is not used. */
1745 void LrtsAdvanceCommunication(int whileidle)
1747 #if CMK_SMP
1748 CommunicationServerNet(0, COMM_SERVER_FROM_SMP);
1749 #else
1750 CommunicationServerNet(0, COMM_SERVER_FROM_WORKER);
1751 #endif
1755 /******************************************************************************
1757 * Main code, Init, and Exit
1759 *****************************************************************************/
1761 #if CMK_BARRIER_USE_COMMON_CODE
1763 /* happen at node level */
1764 /* must be called on every PE including communication processors */
1765 void LrtsBarrier(void)
1767 int numnodes = CmiNumNodesGlobal();
1768 static int barrier_phase = 0;
1770 if (Cmi_charmrun_fd == -1) return; // standalone
1771 if (numnodes == 1) {
1772 return;
1775 ctrl_sendone_locking("barrier",NULL,0,NULL,0);
1776 while (barrierReceived != 1) {
1777 LOCK_IF_AVAILABLE();
1778 ctrl_getone();
1779 UNLOCK_IF_AVAILABLE();
1781 barrierReceived = 0;
1782 barrier_phase ++;
1785 int CmiBarrierZero(void)
1787 int i;
1788 int numnodes = CmiNumNodesGlobal();
1789 ChMessage msg;
1791 if (Cmi_charmrun_fd == -1) return 0; // standalone
1792 if (numnodes == 1) {
1793 CmiNodeAllBarrier();
1794 return 0;
1797 if (CmiMyRank() == 0) {
1798 char str[64];
1799 sprintf(str, "%d", CmiMyNodeGlobal());
1800 ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
1801 if (CmiMyNodeGlobal() == 0) {
1802 while (barrierReceived != 2) {
1803 LOCK_IF_AVAILABLE();
1804 ctrl_getone();
1805 UNLOCK_IF_AVAILABLE();
1807 barrierReceived = 0;
1811 CmiNodeAllBarrier();
1812 return 0;
1815 #endif
1817 /******************************************************************************
1819 * Main code, Init, and Exit
1821 *****************************************************************************/
1823 void LrtsPreCommonInit(int everReturn)
1825 #if !CMK_SMP
1826 #if !CMK_ASYNC_NOT_NEEDED
1827 if (Cmi_asyncio)
1829 CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
1830 if (!Cmi_netpoll) {
1831 if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
1832 if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
1835 #endif
1836 #endif
/* Machine-layer hook run after common initialization. Visible steps:
   - PE 0 announces netpoll mode; without shared variables and not in netpoll
     mode it aborts when the OS memory allocator is in use.
   - Without shared variables, CommunicationPeriodic is registered on
     CcdPERIODIC (netpoll) or CcdPERIODIC_10ms (otherwise).
   - Rank 0 with a charmrun connection registers CmiStdoutFlush every 10ms
     and arranges periodic pings of charmrun: via pingCharmrunPeriodic when
     Cmi_asyncio is off, otherwise via a SIGALRM interval timer.
   - Cmi_clock is initialized from GetClock().
   - Optional sections: IGET_FLOWCONTROL token updates,
     CMK_RANDOMLY_CORRUPT_MESSAGES seeding, one-sided handler registration.
   The everReturn argument is not used in the visible code. */
1839 void LrtsPostCommonInit(int everReturn)
1841 /* better to show the status here */
1842 if (CmiMyPe() == 0) {
1843 if (Cmi_netpoll == 1) {
1844 CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
1846 #if CMK_SHARED_VARS_UNAVAILABLE
1847 else {
1848 if (CmiMemoryIs(CMI_MEMORY_IS_OS))
1849 CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
1851 #endif
1854 #if MEMORYUSAGE_OUTPUT
1855 memoryusage_counter = 0;
1856 #endif
1858 #if CMK_SHARED_VARS_UNAVAILABLE
1859 if (Cmi_netpoll) /*Repeatedly call CommServer*/
1860 CcdCallOnConditionKeep(CcdPERIODIC,
1861 (CcdVoidFn) CommunicationPeriodic, NULL);
1862 else /*Only need this for retransmits*/
1863 CcdCallOnConditionKeep(CcdPERIODIC_10ms,
1864 (CcdVoidFn) CommunicationPeriodic, NULL);
1865 #endif
1867 if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
1868 CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
1869 #if CMK_SHARED_VARS_UNAVAILABLE && !CMK_TIMER_USE_WIN32API
1870 if (!Cmi_asyncio) {
1871 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1873 else {
1874 /*Occasionally ping charmrun, to test if it's dead*/
1875 struct itimerval i;
1876 CmiSignal(SIGALRM, 0, 0, pingCharmrun);
1877 #if MEMORYUSAGE_OUTPUT
1878 i.it_interval.tv_sec = 0;
1879 i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
1880 i.it_value.tv_sec = 0;
1881 i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
1882 #else
1883 i.it_interval.tv_sec = 10;
1884 i.it_interval.tv_usec = 0;
1885 i.it_value.tv_sec = 10;
1886 i.it_value.tv_usec = 0;
1887 #endif
1888 setitimer(ITIMER_REAL, &i, NULL);
1891 #if ! CMK_USE_TCP
1892 /*Occasionally check for retransmissions, outgoing acks, etc.*/
1893 CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
1894 #endif
1895 #endif
1897 /*Initialize the clock*/
1898 Cmi_clock=GetClock();
1901 #ifdef IGET_FLOWCONTROL
1902 /* Call the function once to determine the amount of physical memory available */
1903 getAvailSysMem();
1904 /* Call the function to periodically call the token adapt function */
1905 CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
1906 CcdCallOnConditionKeep(CcdPERIODIC_10s, // magic number of PERIOD 10s
1907 (CcdVoidFn) TokenUpdatePeriodic, NULL);
1908 #endif
1910 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
1911 srand((int)(1024.0*CmiWallTimer()));
1912 if (CmiMyPe()==0)
1913 CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
1914 CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
1915 #endif
1917 #ifdef __ONESIDED_IMPL
1918 #ifdef __ONESIDED_NO_HARDWARE
1919 putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
1920 putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
1921 getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
1922 getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
1923 #endif
1924 #endif
1928 void LrtsExit(void)
1930 int i;
1931 machine_initiated_shutdown=1;
1933 CmiStdoutFlush();
1934 if (Cmi_charmrun_fd==-1) {
1935 exit(0);
1937 else {
1938 Cmi_check_delay = 1.0; /* speed up checking of charmrun */
1939 for(i = 0; i < CmiMyNodeSize(); i++) {
1940 ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
1942 while(1) {
1943 #if CMK_USE_PXSHM
1944 CommunicationServerPxshm();
1945 #endif
1946 CommunicationServerNet(5, COMM_SERVER_FROM_SMP);
/* Shrink/expand shutdown-and-restart path (CMK_SHRINK_EXPAND builds only).
   Visible steps: rank 0 optionally prints network statistics and calls
   log_done; all PEs meet at CmiBarrier; PE 0 either answers the pending CCS
   request and sends charmrun a "realloc" message carrying the new process
   count (willContinue), or sends "ending". Messages are then read from
   charmrun until one of type "realloc_ack" arrives. Shared-memory layers are
   shut down, ConverseCommonExit runs, and stdout is flushed. When continuing,
   a new argv is built from Cmi_argvcopy plus "+shrinkexpand", "+newnumpes",
   "+mynewpe", "+myoldpe" and "+restart" (an existing "+restart" argument has
   its value replaced), SIGALRM is ignored, the original stdout/stderr are
   restored onto fds 1 and 2, fds 3..19 are closed, and the program re-executes
   itself with execv. Otherwise the process exits.
   NOTE(review): the "realloc" payload is a ChMessageInt_t but its length is
   given as sizeof(int) -- confirm the two sizes always match.
   NOTE(review): replymsg is not passed to ChMessage_free in the wait loop --
   confirm whether that leaks. */
1951 #if CMK_SHRINK_EXPAND
1952 void ConverseCleanup(void)
1954 MACHSTATE(2,"ConverseCleanup {");
1955 if (CmiMyRank()==0) {
1956 if(Cmi_print_stats)
1957 printNetStatistics();
1958 log_done();
1961 CmiBarrier();
1963 if (CmiMyPe() == 0) {
1964 if (willContinue) {
1965 CcsSendDelayedReply(shrinkExpandreplyToken, 0, 0); //reply to CCS client
1966 // wait for this message to receive, hack
1967 // TODO: figure out why this is important
1968 usleep(500);
1969 // this causes charmrun to go away
1970 ChMessageInt_t numProcessAfterRestart_msg = ChMessageInt_new(numProcessAfterRestart);
1971 ctrl_sendone_locking("realloc",(char *)&numProcessAfterRestart_msg, sizeof(int),NULL,0);
1972 } else {
1973 ctrl_sendone_locking("ending",NULL,0,NULL,0);
1977 // TODO: ensure this won't gobble up some other important message
1978 ChMessage replymsg;
1979 memset(replymsg.header.type, 0, sizeof(replymsg.header.type));
1980 while (strncmp(replymsg.header.type, "realloc_ack", CH_TYPELEN) != 0)
1981 ChMessage_recv(Cmi_charmrun_fd, &replymsg);
1983 #if CMK_USE_SYSVSHM
1984 CmiExitSysvshm();
1985 #elif CMK_USE_PXSHM
1986 CmiExitPxshm();
1987 #endif
1988 ConverseCommonExit(); /* should be called by every rank */
1989 CmiNodeBarrier(); /* single node SMP, make sure every rank is done */
1990 if (CmiMyRank()==0) CmiStdoutFlush();
1991 if (Cmi_charmrun_fd==-1) {
1992 if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
1993 else while (1) CmiYield();
1994 } else {
1995 if (willContinue) {
1996 int argc=CmiGetArgc(Cmi_argvcopy);
1998 int i;
1999 int restart_idx = -1;
2000 for (i = 0; i < argc; ++i) {
2001 if (strcmp(Cmi_argvcopy[i], "+restart") == 0) {
2002 restart_idx = i;
2003 break;
2007 char **ret;
2008 if (restart_idx == -1) {
2009 ret=(char **)malloc(sizeof(char *)*(argc+10));
2010 } else {
2011 ret=(char **)malloc(sizeof(char *)*(argc+8));
2014 for (i=0;i<argc;i++) {
2015 MACHSTATE1(2,"Parameters %s",Cmi_argvcopy[i]);
2016 ret[i]=Cmi_argvcopy[i];
2019 ret[argc+0]="+shrinkexpand";
2020 ret[argc+1]="+newnumpes";
2022 char temp[50];
2023 sprintf(temp,"%d", numProcessAfterRestart);
2024 ret[argc+2]=temp;
2026 ret[argc+3]="+mynewpe";
2027 char temp2[50];
2028 sprintf(temp2,"%d", mynewpe);
2029 ret[argc+4]=temp2;
2031 ret[argc+5]="+myoldpe";
2032 char temp3[50];
2033 sprintf(temp3,"%d", _Cmi_mype);
2034 ret[argc+6]=temp3;
2036 if (restart_idx == -1) {
2037 ret[argc+7]="+restart";
2038 ret[argc+8]=_shrinkexpand_basedir;
2039 ret[argc+9]=Cmi_argvcopy[argc];
2040 } else {
2041 ret[restart_idx + 1] = _shrinkexpand_basedir;
2042 ret[argc+7]=Cmi_argvcopy[argc];
2045 free(Cmi_argvcopy);
2046 MACHSTATE1(3,"ConverseCleanup mynewpe %s", temp2);
2047 MACHSTATE(2,"} ConverseCleanup");
2049 skt_close(Cmi_charmrun_fd);
2050 // Avoid crash by SIGALRM
2051 #if !defined(_WIN32)
2052 struct sigaction act;
2053 memset(&act, 0, sizeof(act));
2054 act.sa_handler = SIG_IGN;
2055 act.sa_flags = SA_RESETHAND;
2056 sigaction(SIGALRM, &act, 0);
2057 #else
2058 signal(SIGALRM, SIG_IGN);
2059 #endif
2060 #if CMK_USE_IBVERBS
2061 CmiMachineCleanup();
2062 #endif
2063 //put references to the controlling tty back on normal fd so that
2064 //CmiStdoutInit refers to the tty not the old pipe
2065 dup2(writeStdout[0], 1);
2066 dup2(writeStdout[1], 2);
2068 // Close any old file descriptors.
2069 // FDs carry over execv, so these have to be closed at some point.
2070 // Since some of them are async pipes, however, doing so flushes the buffer,
2071 // raising a SIGIO before a handler is assigned in LrtsPreCommonInit, which
2072 // kills the program.
2073 // An easy way to deal with this is to simply close the FDs here.
2074 int b;
2075 for (b = 3; b < 20; b++) {
2076 close(b);
2079 // TODO: check variant of execv that takes file descriptor
2080 execv(ret[0], ret); // Need to check if the process name is always first arg
2081 /* should not be here */
2082 MACHSTATE1(3,"execv error: %s", strerror(errno));
2083 CmiPrintf("[%d] should not be here\n", CmiMyPe());
2084 exit(1);
2085 } else {
2086 skt_close(Cmi_charmrun_fd);
2087 exit(0);
2092 #endif
2094 static void set_signals(void)
2096 if(!Cmi_truecrash) {
2097 #if !defined(_WIN32)
2098 struct sigaction sa;
2099 sa.sa_handler = KillOnAllSigs;
2100 sigemptyset(&sa.sa_mask);
2101 sa.sa_flags = SA_RESTART;
2103 sigaction(SIGSEGV, &sa, NULL);
2104 sigaction(SIGFPE, &sa, NULL);
2105 sigaction(SIGILL, &sa, NULL);
2106 sigaction(SIGINT, &sa, NULL);
2107 sigaction(SIGTERM, &sa, NULL);
2108 sigaction(SIGABRT, &sa, NULL);
2109 #else
2110 signal(SIGSEGV, KillOnAllSigs);
2111 signal(SIGFPE, KillOnAllSigs);
2112 signal(SIGILL, KillOnAllSigs);
2113 signal(SIGINT, KillOnAllSigs);
2114 signal(SIGTERM, KillOnAllSigs);
2115 signal(SIGABRT, KillOnAllSigs);
2116 #endif
2117 # if !defined(_WIN32) /*UNIX-only signals*/
2118 sigaction(SIGQUIT, &sa, NULL);
2119 sigaction(SIGBUS, &sa, NULL);
2120 # if CMK_HANDLE_SIGUSR
2121 sa.sa_handler = HandleUserSignals;
2122 sigaction(SIGUSR1, &sa, NULL);
2123 sigaction(SIGUSR2, &sa, NULL);
2124 # endif
2125 # endif /*UNIX*/
/* Socket idle function used before the node addresses have been obtained.
 * During the real program, idling is done with CmiYield instead.
 */
static void obtain_idleFn(void)
{
  sleep(0);
}
2134 static int net_default_skt_abort(SOCKET skt,int code,const char *msg)
2136 fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2137 machine_exit(1);
2138 return -1;
2141 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2143 int i;
2144 Cmi_netpoll = 0;
2145 #if CMK_NETPOLL
2146 Cmi_netpoll = 1;
2147 #endif
2148 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2149 Cmi_idlepoll = 0;
2150 #else
2151 Cmi_idlepoll = 1;
2152 #endif
2153 Cmi_truecrash = 0;
2154 if (CmiGetArgFlagDesc(*argv,"+truecrash","Do not install signal handlers") ||
2155 CmiGetArgFlagDesc(*argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2156 /* netpoll disable signal */
2157 if (CmiGetArgFlagDesc(*argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2158 if (CmiGetArgFlagDesc(*argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
2159 /* idlepoll use poll instead if sleep when idle */
2160 if (CmiGetArgFlagDesc(*argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2161 /* idlesleep use sleep instead if busywait when idle */
2162 if (CmiGetArgFlagDesc(*argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2163 Cmi_syncprint = CmiGetArgFlagDesc(*argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2165 #if CMK_SHRINK_EXPAND
2166 if (CmiGetArgFlagDesc(*argv,"+shrinkexpand","Restarting of already running prcoess")) Cmi_isOldProcess = 1;
2167 #endif
2169 Cmi_asyncio = 1;
2170 #if CMK_ASYNC_NOT_NEEDED
2171 Cmi_asyncio = 0;
2172 #endif
2173 if (CmiGetArgFlagDesc(*argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2174 if (CmiGetArgFlagDesc(*argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2175 #if CMK_MULTICORE
2176 if (CmiGetArgFlagDesc(*argv,"+commthread","Use communication thread")) {
2177 Cmi_commthread = 1;
2178 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2179 _Cmi_sleepOnIdle = 1; /* worker thread go sleep */
2180 #endif
2181 if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2183 #endif
2185 skt_init();
2186 /* use special abort handler instead of default_skt_abort to
2187 prevent exit trapped by atexit_check() due to the exit() call */
2188 skt_set_abort(net_default_skt_abort);
2189 atexit(machine_atexit_check);
2190 parse_netstart();
2191 parse_magic();
2192 #if ! defined(_WIN32)
2193 /* only get forks in non-smp mode */
2194 #if CMK_SHRINK_EXPAND
2195 if(Cmi_isOldProcess!=1)
2196 #endif
2197 parse_forks();
2198 #endif
2199 extract_args(*argv);
2200 log_init();
2201 Cmi_comm_var_mutex = CmiCreateLock();
2202 Cmi_freelist_mutex = CmiCreateLock();
2203 Cmi_scanf_mutex = CmiCreateLock();
2204 #if CMK_SHRINK_EXPAND
2205 if (Cmi_isOldProcess == 1) {
2206 Lrts_myNode = Cmi_mynewpe;
2207 Cmi_myoldpe = Cmi_oldpe;
2208 Lrts_numNodes = Cmi_newnumnodes;
2210 #endif
2212 /* NOTE: can not acutally call timer before timerInit ! GZ */
2213 #if CMK_SHRINK_EXPAND
2214 MACHSTATE3(2,"After reorg %d %d %d \n", Cmi_oldpe, Lrts_myNode, Lrts_numNodes);
2215 #endif
2216 MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2218 skt_set_idle(obtain_idleFn);
2219 if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2220 set_signals();
2221 open_charmrun_socket();
2222 } else {/*Standalone operation*/
2223 if (!quietMode) printf("Charm++: standalone mode (not using charmrun)\n");
2224 dataskt=-1;
2225 Cmi_charmrun_fd=-1;
2228 node_addresses_obtain(*argv);
2229 MACHSTATE(5,"node_addresses_obtain done");
2231 if (Cmi_charmrun_fd != -1)
2232 CmiStdoutInit();
2234 CmiMachineInit(*argv);
2236 CmiCommunicationInit(*argv);
2238 skt_set_idle(CmiYield);
2239 Cmi_check_delay = 1.0+0.25*Lrts_numNodes;
2241 if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2242 Cmi_check_delay=1.0e30;
2244 #if CMK_SMP
2245 // Allocate a slot for the comm thread. Do this even for multicore,
2246 // since it's possible to ask for a 'comm' thread at runtime
2247 inProgress = (int *)calloc(_Cmi_mynodesize+1, sizeof(int));
2248 #else
2249 inProgress = (int *)calloc(_Cmi_mynodesize, sizeof(int));
2250 #endif
2252 *numNodes = Lrts_numNodes;
2253 *myNodeID = Lrts_myNode;
2257 #if CMK_CELL
2259 #include "spert_ppu.h"
/* Call OffloadAPIProgress() between LOCK_IF_AVAILABLE() and
 * UNLOCK_IF_AVAILABLE(). */
void machine_OffloadAPIProgress(void)
{
  LOCK_IF_AVAILABLE();
  OffloadAPIProgress();
  UNLOCK_IF_AVAILABLE();
}
2266 #endif
2268 void LrtsPrepareEnvelope(char *msg, int size)
2270 CMI_MSG_SIZE(msg) = size;
2274 /*@}*/