2 * @brief Common function reimplementation for SMP machines
7 * This version of converse is for multiple-processor workstations,
8 * and we assume that the OS provides threads to gain access to those
9 * multiple processors. This section contains an interface layer for
10 * the OS specific threads package. It contains routines to start
11 * the threads, routines to access their thread-specific state, and
12 * routines to control mutual exclusion between them.
14 * In addition, we wish to support nonthreaded operation. To do this,
15 * we provide a version of these functions that uses the main/only thread
16 * as a single PE, and simulates a communication thread using interrupts.
21 * Allocates one CmiState structure per PE. Initializes all of
22 * the CmiState structures using the function CmiStateInit.
23 * Starts processor threads 1..N (not 0, that's the one
24 * that calls CmiStartThreads), as well as the communication
25 * thread. Each processor thread (other than 0) must call ConverseInitPE
26 * followed by Cmi_startfn. The communication thread must be an infinite
27 * loop that calls the function CommunicationServer over and over.
31 * When called by a PE-thread, returns the processor-specific state
32 * structure for that PE.
36 * returns processor-specific state structure for the PE of rank n.
38 * CmiMemLock() and CmiMemUnlock()
40 * The memory module calls these functions to obtain mutual exclusion
41 * in the memory routines, and to keep interrupts from reentering malloc.
43 * CmiCommLock() and CmiCommUnlock()
45 * These functions lock and unlock a mutex that ensures mutual exclusion in the
46 * communication routines.
48 * CmiMyPe() and CmiMyRank()
50 * Return the calling thread's PE number and its rank within the node. Implemented here, since a highly-optimized version
51 * is possible in the nonthreaded case.
55 FIXME: There is horrible duplication of code (e.g. locking code)
56 both here and in converse.h. It could be much shorter. OSL 9/9/2000
58 *****************************************************************************/
77 CmiIdleLock_addMessage
78 CmiIdleLock_checkMessage
81 #include "machine-smp.h"
82 #include "sockRoutines.h"
/* Forward declarations for routines used by the thread start-up code below:
 * CmiStateInit(pe, rank, state) initializes one per-thread state structure,
 * and CommunicationServerInit() is called by the communication thread before
 * it enters its server loop. */
84 void CmiStateInit(int pe
, int rank
, CmiState state
);
85 void CommunicationServerInit();
/* Fallback state returned by CmiGetState() when no thread-specific state has
 * been registered for the calling thread yet (e.g. in the pthreads build,
 * before CmiStartThreads has created Cmi_state_key). */
87 static struct CmiStateStruct Cmi_default_state
; /* State structure to return during startup */
89 /************************ Win32 kernel SMP threads **************/
91 #if CMK_SHARED_VARS_NT_THREADS
93 CmiNodeLock CmiMemLock_lock
;
94 #ifdef CMK_NO_ASM_AVAILABLE
95 CmiNodeLock cmiMemoryLock
;
97 static HANDLE comm_mutex
;
98 #define CmiCommLockOrElse(x) /*empty*/
99 #define CmiCommLock() (WaitForSingleObject(comm_mutex, INFINITE))
100 #define CmiCommUnlock() (ReleaseMutex(comm_mutex))
102 static DWORD Cmi_state_key
= 0xFFFFFFFF;
103 static CmiState Cmi_state_vector
= 0;
106 # define CmiGetState() ((CmiState)TlsGetValue(Cmi_state_key))
108 CmiState
CmiGetState()
111 result
= (CmiState
)TlsGetValue(Cmi_state_key
);
113 return &Cmi_default_state
;
114 /* PerrorExit("CmiGetState: TlsGetValue");*/
125 #define CmiGetStateN(n) (Cmi_state_vector+(n))
128 static DWORD WINAPI comm_thread(LPVOID dummy)
130 if (Cmi_charmrun_fd!=-1)
131 while (1) CommunicationServerThread(5);
135 static DWORD WINAPI call_startfn(LPVOID vindex)
137 int index = (int)vindex;
139 CmiState state = Cmi_state_vector + index;
140 if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
141 if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
148 static DWORD WINAPI
call_startfn(LPVOID vindex
)
150 int index
= (int)vindex
;
152 CmiState state
= Cmi_state_vector
+ index
;
153 if(Cmi_state_key
== 0xFFFFFFFF) PerrorExit("TlsAlloc");
154 if(TlsSetValue(Cmi_state_key
, (LPVOID
)state
) == 0) PerrorExit("TlsSetValue");
158 if (index
<_Cmi_mynodesize
)
159 ConverseRunPE(0); /*Regular worker thread*/
160 else { /*Communication thread*/
161 CommunicationServerInit();
162 if (Cmi_charmrun_fd
!=-1)
163 while (1) CommunicationServerThread(5);
170 /*Classic sense-reversing barrier algorithm.
171 FIXME: This should be the barrier implementation for
174 static volatile HANDLE barrier_mutex
;
175 static volatile int barrier_wait
[2] = {0,0};
176 static volatile int barrier_which
= 0;
/* Win32 node barrier: block until nThreads threads of this node have arrived.
 * Visible logic: each caller takes barrier_mutex and increments
 * barrier_wait[which]; the caller that brings the count to nThreads flips
 * barrier_which and zeroes the other counter, so the next barrier episode
 * starts from 0. After the mutex is released, waiting callers spin (yielding
 * via sleep(0)) until barrier_wait[which] reaches nThreads.
 * NOTE(review): `which` is not declared in the visible lines; presumably it is
 * a local copy of barrier_which taken under the mutex before the increment —
 * confirm. */
178 void CmiNodeBarrierCount(int nThreads
) {
182 while (WaitForSingleObject(barrier_mutex
, INFINITE
)!=WAIT_OBJECT_0
);
184 barrier_wait
[which
]++;
185 if (barrier_wait
[which
] == nThreads
) {
186 barrier_which
= !which
;
187 barrier_wait
[barrier_which
] = 0;/*Reset new counter*/
190 while (!ReleaseMutex(barrier_mutex
));
/* NOTE(review): the Win32 API is Sleep(DWORD) (capital S); lowercase sleep()
 * is POSIX. Confirm that `sleep` is mapped to Sleep in this build.
 * The spin below reads barrier_wait only through `volatile`, which gives no
 * cross-thread ordering guarantee; an interlocked read or a memory fence
 * would make the hand-off explicit. */
193 while(barrier_wait
[which
] != nThreads
)
194 sleep(0);/*<- could also just spin here*/
197 static void CmiStartThreads(char **argv
)
203 CmiMemLock_lock
=CmiCreateLock();
204 comm_mutex
= CmiCreateLock();
205 barrier_mutex
= CmiCreateLock();
206 #ifdef CMK_NO_ASM_AVAILABLE
207 cmiMemoryLock
= CmiCreateLock();
208 if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
211 Cmi_state_key
= TlsAlloc();
212 if(Cmi_state_key
== 0xFFFFFFFF) PerrorExit("TlsAlloc main");
215 (CmiState
)calloc(_Cmi_mynodesize
+1, sizeof(struct CmiStateStruct
));
217 for (i
=0; i
<_Cmi_mynodesize
; i
++)
218 CmiStateInit(i
+Cmi_nodestart
, i
, CmiGetStateN(i
));
219 /*Create a fake state structure for the comm. thread*/
220 /* CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
221 CmiStateInit(_Cmi_mynode
+CmiNumPes(),_Cmi_mynodesize
,CmiGetStateN(_Cmi_mynodesize
));
223 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
225 tocreate
= _Cmi_mynodesize
-1;
228 tocreate
= _Cmi_mynodesize
;
229 for (i
=1; i
<=tocreate
; i
++) {
230 if((thr
= CreateThread(NULL
, 0, call_startfn
, (LPVOID
)i
, 0, &threadID
))
231 == NULL
) PerrorExit("CreateThread");
235 if(TlsSetValue(Cmi_state_key
, (LPVOID
)Cmi_state_vector
) == 0)
236 PerrorExit("TlsSetValue");
239 static void CmiDestroyLocks()
241 CloseHandle(comm_mutex
);
243 CloseHandle(CmiMemLock_lock
);
245 CloseHandle(barrier_mutex
);
246 #ifdef CMK_NO_ASM_AVAILABLE
247 CloseHandle(cmiMemoryLock
);
251 /***************** Pthreads kernel SMP threads ******************/
252 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
254 CmiNodeLock CmiMemLock_lock
;
255 #ifdef CMK_NO_ASM_AVAILABLE
256 CmiNodeLock cmiMemoryLock
;
258 int _Cmi_sleepOnIdle
=0;
259 int _Cmi_forceSpinOnIdle
=0;
261 extern void CharmScheduler();
263 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
264 static __thread
struct CmiStateStruct Cmi_mystate
;
265 static CmiState
*Cmi_state_vector
;
267 CmiState
CmiGetState() {
270 #define CmiGetStateN(n) Cmi_state_vector[n]
274 static pthread_key_t Cmi_state_key
=(pthread_key_t
)(-1);
275 static CmiState Cmi_state_vector
;
278 #define CmiGetState() ((CmiState)pthread_getspecific(Cmi_state_key))
280 CmiState
CmiGetState() {
282 if (Cmi_state_key
== (pthread_key_t
)(-1)) return &Cmi_default_state
;
283 ret
=(CmiState
)pthread_getspecific(Cmi_state_key
);
284 return (ret
==NULL
)? &Cmi_default_state
: ret
;
288 #define CmiGetStateN(n) (Cmi_state_vector+(n))
293 #if CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK
294 CmiNodeLock
CmiCreateLock()
296 CmiNodeLock lk
= (CmiNodeLock
)malloc(sizeof(pthread_spinlock_t
));
298 pthread_spin_init(lk
, 0);
302 void CmiDestroyLock(CmiNodeLock lk
)
304 pthread_spin_destroy(lk
);
308 CmiNodeLock
CmiCreateLock()
310 CmiNodeLock lk
= (CmiNodeLock
)malloc(sizeof(pthread_mutex_t
));
312 pthread_mutex_init(lk
,(pthread_mutexattr_t
*)0);
316 void CmiDestroyLock(CmiNodeLock lk
)
318 pthread_mutex_destroy(lk
);
322 #endif //CMK_USE_LRTS
/* Give up the processor so another runnable thread can be scheduled
 * (POSIX sched_yield()); the return value of sched_yield() is ignored. */
324 void CmiYield(void) { sched_yield(); }
/* Shared state for the pthreads CmiNodeBarrierCount(): barrier_mutex guards
 * the barrier bookkeeping, early arrivals wait on barrier_cond, and it is
 * broadcast to release them. barrier_mutex is also destroyed in
 * CmiDestroyLocks().
 * NOTE(review): both objects have external linkage; if no other translation
 * unit refers to them, declaring them static would avoid symbol clashes. */
327 pthread_cond_t barrier_cond
= PTHREAD_COND_INITIALIZER
;
328 pthread_mutex_t barrier_mutex
= PTHREAD_MUTEX_INITIALIZER
;
330 void CmiNodeBarrierCount(int nThreads
)
332 static unsigned int volatile level
= 0;
334 pthread_mutex_lock(&barrier_mutex
);
336 /* CmiPrintf("[%d] CmiNodeBarrierCount: %d of %d level:%d\n", CmiMyPe(), barrier, nThreads, level); */
338 if(barrier
!= nThreads
) {
339 /* occasionally it wakes up without having reach the count */
341 pthread_cond_wait(&barrier_cond
, &barrier_mutex
);
345 level
++; /* !level; */
346 pthread_cond_broadcast(&barrier_cond
);
348 pthread_mutex_unlock(&barrier_mutex
);
351 static CmiNodeLock comm_mutex
;
353 #define CmiCommLockOrElse(x) /*empty*/
356 /*Regular comm. lock*/
357 # define CmiCommLock() CmiLock(comm_mutex)
358 # define CmiCommUnlock() CmiUnlock(comm_mutex)
360 /*Verbose debugging comm. lock*/
361 static int comm_mutex_isLocked
=0;
362 void CmiCommLock(void) {
363 if (comm_mutex_isLocked
)
364 CmiAbort("CommLock: already locked!\n");
366 comm_mutex_isLocked
=1;
368 void CmiCommUnlock(void) {
369 if (!comm_mutex_isLocked
)
370 CmiAbort("CommUnlock: double unlock!\n");
371 comm_mutex_isLocked
=0;
372 CmiUnlock(comm_mutex
);
/* Communication-thread body for the pthreads build: poll CommunicationServer
 * forever.
 * NOTE(review): this calls CommunicationServer with one argument, while the
 * active call_startfn further down calls CommunicationServer(5, COM_SERVER_FROM_SMP).
 * call_startfn is defined twice right after this function, so this function and
 * the first call_startfn appear to be a disabled alternative (their #if/#else
 * lines are not shown here). Confirm before re-enabling; the one-argument call
 * would not match a two-argument CommunicationServer. */
377 static void comm_thread(void)
379 while (1) CommunicationServer(5);
382 static void *call_startfn(void *vindex)
384 int index = (int)vindex;
385 CmiState state = Cmi_state_vector + index;
386 pthread_setspecific(Cmi_state_key, state);
392 static void *call_startfn(void *vindex
)
394 size_t index
= (size_t)vindex
;
395 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
396 if (index
<_Cmi_mynodesize
)
397 CmiStateInit(index
+Cmi_nodestart
, index
, &Cmi_mystate
);
399 CmiStateInit(_Cmi_mynode
+CmiNumPes(),_Cmi_mynodesize
,&Cmi_mystate
);
400 Cmi_state_vector
[index
] = &Cmi_mystate
;
402 CmiState state
= Cmi_state_vector
+ index
;
403 pthread_setspecific(Cmi_state_key
, state
);
408 if(CharmLibInterOperate
) {
411 StartInteropScheduler();
414 if (CmiMyRank() == CmiMyNodeSize()) {
415 while (1) { CommunicationServerThread(5); }
425 if (index
<_Cmi_mynodesize
)
426 ConverseRunPE(0); /*Regular worker thread*/
428 { /*Communication thread*/
429 CommunicationServerInit();
430 if (Cmi_charmrun_fd
!=-1)
431 while (1) CommunicationServer(5,COM_SERVER_FROM_SMP
);
/* Start the threads of this node (pthreads build).
 * Visible steps:
 *  - create CmiMemLock_lock and _smp_mutex (plus cmiMemoryLock when
 *    CMK_NO_ASM_AVAILABLE && CMK_PCQUEUE_LOCK);
 *  - without TLS variables: create Cmi_state_key and a calloc'd array of
 *    _Cmi_mynodesize+1 state structs (the last one for the comm. thread);
 *  - with TLS variables: calloc an array of _Cmi_mynodesize+1 CmiState
 *    pointers and point the main thread's slot at its Cmi_mystate: the
 *    comm.-thread slot (_Cmi_mynodesize) normally, or rank 0 when
 *    CharmLibInterOperate is set;
 *  - spawn threads running call_startfn((void *)i) for i in start..end, a
 *    range that skips whichever rank the main thread itself takes;
 *  - without TLS variables, register the main thread's state with
 *    pthread_setspecific.
 * NOTE(review): the return value of pthread_key_create is not checked in the
 * visible lines. */
437 static void CmiStartThreads(char **argv
)
445 MACHSTATE(4,"CmiStartThreads")
446 CmiMemLock_lock
=CmiCreateLock();
447 _smp_mutex
= CmiCreateLock();
448 #if defined(CMK_NO_ASM_AVAILABLE) && CMK_PCQUEUE_LOCK
449 cmiMemoryLock
= CmiCreateLock();
450 if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
453 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
454 pthread_key_create(&Cmi_state_key
, 0);
456 (CmiState
)calloc(_Cmi_mynodesize
+1, sizeof(struct CmiStateStruct
));
457 for (i
=0; i
<_Cmi_mynodesize
; i
++)
458 CmiStateInit(i
+Cmi_nodestart
, i
, CmiGetStateN(i
));
459 /*Create a fake state structure for the comm. thread*/
460 /* CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
461 CmiStateInit(_Cmi_mynode
+CmiNumPes(),_Cmi_mynodesize
,CmiGetStateN(_Cmi_mynodesize
));
463 /* for main thread */
464 Cmi_state_vector
= (CmiState
*)calloc(_Cmi_mynodesize
+1, sizeof(CmiState
));
466 /* main thread is communication thread */
467 if(!CharmLibInterOperate
) {
468 CmiStateInit(_Cmi_mynode
+CmiNumPes(), _Cmi_mynodesize
, &Cmi_mystate
);
469 Cmi_state_vector
[_Cmi_mynodesize
] = &Cmi_mystate
;
473 /* main thread is of rank 0 */
474 CmiStateInit(Cmi_nodestart
, 0, &Cmi_mystate
);
475 Cmi_state_vector
[0] = &Cmi_mystate
;
479 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
481 tocreate
= _Cmi_mynodesize
-1;
484 tocreate
= _Cmi_mynodesize
;
486 if(!CharmLibInterOperate
) {
488 end
= tocreate
- 1; /* skip comm thread */
493 end
= tocreate
; /* skip rank 0 main thread */
495 for (i
=start
; i
<=end
; i
++) {
496 pthread_attr_init(&attr
);
497 pthread_attr_setscope(&attr
, PTHREAD_SCOPE_SYSTEM
);
498 ok
= pthread_create(&pid
, &attr
, call_startfn
, (void *)i
);
/* NOTE(review): pthread_create() reports failure through its return value
 * (stored in `ok`) and is not specified to set errno, so strerror(errno) and
 * errno below may describe a stale or unrelated error. Printing strerror(ok)
 * and ok would report the actual failure. PerrorExit presumably also consults
 * errno; confirm what it prints. */
500 CmiPrintf("CmiStartThreads: %s(%d)\n", strerror(errno
), errno
);
501 PerrorExit("pthread_create");
503 pthread_attr_destroy(&attr
);
505 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
507 if(!CharmLibInterOperate
)
508 pthread_setspecific(Cmi_state_key
, Cmi_state_vector
+_Cmi_mynodesize
);
511 pthread_setspecific(Cmi_state_key
, Cmi_state_vector
);
514 MACHSTATE(4,"CmiStartThreads done")
/* Release the node-level locks used by this layer (pthreads build):
 * comm_mutex and CmiMemLock_lock via CmiDestroyLock, barrier_mutex via
 * pthread_mutex_destroy, and cmiMemoryLock when CMK_NO_ASM_AVAILABLE is set.
 * NOTE(review): _smp_mutex, created in CmiStartThreads, is not among the
 * visible calls; verify that it is released elsewhere. */
517 static void CmiDestroyLocks()
519 CmiDestroyLock(comm_mutex
);
521 CmiDestroyLock(CmiMemLock_lock
);
523 pthread_mutex_destroy(&barrier_mutex
);
524 #ifdef CMK_NO_ASM_AVAILABLE
/* NOTE(review): cmiMemoryLock is a CmiNodeLock from CmiCreateLock().
 * - When CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK, the CmiCreateLock in this file
 *   returns a malloc'd pthread_spinlock_t, so pthread_mutex_destroy() is the
 *   wrong call.
 * - CmiDestroyLock(cmiMemoryLock), as used for the locks above, would select
 *   the matching destroy routine.
 * - CmiStartThreads creates this lock only under CMK_NO_ASM_AVAILABLE &&
 *   CMK_PCQUEUE_LOCK, while this destroy is guarded by CMK_NO_ASM_AVAILABLE
 *   alone. Confirm the two guards agree, so an uncreated lock is never
 *   destroyed. */
525 pthread_mutex_destroy(cmiMemoryLock
);
531 #if !CMK_SHARED_VARS_UNAVAILABLE
533 /* Wait for all worker threads */
534 void CmiNodeBarrier(void) {
535 CmiNodeBarrierCount(CmiMyNodeSize());
538 /* Wait for all worker threads as well as comm. thread */
539 /* unfortunately this could also be called in a seemingly non smp version
540 net-win32, which actually is implemented as smp with comm. thread */
541 void CmiNodeAllBarrier(void) {
542 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
544 CmiNodeBarrierCount(CmiMyNodeSize());
547 CmiNodeBarrierCount(CmiMyNodeSize()+1);
552 /***********************************************************
554 * In an SMP system, idle processors need to sleep on a
555 * lock so that if a message for them arrives, they can be
557 **********************************************************/
559 static int CmiIdleLock_hasMessage(CmiState cs
) {
560 return cs
->idle
.hasMessages
;
563 #if CMK_SHARED_VARS_NT_THREADS
565 static void CmiIdleLock_init(CmiIdleLock
*l
) {
568 l
->sem
=CreateSemaphore(NULL
,0,1, NULL
);
571 static void CmiIdleLock_sleep(CmiIdleLock
*l
,int msTimeout
) {
572 if (l
->hasMessages
) return;
574 MACHSTATE(4,"Processor going to sleep {")
575 WaitForSingleObject(l
->sem
,msTimeout
);
576 MACHSTATE(4,"} Processor awake again")
580 static void CmiIdleLock_addMessage(CmiIdleLock
*l
) {
582 if (l
->isSleeping
) { /*The PE is sleeping on this lock-- wake him*/
583 MACHSTATE(4,"Waking sleeping processor")
584 ReleaseSemaphore(l
->sem
,1,NULL
);
587 static void CmiIdleLock_checkMessage(CmiIdleLock
*l
) {
591 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
593 static void CmiIdleLock_init(CmiIdleLock
*l
) {
596 pthread_mutex_init(&l
->mutex
,NULL
);
597 pthread_cond_init(&l
->cond
,NULL
);
600 static void getTimespec(int msFromNow
,struct timespec
*dest
) {
603 /*Get the current time*/
604 gettimeofday(&cur
,NULL
);
605 dest
->tv_sec
=cur
.tv_sec
;
606 dest
->tv_nsec
=cur
.tv_usec
*1000;
607 /*Add in the wait time*/
608 secFromNow
=msFromNow
/1000;
609 msFromNow
-=secFromNow
*1000;
610 dest
->tv_sec
+=secFromNow
;
611 dest
->tv_nsec
+=1000*1000*msFromNow
;
612 /*Wrap around if we overflowed the nsec field*/
613 while (dest
->tv_nsec
>=1000000000ul) {
614 dest
->tv_nsec
-=1000000000ul;
619 static void CmiIdleLock_sleep(CmiIdleLock
*l
,int msTimeout
) {
620 struct timespec wakeup
;
622 if (l
->hasMessages
) return;
624 MACHSTATE(4,"Processor going to sleep {")
625 pthread_mutex_lock(&l
->mutex
);
626 getTimespec(msTimeout
,&wakeup
);
627 while (!l
->hasMessages
)
628 if (ETIMEDOUT
==pthread_cond_timedwait(&l
->cond
,&l
->mutex
,&wakeup
))
630 pthread_mutex_unlock(&l
->mutex
);
631 MACHSTATE(4,"} Processor awake again")
635 static void CmiIdleLock_wakeup(CmiIdleLock
*l
) {
637 MACHSTATE(4,"Waking sleeping processor")
638 /*The PE is sleeping on this condition variable-- wake him*/
639 pthread_mutex_lock(&l
->mutex
);
640 pthread_cond_signal(&l
->cond
);
641 pthread_mutex_unlock(&l
->mutex
);
644 static void CmiIdleLock_addMessage(CmiIdleLock
*l
) {
645 if (l
->isSleeping
) CmiIdleLock_wakeup(l
);
648 static void CmiIdleLock_checkMessage(CmiIdleLock
*l
) {
652 #define CmiIdleLock_sleep(x, y) /*empty*/
654 static void CmiIdleLock_init(CmiIdleLock
*l
) {
657 static void CmiIdleLock_addMessage(CmiIdleLock
*l
) {
660 static void CmiIdleLock_checkMessage(CmiIdleLock
*l
) {
665 void CmiStateInit(int pe
, int rank
, CmiState state
)
671 MACHSTATE(4,"StateInit")
674 if (rank
==CmiMyNodeSize()) return; /* Communications thread */
676 state
->recv
= CMIQueueCreate();
678 for(i
=0; i
<MULTIQ_GRPSIZE
; i
++) state
->recv
[i
]=CMIQueueCreate();
679 state
->myGrpIdx
= rank
% MULTIQ_GRPSIZE
;
680 state
->curPolledIdx
= 0;
682 state
->localqueue
= CdsFifo_Create();
683 CmiIdleLock_init(&state
->idle
);
686 void CmiNodeStateInit(CmiNodeState
*nodeState
)
688 MACHSTATE1(4,"NodeStateInit %p", nodeState
)
689 #if CMK_IMMEDIATE_MSG
690 nodeState
->immSendLock
= CmiCreateLock();
691 nodeState
->immRecvLock
= CmiCreateLock();
692 nodeState
->immQ
= CMIQueueCreate();
693 nodeState
->delayedImmQ
= CMIQueueCreate();
695 #if CMK_NODE_QUEUE_AVAILABLE
696 nodeState
->CmiNodeRecvLock
= CmiCreateLock();
697 nodeState
->NodeRecv
= CMIQueueCreate();
699 MACHSTATE(4,"NodeStateInit done")