Fix parallel build of examples/charm++/user-driven-interop
[charm.git] / src / conv-core / queueing.C
blob785388adb8dd8c3aafd0e8f8c76b87393ed469a5
1 #include "queueing.h"
2 #include <converse.h>
3 #include <string.h>
5 #if CMK_USE_STL_MSGQ
6 #include "msgq.h"
7 typedef CMK_MSG_PRIO_TYPE prio_t;
8 #endif
11 /** @defgroup CharmScheduler 
12     @brief The portion of Charm++ responsible for scheduling the execution 
13     of Charm++ entry methods
15     CqsEnqueueGeneral() is the main function that is responsible for enqueueing 
16     messages. It will store the messages in one of three queues based on the 
17     specified priorities or strategies. The Charm++ message queue is really three 
18     queues, one for positive priorities, one for zero priorities, and one for 
19     negative priorities. The positive and negative priorty queues are actually heaps.
22     The charm++ messages are only scheduled after the \ref
23     ConverseScheduler "converse message queues" have been emptied:
25     - CsdScheduleForever() is the commonly used Converse scheduler loop
26        - CsdNextMessage()
27           - First processes converse message from a  \ref ConverseScheduler "converse queue" if one exists using CdsFifo_Dequeue() 
28           - Then if no converse message was found, call CqsDequeue() which gets a charm++ message to execute if one exists
31     @file
32     Implementation of queuing data structure functions.
33     @ingroup CharmScheduler
34     
35     @addtogroup CharmScheduler
36     @{
39 #if 1
40 /* I don't see why we need CmiAlloc here, allocated from pinned memory
41  * can be expensive */
42 #define CmiAlloc   malloc
43 #define CmiFree    free
44 #endif
46 /** A memory limit threshold for adaptively scheduling */
47 int schedAdaptMemThresholdMB;
50 /** Initialize a deq */
51 static void CqsDeqInit(_deq d)
53   d->bgn  = d->space;
54   d->end  = d->space+4;
55   d->head = d->space;
56   d->tail = d->space;
59 /** Double the size of a deq */
60 static void CqsDeqExpand(_deq d)
62   int rsize = (d->end - d->head);
63   int lsize = (d->head - d->bgn);
64   int oldsize = (d->end - d->bgn);
65   int newsize = (oldsize << 1);
66   void **ovec = d->bgn;
67   void **nvec = (void **)CmiAlloc(newsize * sizeof(void *));
68   memcpy(nvec, d->head, rsize * sizeof(void *));
69   memcpy(nvec+rsize, d->bgn, lsize * sizeof(void *));
70   d->bgn = nvec;
71   d->end = nvec + newsize;
72   d->head = nvec;
73   d->tail = nvec + oldsize;
74   if (ovec != d->space) CmiFree(ovec);
77 /** Insert a data pointer at the tail of a deq */
78 void CqsDeqEnqueueFifo(_deq d, void *data)
80   void **tail = d->tail;
81   *tail = data;
82   tail++;
83   if (tail == d->end) tail = d->bgn;
84   d->tail = tail;
85   if (tail == d->head) CqsDeqExpand(d);
88 /** Insert a data pointer at the head of a deq */
89 void CqsDeqEnqueueLifo(_deq d, void *data)
91   void **head = d->head;
92   if (head == d->bgn) head = d->end;
93   head--;
94   *head = data;
95   d->head = head;
96   if (head == d->tail) CqsDeqExpand(d);
99 /** Remove a data pointer from the head of a deq */
100 void *CqsDeqDequeue(_deq d)
102   void **head;
103   void **tail;
104   void *data;
105   head = d->head;
106   tail = d->tail;
107   if (head == tail) return 0;
108   data = *head;
109   head++;
110   if (head == d->end) head = d->bgn;
111   d->head = head;
112   return data;
115 /** Initialize a Priority Queue */
116 static void CqsPrioqInit(_prioq pq)
118   int i;
119   pq->heapsize = 100;
120   pq->heapnext = 1;
121   pq->hash_key_size = PRIOQ_TABSIZE;
122   pq->hash_entry_size = 0;
123   pq->heap = (_prioqelt *)CmiAlloc(100 * sizeof(_prioqelt));
124   pq->hashtab = (_prioqelt *)CmiAlloc(pq->hash_key_size * sizeof(_prioqelt));
125   for (i=0; i<pq->hash_key_size; i++) pq->hashtab[i]=0;
128 #if CMK_C_INLINE
129 inline
130 #endif
131 /** Double the size of a Priority Queue's heap */
132 static void CqsPrioqExpand(_prioq pq)
134   int oldsize = pq->heapsize;
135   int newsize = oldsize * 2;
136   _prioqelt *oheap = pq->heap;
137   _prioqelt *nheap = (_prioqelt *)CmiAlloc(newsize*sizeof(_prioqelt));
138   memcpy(nheap, oheap, oldsize * sizeof(_prioqelt));
139   pq->heap = nheap;
140   pq->heapsize = newsize;
141   CmiFree(oheap);
143 #ifndef FASTQ
144 /** Double the size of a Priority Queue's hash table */
145 void CqsPrioqRehash(_prioq pq)
147   int oldHsize = pq->hash_key_size;
148   int newHsize = oldHsize * 2;
149   unsigned int hashval;
150   _prioqelt pe, pe1, pe2;
151   int i,j;
153   _prioqelt *ohashtab = pq->hashtab;
154   _prioqelt *nhashtab = (_prioqelt *)CmiAlloc(newHsize*sizeof(_prioqelt));
156   pq->hash_key_size = newHsize;
158   for(i=0; i<newHsize; i++)
159     nhashtab[i] = 0;
161   for(i=0; i<oldHsize; i++) {
162     for(pe=ohashtab[i]; pe; ) {
163       pe2 = pe->ht_next;
164       hashval = pe->pri.bits;
165       for (j=0; j<pe->pri.ints; j++) hashval ^= pe->pri.data[j];
166       hashval = (hashval&0x7FFFFFFF)%newHsize;
168       pe1=nhashtab[hashval];
169       pe->ht_next = pe1;
170       pe->ht_handle = (nhashtab+hashval);
171       if (pe1) pe1->ht_handle = &(pe->ht_next);
172       nhashtab[hashval]=pe;
173       pe = pe2;
174     }
175   }
176   pq->hashtab = nhashtab;
177   pq->hash_key_size = newHsize;
178   CmiFree(ohashtab);
180 #endif
182 int CqsPrioGT_(unsigned int ints1, unsigned int *data1, unsigned int ints2, unsigned int *data2)
184   unsigned int val1;
185   unsigned int val2;
186   while (1) {
187     if (ints1==0) return 0;
188     if (ints2==0) return 1;
189     val1 = *data1++;
190     val2 = *data2++;
191     if (val1 < val2) return 0;
192     if (val1 > val2) return 1;
193     ints1--;
194     ints2--;
195   }
199  * Compare two priorities (treated as unsigned).
200  * @return 1 if prio1 > prio2
201  * @return ? if prio1 == prio2
202  * @return 0 if prio1 < prio2
203  */
204 int CqsPrioGT(_prio prio1, _prio prio2)
206 #ifndef FASTQ
207   unsigned int ints1 = prio1->ints;
208   unsigned int ints2 = prio2->ints;
209 #endif
210   unsigned int *data1 = prio1->data;
211   unsigned int *data2 = prio2->data;
212 #ifndef FASTQ
213   unsigned int val1;
214   unsigned int val2;
215 #endif
216   while (1) {
217 #ifndef FASTQ
218     if (ints1==0) return 0;
219     if (ints2==0) return 1;
220 #else
221     if (prio1->ints==0) return 0;
222     if (prio2->ints==0) return 1;
223 #endif
224 #ifndef FASTQ
225     val1 = *data1++;
226     val2 = *data2++;
227     if (val1 < val2) return 0;
228     if (val1 > val2) return 1;
229     ints1--;
230     ints2--;
231 #else
232     if(*data1++ < *data2++) return 0;
233     if(*data1++ > *data2++) return 1;
234     (prio1->ints)--;
235     (prio2->ints)--;
236 #endif
237   }
240 /** Find or create a bucket in the hash table for the specified priority. */
241 _deq CqsPrioqGetDeq(_prioq pq, unsigned int priobits, unsigned int *priodata)
243   unsigned int prioints = (priobits+CINTBITS-1)/CINTBITS;
244   unsigned int hashval, i;
245   int heappos; 
246   _prioqelt *heap, pe, next, parent;
247   _prio pri;
248   int mem_cmp_res;
249   unsigned int pri_bits_cmp;
250   static int cnt_nilesh=0;
252 #ifdef FASTQ
253   /*  printf("Hi I'm here %d\n",cnt_nilesh++); */
254 #endif
255   /* Scan for priority in hash-table, and return it if present */
256   hashval = priobits;
257   for (i=0; i<prioints; i++) hashval ^= priodata[i];
258   hashval = (hashval&0x7FFFFFFF)%PRIOQ_TABSIZE;
259 #ifndef FASTQ
260   for (pe=pq->hashtab[hashval]; pe; pe=pe->ht_next)
261     if (priobits == pe->pri.bits)
262       if (memcmp(priodata, pe->pri.data, sizeof(int)*prioints)==0)
263         return &(pe->data);
264 #else
265   parent=NULL;
266   for(pe=pq->hashtab[hashval]; pe; )
267   {
268     parent=pe;
269     pri_bits_cmp=pe->pri.bits;
270     mem_cmp_res=memcmp(priodata,pe->pri.data,sizeof(int)*prioints);
271     if(priobits == pri_bits_cmp && mem_cmp_res==0)
272       return &(pe->data);
273     else if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
274     {
275       pe=pe->ht_right;
276     }
277     else 
278     {
279       pe=pe->ht_left;
280     }
281   }
282 #endif
283   
284   /* If not present, allocate a bucket for specified priority */
285   pe = (_prioqelt)CmiAlloc(sizeof(struct prioqelt_struct)+((prioints-1)*sizeof(int)));
286   pe->pri.bits = priobits;
287   pe->pri.ints = prioints;
288   memcpy(pe->pri.data, priodata, (prioints*sizeof(int)));
289   CqsDeqInit(&(pe->data));
290   pri=&(pe->pri);
292   /* Insert bucket into hash-table */
293   next = pq->hashtab[hashval];
294 #ifndef FASTQ
295   pe->ht_next = next;
296   pe->ht_handle = (pq->hashtab+hashval);
297   if (next) next->ht_handle = &(pe->ht_next);
298   pq->hashtab[hashval] = pe;
299 #else
300   pe->ht_parent = parent;
301   pe->ht_left = NULL;
302   pe->ht_right = NULL;
303   if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
304   {
305     if(parent) {
306       parent->ht_right = pe;
307       pe->ht_handle = &(parent->ht_right);
308     }
309     else {
310       pe->ht_handle = (pq->hashtab+hashval);
311       pq->hashtab[hashval] = pe;
312     }
313     /*    pe->ht_handle = &(pe); */
314   }
315   else
316   {
317     if(parent) {
318       parent->ht_left = pe;
319       pe->ht_handle = &(parent->ht_left);
320     }
321     else {
322       pe->ht_handle = (pq->hashtab+hashval);
323       pq->hashtab[hashval] = pe;
324     }
325     /*    pe->ht_handle = &(pe); */
326   }
327   if(!next)
328     pq->hashtab[hashval] = pe;
329 #endif
330   pq->hash_entry_size++;
331 #ifndef FASTQ
332   if(pq->hash_entry_size > 2*pq->hash_key_size)
333     CqsPrioqRehash(pq);
334 #endif  
335   /* Insert bucket into heap */
336   heappos = pq->heapnext++;
337   if (heappos == pq->heapsize) CqsPrioqExpand(pq);
338   heap = pq->heap;
339   while (heappos > 1) {
340     int parentpos = (heappos >> 1);
341     _prioqelt parent = heap[parentpos];
342     if (CqsPrioGT(pri, &(parent->pri))) break;
343     heap[heappos] = parent; heappos=parentpos;
344   }
345   heap[heappos] = pe;
347 #ifdef FASTQ
348   /*  printf("Hi I'm here222\n"); */
349 #endif
350   
351   return &(pe->data);
354 /** Dequeue an entry */
355 void *CqsPrioqDequeue(_prioq pq)
357   _prio pri;
358   _prioqelt pe, old; void *data;
359   int heappos, heapnext;
360   _prioqelt *heap = pq->heap;
361   int left_child;
362   _prioqelt temp1_ht_right, temp1_ht_left, temp1_ht_parent;
363   _prioqelt *temp1_ht_handle;
364   static int cnt_nilesh1=0;
366 #ifdef FASTQ
367   /*  printf("Hi I'm here too!! %d\n",cnt_nilesh1++); */
368 #endif
369   if (pq->heapnext==1) return 0;
370   pe = heap[1];
371   data = CqsDeqDequeue(&(pe->data));
372   if (pe->data.head == pe->data.tail) {
373     /* Unlink prio-bucket from hash-table */
374 #ifndef FASTQ
375     _prioqelt next = pe->ht_next;
376     _prioqelt *handle = pe->ht_handle;
377     if (next) next->ht_handle = handle;
378     *handle = next;
379     old=pe;
380 #else
381     old=pe;
382     prioqelt *handle;
383     if(pe->ht_parent)
384     { 
385       if(pe->ht_parent->ht_left==pe) left_child=1;
386       else left_child=0;
387     }
388     else
389       {  /* it is the root in the hashtable entry, so its ht_handle should be used by whoever is the new root */
390       handle = pe->ht_handle;
391     }
392     
393     if(!pe->ht_left && !pe->ht_right)
394     {
395       if(pe->ht_parent) {
396         if(left_child) pe->ht_parent->ht_left=NULL;
397         else pe->ht_parent->ht_right=NULL;
398       }
399       else {
400         *handle = NULL;
401       }
402     }
403     else if(!pe->ht_right)
404     {
405       /*if the node does not have a right subtree, its left subtree root is the new child of its parent */
406       pe->ht_left->ht_parent=pe->ht_parent;
407       if(pe->ht_parent)
408       {
409         if(left_child) {
410           pe->ht_parent->ht_left = pe->ht_left;
411           pe->ht_left->ht_handle = &(pe->ht_parent->ht_left);
412         }
413         else {
414           pe->ht_parent->ht_right = pe->ht_left;
415           pe->ht_left->ht_handle = &(pe->ht_parent->ht_right);
416         }
417       }
418       else {
419         pe->ht_left->ht_handle = handle;
420         *handle = pe->ht_left;
421       }
422     }
423     else if(!pe->ht_left)
424     {
425       /*if the node does not have a left subtree, its right subtree root is the new child of its parent */
426       pe->ht_right->ht_parent=pe->ht_parent;
427       /*pe->ht_right->ht_left=pe->ht_left; */
428       if(pe->ht_parent)
429       {
430         if(left_child) {
431           pe->ht_parent->ht_left = pe->ht_right;
432           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
433         }
434         else {
435           pe->ht_parent->ht_right = pe->ht_right;
436           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
437         }
438       }
439       else {
440         pe->ht_right->ht_handle = handle;
441         *handle = pe->ht_right;
442       }
443     }
444     else if(!pe->ht_right->ht_left)
445     {
446       pe->ht_right->ht_parent=pe->ht_parent;
447       if(pe->ht_parent)
448       {
449         if(left_child) {
450           pe->ht_parent->ht_left = pe->ht_right;
451           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
452         }
453         else {
454           pe->ht_parent->ht_right = pe->ht_right;
455           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
456         }
457       }
458       else {
459         pe->ht_right->ht_handle = handle;
460         *handle = pe->ht_right;
461       }
462       if(pe->ht_left) {
463         pe->ht_right->ht_left = pe->ht_left;
464         pe->ht_left->ht_parent = pe->ht_right;
465         pe->ht_left->ht_handle = &(pe->ht_right->ht_left);
466       }
467     }
468     else
469     {
470       /*if it has both subtrees, swap it with its successor */
471       for(pe=pe->ht_right; pe; )
472       {
473         if(pe->ht_left) pe=pe->ht_left;
474         else  /*found the sucessor */
475           { /*take care of the connections */
476           if(old->ht_parent)
477           {
478             if(left_child) {
479               old->ht_parent->ht_left = pe;
480               pe->ht_handle = &(old->ht_parent->ht_left);
481             }
482             else {
483               old->ht_parent->ht_right = pe;
484               pe->ht_handle = &(old->ht_parent->ht_right);
485             }
486           }
487           else {
488             pe->ht_handle = handle;
489             *handle = pe;
490           }
491           temp1_ht_right = pe->ht_right;
492           temp1_ht_left = pe->ht_left;
493           temp1_ht_parent = pe->ht_parent;
494           temp1_ht_handle = pe->ht_handle;
496           pe->ht_parent = old->ht_parent;
497           pe->ht_left = old->ht_left;
498           pe->ht_right = old->ht_right;
499           if(pe->ht_left) {
500             pe->ht_left->ht_parent = pe;
501             pe->ht_right->ht_handle = &(pe->ht_right);
502           }
503           if(pe->ht_right) {
504             pe->ht_right->ht_parent = pe;
505             pe->ht_right->ht_handle = &(pe->ht_right);
506           }
507           temp1_ht_parent->ht_left = temp1_ht_right;
508           if(temp1_ht_right) {
509             temp1_ht_right->ht_handle = &(temp1_ht_parent->ht_left);
510             temp1_ht_right->ht_parent = temp1_ht_parent;
511           }
512           break;
513         }
514       }
515     }
516 #endif
517     pq->hash_entry_size--;
518     
519     /* Restore the heap */
520     heapnext = (--pq->heapnext);
521     pe = heap[heapnext];
522     pri = &(pe->pri);
523     heappos = 1;
524     while (1) {
525       int childpos1, childpos2, childpos;
526       _prioqelt ch1, ch2, child;
527       childpos1 = heappos<<1;
528       if (childpos1>=heapnext) break;
529       childpos2 = childpos1+1;
530       if (childpos2>=heapnext)
531         { childpos=childpos1; child=heap[childpos1]; }
532       else {
533         ch1 = heap[childpos1];
534         ch2 = heap[childpos2];
535         if (CqsPrioGT(&(ch1->pri), &(ch2->pri)))
536              {childpos=childpos2; child=ch2;}
537         else {childpos=childpos1; child=ch1;}
538       }
539       if (CqsPrioGT(&(child->pri), pri)) break;
540       heap[heappos]=child; heappos=childpos;
541     }
542     heap[heappos]=pe;
543     
544     /* Free prio-bucket */
545     if (old->data.bgn != old->data.space) CmiFree(old->data.bgn);
546     CmiFree(old);
547   }
548   return data;
551 Queue CqsCreate(void)
553   Queue q = (Queue)CmiAlloc(sizeof(struct Queue_struct));
554 #if CMK_USE_STL_MSGQ
555   q->stlQ = (void*) new conv::msgQ<prio_t>;
556 #else
557   q->length = 0;
558   q->maxlen = 0;
559   CqsDeqInit(&(q->zeroprio));
560   CqsPrioqInit(&(q->negprioq));
561   CqsPrioqInit(&(q->posprioq));
562 #endif
563   return q;
566 void CqsDelete(Queue q)
568 #if CMK_USE_STL_MSGQ
569   if (q->stlQ != NULL) delete (conv::msgQ<prio_t>*)(q->stlQ);
570 #else
571   CmiFree(q->negprioq.heap);
572   CmiFree(q->posprioq.heap);
573 #endif
574   CmiFree(q);
577 unsigned int CqsMaxLength(Queue q)
579 #if CMK_USE_STL_MSGQ
580   return (unsigned int)((conv::msgQ<prio_t> *)q->stlQ)->max_size();
581 #else
582   return q->maxlen;
583 #endif
586 #if CMK_USE_STL_MSGQ
588 unsigned int CqsLength(Queue q)
589 { return ( (conv::msgQ<prio_t>*)(q->stlQ) )->size(); }
591 int CqsEmpty(Queue q)
592 { return ( (conv::msgQ<prio_t>*)(q->stlQ) )->empty(); }
594 void CqsEnqueueGeneral(Queue q, void *data, int strategy, int priobits,unsigned int *prioptr)
596     bool isFifo = (strategy == CQS_QUEUEING_FIFO  ||
597                    strategy == CQS_QUEUEING_IFIFO ||
598                    strategy == CQS_QUEUEING_BFIFO ||
599                    strategy == CQS_QUEUEING_LFIFO);
600     if (priobits >= sizeof(int)*8 && strategy != CQS_QUEUEING_FIFO && strategy != CQS_QUEUEING_LIFO)
601         ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq( data, prioptr[0], isFifo);
602     else
603         ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq( data, 0, isFifo);
606 void CqsEnqueueFifo(Queue q, void *data)
607 { ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq(data); }
609 void CqsEnqueueLifo(Queue q, void *data)
610 { ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq(data, 0, false); }
612 void CqsEnqueue(Queue q, void *data)
613 { ( (conv::msgQ<prio_t>*)(q->stlQ) )->enq(data); }
615 void CqsDequeue(Queue q, void **resp)
616 { *resp = (void*) ( (conv::msgQ<prio_t>*)(q->stlQ) )->deq(); }
618 #else
620 unsigned int CqsLength(Queue q)
622   return q->length;
625 int CqsEmpty(Queue q)
627   return (q->length == 0);
630 void CqsEnqueueGeneral(Queue q, void *data, int strategy, 
631            int priobits,unsigned int *prioptr)
633   _deq d; int iprio;
634   CmiInt8 lprio0, lprio;
635   switch (strategy) {
636   case CQS_QUEUEING_FIFO: 
637     CqsDeqEnqueueFifo(&(q->zeroprio), data); 
638     break;
639   case CQS_QUEUEING_LIFO: 
640     CqsDeqEnqueueLifo(&(q->zeroprio), data); 
641     break;
642   case CQS_QUEUEING_IFIFO:
643     iprio=prioptr[0]+(1U<<(CINTBITS-1));
644     if ((int)iprio<0)
645       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, (unsigned int*)&iprio);
646     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, (unsigned int*)&iprio);
647     CqsDeqEnqueueFifo(d, data);
648     break;
649   case CQS_QUEUEING_ILIFO:
650     iprio=prioptr[0]+(1U<<(CINTBITS-1));
651     if ((int)iprio<0)
652       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, (unsigned int*)&iprio);
653     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, (unsigned int*)&iprio);
654     CqsDeqEnqueueLifo(d, data);
655     break;
656   case CQS_QUEUEING_BFIFO:
657     if (priobits&&(((int)(prioptr[0]))<0))
658        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
659     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
660     CqsDeqEnqueueFifo(d, data);
661     break;
662   case CQS_QUEUEING_BLIFO:
663     if (priobits&&(((int)(prioptr[0]))<0))
664        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
665     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
666     CqsDeqEnqueueLifo(d, data);
667     break;
669     /* The following two cases have a 64bit integer as priority value. Therefore,
670      * we can cast the address of the CmiInt8 lprio to an "unsigned int" pointer
671      * when passing it to CqsPrioqGetDeq. The endianness is taken care explicitly.
672      */
673   case CQS_QUEUEING_LFIFO:     
674     CmiAssert(priobits == CLONGBITS);
675     lprio0 =((CmiInt8 *)prioptr)[0];
676     lprio0 += (1ULL<<(CLONGBITS-1));
677     if (CmiEndianness() == 0) {           /* little-endian */
678       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
679     }
680     else {                /* little-endian */
681       lprio = lprio0;
682     }
683     if (lprio0<0)
684         d=CqsPrioqGetDeq(&(q->posprioq), priobits, (unsigned int *)&lprio);
685     else
686         d=CqsPrioqGetDeq(&(q->negprioq), priobits, (unsigned int *)&lprio);
687     CqsDeqEnqueueFifo(d, data);
688     break;
689   case CQS_QUEUEING_LLIFO:
690     lprio0 =((CmiInt8 *)prioptr)[0];
691     lprio0 += (1ULL<<(CLONGBITS-1));
692     if (CmiEndianness() == 0) {           /* little-endian happen to compare least significant part first */
693       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
694     }
695     else {                /* little-endian */
696       lprio = lprio0;
697     }
698     if (lprio0<0)
699         d=CqsPrioqGetDeq(&(q->posprioq), priobits, (unsigned int *)&lprio);
700     else
701         d=CqsPrioqGetDeq(&(q->negprioq), priobits, (unsigned int *)&lprio);
702     CqsDeqEnqueueLifo(d, data);
703     break;
704   default:
705     CmiAbort("CqsEnqueueGeneral: invalid queueing strategy.\n");
706   }
707   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
710 void CqsEnqueueFifo(Queue q, void *data)
712   CqsDeqEnqueueFifo(&(q->zeroprio), data);
713   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
716 void CqsEnqueueLifo(Queue q, void *data)
718   CqsDeqEnqueueLifo(&(q->zeroprio), data);
719   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
722 void CqsEnqueue(Queue q, void *data)
724   CqsDeqEnqueueFifo(&(q->zeroprio), data);
725   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
728 void CqsDequeue(Queue q, void **resp)
730 #ifdef ADAPT_SCHED_MEM
731     /* Added by Isaac for testing purposes: */
732     if((q->length > 1) && (CmiMemoryUsage() > schedAdaptMemThresholdMB*1024*1024) ){
733         /* CqsIncreasePriorityForEntryMethod(q, 153); */
734         CqsIncreasePriorityForMemCriticalEntries(q); 
735     }
736 #endif
737     
738   if (q->length==0) 
739     { *resp = 0; return; }
740   if (q->negprioq.heapnext>1)
741     { *resp = CqsPrioqDequeue(&(q->negprioq)); q->length--; return; }
742   if (q->zeroprio.head != q->zeroprio.tail)
743     { *resp = CqsDeqDequeue(&(q->zeroprio)); q->length--; return; }
744   if (q->posprioq.heapnext>1)
745     { *resp = CqsPrioqDequeue(&(q->posprioq)); q->length--; return; }
746   *resp = 0; return;
749 #endif // CMK_USE_STL_MSGQ
750 static struct prio_struct kprio_zero = { 0, 0, {0} };
751 static struct prio_struct kprio_max  = { 32, 1, {((unsigned int)(-1))} };
753 _prio CqsGetPriority(Queue q)
755 #if !CMK_USE_STL_MSGQ
756   if (q->negprioq.heapnext>1) return &(q->negprioq.heap[1]->pri);
757   if (q->zeroprio.head != q->zeroprio.tail) { return &kprio_zero; }
758   if (q->posprioq.heapnext>1) return &(q->posprioq.heap[1]->pri);
759 #endif
760   return &kprio_max;
764 /* prio CqsGetSecondPriority(q) */
765 /* Queue q; */
766 /* { */
767 /*   return CqsGetPriority(q); */
768 /* } */
771 /** Produce an array containing all the entries in a deq
772     @return a newly allocated array filled with copies of the (void*) elements in the deq. 
773     @param [in] q a deq
774     @param [out] num the number of pointers in the returned array
776 void** CqsEnumerateDeq(_deq q, int *num){
777   void **head, **tail;
778   void **result;
779   int count = 0;
780   int i;
782   head = q->head;
783   tail = q->tail;
785   while(head != tail){
786     count++;
787     head++;
788     if(head == q->end)
789       head = q->bgn;
790   }
792   result = (void **)CmiAlloc(count * sizeof(void *));
793   i = 0;
794   head = q->head;
795   tail = q->tail;
796   while(head != tail){
797     result[i] = *head;
798     i++;
799     head++;
800     if(head == q->end)
801       head = q->bgn;
802   }
803   *num = count;
804   return(result);
807 /** Produce an array containing all the entries in a prioq
808     @return a newly allocated array filled with copies of the (void*) elements in the prioq. 
809     @param [in] q a deq
810     @param [out] num the number of pointers in the returned array
812 void** CqsEnumeratePrioq(_prioq q, int *num){
813   void **head, **tail;
814   void **result;
815   int i,j;
816   int count = 0;
817   _prioqelt pe;
819   for(i = 1; i < q->heapnext; i++){
820     pe = (q->heap)[i];
821     head = pe->data.head;
822     tail = pe->data.tail;
823     while(head != tail){
824       count++;
825       head++;
826       if(head == (pe->data).end)
827         head = (pe->data).bgn;
828     }
829   }
831   result = (void **)CmiAlloc((count) * sizeof(void *));
832   *num = count;
833   
834   j = 0;
835   for(i = 1; i < q->heapnext; i++){
836     pe = (q->heap)[i];
837     head = pe->data.head;
838     tail = pe->data.tail;
839     while(head != tail){
840       result[j] = *head;
841       j++;
842       head++;
843       if(head ==(pe->data).end)
844         head = (pe->data).bgn; 
845     }
846   }
848   return result;
851 #if CMK_USE_STL_MSGQ
852 void CqsEnumerateQueue(Queue q, void ***resp){
853   conv::msgQ<prio_t> *stlQ = (conv::msgQ<prio_t>*) q->stlQ;
854   *resp = (void **)CmiAlloc(stlQ->size() * sizeof(conv::msg_t*));
855   stlQ->enumerate(*resp, *resp + stlQ->size());
858 #else
859 void CqsEnumerateQueue(Queue q, void ***resp){
860   void **result;
861   int num;
862   int i,j;
864   *resp = (void **)CmiAlloc(q->length * sizeof(void *));
865   j = 0;
867   result = CqsEnumeratePrioq(&(q->negprioq), &num);
868   for(i = 0; i < num; i++){
869     (*resp)[j] = result[i];
870     j++;
871   }
872   CmiFree(result);
873   
874   result = CqsEnumerateDeq(&(q->zeroprio), &num);
875   for(i = 0; i < num; i++){
876     (*resp)[j] = result[i];
877     j++;
878   }
879   CmiFree(result);
881   result = CqsEnumeratePrioq(&(q->posprioq), &num);
882   for(i = 0; i < num; i++){
883     (*resp)[j] = result[i];
884     j++;
885   }
886   CmiFree(result);
888 #endif
891    Remove first occurence of a specified entry from the deq  by
892    setting the entry to NULL.
894    The size of the deq will not change, it will now just contain an
895    entry for a NULL pointer.
897    @return number of entries that were replaced with NULL
899 int CqsRemoveSpecificDeq(_deq q, const void *msgPtr){
900   void **head, **tail;
902   head = q->head;
903   tail = q->tail;
905   while(head != tail){
906     if(*head == msgPtr){
907       /*    CmiPrintf("Replacing %p in deq with NULL\n", msgPtr); */
908       /*     *head = NULL;  */
909       return 1;
910     }
911     head++;
912     if(head == q->end)
913       head = q->bgn;
914   }
915   return 0;
919    Remove first occurence of a specified entry from the prioq by
920    setting the entry to NULL.
922    The size of the prioq will not change, it will now just contain an
923    entry for a NULL pointer.
925    @return number of entries that were replaced with NULL
927 int CqsRemoveSpecificPrioq(_prioq q, const void *msgPtr){
928   void **head, **tail;
929   void **result;
930   int i;
931   _prioqelt pe;
933   for(i = 1; i < q->heapnext; i++){
934     pe = (q->heap)[i];
935     head = pe->data.head;
936     tail = pe->data.tail;
937     while(head != tail){
938       if(*head == msgPtr){
939         /*      CmiPrintf("Replacing %p in prioq with NULL\n", msgPtr); */
940         *head = NULL;
941         return 1;
942       }     
943       head++;
944       if(head == (pe->data).end)
945         head = (pe->data).bgn;
946     }
947   } 
948   return 0;
951 void CqsRemoveSpecific(Queue q, const void *msgPtr){
952 #if !CMK_USE_STL_MSGQ
953   if( CqsRemoveSpecificPrioq(&(q->negprioq), msgPtr) == 0 )
954     if( CqsRemoveSpecificDeq(&(q->zeroprio), msgPtr) == 0 )  
955       if(CqsRemoveSpecificPrioq(&(q->posprioq), msgPtr) == 0){
956         CmiPrintf("Didn't remove the specified entry because it was not found\n");
957       }
958 #endif
961 #ifdef ADAPT_SCHED_MEM
962 int numMemCriticalEntries=0;
963 int *memCriticalEntries=NULL;
964 #endif
966 /** @} */