1 /****************************************************************************
3 ** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies).
4 ** All rights reserved.
5 ** Contact: Nokia Corporation (qt-info@nokia.com)
7 ** This file is part of the QtCore module of the Qt Toolkit.
9 ** $QT_BEGIN_LICENSE:LGPL$
10 ** No Commercial Usage
11 ** This file contains pre-release code and may not be distributed.
12 ** You may use this file in accordance with the terms and conditions
13 ** contained in the Technology Preview License Agreement accompanying
16 ** GNU Lesser General Public License Usage
17 ** Alternatively, this file may be used under the terms of the GNU Lesser
18 ** General Public License version 2.1 as published by the Free Software
19 ** Foundation and appearing in the file LICENSE.LGPL included in the
20 ** packaging of this file. Please review the following information to
21 ** ensure the GNU Lesser General Public License version 2.1 requirements
22 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
24 ** In addition, as a special exception, Nokia gives you certain additional
25 ** rights. These rights are described in the Nokia Qt LGPL Exception
26 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
28 ** If you have questions regarding the use of this file, please contact
29 ** Nokia at qt-info@nokia.com.
40 ****************************************************************************/
42 #ifndef QTCONCURRENT_REDUCEKERNEL_H
43 #define QTCONCURRENT_REDUCEKERNEL_H
45 #include <QtCore/qglobal.h>
47 #ifndef QT_NO_CONCURRENT
49 #include <QtCore/qatomic.h>
50 #include <QtCore/qlist.h>
51 #include <QtCore/qmap.h>
52 #include <QtCore/qmutex.h>
53 #include <QtCore/qthread.h>
54 #include <QtCore/qthreadpool.h>
55 #include <QtCore/qvector.h>
62 namespace QtConcurrent
{
67 The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
68 limit the reduce queue size for MapReduce. When the number of
69 reduce blocks in the queue exceeds ReduceQueueStartLimit,
70 MapReduce won't start any new threads, and when it exceeds
71 ReduceQueueThrottleLimit, running threads will be stopped.
74 ReduceQueueStartLimit
= 20,
75 ReduceQueueThrottleLimit
= 30
78 // IntermediateResults holds a block of intermediate results from a
79 // map or filter functor. The begin/end offsets indicate the origin
80 // and range of the block.
82 class IntermediateResults
92 UnorderedReduce
= 0x1,
94 SequentialReduce
= 0x4
95 // ParallelReduce = 0x8
97 Q_DECLARE_FLAGS(ReduceOptions
, ReduceOption
)
98 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions
)
102 // supports both ordered and out-of-order reduction
103 template <typename ReduceFunctor
, typename ReduceResultType
, typename T
>
106 typedef QMap
<int, IntermediateResults
<T
> > ResultsMap
;
108 const ReduceOptions reduceOptions
;
111 int progress
, resultsMapSize
, threadCount
;
112 ResultsMap resultsMap
;
114 bool canReduce(int begin
) const
116 return (((reduceOptions
& UnorderedReduce
)
118 || ((reduceOptions
& OrderedReduce
)
119 && progress
== begin
));
122 void reduceResult(ReduceFunctor
&reduce
,
124 const IntermediateResults
<T
> &result
)
126 for (int i
= 0; i
< result
.vector
.size(); ++i
) {
127 reduce(r
, result
.vector
.at(i
));
131 void reduceResults(ReduceFunctor
&reduce
,
135 typename
ResultsMap::iterator it
= map
.begin();
136 while (it
!= map
.end()) {
137 reduceResult(reduce
, r
, it
.value());
143 ReduceKernel(ReduceOptions _reduceOptions
)
144 : reduceOptions(_reduceOptions
), progress(0), resultsMapSize(0),
145 threadCount(QThreadPool::globalInstance()->maxThreadCount())
148 void runReduce(ReduceFunctor
&reduce
,
150 const IntermediateResults
<T
> &result
)
152 QMutexLocker
locker(&mutex
);
153 if (!canReduce(result
.begin
)) {
155 resultsMap
.insert(result
.begin
, result
);
159 if (reduceOptions
& UnorderedReduce
) {
163 // reduce this result
165 reduceResult(reduce
, r
, result
);
168 // reduce all stored results as well
169 while (!resultsMap
.isEmpty()) {
170 ResultsMap resultsMapCopy
= resultsMap
;
174 reduceResults(reduce
, r
, resultsMapCopy
);
177 resultsMapSize
-= resultsMapCopy
.size();
182 // reduce this result
184 reduceResult(reduce
, r
, result
);
188 progress
+= result
.end
- result
.begin
;
190 // reduce as many other results as possible
191 typename
ResultsMap::iterator it
= resultsMap
.begin();
192 while (it
!= resultsMap
.end()) {
193 if (it
.value().begin
!= progress
)
197 reduceResult(reduce
, r
, it
.value());
201 progress
+= it
.value().end
- it
.value().begin
;
202 it
= resultsMap
.erase(it
);
208 void finish(ReduceFunctor
&reduce
, ReduceResultType
&r
)
210 reduceResults(reduce
, r
, resultsMap
);
213 inline bool shouldThrottle()
215 return (resultsMapSize
> (ReduceQueueThrottleLimit
* threadCount
));
218 inline bool shouldStartThread()
220 return (resultsMapSize
<= (ReduceQueueStartLimit
* threadCount
));
224 template <typename Sequence
, typename Base
, typename Functor1
, typename Functor2
>
225 struct SequenceHolder2
: public Base
227 SequenceHolder2(const Sequence
&_sequence
,
230 ReduceOptions reduceOptions
)
231 : Base(_sequence
.begin(), _sequence
.end(), functor1
, functor2
, reduceOptions
),
240 // Clear the sequence to make sure all temporaries are destroyed
241 // before finished is signaled.
242 sequence
= Sequence();
248 } // namespace QtConcurrent
253 #endif // QT_NO_CONCURRENT