netlrts: Fix big (10x+) pingpong performance regression by partially reverting f505e76
[charm.git] / src / arch / netlrts / machine.c
blob572098ebf10f0b7ab9c42ad4f328169e2f842c7b
2 /** @file
3 * Basic NET-LRTS implementation of Converse machine layer
4 * @ingroup NET
5 */
7 /** @defgroup NET
8 * NET implementation of machine layer, ethernet in particular
9 * @ingroup Machine
11 * THE DATAGRAM STREAM
13 * Messages are sent using UDP datagrams. The sender allocates a
14 * struct for each datagram to be sent. These structs stick around
15 * until slightly after the datagram is acknowledged.
17 * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
18 * Each node has an OtherNode struct for every other node in the
19 * system. The OtherNode struct contains:
21 * send_queue (all datagram-structs not yet transmitted)
22 * send_window (all datagram-structs transmitted but not ack'd)
24 * When an acknowledgement comes in, all packets in the send-window
25 * are either marked as acknowledged or pushed back into the send
26 * queue for retransmission.
28 * THE OUTGOING MESSAGE
30 * When you send or broadcast a message, the first thing the system
31 * does is create an OutgoingMsg struct to represent the
32 * operation. The OutgoingMsg contains a very direct expression
33 * of what you want to do:
35 * OutgoingMsg:
37 * size --- size of message in bytes
38 * data --- pointer to the buffer containing the message
39 * src --- processor which sent the message
40 * dst --- destination processor (-1=broadcast, -2=broadcast all)
41 * freemode --- see below.
42 * refcount --- see below.
44 * The OutgoingMsg is kept around until the transmission is done, then
45 * it is garbage collected --- the refcount and freemode fields are
46 * to assist garbage collection.
48 * The freemode indicates which kind of buffer-management policy was
49 * used (sync, async, or freeing). The sync policy is handled
50 * superficially by immediately converting sync sends into freeing
51 * sends. Thus, the freemode can either be 'A' (async) or 'F'
52 * (freeing). If the freemode is 'F', then garbage collection
53 * involves freeing the data and the OutgoingMsg structure itself. If
54 * the freemode is 'A', then the only cleanup is to change the
55 * freemode to 'X', a condition which is then detectable by
56 * CmiAsyncMsgSent. In this case, the actual freeing of the
57 * OutgoingMsg is done by CmiReleaseCommHandle.
59 * When the transmission is initiated, the system computes how many
60 * datagrams need to be sent, total. This number is stored in the
61 * refcount field. Each time a datagram is delivered, the refcount
62 * is decremented, when it reaches zero, cleanup is performed. There
63 * are two exceptions to this rule. Exception 1: if the OutgoingMsg
64 * is a send (not a broadcast) and can be performed with shared
65 * memory, the entire datagram system is bypassed, the message is
66 * simply delivered and freed, not using the refcount mechanism at
67 * all. Exception 2: If the message is a broadcast, then part of the
68 * broadcast that can be done via shared memory is performed prior to
69 * initiating the datagram/refcount system.
71 * DATAGRAM FORMATS AND MESSAGE FORMATS
73 * Datagrams have this format:
75 * srcpe (16 bits) --- source processor number.
76 * magic ( 8 bits) --- magic number to make sure DG is good.
77 * dstrank ( 8 bits) --- destination processor rank.
78 * seqno (32 bits) --- packet sequence number.
79 * data (XX byte) --- user data.
81 * The only reason the srcpe is in there is because the receiver needs
82 * to know which receive window to use. The dstrank field is needed
83 * because transmission is node-to-node. Once the message is
84 * assembled by the node, it must be delivered to the appropriate PE.
85 * The dstrank field is used to encode certain special-case scenarios.
86 * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
87 * and should be delivered to all processors in the node. If the dstrank
88 * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
89 * which case the srcpe is the number of the acknowledger, the seqno is
90 * always zero, and the user data is a list of the seqno's being
91 * acknowledged. There may be other dstrank codes for special functions.
93 * To send a message, one chops it up into datagrams and stores those
94 * datagrams in a send-queue. These outgoing datagrams aren't stored
95 * in the explicit format shown above. Instead, they are stored as
96 * ImplicitDgrams, which contain the datagram header and a pointer to
97 * the user data (which is in the user message buffer, which is in the
98 * OutgoingMsg). At transmission time these are combined together.
100 * The combination of the datagram header with the user's data is
101 * performed right in the user's message buffer. Note that the
102 * datagram header is exactly 64 bits. One simply overwrites 64 bits
103 * of the user's message with a datagram header, sends the datagram
104 * straight from the user's message buffer, then restores the user's
105 * buffer to its original state. There is a small problem with the
106 * first datagram of the message: one needs 64 bits of space to store
107 * the datagram header. To make sure this space is there, we added a
108 * 64-bit unused space to the front of the Cmi message header. In
109 * addition to this, we also add 32 bits to the Cmi message header
110 * to make room for a length-field, making it possible to identify
111 * message boundaries.
113 * CONCURRENCY CONTROL
115 * This has changed recently.
117 * EFFICIENCY NOTES
119 * The sender-side does little copying. The async and freeing send
120 * routines do no copying at all. The sync send routines copy the
121 * message, then use the freeing-send routines. The other alternative
122 * is to not copy the message, and use the async send mechanism
123 * combined with a blocking wait. Blocking wait seems like a bad
124 * idea, since it could take a VERY long time to get all those
125 * datagrams out the door.
127 * The receiver side, unfortunately, must copy. To avoid copying,
128 * it would have to receive directly into a preallocated message buffer.
129 * Unfortunately, this can't work: there's no way to know how much
130 * memory to preallocate, and there's no way to know which datagram
131 * is coming next. Thus, we receive into fixed-size (large) datagram
132 * buffers. These are then inspected, and the messages extracted from
133 * them.
135 * Note that we are allocating a large number of structs: OutgoingMsg's,
136 * ImplicitDgrams, ExplicitDgrams. By design, each of these structs
137 * is a fixed-size structure. Thus, we can do memory allocation by
138 * simply keeping a linked-list of unused structs around. The only
139 * place where expensive memory allocation is performed is in the
140 * sync routines.
142 * Since the datagrams from one node to another are fully ordered,
143 * there is slightly more ordering than is needed: in theory, the
144 * datagrams of one message don't need to be ordered relative to the
145 * datagrams of another. This was done to simplify the sequencing
146 * mechanisms: implementing a fully-ordered stream is much simpler
147 * than a partially-ordered one. It also makes it possible to
148 * modularize, layering the message transmitter on top of the
149 * datagram-sequencer. In other words, it was just easier this way.
150 * Hopefully, this won't cause serious degradation: LAN's rarely get
151 * datagrams out of order anyway.
153 * A potential efficiency problem is the lack of message-combining.
154 * One datagram could conceivably contain several messages. This
155 * might be more efficient, it's not clear how much overhead is
156 * involved in sending a short datagram. Message-combining isn't
157 * really ``integrated'' into the design of this software, but you
158 * could fudge it as follows. Whenever you pull a short datagram from
159 * the send-queue, check the next one to see if it's also a short
160 * datagram. If so, pack them together into a ``combined'' datagram.
161 * At the receive side, simply check for ``combined'' datagrams, and
162 * treat them as if they were simply two datagrams. This would
163 * require extra copying. I have no idea if this would be worthwhile.
165 *****************************************************************************/
168 * @addtogroup NET
169 * @{
172 /*****************************************************************************
174 * Include Files
176 ****************************************************************************/
178 #define _GNU_SOURCE 1
179 #include <stdarg.h> /*<- was <varargs.h>*/
181 #define CMK_USE_PRINTF_HACK 0
182 #if CMK_USE_PRINTF_HACK
183 /*HACK: turn printf into CmiPrintf, by just defining our own
184 external symbol "printf". This may be more trouble than it's worth,
185 since the only advantage is that it works properly with +syncprint.
187 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
188 because they don't call printf. Has to be defined up here because we probably
189 haven't properly guessed this compiler's prototype for "printf".
191 static void InternalPrintf(const char *f, va_list l);
/* Stand-in for the C library's printf (only built when CMK_USE_PRINTF_HACK
   is nonzero): hands the format string and its variadic arguments to
   InternalPrintf.  The return value is the constant 10, not a count of the
   characters actually printed. */
int printf(const char *fmt, ...) {
  va_list args;

  va_start(args, fmt);
  InternalPrintf(fmt, args);
  va_end(args);
  return 10;
}
199 #endif
202 #include "converse.h"
203 #include "memory-isomalloc.h"
205 #include <stdio.h>
206 #include <stdlib.h>
207 #include <ctype.h>
208 #include <fcntl.h>
209 #include <errno.h>
210 #include <setjmp.h>
211 #include <signal.h>
212 #include <string.h>
213 #include <unistd.h>
215 /* define machine debug */
216 #include "machine.h"
217 static int Cmi_charmrun_pid;
219 extern int quietMode;
221 /******************* Producer-Consumer Queues ************************/
222 #include "pcqueue.h"
224 #include "machine-smp.h"
226 #include "machine-lrts.h"
227 #include "machine-common-core.c"
229 #if CMK_USE_KQUEUE
230 #include <sys/event.h>
231 int _kq = -1;
232 #endif
234 #if CMK_USE_POLL
235 #include <poll.h>
236 #endif
238 #include "conv-ccs.h"
239 #include "ccs-server.h"
240 #include "sockRoutines.h"
242 #if defined(_WIN32)
243 /*For windows systems:*/
244 # include <windows.h>
245 # include <wincon.h>
246 # include <sys/types.h>
247 # include <sys/timeb.h>
248 # define fdopen _fdopen
249 # define SIGBUS -1 /*These signals don't exist in Win32*/
250 # define SIGKILL -1
251 # define SIGQUIT -1
252 /*# define SIGTERM -1*/ /* VC++ ver 8 now has SIGTERM */
254 #else /*UNIX*/
255 # include <pwd.h>
256 # include <unistd.h>
257 # include <fcntl.h>
258 # include <sys/file.h>
259 #endif
261 #if CMK_PERSISTENT_COMM
262 #include "machine-persistent.c"
263 #endif
265 #define PRINTBUFSIZE 16384
267 #ifdef __ONESIDED_IMPL
268 #ifdef __ONESIDED_NO_HARDWARE
269 int putSrcHandler;
270 int putDestHandler;
271 int getSrcHandler;
272 int getDestHandler;
273 #include "conv-onesided.c"
274 #endif
275 #endif
277 #if CMK_SHRINK_EXPAND
278 extern void resumeAfterRealloc();
279 extern char willContinue;
280 extern int mynewpe;
281 extern int numProcessAfterRestart;
282 CcsDelayedReply shrinkExpandreplyToken;
283 extern char *_shrinkexpand_basedir;
284 #endif
286 static void CommunicationServerNet(int withDelayMs, int where);
287 //static void CommunicationServer(int withDelayMs);
289 void CmiHandleImmediate();
290 extern int CmemInsideMem();
291 extern void CmemCallWhenMemAvail();
293 static unsigned int dataport=0;
294 static SOCKET dataskt;
296 extern void TokenUpdatePeriodic();
297 extern void getAvailSysMem();
299 static int Lrts_numNodes;
300 static int Lrts_myNode;
302 /****************************************************************************
304 * Handling Errors
306 * Errors should be handled by printing a message on stderr and
307 * calling exit(1). Nothing should be sent to charmrun, no attempt at
308 * communication should be made. The other processes will notice the
309 * abnormal termination and will deal with it.
311 * Rationale: if an error triggers an attempt to send a message,
312 * the attempt to send a message is likely to trigger another error,
313 * leading to an infinite loop and a process that spins instead of
314 * shutting down.
316 *****************************************************************************/
318 static int machine_initiated_shutdown=0;
319 static int already_in_signal_handler=0;
321 static void CmiDestroyLocks();
323 void EmergencyExit(void);
324 void MachineExit();
326 static void machine_exit(int status)
328 MACHSTATE(3," machine_exit");
329 machine_initiated_shutdown=1;
331 CmiDestroyLocks(); /* destroy locks to prevent dead locking */
332 EmergencyExit();
334 MachineExit();
335 exit(status);
338 static void charmrun_abort(const char*);
/**
 * Report a fatal error through charmrun_abort() and then shut this process
 * down with exit status 1.
 *
 * @param msg  text of the error, forwarded unchanged to charmrun_abort().
 */
static void KillEveryone(const char *msg)
{
  charmrun_abort(msg);
  machine_exit(1);
}
/**
 * Report a fatal error identified only by a numeric code, then shut down.
 *
 * Formats "[pe] Fatal error #n" into a local buffer, passes it to
 * charmrun_abort() and exits with status 1 via machine_exit().
 *
 * @param n  error code to include in the message.
 */
static void KillEveryoneCode(int n)
{
  char msg[100];

  /* snprintf bounds the write to the buffer regardless of the values. */
  snprintf(msg, sizeof msg, "[%d] Fatal error #%d\n", CmiMyPe(), n);
  charmrun_abort(msg);
  machine_exit(1);
}
355 CpvExtern(int, freezeModeFlag);
357 static int Cmi_truecrash;
/*
 * Signal handler for fatal signals.
 *
 * - If a machine-initiated shutdown is already under way, or this handler is
 *   being re-entered, it calls machine_exit(1) straight away.
 * - With CMK_CCS_AVAILABLE and the cmiArgDebugFlag set, it first calls
 *   CpdNotify(CPD_SIGNAL, sigNo) and then freezes (CpdFreezeModeScheduler or
 *   CpdFreeze, depending on CMK_BIGSIM_CHARM).
 * - It then picks a description (and, for some signals, a suggestion) for
 *   sigNo, clears Cmi_truecrash and calls CmiAbortHelper().
 * - When __FAULT__ is defined, KILL/QUIT/TERM are only reported with
 *   CmiPrintf and the abort path is skipped.
 *
 * sigNo: number of the signal that was delivered.
 */
359 static void KillOnAllSigs(int sigNo)
361 const char *sig="unknown signal";
362 const char *suggestion="";
363 if (machine_initiated_shutdown ||
364 already_in_signal_handler)
365 machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
366 already_in_signal_handler=1;
368 #if CMK_CCS_AVAILABLE
369 if (CpvAccess(cmiArgDebugFlag)) {
370 int reply = 0;
371 CpdNotify(CPD_SIGNAL,sigNo);
372 #if ! CMK_BIGSIM_CHARM
373 CcsSendReplyNoError(4,&reply);/*Send an empty reply if not*/
374 CpvAccess(freezeModeFlag) = 1;
375 CpdFreezeModeScheduler();
376 #else
377 CpdFreeze();
378 #endif
380 #endif
/* Translate the signal number into text for the abort message. */
383 if (sigNo==SIGSEGV) {
384 sig="segmentation violation";
385 suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).";
387 if (sigNo==SIGFPE) {
388 sig="floating point exception";
389 suggestion="Check for integer or floating-point division by zero.";
391 if (sigNo==SIGBUS) {
392 sig="bus error";
393 suggestion="Check for misaligned reads or writes to memory.";
395 if (sigNo==SIGILL) {
396 sig="illegal instruction";
397 suggestion="Check for calls to uninitialized function pointers.";
399 if (sigNo==SIGKILL) sig="caught signal KILL";
400 if (sigNo==SIGQUIT) sig="caught signal QUIT";
401 if (sigNo==SIGTERM) sig="caught signal TERM";
402 MACHSTATE1(5," Caught signal %s ",sig);
403 /*ifdef this part*/
404 #ifdef __FAULT__
405 if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
406 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
407 } else
408 #endif
/* Clear Cmi_truecrash before aborting; LrtsAbort tests it to choose
   between a deliberate crash and a charmrun abort. */
410 Cmi_truecrash = 0;
411 CmiAbortHelper("Caught Signal", sig, suggestion, 0, 1);
415 static void machine_atexit_check(void)
417 if (!machine_initiated_shutdown)
418 CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
419 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
420 fgetc(stdin);
421 #endif
424 #if !defined(_WIN32)
425 static void HandleUserSignals(int signum)
427 int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
428 CcdRaiseCondition(condnum);
430 #endif
432 /*****************************************************************************
434 * Utility routines for network machine interface.
436 *****************************************************************************/
/*
 Horrific #defines to hide the differences between select(), poll() and
 kqueue().

   CMK_PIPE_DECL(delayMs)   declares the local state for one wait; delayMs
                            is the timeout in (integer) milliseconds.
   CMK_PIPE_SUB             argument list passing that state to a helper.
   CMK_PIPE_PARAM           matching parameter list for such a helper.
   CMK_PIPE_ADDREAD/WRITE   register a descriptor before the wait.
   CMK_PIPE_CALL()          perform the wait.
   CMK_PIPE_CHECKREAD/WRITE test a descriptor after the wait.

 In the poll() version descriptors are identified by position only, so the
 CHECK macros must be invoked in the same order as the ADD macros.
*/
#if CMK_USE_POLL /*poll() version*/
# define CMK_PIPE_DECL(delayMs) \
        struct pollfd fds[10]; \
        int nFds_sto=0; int *nFds=&nFds_sto; \
        int pollDelayMs=(delayMs);
# define CMK_PIPE_SUB fds,nFds
/* NOTE: expands to two statements (the poll() call, then a reset of the
   slot counter for the CHECK pass), so it is only usable as a complete
   statement or as the right-hand side of a simple assignment statement. */
# define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0

# define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
# define CMK_PIPE_ADDREAD(rd_fd) \
        do {fds[*nFds].fd=(rd_fd); fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
        do {fds[*nFds].fd=(wr_fd); fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
/* Parenthesized so the result can be negated or combined safely. */
# define CMK_PIPE_CHECKREAD(rd_fd) (fds[(*nFds)++].revents&POLLIN)
# define CMK_PIPE_CHECKWRITE(wr_fd) (fds[(*nFds)++].revents&POLLOUT)

#elif CMK_USE_KQUEUE /* kqueue version */

/* The delay is split into whole seconds and a nanosecond remainder so that
   tv_nsec stays below one second for delays of 1000 ms or more. */
# define CMK_PIPE_DECL(delayMs) \
        if (_kq == -1) _kq = kqueue(); \
        struct kevent ke_sto; \
        struct kevent* ke = &ke_sto; \
        struct timespec tmo; \
        tmo.tv_sec = (delayMs)/1000; tmo.tv_nsec = ((delayMs)%1000)*1000000L;
# define CMK_PIPE_SUB ke
# define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)

# define CMK_PIPE_PARAM struct kevent* ke
# define CMK_PIPE_ADDREAD(rd_fd) \
        do { EV_SET(ke, (rd_fd), EVFILT_READ, EV_ADD, 0, 10, NULL); \
                kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
        do { EV_SET(ke, (wr_fd), EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
                kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == (rd_fd) && ke->filter == EVFILT_READ)
# define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == (wr_fd) && ke->filter == EVFILT_WRITE)

#else /*select() version*/

/* The delay is split into whole seconds and a microsecond remainder so that
   tv_usec stays below one second for delays of 1000 ms or more. */
# define CMK_PIPE_DECL(delayMs) \
        fd_set rfds_sto,wfds_sto;\
        fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
        FD_ZERO(rfds); FD_ZERO(wfds); \
        tmo.tv_sec=(delayMs)/1000; tmo.tv_usec=1000*((delayMs)%1000);
# define CMK_PIPE_SUB rfds,wfds
# define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)

# define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
# define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
# define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
# define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
# define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
#endif
/**
 * Called after the wait primitive reports an error.  On UNIX any errno other
 * than EINTR is fatal; on Win32 the error is ignored.
 */
static void CMK_PIPE_CHECKERR(void) {
#if defined(_WIN32)
  /* Win32 socket seems to randomly return inexplicable errors
     here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK.
     (A disabled diagnostic used to print errno and WSAGetLastError()
     here.) */
#else /*UNIX machine*/
  if (errno == EINTR) return;
  KillEveryone("Socket error in CheckSocketsReady!\n");
#endif
}
509 static void CmiStdoutFlush(void);
510 static int CmiStdoutNeedsService(void);
511 static void CmiStdoutService(void);
512 static void CmiStdoutAdd(CMK_PIPE_PARAM);
513 static void CmiStdoutCheck(CMK_PIPE_PARAM);
/**
 * Read the system clock.
 *
 * @return the current time in seconds as a double: seconds plus
 *         milliseconds from _ftime() on Win32, seconds plus microseconds
 *         from gettimeofday() elsewhere.  A gettimeofday() failure is
 *         reported with perror() and KillEveryoneCode(9343112).
 */
double GetClock(void)
{
#if defined(_WIN32)
  struct _timeb now;

  _ftime(&now);
  return (now.time * 1.0 + now.millitm * 1.0E-3);
#else
  struct timeval now;

  if (gettimeofday(&now, NULL) < 0) {
    perror("gettimeofday");
    KillEveryoneCode(9343112);
  }
  return (now.tv_sec * 1.0 + now.tv_usec * 1.0E-6);
#endif
}
531 /***********************************************************************
533 * Abort function:
535 ************************************************************************/
537 static int already_aborting=0;
538 void LrtsAbort(const char *message)
540 if (already_aborting) machine_exit(1);
541 already_aborting=1;
542 MACHSTATE1(5,"CmiAbort(%s)",message);
544 /*Send off any remaining prints*/
545 CmiStdoutFlush();
547 if(Cmi_truecrash) {
548 printf("CHARM++ FATAL ERROR: %s\n", message);
549 *(int *)NULL = 0; /*Write to null, causing bus error*/
550 } else {
551 charmrun_abort(message);
552 machine_exit(1);
557 /******************************************************************************
559 * CmiEnableAsyncIO
561 * The net and tcp versions use a bunch of unix processes talking to each
562 * other via file descriptors. We need for a signal SIGIO to be generated
563 * each time a message arrives, making it possible to write a signal
564 * handler to handle the messages. The vast majority of unixes can,
565 * in fact, do this. However, there isn't any standard for how this is
566 * supposed to be done, so each version of UNIX has a different set of
567 * calls to turn this signal on. So, there is like one version here for
568 * every major brand of UNIX.
570 *****************************************************************************/
#if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
#include <fcntl.h>
/**
 * Ask the kernel to deliver I/O signals for fd to this process.
 *
 * Makes this process the owner of fd (F_SETOWN) and turns on O_ASYNC.  The
 * current status flags are read first and O_ASYNC is OR-ed in, so flags
 * that were already set on the descriptor (for example O_NONBLOCK) are
 * preserved.  Any fcntl() failure is reported with CmiError and the process
 * exits with status 1.
 *
 * @param fd  descriptor to configure.
 */
void CmiEnableAsyncIO(int fd)
{
  int flags;

  if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
    CmiError("setting socket owner: %s\n", strerror(errno)) ;
    exit(1);
  }
  flags = fcntl(fd, F_GETFL, 0);
  if ( flags < 0 || fcntl(fd, F_SETFL, flags | O_ASYNC) < 0 ) {
    CmiError("setting socket async: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/* Asynchronous I/O signals are not configured on this platform. */
void CmiEnableAsyncIO(int fd) { }
#endif
/* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
#if !defined(_WIN32)
/**
 * Put fd into non-blocking mode.
 *
 * The current status flags are read with F_GETFL and O_NONBLOCK is OR-ed in,
 * so flags already set on the descriptor are preserved.  On failure the
 * error is reported with CmiError and the process exits with status 1.
 *
 * @param fd  descriptor to configure.
 */
void CmiEnableNonblockingIO(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);

  if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
    exit(1);
  }
}
#else
/* No-op on Win32. */
void CmiEnableNonblockingIO(int fd) { }
#endif
603 /******************************************************************************
605 * Configuration Data
607 * This data is all read in from the NETSTART variable (provided by the
608 * charmrun) and from the command-line arguments. Once read in, it is never
609 * modified.
611 *****************************************************************************/
613 static skt_ip_t Cmi_self_IP;
614 static skt_ip_t Cmi_charmrun_IP; /*Address of charmrun machine*/
615 static int Cmi_charmrun_port;
616 static int Cmi_charmrun_fd=-1;
617 /* Magic number to be used for sanity check in message header */
618 static int Cmi_net_magic;
620 static int Cmi_netpoll;
621 static int Cmi_asyncio;
622 static int Cmi_idlepoll;
623 static int Cmi_syncprint;
624 static int Cmi_print_stats = 0;
626 #if CMK_SHRINK_EXPAND
627 int Cmi_isOldProcess = 0; // means this process was already there
628 static int Cmi_mynewpe = 0;
629 static int Cmi_oldpe = 0;
630 static int Cmi_newnumnodes = 0;
631 int Cmi_myoldpe = 0;
632 #endif
634 #if ! defined(_WIN32)
635 /* parse forks only used in non-smp mode */
636 static void parse_forks(void) {
637 char *forkstr;
638 int nread;
639 int forks;
640 int i,pid;
641 forkstr=getenv("CmiMyForks");
642 if(forkstr!=0) { /* charmrun */
643 nread = sscanf(forkstr,"%d",&forks);
644 for(i=1;i<=forks;i++) { /* by default forks = 0 */
645 pid=fork();
646 if(pid<0) CmiAbort("Fork returned an error");
647 if(pid==0) { /* forked process */
648 /* reset mynode,pe & exit loop */
649 Lrts_myNode+=i;
650 #if ! CMK_SMP
651 _Cmi_mype+=i;
652 #endif
653 break;
658 #endif
659 static void parse_magic(void)
661 char* nm;
662 int nread;
663 nm = getenv("NETMAGIC");
664 if (nm!=0)
665 {/*Read values set by Charmrun*/
666 nread = sscanf(nm, "%d",&Cmi_net_magic);
669 static void parse_netstart(void)
671 char *ns;
672 int nread;
673 int port;
674 ns = getenv("NETSTART");
675 if (ns!=0)
676 {/*Read values set by Charmrun*/
677 char Cmi_charmrun_name[1024];
678 nread = sscanf(ns, "%d%s%d%d%d",
679 &Lrts_myNode,
680 Cmi_charmrun_name, &Cmi_charmrun_port,
681 &Cmi_charmrun_pid, &port);
682 Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
684 if (nread!=5) {
685 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
686 exit(1);
688 #if CMK_SHRINK_EXPAND
689 if (Cmi_isOldProcess) {
690 Cmi_myoldpe = Lrts_myNode;
692 #endif
694 } else
695 {/*No charmrun-- set flag values for standalone operation*/
696 Lrts_myNode=0;
697 Cmi_charmrun_IP=_skt_invalid_ip;
698 Cmi_charmrun_port=0;
699 Cmi_charmrun_pid=0;
700 dataport = -1;
701 #if CMK_USE_PXSHM
702 CmiAbort("pxshm must be run with charmrun");
703 #endif
707 static void extract_common_args(char **argv)
709 if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
710 Cmi_print_stats = 1;
711 #if CMK_SHRINK_EXPAND
712 //Realloc specific args
713 CmiGetArgIntDesc(argv,"+mynewpe",&Cmi_mynewpe,"New PE after realloc");
714 CmiGetArgIntDesc(argv,"+myoldpe",&Cmi_oldpe,"New PE after realloc");
715 CmiGetArgIntDesc(argv,"+newnumpes",&Cmi_newnumnodes,"New num PEs after realloc");
716 #endif
720 /******************************************************************************
722 * Packet Performance Logging
724 * This module is designed to give a detailed log of the packets and their
725 * acknowledgements, for performance tuning. It can be disabled.
727 *****************************************************************************/
729 #define LOGGING 0
731 #if LOGGING
/* One entry of the packet log.  Note that `logent` names a POINTER to the
   struct.  The log writers print the fields as:
   time (%1.4f), srcpe, kind (as a character), dstpe, seqno. */
733 typedef struct logent {
734 double time;
735 int seqno;
736 int srcpe;
737 int dstpe;
738 int kind;
739 } *logent;
/* log: entry array allocated by log_init; log_pos: next slot to fill;
   log_wrap: set once the LOG macro has wrapped log_pos back to 0. */
742 logent log;
743 int log_pos;
744 int log_wrap;
746 static void log_init(void)
748 log = (logent)malloc(50000 * sizeof(struct logent));
749 _MEMCHECK(log);
750 log_pos = 0;
751 log_wrap = 0;
754 static void log_done(void)
756 char logname[100]; FILE *f; int i, size;
757 sprintf(logname, "log.%d", Lrts_myNode);
758 f = fopen(logname, "w");
759 if (f==0) KillEveryone("fopen problem");
760 if (log_wrap) size = 50000; else size=log_pos;
761 for (i=0; i<size; i++) {
762 logent ent = log+i;
763 fprintf(f, "%1.4f %d %c %d %d\n",
764 ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
766 fclose(f);
769 void printLog(void)
771 char logname[100]; FILE *f; int i, j, size;
772 static int logged = 0;
773 if (logged)
774 return;
775 logged = 1;
776 CmiPrintf("Logging: %d\n", Lrts_myNode);
777 sprintf(logname, "log.%d", Lrts_myNode);
778 f = fopen(logname, "w");
779 if (f==0) KillEveryone("fopen problem");
780 for (i = 5000; i; i--)
782 /*for (i=0; i<size; i++) */
783 j = log_pos - i;
784 if (j < 0)
786 if (log_wrap)
787 j = 5000 + j;
788 else
789 j = 0;
792 logent ent = log+j;
793 fprintf(f, "%1.4f %d %c %d %d\n",
794 ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
797 fclose(f);
798 CmiPrintf("Done Logging: %d\n", Lrts_myNode);
/* Record one log entry: t=time, s=source pe, k=kind, d=destination pe,
   q=sequence number.  When log_pos reaches 50000 it is reset to 0 and
   log_wrap is set, so the array is reused as a ring.  Expands to a braced
   block, not a do/while(0) statement. */
801 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
803 #endif
/* With LOGGING disabled every logging entry point expands to nothing, so
   call sites need no conditional compilation of their own. */
805 #if !LOGGING
807 #define log_init() /*empty*/
808 #define log_done() /*empty*/
809 #define printLog() /*empty*/
810 #define LOG(t,s,k,d,q) /*empty*/
812 #endif
814 /******************************************************************************
816 * Node state
818 *****************************************************************************/
821 static CmiNodeLock Cmi_comm_var_mutex;
822 static CmiNodeLock Cmi_scanf_mutex;
823 static double Cmi_clock;
824 static double Cmi_check_delay = 3.0;
826 /******************************************************************************
828 * OS Threads
829 * SMP implementation moved to machine-smp.c
830 *****************************************************************************/
831 int* inProgress;
833 /** Mechanism to prevent dual locking when comm-layer functions, including prints,
834 * are called recursively. (UN)LOCK_IF_AVAILABLE is used before and after a code piece
835 * which is guaranteed not to make any-recursive locking calls. (UN)LOCK_AND_(UN)SET
836 * is used before and after a code piece that may make recursive locking calls.
/* Comm-lock helper macros, keyed on inProgress[CmiMyRank()].
 *
 * LOCK_IF_AVAILABLE / UNLOCK_IF_AVAILABLE
 *     call CmiCommLock() / CmiCommUnlock() only when this rank's
 *     inProgress counter is zero.
 * LOCK_AND_SET
 *     takes the lock (and sets acqLock = 1) only when the counter is zero,
 *     then increments the counter.
 * UNLOCK_AND_UNSET
 *     releases the lock only if acqLock is set (clearing it), then
 *     decrements the counter.
 *
 * LOCK_AND_SET / UNLOCK_AND_UNSET use a variable named acqLock that must
 * exist in the scope where the macros are expanded.  All four expand to
 * bare statements (no do/while(0) wrapper).
 */
839 #define LOCK_IF_AVAILABLE() \
840 if(!inProgress[CmiMyRank()]) { \
841 CmiCommLock(); \
844 #define UNLOCK_IF_AVAILABLE() \
845 if(!inProgress[CmiMyRank()]) { \
846 CmiCommUnlock(); \
849 #define LOCK_AND_SET() \
850 if(!inProgress[CmiMyRank()]) { \
851 CmiCommLock(); \
852 acqLock = 1; \
854 inProgress[CmiMyRank()] += 1;
856 #define UNLOCK_AND_UNSET() \
857 if(acqLock) { \
858 CmiCommUnlock(); \
859 acqLock = 0; \
861 inProgress[CmiMyRank()] -= 1;
863 /************************ No kernel SMP threads ***************/
864 //XX
865 #if CMK_SHARED_VARS_UNAVAILABLE
/* Counter raised by CmiMemLockNet() and lowered by CmiMemUnlockNet(); the
   SIGIO handler skips its work while it is nonzero. */
static volatile int memflag=0;

/* Enter a memory-locked section. */
void CmiMemLockNet()
{
  memflag = memflag + 1;
}

/* Leave a memory-locked section. */
void CmiMemUnlockNet()
{
  memflag = memflag - 1;
}
/* Flag-based comm lock used when there are no shared-variable threads:
   comm_flag is 1 while the lock is held and 0 otherwise. */
871 static volatile int comm_flag=0;
/* Executes `dothis` when the comm lock is currently held.  Expands to a
   bare `if`, so beware of a following `else` at the call site. */
872 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
873 #ifndef MACHLOCK_DEBUG
/* Plain versions: just set or clear the flag. */
874 # define CmiCommLock() (comm_flag=1)
875 # define CmiCommUnlock() (comm_flag=0)
876 #else /* Error-checking flag locks */
/* Debug versions: assert the flag has the expected value before changing it. */
877 void CmiCommLock(void) {
878 MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
879 comm_flag=1;
881 void CmiCommUnlock(void) {
882 MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
883 comm_flag=0;
885 #endif
/* Rank of this PE within its node.  Stays zero here: the assignments that
   once set it to 1 during SIGIO handling are commented out.  The type is
   now spelled out, since implicit int is not valid C99 or later. */
int _Cmi_myrank=0;
/*
 * SIGIO-style interrupt handler: runs the communication server once.
 *
 * Returns without doing anything when the memory flag or comm flag is set,
 * an immediate message is running, or CmiCheckImmediateLock(0) reports the
 * immediate lock as held.  Otherwise it calls
 * CommunicationServerNet(0, COMM_SERVER_FROM_INTERRUPT) with the isomalloc
 * block list switched to NULL for the duration, restoring the previous list
 * afterwards.
 *
 * ignored: signal number, unused.
 */
890 static void CommunicationInterrupt(int ignored)
892 MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
893 if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0))
894 { /* Already busy inside malloc, comm, or immediate messages */
895 MACHSTATE(5,"--SKIPPING SIGIO--");
896 return;
898 MACHSTATE1(2,"--BEGIN SIGIO comm_mutex_isLocked: %d--", comm_flag)
900 /*Make sure any malloc's we do in here are NOT migratable:*/
901 CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
902 /* _Cmi_myrank=1; */
903 CommunicationServerNet(0, COMM_SERVER_FROM_INTERRUPT); /* from interrupt */
904 //CommunicationServer(0); /* from interrupt */
905 /* _Cmi_myrank=0; */
906 CmiIsomallocBlockListActivate(oldList);
908 MACHSTATE(2,"--END SIGIO--")
911 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
913 static void CmiDestroyLocks()
915 comm_flag = 0;
916 memflag = 0;
919 #endif
921 /*Add a message to this processor's receive queue
922 Must be called while holding comm. lock
925 extern double evacTime;
927 /***************************************************************
928 Communication with charmrun:
929 We can send (ctrl_sendone) and receive (ctrl_getone)
930 messages on a TCP socket connected to charmrun.
931 This is used for printfs, CCS, etc; and also for
932 killing ourselves if charmrun dies.
935 /*This flag prevents simultaneous outgoing
936 messages on the charmrun socket. It is protected
937 by the commlock.*/
938 static int Cmi_charmrun_fd_sendflag=0;
940 /* ctrl_sendone */
941 static int sendone_abort_fn(SOCKET skt,int code,const char *msg) {
942 fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
943 machine_exit(1);
944 return -1;
947 static void ctrl_sendone_nolock(const char *type,
948 const char *data1,int dataLen1,
949 const char *data2,int dataLen2)
951 const void *bufs[3]; int lens[3]; int nBuffers=0;
952 ChMessageHeader hdr;
953 skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
954 MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
955 if (Cmi_charmrun_fd==-1)
956 charmrun_abort("ctrl_sendone called in standalone!\n");
957 Cmi_charmrun_fd_sendflag=1;
958 ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
959 bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
960 if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
961 if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
962 skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
963 Cmi_charmrun_fd_sendflag=0;
964 skt_set_abort(oldAbort);
965 MACHSTATE(2,"} ctrl_sendone_nolock");
/**
 * Same as ctrl_sendone_nolock(), but wrapped in LOCK_IF_AVAILABLE() /
 * UNLOCK_IF_AVAILABLE() so the comm lock is held during the send when this
 * rank is not already inside a locked section.
 */
static void ctrl_sendone_locking(const char *type,
                                 const char *data1,int dataLen1,
                                 const char *data2,int dataLen2)
{
  LOCK_IF_AVAILABLE();
  ctrl_sendone_nolock(type, data1, dataLen1, data2, dataLen2);
  UNLOCK_IF_AVAILABLE();
}
/* Optional memory-usage tracing; off unless MEMORYUSAGE_OUTPUT is defined
   nonzero before this point. */
977 #ifndef MEMORYUSAGE_OUTPUT
978 #define MEMORYUSAGE_OUTPUT 0
979 #endif
980 #if MEMORYUSAGE_OUTPUT
981 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
982 static int memoryusage_counter;
/* True when the counter is a multiple of MEMORYUSAGE_OUTPUT_FREQ. */
983 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
/* Bumps the counter and, on PE 0, prints the pe, GetClock() and
   CmiMemoryUsage(). */
984 #define memoryusage_output {\
985 memoryusage_counter++;\
986 if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
987 #endif
989 static double Cmi_check_last;
991 /* if charmrun dies, we finish */
/*
 * Periodic liveness check against charmrun.
 *
 * When more than Cmi_check_delay seconds have passed since Cmi_check_last,
 * records the new time and sends a "ping" message to charmrun, unless the
 * comm lock is held or another send to charmrun is in progress (in both
 * cases it returns early).  The function also calls CmiStdoutFlush().
 * NOTE(review): the brace-only lines are missing from this copy, so the
 * exact nesting of the flush call relative to the time check should be
 * confirmed against the real file.
 *
 * With MEMORYUSAGE_OUTPUT enabled, memory usage is printed first and the
 * rest of the body is guarded by memoryusage_isOutput.
 *
 * ignored: unused callback argument.
 */
992 static void pingCharmrun(void *ignored)
994 #if MEMORYUSAGE_OUTPUT
995 memoryusage_output;
996 if(memoryusage_isOutput){
997 memoryusage_counter = 0;
998 #else
1000 #endif
1002 double clock=GetClock();
1003 if (clock > Cmi_check_last + Cmi_check_delay) {
1004 MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1005 Cmi_check_last = clock;
1006 CmiCommLockOrElse(return;); /*Already busy doing communication*/
1007 if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1008 LOCK_IF_AVAILABLE();
1009 ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1010 UNLOCK_IF_AVAILABLE();
1012 CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1016 /* periodic charm ping, for netpoll */
1017 static void pingCharmrunPeriodic(void *ignored)
1019 pingCharmrun(ignored);
1020 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1023 static int ignore_further_errors(SOCKET skt,int c,const char *msg) {machine_exit(2);return -1;}
1024 static void charmrun_abort(const char *s)
1026 if (Cmi_charmrun_fd==-1) {/*Standalone*/
1027 fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1028 CmiPrintStackTrace(0);
1029 abort();
1030 } else {
1031 char msgBuf[80];
1032 skt_set_abort(ignore_further_errors);
1033 if (CmiNumPartitions() == 1) {
1034 sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1036 else
1038 sprintf(msgBuf,"Fatal error on Partition %d PE %d> ", CmiMyPartition(), CmiMyPe());
1040 ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
#if CMK_SHRINK_EXPAND
/**
 * Send charmrun a "realloc" control message carrying the string `s`
 * (terminating NUL included).  Does not take the comm lock.
 */
void charmrun_realloc(const char *s)
{
  const int payloadLen = strlen(s)+1;
  ctrl_sendone_nolock("realloc", s, payloadLen, NULL, 0);
}
#endif
1051 /* ctrl_getone */
1053 #ifdef __FAULT__
1054 #include "machine-recover.c"
1055 #endif
/* Forward declaration; ctrl_getone calls this for "initnodetab" messages. */
1057 static void node_addresses_store(ChMessage *msg);
/* Set by ctrl_getone: 1 after a "barrier" message, 2 after "barrier0".
   LrtsBarrier and CmiBarrierZero poll it and reset it to 0. */
1059 static int barrierReceived = 0;
1061 static void ctrl_getone(void)
1063 ChMessage msg;
1064 MACHSTATE(2,"ctrl_getone")
1065 MACHLOCK_ASSERT(comm_mutex_isLocked,"ctrl_getone")
1066 ChMessage_recv(Cmi_charmrun_fd,&msg);
1067 MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1069 if (strcmp(msg.header.type,"die")==0) {
1070 MACHSTATE(2,"ctrl_getone bye bye")
1071 fprintf(stderr,"aborting: %s\n",msg.data);
1072 log_done();
1073 ConverseCommonExit();
1074 machine_exit(0);
1076 #if CMK_CCS_AVAILABLE
1077 else if (strcmp(msg.header.type, "req_fw")==0) {
1078 CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1079 /*Sadly, I *can't* do a:
1080 CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1081 here, because I can't send converse messages in the
1082 communication thread. I *can* poke this message into
1083 any convenient processor's queue, though: (OSL, 9/14/2000)
1085 int pe=0;/*<- node-local processor number. Any one will do.*/
1086 void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1087 MACHSTATE(2,"Incoming CCS request");
1088 if (cmsg!=NULL)
1090 if(CmiNumPes() == 1 && CmiNumPartitions() == 1)
1091 ccsRunning = 1;
1092 CmiPushPE(pe,cmsg);
1095 #endif
1096 #ifdef __FAULT__
1097 else if(strcmp(msg.header.type,"crashnode")==0) {
1098 crash_node_handle(&msg);
1100 else if(strcmp(msg.header.type,"initnodetab")==0) {
1101 /** A processor crashed and got recreated. So charmrun sent
1102 across the whole nodetable data to update this processor*/
1103 node_addresses_store(&msg);
1104 // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1106 #endif
1107 else if(strcmp(msg.header.type,"barrier")==0) {
1108 barrierReceived = 1;
1110 else if(strcmp(msg.header.type,"barrier0")==0) {
1111 barrierReceived = 2;
1113 else {
1114 /* We do not use KillEveryOne here because it calls CmiMyPe(),
1115 * which is not available to the communication thread on an SMP version.
1117 /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
1118 charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1119 machine_exit(1);
1122 MACHSTATE(2,"ctrl_getone done")
1123 ChMessage_free(&msg);
#if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
/**
 * Deliver CCS reply data for the given reply header: the header and the
 * reply bytes are sent to charmrun as one "reply_fw" control message,
 * with the comm lock taken for the send.
 *
 * @param hdr      CCS header identifying the request being answered
 * @param repLen   number of reply bytes
 * @param repData  reply bytes
 */
void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
{
  const char *hdrBytes = (const char *)hdr;

  MACHSTATE(2,"Outgoing CCS reply");
  ctrl_sendone_locking("reply_fw", hdrBytes, sizeof(CcsImplHeader),
                       repData, repLen);
  MACHSTATE(1,"Outgoing CCS reply away");
}
#endif
1138 /*****************************************************************************
1140 * CmiPrintf, CmiError, CmiScanf
1142 *****************************************************************************/
1143 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1144 static void InternalPrintf(const char *f, va_list l)
1146 ChMessage replymsg;
1147 char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1148 CmiStdoutFlush();
1149 vsprintf(buffer, f, l);
1150 if(Cmi_syncprint) {
1151 LOCK_IF_AVAILABLE();
1152 ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1153 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1154 ChMessage_free(&replymsg);
1155 UNLOCK_IF_AVAILABLE();
1156 } else {
1157 ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1159 InternalWriteToTerminal(0,buffer,strlen(buffer));
1160 CmiTmpFree(buffer);
1163 static void InternalError(const char *f, va_list l)
1165 ChMessage replymsg;
1166 char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1167 CmiStdoutFlush();
1168 vsprintf(buffer, f, l);
1169 if(Cmi_syncprint) {
1170 ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1171 LOCK_IF_AVAILABLE();
1172 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1173 ChMessage_free(&replymsg);
1174 UNLOCK_IF_AVAILABLE();
1175 } else {
1176 ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1178 InternalWriteToTerminal(1,buffer,strlen(buffer));
1179 CmiTmpFree(buffer);
1182 static int InternalScanf(char *fmt, va_list l)
1184 ChMessage replymsg;
1185 char *ptr[20];
1186 char *p; int nargs, i;
1187 nargs=0;
1188 p=fmt;
1189 while (*p) {
1190 if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1191 if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1192 if (p[0]=='%') { nargs++; p++; continue; }
1193 if (*p=='\n') *p=' '; p++;
1195 if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1196 for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1197 CmiLock(Cmi_scanf_mutex);
1198 if (Cmi_charmrun_fd!=-1)
1199 {/*Send charmrun the format string*/
1200 ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1201 /*Wait for the reply (characters to scan) from charmrun*/
1202 LOCK_IF_AVAILABLE();
1203 ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1204 i = sscanf((char*)replymsg.data, fmt,
1205 ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1206 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1207 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1208 ChMessage_free(&replymsg);
1209 UNLOCK_IF_AVAILABLE();
1210 } else
1211 {/*Just do the scanf normally*/
1212 i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1213 ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1214 ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1216 CmiUnlock(Cmi_scanf_mutex);
1217 return i;
1219 #if CMK_CMIPRINTF_IS_A_BUILTIN
1221 /*New stdarg.h declarations*/
1222 void CmiPrintf(const char *fmt, ...)
1224 if (quietMode) return;
1225 CpdSystemEnter();
1227 va_list p; va_start(p, fmt);
1228 if (Cmi_charmrun_fd!=-1 && _writeToStdout)
1229 InternalPrintf(fmt, p);
1230 else
1231 vfprintf(stdout,fmt,p);
1232 va_end(p);
1234 CpdSystemExit();
1237 void CmiError(const char *fmt, ...)
1239 CpdSystemEnter();
1241 va_list p; va_start (p, fmt);
1242 if (Cmi_charmrun_fd!=-1)
1243 InternalError(fmt, p);
1244 else
1245 vfprintf(stderr,fmt,p);
1246 va_end(p);
1248 CpdSystemExit();
/**
 * scanf-style input, delegated to InternalScanf.
 * Note that `fmt` is handed over with its const qualifier cast away.
 *
 * @return the value returned by InternalScanf
 */
int CmiScanf(const char *fmt, ...)
{
  va_list args;
  int nAssigned;

  CpdSystemEnter();
  va_start(args, fmt);
  nAssigned = InternalScanf((char *)fmt, args);
  va_end(args);
  CpdSystemExit();
  return nAssigned;
}
1264 #endif
1266 /***************************************************************************
1267 * Output redirection:
1268 * When people don't use CkPrintf, like above, we'd still like to be able
1269 * to collect their output. Thus we make a pipe and dup2 it to stdout,
1270 * which lets us read the characters sent to stdout at our leisure.
1271 ***************************************************************************/
/* State for stdout/stderr redirection.  Index 0 is stdout, index 1 is stderr. */
1273 /*Can read from stdout or stderr using these fd's*/
1274 static int readStdout[2];
1275 static int writeStdout[2]; /*The original stdout/stderr sockets*/
1276 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
/* Size of the staging buffer; the array has one extra byte for a NUL terminator. */
1277 #define readStdoutBufLen (16*1024)
1278 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
/* Nonzero while CmiStdoutServiceOne runs; CmiStdoutFlush returns early when it is set. */
1279 static int servicingStdout;
1281 /*Initialization-- should only be called once per node*/
/* Redirects this process's stdout and stderr into socket pairs so that
   output written directly to those fds can later be read back through
   readStdout[].  Does nothing in standalone mode.
   NOTE(review): brace-only lines are missing from this listing. */
1282 static void CmiStdoutInit(void) {
1283 int i;
1284 if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1286 /*There's some way to do this same thing in windows, but I don't know how*/
1287 #if !defined(_WIN32)
1288 /*Prevent buffering in stdio library:*/
1289 setbuf(stdout,NULL); setbuf(stderr,NULL);
1291 /*Reopen stdout and stderr fd's as new pipes:*/
1292 for (i=0;i<2;i++) {
1293 int pair[2];
1294 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1296 /*First, save a copy of the original stdout*/
/* NOTE(review): the dup() result is stored without checking for -1. */
1297 writeStdout[i]=dup(srcFd);
1298 #if 0
1299 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1300 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1301 #else
1302 /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1303 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair))
1304 {perror("building stdio redirection socketpair"); exit(1);}
1305 #endif
1306 readStdout[i]=pair[0]; /*We get the read end of pipe*/
/* srcFd now refers to pair[1]; pair[1] itself is not closed in this function. */
1307 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1308 //if (-1==dup2(srcFd,pair[1])) {perror("dup2 redirection pipe"); exit(1);}
1310 #if 0 /*Keep writes from blocking. This just drops excess output, which is bad.*/
1311 CmiEnableNonblockingIO(srcFd);
1312 #endif
1313 #if CMK_SHARED_VARS_UNAVAILABLE
1314 if (Cmi_asyncio)
1316 /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1317 CmiEnableAsyncIO(pair[1]);
1319 #endif
1321 #else
1322 /*Windows system-- just fake reads for now*/
/* These macros stay defined for the rest of the translation unit:
   read() evaluates to 0 and write() expands to nothing. */
1323 # ifndef read
1324 # define read(x,y,z) 0
1325 # endif
1326 # ifndef write
1327 # define write(x,y,z)
1328 # endif
1329 #endif
1332 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1333 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1335 write(writeStdout[isStdErr],str,len);
1339 Service this particular stdout pipe.
1340 Must hold comm. lock.
1342 static void CmiStdoutServiceOne(int i) {
1343 int nBytes;
1344 const static char *cmdName[2]={"print","printerr"};
1345 servicingStdout=1;
1346 while(1) {
1347 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1348 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1349 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1350 if (nBytes<=0) break; /*Nothing to send*/
1352 /*Send these bytes off to charmrun*/
1353 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1354 nBytes++; /*Include zero-terminator in message to charmrun*/
1356 if (nBytes>=readStdoutBufLen-100)
1357 { /*We must have filled up our output pipe-- most output libraries
1358 don't handle this well (e.g., glibc printf just drops the line).*/
1360 tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1361 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1362 nBytes--; /*Remove terminator from user's data*/
1363 tooMuchLen=strlen(tooMuchWarn)+1;
1365 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1366 tooMuchWarn,tooMuchLen);
1368 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1370 servicingStdout=0;
1371 serviceStdout[i]=0; /*This pipe is now serviced*/
1374 /*Service all stdout pipes, whether it looks like they need it
1375 or not. Used when you aren't sure if select() has been called recently.
1376 Must hold comm. lock.
1378 static void CmiStdoutServiceAll(void) {
1379 int i;
1380 for (i=0;i<2;i++) {
1381 if (readStdout[i]==0) continue; /*Pipe not open*/
1382 CmiStdoutServiceOne(i);
/**
 * Service any outstanding stdout pipes; currently this simply services
 * all of them.  Must hold comm. lock.
 */
static void CmiStdoutService(void)
{
  CmiStdoutServiceAll();
}
1393 /*Add our pipes to the pile for select() or poll().
1394 Both can be called with or without the comm. lock.
1396 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1397 int i;
1398 for (i=0;i<2;i++) {
1399 if (readStdout[i]==0) continue; /*Pipe not open*/
1400 CMK_PIPE_ADDREAD(readStdout[i]);
1403 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1404 int i;
1405 for (i=0;i<2;i++) {
1406 if (readStdout[i]==0) continue; /*Pipe not open*/
1407 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1410 static int CmiStdoutNeedsService(void) {
1411 return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1414 /*Called every few milliseconds to flush the stdout pipes*/
1415 static void CmiStdoutFlush(void) {
1416 if (servicingStdout) return; /* might be called by SIGALRM */
1417 CmiCommLockOrElse( return; )
1418 LOCK_IF_AVAILABLE();
1419 CmiStdoutServiceAll();
1420 UNLOCK_IF_AVAILABLE();
1423 /***************************************************************************
1424 * Message Delivery:
1426 ***************************************************************************/
1428 #include "machine-dgram.c"
1431 /*****************************************************************************
1433 * node_addresses
1435 * These two functions fill the node-table.
1438 * This node, like all others, first sends its own address to charmrun
1439 * using this command:
1441 * Type: nodeinfo
1442 * Data: Big-endian 4-byte ints
1443 * <my-node #><Dataport>
1445 * When charmrun has all the addresses, he sends this table to me:
1447 * Type: nodes
1448 * Data: Big-endian 4-byte ints
1449 * <number of nodes n>
1450 * <#PEs><IP><Dataport> Node 0
1451 * <#PEs><IP><Dataport> Node 1
1452 * ...
1453 * <#PEs><IP><Dataport> Node n-1
1455 *****************************************************************************/
1457 /*Note: node_addresses_obtain is called before starting
1458 threads, so no locks are needed (or valid!)*/
/* Obtains the node table and passes it to node_addresses_store.
   Standalone: fabricates a one-node table locally.  With charmrun: sends
   an "initnode" message describing this node, then waits for the table
   on the control socket.
   NOTE(review): brace-only lines are missing from this listing. */
1459 static void node_addresses_obtain(char **argv)
1461 ChMessage nodetabmsg; /* info about all nodes*/
1462 MACHSTATE(3,"node_addresses_obtain { ");
1463 if (Cmi_charmrun_fd==-1)
1464 {/*Standalone-- fake a single-node nodetab message*/
1466 int npes=1;
1467 ChSingleNodeinfo *fakeTab;
1468 ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1469 fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
/* plusPSet is only read in the #else branch below. */
1470 int plusPSet =CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1471 #if CMK_SHARED_VARS_UNAVAILABLE
1472 if (npes!=1) {
1473 fprintf(stderr,
1474 "To use multiple processors, you must run this program as:\n"
1475 " > charmrun +p%d %s <args>\n"
1476 "or build the %s-smp version of Charm++.\n",
1477 npes,argv[0],CMK_MACHINE_NAME);
1478 exit(1);
1480 #else
1481 if(plusPSet)
1483 if(_Cmi_mynodesize >1 && _Cmi_mynodesize != npes)
1485 // if you want to use redundant arguments they need to be consistent
/* NOTE(review): the format string has no conversion specifiers, so the
   two trailing arguments are never printed. */
1486 CmiError("Error, p!=ppn, must not have inconsistent values .\n"
1487 "standalone invocation should use only one of [+p, +ppn, ++ppn]\n", npes, _Cmi_mynodesize);
1488 exit(1);
1490 else
1491 { // +ppn wasn't set, make it the same as +p
1492 _Cmi_mynodesize =npes;
1495 else
1496 { // +p wasn't set, make it the same as +ppn
1497 npes = _Cmi_mynodesize;
1499 #endif
1500 /*This is a stupid hack: we expect the *number* of nodes
1501 followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1502 (which happens to have exactly that layout!) and stuff
1503 a 1 into the "node number" slot
*/
1505 fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1506 fakeTab->info.nPE=ChMessageInt_new(npes);
1507 fakeTab->info.dataport=ChMessageInt_new(0);
1508 fakeTab->info.IP=_skt_invalid_ip;
1510 else
1511 { /*Contact charmrun for machine info.*/
1512 ChSingleNodeinfo me;
1513 memset(&me, 0, sizeof(me));
1515 me.nodeNo=ChMessageInt_new(Lrts_myNode);
1517 /*The nPE fields are set by charmrun--
1518 these values don't matter.
1519 Set IP in case it is mpiexec mode where charmrun does not have IP yet
*/
1521 me.info.nPE=ChMessageInt_new(0);
1522 /* me.info.IP=_skt_invalid_ip; */
1523 me.info.IP=skt_innode_my_ip();
1524 me.info.dataport=ChMessageInt_new(dataport);
1526 /*Send our node info. to charmrun.
1527 CommLock hasn't been initialized yet--
1528 use non-locking version*/
1529 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1530 MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1532 MACHSTATE(3,"initnode sent");
1534 /*We get the other node addresses from a message sent
1535 back via the charmrun control port.*/
/* NOTE(review): the second argument is presumably a timeout; its unit is
   not visible here. */
1536 if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1537 CmiAbort("Timeout waiting for nodetab!\n");
1539 MACHSTATE(2,"recv initnode {");
1540 ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1541 MACHSTATE(2,"} recv initnode");
/* The payload is read as one ChMessageInt_t followed by an array of
   ChNodeinfo indexed by node number.
   NOTE(review): in the standalone branch nProcessesInPhysNode is not
   assigned by the visible code -- confirm it is initialized elsewhere. */
1543 ChMessageInt_t *n32 = (ChMessageInt_t *) nodetabmsg.data;
1544 ChNodeinfo *d = (ChNodeinfo *) (n32+1);
1545 _Cmi_myphysnode_numprocesses = ChMessageInt(d[Lrts_myNode].nProcessesInPhysNode);
1546 //#if CMK_USE_IBVERBS
1547 //#else
1548 node_addresses_store(&nodetabmsg);
1549 ChMessage_free(&nodetabmsg);
1550 //#endif
1551 MACHSTATE(3,"} node_addresses_obtain ");
1555 /***********************************************************************
1556 * DeliverOutgoingMessage()
1558 * This function takes care of delivery of outgoing messages from the
1559 * sender end. Broadcast messages are divided into sets of messages that
1560 * are bound to the local node, and to remote nodes. For local
1561 * transmission, the messages are directly pushed into the recv
1562 * queues. For non-local transmission, the function DeliverViaNetwork()
1563 * is called
1564 ***********************************************************************/
1565 int DeliverOutgoingMessage(OutgoingMsg ogm)
1567 int i, rank, dst; OtherNode node;
1569 int network = 1;
1571 dst = ogm->dst;
1573 //printf("deliver outgoing message, dest: %d \n", dst);
1574 #if CMK_ERROR_CHECKING
1575 if (dst<0 || dst>=CmiNumPesGlobal())
1576 CmiAbort("Send to out-of-bounds processor!");
1577 #endif
1578 node = nodes_by_pe[dst];
1579 rank = dst - node->nodestart;
1580 int acqLock = 0;
1581 if (node->nodestart != Cmi_nodestartGlobal) {
1582 DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1584 #if CMK_MULTICORE
1585 network = 0;
1586 #endif
1587 return network;
1591 * Set up an OutgoingMsg structure for this message.
1593 static OutgoingMsg PrepareOutgoing(int pe,int size,int freemode,char *data) {
1594 OutgoingMsg ogm;
1595 MallocOutgoingMsg(ogm);
1596 MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1597 ogm->size = size;
1598 ogm->data = data;
1599 ogm->src = CmiMyPeGlobal();
1600 ogm->dst = pe;
1601 ogm->freemode = freemode;
1602 ogm->refcount = 0;
1603 return (CmiCommHandle)ogm;
1607 /******************************************************************************
1609 * CmiGeneralSend
1611 * Description: This is a generic message sending routine. All the
1612 * converse message send functions are implemented in terms of this
1613 * function. (By setting appropriate flags (eg freemode) that tell
1614 * CmiGeneralSend() how exactly to handle the particular case of
1615 * message send)
1617 *****************************************************************************/
1619 //CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
1620 CmiCommHandle LrtsSendFunc(int destNode, int pe, int size, char *data, int freemode)
1622 int sendonnetwork;
1623 OutgoingMsg ogm;
1624 MACHSTATE(1,"CmiGeneralSend {");
1626 CMI_MSG_SIZE(data) = size;
1628 ogm=PrepareOutgoing(pe,size,'F',data);
1630 sendonnetwork = DeliverOutgoingMessage(ogm);
1631 MACHSTATE(1,"} LrtsSend");
1632 return (CmiCommHandle)ogm;
1636 /******************************************************************************
1638 * Comm Handle manipulation.
1640 *****************************************************************************/
1642 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1644 /*****************************************************************************
1646 * NET version List-Cast and Multicast Code
1648 ****************************************************************************/
/**
 * Send `msg` to each PE in `pes`.  A reference is taken before every
 * CmiSyncSendAndFree call, so the caller's own reference is left in
 * place.
 */
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
  int k = 0;
  while (k < npes) {
    CmiReference(msg);
    CmiSyncSendAndFree(pes[k], len, msg);
    k++;
  }
}
1659 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1661 CmiError("ListSend not implemented.");
1662 return (CmiCommHandle) 0;
/**
 * Send `msg` to each PE in `pes`, then release the caller's reference.
 *
 * In all net versions the message buffer is unchanged after
 * CmiSyncSendAndFree returns, so reference counting is used instead of
 * copying the message for each destination.
 */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
  int k = 0;
  while (k < npes) {
    CmiReference(msg);
    CmiSyncSendAndFree(pes[k], len, msg);
    k++;
  }
  CmiFree(msg);
}
1680 #endif
/* Intentionally empty in this machine layer. */
void LrtsDrainResources()
{
}
/* Intentionally empty in this machine layer. */
void LrtsPostNonLocal()
{
}
1685 /* Network progress function is used to poll the network for
1686 messages. This flushes receive buffers on some implementations*/
#if CMK_MACHINE_PROGRESS_DEFINED
/**
 * Network progress hook: run the communication server once.  In SMP
 * (non-multicore) builds this happens only when CmiMyRank() equals
 * CmiMyNodeSize().
 */
void CmiMachineProgressImpl()
{
#if CMK_SMP && !CMK_MULTICORE
  if (CmiMyRank() != CmiMyNodeSize()) return;
#endif
  CommunicationServerNet(0, COMM_SERVER_FROM_SMP);
}
#endif
1697 void LrtsAdvanceCommunication(int whileidle)
1699 #if CMK_SMP
1700 CommunicationServerNet(0, COMM_SERVER_FROM_SMP);
1701 #else
1702 CommunicationServerNet(0, COMM_SERVER_FROM_WORKER);
1703 #endif
1707 /******************************************************************************
1709 * Main code, Init, and Exit
1711 *****************************************************************************/
1713 #if CMK_BARRIER_USE_COMMON_CODE
1715 /* happen at node level */
1716 /* must be called on every PE including communication processors */
1717 void LrtsBarrier()
1719 int numnodes = CmiNumNodesGlobal();
1720 static int barrier_phase = 0;
1722 if (Cmi_charmrun_fd == -1) return; // standalone
1723 if (numnodes == 1) {
1724 return;
1727 ctrl_sendone_locking("barrier",NULL,0,NULL,0);
1728 while (barrierReceived != 1) {
1729 LOCK_IF_AVAILABLE();
1730 ctrl_getone();
1731 UNLOCK_IF_AVAILABLE();
1733 barrierReceived = 0;
1734 barrier_phase ++;
1737 int CmiBarrierZero()
1739 int i;
1740 int numnodes = CmiNumNodesGlobal();
1741 ChMessage msg;
1743 if (Cmi_charmrun_fd == -1) return 0; // standalone
1744 if (numnodes == 1) {
1745 CmiNodeAllBarrier();
1746 return 0;
1749 if (CmiMyRank() == 0) {
1750 char str[64];
1751 sprintf(str, "%d", CmiMyNodeGlobal());
1752 ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
1753 if (CmiMyNodeGlobal() == 0) {
1754 while (barrierReceived != 2) {
1755 LOCK_IF_AVAILABLE();
1756 ctrl_getone();
1757 UNLOCK_IF_AVAILABLE();
1759 barrierReceived = 0;
1763 CmiNodeAllBarrier();
1764 return 0;
1767 #endif
1769 /******************************************************************************
1771 * Main code, Init, and Exit
1773 *****************************************************************************/
1775 void LrtsPreCommonInit(int everReturn)
1777 #if !CMK_SMP
1778 #if !CMK_ASYNC_NOT_NEEDED
1779 if (Cmi_asyncio)
1781 CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
1782 if (!Cmi_netpoll) {
1783 if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
1784 if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
1787 #endif
1788 #endif
/* Late machine-layer init: prints the scheduler mode on PE 0 and
   registers the periodic callbacks (communication polling, stdout
   flushing, charmrun pings) that this build configuration needs.
   `everReturn` is not referenced in the visible lines.
   NOTE(review): brace-only lines are missing from this listing, so block
   nesting -- in particular whether the Cmi_clock assignment sits inside
   the rank-0 block -- must be confirmed against the real file. */
1791 void LrtsPostCommonInit(int everReturn)
1793 /* better to show the status here */
1794 if (CmiMyPe() == 0) {
1795 if (Cmi_netpoll == 1) {
1796 CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
1798 #if CMK_SHARED_VARS_UNAVAILABLE
1799 else {
1800 if (CmiMemoryIs(CMI_MEMORY_IS_OS))
1801 CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
1803 #endif
1806 #if MEMORYUSAGE_OUTPUT
1807 memoryusage_counter = 0;
1808 #endif
1810 #if CMK_SHARED_VARS_UNAVAILABLE
1811 if (Cmi_netpoll) /*Repeatedly call CommServer*/
1812 CcdCallOnConditionKeep(CcdPERIODIC,
1813 (CcdVoidFn) CommunicationPeriodic, NULL);
1814 else /*Only need this for retransmits*/
1815 CcdCallOnConditionKeep(CcdPERIODIC_10ms,
1816 (CcdVoidFn) CommunicationPeriodic, NULL);
1817 #endif
/* Only rank 0 of a node that is connected to charmrun sets up stdout
   flushing and charmrun pings. */
1819 if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
1820 CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
1821 #if CMK_SHARED_VARS_UNAVAILABLE && !CMK_TIMER_USE_WIN32API
/* Without async I/O the ping is driven by a Ccd callback; with it, by a
   recurring SIGALRM from an interval timer. */
1822 if (!Cmi_asyncio) {
1823 CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1825 else {
1826 /*Occasionally ping charmrun, to test if it's dead*/
1827 struct itimerval i;
1828 CmiSignal(SIGALRM, 0, 0, pingCharmrun);
1829 #if MEMORYUSAGE_OUTPUT
/* Timer period of 1/MEMORYUSAGE_OUTPUT_FREQ second when tracing memory. */
1830 i.it_interval.tv_sec = 0;
1831 i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
1832 i.it_value.tv_sec = 0;
1833 i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
1834 #else
/* Otherwise the timer fires every 10 seconds. */
1835 i.it_interval.tv_sec = 10;
1836 i.it_interval.tv_usec = 0;
1837 i.it_value.tv_sec = 10;
1838 i.it_value.tv_usec = 0;
1839 #endif
1840 setitimer(ITIMER_REAL, &i, NULL);
1843 #if ! CMK_USE_TCP
1844 /*Occasionally check for retransmissions, outgoing acks, etc.*/
1845 CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
1846 #endif
1847 #endif
1849 /*Initialize the clock*/
1850 Cmi_clock=GetClock();
1853 #ifdef IGET_FLOWCONTROL
1854 /* Call the function once to determine the amount of physical memory available */
1855 getAvailSysMem();
1856 /* Call the function to periodically call the token adapt function */
1857 CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
1858 CcdCallOnConditionKeep(CcdPERIODIC_10s, // magic number of PERIOD 10s
1859 (CcdVoidFn) TokenUpdatePeriodic, NULL);
1860 #endif
1862 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
/* Seeds rand() from the wall timer for the message-corruption test mode. */
1863 srand((int)(1024.0*CmiWallTimer()));
1864 if (CmiMyPe()==0)
1865 CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
1866 CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
1867 #endif
1869 #ifdef __ONESIDED_IMPL
1870 #ifdef __ONESIDED_NO_HARDWARE
/* Registers the four put/get handlers and stores the returned values. */
1871 putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
1872 putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
1873 getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
1874 getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
1875 #endif
1876 #endif
1880 void LrtsExit()
1882 int i;
1883 machine_initiated_shutdown=1;
1885 CmiStdoutFlush();
1886 if (Cmi_charmrun_fd==-1) {
1887 exit(0);
1889 else {
1890 Cmi_check_delay = 1.0; /* speed up checking of charmrun */
1891 for(i = 0; i < CmiMyNodeSize(); i++) {
1892 ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
1894 while(1) {
1895 #if CMK_USE_PXSHM
1896 CommunicationServerPxshm();
1897 #endif
1898 CommunicationServerNet(5, COMM_SERVER_FROM_SMP);
1903 #if CMK_SHRINK_EXPAND
1904 void ConverseCleanup(void)
1906 MACHSTATE(2,"ConverseCleanup {");
1907 if (CmiMyRank()==0) {
1908 if(Cmi_print_stats)
1909 printNetStatistics();
1910 log_done();
1913 CmiBarrier();
1915 if (CmiMyPe() == 0) {
1916 if (willContinue) {
1917 CcsSendDelayedReply(shrinkExpandreplyToken, 0, 0); //reply to CCS client
1918 // wait for this message to receive, hack
1919 // TODO: figure out why this is important
1920 usleep(500);
1921 // this causes charmrun to go away
1922 ctrl_sendone_locking("realloc",&numProcessAfterRestart, sizeof(int),NULL,0);
1923 } else {
1924 ctrl_sendone_locking("ending",NULL,0,NULL,0);
1928 // TODO: ensure this won't gobble up some other important message
1929 ChMessage replymsg;
1930 memset(replymsg.header.type, 0, sizeof(replymsg.header.type));
1931 while (strncmp(replymsg.header.type, "realloc_ack", CH_TYPELEN) != 0)
1932 ChMessage_recv(Cmi_charmrun_fd, &replymsg);
1934 #if CMK_USE_SYSVSHM
1935 CmiExitSysvshm();
1936 #elif CMK_USE_PXSHM
1937 CmiExitPxshm();
1938 #endif
1939 ConverseCommonExit(); /* should be called by every rank */
1940 CmiNodeBarrier(); /* single node SMP, make sure every rank is done */
1941 if (CmiMyRank()==0) CmiStdoutFlush();
1942 if (Cmi_charmrun_fd==-1) {
1943 if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
1944 else while (1) CmiYield();
1945 } else {
1946 if (willContinue) {
1947 int argc=CmiGetArgc(Cmi_argvcopy);
1949 int i;
1950 int restart_idx = -1;
1951 for (i = 0; i < argc; ++i) {
1952 if (strcmp(Cmi_argvcopy[i], "+restart") == 0) {
1953 restart_idx = i;
1954 break;
1958 char **ret;
1959 if (restart_idx == -1) {
1960 ret=(char **)malloc(sizeof(char *)*(argc+10));
1961 } else {
1962 ret=(char **)malloc(sizeof(char *)*(argc+8));
1965 for (i=0;i<argc;i++) {
1966 MACHSTATE1(2,"Parameters %s",Cmi_argvcopy[i]);
1967 ret[i]=Cmi_argvcopy[i];
1970 ret[argc+0]="+shrinkexpand";
1971 ret[argc+1]="+newnumpes";
1973 char temp[50];
1974 sprintf(temp,"%d", numProcessAfterRestart);
1975 ret[argc+2]=temp;
1977 ret[argc+3]="+mynewpe";
1978 char temp2[50];
1979 sprintf(temp2,"%d", mynewpe);
1980 ret[argc+4]=temp2;
1982 ret[argc+5]="+myoldpe";
1983 char temp3[50];
1984 sprintf(temp3,"%d", _Cmi_mype);
1985 ret[argc+6]=temp3;
1987 if (restart_idx == -1) {
1988 ret[argc+7]="+restart";
1989 ret[argc+8]=_shrinkexpand_basedir;
1990 ret[argc+9]=Cmi_argvcopy[argc];
1991 } else {
1992 ret[restart_idx + 1] = _shrinkexpand_basedir;
1993 ret[argc+7]=Cmi_argvcopy[argc];
1996 free(Cmi_argvcopy);
1997 MACHSTATE1(3,"ConverseCleanup mynewpe %s", temp2);
1998 MACHSTATE(2,"} ConverseCleanup");
2000 skt_close(Cmi_charmrun_fd);
2001 // Avoid crash by SIGALRM
2002 #if !defined(_WIN32)
2003 sigaction(SIGALRM, SIG_IGN, NULL);
2004 #else
2005 signal(SIGALRM, SIG_IGN);
2006 #endif
2007 #if CMK_USE_IBVERBS
2008 CmiMachineCleanup();
2009 #endif
2010 //put references to the controlling tty back on normal fd so that
2011 //CmiStdoutInit refers to the tty not the old pipe
2012 dup2(writeStdout[0], 1);
2013 dup2(writeStdout[1], 2);
2015 #if CMK_SHRINK_EXPAND
2016 // Close any old file descriptors.
2017 // FDs carry over execv, so these have to be closed at some point.
2018 // Since some of them are async pipes, however, doing so flushes the buffer,
2019 // raising a SIGIO before a handler is assigned in LrtsPreCommonInit, which
2020 // kills the program.
2021 // An easy way to deal with this is to simply close the FDs here.
2022 int b;
2023 for (b = 3; b < 20; b++) {
2024 close(b);
2026 #endif
2028 // TODO: check variant of execv that takes file descriptor
2029 execv(ret[0], ret); // Need to check if the process name is always first arg
2030 /* should not be here */
2031 MACHSTATE1(3,"execv error: %s", strerror(errno));
2032 CmiPrintf("[%d] should not be here\n", CmiMyPe());
2033 exit(1);
2034 } else {
2035 skt_close(Cmi_charmrun_fd);
2036 exit(0);
2041 #endif
2043 static void set_signals(void)
2045 if(!Cmi_truecrash) {
2046 #if !defined(_WIN32)
2047 struct sigaction sa;
2048 sa.sa_handler = KillOnAllSigs;
2049 sigemptyset(&sa.sa_mask);
2050 sa.sa_flags = SA_RESTART;
2052 sigaction(SIGSEGV, &sa, NULL);
2053 sigaction(SIGFPE, &sa, NULL);
2054 sigaction(SIGILL, &sa, NULL);
2055 sigaction(SIGINT, &sa, NULL);
2056 sigaction(SIGTERM, &sa, NULL);
2057 sigaction(SIGABRT, &sa, NULL);
2058 #else
2059 signal(SIGSEGV, KillOnAllSigs);
2060 signal(SIGFPE, KillOnAllSigs);
2061 signal(SIGILL, KillOnAllSigs);
2062 signal(SIGINT, KillOnAllSigs);
2063 signal(SIGTERM, KillOnAllSigs);
2064 signal(SIGABRT, KillOnAllSigs);
2065 #endif
2066 # if !defined(_WIN32) /*UNIX-only signals*/
2067 sigaction(SIGQUIT, &sa, NULL);
2068 sigaction(SIGBUS, &sa, NULL);
2069 # if CMK_HANDLE_SIGUSR
2070 sa.sa_handler = HandleUserSignals;
2071 sigaction(SIGUSR1, &sa, NULL);
2072 sigaction(SIGUSR2, &sa, NULL);
2073 # endif
2074 # endif /*UNIX*/
/**
 * Socket idle function to use before addresses have been obtained.
 * During the real program, we idle with CmiYield.
 */
static void obtain_idleFn(void)
{
  sleep(0);
}
2083 static int net_default_skt_abort(SOCKET skt,int code,const char *msg)
2085 fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2086 machine_exit(1);
2087 return -1;
2090 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2092 int i;
2093 Cmi_netpoll = 0;
2094 #if CMK_NETPOLL
2095 Cmi_netpoll = 1;
2096 #endif
2097 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2098 Cmi_idlepoll = 0;
2099 #else
2100 Cmi_idlepoll = 1;
2101 #endif
2102 Cmi_truecrash = 0;
2103 if (CmiGetArgFlagDesc(*argv,"+truecrash","Do not install signal handlers") ||
2104 CmiGetArgFlagDesc(*argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2105 /* netpoll disable signal */
2106 if (CmiGetArgFlagDesc(*argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2107 if (CmiGetArgFlagDesc(*argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
2108 /* idlepoll use poll instead if sleep when idle */
2109 if (CmiGetArgFlagDesc(*argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2110 /* idlesleep use sleep instead if busywait when idle */
2111 if (CmiGetArgFlagDesc(*argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2112 Cmi_syncprint = CmiGetArgFlagDesc(*argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2114 #if CMK_SHRINK_EXPAND
2115 if (CmiGetArgFlagDesc(*argv,"+shrinkexpand","Restarting of already running prcoess")) Cmi_isOldProcess = 1;
2116 #endif
2118 Cmi_asyncio = 1;
2119 #if CMK_ASYNC_NOT_NEEDED
2120 Cmi_asyncio = 0;
2121 #endif
2122 if (CmiGetArgFlagDesc(*argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2123 if (CmiGetArgFlagDesc(*argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2124 #if CMK_MULTICORE
2125 if (CmiGetArgFlagDesc(*argv,"+commthread","Use communication thread")) {
2126 Cmi_commthread = 1;
2127 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2128 _Cmi_sleepOnIdle = 1; /* worker thread go sleep */
2129 #endif
2130 if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2132 #endif
2134 skt_init();
2135 /* use special abort handler instead of default_skt_abort to
2136 prevent exit trapped by atexit_check() due to the exit() call */
2137 skt_set_abort(net_default_skt_abort);
2138 atexit(machine_atexit_check);
2139 parse_netstart();
2140 parse_magic();
2141 #if ! defined(_WIN32)
2142 /* only get forks in non-smp mode */
2143 #if CMK_SHRINK_EXPAND
2144 if(Cmi_isOldProcess!=1)
2145 #endif
2146 parse_forks();
2147 #endif
2148 extract_args(*argv);
2149 log_init();
2150 Cmi_comm_var_mutex = CmiCreateLock();
2151 Cmi_freelist_mutex = CmiCreateLock();
2152 Cmi_scanf_mutex = CmiCreateLock();
2153 #if CMK_SHRINK_EXPAND
2154 if (Cmi_isOldProcess == 1) {
2155 Lrts_myNode = Cmi_mynewpe;
2156 Cmi_myoldpe = Cmi_oldpe;
2157 Lrts_numNodes = Cmi_newnumnodes;
2159 #endif
2161 /* NOTE: can not acutally call timer before timerInit ! GZ */
2162 #if CMK_SHRINK_EXPAND
2163 MACHSTATE3(2,"After reorg %d %d %d \n", Cmi_oldpe, Lrts_myNode, Lrts_numNodes);
2164 #endif
2165 MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2167 skt_set_idle(obtain_idleFn);
2168 if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2169 set_signals();
2170 #if CMK_USE_TCP
2171 dataskt=skt_server(&dataport);
2172 #else
2173 dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2174 #endif
2175 MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2176 Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2177 MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2178 skt_tcp_no_nagle(Cmi_charmrun_fd);
2179 CmiStdoutInit();
2180 } else {/*Standalone operation*/
2181 if (!quietMode) printf("Charm++: standalone mode (not using charmrun)\n");
2182 dataskt=-1;
2183 Cmi_charmrun_fd=-1;
2186 CmiMachineInit(*argv);
2188 node_addresses_obtain(*argv);
2189 MACHSTATE(5,"node_addresses_obtain done");
2191 CmiCommunicationInit(*argv);
2193 skt_set_idle(CmiYield);
2194 Cmi_check_delay = 1.0+0.25*Lrts_numNodes;
2196 if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2197 Cmi_check_delay=1.0e30;
2199 #if CMK_SMP
2200 // Allocate a slot for the comm thread. Do this even for multicore,
2201 // since it's possible to ask for a 'comm' thread at runtime
2202 inProgress = calloc(_Cmi_mynodesize+1, sizeof(int));
2203 #else
2204 inProgress = calloc(_Cmi_mynodesize, sizeof(int));
2205 #endif
2207 *numNodes = Lrts_numNodes;
2208 *myNodeID = Lrts_myNode;
2212 #if CMK_CELL
2214 #include "spert_ppu.h"
/**
 * Run one OffloadAPIProgress() step, bracketed by
 * LOCK_IF_AVAILABLE() / UNLOCK_IF_AVAILABLE().
 * Only compiled when CMK_CELL is enabled.
 */
void machine_OffloadAPIProgress()
{
  LOCK_IF_AVAILABLE();
  OffloadAPIProgress();
  UNLOCK_IF_AVAILABLE();
}
2221 #endif
2223 void LrtsPrepareEnvelope(char *msg, int size)
2225 CMI_MSG_SIZE(msg) = size;
2229 /*@}*/