10 #include <unordered_map>
14 // Some day, messages may be handled as something other than void* within the runtime.
15 // Prepare for that day, while enhancing readability today.
19 * Charm Message Queue: Holds msg pointers and returns the next message to execute on a PE
21 * Templated on the type of the priority field. Defaults to int priority.
22 * All scheduling policies are encapsulated behind this queue interface.
24 * All messages of a given priority p are stored in a single container. Since
25 * each message can be enqueued either to the front or back of this container,
26 * a deque (double-ended queue) is used. Each such deque is referred to as a bucket.
27 * The set of priority values of all the messages in the container is stored in
28 * a min-heap. A deq() operation simply peeks at the most important prio
29 * value, and finds the bucket associated with that value. It then dequeues the
30 * message at the front of this bucket.
31 * A mapping between the priority values and the corresponding buckets is
32 * maintained. enq() operations simply find the bucket corresponding to a prio
33 * value and place the msg into it.
36 template <typename P
= int>
40 /// The datatype for msg priorities
43 /// Hardly any initialization required
46 /// Given a message (optionally with a priority and queuing policy), enqueue it for delivery
47 void enq(const msg_t
*msg
, const prio_t
&prio
= prio_t(), const bool isFifo
= true);
49 /// Pop (and return) the next message to deliver
52 /// Number of messages in the queue
/// @return the current value of the qSize member (presumably the running message count
///         maintained by enq()/deq(); its declaration is not visible in this view)
53 inline size_t size() const { return qSize
; }
/// Upper bound on the reported queue size: simply the largest value representable by
/// size_t; no container-specific limit is consulted.
/// NOTE(review): std::numeric_limits is declared in <limits> -- confirm that header is included.
54 inline size_t max_size() const { return std::numeric_limits
<size_t>::max(); }
56 /// Is the queue empty?
/// @return true exactly when the qSize member is zero
57 inline bool empty() const { return (0 == qSize
); }
59 /// Just so that we can support CqsEnumerateQueue()
60 void enumerate(msg_t
**first
, msg_t
**last
) const;
62 /// An ostream operator overload, that currently just prints q size
63 friend std::ostream
& operator<< (std::ostream
&out
, const msgQ
&q
)
65 out
<<"\nmsgQ[" << q
.qSize
<< "]:";
71 #if CMK_RANDOMIZED_MSGQ
72 /// All the messages go in one bin when we're picking them out randomly
73 std::vector
<const msg_t
*> randQ
;
76 /// Message bucket type
77 typedef std::deque
<const msg_t
*> bkt_t
;
78 /// A key-val pair of a priority value and a handle to the bucket of msgs of that priority
79 typedef typename
std::pair
<prio_t
, bkt_t
*> prioidx_t
;
80 /// A type for mapping between priority values and msg buckets
81 typedef typename
std::unordered_map
<prio_t
, bkt_t
> bktmap_t
;
83 /// The size of this message queue
85 /// Collection of msg buckets, each holding msgs of a given priority (maps priorities to buckets)
87 /// A _min_ heap of distinct msg priorities along with handles to matching buckets
88 std::priority_queue
<prioidx_t
, std::vector
<prioidx_t
>, std::greater
<prioidx_t
> > prioQ
;
94 void msgQ
<P
>::enq(const msg_t
*msg
, const prio_t
&prio
, const bool isFifo
)
96 #if ! CMK_RANDOMIZED_MSGQ
97 // Find / create the bucket holding msgs of this priority
98 bkt_t
&bkt
= msgbuckets
[prio
];
99 // If this bucket (deque) is empty, insert the corresponding priority into prioQ
101 prioQ
.push( std::make_pair(prio
, &bkt
) );
102 // Enqueue the msg at the back of the bucket if FIFO, otherwise at the front
103 isFifo
? bkt
.push_back(msg
) : bkt
.push_front(msg
);
105 randQ
.push_back(msg
);
108 // Increment the total number of msgs in this container
114 #if ! CMK_RANDOMIZED_MSGQ // charm NOT built with a randomized queue
116 // Iterative applications typically have a set of msg priority values that just
117 // repeat over time. However, the arrival pattern of messages at a given PE is
118 // unknown. Within a single iteration, the size of a bucket holding msgs of a
119 // given priority may fluctuate. It may become empty as these messages are
120 // delivered. Instead of deleting the bucket every time it becomes empty (which
121 // could be several times in a single iteration), we choose to leave it around
122 // because it will eventually get filled again. This avoids the costs of
123 // repeatedly constructing / destroying buckets (deques).
125 // However, for non-iterative applications, a given priority value may not
126 // recur later in the execution. Not deleting buckets may increase container
127 // sizes, degrading performance over time. Hence, buckets should be deleted
128 // whenever they become empty (or with a small timeout).
130 // The choice between the two execution modes should be a compile-time
131 // decision. Ideally, this should be a decision during application compilation
132 // and be specified programmatically by the application (think template
133 // parameter). However, since this Q is buried so deep within charm, the
134 // decision has to be made during charm compilation.
136 // Assume by default that the application is iterative
137 #define CMK_ITERATIVE_MSG_PRIOS 1
139 template <typename P
>
140 const msg_t
* msgQ
<P
>::deq()
144 // Find the bucket holding the highest priority msgs
145 bkt_t
&bkt
= * prioQ
.top().second
;
146 // Pop msg from the front of the deque
147 const msg_t
*msg
= bkt
.front();
149 // If all msgs in the bucket have been consumed
152 #if ! CMK_ITERATIVE_MSG_PRIOS
153 // remove the empty bucket from the collection of buckets
154 msgbuckets
.erase( prioQ
.top().first
);
156 // pop corresponding priority from the priority Q
159 // Decrement the total number of msgs in this container
164 #else // If charm is built with a randomized msg queue
167 * For detecting races, and for a general check that applications don't
168 * depend on priorities or message ordering for correctness, charm can be built
169 * with a randomized scheduler queue. In this case, this container's deq()
170 * operation will not actually respect priorities. Instead it simply returns a
171 * randomly selected msg.
173 template <typename P
>
174 const msg_t
* msgQ
<P
>::deq()
179 #if defined(_WIN64) || defined(_WIN32)
180 long rnd
= rand() % randQ
.size();
182 long rnd
= lrand48() % randQ
.size();
184 const msg_t
*msg
= randQ
[rnd
];
185 randQ
[rnd
] = randQ
[randQ
.size()-1];
188 // Decrement the total number of msgs in this container
193 #endif // CMK_RANDOMIZED_MSGQ
195 template <typename P
>
196 void msgQ
<P
>::enumerate(msg_t
**first
, msg_t
**last
) const
203 #if !CMK_RANDOMIZED_MSGQ
204 for (typename
bktmap_t::const_iterator bktitr
= msgbuckets
.begin();
205 ptr
!= last
&& bktitr
!= msgbuckets
.end(); bktitr
++)
207 bkt_t::const_iterator msgitr
= bktitr
->second
.begin();
208 while (ptr
!= last
&& msgitr
!= bktitr
->second
.end())
209 *ptr
++ = (msg_t
*) *msgitr
++;
212 for (size_t i
= 0; i
< randQ
.size() && ptr
!= last
; ++i
, ++ptr
)
213 *ptr
= (msg_t
*) randQ
[i
];
217 #else //CMK_NO_MSG_PRIOS
218 template <typename P
= int>
/// Number of messages currently held; delegates to the size() of the member container `bkt`
224 inline size_t size() const { return bkt
.size(); }
/// Largest number of messages the member container `bkt` reports it can hold (its max_size())
225 inline size_t max_size() const { return bkt
.max_size(); }
/// True when the member container `bkt` holds no messages
226 inline bool empty() const { return bkt
.empty(); }
/// No-op in this variant of the queue: the body is empty, so neither `first` nor `last`
/// is read and nothing is written through them.
227 void enumerate(msg_t
**first
, msg_t
**last
) const {}
/// Stream-insertion overload that prints nothing in this variant of the queue;
/// `q` is unused and `out` is returned unchanged so calls can still be chained.
228 friend std::ostream
& operator<< (std::ostream
&out
, const msgQ
&q
) { return out
; }
230 inline void enq(const msg_t
*msg
, const prio_t
&prio
= prio_t(), const bool isFifo
= true) {
234 inline const msg_t
* deq() {
237 const msg_t
*msg
= bkt
.front();
243 std::deque
<const msg_t
*> bkt
;
245 #endif //CMK_NO_MSG_PRIOS
247 } // end namespace conv