 * Basic NET-LRTS implementation of Converse machine layer
 *
 * NET implementation of machine layer, ethernet in particular
 *
 * Messages are sent using UDP datagrams. The sender allocates a
 * struct for each datagram to be sent. These structs stick around
 * until slightly after the datagram is acknowledged.
 *
 * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
 * Each node has an OtherNode struct for every other node in the
 * system. The OtherNode struct contains:
 *
 *    send_queue  (all datagram-structs not yet transmitted)
 *    send_window (all datagram-structs transmitted but not ack'd)
 *
 * When an acknowledgement comes in, all packets in the send-window
 * are either marked as acknowledged or pushed back into the send
 * queue for retransmission.
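The acknowledgement step above can be sketched as follows. This is a minimal sketch, assuming a small fixed-size send window and an ack that carries a list of seqnos (as the DGRAM_ACKNOWLEDGE format below describes). `Window`, `WindowSlot`, `process_ack` and `WINDOW_SIZE` are hypothetical names, not the structures this file uses, and the send queue is modelled as a plain list of seqnos to retransmit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define WINDOW_SIZE 8  // hypothetical window capacity

typedef struct {
  unsigned int seqno;
  bool in_use;  // slot holds a transmitted, not-yet-resolved datagram
  bool acked;
} WindowSlot;

typedef struct {
  WindowSlot slot[WINDOW_SIZE];
  unsigned int requeue[WINDOW_SIZE];  // seqnos pushed back for retransmission
  size_t nrequeue;
} Window;

// Mark every in-flight datagram whose seqno appears in the ack list as
// acknowledged; every other in-flight datagram goes back to the send
// queue. Returns the number of datagrams acknowledged.
size_t process_ack(Window *w, const unsigned int *acked, size_t nacked) {
  size_t count = 0;
  w->nrequeue = 0;
  for (size_t i = 0; i < WINDOW_SIZE; i++) {
    WindowSlot *s = &w->slot[i];
    if (!s->in_use) continue;
    bool found = false;
    for (size_t j = 0; j < nacked; j++)
      if (acked[j] == s->seqno) { found = true; break; }
    if (found) {
      s->acked = true;
      count++;
    } else {
      w->requeue[w->nrequeue++] = s->seqno;  // back to the send queue
    }
    s->in_use = false;  // either way, the slot leaves the window
  }
  return count;
}
```

Either outcome empties the window slot, which matches the "marked as acknowledged or pushed back" rule above.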
 *
 * THE OUTGOING MESSAGE
 *
 * When you send or broadcast a message, the first thing the system
 * does is create an OutgoingMsg struct to represent the
 * operation. The OutgoingMsg contains a very direct expression
 * of what you want to do:
 *
 *    size     --- size of message in bytes
 *    data     --- pointer to the buffer containing the message
 *    src      --- processor which sent the message
 *    dst      --- destination processor (-1=broadcast, -2=broadcast all)
 *    freemode --- see below.
 *    refcount --- see below.
 *
 * The OutgoingMsg is kept around until the transmission is done, then
 * it is garbage collected --- the refcount and freemode fields are
 * there to assist garbage collection.
 *
 * The freemode indicates which kind of buffer-management policy was
 * used (sync, async, or freeing). The sync policy is handled
 * superficially by immediately converting sync sends into freeing
 * sends. Thus, the freemode can be either 'A' (async) or 'F'
 * (freeing). If the freemode is 'F', then garbage collection
 * involves freeing the data and the OutgoingMsg structure itself. If
 * the freemode is 'A', then the only cleanup is to change the
 * freemode to 'X', a condition which is then detectable by
 * CmiAsyncMsgSent. In this case, the actual freeing of the
 * OutgoingMsg is done by CmiReleaseCommHandle.
 *
 * When the transmission is initiated, the system computes how many
 * datagrams need to be sent, total. This number is stored in the
 * refcount field. Each time a datagram is delivered, the refcount
 * is decremented; when it reaches zero, cleanup is performed. There
 * are two exceptions to this rule. Exception 1: if the OutgoingMsg
 * is a send (not a broadcast) and can be performed with shared
 * memory, the entire datagram system is bypassed: the message is
 * simply delivered and freed, without using the refcount mechanism at
 * all. Exception 2: if the message is a broadcast, then the part of the
 * broadcast that can be done via shared memory is performed prior to
 * initiating the datagram/refcount system.
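A minimal sketch of the refcount and freemode bookkeeping just described, assuming a stripped-down OutgoingMsg with only the fields needed here. `datagram_delivered` and `async_msg_sent` are hypothetical helpers (the latter plays the role of CmiAsyncMsgSent), and the two shared-memory exceptions are not modelled.

```c
#include <assert.h>
#include <stdlib.h>

// Illustrative subset of OutgoingMsg; not the real layout.
typedef struct {
  int size;
  char *data;
  char freemode;  // 'A' = async, 'F' = freeing, 'X' = async and finished
  int refcount;   // datagrams still awaiting delivery
} OutgoingMsg;

// Called once per delivered datagram. Returns 1 if the message was freed.
int datagram_delivered(OutgoingMsg *m) {
  assert(m->refcount > 0);
  if (--m->refcount > 0) return 0;
  if (m->freemode == 'F') {  // freeing send: release buffer and struct
    free(m->data);
    free(m);
    return 1;
  }
  m->freemode = 'X';  // async send: only flag completion; the user still
  return 0;           // owns data, and the struct is released later
}

// Analogue of CmiAsyncMsgSent: true once every datagram has gone out.
int async_msg_sent(const OutgoingMsg *m) { return m->freemode == 'X'; }
```

Because sync sends are converted to freeing sends up front, only the 'A' and 'F' paths need handling here.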
 *
 * DATAGRAM FORMATS AND MESSAGE FORMATS
 *
 * Datagrams have this format:
 *
 *   srcpe   (16 bits) --- source processor number.
 *   magic   ( 8 bits) --- magic number to make sure DG is good.
 *   dstrank ( 8 bits) --- destination processor rank.
 *   seqno   (32 bits) --- packet sequence number.
 *   data    (XX byte) --- user data.
 *
 * The srcpe is included only because the receiver needs
 * to know which receive window to use. The dstrank field is needed
 * because transmission is node-to-node. Once the message is
 * assembled by the node, it must be delivered to the appropriate PE.
 * The dstrank field is also used to encode certain special-case scenarios.
 * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
 * and should be delivered to all processors in the node. If the dstrank
 * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
 * which case the srcpe is the number of the acknowledger, the seqno is
 * always zero, and the user data is a list of the seqnos being
 * acknowledged. There may be other dstrank codes for special functions.
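The 64-bit header can be packed and unpacked as below. This sketch assumes big-endian (network) byte order and the field order of the table above; the real code may lay the bytes out differently, and the special dstrank values such as DGRAM_BROADCAST are left out because their numeric values are not given here. `DgramHeader`, `dgram_pack` and `dgram_unpack` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

// Field widths follow the table above; the big-endian byte order is an
// assumption made for this sketch.
typedef struct {
  uint16_t srcpe;
  uint8_t magic;
  uint8_t dstrank;
  uint32_t seqno;
} DgramHeader;

// Serialize the header into exactly 8 bytes.
void dgram_pack(const DgramHeader *h, unsigned char out[8]) {
  out[0] = (unsigned char)(h->srcpe >> 8);
  out[1] = (unsigned char)(h->srcpe & 0xff);
  out[2] = h->magic;
  out[3] = h->dstrank;
  out[4] = (unsigned char)(h->seqno >> 24);
  out[5] = (unsigned char)(h->seqno >> 16);
  out[6] = (unsigned char)(h->seqno >> 8);
  out[7] = (unsigned char)(h->seqno);
}

// Inverse of dgram_pack.
void dgram_unpack(const unsigned char in[8], DgramHeader *h) {
  h->srcpe = (uint16_t)((in[0] << 8) | in[1]);
  h->magic = in[2];
  h->dstrank = in[3];
  h->seqno = ((uint32_t)in[4] << 24) | ((uint32_t)in[5] << 16) |
             ((uint32_t)in[6] << 8) | (uint32_t)in[7];
}
```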
 *
 * To send a message, one chops it up into datagrams and stores those
 * datagrams in a send-queue. These outgoing datagrams aren't stored
 * in the explicit format shown above. Instead, they are stored as
 * ImplicitDgrams, which contain the datagram header and a pointer to
 * the user data (which is in the user message buffer, which is in the
 * OutgoingMsg). At transmission time these are combined together.
 *
 * The combination of the datagram header with the user's data is
 * performed right in the user's message buffer. Note that the
 * datagram header is exactly 64 bits. One simply overwrites the 64 bits
 * of the user's message that immediately precede the datagram's data
 * with a datagram header, sends the datagram
 * straight from the user's message buffer, then restores the user's
 * buffer to its original state. There is a small problem with the
 * first datagram of the message: one needs 64 bits of space to store
 * the datagram header. To make sure this space is there, we added a
 * 64-bit unused space to the front of the Cmi message header. In
 * addition to this, we also add 32 bits to the Cmi message header
 * to make room for a length-field, making it possible to identify
 * message boundaries.
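A sketch of the in-place header trick: borrow the 8 bytes just before a chunk, write the header there, send, then put the user's bytes back. The transmit call is abstracted as a hypothetical `send_fn` callback standing in for the real sendto(), and `send_in_place`, `Capture` and `capture_send` are illustrative names. The trick is only safe if nothing else reads or writes the buffer between the overwrite and the restore.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DGRAM_HEADER_SIZE 8  // the 64-bit header described above

// Hypothetical transmit hook standing in for the real sendto() call.
typedef void (*send_fn)(const unsigned char *buf, size_t len, void *ctx);

// Send the chunk [chunk, chunk+len) prefixed by an 8-byte header without
// copying the payload. The caller must guarantee that the 8 bytes before
// the chunk exist (for the first datagram, that is the unused space
// reserved at the front of the Cmi message header).
void send_in_place(unsigned char *chunk, size_t len,
                   const unsigned char header[DGRAM_HEADER_SIZE],
                   send_fn tx, void *ctx) {
  unsigned char saved[DGRAM_HEADER_SIZE];
  unsigned char *start = chunk - DGRAM_HEADER_SIZE;
  memcpy(saved, start, DGRAM_HEADER_SIZE);   // save the user's bytes
  memcpy(start, header, DGRAM_HEADER_SIZE);  // write the header in place
  tx(start, len + DGRAM_HEADER_SIZE, ctx);   // one contiguous send
  memcpy(start, saved, DGRAM_HEADER_SIZE);   // restore the user's bytes
}

// Example sink that records what would have been sent.
typedef struct { unsigned char bytes[64]; size_t len; } Capture;

void capture_send(const unsigned char *buf, size_t len, void *ctx) {
  Capture *c = ctx;
  assert(len <= sizeof c->bytes);
  memcpy(c->bytes, buf, len);
  c->len = len;
}
```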
 *
 * CONCURRENCY CONTROL
 *
 * This has changed recently.
 *
 * The sender side does little copying. The async and freeing send
 * routines do no copying at all. The sync send routines copy the
 * message, then use the freeing-send routines. The alternative
 * would be not to copy the message, and to use the async send mechanism
 * combined with a blocking wait. Blocking wait seems like a bad
 * idea, since it could take a VERY long time to get all those
 * datagrams out the door.
 *
 * The receiver side, unfortunately, must copy. To avoid copying,
 * it would have to receive directly into a preallocated message buffer.
 * Unfortunately, this can't work: there's no way to know how much
 * memory to preallocate, and there's no way to know which datagram
 * is coming next. Thus, we receive into fixed-size (large) datagram
 * buffers. These are then inspected, and the messages extracted from
 *
 * Note that we are allocating a large number of structs: OutgoingMsgs,
 * ImplicitDgrams, ExplicitDgrams. By design, each of these structs
 * is a fixed-size structure. Thus, we can do memory allocation by
 * simply keeping a linked-list of unused structs around. The only
 * place where expensive memory allocation is performed is in the
 *
 * Since the datagrams from one node to another are fully ordered,
 * there is slightly more ordering than is needed: in theory, the
 * datagrams of one message don't need to be ordered relative to the
 * datagrams of another. This was done to simplify the sequencing
 * mechanisms: implementing a fully-ordered stream is much simpler
 * than a partially-ordered one. It also makes it possible to
 * modularize, layering the message transmitter on top of the
 * datagram-sequencer. In other words, it was just easier this way.
 * Hopefully, this won't cause serious degradation: LANs rarely get
 * datagrams out of order anyway.
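Because each node-to-node stream is fully ordered, the receiver only needs a next-expected seqno plus a place to park early arrivals. A minimal sketch, assuming a fixed window of 16 slots and integer payloads standing in for datagram buffers; `RecvWindow` and `recv_accept` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RECV_WINDOW 16  // hypothetical window size

typedef struct {
  unsigned int next_seqno;    // next seqno to deliver
  bool present[RECV_WINDOW];  // slot holds an early arrival
  int data[RECV_WINDOW];      // stand-in for datagram payloads
} RecvWindow;

// Accept datagram `seqno`. Datagrams are delivered (appended to `out`)
// strictly in seqno order; early arrivals wait in the window, while
// duplicates and datagrams too far ahead are dropped. Returns the number
// of datagrams delivered by this call.
size_t recv_accept(RecvWindow *w, unsigned int seqno, int payload,
                   int *out, size_t out_cap) {
  unsigned int offset = seqno - w->next_seqno;  // wraps for old seqnos
  if (offset >= RECV_WINDOW) return 0;          // duplicate or too far ahead
  unsigned int idx = seqno % RECV_WINDOW;
  w->present[idx] = true;
  w->data[idx] = payload;
  size_t n = 0;
  while (n < out_cap && w->present[w->next_seqno % RECV_WINDOW]) {
    unsigned int i = w->next_seqno % RECV_WINDOW;
    out[n++] = w->data[i];
    w->present[i] = false;
    w->next_seqno++;
  }
  return n;
}
```

A partially-ordered design would need one such window per message instead of one per node pair, which is the extra complexity the paragraph above avoids.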
 *
 * A potential efficiency problem is the lack of message-combining.
 * One datagram could conceivably contain several messages. This
 * might be more efficient, but it's not clear how much overhead is
 * involved in sending a short datagram. Message-combining isn't
 * really ``integrated'' into the design of this software, but you
 * could fudge it as follows. Whenever you pull a short datagram from
 * the send-queue, check the next one to see if it's also a short
 * datagram. If so, pack them together into a ``combined'' datagram.
 * At the receive side, simply check for ``combined'' datagrams, and
 * treat them as if they were simply two datagrams. This would
 * require extra copying. I have no idea if this would be worthwhile.
 *
 *****************************************************************************/

/*****************************************************************************
 ****************************************************************************/

#define _GNU_SOURCE 1
#include <stdarg.h> /*<- was <varargs.h>*/

#define CMK_USE_PRINTF_HACK 0
#if CMK_USE_PRINTF_HACK
/*HACK: turn printf into CmiPrintf, by just defining our own
external symbol "printf".  This may be more trouble than it's worth,
since the only advantage is that it works properly with +syncprint.

This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
because they don't call printf.  Has to be defined up here because we probably
haven't properly guessed this compiler's prototype for "printf".
static void InternalPrintf(const char *f, va_list l);
int printf(const char *fmt, ...) {
  va_list p; va_start(p, fmt);
  InternalPrintf(fmt, p);

#include "converse.h"
#include "memory-isomalloc.h"

/* define machine debug */

/******************* Producer-Consumer Queues ************************/

#include "machine-smp.h"

// This is used by machine-pxshm.c, which is included by machine-common-core.c
// (itself included below.)
static int Cmi_charmrun_pid;

#include "machine-lrts.h"
#include "machine-common-core.c"

#include <sys/event.h>

int Cmi_commthread = 0;

#include "conv-ccs.h"
#include "ccs-server.h"
#include "sockRoutines.h"

#if defined(_WIN32) && ! defined(__CYGWIN__)
/*For windows systems:*/
#  include <windows.h>
#  include <sys/types.h>
#  include <sys/timeb.h>
#  define fdopen _fdopen
#  define SIGBUS -1  /*These signals don't exist in Win32*/
/*#  define SIGTERM -1*/   /* VC++ ver 8 now has SIGTERM */
#  include <sys/file.h>

#if CMK_PERSISTENT_COMM
#include "machine-persistent.c"

#define PRINTBUFSIZE 16384

#ifdef __ONESIDED_IMPL
#ifdef __ONESIDED_NO_HARDWARE
#include "conv-onesided.c"

static void CommunicationServerNet(int withDelayMs, int where);
//static void CommunicationServer(int withDelayMs);

void CmiHandleImmediate();
extern int CmemInsideMem();
extern void CmemCallWhenMemAvail();

static unsigned int dataport=0;
static SOCKET       dataskt;

extern void TokenUpdatePeriodic();
extern void getAvailSysMem();

static int Lrts_numNodes;
static int Lrts_myNode;

/****************************************************************************
 * Errors should be handled by printing a message on stderr and
 * calling exit(1).  Nothing should be sent to charmrun, no attempt at
 * communication should be made.  The other processes will notice the
 * abnormal termination and will deal with it.
 *
 * Rationale: if an error triggers an attempt to send a message,
 * the attempt to send a message is likely to trigger another error,
 * leading to an infinite loop and a process that spins instead of
 *****************************************************************************/

static int machine_initiated_shutdown=0;
static int already_in_signal_handler=0;

static void CmiDestroyLocks();

static void machine_exit(int status)
  MACHSTATE(3,"  machine_exit");
  machine_initiated_shutdown=1;

  CmiDestroyLocks();  /* destroy locks to prevent deadlock */

static void charmrun_abort(const char*);

static void KillEveryone(const char *msg)

static void KillEveryoneCode(n)
  sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);

CpvExtern(int, freezeModeFlag);

static int Cmi_truecrash;

static void KillOnAllSigs(int sigNo)
  const char *sig="unknown signal";
  const char *suggestion="";
  if (machine_initiated_shutdown ||
      already_in_signal_handler)
    machine_exit(1); /*Don't loop forever if a signal arrives during a signal handler-- just die.*/
  already_in_signal_handler=1;

#if CMK_CCS_AVAILABLE
  if (CpvAccess(cmiArgDebugFlag)) {
    CpdNotify(CPD_SIGNAL,sigNo);
#if ! CMK_BIGSIM_CHARM
    CcsSendReplyNoError(4,&reply);/*Send an empty reply if not*/
    CpvAccess(freezeModeFlag) = 1;
    CpdFreezeModeScheduler();

  if (sigNo==SIGSEGV) {
    sig="segmentation violation";
    suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).";
    sig="floating point exception";
    suggestion="Check for integer or floating-point division by zero.";
    suggestion="Check for misaligned reads or writes to memory.";
    sig="illegal instruction";
    suggestion="Check for calls to uninitialized function pointers.";
  if (sigNo==SIGKILL) sig="caught signal KILL";
  if (sigNo==SIGQUIT) sig="caught signal QUIT";
  if (sigNo==SIGTERM) sig="caught signal TERM";
  MACHSTATE1(5,"     Caught signal %s ",sig);

  if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
    CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());

  CmiAbortHelper("Caught Signal", sig, suggestion, 0, 1);

static void machine_atexit_check(void)
  if (!machine_initiated_shutdown)
    CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
#if 0 /*Wait for the user to press any key (for Win32 debugging)*/

#if !defined(_WIN32) || defined(__CYGWIN__)
static void HandleUserSignals(int signum)
  int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
  CcdRaiseCondition(condnum);

/*****************************************************************************
 *
 * Utility routines for network machine interface.
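The CMK_PIPE_ macros below hide whether poll(), select(), or kqueue() is used. For a single descriptor, the poll() variant boils down to roughly the following sketch; `wait_readable` is a hypothetical helper, not part of this file.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

// Wait up to delayMs milliseconds for fd to become readable.
// Returns 1 if readable, 0 on timeout, -1 on error. This is what
// CMK_PIPE_DECL, CMK_PIPE_ADDREAD, CMK_PIPE_CALL and CMK_PIPE_CHECKREAD
// do together in the poll() build, reduced to one descriptor.
int wait_readable(int fd, int delayMs) {
  struct pollfd fds[1];
  fds[0].fd = fd;
  fds[0].events = POLLIN;
  fds[0].revents = 0;
  int n = poll(fds, 1, delayMs);
  if (n < 0) return -1;
  return (n > 0 && (fds[0].revents & POLLIN)) ? 1 : 0;
}
```

For example, on a fresh pipe `wait_readable(p[0], 10)` times out and returns 0, and returns 1 once a byte has been written to `p[1]`.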
 *
 *****************************************************************************/

/*
Horrific #defines to hide the differences between select(), poll(), and kqueue().
*/
#if CMK_USE_POLL /*poll() version*/
# define CMK_PIPE_DECL(delayMs) \
  struct pollfd fds[10]; \
  int nFds_sto=0; int *nFds=&nFds_sto; \
  int pollDelayMs=delayMs;
# define CMK_PIPE_SUB fds,nFds
# define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0

# define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
# define CMK_PIPE_ADDREAD(rd_fd) \
  do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
  do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
# define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT

#elif CMK_USE_KQUEUE /* kqueue version */

# define CMK_PIPE_DECL(delayMs) \
  if (_kq == -1) _kq = kqueue(); \
  struct kevent ke_sto; \
  struct kevent* ke = &ke_sto; \
  struct timespec tmo; \
  tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
# define CMK_PIPE_SUB ke
# define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)

# define CMK_PIPE_PARAM struct kevent* ke
# define CMK_PIPE_ADDREAD(rd_fd) \
  do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
       kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_ADDWRITE(wr_fd) \
  do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
       kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(*ke));} while(0)
# define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
# define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)

#else /*select() version*/

# define CMK_PIPE_DECL(delayMs) \
  fd_set rfds_sto,wfds_sto;\
  fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
  FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
# define CMK_PIPE_SUB rfds,wfds
# define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)

# define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
# define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
# define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
# define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
# define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)

static void CMK_PIPE_CHECKERR(void) {
#if defined(_WIN32) && !defined(__CYGWIN__)
/* Win32 socket seems to randomly return inexplicable errors
here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK.
        int err=WSAGetLastError();
        CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
#else /*UNIX machine*/
  KillEveryone("Socket error in CheckSocketsReady!\n");

static void CmiStdoutFlush(void);
static int  CmiStdoutNeedsService(void);
static void CmiStdoutService(void);
static void CmiStdoutAdd(CMK_PIPE_PARAM);
static void CmiStdoutCheck(CMK_PIPE_PARAM);

double GetClock(void)
#if defined(_WIN32) && !defined(__CYGWIN__)
  return (tv.time * 1.0 + tv.millitm * 1.0E-3);
  struct timeval tv; int ok;
  ok = gettimeofday(&tv, NULL);
  if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
  return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);

/***********************************************************************
 ************************************************************************/

static int  Cmi_truecrash;
static int already_aborting=0;
void LrtsAbort(const char *message)
  if (already_aborting) machine_exit(1);

  MACHSTATE1(5,"CmiAbort(%s)",message);

  /*Send off any remaining prints*/
    printf("CHARM++ FATAL ERROR: %s\n", message);
    *(int *)NULL = 0; /*Write to null to force a crash (usually SIGSEGV)*/
    charmrun_abort(message);

/******************************************************************************
 *
 * The net and tcp versions use a bunch of unix processes talking to each
 * other via file descriptors.  We need a SIGIO signal to be generated
 * each time a message arrives, making it possible to write a signal
 * handler to handle the messages.  The vast majority of unixes can,
 * in fact, do this.  However, there isn't any standard for how this is
 * supposed to be done, so each version of UNIX has a different set of
 * calls to turn this signal on.  So, there is roughly one version here for
 * every major brand of UNIX.
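Note that fcntl(fd, F_SETFL, flag) replaces the descriptor's status flags rather than adding to them, so setting O_ASYNC and O_NONBLOCK with two separate single-flag calls can leave only the second one set. The usual read-modify-write idiom is sketched below. `add_fd_flag` is a hypothetical helper, and O_ASYNC itself is not in POSIX, although Linux and the BSDs define it.

```c
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

// Add `flag` (e.g. O_NONBLOCK, or O_ASYNC where available) to fd's status
// flags without clearing the ones already set.
// Returns 0 on success, -1 on error.
int add_fd_flag(int fd, int flag) {
  int flags = fcntl(fd, F_GETFL);
  if (flags < 0) return -1;
  return fcntl(fd, F_SETFL, flags | flag) < 0 ? -1 : 0;
}
```

With this helper, the SIGIO setup would be F_SETOWN followed by `add_fd_flag(fd, O_ASYNC)`, and a later `add_fd_flag(fd, O_NONBLOCK)` would leave O_ASYNC in place.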
 *
 *****************************************************************************/

#if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
void CmiEnableAsyncIO(int fd)
  if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
    CmiError("setting socket owner: %s\n", strerror(errno)) ;
  if ( fcntl(fd, F_SETFL, O_ASYNC) < 0 ) {
    CmiError("setting socket async: %s\n", strerror(errno)) ;
void CmiEnableAsyncIO(int fd) { }

/* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
#if !defined(_WIN32) || defined(__CYGWIN__)
void CmiEnableNonblockingIO(int fd) {
  if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
    CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
void CmiEnableNonblockingIO(int fd) { }

/******************************************************************************
 *
 * This data is all read in from the NETSTART variable (provided by the
 * charmrun) and from the command-line arguments.  Once read in, it is never
 *
 *****************************************************************************/

static skt_ip_t   Cmi_self_IP;
static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
static int        Cmi_charmrun_port;
static int        Cmi_charmrun_fd=-1;
/* Magic number to be used for sanity check in message header */
static int        Cmi_net_magic;

static int    Cmi_netpoll;
static int    Cmi_asyncio;
static int    Cmi_idlepoll;
static int    Cmi_syncprint;
static int    Cmi_print_stats = 0;

#if ! defined(_WIN32)
/* parse forks only used in non-smp mode */
static void parse_forks(void) {
  forkstr=getenv("CmiMyForks");
  if(forkstr!=0) { /* charmrun */
    nread = sscanf(forkstr,"%d",&forks);
    for(i=1;i<=forks;i++) { /* by default forks = 0 */
      if(pid<0) CmiAbort("Fork returned an error");
      if(pid==0) { /* forked process */
        /* reset mynode,pe & exit loop */

static void parse_magic(void)
  nm = getenv("NETMAGIC");
  {/*Read values set by Charmrun*/
    nread = sscanf(nm, "%d",&Cmi_net_magic);

static void parse_netstart(void)
  ns = getenv("NETSTART");
  {/*Read values set by Charmrun*/
    char Cmi_charmrun_name[1024];
    nread = sscanf(ns, "%d%1023s%d%d%d", /* %1023s stays within Cmi_charmrun_name */
                   Cmi_charmrun_name, &Cmi_charmrun_port,
                   &Cmi_charmrun_pid, &port);
    Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
    fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
  {/*No charmrun-- set flag values for standalone operation*/
    Cmi_charmrun_IP=_skt_invalid_ip;
#if CMK_USE_IBVERBS || CMK_USE_IBUD
  char *cmi_num_nodes = getenv("CmiNumNodes");
  if(cmi_num_nodes != NULL){
    sscanf(cmi_num_nodes,"%d",&Lrts_numNodes);

static void extract_common_args(char **argv)
  if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))

/******************************************************************************
 *
 * Packet Performance Logging
 *
 * This module is designed to give a detailed log of the packets and their
 * acknowledgements, for performance tuning.  It can be disabled.
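The LOG macro below keeps the most recent 50000 entries in a wraparound buffer. A sketch of the same idea with a tiny capacity, assuming a hypothetical `log_index` helper for reading entries back oldest-first after a wrap (the next slot to be overwritten, log_pos, is also the oldest one). `LogEntry`, `log_buf`, `log_event` and `log_size` are illustrative names.

```c
#include <assert.h>

#define LOG_CAPACITY 4  // the file uses 50000

typedef struct {
  double time;
  int srcpe;
  char kind;
  int dstpe;
  int seqno;
} LogEntry;

static LogEntry log_buf[LOG_CAPACITY];
static int log_pos = 0;   // next slot to write
static int log_wrap = 0;  // set once the buffer has wrapped

// Record one event, overwriting the oldest entry once the buffer is full
// (same order of checks as the LOG macro).
void log_event(double t, int s, char k, int d, int q) {
  if (log_pos == LOG_CAPACITY) { log_pos = 0; log_wrap = 1; }
  LogEntry *e = &log_buf[log_pos++];
  e->time = t; e->srcpe = s; e->kind = k; e->dstpe = d; e->seqno = q;
}

// Number of valid entries, computed the same way log_done does.
int log_size(void) { return log_wrap ? LOG_CAPACITY : log_pos; }

// Slot holding the i-th oldest entry (0 = oldest).
int log_index(int i) { return log_wrap ? (log_pos + i) % LOG_CAPACITY : i; }
```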
 *
 *****************************************************************************/

typedef struct logent {

static void log_init(void)
  log = (logent)malloc(50000 * sizeof(struct logent));

static void log_done(void)
  char logname[100]; FILE *f; int i, size;
  sprintf(logname, "log.%d", Lrts_myNode);
  f = fopen(logname, "w");
  if (f==0) KillEveryone("fopen problem");
  if (log_wrap) size = 50000; else size=log_pos;
  for (i=0; i<size; i++) {
    fprintf(f, "%1.4f %d %c %d %d\n",
            ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);

  char logname[100]; FILE *f; int i, j, size;
  static int logged = 0;
  CmiPrintf("Logging: %d\n", Lrts_myNode);
  sprintf(logname, "log.%d", Lrts_myNode);
  f = fopen(logname, "w");
  if (f==0) KillEveryone("fopen problem");
  for (i = 5000; i; i--)
  /*for (i=0; i<size; i++) */
    fprintf(f, "%1.4f %d %c %d %d\n",
            ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
  CmiPrintf("Done Logging: %d\n", Lrts_myNode);

#define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}

#define log_init() /*empty*/
#define log_done() /*empty*/
#define printLog() /*empty*/
#define LOG(t,s,k,d,q) /*empty*/

/******************************************************************************
 *****************************************************************************/

static CmiNodeLock    Cmi_scanf_mutex;
static double         Cmi_clock;
static double         Cmi_check_delay = 3.0;

/** Mechanism to prevent dual locking when comm-layer functions, including prints,
 * are called recursively. (UN)LOCK_IF_AVAILABLE is used before and after a code piece
 * which is guaranteed not to make any recursive locking calls. (UN)LOCK_AND_(UN)SET
 * is used before and after a code piece that may make recursive locking calls.
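The macros below can be modelled as follows. This sketch replaces the real node lock with a plain flag that asserts on double acquisition (similar in spirit to the MACHLOCK_DEBUG flag locks later in this file) and assumes a hypothetical fixed NUM_RANKS; `node_lock`, `lock_and_set` and the other function names are illustrative.

```c
#include <assert.h>

#define NUM_RANKS 2  // hypothetical number of PEs per node

static int comm_lock_held = 0;  // stand-in for the real node lock
static int inProgress[NUM_RANKS] = {0};

static void node_lock(void)   { assert(!comm_lock_held); comm_lock_held = 1; }
static void node_unlock(void) { assert(comm_lock_held);  comm_lock_held = 0; }

// Take the lock only if this rank is not already inside a locked region.
void lock_if_available(int rank)   { if (!inProgress[rank]) node_lock(); }
void unlock_if_available(int rank) { if (!inProgress[rank]) node_unlock(); }

// Enter a region that may itself call lock_if_available(): lock once and
// mark the rank, so nested calls skip locking instead of deadlocking.
void lock_and_set(int rank) {
  if (!inProgress[rank]) node_lock();
  inProgress[rank] += 1;
}

// Leave the region; the lock is released when the outermost level exits.
void unlock_and_unset(int rank) {
  inProgress[rank] -= 1;
  if (!inProgress[rank]) node_unlock();
}
```

Without the inProgress counter, a print issued from inside a locked comm-layer routine would try to take the same non-recursive lock twice.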
#define LOCK_IF_AVAILABLE() \
  if(!inProgress[CmiMyRank()]) { \
#define UNLOCK_IF_AVAILABLE() \
  if(!inProgress[CmiMyRank()]) { \
#define LOCK_AND_SET() \
  if(!inProgress[CmiMyRank()]) { \
  inProgress[CmiMyRank()] += 1;
#define UNLOCK_AND_UNSET() \
  inProgress[CmiMyRank()] -= 1;

/******************************************************************************
 *
 * SMP implementation moved to machine-smp.c
 *****************************************************************************/

/************************ No kernel SMP threads ***************/

static volatile int memflag=0;
void CmiMemLockNet() { memflag++; }
void CmiMemUnlockNet() { memflag--; }

static volatile int comm_flag=0;
#define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
#ifndef MACHLOCK_DEBUG
#  define CmiCommLock() (comm_flag=1)
#  define CmiCommUnlock() (comm_flag=0)
#else /* Error-checking flag locks */
void CmiCommLock(void) {
  MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
void CmiCommUnlock(void) {
  MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");

//int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */

static void CommunicationInterrupt(int ignored)
  MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
  if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0))
  { /* Already busy inside malloc, comm, or immediate messages */
    MACHSTATE(5,"--SKIPPING SIGIO--");
  MACHSTATE1(2,"--BEGIN SIGIO comm_mutex_isLocked: %d--", comm_flag)
    /*Make sure any mallocs we do in here are NOT migratable:*/
    CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
    CommunicationServerNet(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
    //CommunicationServer(0);  /* from interrupt */
    CmiIsomallocBlockListActivate(oldList);
  MACHSTATE(2,"--END SIGIO--")

extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());

static void CmiDestroyLocks()

/*Add a message to this processor's receive queue
  Must be called while holding comm. lock

extern double evacTime;

/***************************************************************
 Communication with charmrun:
 We can send (ctrl_sendone) and receive (ctrl_getone)
 messages on a TCP socket connected to charmrun.
 This is used for printfs, CCS, etc; and also for
 killing ourselves if charmrun dies.
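ctrl_sendone_nolock below frames each message as a header followed by up to two data buffers and hands them to skt_sendV in one call. A comparable sketch with POSIX writev(), assuming a hypothetical header made of a 12-byte type tag plus a 32-bit length in network byte order (the real ChMessageHeader layout is not shown here); `MsgHeader` and `send_framed` are illustrative names.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

// Hypothetical wire header: a fixed-width type tag and a payload length.
typedef struct {
  char type[12];
  uint32_t len;  // network byte order
} MsgHeader;

// Send header + data1 + data2 with a single writev() call, skipping empty
// parts, in the same spirit as ctrl_sendone_nolock(). Returns the number
// of bytes written, or -1 on error.
ssize_t send_framed(int fd, const char *type,
                    const void *data1, size_t len1,
                    const void *data2, size_t len2) {
  MsgHeader hdr;
  memset(&hdr, 0, sizeof hdr);
  snprintf(hdr.type, sizeof hdr.type, "%s", type);
  hdr.len = htonl((uint32_t)(len1 + len2));
  struct iovec iov[3];
  int n = 0;
  iov[n].iov_base = &hdr; iov[n].iov_len = sizeof hdr; n++;
  if (len1 > 0) { iov[n].iov_base = (void *)data1; iov[n].iov_len = len1; n++; }
  if (len2 > 0) { iov[n].iov_base = (void *)data2; iov[n].iov_len = len2; n++; }
  return writev(fd, iov, n);
}
```

A production version would also have to retry after partial writes (for example, when a signal interrupts the call).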

/*This flag prevents simultaneous outgoing
  messages on the charmrun socket.  It is protected
static int Cmi_charmrun_fd_sendflag=0;

static int sendone_abort_fn(SOCKET skt,int code,const char *msg) {
  fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);

static void ctrl_sendone_nolock(const char *type,
                                const char *data1,int dataLen1,
                                const char *data2,int dataLen2)
  const void *bufs[3]; int lens[3]; int nBuffers=0;
  skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
  MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
  if (Cmi_charmrun_fd==-1)
    charmrun_abort("ctrl_sendone called in standalone!\n");
  Cmi_charmrun_fd_sendflag=1;
  ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
  bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
  if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
  if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
  skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
  Cmi_charmrun_fd_sendflag=0;
  skt_set_abort(oldAbort);
  MACHSTATE(2,"} ctrl_sendone_nolock");

static void ctrl_sendone_locking(const char *type,
                                 const char *data1,int dataLen1,
                                 const char *data2,int dataLen2)
  ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
  UNLOCK_IF_AVAILABLE();

#ifndef MEMORYUSAGE_OUTPUT
#define MEMORYUSAGE_OUTPUT 0
#if MEMORYUSAGE_OUTPUT
#define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
static int memoryusage_counter;
#define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
#define memoryusage_output {\
  memoryusage_counter++;\
  if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}

static double Cmi_check_last;

/* if charmrun dies, we finish */
static void pingCharmrun(void *ignored)
#if MEMORYUSAGE_OUTPUT
  if(memoryusage_isOutput){
    memoryusage_counter = 0;
  double clock=GetClock();
  if (clock > Cmi_check_last + Cmi_check_delay) {
    MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
    Cmi_check_last = clock;
    CmiCommLockOrElse(return;); /*Already busy doing communication*/
    if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
    ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
    UNLOCK_IF_AVAILABLE();
  CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/

/* periodic charm ping, for gm and netpoll */
static void pingCharmrunPeriodic(void *ignored)
  pingCharmrun(ignored);
  CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);

static int ignore_further_errors(SOCKET skt,int c,const char *msg) {machine_exit(2);return -1;}
static void charmrun_abort(const char *s)
  if (Cmi_charmrun_fd==-1) {/*Standalone*/
    fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
    CmiPrintStackTrace(0);
    skt_set_abort(ignore_further_errors);
    if (CmiNumPartitions() == 1) {
      sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
      sprintf(msgBuf,"Fatal error on Partition %d PE %d> ", CmiMyPartition(), CmiMyPe());
    ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);

#include "machine-recover.c"

static void node_addresses_store(ChMessage *msg);

static int barrierReceived = 0;

static void ctrl_getone(void)
  MACHSTATE(2,"ctrl_getone")
  MACHLOCK_ASSERT(comm_mutex_isLocked,"ctrl_getone")
  ChMessage_recv(Cmi_charmrun_fd,&msg);
  MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);

  if (strcmp(msg.header.type,"die")==0) {
    MACHSTATE(2,"ctrl_getone bye bye")
    fprintf(stderr,"aborting: %s\n",msg.data);
    ConverseCommonExit();
#if CMK_CCS_AVAILABLE
  else if (strcmp(msg.header.type, "req_fw")==0) {
    CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
    /*Sadly, I *can't* do a:
      CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
      here, because I can't send converse messages in the
      communication thread.  I *can* poke this message into
      any convenient processor's queue, though:  (OSL, 9/14/2000)
    int pe=0;/*<- node-local processor number. Any one will do.*/
    void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
    MACHSTATE(2,"Incoming CCS request");
    if (cmsg!=NULL) CmiPushPE(pe,cmsg);
  else if(strcmp(msg.header.type,"crashnode")==0) {
    crash_node_handle(&msg);
  else if(strcmp(msg.header.type,"initnodetab")==0) {
    /** A processor crashed and got recreated. So charmrun sent
      the whole node table across to update this processor*/
    node_addresses_store(&msg);
    // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
  else if(strcmp(msg.header.type,"barrier")==0) {
    barrierReceived = 1;
  else if(strcmp(msg.header.type,"barrier0")==0) {
    barrierReceived = 2;
    /* We do not use KillEveryone here because it calls CmiMyPe(),
     * which is not available to the communication thread on an SMP version.
    /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
    charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
  MACHSTATE(2,"ctrl_getone done")
  ChMessage_free(&msg);

#if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
/*Deliver this reply data to this reply socket.
  The data is forwarded to CCS server via charmrun.*/
void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
  MACHSTATE(2,"Outgoing CCS reply");
  ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
  MACHSTATE(1,"Outgoing CCS reply away");

/*****************************************************************************
 *
 * CmiPrintf, CmiError, CmiScanf
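InternalScanf below has to know how many pointer arguments to pull from the va_list before forwarding them. The counting rule can be sketched as a standalone function; `count_scanf_args` is a hypothetical name, and like the original loop it ignores scanf corner cases such as a '%' inside a %[...] scanset.

```c
#include <assert.h>

// Count the conversions in a scanf format that consume an argument.
// Assignment-suppressed conversions (a '%' followed by '*') and the
// literal-percent sequence "%%" do not consume one.
int count_scanf_args(const char *fmt) {
  int nargs = 0;
  const char *p = fmt;
  while (*p) {
    if (p[0] == '%' && p[1] == '*') { p += 2; continue; }  // suppressed
    if (p[0] == '%' && p[1] == '%') { p += 2; continue; }  // literal '%'
    if (p[0] == '%') nargs++;
    p++;
  }
  return nargs;
}
```

For example, "%d %s" needs two pointers, while a format with one suppressed and one normal conversion needs only one.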
 *
 *****************************************************************************/
static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
static void InternalPrintf(const char *f, va_list l)
  char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
  vsnprintf(buffer, PRINTBUFSIZE, f, l); /* truncate rather than overflow the buffer */
    LOCK_IF_AVAILABLE();
    ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
    ChMessage_recv(Cmi_charmrun_fd,&replymsg);
    ChMessage_free(&replymsg);
    UNLOCK_IF_AVAILABLE();
    ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
    InternalWriteToTerminal(0,buffer,strlen(buffer));
static void InternalError(const char *f, va_list l)
  char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
  vsnprintf(buffer, PRINTBUFSIZE, f, l); /* bounded: never writes past PRINTBUFSIZE */
    /* Hold the lock across the send and the reply (as InternalPrintf does),
       so no other thread can consume charmrun's acknowledgement in between. */
    LOCK_IF_AVAILABLE();
    ctrl_sendone_nolock("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
    ChMessage_recv(Cmi_charmrun_fd,&replymsg);
    ChMessage_free(&replymsg);
    UNLOCK_IF_AVAILABLE();
    ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
  InternalWriteToTerminal(1,buffer,strlen(buffer));
static int InternalScanf(char *fmt, va_list l)
  char *p; int nargs, i;
    if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
    if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
    if (p[0]=='%') { nargs++; p++; continue; }
    if (*p=='\n') *p=' '; p++;
  if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
  for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
  CmiLock(Cmi_scanf_mutex);
  if (Cmi_charmrun_fd!=-1)
  {/*Send charmrun the format string, then wait for the reply (characters
     to scan). Hold the lock across both steps so no other thread can
     consume charmrun's reply in between.*/
    LOCK_IF_AVAILABLE();
    ctrl_sendone_nolock("scanf", fmt, strlen(fmt)+1,NULL,0);
    ChMessage_recv(Cmi_charmrun_fd,&replymsg);
    i = sscanf((char*)replymsg.data, fmt,
               ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
               ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
               ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
    ChMessage_free(&replymsg);
    UNLOCK_IF_AVAILABLE();
  {/*Just do the scanf normally*/
    i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
            ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
            ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
  CmiUnlock(Cmi_scanf_mutex);
#if CMK_CMIPRINTF_IS_A_BUILTIN

/*New stdarg.h declarations*/
void CmiPrintf(const char *fmt, ...)
  extern int quietMode;
  if (quietMode) return;
  va_list p; va_start(p, fmt);
  if (Cmi_charmrun_fd!=-1 && _writeToStdout)
    InternalPrintf(fmt, p);
    vfprintf(stdout,fmt,p);

void CmiError(const char *fmt, ...)
  va_list p; va_start (p, fmt);
  if (Cmi_charmrun_fd!=-1)
    InternalError(fmt, p);
    vfprintf(stderr,fmt,p);

int CmiScanf(const char *fmt, ...)
  va_list p; va_start(p, fmt);
  i = InternalScanf((char *)fmt, p);
/***************************************************************************
 * Output redirection:
 * When people don't use CkPrintf, like above, we'd still like to be able
 * to collect their output. Thus we make a pipe and dup2 it to stdout,
 * which lets us read the characters sent to stdout at our leisure.
 ***************************************************************************/
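
/* Illustrative sketch (POSIX only, not part of the machine layer): a
 * miniature version of the redirection described above. The hypothetical
 * helper sketch_capture_fd() does the following:
 *   1. saves a copy of `fd` with dup();
 *   2. points `fd` at the write end of a UNIX socketpair with dup2(), as
 *      CmiStdoutInit() does for fds 1 and 2;
 *   3. writes `msg` through `fd`;
 *   4. restores the original descriptor;
 *   5. reads the captured bytes from the other end of the pair.
 * Unlike the real layer, it captures a single write synchronously instead
 * of servicing the socket from select() or SIGIO. It also reduces error
 * handling to returning -1.
 */
#if !defined(_WIN32) || defined(__CYGWIN__)
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>

static ssize_t sketch_capture_fd(int fd, const char *msg, char *out, size_t outlen)
{
  int pair[2];
  int saved;
  ssize_t n;

  if (outlen == 0) return -1;
  saved = dup(fd);                        /* keep the original destination */
  if (saved == -1) return -1;
  if (socketpair(PF_UNIX, SOCK_STREAM, 0, pair) == -1) {
    close(saved);
    return -1;
  }
  if (dup2(pair[1], fd) == -1) {          /* fd now refers to the write end */
    close(pair[0]); close(pair[1]); close(saved);
    return -1;
  }
  n = write(fd, msg, strlen(msg));        /* bytes land in the socket buffer */
  dup2(saved, fd);                        /* restore the original destination */
  close(saved);
  close(pair[1]);
  if (n >= 0)
    n = read(pair[0], out, outlen - 1);   /* drain what was captured */
  close(pair[0]);
  if (n < 0) return -1;
  out[n] = '\0';
  return n;
}
#endif
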
/*Can read from stdout or stderr using these fd's*/
static int readStdout[2];
static int writeStdout[2]; /*The original stdout/stderr sockets*/
static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
#define readStdoutBufLen (16*1024)
static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
static int servicingStdout;

/*Initialization-- should only be called once per node*/
static void CmiStdoutInit(void) {
  if (Cmi_charmrun_fd==-1) return; /* standalone mode */

/*There's some way to do this same thing in windows, but I don't know how*/
#if !defined(_WIN32) || defined(__CYGWIN__)
  /*Prevent buffering in stdio library:*/
  setbuf(stdout,NULL); setbuf(stderr,NULL);

  /*Reopen stdout and stderr fd's as new pipes:*/
    int srcFd=1+i; /* 1 is stdout; 2 is stderr */

    /*First, save a copy of the original stdout*/
    writeStdout[i]=dup(srcFd);

    /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
    if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
    /* UNIX socket (16kb default buffer, and works with SIGIO!) */
    if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair))
      {perror("building stdio redirection socketpair"); exit(1);}
    readStdout[i]=pair[0]; /*We get the read end of pipe*/
    if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
    //if (-1==dup2(srcFd,pair[1])) {perror("dup2 redirection pipe"); exit(1);}

#if 0 /*Keep writes from blocking. This just drops excess output, which is bad.*/
    CmiEnableNonblockingIO(srcFd);
//NOTSURE #if CMK_SHARED_VARS_UNAVAILABLE
    /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
    //CmiEnableAsyncIO(readStdout[i]);
    CmiEnableAsyncIO(pair[1]);
/*Windows system-- just fake reads for now*/
# define read(x,y,z) 0
# define write(x,y,z)

/*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
  write(writeStdout[isStdErr],str,len);
  Service this particular stdout pipe.
  Must hold comm. lock.
static void CmiStdoutServiceOne(int i) {
  static const char *cmdName[2]={"print","printerr"};
    const char *tooMuchWarn=NULL; int tooMuchLen=0;
    if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
    nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
    if (nBytes<=0) break; /*Nothing to send*/

    /*Send these bytes off to charmrun*/
    readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
    nBytes++; /*Include zero-terminator in message to charmrun*/

    if (nBytes>=readStdoutBufLen-100)
    { /*We must have filled up our output pipe-- most output libraries
         don't handle this well (e.g., glibc printf just drops the line).*/
      tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
                  "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
      nBytes--; /*Remove terminator from user's data*/
      tooMuchLen=strlen(tooMuchWarn)+1;
    ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
                        tooMuchWarn,tooMuchLen);
    InternalWriteToTerminal(i,readStdoutBuf,nBytes);
  serviceStdout[i]=0; /*This pipe is now serviced*/

/*Service all stdout pipes, whether it looks like they need it
  or not. Used when you aren't sure if select() has been called recently.
  Must hold comm. lock.
static void CmiStdoutServiceAll(void) {
    if (readStdout[i]==0) continue; /*Pipe not open*/
    CmiStdoutServiceOne(i);

/*Service any outstanding stdout pipes.
  Must hold comm. lock.
static void CmiStdoutService(void) {
  CmiStdoutServiceAll();

/*Add our pipes to the pile for select() or poll().
  Both can be called with or without the comm. lock.
static void CmiStdoutAdd(CMK_PIPE_PARAM) {
    if (readStdout[i]==0) continue; /*Pipe not open*/
    CMK_PIPE_ADDREAD(readStdout[i]);
static void CmiStdoutCheck(CMK_PIPE_PARAM) {
    if (readStdout[i]==0) continue; /*Pipe not open*/
    if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
static int CmiStdoutNeedsService(void) {
  return (serviceStdout[0]!=0 || serviceStdout[1]!=0);

/*Called every few milliseconds to flush the stdout pipes*/
static void CmiStdoutFlush(void) {
  if (servicingStdout) return; /* might be called by SIGALRM */
  CmiCommLockOrElse( return; )
  LOCK_IF_AVAILABLE();
  CmiStdoutServiceAll();
  UNLOCK_IF_AVAILABLE();

/***************************************************************************
 ***************************************************************************/

#include "machine-dgram.c"

/*****************************************************************************
 * These two functions fill the node-table.
 * This node, like all others, first sends its own address to charmrun
 * using this command:
 * Data: Big-endian 4-byte ints
 *   <my-node #><Dataport>
 * When charmrun has all the addresses, it sends this table to me:
 * Data: Big-endian 4-byte ints
 *   <number of nodes n>
 *   <#PEs><IP><Dataport> Node 0
 *   <#PEs><IP><Dataport> Node 1
 *   <#PEs><IP><Dataport> Node n-1
 *****************************************************************************/
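
/* Illustrative sketch: packing and unpacking the node table laid out
 * above. It assumes every field is exactly one big-endian 32-bit integer
 * and that the IP address is already a 32-bit value. The real code goes
 * through ChMessageInt_t, ChNodeinfo and node_addresses_store(), and
 * sketch_nodeinfo and the sketch_* functions below are hypothetical. Bytes
 * are shifted by hand, so the result does not depend on host byte order.
 */
#include <stddef.h>
#include <stdint.h>

typedef struct { uint32_t nPE, IP, dataport; } sketch_nodeinfo;

static void sketch_put_be32(unsigned char *p, uint32_t v)
{
  p[0] = (unsigned char)(v >> 24);   /* most significant byte first */
  p[1] = (unsigned char)(v >> 16);
  p[2] = (unsigned char)(v >> 8);
  p[3] = (unsigned char)v;
}

static uint32_t sketch_get_be32(const unsigned char *p)
{
  return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
         ((uint32_t)p[2] << 8)  |  (uint32_t)p[3];
}

/* Writes <n>, then <#PEs><IP><Dataport> for each node; returns bytes used. */
static size_t sketch_pack_nodetab(const sketch_nodeinfo *nodes, uint32_t n,
                                  unsigned char *buf)
{
  size_t off = 4;
  uint32_t k;
  sketch_put_be32(buf, n);
  for (k = 0; k < n; k++, off += 12) {
    sketch_put_be32(buf + off,     nodes[k].nPE);
    sketch_put_be32(buf + off + 4, nodes[k].IP);
    sketch_put_be32(buf + off + 8, nodes[k].dataport);
  }
  return off;
}

/* Reads back entry k; returns 0 on success, -1 if k is not below n. */
static int sketch_unpack_node(const unsigned char *buf, uint32_t k,
                              sketch_nodeinfo *out)
{
  const unsigned char *e;
  if (k >= sketch_get_be32(buf)) return -1;
  e = buf + 4 + (size_t)k * 12;
  out->nPE      = sketch_get_be32(e);
  out->IP       = sketch_get_be32(e + 4);
  out->dataport = sketch_get_be32(e + 8);
  return 0;
}
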

void copyInfiAddr(ChInfiAddr *qpList);

#if CMK_USE_IBVERBS && CMK_IBVERBS_FAST_START
static void send_partial_init()
  ChMessageInt_t nodeNo = ChMessageInt_new(Lrts_myNode);
  ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);

/*Note: node_addresses_obtain is called before starting
  threads, so no locks are needed (or valid!)*/
static void node_addresses_obtain(char **argv)
  ChMessage nodetabmsg; /* info about all nodes*/
  MACHSTATE(3,"node_addresses_obtain { ");
  if (Cmi_charmrun_fd==-1)
  {/*Standalone-- fake a single-node nodetab message*/
    ChSingleNodeinfo *fakeTab;
    ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
    fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
    CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
//#if CMK_SHARED_VARS_UNAVAILABLE
        "To use multiple processors, you must run this program as:\n"
        " > charmrun +p%d %s <args>\n"
        "or build the %s-smp version of Charm++.\n",
        npes,argv[0],CMK_MACHINE_NAME);
    /* standalone smp version reads ppn */
    if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) ||
        CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
      npes = _Cmi_mynodesize;
    /*This is a stupid hack: we expect the *number* of nodes
      followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
      (which happens to have exactly that layout!) and stuff
      a 1 into the "node number" slot
    fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
    fakeTab->info.nPE=ChMessageInt_new(npes);
    fakeTab->info.dataport=ChMessageInt_new(0);
    fakeTab->info.IP=_skt_invalid_ip;
  { /*Contact charmrun for machine info.*/
    ChSingleNodeinfo me;
    memset(&me, 0, sizeof(me));

    me.nodeNo=ChMessageInt_new(Lrts_myNode);

    int qpListSize = (Lrts_numNodes-1)*sizeof(ChInfiAddr);
    me.info.qpList = malloc(qpListSize);
    copyInfiAddr(me.info.qpList);
    MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
    ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
    free(me.info.qpList);

    /*The nPE fields are set by charmrun--
      these values don't matter.
      Set IP in case it is mpiexec mode where charmrun does not have IP yet
    me.info.nPE=ChMessageInt_new(0);
    /* me.info.IP=_skt_invalid_ip; */
    me.info.IP=skt_innode_my_ip();

    me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
    me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
    me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
    /* The qp fields are ChMessageInt_t wrappers; convert them back before printing with %i. */
    MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",
               ChMessageInt(me.info.qp.lid),ChMessageInt(me.info.qp.qpn),ChMessageInt(me.info.qp.psn));

    me.info.dataport=ChMessageInt_new(dataport);

    /*Send our node info. to charmrun.
      CommLock hasn't been initialized yet--
      use non-locking version*/
    ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
    MACHSTATE1(5,"send initnode - dataport:%d", dataport);
#endif //CMK_USE_IBVERBS
    MACHSTATE(3,"initnode sent");

    /*We get the other node addresses from a message sent
      back via the charmrun control port.*/
    if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
      CmiAbort("Timeout waiting for nodetab!\n");
    MACHSTATE(2,"recv initnode {");
    ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
    MACHSTATE(2,"} recv initnode");

  ChMessageInt_t *n32 = (ChMessageInt_t *) nodetabmsg.data;
  ChNodeinfo *d = (ChNodeinfo *) (n32+1);
  _Cmi_myphysnode_numprocesses = ChMessageInt(d[Lrts_myNode].nProcessesInPhysNode);
//#if CMK_USE_IBVERBS
  node_addresses_store(&nodetabmsg);
  ChMessage_free(&nodetabmsg);

  MACHSTATE(3,"} node_addresses_obtain ");

/***********************************************************************
 * DeliverOutgoingMessage()
 *
 * This function takes care of delivery of outgoing messages from the
 * sender end. Broadcast messages are divided into sets of messages that
 * are bound to the local node, and to remote nodes. For local
 * transmission, the messages are directly pushed into the recv
 * queues. For non-local transmission, the function DeliverViaNetwork()
 ***********************************************************************/
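
/* Illustrative sketch: how a global PE number splits into a node and a
 * node-local rank, mirroring `node = nodes_by_pe[dst]; rank = dst -
 * node->nodestart;` in DeliverOutgoingMessage(). The hypothetical
 * sketch_node table is searched linearly instead of through a per-PE
 * lookup array. "Local" here means the destination node starts at the same
 * PE as this node, which corresponds to the Cmi_nodestartGlobal comparison.
 */
typedef struct { int nodestart; int nodesize; } sketch_node;

/* Returns the index of the node owning `pe` and stores its rank; -1 if none. */
static int sketch_locate_pe(const sketch_node *nodes, int nnodes, int pe, int *rank)
{
  int k;
  for (k = 0; k < nnodes; k++) {
    if (pe >= nodes[k].nodestart && pe < nodes[k].nodestart + nodes[k].nodesize) {
      *rank = pe - nodes[k].nodestart;   /* same arithmetic as the real code */
      return k;
    }
  }
  return -1;
}

/* 1 if `node` is this process's node (push to recv queues), 0 if remote. */
static int sketch_is_local(const sketch_node *nodes, int node, int my_nodestart)
{
  return nodes[node].nodestart == my_nodestart;
}
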
int DeliverOutgoingMessage(OutgoingMsg ogm)
  int i, rank, dst; OtherNode node;
  //printf("deliver outgoing message, dest: %d \n", dst);
#if CMK_ERROR_CHECKING
  if (dst<0 || dst>=CmiNumPesGlobal())
    CmiAbort("Send to out-of-bounds processor!");
  node = nodes_by_pe[dst];
  rank = dst - node->nodestart;
  if (node->nodestart != Cmi_nodestartGlobal) {
#if !CMK_SMP_NOT_RELAX_LOCK
    DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
    GarbageCollectMsg(ogm);
#if !CMK_SMP_NOT_RELAX_LOCK
 * Set up an OutgoingMsg structure for this message.
static OutgoingMsg PrepareOutgoing(int pe,int size,int freemode,char *data) {
  MallocOutgoingMsg(ogm);
  MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
  ogm->src = CmiMyPeGlobal();
  ogm->freemode = freemode;
  return (CmiCommHandle)ogm;

/******************************************************************************
 * Description: This is a generic message sending routine. All the
 * Converse message send functions are implemented in terms of this
 * function. (By setting appropriate flags (e.g. freemode) that tell
 * CmiGeneralSend() how exactly to handle the particular case of
 *****************************************************************************/
//CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
CmiCommHandle LrtsSendFunc(int destNode, int pe, int size, char *data, int freemode)
  MACHSTATE(1,"CmiGeneralSend {");
  CMI_MSG_SIZE(data)=size;
  ogm=PrepareOutgoing(pe,size,'F',data);
#if CMK_SMP_NOT_RELAX_LOCK
  sendonnetwork = DeliverOutgoingMessage(ogm);
#if CMK_SMP_NOT_RELAX_LOCK
  // if (sendonnetwork!=0) /* only call server when we send msg on network in SMP */
  //   CommunicationServerNet(0, COMM_SERVER_FROM_WORKER);
  MACHSTATE(1,"} LrtsSend");
  return (CmiCommHandle)ogm;

/******************************************************************************
 * Comm Handle manipulation.
 *****************************************************************************/
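
/* Illustrative sketch: the freemode-based cleanup described in the
 * OUTGOING MESSAGE notes at the top of this file, applied to a standalone
 * struct. Two assumptions are not taken from this file. First, cleanup is
 * only attempted once refcount has dropped to zero. Second, an async comm
 * handle counts as finished once its freemode reads 'X'. sketch_ogm,
 * sketch_gc() and sketch_handle_done() are hypothetical names.
 */
#include <stdlib.h>

typedef struct {
  char *data;
  int refcount;
  char freemode;   /* 'F' = freeing send, 'A' = async, 'X' = async and finished */
} sketch_ogm;

/* Returns 1 if the struct itself was freed (caller must not touch it again). */
static int sketch_gc(sketch_ogm *ogm)
{
  if (ogm->refcount != 0) return 0;   /* still referenced by a pending datagram */
  if (ogm->freemode == 'A') {         /* async: the caller owns data; just mark done */
    ogm->freemode = 'X';
    return 0;
  }
  free(ogm->data);                    /* freeing send: release buffer and struct */
  free(ogm);
  return 1;
}

static int sketch_handle_done(const sketch_ogm *ogm) { return ogm->freemode == 'X'; }
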

#if ! CMK_MULTICAST_LIST_USE_COMMON_CODE

/*****************************************************************************
 * NET version List-Cast and Multicast Code
 ****************************************************************************/

void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
  for(i=0;i<npes;i++) {
    CmiSyncSendAndFree(pes[i], len, msg);

CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
  CmiError("ListSend not implemented.");
  return (CmiCommHandle) 0;

  because in all net versions, the message buffer after CmiSyncSendAndFree
  returns is not changed, we can use memory reference trick to avoid
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
  for(i=0;i<npes;i++) {
    CmiSyncSendAndFree(pes[i], len, msg);

void LrtsDrainResources() { }

void LrtsPostNonLocal() { }

/* The network progress function polls the network for messages.
   This flushes receive buffers on some implementations. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl(){
  CommunicationServerNet(0, COMM_SERVER_FROM_SMP);

void LrtsAdvanceCommunication(int whileidle)
  CommunicationServerNet(0, COMM_SERVER_FROM_SMP);
  CommunicationServerNet(0, COMM_SERVER_FROM_WORKER);

/******************************************************************************
 * Main code, Init, and Exit
 *****************************************************************************/

#if CMK_BARRIER_USE_COMMON_CODE

/* Happens at node level. */
/* Must be called on every PE, including communication processors. */
  int numnodes = CmiNumNodesGlobal();
  static int barrier_phase = 0;
  if (Cmi_charmrun_fd == -1) return; // standalone
  if (numnodes == 1) {
  ctrl_sendone_locking("barrier",NULL,0,NULL,0);
  while (barrierReceived != 1) {
    LOCK_IF_AVAILABLE();
    UNLOCK_IF_AVAILABLE();
  barrierReceived = 0;

int CmiBarrierZero()
  int numnodes = CmiNumNodesGlobal();
  if (Cmi_charmrun_fd == -1) return 0; // standalone
  if (numnodes == 1) {
    CmiNodeAllBarrier();
  if (CmiMyRank() == 0) {
    sprintf(str, "%d", CmiMyNodeGlobal());
    ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
    if (CmiMyNodeGlobal() == 0) {
      while (barrierReceived != 2) {
        LOCK_IF_AVAILABLE();
        UNLOCK_IF_AVAILABLE();
      barrierReceived = 0;
  CmiNodeAllBarrier();

/******************************************************************************
 * Main code, Init, and Exit
 *****************************************************************************/

void LrtsPreCommonInit(int everReturn)
#if !CMK_ASYNC_NOT_NEEDED
  CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
  if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
  if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);

void LrtsPostCommonInit(int everReturn)
  /* better to show the status here */
  if (CmiMyPe() == 0) {
    if (Cmi_netpoll == 1) {
      CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
#if CMK_SHARED_VARS_UNAVAILABLE
      if (CmiMemoryIs(CMI_MEMORY_IS_OS))
        CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
#if MEMORYUSAGE_OUTPUT
  memoryusage_counter = 0;
#if CMK_SHARED_VARS_UNAVAILABLE
  if (Cmi_netpoll) /*Repeatedly call CommServer*/
    CcdCallOnConditionKeep(CcdPERIODIC,
                           (CcdVoidFn) CommunicationPeriodic, NULL);
  else /*Only need this for retransmits*/
    CcdCallOnConditionKeep(CcdPERIODIC_10ms,
                           (CcdVoidFn) CommunicationPeriodic, NULL);
  if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
    CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
#if CMK_SHARED_VARS_UNAVAILABLE
    /* gm cannot live with setitimer */
    CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
    /*Occasionally ping charmrun, to test if it's dead*/
    CmiSignal(SIGALRM, 0, 0, pingCharmrun);
#if MEMORYUSAGE_OUTPUT
    i.it_interval.tv_sec = 0;
    i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
    i.it_value.tv_sec = 0;
    i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
    i.it_interval.tv_sec = 10;
    i.it_interval.tv_usec = 0;
    i.it_value.tv_sec = 10;
    i.it_value.tv_usec = 0;
    setitimer(ITIMER_REAL, &i, NULL);
#if ! CMK_USE_IBVERBS
  /*Occasionally check for retransmissions, outgoing acks, etc.*/
  /*no need for GM case */
  CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
  /*Initialize the clock*/
  Cmi_clock=GetClock();
#ifdef IGET_FLOWCONTROL
  /* Call the function once to determine the amount of physical memory available */
  /* Call the function to periodically call the token adapt function */
  CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
  CcdCallOnConditionKeep(CcdPERIODIC_10s, // magic number of PERIOD 10s
                         (CcdVoidFn) TokenUpdatePeriodic, NULL);
#ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
  srand((int)(1024.0*CmiWallTimer()));
  CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
            CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
#ifdef __ONESIDED_IMPL
#ifdef __ONESIDED_NO_HARDWARE
  putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
  putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
  getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
  getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
  machine_initiated_shutdown=1;
  if (Cmi_charmrun_fd==-1) {
    exit(0); /*Standalone version-- just leave*/
  Cmi_check_delay = 1.0; /* speed up checking of charmrun */
  for(i = 0; i < CmiMyNodeSize(); i++) {
    ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
  while(1) CommunicationServerNet(5, COMM_SERVER_FROM_SMP);

static void set_signals(void)
  if(!Cmi_truecrash) {
    struct sigaction sa;
    sa.sa_handler = KillOnAllSigs;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_RESTART;

    sigaction(SIGSEGV, &sa, NULL);
    sigaction(SIGFPE, &sa, NULL);
    sigaction(SIGILL, &sa, NULL);
    sigaction(SIGINT, &sa, NULL);
    sigaction(SIGTERM, &sa, NULL);
    sigaction(SIGABRT, &sa, NULL);
# if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
    sigaction(SIGQUIT, &sa, NULL);
    sigaction(SIGBUS, &sa, NULL);
#  if CMK_HANDLE_SIGUSR
    sa.sa_handler = HandleUserSignals;
    sigaction(SIGUSR1, &sa, NULL);
    sigaction(SIGUSR2, &sa, NULL);

/*Socket idle function to use before addresses have been
  obtained. During the real program, we idle with CmiYield.
static void obtain_idleFn(void) {sleep(0);}
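
/* Illustrative sketch (POSIX only): the sigaction() pattern that
 * set_signals() above follows. It uses an empty sa_mask and SA_RESTART,
 * so that many system calls interrupted by the handler are restarted
 * rather than failing with EINTR. The hypothetical handler only records
 * which signal arrived. KillOnAllSigs and HandleUserSignals are not
 * reproduced.
 */
#if !defined(_WIN32) || defined(__CYGWIN__)
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t sketch_last_signal = 0;

static void sketch_record_signal(int sig) { sketch_last_signal = sig; }

/* Returns 0 on success, -1 if sigaction() fails. */
static int sketch_install(int sig)
{
  struct sigaction sa;
  memset(&sa, 0, sizeof sa);
  sa.sa_handler = sketch_record_signal;
  sigemptyset(&sa.sa_mask);     /* block nothing extra while the handler runs */
  sa.sa_flags = SA_RESTART;     /* restart interrupted system calls */
  return sigaction(sig, &sa, NULL);
}
#endif
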

static int net_default_skt_abort(SOCKET skt,int code,const char *msg)
  fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);

void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
#if CMK_WHEN_PROCESSOR_IDLE_USLEEP
  if (CmiGetArgFlagDesc(*argv,"+truecrash","Do not install signal handlers") ||
      CmiGetArgFlagDesc(*argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
  /* +netpoll disables SIGIO and polls instead; +netint turns SIGIO back on */
  if (CmiGetArgFlagDesc(*argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
  if (CmiGetArgFlagDesc(*argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
  /* +idlepoll: poll instead of sleeping when idle */
  if (CmiGetArgFlagDesc(*argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
  /* +idlesleep: sleep instead of busy-waiting when idle */
  if (CmiGetArgFlagDesc(*argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
  Cmi_syncprint = CmiGetArgFlagDesc(*argv,"+syncprint", "Flush each CmiPrintf to the terminal");
#if CMK_ASYNC_NOT_NEEDED
  if (CmiGetArgFlagDesc(*argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
  if (CmiGetArgFlagDesc(*argv,"+asynciooff","Do not use async IO")) Cmi_asyncio = 0;
  if (CmiGetArgFlagDesc(*argv,"+commthread","Use communication thread")) {
#if CMK_SHARED_VARS_POSIX_THREADS_SMP
    _Cmi_sleepOnIdle = 1; /* worker threads go to sleep */
    if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
  /* use special abort handler instead of default_skt_abort to
     prevent exit trapped by atexit_check() due to the exit() call */
  skt_set_abort(net_default_skt_abort);
  atexit(machine_atexit_check);
#if ! defined(_WIN32)
  /* only get forks in non-smp mode */
  extract_args(*argv);
  Cmi_scanf_mutex = CmiCreateLock();
  /* NOTE: cannot actually call timer before timerInit! GZ */
  MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
  skt_set_idle(obtain_idleFn);
  if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
    dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
    MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
    Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
    MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
    skt_tcp_no_nagle(Cmi_charmrun_fd);
  } else {/*Standalone operation*/
    CmiPrintf("Charm++: standalone mode (not using charmrun)\n");
  CmiMachineInit(*argv);
  node_addresses_obtain(*argv);
  MACHSTATE(5,"node_addresses_obtain done");
  CmiCommunicationInit(*argv);
  skt_set_idle(CmiYield);
  Cmi_check_delay = 1.0+0.25*Lrts_numNodes;
  if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
    Cmi_check_delay=1.0e30;
  // Allocate a slot for the comm thread
  inProgress = calloc(_Cmi_mynodesize+1, sizeof(int));
  inProgress = calloc(_Cmi_mynodesize, sizeof(int));
  *numNodes = Lrts_numNodes;
  *myNodeID = Lrts_myNode;

#include "spert_ppu.h"

void machine_OffloadAPIProgress() {
  LOCK_IF_AVAILABLE();
  OffloadAPIProgress();
  UNLOCK_IF_AVAILABLE();

void LrtsPrepareEnvelope(char *msg, int size)
  CMI_MSG_SIZE(msg) = size;