1 // lock-free single-producer/single-consumer ringbuffer
2 // this algorithm is implemented in various projects (linux kernel)
4 // implementation for c++
6 // Copyright (C) 2009 Tim Blechmann
8 // Distributed under the Boost Software License, Version 1.0. (See
9 // accompanying file LICENSE_1_0.txt or copy at
10 // http://www.boost.org/LICENSE_1_0.txt)
12 // Disclaimer: Not a Boost library.
14 #ifndef BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED
15 #define BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED
16 #include <boost/lockfree/detail/atomic.hpp>
17 #include <boost/array.hpp>
18 #include <boost/noncopyable.hpp>
19 #include <boost/smart_ptr/scoped_array.hpp>
20 #include <boost/range.hpp>
22 #include "detail/branch_hints.hpp"
23 #include "detail/prefix.hpp"
36 class ringbuffer_base
:
39 #ifndef BOOST_DOXYGEN_INVOKED
40 typedef std::size_t size_t;
41 static const int padding_size
= BOOST_LOCKFREE_CACHELINE_BYTES
- sizeof(size_t);
42 atomic
<size_t> write_index_
;
43 char padding1
[padding_size
]; /* force read_index and write_index to different cache lines */
44 atomic
<size_t> read_index_
;
47 ringbuffer_base(void):
48 write_index_(0), read_index_(0)
51 static size_t next_index(size_t arg
, size_t max_size
)
54 while (unlikely(ret
>= max_size
))
59 static size_t read_available(size_t write_index
, size_t read_index
, size_t max_size
)
61 if (write_index
>= read_index
)
62 return write_index
- read_index
;
64 size_t ret
= write_index
+ max_size
- read_index
;
68 static size_t write_available(size_t write_index
, size_t read_index
, size_t max_size
)
70 size_t ret
= read_index
- write_index
- 1;
71 if (write_index
>= read_index
)
76 bool enqueue(T
const & t
, T
* buffer
, size_t max_size
)
78 size_t write_index
= write_index_
.load(memory_order_relaxed
); // only written from enqueue thread
79 size_t next
= next_index(write_index
, max_size
);
81 if (next
== read_index_
.load(memory_order_acquire
))
82 return false; /* ringbuffer is full */
84 buffer
[write_index
] = t
;
86 write_index_
.store(next
, memory_order_release
);
91 size_t enqueue(const T
* input_buffer
, size_t input_count
, T
* internal_buffer
, size_t max_size
)
93 size_t write_index
= write_index_
.load(memory_order_relaxed
); // only written from enqueue thread
94 const size_t read_index
= read_index_
.load(memory_order_acquire
);
95 const size_t avail
= write_available(write_index
, read_index
, max_size
);
100 input_count
= std::min(input_count
, avail
);
102 size_t new_write_index
= write_index
+ input_count
;
104 if (write_index
+ input_count
> max_size
) {
105 /* copy data in two sections */
106 size_t count0
= max_size
- write_index
;
108 std::copy(input_buffer
, input_buffer
+ count0
, internal_buffer
+ write_index
);
109 std::copy(input_buffer
+ count0
, input_buffer
+ input_count
, internal_buffer
);
110 new_write_index
-= max_size
;
112 std::copy(input_buffer
, input_buffer
+ input_count
, internal_buffer
+ write_index
);
114 if (new_write_index
== max_size
)
118 write_index_
.store(new_write_index
, memory_order_release
);
122 template <typename ConstIterator
>
123 ConstIterator
enqueue(ConstIterator begin
, ConstIterator end
, T
* internal_buffer
, size_t max_size
)
125 // FIXME: avoid std::distance and std::advance
127 size_t write_index
= write_index_
.load(memory_order_relaxed
); // only written from enqueue thread
128 const size_t read_index
= read_index_
.load(memory_order_acquire
);
129 const size_t avail
= write_available(write_index
, read_index
, max_size
);
134 size_t input_count
= std::distance(begin
, end
);
135 input_count
= std::min(input_count
, avail
);
137 size_t new_write_index
= write_index
+ input_count
;
139 ConstIterator last
= begin
;
140 std::advance(last
, input_count
);
142 if (write_index
+ input_count
> max_size
) {
143 /* copy data in two sections */
144 size_t count0
= max_size
- write_index
;
145 ConstIterator midpoint
= begin
;
146 std::advance(midpoint
, count0
);
148 std::copy(begin
, midpoint
, internal_buffer
+ write_index
);
149 std::copy(midpoint
, last
, internal_buffer
);
150 new_write_index
-= max_size
;
152 std::copy(begin
, last
, internal_buffer
+ write_index
);
154 if (new_write_index
== max_size
)
158 write_index_
.store(new_write_index
, memory_order_release
);
162 bool dequeue (T
& ret
, T
* buffer
, size_t max_size
)
164 size_t write_index
= write_index_
.load(memory_order_acquire
);
165 size_t read_index
= read_index_
.load(memory_order_relaxed
); // only written from dequeue thread
166 if (empty(write_index
, read_index
))
169 ret
= buffer
[read_index
];
170 size_t next
= next_index(read_index
, max_size
);
171 read_index_
.store(next
, memory_order_release
);
175 size_t dequeue (T
* output_buffer
, size_t output_count
, const T
* internal_buffer
, size_t max_size
)
177 const size_t write_index
= write_index_
.load(memory_order_acquire
);
178 size_t read_index
= read_index_
.load(memory_order_relaxed
); // only written from dequeue thread
180 const size_t avail
= read_available(write_index
, read_index
, max_size
);
185 output_count
= std::min(output_count
, avail
);
187 size_t new_read_index
= read_index
+ output_count
;
189 if (read_index
+ output_count
> max_size
) {
190 /* copy data in two sections */
191 size_t count0
= max_size
- read_index
;
192 size_t count1
= output_count
- count0
;
194 std::copy(internal_buffer
+ read_index
, internal_buffer
+ max_size
, output_buffer
);
195 std::copy(internal_buffer
, internal_buffer
+ count1
, output_buffer
+ count0
);
197 new_read_index
-= max_size
;
199 std::copy(internal_buffer
+ read_index
, internal_buffer
+ read_index
+ output_count
, output_buffer
);
200 if (new_read_index
== max_size
)
204 read_index_
.store(new_read_index
, memory_order_release
);
208 template <typename OutputIterator
>
209 size_t dequeue (OutputIterator it
, const T
* internal_buffer
, size_t max_size
)
211 const size_t write_index
= write_index_
.load(memory_order_acquire
);
212 size_t read_index
= read_index_
.load(memory_order_relaxed
); // only written from dequeue thread
214 const size_t avail
= read_available(write_index
, read_index
, max_size
);
218 size_t new_read_index
= read_index
+ avail
;
220 if (read_index
+ avail
> max_size
) {
221 /* copy data in two sections */
222 size_t count0
= max_size
- read_index
;
223 size_t count1
= avail
- count0
;
225 std::copy(internal_buffer
+ read_index
, internal_buffer
+ max_size
, it
);
226 std::copy(internal_buffer
, internal_buffer
+ count1
, it
);
228 new_read_index
-= max_size
;
230 std::copy(internal_buffer
+ read_index
, internal_buffer
+ read_index
+ avail
, it
);
231 if (new_read_index
== max_size
)
235 read_index_
.store(new_read_index
, memory_order_release
);
242 /** reset the ringbuffer
244 * \warning Not thread-safe, use for debugging purposes only
248 write_index_
.store(0, memory_order_relaxed
);
249 read_index_
.store(0, memory_order_release
);
252 /** Check if the ringbuffer is empty
254 * \warning Not thread-safe, use for debugging purposes only
258 return empty(write_index_
.load(memory_order_relaxed
), read_index_
.load(memory_order_relaxed
));
261 //! \copydoc boost::lockfree::stack::is_lock_free
262 bool is_lock_free(void) const
264 return write_index_
.is_lock_free() && read_index_
.is_lock_free();
268 bool empty(size_t write_index
, size_t read_index
)
270 return write_index
== read_index
;
274 } /* namespace detail */
276 template <typename T
, size_t max_size
>
278 public detail::ringbuffer_base
<T
>
280 typedef std::size_t size_t;
281 boost::array
<T
, max_size
> array_
;
284 /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
286 * \return true, if the enqueue operation is successful.
288 * \note Thread-safe and non-blocking
290 bool enqueue(T
const & t
)
292 return detail::ringbuffer_base
<T
>::enqueue(t
, array_
.c_array(), max_size
);
295 /** Dequeue object from ringbuffer.
297 * If dequeue operation is successful, object is written to memory location denoted by ret.
299 * \return true, if the dequeue operation is successful, false if ringbuffer was empty.
301 * \note Thread-safe and non-blocking
303 bool dequeue(T
& ret
)
305 return detail::ringbuffer_base
<T
>::dequeue(ret
, array_
.c_array(), max_size
);
308 /** Enqueues size objects from the array t to the ringbuffer.
310 * Will enqueue as many objects as there is space available
312 * \Returns number of enqueued items
314 * \note Thread-safe and non-blocking
316 size_t enqueue(T
const * t
, size_t size
)
318 return detail::ringbuffer_base
<T
>::enqueue(t
, size
, array_
.c_array(), max_size
);
321 /** Enqueues all objects from the array t to the ringbuffer.
323 * Will enqueue as many objects as there is space available
325 * \Returns number of enqueued items
327 * \note Thread-safe and non-blocking
329 template <size_t size
>
330 size_t enqueue(T
const (&t
)[size
])
332 return enqueue(t
, size
);
335 /** Enqueues size objects from the iterator range [begin, end[ to the ringbuffer.
337 * Enqueueing may fail, if the ringbuffer is full.
339 * \return iterator to the first element, which has not been enqueued
341 * \note Thread-safe and non-blocking
343 template <typename ConstIterator
>
344 ConstIterator
enqueue(ConstIterator begin
, ConstIterator end
)
346 return detail::ringbuffer_base
<T
>::enqueue(begin
, end
, array_
.c_array(), max_size
);
349 /** Dequeue a maximum of size objects from ringbuffer.
351 * If dequeue operation is successful, object is written to memory location denoted by ret.
353 * \return number of dequeued items
355 * \note Thread-safe and non-blocking
358 size_t dequeue(T
* ret
, size_t size
)
360 return detail::ringbuffer_base
<T
>::dequeue(ret
, size
, array_
.c_array(), max_size
);
363 /** Enqueues all objects from the array t to the ringbuffer.
365 * Will enqueue as many objects as there is space available
367 * \Returns number of enqueued items
369 * \note Thread-safe and non-blocking
371 template <size_t size
>
372 size_t dequeue(T (&t
)[size
])
374 return dequeue(t
, size
);
377 /** Dequeue objects to the output iterator it
379 * \return number of dequeued items
381 * \note Thread-safe and non-blocking
383 template <typename OutputIterator
>
384 size_t dequeue(OutputIterator it
)
386 return detail::ringbuffer_base
<T
>::dequeue(it
, array_
.c_array(), max_size
);
390 template <typename T
>
391 class ringbuffer
<T
, 0>:
392 public detail::ringbuffer_base
<T
>
394 typedef std::size_t size_t;
396 scoped_array
<T
> array_
;
399 //! Constructs a ringbuffer for max_size elements
400 explicit ringbuffer(size_t max_size
):
401 max_size_(max_size
), array_(new T
[max_size
])
404 /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
406 * \return true, if the enqueue operation is successful.
408 * \note Thread-safe and non-blocking
410 bool enqueue(T
const & t
)
412 return detail::ringbuffer_base
<T
>::enqueue(t
, array_
.get(), max_size_
);
415 /** Dequeue object from ringbuffer.
417 * If dequeue operation is successful, object is written to memory location denoted by ret.
419 * \return true, if the dequeue operation is successful, false if ringbuffer was empty.
421 * \note Thread-safe and non-blocking
423 bool dequeue(T
& ret
)
425 return detail::ringbuffer_base
<T
>::dequeue(ret
, array_
.get(), max_size_
);
428 /** Enqueues size objects from the array t to the ringbuffer.
430 * Will enqueue as many objects as there is space available
432 * \Returns number of enqueued items
434 * \note Thread-safe and non-blocking
436 size_t enqueue(T
const * t
, size_t size
)
438 return detail::ringbuffer_base
<T
>::enqueue(t
, size
, array_
.get(), max_size_
);
441 /** Enqueues all objects from the array t to the ringbuffer.
443 * Will enqueue as many objects as there is space available
445 * \Returns number of enqueued items
447 * \note Thread-safe and non-blocking
449 template <size_t size
>
450 size_t enqueue(T
const (&t
)[size
])
452 return enqueue(t
, size
);
455 /** Enqueues size objects from the iterator range [begin, end[ to the ringbuffer.
457 * Enqueueing may fail, if the ringbuffer is full.
459 * \return iterator to the first element, which has not been enqueued
461 * \note Thread-safe and non-blocking
463 template <typename ConstIterator
>
464 ConstIterator
enqueue(ConstIterator begin
, ConstIterator end
)
466 return detail::ringbuffer_base
<T
>::enqueue(begin
, end
, array_
.get(), max_size_
);
469 /** Dequeue a maximum of size objects from ringbuffer.
471 * If dequeue operation is successful, object is written to memory location denoted by ret.
473 * \return number of dequeued items
475 * \note Thread-safe and non-blocking
477 size_t dequeue(T
* ret
, size_t size
)
479 return detail::ringbuffer_base
<T
>::dequeue(ret
, size
, array_
.get(), max_size_
);
482 /** Dequeue objects from ringbuffer.
484 * If dequeue operation is successful, object is written to memory location denoted by ret.
486 * \return number of dequeued items
488 * \note Thread-safe and non-blocking
490 template <size_t size
>
491 size_t dequeue(T (&t
)[size
])
493 return dequeue(t
, size
);
496 /** Dequeue objects to the output iterator it
498 * \return number of dequeued items
500 * \note Thread-safe and non-blocking
502 template <typename OutputIterator
>
503 size_t dequeue(OutputIterator it
)
505 return detail::ringbuffer_base
<T
>::dequeue(it
, array_
.get(), max_size_
);
510 } /* namespace lockfree */
511 } /* namespace boost */
514 #endif /* BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED */