2 * @brief Common function reimplementation for SMP machines
7 * This version of converse is for multiple-processor workstations,
8 * and we assume that the OS provides threads to gain access to those
9 * multiple processors. This section contains an interface layer for
10 * the OS specific threads package. It contains routines to start
11 * the threads, routines to access their thread-specific state, and
12 * routines to control mutual exclusion between them.
14 * In addition, we wish to support nonthreaded operation. To do this,
15 * we provide a version of these functions that uses the main/only thread
16 * as a single PE, and simulates a communication thread using interrupts.
21 * Allocates one CmiState structure per PE. Initializes all of
22 * the CmiState structures using the function CmiStateInit.
23 * Starts processor threads 1..N (not 0, that's the one
24 * that calls CmiStartThreads), as well as the communication
25 * thread. Each processor thread (other than 0) must call ConverseInitPE
26 * followed by Cmi_startfn. The communication thread must be an infinite
27 * loop that calls the function CommunicationServer over and over.
31 * When called by a PE-thread, returns the processor-specific state
32 * structure for that PE.
36 * returns processor-specific state structure for the PE of rank n.
38 * CmiMemLock() and CmiMemUnlock()
40 * The memory module calls these functions to obtain mutual exclusion
41 * in the memory routines, and to keep interrupts from reentering malloc.
43 * CmiCommLock() and CmiCommUnlock()
45 * These functions lock a mutex that ensures mutual exclusion in the
46 * communication routines.
48 * CmiMyPe() and CmiMyRank()
50 * The usual. Implemented here, since a highly-optimized version
51 * is possible in the nonthreaded case.
55 FIXME: There is horrible duplication of code (e.g. locking code)
56 both here and in converse.h. It could be much shorter. OSL 9/9/2000
58 *****************************************************************************/
77 CmiIdleLock_addMessage
78 CmiIdleLock_checkMessage
81 #include "machine-smp.h"
82 #include "sockRoutines.h"
84 void CmiStateInit(int pe, int rank, CmiState state);
85 void CommunicationServerInit(void);
87 static struct CmiStateStruct Cmi_default_state; /* State structure to return during startup */
89 #define CMI_NUM_NODE_BARRIER_TYPES 2
90 #define CMI_NODE_BARRIER 0
91 #define CMI_NODE_ALL_BARRIER 1
93 /************************ Win32 kernel SMP threads **************/
95 #if CMK_SHARED_VARS_NT_THREADS
97 CmiNodeLock CmiMemLock_lock;
98 #ifdef CMK_NO_ASM_AVAILABLE
99 CmiNodeLock cmiMemoryLock;
101 static CmiNodeLock comm_mutex;
102 #define CmiCommLockOrElse(x) /*empty*/
103 #define CmiCommLock() (CmiLock(comm_mutex))
104 #define CmiCommUnlock() (CmiUnlock(comm_mutex))
106 static DWORD Cmi_state_key = 0xFFFFFFFF;
107 static CmiState Cmi_state_vector = 0;
110 # define CmiGetState() ((CmiState)TlsGetValue(Cmi_state_key))
/*
 * CmiGetState (function form, Win32 threads).
 * Reads the calling thread's CmiState from the TLS slot Cmi_state_key with
 * TlsGetValue. One path returns the static Cmi_default_state instead; the
 * condition guarding that path is not visible here (presumably a null TLS
 * value while the thread is still starting up -- TODO confirm).
 */
112 CmiState CmiGetState(void)
115 result = (CmiState)TlsGetValue(Cmi_state_key);
117 return &Cmi_default_state;
118 /* PerrorExit("CmiGetState: TlsGetValue");*/
129 #define CmiGetStateN(n) (Cmi_state_vector+(n))
131 void CommunicationServerThread(int sleepTime);
134 static DWORD WINAPI comm_thread(LPVOID dummy)
136 if (Cmi_charmrun_fd!=-1)
137 while (1) CommunicationServerThread(5);
141 static DWORD WINAPI call_startfn(LPVOID vindex)
143 int index = (int)vindex;
145 CmiState state = Cmi_state_vector + index;
146 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
147 if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
/*
 * Win32 thread entry point (variant that recovers the index with an
 * intptr_t round-trip). vindex is the index into Cmi_state_vector that
 * CmiStartThreads passed through CreateThread's LPVOID argument.
 */
154 static DWORD WINAPI call_startfn(LPVOID vindex)
156 int index = (int)(intptr_t)vindex;
158 CmiState state = Cmi_state_vector + index;
/* Cmi_state_key still equal to its 0xFFFFFFFF initializer means no TLS slot
   was allocated; otherwise bind this thread's state to the slot. */
159 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
160 if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
/* Indices below _Cmi_mynodesize are worker PEs; the remaining index is the
   communication thread, which only loops when a charmrun connection exists.
   NOTE(review): the lines surrounding this dispatch are not visible here, so
   whether it is compiled in is unconfirmed. */
164 if (index<_Cmi_mynodesize)
165 ConverseRunPE(0); /*Regular worker thread*/
166 else { /*Communication thread*/
167 CommunicationServerInit();
168 if (Cmi_charmrun_fd!=-1)
169 while (1) CommunicationServerThread(5);
177 * Double-sided barrier algorithm (threads wait to enter, and wait to exit the barrier)
178 * There are 2 different barriers: one for CmiNodeAllBarrier, and another for CmiNodeBarrier,
179 * determined by 'mode' parameter.
181 static volatile LONG entered_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
182 static volatile LONG exited_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
183 static HANDLE entrance_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
184 static HANDLE exit_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
186 // Adapted from https://adilevin.wordpress.com/category/multithreading/
187 // (Based on the reasoning behind the double-sided barrier, I'm not sure the exit_semaphore
188 // can be omitted from this implementation -Juan)
/*
 * Node-level barrier for Win32 threads.
 *   nThreads - number of threads that must arrive before any may proceed.
 *   mode     - index (below CMI_NUM_NODE_BARRIER_TYPES) selecting which
 *              counter/semaphore set is used.
 * The barrier has an entry phase and an exit phase. In each phase a thread
 * atomically increments that phase's counter and, while the count is still
 * below nThreads, blocks on that phase's semaphore.
 */
189 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
192 if (InterlockedIncrement(&entered_barrier_count[mode]) < nThreads)
193 WaitForSingleObject(entrance_semaphore[mode], INFINITE);
/* Presumably the last-arriver branch (the branch keyword is not visible
   here): reset the other phase's counter and release the nThreads-1 waiters.
   NOTE(review): with nThreads == 1 the release count is 0, which
   ReleaseSemaphore rejects; the failure is ignored. */
195 exited_barrier_count[mode] = 0;
196 ReleaseSemaphore(entrance_semaphore[mode], nThreads-1, &prev);
/* Exit phase: same protocol, so the entry counter can be reset safely. */
198 if (InterlockedIncrement(&exited_barrier_count[mode]) < nThreads)
199 WaitForSingleObject(exit_semaphore[mode], INFINITE);
201 entered_barrier_count[mode] = 0;
202 ReleaseSemaphore(exit_semaphore[mode], nThreads-1, &prev);
/*
 * Win32 version of CmiStartThreads: creates the node-wide locks and barrier
 * semaphores, allocates the TLS key and the per-thread CmiState array,
 * spawns the other threads of this node, and binds state 0 to the calling
 * thread. argv is not used in the visible lines.
 */
206 static void CmiStartThreads(char **argv)
212 CmiMemLock_lock=CmiCreateLock();
213 comm_mutex = CmiCreateLock();
/* One entrance/exit semaphore pair per barrier type. The maximum count is
   nodesize+1, which leaves room for the extra (communication) thread.
   NOTE(review): the CreateSemaphore results are not checked. */
214 for (i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
215 entrance_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
216 exit_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
218 #ifdef CMK_NO_ASM_AVAILABLE
219 cmiMemoryLock = CmiCreateLock();
220 if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
/* TlsAlloc reports failure with 0xFFFFFFFF, the same sentinel used as the
   key's initial value. */
223 Cmi_state_key = TlsAlloc();
224 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc main");
/* Zeroed array of nodesize+1 state structures: one per worker PE plus one
   for the communication thread.
   NOTE(review): no check of the calloc result is visible. */
227 (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
229 for (i=0; i<_Cmi_mynodesize; i++)
230 CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
231 /*Create a fake state structure for the comm. thread*/
232 /* CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
233 CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
/* Without a communication thread only nodesize-1 extra threads are needed;
   otherwise nodesize. Index 0 is the calling thread itself. */
235 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
237 tocreate = _Cmi_mynodesize-1;
240 tocreate = _Cmi_mynodesize;
/* The thread index travels through the LPVOID argument via intptr_t.
   NOTE(review): no CloseHandle on thr is visible -- confirm the handle is
   not leaked. */
241 for (i=1; i<=tocreate; i++) {
242 if((thr = CreateThread(NULL, 0, call_startfn, (LPVOID)(intptr_t)i, 0, &threadID))
243 == NULL) PerrorExit("CreateThread");
/* Bind the first state structure (rank 0) to the calling thread. */
247 if(TlsSetValue(Cmi_state_key, (LPVOID)Cmi_state_vector) == 0)
248 PerrorExit("TlsSetValue");
/*
 * Win32 teardown counterpart of CmiStartThreads: destroys the node locks and
 * closes every barrier semaphore handle created there.
 */
251 static void CmiDestroyLocks(void)
253 CmiDestroyLock(comm_mutex);
255 CmiDestroyLock(CmiMemLock_lock);
/* One entrance/exit semaphore pair exists per barrier type. */
257 for (int i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
258 CloseHandle(entrance_semaphore[i]);
259 CloseHandle(exit_semaphore[i]);
/* cmiMemoryLock only exists when native atomics are unavailable. */
261 #ifdef CMK_NO_ASM_AVAILABLE
262 CmiDestroyLock(cmiMemoryLock);
266 /***************** Pthreads kernel SMP threads ******************/
267 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
269 CmiNodeLock CmiMemLock_lock;
270 #ifdef CMK_NO_ASM_AVAILABLE
271 CmiNodeLock cmiMemoryLock;
273 int _Cmi_sleepOnIdle=0;
274 int _Cmi_forceSpinOnIdle=0;
276 extern void CharmScheduler(void);
278 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
279 static CMK_THREADLOCAL struct CmiStateStruct Cmi_mystate;
280 static CmiState *Cmi_state_vector;
282 CmiState CmiGetState(void) {
285 #define CmiGetStateN(n) Cmi_state_vector[n]
289 static pthread_key_t Cmi_state_key=(pthread_key_t)(-1);
290 static CmiState Cmi_state_vector;
293 #define CmiGetState() ((CmiState)pthread_getspecific(Cmi_state_key))
295 CmiState CmiGetState(void) {
297 if (Cmi_state_key == (pthread_key_t)(-1)) return &Cmi_default_state;
298 ret=(CmiState)pthread_getspecific(Cmi_state_key);
299 return (ret==NULL)? &Cmi_default_state : ret;
303 #define CmiGetStateN(n) (Cmi_state_vector+(n))
308 #if CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK
309 CmiNodeLock CmiCreateLock(void)
311 CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_spinlock_t));
313 pthread_spin_init(lk, 0);
317 void CmiDestroyLock(CmiNodeLock lk)
319 pthread_spin_destroy(lk);
323 CmiNodeLock CmiCreateLock(void)
325 CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));
327 pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
331 void CmiDestroyLock(CmiNodeLock lk)
333 pthread_mutex_destroy(lk);
337 #endif //CMK_USE_LRTS
/* Give up the processor so another runnable thread can be scheduled. */
void CmiYield(void)
{
  sched_yield();
}
342 pthread_cond_t barrier_cond = PTHREAD_COND_INITIALIZER;
343 pthread_mutex_t barrier_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Node-level barrier for pthreads, built on barrier_mutex and barrier_cond.
 *   nThreads - number of threads that must arrive before any may proceed.
 *   mode     - barrier type; not referenced in the visible lines, so both
 *              barrier types appear to share one counter, mutex and
 *              condition variable (NOTE(review): confirm).
 * 'level' is a generation counter: the thread that completes the barrier
 * advances it and broadcasts to the waiters.
 */
345 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
347 static unsigned int volatile level = 0;
349 pthread_mutex_lock(&barrier_mutex);
351 /* CmiPrintf("[%d] CmiNodeBarrierCount: %d of %d level:%d\n", CmiMyPe(), barrier, nThreads, level); */
353 if(barrier != nThreads) {
354 /* pthread_cond_wait can wake up spuriously, before the count has been reached */
356 pthread_cond_wait(&barrier_cond, &barrier_mutex);
/* Presumably the last-arriver branch (the branch keyword is not visible
   here): start a new generation and wake every waiter. */
360 level++; /* !level; */
361 pthread_cond_broadcast(&barrier_cond);
363 pthread_mutex_unlock(&barrier_mutex);
366 static CmiNodeLock comm_mutex;
368 #define CmiCommLockOrElse(x) /*empty*/
371 /*Regular comm. lock*/
372 # define CmiCommLock() CmiLock(comm_mutex)
373 # define CmiCommUnlock() CmiUnlock(comm_mutex)
375 /*Verbose debugging comm. lock*/
376 static int comm_mutex_isLocked=0;
377 void CmiCommLock(void) {
378 if (comm_mutex_isLocked)
379 CmiAbort("CommLock: already locked!\n");
381 comm_mutex_isLocked=1;
383 void CmiCommUnlock(void) {
384 if (!comm_mutex_isLocked)
385 CmiAbort("CommUnlock: double unlock!\n");
386 comm_mutex_isLocked=0;
387 CmiUnlock(comm_mutex);
392 static void comm_thread(void)
394 while (1) CommunicationServer(5);
397 static void *call_startfn(void *vindex)
399 int index = (int)vindex;
400 CmiState state = Cmi_state_vector + index;
401 pthread_setspecific(Cmi_state_key, state);
407 void StartInteropScheduler(void);
408 void CommunicationServerThread(int sleepTime);
/*
 * pthread entry point for every thread spawned by CmiStartThreads.
 * vindex is the thread's index, passed by value through the void* argument:
 * values below _Cmi_mynodesize are worker PEs, the remaining value is the
 * communication thread.
 */
410 static void *call_startfn(void *vindex)
412 size_t index = (size_t)vindex;
/* TLS build: each thread initializes its own thread-local Cmi_mystate and
   publishes its address in Cmi_state_vector. The second CmiStateInit call is
   presumably the comm-thread alternative (its branch keyword is not visible
   here). */
413 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
414 if (index<_Cmi_mynodesize)
415 CmiStateInit(index+Cmi_nodestart, index, &Cmi_mystate);
417 CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,&Cmi_mystate);
418 Cmi_state_vector[index] = &Cmi_mystate;
/* Key build: the state was pre-initialized by CmiStartThreads; bind it to
   this thread. */
420 CmiState state = Cmi_state_vector + index;
421 pthread_setspecific(Cmi_state_key, state);
/* Interoperation mode: the thread whose rank equals the node size (the comm
   thread) polls the communication server until ckExitComplete becomes
   nonzero. */
426 if(CharmLibInterOperate) {
429 StartInteropScheduler();
432 if (CmiMyRank() == CmiMyNodeSize()) {
433 while (ckExitComplete.load() == 0) { CommunicationServerThread(5); }
/* NOTE(review): the conditionals around the dispatch below are not visible
   here; confirm under which configuration it is compiled. */
444 if (index<_Cmi_mynodesize)
445 ConverseRunPE(0); /*Regular worker thread*/
447 { /*Communication thread*/
448 CommunicationServerInit();
449 if (Cmi_charmrun_fd!=-1)
450 while (1) CommunicationServer(5,COM_SERVER_FROM_SMP);
456 #if CMK_BLUEGENEQ && !CMK_USE_LRTS
457 /* pami/machine.C defines its own version of this: */
458 void PerrorExit(const char*);
461 #if CMK_CONVERSE_PAMI
462 // Array used by the 'rank 0' thread to wait for other threads using pthread_join
463 pthread_t *_Cmi_mypidlist;
/*
 * pthread version of CmiStartThreads: creates the node-wide locks, sets up
 * per-thread CmiState storage (a pthread key plus a state array, or a
 * thread-local state plus a pointer table), spawns the other threads of
 * this node, and binds a state to the calling thread. argv is not used in
 * the visible lines.
 */
466 static void CmiStartThreads(char **argv)
474 MACHSTATE(4,"CmiStartThreads")
/* NOTE(review): comm_mutex is destroyed in CmiDestroyLocks, but no creation
   of it is visible in this function -- confirm it is created elsewhere. */
475 CmiMemLock_lock=CmiCreateLock();
476 _smp_mutex = CmiCreateLock();
477 #if defined(CMK_NO_ASM_AVAILABLE) && CMK_PCQUEUE_LOCK
478 cmiMemoryLock = CmiCreateLock();
479 if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
/* Key build: zeroed array of nodesize+1 state structures, one per worker PE
   plus one for the communication thread.
   NOTE(review): the pthread_key_create result is ignored, and no check of
   the calloc result is visible. */
482 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
483 pthread_key_create(&Cmi_state_key, 0);
485 (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
486 for (i=0; i<_Cmi_mynodesize; i++)
487 CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
488 /*Create a fake state structure for the comm. thread*/
489 /* CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
490 CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
/* TLS build: the table holds pointers to each thread's own Cmi_mystate; only
   the calling thread's entry is filled in here. */
492 /* for main thread */
493 Cmi_state_vector = (CmiState *)calloc(_Cmi_mynodesize+1, sizeof(CmiState));
495 /* main thread is communication thread */
496 if(!CharmLibInterOperate) {
497 CmiStateInit(_Cmi_mynode+CmiNumPes(), _Cmi_mynodesize, &Cmi_mystate);
498 Cmi_state_vector[_Cmi_mynodesize] = &Cmi_mystate;
502 /* main thread is of rank 0 */
503 CmiStateInit(Cmi_nodestart, 0, &Cmi_mystate);
504 Cmi_state_vector[0] = &Cmi_mystate;
/* Without a communication thread one fewer thread is needed. */
508 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
510 tocreate = _Cmi_mynodesize-1;
513 tocreate = _Cmi_mynodesize;
/* The index range [start, end] of threads to spawn leaves out whichever role
   the calling thread plays itself. */
515 if(!CharmLibInterOperate) {
517 end = tocreate - 1; /* skip comm thread */
522 end = tocreate; /* skip rank 0 main thread */
/* NOTE(review): the malloc result is not checked in the visible lines. */
525 #if CMK_CONVERSE_PAMI
526 // allocate space for the pids
527 _Cmi_mypidlist = (pthread_t *)malloc(sizeof(pthread_t)*(end - start +1));
/* Each thread receives its index by value through the void* argument. */
531 for (i=start; i<=end; i++) {
532 pthread_attr_init(&attr);
533 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
534 ok = pthread_create(&pid, &attr, call_startfn, (void *)i);
/* NOTE(review): pthread_create reports failure through its return value
   (held in ok) and does not set errno, so strerror(errno) may describe an
   unrelated error; strerror(ok) would be accurate. */
536 CmiPrintf("CmiStartThreads: %s(%d)\n", strerror(errno), errno);
537 PerrorExit("pthread_create");
539 #if CMK_CONVERSE_PAMI
540 _Cmi_mypidlist[numThreads++] = pid; // store the pid in the array
542 pthread_attr_destroy(&attr);
/* Key build: bind the calling thread to the comm-thread slot (normal mode)
   or to slot 0 (presumably the interoperation alternative -- its branch
   keyword is not visible here). */
544 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
546 if(!CharmLibInterOperate)
547 pthread_setspecific(Cmi_state_key, Cmi_state_vector+_Cmi_mynodesize);
550 pthread_setspecific(Cmi_state_key, Cmi_state_vector);
553 MACHSTATE(4,"CmiStartThreads done")
/*
 * pthread teardown: destroys the communication lock, the memory lock, the
 * barrier mutex and (when native atomics are unavailable) cmiMemoryLock.
 * NOTE(review): barrier_cond is not destroyed in the visible lines.
 */
556 static void CmiDestroyLocks(void)
558 CmiDestroyLock(comm_mutex);
560 CmiDestroyLock(CmiMemLock_lock);
562 pthread_mutex_destroy(&barrier_mutex);
/* NOTE(review): CmiStartThreads creates cmiMemoryLock only when
   CMK_NO_ASM_AVAILABLE is defined AND CMK_PCQUEUE_LOCK is set, but this
   guard tests only the former -- the lock may be destroyed without having
   been created. */
563 #ifdef CMK_NO_ASM_AVAILABLE
564 CmiDestroyLock(cmiMemoryLock);
570 #if !CMK_SHARED_VARS_UNAVAILABLE
572 /* Wait for all worker threads */
573 void CmiNodeBarrier(void) {
574 CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
577 /* Wait for all worker threads as well as comm. thread */
578 /* unfortunately this could also be called in a seemingly non smp version
579 net-win32, which actually is implemented as smp with comm. thread */
/*
 * Barrier across all threads of this node. With a communication thread the
 * count is CmiMyNodeSize()+1 on the CMI_NODE_ALL_BARRIER set; the
 * CMK_MULTICORE / CMK_SMP_NO_COMMTHD path counts only the workers on the
 * CMI_NODE_BARRIER set.
 * NOTE(review): the condition that selects between the two calls is not
 * fully visible here.
 */
580 void CmiNodeAllBarrier(void) {
581 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
583 CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
586 CmiNodeBarrierCount(CmiMyNodeSize()+1, CMI_NODE_ALL_BARRIER);
591 /***********************************************************
593 * In an SMP system, idle processors need to sleep on a
594 * lock so that if a message for them arrives, they can be
596 **********************************************************/
598 static int CmiIdleLock_hasMessage(CmiState cs) {
599 return cs->idle.hasMessages;
602 #if CMK_SHARED_VARS_NT_THREADS
/*
 * Win32: initialize an idle lock. The semaphore starts at count 0 with a
 * maximum count of 1, so at most one wake-up can be pending.
 * NOTE(review): the CreateSemaphore result is not checked in the visible
 * lines.
 */
604 static void CmiIdleLock_init(CmiIdleLock *l) {
607 l->sem=CreateSemaphore(NULL,0,1, NULL);
/*
 * Win32: block the calling PE on its idle semaphore for at most msTimeout
 * milliseconds. Returns immediately when a message is already flagged.
 */
610 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
611 if (l->hasMessages) return;
613 MACHSTATE(4,"Processor going to sleep {")
614 WaitForSingleObject(l->sem,msTimeout);
615 MACHSTATE(4,"} Processor awake again")
/*
 * Win32: called when a message is added for the PE that owns l. If that PE
 * is flagged as sleeping, the semaphore is released once to wake it.
 * NOTE(review): the ReleaseSemaphore result is ignored; the call fails when
 * the count is already at its maximum of 1.
 */
619 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
621 if (l->isSleeping) { /*The PE is sleeping on this lock-- wake him*/
622 MACHSTATE(4,"Waking sleeping processor")
623 ReleaseSemaphore(l->sem,1,NULL);
626 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
630 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
/*
 * POSIX: initialize the mutex and condition variable of an idle lock with
 * default attributes.
 * NOTE(review): the return values of both init calls are ignored.
 */
632 static void CmiIdleLock_init(CmiIdleLock *l) {
635 pthread_mutex_init(&l->mutex,NULL);
636 pthread_cond_init(&l->cond,NULL);
639 #include <sys/time.h>
/*
 * Convert a relative timeout into the absolute wall-clock deadline that
 * pthread_cond_timedwait() expects.
 *
 *   msFromNow - timeout in milliseconds relative to the current time; a
 *               negative value yields a deadline in the past.
 *   dest      - receives the deadline; tv_nsec is always normalized into
 *               the range [0, 1000000000).
 */
static void getTimespec(int msFromNow,struct timespec *dest) {
  const long nsecPerSec = 1000000000L;
  struct timeval cur;
  int secFromNow;

  /*Get the current time*/
  gettimeofday(&cur,NULL);
  dest->tv_sec=cur.tv_sec;
  dest->tv_nsec=(long)cur.tv_usec*1000L;
  /*Add in the wait time*/
  secFromNow=msFromNow/1000;
  msFromNow-=secFromNow*1000;
  dest->tv_sec+=secFromNow;
  dest->tv_nsec+=1000L*1000L*msFromNow;
  /* Normalize with signed arithmetic in both directions. Comparing against
     an unsigned constant would convert a negative tv_nsec (possible with a
     negative timeout) into a huge value and spin for billions of
     iterations. */
  while (dest->tv_nsec>=nsecPerSec) {
    dest->tv_nsec-=nsecPerSec;
    dest->tv_sec++;
  }
  while (dest->tv_nsec<0) {
    dest->tv_nsec+=nsecPerSec;
    dest->tv_sec--;
  }
}
/*
 * POSIX: block the calling PE on its idle lock until a message is flagged or
 * msTimeout milliseconds have elapsed. Returns immediately when a message is
 * already flagged.
 */
660 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
661 struct timespec wakeup;
663 if (l->hasMessages) return;
665 MACHSTATE(4,"Processor going to sleep {")
666 pthread_mutex_lock(&l->mutex);
/* The deadline is absolute and computed once, so repeated waits after
   wake-ups do not extend the total timeout. */
667 getTimespec(msTimeout,&wakeup);
/* Re-test the predicate after every wake-up; the action taken on ETIMEDOUT
   is not visible here (presumably it leaves the loop). */
668 while (!l->hasMessages)
669 if (ETIMEDOUT==pthread_cond_timedwait(&l->cond,&l->mutex,&wakeup))
671 pthread_mutex_unlock(&l->mutex);
672 MACHSTATE(4,"} Processor awake again")
/*
 * POSIX: wake the PE sleeping on l. The signal is sent while holding
 * l->mutex, the same mutex the sleeper holds around its predicate test.
 */
676 static void CmiIdleLock_wakeup(CmiIdleLock *l) {
678 MACHSTATE(4,"Waking sleeping processor")
679 /*The PE is sleeping on this condition variable-- wake him*/
680 pthread_mutex_lock(&l->mutex);
681 pthread_cond_signal(&l->cond);
682 pthread_mutex_unlock(&l->mutex);
/*
 * POSIX: called when a message is added for the PE that owns l; wakes that
 * PE if it is flagged as sleeping.
 */
685 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
686 if (l->isSleeping) CmiIdleLock_wakeup(l);
689 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
693 #define CmiIdleLock_sleep(x, y) /*empty*/
695 static void CmiIdleLock_init(CmiIdleLock *l) {
698 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
701 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
/*
 * Initialize one per-thread state structure.
 *   pe    - global PE number of the thread.
 *   rank  - rank within the node; rank == CmiMyNodeSize() denotes the
 *           communication thread.
 *   state - structure to initialize.
 */
706 void CmiStateInit(int pe, int rank, CmiState state)
712 MACHSTATE(4,"StateInit")
/* The communication thread returns here, before any receive queue, local
   queue or idle lock is created for it. */
715 if (rank==CmiMyNodeSize()) return; /* Communications thread */
/* Single receive queue, or MULTIQ_GRPSIZE queues with a group index derived
   from rank; presumably alternatives chosen by a preprocessor conditional
   that is not visible here. */
717 state->recv = CMIQueueCreate();
719 for(i=0; i<MULTIQ_GRPSIZE; i++) state->recv[i]=CMIQueueCreate();
720 state->myGrpIdx = rank % MULTIQ_GRPSIZE;
721 state->curPolledIdx = 0;
723 state->localqueue = CdsFifo_Create();
724 CmiIdleLock_init(&state->idle);
727 void CmiNodeStateInit(CmiNodeState *nodeState)
729 MACHSTATE1(4,"NodeStateInit %p", nodeState)
730 #if CMK_IMMEDIATE_MSG
731 nodeState->immSendLock = CmiCreateLock();
732 nodeState->immRecvLock = CmiCreateLock();
733 nodeState->immQ = CMIQueueCreate();
734 nodeState->delayedImmQ = CMIQueueCreate();
736 #if CMK_NODE_QUEUE_AVAILABLE
737 nodeState->CmiNodeRecvLock = CmiCreateLock();
738 #if CMK_LOCKLESS_QUEUE
739 nodeState->NodeRecv = MPMCQueueCreate();
741 nodeState->NodeRecv = CMIQueueCreate();
744 MACHSTATE(4,"NodeStateInit done")