lockfree: improve documentation of `empty' methods
boost/lockfree/ringbuffer.hpp

// lock-free single-producer/single-consumer ringbuffer
// this algorithm is implemented in various projects (linux kernel)
//
// implementation for c++
//
// Copyright (C) 2009 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// Disclaimer: Not a Boost library.

#ifndef BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED
#define BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/array.hpp>
#include <boost/noncopyable.hpp>
#include <boost/smart_ptr/scoped_array.hpp>
#include <boost/range.hpp>

#include "detail/branch_hints.hpp"
#include "detail/prefix.hpp"

#include <algorithm>

namespace boost
{
namespace lockfree
{

namespace detail
{

template <typename T>
class ringbuffer_base:
    boost::noncopyable
{
#ifndef BOOST_DOXYGEN_INVOKED
    typedef std::size_t size_t;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}

    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (unlikely(ret >= max_size))
            ret -= max_size;
        return ret;
    }

    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        size_t ret = write_index + max_size - read_index;
        return ret;
    }

    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }
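
    /* A worked illustration of the index arithmetic above: with
     * max_size == 8, write_index == 6 and read_index == 2, four elements
     * are stored (at indices 2..5), so read_available() yields 6 - 2 == 4.
     * One slot is always kept unused to distinguish a full buffer from an
     * empty one, hence write_available() yields 2 - 6 - 1 + 8 == 3.
     */
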
    bool enqueue(T const & t, T * buffer, size_t max_size)
    {
        size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
        size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        buffer[write_index] = t;

        write_index_.store(next, memory_order_release);

        return true;
    }

    size_t enqueue(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
        const size_t read_index = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        input_count = std::min(input_count, avail);

        size_t new_write_index = write_index + input_count;

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            size_t count0 = max_size - write_index;

            std::copy(input_buffer, input_buffer + count0, internal_buffer + write_index);
            std::copy(input_buffer + count0, input_buffer + input_count, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::copy(input_buffer, input_buffer + input_count, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return input_count;
    }

    template <typename ConstIterator>
    ConstIterator enqueue(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance and std::advance

        size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
        const size_t read_index = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = std::min(input_count, avail);

        size_t new_write_index = write_index + input_count;

        ConstIterator last = begin;
        std::advance(last, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            size_t count0 = max_size - write_index;
            ConstIterator midpoint = begin;
            std::advance(midpoint, count0);

            std::copy(begin, midpoint, internal_buffer + write_index);
            std::copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return last;
    }

    bool dequeue (T & ret, T * buffer, size_t max_size)
    {
        size_t write_index = write_index_.load(memory_order_acquire);
        size_t read_index  = read_index_.load(memory_order_relaxed); // only written from dequeue thread
        if (empty(write_index, read_index))
            return false;

        ret = buffer[read_index];
        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }

    size_t dequeue (T * output_buffer, size_t output_count, const T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = std::min(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            size_t count0 = max_size - read_index;
            size_t count1 = output_count - count0;

            std::copy(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            std::copy(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            std::copy(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }

    template <typename OutputIterator>
    size_t dequeue (OutputIterator it, const T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread

        const size_t avail = read_available(write_index, read_index, max_size);
        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections; the second copy continues from the
             * iterator position returned by the first copy */
            size_t count0 = max_size - read_index;
            size_t count1 = avail - count0;

            it = std::copy(internal_buffer + read_index, internal_buffer + max_size, it);
            std::copy(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            std::copy(internal_buffer + read_index, internal_buffer + read_index + avail, it);
            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }
#endif

public:
    /** reset the ringbuffer
     *
     * \warning Not thread-safe, use for debugging purposes only
     * */
    void reset(void)
    {
        write_index_.store(0, memory_order_relaxed);
        read_index_.store(0, memory_order_release);
    }

    /** Check if the ringbuffer is empty
     *
     * \warning Not thread-safe, use for debugging purposes only
     * */
    bool empty(void)
    {
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    //! \copydoc boost::lockfree::stack::is_lock_free
    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }
};

} /* namespace detail */

template <typename T, size_t max_size>
class ringbuffer:
    public detail::ringbuffer_base<T>
{
    typedef std::size_t size_t;
    boost::array<T, max_size> array_;

public:
    /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
     *
     * \return true, if the enqueue operation is successful.
     *
     * \note Thread-safe and non-blocking
     * */
    bool enqueue(T const & t)
    {
        return detail::ringbuffer_base<T>::enqueue(t, array_.c_array(), max_size);
    }

    /** Dequeues one object from the ringbuffer.
     *
     * If the dequeue operation is successful, the object is written to the memory location denoted by ret.
     *
     * \return true, if the dequeue operation is successful, false if the ringbuffer was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool dequeue(T & ret)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, array_.c_array(), max_size);
    }
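
    /* A minimal usage sketch for the single-element operations, assuming one
     * producer thread and one consumer thread (process() is a hypothetical
     * consumer function, not part of this header):
     *
     * \code
     * boost::lockfree::ringbuffer<int, 1024> rb;
     *
     * // producer thread
     * while (!rb.enqueue(42))
     *     ;                       // ringbuffer full, retry
     *
     * // consumer thread
     * int value;
     * while (rb.dequeue(value))
     *     process(value);
     * \endcode
     */
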
    /** Enqueues size objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t enqueue(T const * t, size_t size)
    {
        return detail::ringbuffer_base<T>::enqueue(t, size, array_.c_array(), max_size);
    }

    /** Enqueues all objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t enqueue(T const (&t)[size])
    {
        return enqueue(t, size);
    }

    /** Enqueues objects from the iterator range [begin, end) to the ringbuffer.
     *
     * Enqueueing stops when the ringbuffer is full.
     *
     * \return iterator to the first element that has not been enqueued
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename ConstIterator>
    ConstIterator enqueue(ConstIterator begin, ConstIterator end)
    {
        return detail::ringbuffer_base<T>::enqueue(begin, end, array_.c_array(), max_size);
    }
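
    /* Sketch of handling a partial enqueue via the returned iterator
     * (rb and the pending vector are assumptions used for illustration):
     *
     * \code
     * std::vector<float> pending(64, 0.f);
     * std::vector<float>::iterator it = rb.enqueue(pending.begin(), pending.end());
     * pending.erase(pending.begin(), it);   // drop the elements that were enqueued
     * \endcode
     */
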
    /** Dequeues a maximum of size objects from the ringbuffer.
     *
     * Dequeued objects are written to the buffer denoted by ret.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t dequeue(T * ret, size_t size)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, size, array_.c_array(), max_size);
    }
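
    /* Sketch of draining into a fixed-size scratch buffer; the consumer
     * loops until dequeue() returns 0 (consume() is a hypothetical sink):
     *
     * \code
     * float scratch[256];
     * for (;;) {
     *     std::size_t n = rb.dequeue(scratch, 256);
     *     if (n == 0)
     *         break;
     *     consume(scratch, n);
     * }
     * \endcode
     */
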
    /** Dequeues objects from the ringbuffer to the array t.
     *
     * Will dequeue at most size objects, as many as are available.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t dequeue(T (&t)[size])
    {
        return dequeue(t, size);
    }

    /** Dequeues all available objects to the output iterator it.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename OutputIterator>
    size_t dequeue(OutputIterator it)
    {
        return detail::ringbuffer_base<T>::dequeue(it, array_.c_array(), max_size);
    }
};
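
/* Sketch of draining every available element through an output iterator;
 * std::back_inserter is one valid choice (rb and sink are assumptions):
 *
 * \code
 * std::vector<int> sink;
 * std::size_t n = rb.dequeue(std::back_inserter(sink));
 * // sink now holds n elements in FIFO order
 * \endcode
 */
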
template <typename T>
class ringbuffer<T, 0>:
    public detail::ringbuffer_base<T>
{
    typedef std::size_t size_t;
    size_t max_size_;
    scoped_array<T> array_;

public:
    //! Constructs a ringbuffer for max_size elements
    explicit ringbuffer(size_t max_size):
        max_size_(max_size), array_(new T[max_size])
    {}
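
    /* Construction sketch for the run-time sized specialization (the
     * element count is chosen only for illustration):
     *
     * \code
     * boost::lockfree::ringbuffer<int, 0> rb(1024);   // capacity fixed at construction
     * \endcode
     */
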
    /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
     *
     * \return true, if the enqueue operation is successful.
     *
     * \note Thread-safe and non-blocking
     * */
    bool enqueue(T const & t)
    {
        return detail::ringbuffer_base<T>::enqueue(t, array_.get(), max_size_);
    }

    /** Dequeues one object from the ringbuffer.
     *
     * If the dequeue operation is successful, the object is written to the memory location denoted by ret.
     *
     * \return true, if the dequeue operation is successful, false if the ringbuffer was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool dequeue(T & ret)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, array_.get(), max_size_);
    }

    /** Enqueues size objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t enqueue(T const * t, size_t size)
    {
        return detail::ringbuffer_base<T>::enqueue(t, size, array_.get(), max_size_);
    }

    /** Enqueues all objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t enqueue(T const (&t)[size])
    {
        return enqueue(t, size);
    }

    /** Enqueues objects from the iterator range [begin, end) to the ringbuffer.
     *
     * Enqueueing stops when the ringbuffer is full.
     *
     * \return iterator to the first element that has not been enqueued
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename ConstIterator>
    ConstIterator enqueue(ConstIterator begin, ConstIterator end)
    {
        return detail::ringbuffer_base<T>::enqueue(begin, end, array_.get(), max_size_);
    }

    /** Dequeues a maximum of size objects from the ringbuffer.
     *
     * Dequeued objects are written to the buffer denoted by ret.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t dequeue(T * ret, size_t size)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, size, array_.get(), max_size_);
    }

    /** Dequeues objects from the ringbuffer to the array t.
     *
     * Will dequeue at most size objects, as many as are available.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t dequeue(T (&t)[size])
    {
        return dequeue(t, size);
    }

    /** Dequeues all available objects to the output iterator it.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename OutputIterator>
    size_t dequeue(OutputIterator it)
    {
        return detail::ringbuffer_base<T>::dequeue(it, array_.get(), max_size_);
    }
};

} /* namespace lockfree */
} /* namespace boost */

#endif /* BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED */