Rename the majority of remaining C files in the RTS to C++
[charm.git] / src / conv-core / msgq.h
blob765f64e6c48a0c51f3c0cd440c1805292e01c1f6
1 #ifndef MSG_Q_H
2 #define MSG_Q_H
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <functional>
#include <limits>
#include <ostream>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>
12 namespace conv {
// Inside the runtime a message is, for now, an untyped blob: plain void.
// Giving that a name keeps signatures readable today and leaves room for a
// richer message representation some day.
using msg_t = void;
18 /**
19 * Charm Message Queue: Holds msg pointers and returns the next message to execute on a PE
21 * Templated on the type of the priority field. Defaults to int priority.
22 * All scheduling policies are encapsulated behind this queue interface.
 * All messages of a given priority p are stored in a single container. Since
 * each message can be enqueued either to the front or back of this container,
 * a deque is used. Each such deque is referred to as a bucket.
27 * The set of priority values of all the messages in the container is stored in
28 * a min-heap. A deq() operation simply peeks at the most important prio
29 * value, and finds the bucket associated with that value. It then dequeues the
30 * message at the front of this bucket.
 * A mapping between the priority values and the corresponding buckets is
 * maintained. enq() operations simply find the bucket corresponding to a prio
 * value and place the msg into it.
 */
35 #if !CMK_NO_MSG_PRIOS
36 template <typename P = int>
37 class msgQ
39 public:
40 /// The datatype for msg priorities
41 typedef P prio_t;
43 /// Hardly any initialization required
44 msgQ(): qSize(0) {}
46 /// Given a message (optionally with a priority and queuing policy), enqueue it for delivery
47 void enq(const msg_t *msg, const prio_t &prio = prio_t(), const bool isFifo = true);
49 /// Pop (and return) the next message to deliver
50 const msg_t* deq();
52 /// Number of messages in the queue
53 inline size_t size() const { return qSize; }
54 inline size_t max_size() const { return std::numeric_limits<size_t>::max(); }
56 /// Is the queue empty?
57 inline bool empty() const { return (0 == qSize); }
59 /// Just so that we can support CqsEnumerateQueue()
60 void enumerate(msg_t **first, msg_t **last) const;
62 /// An ostream operator overload, that currently just prints q size
63 friend std::ostream& operator<< (std::ostream &out, const msgQ &q)
65 out <<"\nmsgQ[" << q.qSize << "]:";
66 out<<"\n";
67 return out;
70 private:
71 #if CMK_RANDOMIZED_MSGQ
72 /// All the messages go in one bin when we're picking them out randomly
73 std::vector<const msg_t*> randQ;
74 #endif
76 /// Message bucket type
77 typedef std::deque<const msg_t*> bkt_t;
78 /// A key-val pair of a priority value and a handle to the bucket of msgs of that priority
79 typedef typename std::pair<prio_t, bkt_t*> prioidx_t;
80 /// A type for mapping between priority values and msg buckets
81 typedef typename std::unordered_map<prio_t, bkt_t> bktmap_t;
83 /// The size of this message queue
84 size_t qSize;
85 /// Collection of msg buckets, each holding msgs of a given priority (maps priorities to buckets)
86 bktmap_t msgbuckets;
87 /// A _min_ heap of distinct msg priorities along with handles to matching buckets
88 std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prioQ;
93 template <typename P>
94 void msgQ<P>::enq(const msg_t *msg, const prio_t &prio, const bool isFifo)
96 #if ! CMK_RANDOMIZED_MSGQ
97 // Find / create the bucket holding msgs of this priority
98 bkt_t &bkt = msgbuckets[prio];
99 // If this deq is empty, insert corresponding priority into prioQ
100 if (bkt.empty())
101 prioQ.push( std::make_pair(prio, &bkt) );
102 // Enq msg either at front or back of deq
103 isFifo ? bkt.push_back(msg) : bkt.push_front(msg);
104 #else
105 randQ.push_back(msg);
106 #endif
108 // Increment the total number of msgs in this container
109 qSize++;
114 #if ! CMK_RANDOMIZED_MSGQ // charm NOT built with a randomized queue
116 // Iterative applications typically have a set of msg priority values that just
117 // repeat over time. However, the arrival pattern of messages at a given PE is
118 // unknown. Within a single iteration, the size of a bucket holding msgs of a
119 // given priority may fluctuate. It may become empty as these messages are
120 // delivered. Instead of deleting the bucket every time it becomes empty (which
121 // could be several times in a single iteration), we choose to leave it around
// because it will eventually get filled again. This avoids the costs of
// repeatedly constructing / destroying buckets (deques).
125 // However, for non-iterative applications, a given priority value may not
126 // recur later in the execution. Not deleting buckets may increase container
127 // sizes, degrading performance over time. Hence, buckets should be deleted
128 // whenever they become empty (or with a small timeout).
130 // The choice between the two execution modes should be a compile-time
131 // decision. Ideally, this should be a decision during application compilation
132 // and be specified programmatically by the application (think template
133 // parameter). However, since this Q is buried so deep within charm, the
134 // decision has to be made during charm compilation.
136 // Assume by default that the application is iterative
137 #define CMK_ITERATIVE_MSG_PRIOS 1
139 template <typename P>
140 const msg_t* msgQ<P>::deq()
142 if (empty())
143 return NULL;
144 // Find the bucket holding the highest priority msgs
145 bkt_t &bkt = * prioQ.top().second;
146 // Pop msg from the front of the deque
147 const msg_t *msg = bkt.front();
148 bkt.pop_front();
149 // If all msgs in the bucket have been consumed
150 if (bkt.empty())
152 #if ! CMK_ITERATIVE_MSG_PRIOS
153 // remove the empty bucket from the collection of buckets
154 msgbuckets.erase( prioQ.top().first );
155 #endif
156 // pop corresponding priority from the priority Q
157 prioQ.pop();
159 // Decrement the total number of msgs in this container
160 qSize--;
161 return msg;
164 #else // If charm is built with a randomized msg queue
167 * For detecting races, and for a general check that applications dont
168 * depend on priorities or message ordering for correctness, charm can be built
169 * with a randomized scheduler queue. In this case, this container's deq()
170 * operation will not actually respect priorities. Instead it simply returns a
171 * randomly selected msg.
173 template <typename P>
174 const msg_t* msgQ<P>::deq()
176 if (empty())
177 return NULL;
179 #if defined(_WIN64) || defined(_WIN32)
180 long rnd = rand() % randQ.size();
181 #else
182 long rnd = lrand48() % randQ.size();
183 #endif
184 const msg_t *msg = randQ[rnd];
185 randQ[rnd] = randQ[randQ.size()-1];
186 randQ.pop_back();
188 // Decrement the total number of msgs in this container
189 qSize--;
190 return msg;
193 #endif // CMK_RANDOMIZED_MSGQ
195 template <typename P>
196 void msgQ<P>::enumerate(msg_t **first, msg_t **last) const
198 if (first >= last)
199 return;
201 msg_t **ptr = first;
203 #if !CMK_RANDOMIZED_MSGQ
204 for (typename bktmap_t::const_iterator bktitr = msgbuckets.begin();
205 ptr != last && bktitr != msgbuckets.end(); bktitr++)
207 bkt_t::const_iterator msgitr = bktitr->second.begin();
208 while (ptr != last && msgitr != bktitr->second.end())
209 *ptr++ = (msg_t*) *msgitr++;
211 #else
212 for (size_t i = 0; i < randQ.size() && ptr != last; ++i, ++ptr)
213 *ptr = (msg_t*) randQ[i];
214 #endif
217 #else //CMK_NO_MSG_PRIOS
218 template <typename P = int>
219 class msgQ
221 public:
222 typedef P prio_t;
223 msgQ() {}
224 inline size_t size() const { return bkt.size(); }
225 inline size_t max_size() const { return bkt.max_size(); }
226 inline bool empty() const { return bkt.empty(); }
227 void enumerate(msg_t **first, msg_t **last) const {}
228 friend std::ostream& operator<< (std::ostream &out, const msgQ &q) { return out; }
230 inline void enq(const msg_t *msg, const prio_t &prio = prio_t(), const bool isFifo = true) {
231 bkt.push_back(msg);
234 inline const msg_t* deq() {
235 if (bkt.empty())
236 return NULL;
237 const msg_t *msg = bkt.front();
238 bkt.pop_front();
239 return msg;
242 private:
243 std::deque<const msg_t*> bkt;
245 #endif //CMK_NO_MSG_PRIOS
247 } // end namespace conv
249 #endif // MSG_Q_H