From 4e6dc203f56f4ecede75a2ede093ea74d6d8d0ec Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Sat, 12 Dec 2009 13:22:30 +0100 Subject: [PATCH] lockfree: add lockfree spsc ringbuffer Signed-off-by: Tim Blechmann --- boost/lockfree/ringbuffer.hpp | 142 +++++++++++++++++++++++++++++++++ libs/lockfree/test/ringbuffer_test.cpp | 142 +++++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+) create mode 100644 boost/lockfree/ringbuffer.hpp create mode 100644 libs/lockfree/test/ringbuffer_test.cpp diff --git a/boost/lockfree/ringbuffer.hpp b/boost/lockfree/ringbuffer.hpp new file mode 100644 index 0000000..139f798 --- /dev/null +++ b/boost/lockfree/ringbuffer.hpp @@ -0,0 +1,142 @@ +// lock-free single-producer/single-consumer ringbuffer +// this algorithm is implemented in various projects (jack, portaudio, supercollider) +// +// implementation for c++ +// +// Copyright (C) 2009 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// Disclaimer: Not a Boost library. + +#ifndef BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED +#define BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED + +#include +#include +#include +#include + +#include "detail/branch_hints.hpp" +#include "detail/prefix.hpp" + +namespace boost +{ +namespace lockfree +{ + +namespace detail +{ + +template +class ringbuffer_internal: + boost::noncopyable +{ + static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t); + atomic write_index_; + char padding1[padding_size]; /* force read_index and write_index to different cache lines */ + atomic read_index_; + +protected: + ringbuffer_internal(void): + write_index_(0), read_index_(0) + {} + + static size_t next_index(size_t arg, size_t max_size) + { + size_t ret = arg + 1; + while (unlikely(ret >= max_size)) + ret -= max_size; + return ret; + } + + bool enqueue(T const & t, T * buffer, size_t max_size) + { + size_t next = next_index(write_index_.load(memory_order_acquire), max_size); + + if (next == read_index_.load(memory_order_acquire)) + return false; /* ringbuffer is full */ + + buffer[next] = t; + + write_index_.store(next, memory_order_release); + + return true; + } + + bool dequeue (T * ret, T * buffer, size_t max_size) + { + if (empty()) + return false; + + size_t next = next_index(read_index_.load(memory_order_acquire), max_size); + *ret = buffer[next]; + read_index_.store(next, memory_order_release); + return true; + } + +public: + void reset(void) + { + write_index_.store(0, memory_order_relaxed); + read_index_.store(0, memory_order_release); + } + + bool empty(void) + { + return write_index_.load(memory_order_relaxed) == read_index_.load(memory_order_relaxed); + } + +}; + +} /* namespace detail */ + +template +class ringbuffer: + public detail::ringbuffer_internal +{ + boost::array array_; + +public: + bool enqueue(T const & t) + { + return detail::ringbuffer_internal::enqueue(t, array_.c_array(), max_size); + } + + bool dequeue(T * ret) + { + return detail::ringbuffer_internal::dequeue(ret, array_.c_array(), max_size); + } +}; + +template +class ringbuffer: + public detail::ringbuffer_internal +{ + size_t max_size_; + scoped_array array_; + +public: + ringbuffer(size_t max_size): + max_size_(max_size), array_(new T[max_size]) + {} + + bool enqueue(T const & t) + { + return detail::ringbuffer_internal::enqueue(t, array_.get(), max_size_); + } + + bool dequeue(T * ret) + { + return detail::ringbuffer_internal::dequeue(ret, array_.get(), max_size_); + } +}; + + +} /* namespace lockfree */ +} /* namespace boost */ + + +#endif /* BOOST_LOCKFREE_RINGBUFFER_HPP_INCLUDED */ diff --git a/libs/lockfree/test/ringbuffer_test.cpp b/libs/lockfree/test/ringbuffer_test.cpp new file mode 100644 index 0000000..df2a915 --- /dev/null +++ b/libs/lockfree/test/ringbuffer_test.cpp @@ -0,0 +1,142 @@ +#include + +#include +#define BOOST_TEST_MODULE lockfree_tests +#include + + +#include +#include +#include + + +#include "test_helpers.hpp" + +using namespace boost; +using namespace boost::lockfree; +using namespace std; + + +BOOST_AUTO_TEST_CASE( simple_ringbuffer_test ) +{ + ringbuffer f; + + BOOST_REQUIRE(f.empty()); + f.enqueue(1); + f.enqueue(2); + + int i1(0), i2(0); + + BOOST_REQUIRE(f.dequeue(&i1)); + BOOST_REQUIRE_EQUAL(i1, 1); + + BOOST_REQUIRE(f.dequeue(&i2)); + BOOST_REQUIRE_EQUAL(i2, 2); + BOOST_REQUIRE(f.empty()); +} + + +struct ringbuffer_tester +{ + ringbuffer sf; + + atomic ringbuffer_cnt, received_nodes; + + static_hashed_set working_set; + + static const uint nodes_per_thread = 20000000; + + static const int reader_threads = 1; + static const int writer_threads = 1; + + ringbuffer_tester(void): + ringbuffer_cnt(0), received_nodes(0) + {} + + void add(void) + { + for (uint i = 0; i != nodes_per_thread; ++i) + { + while(ringbuffer_cnt > 10000) + thread::yield(); + + int id = generate_id(); + + working_set.insert(id); + + while (sf.enqueue(id) == false) + { + thread::yield(); + } + + ++ringbuffer_cnt; + } + } + + bool get_element(void) + { + int data; + + bool success = sf.dequeue(&data); + + if (success) + { + ++received_nodes; + --ringbuffer_cnt; + bool erased = working_set.erase(data); + assert(erased); + return true; + } + else + return false; + } + + volatile bool running; + + void get(void) + { + for(;;) + { + bool success = get_element(); + if (not running and not success) + return; + if (not success) + thread::yield(); + } + } + + void run(void) + { + running = true; + + thread_group writer; + thread_group reader; + + BOOST_REQUIRE(sf.empty()); + for (int i = 0; i != reader_threads; ++i) + reader.create_thread(boost::bind(&ringbuffer_tester::get, this)); + + for (int i = 0; i != writer_threads; ++i) + writer.create_thread(boost::bind(&ringbuffer_tester::add, this)); + cout << "reader and writer threads created" << endl; + + writer.join_all(); + cout << "writer threads joined. waiting for readers to finish" << endl; + + running = false; + reader.join_all(); + + BOOST_REQUIRE_EQUAL(received_nodes, writer_threads * nodes_per_thread); + BOOST_REQUIRE_EQUAL(ringbuffer_cnt, 0); + BOOST_REQUIRE(sf.empty()); + BOOST_REQUIRE(working_set.count_nodes() == 0); + } +}; + + + +BOOST_AUTO_TEST_CASE( ringbuffer_test_caching ) +{ + ringbuffer_tester test1; + test1.run(); +} \ No newline at end of file -- 2.11.4.GIT