LRTS Comm Thread Tracing in message recieve
[charm.git] / src / ck-core / modifyScheduler.C
blob34379bacce8ad8a4ecf33d1a860857388f4b5636
1 /** 
2     @file 
3     Routines for modifying the Charm++ prioritized message queue
4     @ingroup CharmScheduler
5     
6     @addtogroup CharmScheduler
7     @{
8  */
11 #include "charm++.h"
12 #include "queueing.h" // For access to scheduler data structures
13 #include "conv-trace.h"
16 // Predeclarations:
17 int CqsFindRemoveSpecificPrioq(_prioq q, void *&msgPtr, const int *entryMethod, const int numEntryMethods );
18 int CqsFindRemoveSpecificDeq(_deq q, void *&msgPtr, const int *entryMethod, const int numEntryMethods );
21 /** Search Queue for messages associated with a specified entry method */ 
22 void CqsIncreasePriorityForEntryMethod(Queue q, const int entrymethod){
23     void *removedMsgPtr;
24     int numRemoved;
25     
26     int entryMethods[1];
27     entryMethods[0] = entrymethod;
29     numRemoved = CqsFindRemoveSpecificPrioq(&(q->negprioq), removedMsgPtr, entryMethods, 1 );
30     if(numRemoved == 0)
31         numRemoved = CqsFindRemoveSpecificDeq(&(q->zeroprio), removedMsgPtr, entryMethods, 1 );
32     if(numRemoved == 0)
33         numRemoved = CqsFindRemoveSpecificPrioq(&(q->posprioq), removedMsgPtr, entryMethods, 1 );
34     
35     if(numRemoved > 0){
36         CkAssert(numRemoved==1); // We need to reenqueue all removed messages, but we currently only handle one
37         int prio = -1000000; 
38         CqsEnqueueGeneral(q, removedMsgPtr, CQS_QUEUEING_IFIFO, 0, (unsigned int*)&prio);
40 #if CMK_TRACE_ENABLED
41         char traceStr[64];
42         sprintf(traceStr, "Replacing %p in message queue with NULL", removedMsgPtr);
43         traceUserSuppliedNote(traceStr);
44 #endif
45     }
48 #ifdef ADAPT_SCHED_MEM
49 /** Search Queue for messages associated with memory-critical entry methods */ 
50 void CqsIncreasePriorityForMemCriticalEntries(Queue q){
51     void *removedMsgPtr;
52     int numRemoved;
54     numRemoved = CqsFindRemoveSpecificPrioq(&(q->negprioq), removedMsgPtr, memCriticalEntries, numMemCriticalEntries);
55     if(numRemoved == 0)
56         numRemoved = CqsFindRemoveSpecificDeq(&(q->zeroprio), removedMsgPtr, memCriticalEntries, numMemCriticalEntries);
57     if(numRemoved == 0)
58         numRemoved = CqsFindRemoveSpecificPrioq(&(q->posprioq), removedMsgPtr, memCriticalEntries, numMemCriticalEntries);
59     
60     if(numRemoved > 0){
61         CkAssert(numRemoved==1); // We need to reenqueue all removed messages, but we currently only handle one
62         int prio = -1000000; 
63         CqsEnqueueGeneral(q, removedMsgPtr, CQS_QUEUEING_IFIFO, 0, (unsigned int*)&prio);
65 #if CMK_TRACE_ENABLED
66         char traceStr[64];
67         sprintf(traceStr, "Replacing %p in message queue with NULL", removedMsgPtr);
68         traceUserSuppliedNote(traceStr);
69 #endif
70     }
72 #endif
74 static bool checkAndRemoveMatching(void *&msgPtr, const int *entryMethod, const int numEntryMethods, envelope *env, void** &p) {
75         if(env != NULL && (env->getMsgtype() == ForArrayEltMsg ||
76                        env->getMsgtype() == ForIDedObjMsg ||
77                        env->getMsgtype() == ForChareMsg)
78       ){
79             const int ep = env->getsetArrayEp();
80             bool foundMatch = false;
81             // Search for ep by linearly searching through entryMethod
82             for(int i=0;i<numEntryMethods;++i){
83                 if(ep==entryMethod[i]){
84                     foundMatch=true;
85                     break;
86                 }
87             }
88             if(foundMatch){
89                 // Remove the entry from the queue
90                 *p = NULL;
91                 msgPtr = env;
92                 return true;
93             }
94         }
97 /** Find and remove the first 1 occurences of messages that matches a specified entry method index.
98     The size of the deq will not change, it will just contain an entry for a NULL pointer.
100     @return number of entries that were replaced with NULL
102     @param [in] q A circular double ended queue
103     @param [out] msgPtr returns the message that was removed from the prioq
104     @param [in] entryMethod An array of entry method ids that should be considered for removal
105     @param [in] numEntryMethods The number of the values in the entryMethod array.
107 int CqsFindRemoveSpecificDeq(_deq q, void *&msgPtr, const int *entryMethod, const int numEntryMethods ){
108     void **iter = q->head; ///< An iterator used to iterate through the circular queue
110     while(iter != q->tail){
111         // *iter is contains a pointer to a message
112         envelope *env = (envelope*)*iter;
113         if (checkAndRemoveMatching(msgPtr, entryMethod, numEntryMethods, env, iter))
114           return 1;
116         // Advance head to the next item in the circular queue
117         iter++;
118         if(iter == q->end)
119             iter = q->bgn;
120     }
121     return 0;
126 /** Find and remove the first 1 occurences of messages that matches a specified entry method index.
127     The size of the prioq will not change, it will just contain an entry for a NULL pointer.
129     @return number of entries that were replaced with NULL
131     @param [in] q A priority queue
132     @param [out] msgPtr returns the message that was removed from the prioq
133     @param [in] entryMethod An array of entry method ids that should be considered for removal
134     @param [in] numEntryMethods The number of the values in the entryMethod array.
136 int CqsFindRemoveSpecificPrioq(_prioq q, void *&msgPtr, const int *entryMethod, const int numEntryMethods ){
138     // A priority queue is a heap of circular queues
139     for(int i = 1; i < q->heapnext; i++){
140         // For each of the circular queues:
141         _prioqelt pe = (q->heap)[i];
142         void **head; ///< An iterator used to iterate through a circular queue
143         void **tail; ///< The end of the circular queue
144         head = pe->data.head;
145         tail = pe->data.tail;
146         while(head != tail){
147             // *head contains a pointer to a message
148             envelope *env = (envelope*)*head;
149             if (checkAndRemoveMatching(msgPtr, entryMethod, numEntryMethods, env, head))
150               return 1;
152             // Advance head to the next item in the circular queue
153             head++;
154             if(head == (pe->data).end)
155                 head = (pe->data).bgn;
156         }
157     }
158     return 0;
165 /** @} */