//  lock-free single-producer/single-consumer ringbuffer
//
//  this algorithm is implemented in various projects (e.g. the linux kernel)
//
//  implementation for c++
//
//  Copyright (C) 2009 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)
//
//  Disclaimer: Not a Boost library.

#ifndef BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED
#define BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED

#include <algorithm>    /* std::copy, std::min, std::distance, std::advance */

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/array.hpp>
#include <boost/noncopyable.hpp>
#include <boost/smart_ptr/scoped_array.hpp>

#include "detail/branch_hints.hpp"
#include "detail/prefix.hpp"
namespace boost
{
namespace lockfree
{
namespace detail
{

template <typename T>
class ringbuffer_base:
    boost::noncopyable
{
#ifndef BOOST_DOXYGEN_INVOKED
    typedef std::size_t size_t;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);

    atomic<size_t> write_index_;
    char padding1[padding_size]; /* force read_index and write_index to different cache lines */
    atomic<size_t> read_index_;

protected:
    ringbuffer_base(void):
        write_index_(0), read_index_(0)
    {}
    static size_t next_index(size_t arg, size_t max_size)
    {
        size_t ret = arg + 1;
        while (unlikely(ret >= max_size))
            ret -= max_size;
        return ret;
    }
    static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
    {
        if (write_index >= read_index)
            return write_index - read_index;

        size_t ret = write_index + max_size - read_index;
        return ret;
    }
    static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
    {
        size_t ret = read_index - write_index - 1;
        if (write_index >= read_index)
            ret += max_size;
        return ret;
    }
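    /* Worked example of the index arithmetic above (added commentary with
     * illustrative numbers, not part of the original source): with
     * max_size == 8, write_index == 6 and read_index == 2, read_available()
     * yields 6 - 2 == 4 occupied slots and write_available() yields
     * (2 - 6 - 1) + 8 == 3 free slots.  One slot always stays unused so that
     * write_index == read_index unambiguously means "empty" rather than
     * "full". */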
    bool enqueue(T const & t, T * buffer, size_t max_size)
    {
        size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
        size_t next = next_index(write_index, max_size);

        if (next == read_index_.load(memory_order_acquire))
            return false; /* ringbuffer is full */

        buffer[write_index] = t;

        write_index_.store(next, memory_order_release);
        return true;
    }
    size_t enqueue(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
    {
        size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
        const size_t read_index = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        input_count = std::min(input_count, avail);

        size_t new_write_index = write_index + input_count;

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            size_t count0 = max_size - write_index;

            std::copy(input_buffer, input_buffer + count0, internal_buffer + write_index);
            std::copy(input_buffer + count0, input_buffer + input_count, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::copy(input_buffer, input_buffer + input_count, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return input_count;
    }
    template <typename ConstIterator>
    ConstIterator enqueue(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
    {
        // FIXME: avoid std::distance and std::advance

        size_t write_index = write_index_.load(memory_order_relaxed); // only written from enqueue thread
        const size_t read_index = read_index_.load(memory_order_acquire);
        const size_t avail = write_available(write_index, read_index, max_size);

        if (avail == 0)
            return begin;

        size_t input_count = std::distance(begin, end);
        input_count = std::min(input_count, avail);

        size_t new_write_index = write_index + input_count;

        ConstIterator last = begin;
        std::advance(last, input_count);

        if (write_index + input_count > max_size) {
            /* copy data in two sections */
            size_t count0 = max_size - write_index;
            ConstIterator midpoint = begin;
            std::advance(midpoint, count0);

            std::copy(begin, midpoint, internal_buffer + write_index);
            std::copy(midpoint, last, internal_buffer);
            new_write_index -= max_size;
        } else {
            std::copy(begin, last, internal_buffer + write_index);

            if (new_write_index == max_size)
                new_write_index = 0;
        }

        write_index_.store(new_write_index, memory_order_release);
        return last;
    }
    bool dequeue(T & ret, T * buffer, size_t max_size)
    {
        size_t write_index = write_index_.load(memory_order_acquire);
        size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread

        if (empty(write_index, read_index))
            return false;

        ret = buffer[read_index];
        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }
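    /* Memory-ordering note (added commentary, not in the original source):
     * the release store to write_index_ in enqueue() pairs with the acquire
     * load in dequeue(), so the write to buffer[write_index] is visible to
     * the consumer before it observes the new write index.  Symmetrically,
     * the release store to read_index_ in dequeue() pairs with the acquire
     * load in enqueue(), so a slot is only reused after the consumer has
     * finished reading it.  The relaxed load of a thread's own index is safe
     * because each index is written by exactly one thread. */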
    size_t dequeue(T * output_buffer, size_t output_count, const T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        output_count = std::min(output_count, avail);

        size_t new_read_index = read_index + output_count;

        if (read_index + output_count > max_size) {
            /* copy data in two sections */
            size_t count0 = max_size - read_index;
            size_t count1 = output_count - count0;

            std::copy(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
            std::copy(internal_buffer, internal_buffer + count1, output_buffer + count0);

            new_read_index -= max_size;
        } else {
            std::copy(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return output_count;
    }
    template <typename OutputIterator>
    size_t dequeue(OutputIterator it, const T * internal_buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        size_t read_index = read_index_.load(memory_order_relaxed); // only written from dequeue thread

        const size_t avail = read_available(write_index, read_index, max_size);

        if (avail == 0)
            return 0;

        size_t new_read_index = read_index + avail;

        if (read_index + avail > max_size) {
            /* copy data in two sections; advance the iterator between the
             * copies so the second section does not overwrite the first */
            size_t count0 = max_size - read_index;
            size_t count1 = avail - count0;

            it = std::copy(internal_buffer + read_index, internal_buffer + max_size, it);
            std::copy(internal_buffer, internal_buffer + count1, it);

            new_read_index -= max_size;
        } else {
            std::copy(internal_buffer + read_index, internal_buffer + read_index + avail, it);

            if (new_read_index == max_size)
                new_read_index = 0;
        }

        read_index_.store(new_read_index, memory_order_release);
        return avail;
    }
#endif /* BOOST_DOXYGEN_INVOKED */

public:
    /** reset the ringbuffer
     *
     * \warning Not thread-safe, use for debugging purposes only
     * */
    void reset(void)
    {
        write_index_.store(0, memory_order_relaxed);
        read_index_.store(0, memory_order_release);
    }

    /** Check if the ringbuffer is empty
     *
     * \warning Not thread-safe, use for debugging purposes only
     * */
    bool empty(void)
    {
        return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
    }

    //! \copydoc boost::lockfree::stack::is_lock_free
    bool is_lock_free(void) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }
private:
    bool empty(size_t write_index, size_t read_index)
    {
        return write_index == read_index;
    }
};

} /* namespace detail */
/** lock-free single-producer/single-consumer ringbuffer with a compile-time sized internal buffer */
template <typename T, size_t max_size>
class ringbuffer:
    public detail::ringbuffer_base<T>
{
    typedef std::size_t size_t;
    boost::array<T, max_size> array_;

public:
    /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
     *
     * \return true, if the enqueue operation is successful.
     *
     * \note Thread-safe and non-blocking
     * */
    bool enqueue(T const & t)
    {
        return detail::ringbuffer_base<T>::enqueue(t, array_.c_array(), max_size);
    }
    /** Dequeue object from ringbuffer.
     *
     * If dequeue operation is successful, object is written to memory location denoted by ret.
     *
     * \return true, if the dequeue operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool dequeue(T & ret)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, array_.c_array(), max_size);
    }
    /** Enqueues size objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t enqueue(T const * t, size_t size)
    {
        return detail::ringbuffer_base<T>::enqueue(t, size, array_.c_array(), max_size);
    }
    /** Enqueues all objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t enqueue(T const (&t)[size])
    {
        return enqueue(t, size);
    }
    /** Enqueues objects from the iterator range [begin, end) to the ringbuffer.
     *
     * Enqueueing may fail, if the ringbuffer is full.
     *
     * \return iterator to the first element, which has not been enqueued
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename ConstIterator>
    ConstIterator enqueue(ConstIterator begin, ConstIterator end)
    {
        return detail::ringbuffer_base<T>::enqueue(begin, end, array_.c_array(), max_size);
    }
    /** Dequeue a maximum of size objects from ringbuffer.
     *
     * If dequeue operation is successful, objects are written to the memory location denoted by ret.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t dequeue(T * ret, size_t size)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, size, array_.c_array(), max_size);
    }
    /** Dequeues a maximum of size objects from the ringbuffer into the array t.
     *
     * Will dequeue as many objects as are available.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t dequeue(T (&t)[size])
    {
        return dequeue(t, size);
    }
    /** Dequeue objects to the output iterator it.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename OutputIterator>
    size_t dequeue(OutputIterator it)
    {
        return detail::ringbuffer_base<T>::dequeue(it, array_.c_array(), max_size);
    }
};
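/* Minimal usage sketch for the compile-time sized variant (added example, not
 * part of the original source).  Exactly one producer thread and one consumer
 * thread may access the ringbuffer concurrently; process() stands in for any
 * consumer-side handling:
 *
 *   boost::lockfree::ringbuffer<int, 1024> rb;   // holds at most 1023 elements
 *
 *   // producer thread
 *   if (!rb.enqueue(42))
 *       ; // ringbuffer full - retry later or drop the value
 *
 *   // consumer thread
 *   int value;
 *   while (rb.dequeue(value))
 *       process(value);
 */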
/** ringbuffer specialization with a run-time sized internal buffer */
template <typename T>
class ringbuffer<T, 0>:
    public detail::ringbuffer_base<T>
{
    typedef std::size_t size_t;
    size_t max_size_;
    scoped_array<T> array_;

public:
    //! Constructs a ringbuffer for max_size elements
    explicit ringbuffer(size_t max_size):
        max_size_(max_size), array_(new T[max_size])
    {}
    /** Enqueues object t to the ringbuffer. Enqueueing may fail, if the ringbuffer is full.
     *
     * \return true, if the enqueue operation is successful.
     *
     * \note Thread-safe and non-blocking
     * */
    bool enqueue(T const & t)
    {
        return detail::ringbuffer_base<T>::enqueue(t, array_.get(), max_size_);
    }
    /** Dequeue object from ringbuffer.
     *
     * If dequeue operation is successful, object is written to memory location denoted by ret.
     *
     * \return true, if the dequeue operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool dequeue(T & ret)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, array_.get(), max_size_);
    }
    /** Enqueues size objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t enqueue(T const * t, size_t size)
    {
        return detail::ringbuffer_base<T>::enqueue(t, size, array_.get(), max_size_);
    }
    /** Enqueues all objects from the array t to the ringbuffer.
     *
     * Will enqueue as many objects as there is space available.
     *
     * \return number of enqueued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t enqueue(T const (&t)[size])
    {
        return enqueue(t, size);
    }
    /** Enqueues objects from the iterator range [begin, end) to the ringbuffer.
     *
     * Enqueueing may fail, if the ringbuffer is full.
     *
     * \return iterator to the first element, which has not been enqueued
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename ConstIterator>
    ConstIterator enqueue(ConstIterator begin, ConstIterator end)
    {
        return detail::ringbuffer_base<T>::enqueue(begin, end, array_.get(), max_size_);
    }
    /** Dequeue a maximum of size objects from ringbuffer.
     *
     * If dequeue operation is successful, objects are written to the memory location denoted by ret.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    size_t dequeue(T * ret, size_t size)
    {
        return detail::ringbuffer_base<T>::dequeue(ret, size, array_.get(), max_size_);
    }
    /** Dequeues a maximum of size objects from the ringbuffer into the array t.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <size_t size>
    size_t dequeue(T (&t)[size])
    {
        return dequeue(t, size);
    }
    /** Dequeue objects to the output iterator it.
     *
     * \return number of dequeued items
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename OutputIterator>
    size_t dequeue(OutputIterator it)
    {
        return detail::ringbuffer_base<T>::dequeue(it, array_.get(), max_size_);
    }
};
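/* Minimal usage sketch for the run-time sized specialization (added example,
 * not part of the original source):
 *
 *   boost::lockfree::ringbuffer<float, 0> rb(4096);   // capacity chosen at run time
 *
 *   float in[64], out[64];
 *   size_t pushed = rb.enqueue(in, 64);    // producer thread: returns how many fit
 *   size_t popped = rb.dequeue(out, 64);   // consumer thread: returns how many were read
 */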
} /* namespace lockfree */
} /* namespace boost */

#endif /* BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED */