Rename the majority of remaining C files in the RTS to C++
[charm.git] / src / arch / util / machine-smp.C
blob37f65f5b13cd88cc7f9fbf6e32f72f8aa535eee5
1 /** @file
2  * @brief Common function reimplementation for SMP machines
3  * @ingroup Machine
4  *
5  * OS Threads
6  *
7  * This version of converse is for multiple-processor workstations,
8  * and we assume that the OS provides threads to gain access to those
9  * multiple processors.  This section contains an interface layer for
10  * the OS specific threads package.  It contains routines to start
11  * the threads, routines to access their thread-specific state, and
12  * routines to control mutual exclusion between them.
13  *
14  * In addition, we wish to support nonthreaded operation.  To do this,
15  * we provide a version of these functions that uses the main/only thread
16  * as a single PE, and simulates a communication thread using interrupts.
17  *
18  *
19  * CmiStartThreads()
20  *
21  *    Allocates one CmiState structure per PE.  Initializes all of
22  *    the CmiState structures using the function CmiStateInit.
23  *    Starts processor threads 1..N (not 0, that's the one
24  *    that calls CmiStartThreads), as well as the communication
25  *    thread.  Each processor thread (other than 0) must call ConverseInitPE
26  *    followed by Cmi_startfn.  The communication thread must be an infinite
27  *    loop that calls the function CommunicationServer over and over.
28  *
29  * CmiGetState()
30  *
31  *    When called by a PE-thread, returns the processor-specific state
32  *    structure for that PE.
33  *
34  * CmiGetStateN(int n)
35  *
36  *    returns processor-specific state structure for the PE of rank n.
37  *
38  * CmiMemLock() and CmiMemUnlock()
39  *
40  *    The memory module calls these functions to obtain mutual exclusion
41  *    in the memory routines, and to keep interrupts from reentering malloc.
42  *
43  * CmiCommLock() and CmiCommUnlock()
44  *
45  *    These functions lock a mutex that insures mutual exclusion in the
46  *    communication routines.
47  *
48  * CmiMyPe() and CmiMyRank()
49  *
50  *    The usual.  Implemented here, since a highly-optimized version
51  *    is possible in the nonthreaded case.
52  *
54   
55   FIXME: There is horrible duplication of code (e.g. locking code)
56    both here and in converse.h.  It could be much shorter.  OSL 9/9/2000
58  *****************************************************************************/
60 /**
61  * \addtogroup Machine
62  * @{
63  */
66 for SMP versions:
68 CmiStateInit
69 CmiNodeStateInit
70 CmiGetState
71 CmiGetStateN
72 CmiYield
73 CmiStartThreads
75 CmiIdleLock_init
76 CmiIdleLock_sleep
77 CmiIdleLock_addMessage
78 CmiIdleLock_checkMessage
81 #include "machine-smp.h"
82 #include "sockRoutines.h"
84 void CmiStateInit(int pe, int rank, CmiState state);
85 void CommunicationServerInit(void);
87 static struct CmiStateStruct Cmi_default_state; /* State structure to return during startup */
89 #define CMI_NUM_NODE_BARRIER_TYPES 2
90 #define CMI_NODE_BARRIER 0
91 #define CMI_NODE_ALL_BARRIER 1
93 /************************ Win32 kernel SMP threads **************/
95 #if CMK_SHARED_VARS_NT_THREADS
97 CmiNodeLock CmiMemLock_lock;
98 #ifdef CMK_NO_ASM_AVAILABLE
99 CmiNodeLock cmiMemoryLock;
100 #endif
101 static HANDLE comm_mutex;
102 #define CmiCommLockOrElse(x) /*empty*/
103 #define CmiCommLock() (WaitForSingleObject(comm_mutex, INFINITE))
104 #define CmiCommUnlock() (ReleaseMutex(comm_mutex))
106 static DWORD Cmi_state_key = 0xFFFFFFFF;
107 static CmiState     Cmi_state_vector = 0;
109 #if 0
110 #  define CmiGetState() ((CmiState)TlsGetValue(Cmi_state_key))
111 #else
112 CmiState CmiGetState(void)
114   CmiState result;
115   result = (CmiState)TlsGetValue(Cmi_state_key);
116   if(result == 0) {
117         return &Cmi_default_state;
118         /* PerrorExit("CmiGetState: TlsGetValue");*/
119   }
120   return result;
122 #endif
124 void CmiYield(void) 
126   Sleep(0);
129 #define CmiGetStateN(n) (Cmi_state_vector+(n))
131 CMI_EXTERNC
132 void CommunicationServerThread(int sleepTime);
135 static DWORD WINAPI comm_thread(LPVOID dummy)
136 {  
137   if (Cmi_charmrun_fd!=-1)
138     while (1) CommunicationServerThread(5);
139   return 0;
142 static DWORD WINAPI call_startfn(LPVOID vindex)
144   int index = (int)vindex;
146   CmiState state = Cmi_state_vector + index;
147   if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
148   if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
150   ConverseRunPE(0);
151   return 0;
155 static DWORD WINAPI call_startfn(LPVOID vindex)
157   int index = (int)(intptr_t)vindex;
159   CmiState state = Cmi_state_vector + index;
160   if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc");
161   if(TlsSetValue(Cmi_state_key, (LPVOID)state) == 0) PerrorExit("TlsSetValue");
163   ConverseRunPE(0);
164 #if 0
165   if (index<_Cmi_mynodesize)
166           ConverseRunPE(0); /*Regular worker thread*/
167   else { /*Communication thread*/
168           CommunicationServerInit();
169           if (Cmi_charmrun_fd!=-1)
170                   while (1) CommunicationServerThread(5);
171   } 
172 #endif
173   return 0;
178  * Double-sided barrier algorithm (threads wait to enter, and wait to exit the barrier)
179  * There are 2 different barriers: one for CmiNodeAllBarrier, and another for CmiNodeBarrier,
180  * determined by 'mode' parameter.
181  */
182 static volatile LONG entered_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
183 static volatile LONG exited_barrier_count[CMI_NUM_NODE_BARRIER_TYPES] = {0};
184 static HANDLE entrance_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
185 static HANDLE exit_semaphore[CMI_NUM_NODE_BARRIER_TYPES];
187 // Adapted from https://adilevin.wordpress.com/category/multithreading/
188 // (Based on the reasoning behind the double-sided barrier, I'm not sure the exit_semaphore
189 // can be omitted from this implementation -Juan)
190 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
192   LONG prev;
193   if (InterlockedIncrement(&entered_barrier_count[mode]) < nThreads)
194     WaitForSingleObject(entrance_semaphore[mode], INFINITE);
195   else {
196     exited_barrier_count[mode] = 0;
197     ReleaseSemaphore(entrance_semaphore[mode], nThreads-1, &prev);
198   }
199   if (InterlockedIncrement(&exited_barrier_count[mode]) < nThreads)
200     WaitForSingleObject(exit_semaphore[mode], INFINITE);
201   else {
202     entered_barrier_count[mode] = 0;
203     ReleaseSemaphore(exit_semaphore[mode], nThreads-1, &prev);
204   }
207 static void CmiStartThreads(char **argv)
209   int     i,tocreate;
210   DWORD   threadID;
211   HANDLE  thr;
213   CmiMemLock_lock=CmiCreateLock();
214   comm_mutex = CmiCreateLock();
215   for (i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
216     entrance_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
217     exit_semaphore[i] = CreateSemaphore(NULL, 0, _Cmi_mynodesize+1, NULL);
218   }
219 #ifdef CMK_NO_ASM_AVAILABLE
220   cmiMemoryLock = CmiCreateLock();
221   if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
222 #endif
224   Cmi_state_key = TlsAlloc();
225   if(Cmi_state_key == 0xFFFFFFFF) PerrorExit("TlsAlloc main");
226   
227   Cmi_state_vector =
228     (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
229   
230   for (i=0; i<_Cmi_mynodesize; i++)
231     CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
232   /*Create a fake state structure for the comm. thread*/
233 /*  CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
234   CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
235   
236 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
237   if (!Cmi_commthread)
238     tocreate = _Cmi_mynodesize-1;
239   else
240 #endif
241   tocreate = _Cmi_mynodesize;
242   for (i=1; i<=tocreate; i++) {
243     if((thr = CreateThread(NULL, 0, call_startfn, (LPVOID)(intptr_t)i, 0, &threadID)) 
244        == NULL) PerrorExit("CreateThread");
245     CloseHandle(thr);
246   }
247   
248   if(TlsSetValue(Cmi_state_key, (LPVOID)Cmi_state_vector) == 0) 
249     PerrorExit("TlsSetValue");
252 static void CmiDestroyLocks(void)
254   CloseHandle(comm_mutex);
255   comm_mutex = 0;
256   CloseHandle(CmiMemLock_lock);
257   CmiMemLock_lock = 0;
258   for (int i=0; i < CMI_NUM_NODE_BARRIER_TYPES; i++) {
259     CloseHandle(entrance_semaphore[i]);
260     CloseHandle(exit_semaphore[i]);
261   }
262 #ifdef CMK_NO_ASM_AVAILABLE
263   CloseHandle(cmiMemoryLock);
264 #endif
267 /***************** Pthreads kernel SMP threads ******************/
268 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
270 CmiNodeLock CmiMemLock_lock;
271 #ifdef CMK_NO_ASM_AVAILABLE
272 CmiNodeLock cmiMemoryLock;
273 #endif
274 int _Cmi_sleepOnIdle=0;
275 int _Cmi_forceSpinOnIdle=0;
276 extern int _cleanUp;
277 extern void CharmScheduler(void);
279 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
280 static CMK_THREADLOCAL struct CmiStateStruct     Cmi_mystate;
281 static CmiState     *Cmi_state_vector;
283 CmiState CmiGetState(void) {
284         return &Cmi_mystate;
286 #define CmiGetStateN(n) Cmi_state_vector[n]
288 #else
290 static pthread_key_t Cmi_state_key=(pthread_key_t)(-1);
291 static CmiState     Cmi_state_vector;
293 #if 0
294 #define CmiGetState() ((CmiState)pthread_getspecific(Cmi_state_key))
295 #else
296 CmiState CmiGetState(void) {
297         CmiState ret;
298         if (Cmi_state_key == (pthread_key_t)(-1)) return &Cmi_default_state;
299         ret=(CmiState)pthread_getspecific(Cmi_state_key);
300         return (ret==NULL)? &Cmi_default_state : ret;
303 #endif
304 #define CmiGetStateN(n) (Cmi_state_vector+(n))
305 #endif
308 #if !CMK_USE_LRTS
309 #if CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK
310 CmiNodeLock CmiCreateLock(void)
312   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_spinlock_t));
313   _MEMCHECK(lk);
314   pthread_spin_init(lk, 0);
315   return lk;
318 void CmiDestroyLock(CmiNodeLock lk)
320   pthread_spin_destroy(lk);
321   free((void*)lk);
323 #else
324 CmiNodeLock CmiCreateLock(void)
326   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(pthread_mutex_t));
327   _MEMCHECK(lk);
328   pthread_mutex_init(lk,(pthread_mutexattr_t *)0);
329   return lk;
332 void CmiDestroyLock(CmiNodeLock lk)
334   pthread_mutex_destroy(lk);
335   free(lk);
337 #endif
338 #endif //CMK_USE_LRTS
340 void CmiYield(void) { sched_yield(); }
342 int barrier = 0;
343 pthread_cond_t barrier_cond = PTHREAD_COND_INITIALIZER;
344 pthread_mutex_t barrier_mutex = PTHREAD_MUTEX_INITIALIZER;
346 void CmiNodeBarrierCount(int nThreads, uint8_t mode)
348   static unsigned int volatile level = 0;
349   unsigned int cur;
350   pthread_mutex_lock(&barrier_mutex);
351   cur = level;
352   /* CmiPrintf("[%d] CmiNodeBarrierCount: %d of %d level:%d\n", CmiMyPe(), barrier, nThreads, level); */
353   barrier++;
354   if(barrier != nThreads) {
355       /* occasionally it wakes up without having reach the count */
356     while (cur == level)
357       pthread_cond_wait(&barrier_cond, &barrier_mutex);
358   }
359   else{
360     barrier = 0;
361     level++;  /* !level;  */
362     pthread_cond_broadcast(&barrier_cond);
363   }
364   pthread_mutex_unlock(&barrier_mutex);
367 static CmiNodeLock comm_mutex;
369 #define CmiCommLockOrElse(x) /*empty*/
371 #if 1
372 /*Regular comm. lock*/
373 #  define CmiCommLock() CmiLock(comm_mutex)
374 #  define CmiCommUnlock() CmiUnlock(comm_mutex)
375 #else
376 /*Verbose debugging comm. lock*/
377 static int comm_mutex_isLocked=0;
378 void CmiCommLock(void) {
379         if (comm_mutex_isLocked) 
380                 CmiAbort("CommLock: already locked!\n");
381         CmiLock(comm_mutex);
382         comm_mutex_isLocked=1;
384 void CmiCommUnlock(void) {
385         if (!comm_mutex_isLocked)
386                 CmiAbort("CommUnlock: double unlock!\n");
387         comm_mutex_isLocked=0;
388         CmiUnlock(comm_mutex);
390 #endif
393 static void comm_thread(void)
395   while (1) CommunicationServer(5);
398 static void *call_startfn(void *vindex)
400   int index = (int)vindex;
401   CmiState state = Cmi_state_vector + index;
402   pthread_setspecific(Cmi_state_key, state);
403   ConverseRunPE(0);
404   return 0;
408 CMI_EXTERNC
409 void StartInteropScheduler(void);
410 CMI_EXTERNC
411 void CommunicationServerThread(int sleepTime);
413 static void *call_startfn(void *vindex)
415   size_t index = (size_t)vindex;
416 #if CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD
417   if (index<_Cmi_mynodesize) 
418     CmiStateInit(index+Cmi_nodestart, index, &Cmi_mystate);
419   else
420     CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,&Cmi_mystate);
421   Cmi_state_vector[index] = &Cmi_mystate;
422 #else
423   CmiState state = Cmi_state_vector + index;
424   pthread_setspecific(Cmi_state_key, state);
425 #endif
427   ConverseRunPE(0);
429   if(CharmLibInterOperate) {
430     while(1) {
431       if(!_cleanUp) {
432         StartInteropScheduler();
433         CmiNodeAllBarrier();
434       } else {
435         if (CmiMyRank() == CmiMyNodeSize()) {
436           while (ckExitComplete.load() == 0) { CommunicationServerThread(5); }
437         } else { 
438           CsdScheduler(-1);
439           CmiNodeAllBarrier();
440         }
441         break;
442       }
443     }
444   }
446 #if 0
447   if (index<_Cmi_mynodesize) 
448           ConverseRunPE(0); /*Regular worker thread*/
449   else 
450   { /*Communication thread*/
451           CommunicationServerInit();
452           if (Cmi_charmrun_fd!=-1)
453                   while (1) CommunicationServer(5,COM_SERVER_FROM_SMP);
454   }
455 #endif  
456   return 0;
459 #if CMK_BLUEGENEQ && !CMK_USE_LRTS
460 /* pami/machine.C defines its own version of this: */
461 CMI_EXTERNC void PerrorExit(const char*);
462 #endif
464 #if CMK_CONVERSE_PAMI
465 // Array used by the 'rank 0' thread to wait for other threads using pthread_join
466 pthread_t *_Cmi_mypidlist;
467 #endif
469 static void CmiStartThreads(char **argv)
471   pthread_t pid;
472   size_t i;
473   int ok, tocreate;
474   pthread_attr_t attr;
475   int start, end;
477   MACHSTATE(4,"CmiStartThreads")
478   CmiMemLock_lock=CmiCreateLock();
479   _smp_mutex = CmiCreateLock();
480 #if defined(CMK_NO_ASM_AVAILABLE) && CMK_PCQUEUE_LOCK
481   cmiMemoryLock = CmiCreateLock();
482   if (CmiMyNode()==0) printf("Charm++ warning> fences and atomic operations not available in native assembly\n");
483 #endif
485 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
486   pthread_key_create(&Cmi_state_key, 0);
487   Cmi_state_vector =
488     (CmiState)calloc(_Cmi_mynodesize+1, sizeof(struct CmiStateStruct));
489   for (i=0; i<_Cmi_mynodesize; i++)
490     CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
491   /*Create a fake state structure for the comm. thread*/
492 /*  CmiStateInit(-1,_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize)); */
493   CmiStateInit(_Cmi_mynode+CmiNumPes(),_Cmi_mynodesize,CmiGetStateN(_Cmi_mynodesize));
494 #else
495     /* for main thread */
496   Cmi_state_vector = (CmiState *)calloc(_Cmi_mynodesize+1, sizeof(CmiState));
497 #if CMK_CONVERSE_MPI
498       /* main thread is communication thread */
499   if(!CharmLibInterOperate) {
500     CmiStateInit(_Cmi_mynode+CmiNumPes(), _Cmi_mynodesize, &Cmi_mystate);
501     Cmi_state_vector[_Cmi_mynodesize] = &Cmi_mystate;
502   } else 
503 #endif
504   {
505     /* main thread is of rank 0 */
506     CmiStateInit(Cmi_nodestart, 0, &Cmi_mystate);
507     Cmi_state_vector[0] = &Cmi_mystate;
508   }
509 #endif
511 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
512   if (!Cmi_commthread)
513     tocreate = _Cmi_mynodesize-1;
514   else
515 #endif
516   tocreate = _Cmi_mynodesize;
517 #if CMK_CONVERSE_MPI
518   if(!CharmLibInterOperate) {
519     start = 0;
520     end = tocreate - 1;                    /* skip comm thread */
521   } else 
522 #endif
523   {
524     start = 1;
525     end = tocreate;                       /* skip rank 0 main thread */
526   }
528 #if CMK_CONVERSE_PAMI
529   // allocate space for the pids
530   _Cmi_mypidlist = (pthread_t *)malloc(sizeof(pthread_t)*(end - start +1));
531   int numThreads = 0;
532 #endif
534   for (i=start; i<=end; i++) {        
535     pthread_attr_init(&attr);
536     pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
537     ok = pthread_create(&pid, &attr, call_startfn, (void *)i);
538     if (ok!=0){
539       CmiPrintf("CmiStartThreads: %s(%d)\n", strerror(errno), errno);
540       PerrorExit("pthread_create");
541     }
542 #if CMK_CONVERSE_PAMI
543     _Cmi_mypidlist[numThreads++] = pid; // store the pid in the array
544 #endif
545     pthread_attr_destroy(&attr);
546   }
547 #if ! (CMK_HAS_TLS_VARIABLES && !CMK_NOT_USE_TLS_THREAD)
548 #if CMK_CONVERSE_MPI
549   if(!CharmLibInterOperate)
550     pthread_setspecific(Cmi_state_key, Cmi_state_vector+_Cmi_mynodesize);
551   else 
552 #endif
553     pthread_setspecific(Cmi_state_key, Cmi_state_vector);
554 #endif
556   MACHSTATE(4,"CmiStartThreads done")
559 static void CmiDestroyLocks(void)
561   CmiDestroyLock(comm_mutex);
562   comm_mutex = 0;
563   CmiDestroyLock(CmiMemLock_lock);
564   CmiMemLock_lock = 0;
565   pthread_mutex_destroy(&barrier_mutex);
566 #ifdef CMK_NO_ASM_AVAILABLE
567   pthread_mutex_destroy(cmiMemoryLock);
568 #endif
571 #endif
573 #if !CMK_SHARED_VARS_UNAVAILABLE
575 /* Wait for all worker threads */
576 void  CmiNodeBarrier(void) {
577   CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
580 /* Wait for all worker threads as well as comm. thread */
581 /* unfortunately this could also be called in a seemingly non smp version
582    net-win32, which actually is implemented as smp with comm. thread */
583 void CmiNodeAllBarrier(void) {
584 #if CMK_MULTICORE || CMK_SMP_NO_COMMTHD
585   if (!Cmi_commthread)
586   CmiNodeBarrierCount(CmiMyNodeSize(), CMI_NODE_BARRIER);
587   else
588 #endif
589   CmiNodeBarrierCount(CmiMyNodeSize()+1, CMI_NODE_ALL_BARRIER);
592 #endif
594 /***********************************************************
595  * SMP Idle Locking
596  *   In an SMP system, idle processors need to sleep on a
597  * lock so that if a message for them arrives, they can be
598  * woken up.
599  **********************************************************/
601 static int CmiIdleLock_hasMessage(CmiState cs) {
602   return cs->idle.hasMessages;
605 #if CMK_SHARED_VARS_NT_THREADS
607 static void CmiIdleLock_init(CmiIdleLock *l) {
608   l->hasMessages=0;
609   l->isSleeping=0;
610   l->sem=CreateSemaphore(NULL,0,1, NULL);
613 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
614   if (l->hasMessages) return;
615   l->isSleeping=1;
616   MACHSTATE(4,"Processor going to sleep {")
617   WaitForSingleObject(l->sem,msTimeout);
618   MACHSTATE(4,"} Processor awake again")
619   l->isSleeping=0;
622 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
623   l->hasMessages=1;
624   if (l->isSleeping) { /*The PE is sleeping on this lock-- wake him*/  
625     MACHSTATE(4,"Waking sleeping processor")
626     ReleaseSemaphore(l->sem,1,NULL);
627   }
629 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
630   l->hasMessages=0;
633 #elif CMK_SHARED_VARS_POSIX_THREADS_SMP
635 static void CmiIdleLock_init(CmiIdleLock *l) {
636   l->hasMessages=0;
637   l->isSleeping=0;
638   pthread_mutex_init(&l->mutex,NULL);
639   pthread_cond_init(&l->cond,NULL);
642 #include <sys/time.h>
644 static void getTimespec(int msFromNow,struct timespec *dest) {
645   struct timeval cur;
646   int secFromNow;
647   /*Get the current time*/
648   gettimeofday(&cur,NULL);
649   dest->tv_sec=cur.tv_sec;
650   dest->tv_nsec=cur.tv_usec*1000;
651   /*Add in the wait time*/
652   secFromNow=msFromNow/1000;
653   msFromNow-=secFromNow*1000;
654   dest->tv_sec+=secFromNow;
655   dest->tv_nsec+=1000*1000*msFromNow;
656   /*Wrap around if we overflowed the nsec field*/
657   while (dest->tv_nsec>=1000000000ul) {
658     dest->tv_nsec-=1000000000ul;
659     dest->tv_sec++;
660   }
663 static void CmiIdleLock_sleep(CmiIdleLock *l,int msTimeout) {
664   struct timespec wakeup;
666   if (l->hasMessages) return;
667   l->isSleeping=1;
668   MACHSTATE(4,"Processor going to sleep {")
669   pthread_mutex_lock(&l->mutex);
670   getTimespec(msTimeout,&wakeup);
671   while (!l->hasMessages)
672     if (ETIMEDOUT==pthread_cond_timedwait(&l->cond,&l->mutex,&wakeup))
673       break;
674   pthread_mutex_unlock(&l->mutex);
675   MACHSTATE(4,"} Processor awake again")
676   l->isSleeping=0;
679 static void CmiIdleLock_wakeup(CmiIdleLock *l) {
680   l->hasMessages=1; 
681   MACHSTATE(4,"Waking sleeping processor")
682   /*The PE is sleeping on this condition variable-- wake him*/
683   pthread_mutex_lock(&l->mutex);
684   pthread_cond_signal(&l->cond);
685   pthread_mutex_unlock(&l->mutex);
688 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
689   if (l->isSleeping) CmiIdleLock_wakeup(l);
690   l->hasMessages=1;
692 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
693   l->hasMessages=0;
695 #else
696 #define CmiIdleLock_sleep(x, y) /*empty*/
698 static void CmiIdleLock_init(CmiIdleLock *l) {
699   l->hasMessages=0;
701 static void CmiIdleLock_addMessage(CmiIdleLock *l) {
702   l->hasMessages=1;
704 static void CmiIdleLock_checkMessage(CmiIdleLock *l) {
705   l->hasMessages=0;
707 #endif
709 void CmiStateInit(int pe, int rank, CmiState state)
711 #if CMK_SMP_MULTIQ
712   int i;
713 #endif
715   MACHSTATE(4,"StateInit")
716   state->pe = pe;
717   state->rank = rank;
718   if (rank==CmiMyNodeSize()) return; /* Communications thread */
719 #if !CMK_SMP_MULTIQ
720   state->recv = CMIQueueCreate();
721 #else
722   for(i=0; i<MULTIQ_GRPSIZE; i++) state->recv[i]=CMIQueueCreate();
723   state->myGrpIdx = rank % MULTIQ_GRPSIZE;
724   state->curPolledIdx = 0;
725 #endif
726   state->localqueue = CdsFifo_Create();
727   CmiIdleLock_init(&state->idle);
730 void CmiNodeStateInit(CmiNodeState *nodeState)
732   MACHSTATE1(4,"NodeStateInit %p", nodeState)
733 #if CMK_IMMEDIATE_MSG
734   nodeState->immSendLock = CmiCreateLock();
735   nodeState->immRecvLock = CmiCreateLock();
736   nodeState->immQ = CMIQueueCreate();
737   nodeState->delayedImmQ = CMIQueueCreate();
738 #endif
739 #if CMK_NODE_QUEUE_AVAILABLE
740   nodeState->CmiNodeRecvLock = CmiCreateLock();
741 #if CMK_LOCKLESS_QUEUE
742   nodeState->NodeRecv = MPMCQueueCreate();
743 #else
744   nodeState->NodeRecv = CMIQueueCreate();
745 #endif
746 #endif
747   MACHSTATE(4,"NodeStateInit done")
750 /*@}*/