Cleanup some lock calls in machine-smp.c
[charm.git] / src / arch / util / machine-smp.C
blobc7edbcb3b694106f2c6ba0a3e09742c50dd22b57
1 /** @file
2  * @brief Common function reimplementation for SMP machines
3  * @ingroup Machine
4  *
5  * OS Threads
6  *
7  * This version of converse is for multiple-processor workstations,
8  * and we assume that the OS provides threads to gain access to those
9  * multiple processors.  This section contains an interface layer for
10  * the OS specific threads package.  It contains routines to start
11  * the threads, routines to access their thread-specific state, and
12  * routines to control mutual exclusion between them.
13  *
14  * In addition, we wish to support nonthreaded operation.  To do this,
15  * we provide a version of these functions that uses the main/only thread
16  * as a single PE, and simulates a communication thread using interrupts.
17  *
18  *
19  * CmiStartThreads()
20  *
21  *    Allocates one CmiState structure per PE.  Initializes all of
22  *    the CmiState structures using the function CmiStateInit.
23  *    Starts processor threads 1..N (not 0, that's the one
24  *    that calls CmiStartThreads), as well as the communication
25  *    thread.  Each processor thread (other than 0) must call ConverseInitPE
26  *    followed by Cmi_startfn.  The communication thread must be an infinite
27  *    loop that calls the function CommunicationServer over and over.
28  *
29  * CmiGetState()
30  *
31  *    When called by a PE-thread, returns the processor-specific state
32  *    structure for that PE.
33  *
34  * CmiGetStateN(int n)
35  *
36  *    returns processor-specific state structure for the PE of rank n.
37  *
38  * CmiMemLock() and CmiMemUnlock()
39  *
40  *    The memory module calls these functions to obtain mutual exclusion
41  *    in the memory routines, and to keep interrupts from reentering malloc.
42  *
43  * CmiCommLock() and CmiCommUnlock()
44  *
45  *    These functions lock a mutex that insures mutual exclusion in the
46  *    communication routines.
47  *
48  * CmiMyPe() and CmiMyRank()
49  *
50  *    The usual.  Implemented here, since a highly-optimized version
51  *    is possible in the nonthreaded case.
52  *
54   
55   FIXME: There is horrible duplication of code (e.g. locking code)
56    both here and in converse.h.  It could be much shorter.  OSL 9/9/2000
58  *****************************************************************************/
60 /**
61  * \addtogroup Machine
62  * @{
63  */
66 for SMP versions:
68 CmiStateInit
69 CmiNodeStateInit
70 CmiGetState
71 CmiGetStateN
72 CmiYield
73 CmiStartThreads
75 CmiIdleLock_init
76 CmiIdleLock_sleep
77 CmiIdleLock_addMessage
78 CmiIdleLock_checkMessage
81 #include "machine-smp.h"
82 #include "sockRoutines.h"
84 void CmiStateInit(int pe, int rank, CmiState state);
85 void CommunicationServerInit(void);
87 static struct CmiStateStruct Cmi_default_state; /* State structure to return during startup */
89 #define CMI_NUM_NODE_BARRIER_TYPES 2
90 #define CMI_NODE_BARRIER 0
91 #define CMI_NODE_ALL_BARRIER 1
93 /************************ Win32 kernel SMP threads **************/
95 #if CMK_SHARED_VARS_NT_THREADS
97 CmiNodeLock CmiMemLock_lock;
98 #ifdef CMK_NO_ASM_AVAILABLE
99 CmiNodeLock cmiMemoryLock;
100 #endif
101 static CmiNodeLock comm_mutex;
102 #define CmiCommLockOrElse(x) /*empty*/
103 #define CmiCommLock() (CmiLock(comm_mutex))
104 #define CmiCommUnlock() (CmiUnlock(comm_mutex))
106 static DWORD Cmi_state_key = 0xFFFFFFFF;
107 static CmiState     Cmi_state_vector = 0;
109 #if 0
110 #  define CmiGetState() ((CmiState)TlsGetValue(Cmi_state_key))
111 #else
112 CmiState CmiGetState(void)
114   CmiState result;
115   result = (CmiState)TlsGetValue(Cmi_state_key);
116   if(result == 0) {
117         return &Cmi_default_state;
118         /* PerrorExit("CmiGetState: TlsGetValue");*/
119   }
120   return result;
122 #endif
124 void CmiYield(void) 
126   Sleep(0);
129 #define CmiGetStateN(n) (Cmi_state_vector+(n))
131 void CommunicationServerThread(int sleepTime);
134 static DWORD WINAPI comm_thread(LPVOID dummy)
135 {  
136   if (Cmi_charmrun_fd!=-1)
137     while (1) CommunicationServerThread(5);
138   return 0;
141 static DWORD WINAPI call_startfn(LPVOID vindex)
143   int index = (int)vindex;
145   CmiState state = Cmi_state_vector + index;
146   if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
147   if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
149   ConverseRunPE(0);
150   return 0;
154 static DWORD WINAPI call_startfn(LPVOID vindex)
156   int index = (int)(intptr_t)vindex;
158   CmiState state = Cmi_state_vector + index;
159   if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
160   if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
162   ConverseRunPE(0);
163 #if 0
164   if (index<_Cmi_mynodesize)
165           ConverseRunPE(0); /*Regular worker thread*/
166   else { /*Communication thread*/
167           CommunicationServerInit();
168           if (Cmi_charmrun_fd!=-1)
169                   while (1) CommunicationServerThread(5);
170   } 
171 #endif
172   return 0;
177  * Double-sided barrier algorithm (threads wait to enter, and wait to exit the barrier)
178  * There are 2 different barriers: one for CmiNodeAllBarrier, and another for CmiNodeBarrier,
179  * determined by 'mode' parameter.
180  */
181 static volatile LONG entered_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
182 static volatile LONG exited_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
183 static HANDLE entrance_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
184 static HANDLE exit_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
186 // Adapted from https://adilevin.wordpress.com/category/multithreading/
187 // (Based on the reasoning behind the double-sided barrier, I'm not sure the exit_semaphore
188 // can be omitted from this implementation -Juan)
189 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
191   LONG prev;
192   if (InterlockedIncrement(&entered_barrier_count[mode]) < nThreads)
193     WaitForSingleObject(entrance_semaphore[mode], INFINITE);
194   else {
195     exited_barrier_count[mode] = 0;
196     ReleaseSemaphore(entrance_semaphore[mode], nThreads-1, &prev);
197   }
198   if (InterlockedIncrement(&exited_barrier_count[mode]) < nThreads)
199     WaitForSingleObject(exit_semaphore[mode], INFINITE);
200   else {
201     entered_barrier_count[mode] = 0;
202     ReleaseSemaphore(exit_semaphore[mode], nThreads-1, &prev);
203   }
206 static void CmiStartThreads(char **argv)
208   int     i,tocreate;
209   DWORD   threadID;
210   HANDLE  thr;
212   CmiMemLock_lock=CmiCreateLock();
213   comm_mutex = CmiCreateLock();
214   for (i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
215     entrance_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
216     exit_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
217   }
218 #ifdef CMK_NO_ASM_AVAILABLE
219   cmiMemoryLock = CmiCreateLock();
220   if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
221 #endif
223   Cmi_state_key = TlsAlloc();
224   if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc main");
225   
226   Cmi_state_vector =
227     (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
228   
229   for (i=0; i<_Cmi_mynodesize; i++)
230     CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
231   /*Create a fake state structure for the comm. thread*/
232 /*  CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
233   CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
234   
235 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
236   if (!Cmi_commthread)
237     tocreate = _Cmi_mynodesize-1;
238   else
239 #endif
240   tocreate = _Cmi_mynodesize;
241   for (i=1; i<=tocreate; i++) {
242     if((thr = CreateThread(NULL, 0, call_startfn, (LPVOID)(intptr_t)i, 0, &threadID)) 
243        == NULL) PerrorExit("CreateThread");
244     CloseHandle(thr);
245   }
246   
247   if(TlsSetValue(Cmi_state_key, (LPVOID)Cmi_state_vector) == 0) 
248     PerrorExit("TlsSetValue");
251 static void CmiDestroyLocks(void)
253   CmiDestroyLock(comm_mutex);
254   comm_mutex = 0;
255   CmiDestroyLock(CmiMemLock_lock);
256   CmiMemLock_lock = 0;
257   for (int i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
258     CloseHandle(entrance_semaphore[i]);
259     CloseHandle(exit_semaphore[i]);
260   }
261 #ifdef CMK_NO_ASM_AVAILABLE
262   CmiDestroyLock(cmiMemoryLock);
263 #endif
266 /***************** Pthreads kernel SMP threads ******************/
267 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
269 CmiNodeLock CmiMemLock_lock;
270 #ifdef CMK_NO_ASM_AVAILABLE
271 CmiNodeLock cmiMemoryLock;
272 #endif
273 int _Cmi_sleepOnIdle=0;
274 int _Cmi_forceSpinOnIdle=0;
275 extern int _cleanUp;
276 extern void CharmScheduler(void);
278 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
279 static CMK_THREADLOCAL struct CmiStateStruct     Cmi_mystate;
280 static CmiState     *Cmi_state_vector;
282 CmiState CmiGetState(void) {
283         return &Cmi_mystate;
285 #define CmiGetStateN(n) Cmi_state_vector[n]
287 #else
289 static pthread_key_t Cmi_state_key=(pthread_key_t)(-1);
290 static CmiState     Cmi_state_vector;
292 #if 0
293 #define CmiGetState() ((CmiState)pthread_getspecific(Cmi_state_key))
294 #else
295 CmiState CmiGetState(void) {
296         CmiState ret;
297         if (Cmi_state_key == (pthread_key_t)(-1)) return &Cmi_default_state;
298         ret=(CmiState)pthread_getspecific(Cmi_state_key);
299         return (ret==NULL)? &Cmi_default_state : ret;
302 #endif
303 #define CmiGetStateN(n) (Cmi_state_vector+(n))
304 #endif
307 #if !CMK_USE_LRTS
308 #if CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK
309 CmiNodeLock CmiCreateLock(void)
311   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_spinlock_t));
312   _MEMCHECK(lk);
313   pthread_spin_init(lk, 0);
314   return lk;
317 void CmiDestroyLock(CmiNodeLock lk)
319   pthread_spin_destroy(lk);
320   free((void*)lk);
322 #else
323 CmiNodeLock CmiCreateLock(void)
325   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));
326   _MEMCHECK(lk);
327   pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
328   return lk;
331 void CmiDestroyLock(CmiNodeLock lk)
333   pthread_mutex_destroy(lk);
334   free(lk);
336 #endif
337 #endif //CMK_USE_LRTS
339 void CmiYield(void) { sched_yield(); }
341 int barrier = 0;
342 pthread_cond_t barrier_cond = PTHREAD_COND_INITIALIZER;
343 pthread_mutex_t barrier_mutex = PTHREAD_MUTEX_INITIALIZER;
345 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
347   static unsigned int volatile level = 0;
348   unsigned int cur;
349   pthread_mutex_lock(&barrier_mutex);
350   cur = level;
351   /* CmiPrintf("[%d] CmiNodeBarrierCount: %d of %d level:%d\n", CmiMyPe(), barrier, nThreads, level); */
352   barrier++;
353   if(barrier != nThreads) {
354       /* occasionally it wakes up without having reach the count */
355     while (cur == level)
356       pthread_cond_wait(&barrier_cond, &barrier_mutex);
357   }
358   else{
359     barrier = 0;
360     level++;  /* !level;  */
361     pthread_cond_broadcast(&barrier_cond);
362   }
363   pthread_mutex_unlock(&barrier_mutex);
366 static CmiNodeLock comm_mutex;
368 #define CmiCommLockOrElse(x) /*empty*/
370 #if 1
371 /*Regular comm. lock*/
372 #  define CmiCommLock() CmiLock(comm_mutex)
373 #  define CmiCommUnlock() CmiUnlock(comm_mutex)
374 #else
375 /*Verbose debugging comm. lock*/
376 static int comm_mutex_isLocked=0;
377 void CmiCommLock(void) {
378         if (comm_mutex_isLocked) 
379                 CmiAbort("CommLock: already locked!\n");
380         CmiLock(comm_mutex);
381         comm_mutex_isLocked=1;
383 void CmiCommUnlock(void) {
384         if (!comm_mutex_isLocked)
385                 CmiAbort("CommUnlock: double unlock!\n");
386         comm_mutex_isLocked=0;
387         CmiUnlock(comm_mutex);
389 #endif
392 static void comm_thread(void)
394   while (1) CommunicationServer(5);
397 static void *call_startfn(void *vindex)
399   int index = (int)vindex;
400   CmiState state = Cmi_state_vector + index;
401   pthread_setspecific(Cmi_state_key, state);
402   ConverseRunPE(0);
403   return 0;
407 void StartInteropScheduler(void);
408 void CommunicationServerThread(int sleepTime);
410 static void *call_startfn(void *vindex)
412   size_t index = (size_t)vindex;
413 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
414   if (index<_Cmi_mynodesize) 
415     CmiStateInit(index+Cmi_nodestart, index, &Cmi_mystate);
416   else
417     CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,&Cmi_mystate);
418   Cmi_state_vector[index] = &Cmi_mystate;
419 #else
420   CmiState state = Cmi_state_vector + index;
421   pthread_setspecific(Cmi_state_key, state);
422 #endif
424   ConverseRunPE(0);
426   if(CharmLibInterOperate) {
427     while(1) {
428       if(!_cleanUp) {
429         StartInteropScheduler();
430         CmiNodeAllBarrier();
431       } else {
432         if (CmiMyRank() == CmiMyNodeSize()) {
433           while (ckExitComplete.load() == 0) { CommunicationServerThread(5); }
434         } else { 
435           CsdScheduler(-1);
436           CmiNodeAllBarrier();
437         }
438         break;
439       }
440     }
441   }
443 #if 0
444   if (index<_Cmi_mynodesize) 
445           ConverseRunPE(0); /*Regular worker thread*/
446   else 
447   { /*Communication thread*/
448           CommunicationServerInit();
449           if (Cmi_charmrun_fd!=-1)
450                   while (1) CommunicationServer(5,COM_SERVER_FROM_SMP);
451   }
452 #endif  
453   return 0;
456 #if CMK_BLUEGENEQ && !CMK_USE_LRTS
457 /* pami/machine.C defines its own version of this: */
458 void PerrorExit(const char*);
459 #endif
461 #if CMK_CONVERSE_PAMI
462 // Array used by the 'rank 0' thread to wait for other threads using pthread_join
463 pthread_t *_Cmi_mypidlist;
464 #endif
466 static void CmiStartThreads(char **argv)
468   pthread_t pid;
469   size_t i;
470   int ok, tocreate;
471   pthread_attr_t attr;
472   int start, end;
474   MACHSTATE(4,"CmiStartThreads")
475   CmiMemLock_lock=CmiCreateLock();
476   _smp_mutex = CmiCreateLock();
477 #if defined(CMK_NO_ASM_AVAILABLE) && CMK_PCQUEUE_LOCK
478   cmiMemoryLock = CmiCreateLock();
479   if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
480 #endif
482 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
483   pthread_key_create(&Cmi_state_key, 0);
484   Cmi_state_vector =
485     (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
486   for (i=0; i<_Cmi_mynodesize; i++)
487     CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
488   /*Create a fake state structure for the comm. thread*/
489 /*  CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
490   CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
491 #else
492     /* for main thread */
493   Cmi_state_vector = (CmiState *)calloc(_Cmi_mynodesize+1, sizeof(CmiState));
494 #if CMK_CONVERSE_MPI
495       /* main thread is communication thread */
496   if(!CharmLibInterOperate) {
497     CmiStateInit(_Cmi_mynode+CmiNumPes(), _Cmi_mynodesize, &Cmi_mystate);
498     Cmi_state_vector[_Cmi_mynodesize] = &Cmi_mystate;
499   } else 
500 #endif
501   {
502     /* main thread is of rank 0 */
503     CmiStateInit(Cmi_nodestart, 0, &Cmi_mystate);
504     Cmi_state_vector[0] = &Cmi_mystate;
505   }
506 #endif
508 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
509   if (!Cmi_commthread)
510     tocreate = _Cmi_mynodesize-1;
511   else
512 #endif
513   tocreate = _Cmi_mynodesize;
514 #if CMK_CONVERSE_MPI
515   if(!CharmLibInterOperate) {
516     start = 0;
517     end = tocreate - 1;                    /* skip comm thread */
518   } else 
519 #endif
520   {
521     start = 1;
522     end = tocreate;                       /* skip rank 0 main thread */
523   }
525 #if CMK_CONVERSE_PAMI
526   // allocate space for the pids
527   _Cmi_mypidlist = (pthread_t *)malloc(sizeof(pthread_t)*(end - start +1));
528   int numThreads = 0;
529 #endif
531   for (i=start; i<=end; i++) {        
532     pthread_attr_init(&attr);
533     pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
534     ok = pthread_create(&pid, &attr, call_startfn, (void *)i);
535     if (ok!=0){
536       CmiPrintf("CmiStartThreads: %s(%d)\n", strerror(errno), errno);
537       PerrorExit("pthread_create");
538     }
539 #if CMK_CONVERSE_PAMI
540     _Cmi_mypidlist[numThreads++] = pid; // store the pid in the array
541 #endif
542     pthread_attr_destroy(&attr);
543   }
544 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
545 #if CMK_CONVERSE_MPI
546   if(!CharmLibInterOperate)
547     pthread_setspecific(Cmi_state_key, Cmi_state_vector+_Cmi_mynodesize);
548   else 
549 #endif
550     pthread_setspecific(Cmi_state_key, Cmi_state_vector);
551 #endif
553   MACHSTATE(4,"CmiStartThreads done")
556 static void CmiDestroyLocks(void)
558   CmiDestroyLock(comm_mutex);
559   comm_mutex = 0;
560   CmiDestroyLock(CmiMemLock_lock);
561   CmiMemLock_lock = 0;
562   pthread_mutex_destroy(&barrier_mutex);
563 #ifdef CMK_NO_ASM_AVAILABLE
564   CmiDestroyLock(cmiMemoryLock);
565 #endif
568 #endif
570 #if !CMK_SHARED_VARS_UNAVAILABLE
572 /* Wait for all worker threads */
573 void  CmiNodeBarrier(void) {
574   CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
577 /* Wait for all worker threads as well as comm. thread */
578 /* unfortunately this could also be called in a seemingly non smp version
579    net-win32, which actually is implemented as smp with comm. thread */
580 void CmiNodeAllBarrier(void) {
581 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
582   if (!Cmi_commthread)
583   CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
584   else
585 #endif
586   CmiNodeBarrierCount(CmiMyNodeSize()+1, CMI_NODE_ALL_BARRIER);
589 #endif
591 /***********************************************************
592  * SMP Idle Locking
593  *   In an SMP system, idle processors need to sleep on a
594  * lock so that if a message for them arrives, they can be
595  * woken up.
596  **********************************************************/
598 static int CmiIdleLock_hasMessage(CmiState cs) {
599   return cs->idle.hasMessages;
602 #if CMK_SHARED_VARS_NT_THREADS
604 static void CmiIdleLock_init(CmiIdleLock *l) {
605   l->hasMessages=0;
606   l->isSleeping=0;
607   l->sem=CreateSemaphore(NULL,0,1, NULL);
610 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
611   if (l->hasMessages) return;
612   l->isSleeping=1;
613   MACHSTATE(4,"Processor going to sleep {")
614   WaitForSingleObject(l->sem,msTimeout);
615   MACHSTATE(4,"} Processor awake again")
616   l->isSleeping=0;
619 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
620   l->hasMessages=1;
621   if (l->isSleeping) { /*The PE is sleeping on this lock-- wake him*/  
622     MACHSTATE(4,"Waking sleeping processor")
623     ReleaseSemaphore(l->sem,1,NULL);
624   }
626 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
627   l->hasMessages=0;
630 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
632 static void CmiIdleLock_init(CmiIdleLock *l) {
633   l->hasMessages=0;
634   l->isSleeping=0;
635   pthread_mutex_init(&l->mutex,NULL);
636   pthread_cond_init(&l->cond,NULL);
639 #include <sys/time.h>
641 static void getTimespec(int msFromNow,struct timespec *dest) {
642   struct timeval cur;
643   int secFromNow;
644   /*Get the current time*/
645   gettimeofday(&cur,NULL);
646   dest->tv_sec=cur.tv_sec;
647   dest->tv_nsec=cur.tv_usec*1000;
648   /*Add in the wait time*/
649   secFromNow=msFromNow/1000;
650   msFromNow-=secFromNow*1000;
651   dest->tv_sec+=secFromNow;
652   dest->tv_nsec+=1000*1000*msFromNow;
653   /*Wrap around if we overflowed the nsec field*/
654   while (dest->tv_nsec>=1000000000ul) {
655     dest->tv_nsec-=1000000000ul;
656     dest->tv_sec++;
657   }
660 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
661   struct timespec wakeup;
663   if (l->hasMessages) return;
664   l->isSleeping=1;
665   MACHSTATE(4,"Processor going to sleep {")
666   pthread_mutex_lock(&l->mutex);
667   getTimespec(msTimeout,&wakeup);
668   while (!l->hasMessages)
669     if (ETIMEDOUT==pthread_cond_timedwait(&l->cond,&l->mutex,&wakeup))
670       break;
671   pthread_mutex_unlock(&l->mutex);
672   MACHSTATE(4,"} Processor awake again")
673   l->isSleeping=0;
676 static void CmiIdleLock_wakeup(CmiIdleLock *l) {
677   l->hasMessages=1; 
678   MACHSTATE(4,"Waking sleeping processor")
679   /*The PE is sleeping on this condition variable-- wake him*/
680   pthread_mutex_lock(&l->mutex);
681   pthread_cond_signal(&l->cond);
682   pthread_mutex_unlock(&l->mutex);
685 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
686   if (l->isSleeping) CmiIdleLock_wakeup(l);
687   l->hasMessages=1;
689 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
690   l->hasMessages=0;
692 #else
693 #define CmiIdleLock_sleep(x, y) /*empty*/
695 static void CmiIdleLock_init(CmiIdleLock *l) {
696   l->hasMessages=0;
698 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
699   l->hasMessages=1;
701 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
702   l->hasMessages=0;
704 #endif
706 void CmiStateInit(int pe, int rank, CmiState state)
708 #if CMK_SMP_MULTIQ
709   int i;
710 #endif
712   MACHSTATE(4,"StateInit")
713   state->pe = pe;
714   state->rank = rank;
715   if (rank==CmiMyNodeSize()) return; /* Communications thread */
716 #if !CMK_SMP_MULTIQ
717   state->recv = CMIQueueCreate();
718 #else
719   for(i=0; i<MULTIQ_GRPSIZE; i++) state->recv[i]=CMIQueueCreate();
720   state->myGrpIdx = rank % MULTIQ_GRPSIZE;
721   state->curPolledIdx = 0;
722 #endif
723   state->localqueue = CdsFifo_Create();
724   CmiIdleLock_init(&state->idle);
727 void CmiNodeStateInit(CmiNodeState *nodeState)
729   MACHSTATE1(4,"NodeStateInit %p", nodeState)
730 #if CMK_IMMEDIATE_MSG
731   nodeState->immSendLock = CmiCreateLock();
732   nodeState->immRecvLock = CmiCreateLock();
733   nodeState->immQ = CMIQueueCreate();
734   nodeState->delayedImmQ = CMIQueueCreate();
735 #endif
736 #if CMK_NODE_QUEUE_AVAILABLE
737   nodeState->CmiNodeRecvLock = CmiCreateLock();
738 #if CMK_LOCKLESS_QUEUE
739   nodeState->NodeRecv = MPMCQueueCreate();
740 #else
741   nodeState->NodeRecv = CMIQueueCreate();
742 #endif
743 #endif
744   MACHSTATE(4,"NodeStateInit done")
747 /*@}*/