boost.atomic: ppc fix
[boost_lockfree.git] / boost / lockfree / ringbuffer.hpp
blobd660747041f6db139fb7bb75d174d135ae2e8420
1 // lock-free single-producer/single-consumer ringbuffer
2 // this algorithm is implemented in various projects (linux kernel)
3 //
4 // implementation for c++
5 //
6 // Copyright (C) 2009 Tim Blechmann
7 //
8 // Distributed under the Boost Software License, Version 1.0. (See
9 // accompanying file LICENSE_1_0.txt or copy at
10 // http://www.boost.org/LICENSE_1_0.txt)
12 // Disclaimer: Not a Boost library.
14 #ifndef BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED
15 #define BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED
16 #include <boost/lockfree/detail/atomic.hpp>
17 #include <boost/array.hpp>
18 #include <boost/noncopyable.hpp>
19 #include <boost/smart_ptr/scoped_array.hpp>
21 #include "detail/branch_hints.hpp"
22 #include "detail/prefix.hpp"
24 #include <algorithm>
26 namespace boost
28 namespace lockfree
31 namespace detail
34 template <typename T>
35 class ringbuffer_base:
36 boost::noncopyable
38 #ifndef BOOST_DOXYGEN_INVOKED
39 typedef std::size_t size_t;
40 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
41 atomic<size_t> write_index_;
42 char padding1[padding_size]; /* force read_index and write_index to different cache lines */
43 atomic<size_t> read_index_;
45 protected:
46 ringbuffer_base(void):
47 write_index_(0), read_index_(0)
50 static size_t next_index(size_t arg, size_t max_size)
52 size_t ret = arg + 1;
53 while (unlikely(ret >= max_size))
54 ret -= max_size;
55 return ret;
58 static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
60 if (write_index >= read_index)
61 return write_index - read_index;
63 size_t ret = write_index + max_size - read_index;
64 return ret;
67 static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
69 size_t ret = read_index - write_index - 1;
70 if (write_index >= read_index)
71 ret += max_size;
72 return ret;
75 bool enqueue(T const & t, T * buffer, size_t max_size)
77 size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
78 size_t next = next_index(write_index, max_size);
80 if (next == read_index_.load(memory_order_acquire))
81 return false; /* ringbuffer is full */
83 buffer[write_index] = t;
85 write_index_.store(next, memory_order_release);
87 return true;
90 size_t enqueue(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
92 size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
93 const size_t read_index = read_index_.load(memory_order_acquire);
94 const size_t avail = write_available(write_index, read_index, max_size);
96 if (avail == 0)
97 return 0;
99 input_count = std::min(input_count, avail);
101 size_t new_write_index = write_index + input_count;
103 if (write_index + input_count > max_size) {
104 /* copy data in two sections */
105 size_t count0 = max_size - write_index;
107 std::copy(input_buffer, input_buffer + count0, internal_buffer + write_index);
108 std::copy(input_buffer + count0, input_buffer + input_count, internal_buffer);
109 new_write_index -= max_size;
110 } else {
111 std::copy(input_buffer, input_buffer + input_count, internal_buffer + write_index);
113 if (new_write_index == max_size)
114 new_write_index = 0;
117 write_index_.store(new_write_index, memory_order_release);
118 return input_count;
121 template <typename ConstIterator>
122 ConstIterator enqueue(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
124 // FIXME: avoid std::distance and std::advance
126 size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
127 const size_t read_index = read_index_.load(memory_order_acquire);
128 const size_t avail = write_available(write_index, read_index, max_size);
130 if (avail == 0)
131 return begin;
133 size_t input_count = std::distance(begin, end);
134 input_count = std::min(input_count, avail);
136 size_t new_write_index = write_index + input_count;
138 ConstIterator last = begin;
139 std::advance(last, input_count);
141 if (write_index + input_count > max_size) {
142 /* copy data in two sections */
143 size_t count0 = max_size - write_index;
144 ConstIterator midpoint = begin;
145 std::advance(midpoint, count0);
147 std::copy(begin, midpoint, internal_buffer + write_index);
148 std::copy(midpoint, last, internal_buffer);
149 new_write_index -= max_size;
150 } else {
151 std::copy(begin, last, internal_buffer + write_index);
153 if (new_write_index == max_size)
154 new_write_index = 0;
157 write_index_.store(new_write_index, memory_order_release);
158 return last;
161 bool dequeue (T & ret, T * buffer, size_t max_size)
163 size_t write_index = write_index_.load(memory_order_acquire);
164 size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread
165 if (empty(write_index, read_index))
166 return false;
168 ret = buffer[read_index];
169 size_t next = next_index(read_index, max_size);
170 read_index_.store(next, memory_order_release);
171 return true;
174 size_t dequeue (T * output_buffer, size_t output_count, const T * internal_buffer, size_t max_size)
176 const size_t write_index = write_index_.load(memory_order_acquire);
177 size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread
179 const size_t avail = read_available(write_index, read_index, max_size);
181 if (avail == 0)
182 return 0;
184 output_count = std::min(output_count, avail);
186 size_t new_read_index = read_index + output_count;
188 if (read_index + output_count > max_size) {
189 /* copy data in two sections */
190 size_t count0 = max_size - read_index;
191 size_t count1 = output_count - count0;
193 std::copy(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
194 std::copy(internal_buffer, internal_buffer + count1, output_buffer + count0);
196 new_read_index -= max_size;
197 } else {
198 std::copy(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
199 if (new_read_index == max_size)
200 new_read_index = 0;
203 read_index_.store(new_read_index, memory_order_release);
204 return output_count;
207 template <typename OutputIterator>
208 size_t dequeue (OutputIterator it, const T * internal_buffer, size_t max_size)
210 const size_t write_index = write_index_.load(memory_order_acquire);
211 size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread
213 const size_t avail = read_available(write_index, read_index, max_size);
214 if (avail == 0)
215 return 0;
217 size_t new_read_index = read_index + avail;
219 if (read_index + avail > max_size) {
220 /* copy data in two sections */
221 size_t count0 = max_size - read_index;
222 size_t count1 = avail - count0;
224 std::copy(internal_buffer + read_index, internal_buffer + max_size, it);
225 std::copy(internal_buffer, internal_buffer + count1, it);
227 new_read_index -= max_size;
228 } else {
229 std::copy(internal_buffer + read_index, internal_buffer + read_index + avail, it);
230 if (new_read_index == max_size)
231 new_read_index = 0;
234 read_index_.store(new_read_index, memory_order_release);
235 return avail;
237 #endif
240 public:
241 /** reset the ringbuffer
243 * \warning Not thread-safe, use for debugging purposes only
244 * */
245 void reset(void)
247 write_index_.store(0, memory_order_relaxed);
248 read_index_.store(0, memory_order_release);
251 /** Check if the ringbuffer is empty
253 * \warning Not thread-safe, use for debugging purposes only
254 * */
255 bool empty(void)
257 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
260 //! \copydoc boost::lockfree::stack::is_lock_free
261 bool is_lock_free(void) const
263 return write_index_.is_lock_free() && read_index_.is_lock_free();
266 private:
267 bool empty(size_t write_index, size_t read_index)
269 return write_index == read_index;
273 } /* namespace detail */
275 template <typename T, size_t max_size>
276 class ringbuffer:
277 public detail::ringbuffer_base<T>
279 typedef std::size_t size_t;
280 boost::array<T, max_size> array_;
282 public:
283 /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
285 * \return true, if the enqueue operation is successful.
287 * \note Thread-safe and non-blocking
288 * */
289 bool enqueue(T const & t)
291 return detail::ringbuffer_base<T>::enqueue(t, array_.c_array(), max_size);
294 /** Dequeue object from ringbuffer.
296 * If dequeue operation is successful, object is written to memory location denoted by ret.
298 * \return true, if the dequeue operation is successful, false if ringbuffer was empty.
300 * \note Thread-safe and non-blocking
302 bool dequeue(T & ret)
304 return detail::ringbuffer_base<T>::dequeue(ret, array_.c_array(), max_size);
307 /** Enqueues size objects from the array t to the ringbuffer.
309 * Will enqueue as many objects as there is space available
311 * \Returns number of enqueued items
313 * \note Thread-safe and non-blocking
315 size_t enqueue(T const * t, size_t size)
317 return detail::ringbuffer_base<T>::enqueue(t, size, array_.c_array(), max_size);
320 /** Enqueues all objects from the array t to the ringbuffer.
322 * Will enqueue as many objects as there is space available
324 * \Returns number of enqueued items
326 * \note Thread-safe and non-blocking
328 template <size_t size>
329 size_t enqueue(T const (&t)[size])
331 return enqueue(t, size);
334 /** Enqueues size objects from the iterator range [begin, end[ to the ringbuffer.
336 * Enqueueing may fail, if the ringbuffer is full.
338 * \return iterator to the first element, which has not been enqueued
340 * \note Thread-safe and non-blocking
342 template <typename ConstIterator>
343 ConstIterator enqueue(ConstIterator begin, ConstIterator end)
345 return detail::ringbuffer_base<T>::enqueue(begin, end, array_.c_array(), max_size);
348 /** Dequeue a maximum of size objects from ringbuffer.
350 * If dequeue operation is successful, object is written to memory location denoted by ret.
352 * \return number of dequeued items
354 * \note Thread-safe and non-blocking
355 * */
356 /* @{ */
357 size_t dequeue(T * ret, size_t size)
359 return detail::ringbuffer_base<T>::dequeue(ret, size, array_.c_array(), max_size);
362 /** Enqueues all objects from the array t to the ringbuffer.
364 * Will enqueue as many objects as there is space available
366 * \Returns number of enqueued items
368 * \note Thread-safe and non-blocking
370 template <size_t size>
371 size_t dequeue(T (&t)[size])
373 return dequeue(t, size);
376 /** Dequeue objects to the output iterator it
378 * \return number of dequeued items
380 * \note Thread-safe and non-blocking
381 * */
382 template <typename OutputIterator>
383 size_t dequeue(OutputIterator it)
385 return detail::ringbuffer_base<T>::dequeue(it, array_.c_array(), max_size);
389 template <typename T>
390 class ringbuffer<T, 0>:
391 public detail::ringbuffer_base<T>
393 typedef std::size_t size_t;
394 size_t max_size_;
395 scoped_array<T> array_;
397 public:
398 //! Constructs a ringbuffer for max_size elements
399 explicit ringbuffer(size_t max_size):
400 max_size_(max_size), array_(new T[max_size])
403 /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
405 * \return true, if the enqueue operation is successful.
407 * \note Thread-safe and non-blocking
408 * */
409 bool enqueue(T const & t)
411 return detail::ringbuffer_base<T>::enqueue(t, array_.get(), max_size_);
414 /** Dequeue object from ringbuffer.
416 * If dequeue operation is successful, object is written to memory location denoted by ret.
418 * \return true, if the dequeue operation is successful, false if ringbuffer was empty.
420 * \note Thread-safe and non-blocking
422 bool dequeue(T & ret)
424 return detail::ringbuffer_base<T>::dequeue(ret, array_.get(), max_size_);
427 /** Enqueues size objects from the array t to the ringbuffer.
429 * Will enqueue as many objects as there is space available
431 * \Returns number of enqueued items
433 * \note Thread-safe and non-blocking
435 size_t enqueue(T const * t, size_t size)
437 return detail::ringbuffer_base<T>::enqueue(t, size, array_.get(), max_size_);
440 /** Enqueues all objects from the array t to the ringbuffer.
442 * Will enqueue as many objects as there is space available
444 * \Returns number of enqueued items
446 * \note Thread-safe and non-blocking
448 template <size_t size>
449 size_t enqueue(T const (&t)[size])
451 return enqueue(t, size);
454 /** Enqueues size objects from the iterator range [begin, end[ to the ringbuffer.
456 * Enqueueing may fail, if the ringbuffer is full.
458 * \return iterator to the first element, which has not been enqueued
460 * \note Thread-safe and non-blocking
462 template <typename ConstIterator>
463 ConstIterator enqueue(ConstIterator begin, ConstIterator end)
465 return detail::ringbuffer_base<T>::enqueue(begin, end, array_.get(), max_size_);
468 /** Dequeue a maximum of size objects from ringbuffer.
470 * If dequeue operation is successful, object is written to memory location denoted by ret.
472 * \return number of dequeued items
474 * \note Thread-safe and non-blocking
475 * */
476 size_t dequeue(T * ret, size_t size)
478 return detail::ringbuffer_base<T>::dequeue(ret, size, array_.get(), max_size_);
481 /** Dequeue objects from ringbuffer.
483 * If dequeue operation is successful, object is written to memory location denoted by ret.
485 * \return number of dequeued items
487 * \note Thread-safe and non-blocking
488 * */
489 template <size_t size>
490 size_t dequeue(T (&t)[size])
492 return dequeue(t, size);
495 /** Dequeue objects to the output iterator it
497 * \return number of dequeued items
499 * \note Thread-safe and non-blocking
500 * */
501 template <typename OutputIterator>
502 size_t dequeue(OutputIterator it)
504 return detail::ringbuffer_base<T>::dequeue(it, array_.get(), max_size_);
509 } /* namespace lockfree */
510 } /* namespace boost */
513 #endif /* BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED */