2 * @brief Common function reimplementation for SMP machines
7 * This version of Converse is for multiple-processor workstations,
8 * and we assume that the OS provides threads to gain access to those
9 * multiple processors. This section contains an interface layer for
10 * the OS specific threads package. It contains routines to start
11 * the threads, routines to access their thread-specific state, and
12 * routines to control mutual exclusion between them.
14 * In addition, we wish to support nonthreaded operation. To do this,
15 * we provide a version of these functions that uses the main/only thread
16 * as a single PE, and simulates a communication thread using interrupts.
21 * Allocates one CmiState structure per PE. Initializes all of
22 * the CmiState structures using the function CmiStateInit.
23 * Starts processor threads 1..N (not 0, that's the one
24 * that calls CmiStartThreads), as well as the communication
25 * thread. Each processor thread (other than 0) must call ConverseInitPE
26 * followed by Cmi_startfn. The communication thread must be an infinite
27 * loop that calls the function CommunicationServer over and over.
31 * When called by a PE-thread, returns the processor-specific state
32 * structure for that PE.
36 * returns processor-specific state structure for the PE of rank n.
38 * CmiMemLock() and CmiMemUnlock()
40 * The memory module calls these functions to obtain mutual exclusion
41 * in the memory routines, and to keep interrupts from reentering malloc.
43 * CmiCommLock() and CmiCommUnlock()
45 * These functions lock a mutex that ensures mutual exclusion in the
46 * communication routines.
48 * CmiMyPe() and CmiMyRank()
50 * Return the calling thread's PE number and its rank within the node.
51 * Implemented here, since a highly-optimized version is possible in the nonthreaded case.
55 FIXME: There is horrible duplication of code (e.g. locking code)
56 both here and in converse.h. It could be much shorter. OSL 9/9/2000
58 *****************************************************************************/
77 CmiIdleLock_addMessage
78 CmiIdleLock_checkMessage
81 #include "machine-smp.h"
82 #include "sockRoutines.h"
84 void CmiStateInit(int pe, int rank, CmiState state);
85 void CommunicationServerInit(void);
/* Fallback state returned by CmiGetState when no per-thread state is available. */
87 static struct CmiStateStruct Cmi_default_state; /* State structure to return during startup */
/* Node-barrier kinds, passed as the `mode` argument of CmiNodeBarrierCount.
   CmiNodeBarrier uses CMI_NODE_BARRIER (worker threads only).
   CmiNodeAllBarrier uses CMI_NODE_ALL_BARRIER when the comm. thread is counted too.
   The Win32 barrier keeps one counter/semaphore pair per kind. */
89 #define CMI_NUM_NODE_BARRIER_TYPES 2
90 #define CMI_NODE_BARRIER 0
91 #define CMI_NODE_ALL_BARRIER 1
93 /************************ Win32 kernel SMP threads **************/
95 #if CMK_SHARED_VARS_NT_THREADS
/* Created in CmiStartThreads and closed in CmiDestroyLocks. */
97 CmiNodeLock CmiMemLock_lock;
98 #ifdef CMK_NO_ASM_AVAILABLE
/* Only used when native fences/atomics are unavailable (CmiStartThreads prints a warning). */
99 CmiNodeLock cmiMemoryLock;
/* Communication lock. CmiCommLock waits with no timeout; CmiCommLockOrElse is a no-op here.
   NOTE(review): comm_mutex is a HANDLE but is assigned from CmiCreateLock() in
   CmiStartThreads; presumably CmiNodeLock is a HANDLE on Win32 -- confirm. */
101 static HANDLE comm_mutex;
102 #define CmiCommLockOrElse(x) /*empty*/
103 #define CmiCommLock() (WaitForSingleObject(comm_mutex, INFINITE))
104 #define CmiCommUnlock() (ReleaseMutex(comm_mutex))
/* TLS slot holding each thread's CmiState.
   0xFFFFFFFF (TLS_OUT_OF_INDEXES) means "not allocated". */
106 static DWORD Cmi_state_key = 0xFFFFFFFF;
/* Contiguous array of per-rank states, presumably assigned from the calloc in CmiStartThreads. */
107 static CmiState Cmi_state_vector = 0;
/* Macro form of CmiGetState without a fallback.
   The guard choosing between it and the function below is not shown. */
110 # define CmiGetState() ((CmiState)TlsGetValue(Cmi_state_key))
/* Return the calling thread's CmiState from thread-local storage.
   Falls back to Cmi_default_state, presumably when TlsGetValue yields no state
   (e.g. during startup); the guarding condition is on a line not shown. */
112 CmiState CmiGetState(void)
115 result = (CmiState)TlsGetValue(Cmi_state_key);
117 return &Cmi_default_state;
118 /* PerrorExit("CmiGetState: TlsGetValue");*/
/* State of rank n. Cmi_state_vector is a contiguous array here, so this is pointer arithmetic. */
129 #define CmiGetStateN(n) (Cmi_state_vector+(n))
132 void CommunicationServerThread(int sleepTime);
/* Win32 entry point for a dedicated communication thread.
   When connected to charmrun (Cmi_charmrun_fd != -1), it calls
   CommunicationServerThread(5) forever.
   NOTE(review): call_startfn also contains a comm.-thread loop; whether this
   function is compiled in is not visible here. */
135 static DWORD WINAPI comm_thread(LPVOID dummy)
137 if (Cmi_charmrun_fd!=-1)
138 while (1) CommunicationServerThread(5);
/* Win32 thread entry: store this thread's CmiState (Cmi_state_vector + index)
   in TLS. Aborts via PerrorExit if the TLS slot was never allocated or the
   store fails.
   The index arrives packed in the LPVOID argument. It is recovered through
   intptr_t because a direct (int) cast of a 64-bit pointer is a narrowing
   pointer-to-int conversion that C++ compilers warn about or reject. */
142 static DWORD WINAPI call_startfn(LPVOID vindex)
144 int index = (int)(intptr_t)vindex;
146 CmiState state = Cmi_state_vector + index;
147 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
148 if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
/* Win32 thread entry used by CmiStartThreads.
   Binds this thread's CmiState in TLS, then runs either a worker PE
   (index < _Cmi_mynodesize) or, for the extra index, the communication thread. */
155 static DWORD WINAPI call_startfn(LPVOID vindex)
/* The index was packed into the LPVOID by CmiStartThreads via (LPVOID)(intptr_t)i. */
157 int index = (int)(intptr_t)vindex;
159 CmiState state = Cmi_state_vector + index;
160 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
161 if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
165 if (index<_Cmi_mynodesize)
166 ConverseRunPE(0); /*Regular worker thread*/
167 else { /*Communication thread*/
168 CommunicationServerInit();
/* Without a charmrun connection the comm. thread skips the service loop;
   what it does afterwards is not shown. */
169 if (Cmi_charmrun_fd!=-1)
170 while (1) CommunicationServerThread(5);
178 * Double-sided barrier algorithm (threads wait to enter, and wait to exit the barrier)
179 * There are 2 different barriers: one for CmiNodeAllBarrier, and another for CmiNodeBarrier,
180 * determined by 'mode' parameter.
/* Per-kind barrier counters, indexed by CMI_NODE_BARRIER / CMI_NODE_ALL_BARRIER.
   Updated with InterlockedIncrement and plain stores. */
182 static volatile LONG entered_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
183 static volatile LONG exited_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
/* Created in CmiStartThreads with initial count 0 and maximum count _Cmi_mynodesize+1. */
184 static HANDLE entrance_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
185 static HANDLE exit_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
187 // Adapted from https://adilevin.wordpress.com/category/multithreading/
188 // (Based on the reasoning behind the double-sided barrier, I'm not sure the exit_semaphore
189 // can be omitted from this implementation -Juan)
/* Block until nThreads threads have called this with the same mode.
   Entry phase: each arrival increments entered_barrier_count[mode], and all but
   the last wait on the entrance semaphore. The last arrival (presumably the
   hidden else-branch) resets the exit counter and releases the other nThreads-1.
   Exit phase: mirrors this with the exit counter and semaphore, and the last
   thread out resets the entry counter for the next use.
   ReleaseSemaphore fails if nThreads-1 would exceed the maximum count
   (_Cmi_mynodesize+1). Its return value is not checked in the visible lines. */
190 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
193 if (InterlockedIncrement(&entered_barrier_count[mode]) < nThreads)
194 WaitForSingleObject(entrance_semaphore[mode], INFINITE);
/* Last thread in: reset the exit counter before anyone is released into the exit phase. */
196 exited_barrier_count[mode] = 0;
197 ReleaseSemaphore(entrance_semaphore[mode], nThreads-1, &prev);
199 if (InterlockedIncrement(&exited_barrier_count[mode]) < nThreads)
200 WaitForSingleObject(exit_semaphore[mode], INFINITE);
/* Last thread out: reset the entry counter before releasing the others. */
202 entered_barrier_count[mode] = 0;
203 ReleaseSemaphore(exit_semaphore[mode], nThreads-1, &prev);
/* Win32 startup. In order, it:
   - creates the node locks and the per-kind barrier semaphores;
   - allocates the TLS slot;
   - initialises one CmiState per worker rank plus one for the comm. thread;
   - spawns threads 1..tocreate with CreateThread(call_startfn);
   - binds the calling thread (rank 0) to Cmi_state_vector[0]. */
207 static void CmiStartThreads(char **argv)
213 CmiMemLock_lock=CmiCreateLock();
214 comm_mutex = CmiCreateLock();
/* NOTE(review): CreateSemaphore returns NULL on failure; no check is visible. */
215 for (i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
216 entrance_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
217 exit_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
219 #ifdef CMK_NO_ASM_AVAILABLE
220 cmiMemoryLock = CmiCreateLock();
221 if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
224 Cmi_state_key = TlsAlloc();
225 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc main");
/* One state per worker rank plus a trailing slot for the comm. thread.
   NOTE(review): no NULL check of the calloc result is visible. */
228 (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
230 for (i=0; i<_Cmi_mynodesize; i++)
231 CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
232 /*Create a fake state structure for the comm. thread*/
233 /* CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
234 CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
/* In CMK_MULTICORE / CMK_SMP_NO_COMMTHD builds tocreate may be _Cmi_mynodesize-1
   (no comm. thread; any extra condition is on lines not shown).
   Otherwise it is _Cmi_mynodesize, and call_startfn turns that last index into
   the comm. thread. */
236 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
238 tocreate = _Cmi_mynodesize-1;
241 tocreate = _Cmi_mynodesize;
242 for (i=1; i<=tocreate; i++) {
243 if((thr = CreateThread(NULL, 0, call_startfn, (LPVOID)(intptr_t)i, 0, &threadID))
244 == NULL) PerrorExit("CreateThread");
/* The calling thread acts as rank 0. */
248 if(TlsSetValue(Cmi_state_key, (LPVOID)Cmi_state_vector) == 0)
249 PerrorExit("TlsSetValue");
/* Win32 teardown: close the lock and semaphore handles created in CmiStartThreads. */
252 static void CmiDestroyLocks(void)
254 CloseHandle(comm_mutex);
256 CloseHandle(CmiMemLock_lock);
258 for (int i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
259 CloseHandle(entrance_semaphore[i]);
260 CloseHandle(exit_semaphore[i]);
262 #ifdef CMK_NO_ASM_AVAILABLE
263 CloseHandle(cmiMemoryLock);
267 /***************** Pthreads kernel SMP threads ******************/
268 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
/* Created in CmiStartThreads and destroyed in CmiDestroyLocks. */
270 CmiNodeLock CmiMemLock_lock;
271 #ifdef CMK_NO_ASM_AVAILABLE
/* NOTE(review): CmiStartThreads only creates this when CMK_PCQUEUE_LOCK is also set. */
272 CmiNodeLock cmiMemoryLock;
/* Idle-behaviour switches (presumably sleep vs. spin when idle); set and read elsewhere. */
274 int _Cmi_sleepOnIdle=0;
275 int _Cmi_forceSpinOnIdle=0;
277 extern void CharmScheduler(void);
/* With compiler TLS, each thread owns its CmiStateStruct directly.
   Cmi_state_vector holds pointers to those structs; the entries are filled in
   call_startfn and CmiStartThreads. */
279 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
280 static CMK_THREADLOCAL struct CmiStateStruct Cmi_mystate;
281 static CmiState *Cmi_state_vector;
283 CmiState CmiGetState(void) {
/* The vector stores pointers here, so indexing already yields a CmiState. */
286 #define CmiGetStateN(n) Cmi_state_vector[n]
/* Without compiler TLS, states live in one contiguous array, and each thread
   finds its own entry through a pthread key.
   NOTE(review): using (pthread_key_t)(-1) as an "unset" sentinel assumes
   pthread_key_t is an integer type; POSIX leaves the type opaque. */
290 static pthread_key_t Cmi_state_key=(pthread_key_t)(-1);
291 static CmiState Cmi_state_vector;
294 #define CmiGetState() ((CmiState)pthread_getspecific(Cmi_state_key))
/* Return the calling thread's state. Returns Cmi_default_state if the key has
   not been created yet, or if no value has been set for this thread. */
296 CmiState CmiGetState(void) {
298 if (Cmi_state_key == (pthread_key_t)(-1)) return &Cmi_default_state;
299 ret=(CmiState)pthread_getspecific(Cmi_state_key);
300 return (ret==NULL)? &Cmi_default_state : ret;
304 #define CmiGetStateN(n) (Cmi_state_vector+(n))
/* Node locks are heap-allocated pthread spinlocks when spinlocks are available
   and enabled; otherwise they are heap-allocated pthread mutexes.
   NOTE(review): no NULL check of the malloc results is visible. */
309 #if CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK
310 CmiNodeLock CmiCreateLock(void)
312 CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_spinlock_t));
/* pshared argument 0: presumably PTHREAD_PROCESS_PRIVATE (threads of this process only) -- confirm. */
314 pthread_spin_init(lk, 0);
318 void CmiDestroyLock(CmiNodeLock lk)
320 pthread_spin_destroy(lk);
324 CmiNodeLock CmiCreateLock(void)
326 CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));
328 pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
332 void CmiDestroyLock(CmiNodeLock lk)
334 pthread_mutex_destroy(lk);
338 #endif //CMK_USE_LRTS
/* Voluntarily give up the processor so another runnable thread can be scheduled. */
340 void CmiYield(void) { (void)sched_yield(); /* yield status intentionally ignored */ }
/* Barrier synchronisation objects for the pthreads build. */
343 pthread_cond_t barrier_cond = PTHREAD_COND_INITIALIZER;
344 pthread_mutex_t barrier_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Block until nThreads threads have arrived. The thread that completes the
   count increments `level` (a generation counter) and broadcasts. Waiters block
   in pthread_cond_wait, which can wake spuriously; presumably they re-test the
   generation in a loop whose header is not shown.
   NOTE(review): `mode` is not referenced in the visible lines, so both barrier
   kinds appear to share one counter and condition -- confirm against the hidden lines. */
346 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
/* Generation number. It is accessed under barrier_mutex; `volatile` adds no synchronisation. */
348 static unsigned int volatile level = 0;
350 pthread_mutex_lock(&barrier_mutex);
352 /* CmiPrintf("[%d] CmiNodeBarrierCount: %d of %d level:%d\n", CmiMyPe(), barrier, nThreads, level); */
354 if(barrier != nThreads) {
355 /* occasionally it wakes up without having reach the count */
357 pthread_cond_wait(&barrier_cond, &barrier_mutex);
361 level++; /* !level; */
362 pthread_cond_broadcast(&barrier_cond);
364 pthread_mutex_unlock(&barrier_mutex);
/* Communication lock for the pthreads build.
   NOTE(review): this build's CmiStartThreads visibly creates CmiMemLock_lock and
   _smp_mutex but not comm_mutex, yet CmiDestroyLocks destroys it.
   Confirm where it is created. */
367 static CmiNodeLock comm_mutex;
369 #define CmiCommLockOrElse(x) /*empty*/
372 /*Regular comm. lock*/
373 # define CmiCommLock() CmiLock(comm_mutex)
374 # define CmiCommUnlock() CmiUnlock(comm_mutex)
376 /*Verbose debugging comm. lock*/
/* Debug variant: aborts on re-lock or double unlock, tracked with a plain (non-atomic) flag.
   NOTE(review): the flag is tested before the lock is (presumably) acquired on a
   line not shown. A different thread legitimately holding the lock would
   therefore also trigger the "already locked" abort; this looks intended for
   single-threaded debugging only. */
377 static int comm_mutex_isLocked=0;
378 void CmiCommLock(void) {
379 if (comm_mutex_isLocked)
380 CmiAbort("CommLock: already locked!\n");
382 comm_mutex_isLocked=1;
384 void CmiCommUnlock(void) {
385 if (!comm_mutex_isLocked)
386 CmiAbort("CommUnlock: double unlock!\n");
387 comm_mutex_isLocked=0;
388 CmiUnlock(comm_mutex);
/* Dedicated comm.-thread body: services communication forever.
   NOTE(review): this calls CommunicationServer with one argument, whereas the
   comm.-thread loop in the other call_startfn passes (5, COM_SERVER_FROM_SMP).
   This variant is presumably compiled out -- confirm. */
393 static void comm_thread(void)
395 while (1) CommunicationServer(5);
/* pthread entry: bind the thread to its CmiState (Cmi_state_vector + index)
   through the pthread key.
   The index is packed in the void* argument. It is recovered through size_t,
   as the other call_startfn does. A direct (int) cast of a 64-bit pointer
   narrows it and is rejected by C++ compilers on LP64 targets. */
398 static void *call_startfn(void *vindex)
400 int index = (int)(size_t)vindex;
401 CmiState state = Cmi_state_vector + index;
402 pthread_setspecific(Cmi_state_key, state);
409 void StartInteropScheduler(void);
411 void CommunicationServerThread(int sleepTime);
/* pthread entry used by CmiStartThreads. `vindex` carries the rank index packed
   as an integer in the pointer.
   Sets up this thread's CmiState in one of two ways:
   - with compiler TLS, it initialises the thread's own struct;
   - otherwise, it stores a pointer into Cmi_state_vector under the pthread key.
   It then runs either a worker PE (index < _Cmi_mynodesize) or the comm. thread. */
413 static void *call_startfn(void *vindex)
415 size_t index = (size_t)vindex;
416 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
/* NOTE(review): size_t vs. int comparison; fine while _Cmi_mynodesize is non-negative. */
417 if (index<_Cmi_mynodesize)
418 CmiStateInit(index+Cmi_nodestart, index, &Cmi_mystate);
/* Comm. thread: its rank is _Cmi_mynodesize, and its PE number lies beyond CmiNumPes(). */
420 CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,&Cmi_mystate);
421 Cmi_state_vector[index] = &Cmi_mystate;
423 CmiState state = Cmi_state_vector + index;
424 pthread_setspecific(Cmi_state_key, state);
/* Library-interoperation mode: StartInteropScheduler is called, and the comm.
   thread (rank == node size) polls CommunicationServerThread(5) until
   ckExitComplete becomes non-zero. The exact branching is on lines not shown. */
429 if(CharmLibInterOperate) {
432 StartInteropScheduler();
435 if (CmiMyRank() == CmiMyNodeSize()) {
436 while (ckExitComplete.load() == 0) { CommunicationServerThread(5); }
447 if (index<_Cmi_mynodesize)
448 ConverseRunPE(0); /*Regular worker thread*/
450 { /*Communication thread*/
451 CommunicationServerInit();
452 if (Cmi_charmrun_fd!=-1)
453 while (1) CommunicationServer(5,COM_SERVER_FROM_SMP);
459 #if CMK_BLUEGENEQ && !CMK_USE_LRTS
460 /* pami/machine.C defines its own version of this: */
461 CMI_EXTERNC void PerrorExit(const char*);
464 #if CMK_CONVERSE_PAMI
465 // Array used by the 'rank 0' thread to wait for other threads using pthread_join
/* Allocated and filled in CmiStartThreads, one entry per spawned thread. */
466 pthread_t *_Cmi_mypidlist;
/* pthreads startup. In order, it:
   - creates the node-level locks;
   - sets up per-rank CmiState storage (a pthread key plus a contiguous array,
     or compiler TLS plus an array of pointers);
   - spawns the remaining rank threads with call_startfn;
   - registers the calling thread's own state.
   In normal mode the calling thread becomes the comm. thread; with
   CharmLibInterOperate it is rank 0. */
469 static void CmiStartThreads(char **argv)
477 MACHSTATE(4,"CmiStartThreads")
478 CmiMemLock_lock=CmiCreateLock();
479 _smp_mutex = CmiCreateLock();
480 #if defined(CMK_NO_ASM_AVAILABLE) && CMK_PCQUEUE_LOCK
481 cmiMemoryLock = CmiCreateLock();
482 if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
485 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
486 pthread_key_create(&Cmi_state_key, 0);
/* One state per worker rank plus a trailing slot for the comm. thread. */
488 (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
489 for (i=0; i<_Cmi_mynodesize; i++)
490 CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
491 /*Create a fake state structure for the comm. thread*/
492 /* CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
493 CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
495 /* for main thread */
496 Cmi_state_vector = (CmiState *)calloc(_Cmi_mynodesize+1, sizeof(CmiState));
498 /* main thread is communication thread */
499 if(!CharmLibInterOperate) {
500 CmiStateInit(_Cmi_mynode+CmiNumPes(), _Cmi_mynodesize, &Cmi_mystate);
501 Cmi_state_vector[_Cmi_mynodesize] = &Cmi_mystate;
505 /* main thread is of rank 0 */
506 CmiStateInit(Cmi_nodestart, 0, &Cmi_mystate);
507 Cmi_state_vector[0] = &Cmi_mystate;
511 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
513 tocreate = _Cmi_mynodesize-1;
516 tocreate = _Cmi_mynodesize;
518 if(!CharmLibInterOperate) {
520 end = tocreate - 1; /* skip comm thread */
525 end = tocreate; /* skip rank 0 main thread */
528 #if CMK_CONVERSE_PAMI
529 // allocate space for the pids
530 _Cmi_mypidlist = (pthread_t *)malloc(sizeof(pthread_t)*(end - start +1));
534 for (i=start; i<=end; i++) {
535 pthread_attr_init(&attr);
536 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
/* Pack the rank index into the void* argument; call_startfn unpacks it with (size_t). */
537 ok = pthread_create(&pid, &attr, call_startfn, (void *)(size_t)i);
/* pthread_create reports failure through its return value and is not required
   to set errno. Report `ok` directly, and copy it into errno so that
   PerrorExit (presumably perror-based) shows the real cause. */
539 errno = ok; CmiPrintf("CmiStartThreads: %s(%d)\n", strerror(ok), ok);
540 PerrorExit("pthread_create");
542 #if CMK_CONVERSE_PAMI
543 _Cmi_mypidlist[numThreads++] = pid; // store the pid in the array
545 pthread_attr_destroy(&attr);
547 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
549 if(!CharmLibInterOperate)
550 pthread_setspecific(Cmi_state_key, Cmi_state_vector+_Cmi_mynodesize);
553 pthread_setspecific(Cmi_state_key, Cmi_state_vector);
556 MACHSTATE(4,"CmiStartThreads done")
/* pthreads teardown of the node-level locks.
   - comm_mutex is a zero-initialised static, and no creation of it is visible
     in this build's CmiStartThreads. Destroy it only if it was set: the
     CmiDestroyLock implementations in this file pass the lock straight to
     pthread_*_destroy, so a NULL lock is undefined behaviour.
   - cmiMemoryLock is a CmiNodeLock (a pthread spinlock pointer when
     CMK_USE_SPINLOCK is on), so release it with CmiDestroyLock rather than
     pthread_mutex_destroy. CmiStartThreads only creates it when
     CMK_PCQUEUE_LOCK is also set, hence the NULL guard. */
559 static void CmiDestroyLocks(void)
561 if (comm_mutex) CmiDestroyLock(comm_mutex);
563 CmiDestroyLock(CmiMemLock_lock);
565 pthread_mutex_destroy(&barrier_mutex);
566 #ifdef CMK_NO_ASM_AVAILABLE
567 if (cmiMemoryLock) CmiDestroyLock(cmiMemoryLock);
573 #if !CMK_SHARED_VARS_UNAVAILABLE
575 /* Wait for all worker threads */
576 void CmiNodeBarrier(void) {
577 CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
580 /* Wait for all worker threads as well as comm. thread */
581 /* unfortunately this could also be called in a seemingly non smp version
582 net-win32, which actually is implemented as smp with comm. thread */
583 void CmiNodeAllBarrier(void) {
/* Builds that may run without a comm. thread use the worker-only count and
   barrier kind. Any additional selecting condition is on lines not shown. */
584 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
586 CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
/* Otherwise count the comm. thread as well, using its own barrier kind. */
589 CmiNodeBarrierCount(CmiMyNodeSize()+1, CMI_NODE_ALL_BARRIER);
594 /***********************************************************
596 * In an SMP system, idle processors need to sleep on a
597 * lock so that if a message for them arrives, they can be
599 **********************************************************/
/* Non-blocking query: return the hasMessages flag of the idle lock in `cs`.
   The flag is read without taking any lock. */
601 static int CmiIdleLock_hasMessage(CmiState cs) {
602 return cs->idle.hasMessages;
605 #if CMK_SHARED_VARS_NT_THREADS
/* Win32 idle lock: the idle PE waits on a binary semaphore (initial count 0, maximum 1). */
607 static void CmiIdleLock_init(CmiIdleLock *l) {
610 l->sem=CreateSemaphore(NULL,0,1, NULL);
/* Sleep for up to msTimeout ms unless a message is already flagged.
   The wait's result (signalled vs. timed out) is ignored. */
613 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
614 if (l->hasMessages) return;
616 MACHSTATE(4,"Processor going to sleep {")
617 WaitForSingleObject(l->sem,msTimeout);
618 MACHSTATE(4,"} Processor awake again")
/* Wake the owning PE if it is marked as sleeping.
   NOTE(review): isSleeping is read without a lock in the visible lines -- confirm this is intended. */
622 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
624 if (l->isSleeping) { /*The PE is sleeping on this lock-- wake him*/
625 MACHSTATE(4,"Waking sleeping processor")
626 ReleaseSemaphore(l->sem,1,NULL);
629 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
633 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
/* pthreads idle lock: the idle PE waits on a mutex/condition-variable pair. */
635 static void CmiIdleLock_init(CmiIdleLock *l) {
638 pthread_mutex_init(&l->mutex,NULL);
639 pthread_cond_init(&l->cond,NULL);
642 #include <sys/time.h>
/* Fill *dest with the absolute wall-clock time msFromNow milliseconds from now.
   It is used as the pthread_cond_timedwait deadline in CmiIdleLock_sleep. With
   default condition attributes that deadline is measured against CLOCK_REALTIME,
   the same clock gettimeofday reads.
   Whole seconds are split off first, so 1000*1000*msFromNow stays below 10^9
   and cannot overflow int for non-negative inputs.
   NOTE(review): a negative msFromNow is not handled in the visible lines. */
644 static void getTimespec(int msFromNow,struct timespec *dest) {
647 /*Get the current time*/
648 gettimeofday(&cur,NULL);
649 dest->tv_sec=cur.tv_sec;
650 dest->tv_nsec=cur.tv_usec*1000;
651 /*Add in the wait time*/
652 secFromNow=msFromNow/1000;
653 msFromNow-=secFromNow*1000;
654 dest->tv_sec+=secFromNow;
655 dest->tv_nsec+=1000*1000*msFromNow;
656 /*Wrap around if we overflowed the nsec field*/
657 while (dest->tv_nsec>=1000000000ul) {
658 dest->tv_nsec-=1000000000ul;
/* Sleep on the condition variable for at most msTimeout ms, or until
   hasMessages is set.
   - The first hasMessages test is an unlocked fast path.
   - The loop re-tests under the mutex, so spurious wakeups are handled.
   - The deadline is computed once, so wakeups do not extend the total wait.
   - ETIMEDOUT presumably breaks out of the loop on the line not shown. */
663 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
664 struct timespec wakeup;
666 if (l->hasMessages) return;
668 MACHSTATE(4,"Processor going to sleep {")
669 pthread_mutex_lock(&l->mutex);
670 getTimespec(msTimeout,&wakeup);
671 while (!l->hasMessages)
672 if (ETIMEDOUT==pthread_cond_timedwait(&l->cond,&l->mutex,&wakeup))
674 pthread_mutex_unlock(&l->mutex);
675 MACHSTATE(4,"} Processor awake again")
/* Signal the sleeping PE's condition variable.
   The mutex is held around the signal, so it cannot land between a sleeper's
   locked hasMessages test and its wait. This holds provided hasMessages is
   set before this is called. */
679 static void CmiIdleLock_wakeup(CmiIdleLock *l) {
681 MACHSTATE(4,"Waking sleeping processor")
682 /*The PE is sleeping on this condition variable-- wake him*/
683 pthread_mutex_lock(&l->mutex);
684 pthread_cond_signal(&l->cond);
685 pthread_mutex_unlock(&l->mutex);
/* Wake the owning PE only if it is marked as sleeping; isSleeping is read without the mutex. */
688 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
689 if (l->isSleeping) CmiIdleLock_wakeup(l);
692 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
/* Fallback for the remaining build configurations (the selecting #else is not shown).
   Sleeping is a no-op macro; the other stubs' bodies are not shown. */
696 #define CmiIdleLock_sleep(x, y) /*empty*/
698 static void CmiIdleLock_init(CmiIdleLock *l) {
701 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
704 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
/* Initialise the CmiState for (pe, rank).
   The comm. thread's state (rank == CmiMyNodeSize()) returns before any queue
   is created. Worker states get:
   - receive queue(s): a single queue or MULTIQ_GRPSIZE queues, presumably
     selected by conditional compilation on lines not shown;
   - a local FIFO;
   - an idle lock. */
709 void CmiStateInit(int pe, int rank, CmiState state)
715 MACHSTATE(4,"StateInit")
718 if (rank==CmiMyNodeSize()) return; /* Communications thread */
720 state->recv = CMIQueueCreate();
722 for(i=0; i<MULTIQ_GRPSIZE; i++) state->recv[i]=CMIQueueCreate();
723 state->myGrpIdx = rank % MULTIQ_GRPSIZE;
724 state->curPolledIdx = 0;
726 state->localqueue = CdsFifo_Create();
727 CmiIdleLock_init(&state->idle);
/* Initialise node-wide shared state:
   - immediate-message locks and queues (CMK_IMMEDIATE_MSG);
   - the node receive lock and queue (CMK_NODE_QUEUE_AVAILABLE), using an MPMC
     queue when CMK_LOCKLESS_QUEUE is set. */
730 void CmiNodeStateInit(CmiNodeState *nodeState)
732 MACHSTATE1(4,"NodeStateInit %p", nodeState)
733 #if CMK_IMMEDIATE_MSG
734 nodeState->immSendLock = CmiCreateLock();
735 nodeState->immRecvLock = CmiCreateLock();
736 nodeState->immQ = CMIQueueCreate();
737 nodeState->delayedImmQ = CMIQueueCreate();
739 #if CMK_NODE_QUEUE_AVAILABLE
740 nodeState->CmiNodeRecvLock = CmiCreateLock();
741 #if CMK_LOCKLESS_QUEUE
742 nodeState->NodeRecv = MPMCQueueCreate();
744 nodeState->NodeRecv = CMIQueueCreate();
747 MACHSTATE(4,"NodeStateInit done")