From 949ec8ba5dbaff126c220f22da80e93902d41b42 Mon Sep 17 00:00:00 2001 From: constroy Date: Mon, 30 Jul 2018 19:36:27 +0800 Subject: [PATCH 01/10] implement bounded MPMC queue and add doc for it --- CMakeLists.txt | 1 + doc/source/bounded_mpmc_queue.rst | 64 +++++++++++++ include/clue/bounded_mpmc_queue.hpp | 133 ++++++++++++++++++++++++++++ tests/test_bounded_mpmc_queue.cpp | 106 ++++++++++++++++++++++ 4 files changed, 304 insertions(+) create mode 100644 doc/source/bounded_mpmc_queue.rst create mode 100644 include/clue/bounded_mpmc_queue.hpp create mode 100644 tests/test_bounded_mpmc_queue.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fc1315..497c1e9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -94,6 +94,7 @@ set(THREADING_TESTS test_shared_mutex test_concurrent_counter test_concurrent_queue + test_bounded_mpmc_queue test_thread_pool ) diff --git a/doc/source/bounded_mpmc_queue.rst b/doc/source/bounded_mpmc_queue.rst new file mode 100644 index 0000000..8fd5a41 --- /dev/null +++ b/doc/source/bounded_mpmc_queue.rst @@ -0,0 +1,64 @@ +Bounded MPMC Queue +================== + +An MPMC queue is very useful in concurrent programming, where MPMC stands for +Multi-Producer-Multi-Consumer. For example, a task queue can be considered as +a special kind of MPMC queue. *CLUE* implements +a bounded MPMC queue class, in header file ``<clue/bounded_mpmc_queue.hpp>``. + +.. cpp:class:: template<typename T> bounded_mpmc_queue + + MPMC queue class. ``T`` is the element type. + +This class has a default constructor, but it is not copyable or movable. The +class provides the following member functions: + +.. cpp:function:: size_t capacity() const + + Get the max number of elements the queue can contain. + +.. cpp:function:: bool empty() const + + Get whether the queue is empty (contains no elements). + +.. cpp:function:: bool full() const + + Get whether the queue is full (has no space left). + +.. 
cpp:function:: void push(const T& x) + + Spin till the queue is non-full, and push an element ``x`` to the back + of the queue. + +.. cpp:function:: bool try_push(T&& x) + + If the queue is not full, push an element ``x`` to the back of the queue + and return ``true``. Otherwise, return ``false`` immediately. + +.. cpp:function:: void emplace(Args&&... args) + + Spin till the queue is non-full, construct an element using the given + arguments and push it to the back of the queue. + +.. cpp:function:: bool try_emplace(Args&&... args) + + If the queue is not full, construct an element using the given + arguments, push it to the back of the queue and return ``true``. + Otherwise, return ``false`` immediately. + +.. cpp:function:: void pop(T& dst) + + Spin until the queue is non-empty, and pop the element at the front and + store it to ``dst``. + +.. cpp:function:: bool try_pop(T& dst) + + If the queue is not empty, pop the element at the front, store it to + ``dst``, and return ``true``. Otherwise, return ``false`` immediately. + + +.. note:: + + All updating methods, including ``push``, ``emplace``, ``pop``, + ``try_push``, ``try_emplace`` and ``try_pop``, are thread-safe. + It is safe to call these methods in concurrent threads. 
diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp new file mode 100644 index 0000000..bd0e428 --- /dev/null +++ b/include/clue/bounded_mpmc_queue.hpp @@ -0,0 +1,133 @@ +#ifndef CLUE_MPMC_QUEUE__ +#define CLUE_MPMC_QUEUE__ + +#include +#include +#include + +namespace clue { + +template +class bounded_mpmc_queue final { +private: + struct Slot { + std::atomic_size_t sequence; + T data; + }; + + static constexpr size_t kCacheLineSize = 128; + + const size_t capacity_; + std::atomic_size_t head_, tail_; + Slot* slots_; + + size_t index(size_t x) const noexcept { + return x % capacity_; + } + +public: + bounded_mpmc_queue(size_t capacity) : capacity_(capacity), head_(0), tail_(0) { + if (capacity < 1) { + throw std::invalid_argument("capacity must be greater than 0"); + } + slots_ = static_cast(aligned_alloc(capacity_ * sizeof(Slot), kCacheLineSize)); + if (!slots_) { + throw std::bad_alloc(); + } + for (size_t i = 0; i < capacity_; ++i) { + new (slots_ + i) Slot(); + slots_[i].sequence = i; + } + } + + ~bounded_mpmc_queue() { + aligned_free(slots_); + } + + bool empty() const noexcept { + return head_ == tail_; + } + + bool full() const noexcept { + return head_ == tail_ + capacity_; + } + + size_t capacity() const noexcept { + return capacity_; + } + + template + void emplace(Args&&... args) noexcept { + for (size_t head = head_++; ;) { + auto& slot = slots_[index(head)]; + if (slot.sequence == head) { + slot.data = T(std::forward(args)...); + slot.sequence = head + 1; + break; + } + } + } + + template + bool try_emplace(Args&&... 
args) noexcept { + for (size_t head = head_; ;) { + auto& slot = slots_[index(head)]; + size_t seq = slot.sequence; + if (seq < head) { + return false; + } else if (seq > head) { + head = head_; + } else { + if (head_.compare_exchange_weak(head, head + 1)) { + slot.data = T(std::forward(args)...); + slot.sequence = head + 1; + return true; + } + } + } + return false; + } + + void push(const T& data) noexcept { + emplace(data); + } + + bool try_push(const T& data) noexcept { + return try_emplace(data); + } + + void pop(T& data) noexcept { + for (size_t tail = tail_++; ;) { + auto& slot = slots_[index(tail)]; + if (slot.sequence == tail + 1) { + data = std::move(slot.data); + slot.sequence = tail + capacity_; + break; + } + } + } + + bool try_pop(T& data) noexcept { + for (size_t tail = tail_; ;) { + auto& slot = slots_[index(tail)]; + size_t seq = slot.sequence; + if (seq < tail + 1) { + return false; + } else if (seq > tail + 1){ + tail = tail_; + } else { + if (tail_.compare_exchange_weak(tail, tail + 1)) { + data = std::move(slot.data); + slot.sequence = tail + capacity_; + return true; + } + } + } + return false; + } +}; + + +} // end namespace clue + +#endif // CLUE_MPMC_QUEUE__ diff --git a/tests/test_bounded_mpmc_queue.cpp b/tests/test_bounded_mpmc_queue.cpp new file mode 100644 index 0000000..013d834 --- /dev/null +++ b/tests/test_bounded_mpmc_queue.cpp @@ -0,0 +1,106 @@ +#include +#include +#include +#include + +void test_push_then_pop(size_t nt) { + std::printf("testing push_then_pop ...\n"); + assert(nt > 0); + + int N = 10000; + clue::bounded_mpmc_queue Q(nt * N); + + assert(Q.capacity() == nt * N); + assert(Q.empty()); + + std::vector producers; + for (size_t t = 0; t < nt; ++t) { + producers.emplace_back([&Q,N](){ + for (int i = 0; i < N; ++i) { + assert(Q.try_push(i + 1)); + } + }); + } + + for (size_t t = 0; t < nt; ++t) { + producers.at(t).join(); + } + + assert(!Q.empty()); + assert(Q.full()); + + std::vector consumers; + std::vector sums(nt, 0); + 
for (size_t t = 0; t < nt; ++t) { + int& s = sums[t]; + consumers.emplace_back([&Q,&s]{ + int v = 0; + while (Q.try_pop(v)) { + s += v; + } + }); + } + + int total = 0; + for (size_t t = 0; t < nt; ++t) { + consumers.at(t).join(); + total += sums.at(t); + } + + int expect_total = nt * (N * (N + 1) / 2); + assert(total == expect_total); +} + +void test_push_and_pop(size_t nt) { + std::printf("push_and_pop with %lu threads ...\n", nt); + + assert(nt > 0); + + clue::bounded_mpmc_queue Q(100); + int N = 100; + + std::vector producers; + for (size_t t = 0; t < nt; ++t) { + producers.emplace_back([&Q,N](){ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (int i = 0; i < N; ++i) { + Q.push(i + 1); + } + }); + } + + std::vector consumers; + std::vector sums(nt, 0); + for (size_t t = 0; t < nt; ++t) { + int& s = sums[t]; + consumers.emplace_back([&Q,N,&s]{ + for (int i = 0; i < N; ++i) { + int v; + Q.pop(v); + s += v; + } + }); + } + + for (size_t t = 0; t < nt; ++t) { + producers.at(t).join(); + } + + int total = 0; + for (size_t t = 0; t < nt; ++t) { + consumers.at(t).join(); + total += sums.at(t); + } + + assert(Q.empty()); + + int expect_total = nt * (N * (N + 1) / 2); + assert(total == expect_total); +} + +int main() { + size_t nt = 4; + test_push_then_pop(nt); + test_push_and_pop(nt); + return 0; +} From d9a2882ea92fb8b647636ea483f732a1a7ec8086 Mon Sep 17 00:00:00 2001 From: constroy Date: Tue, 31 Jul 2018 19:17:12 +0800 Subject: [PATCH 02/10] refine the memory order of atomic operations to gain performance --- include/clue/bounded_mpmc_queue.hpp | 42 ++++++++++++++++------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp index bd0e428..b016282 100644 --- a/include/clue/bounded_mpmc_queue.hpp +++ b/include/clue/bounded_mpmc_queue.hpp @@ -36,7 +36,7 @@ class bounded_mpmc_queue final { } for (size_t i = 0; i < capacity_; ++i) { new (slots_ + i) Slot(); - 
slots_[i].sequence = i; + slots_[i].sequence.store(i, std::memory_order_relaxed); } } @@ -45,11 +45,13 @@ class bounded_mpmc_queue final { } bool empty() const noexcept { - return head_ == tail_; + return head_.load(std::memory_order_relaxed) == + tail_.load(std::memory_order_relaxed); } bool full() const noexcept { - return head_ == tail_ + capacity_; + return head_.load(std::memory_order_relaxed) == + tail_.load(std::memory_order_relaxed) + capacity_; } size_t capacity() const noexcept { @@ -58,11 +60,11 @@ class bounded_mpmc_queue final { template void emplace(Args&&... args) noexcept { - for (size_t head = head_++; ;) { + for (size_t head = head_.fetch_add(1, std::memory_order_relaxed); ;) { auto& slot = slots_[index(head)]; - if (slot.sequence == head) { + if (slot.sequence.load(std::memory_order_acquire) == head) { slot.data = T(std::forward(args)...); - slot.sequence = head + 1; + slot.sequence.store(head + 1, std::memory_order_release); break; } } @@ -70,17 +72,18 @@ class bounded_mpmc_queue final { template bool try_emplace(Args&&... 
args) noexcept { - for (size_t head = head_; ;) { + for (size_t head = head_.load(std::memory_order_relaxed); ;) { auto& slot = slots_[index(head)]; - size_t seq = slot.sequence; + size_t seq = slot.sequence.load(std::memory_order_acquire); if (seq < head) { return false; } else if (seq > head) { - head = head_; + head = head_.load(std::memory_order_relaxed); } else { - if (head_.compare_exchange_weak(head, head + 1)) { + if (head_.compare_exchange_weak(head, head + 1, + std::memory_order_relaxed)) { slot.data = T(std::forward(args)...); - slot.sequence = head + 1; + slot.sequence.store(head + 1, std::memory_order_release); return true; } } @@ -97,28 +100,29 @@ class bounded_mpmc_queue final { } void pop(T& data) noexcept { - for (size_t tail = tail_++; ;) { + for (size_t tail = tail_.fetch_add(1, std::memory_order_relaxed); ;) { auto& slot = slots_[index(tail)]; - if (slot.sequence == tail + 1) { + if (slot.sequence.load(std::memory_order_acquire) == tail + 1) { data = std::move(slot.data); - slot.sequence = tail + capacity_; + slot.sequence.store(tail + capacity_, std::memory_order_release); break; } } } bool try_pop(T& data) noexcept { - for (size_t tail = tail_; ;) { + for (size_t tail = tail_.load(std::memory_order_relaxed); ;) { auto& slot = slots_[index(tail)]; - size_t seq = slot.sequence; + size_t seq = slot.sequence.load(std::memory_order_acquire); if (seq < tail + 1) { return false; } else if (seq > tail + 1){ - tail = tail_; + tail = tail_.load(std::memory_order_relaxed); } else { - if (tail_.compare_exchange_weak(tail, tail + 1)) { + if (tail_.compare_exchange_weak(tail, tail + 1, + std::memory_order_relaxed)) { data = std::move(slot.data); - slot.sequence = tail + capacity_; + slot.sequence.store(tail + capacity_, std::memory_order_release); return true; } } From c8c01c5291f134593345aafb24ea427e92825466 Mon Sep 17 00:00:00 2001 From: constroy Date: Tue, 31 Jul 2018 20:20:03 +0800 Subject: [PATCH 03/10] align memory to eliminate false sharing --- 
include/clue/bounded_mpmc_queue.hpp | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp index b016282..08c1fd9 100644 --- a/include/clue/bounded_mpmc_queue.hpp +++ b/include/clue/bounded_mpmc_queue.hpp @@ -10,15 +10,16 @@ namespace clue { template class bounded_mpmc_queue final { private: + static constexpr size_t kCacheLineSize = 128; + struct Slot { - std::atomic_size_t sequence; + alignas(kCacheLineSize) std::atomic_size_t sequence; T data; }; - static constexpr size_t kCacheLineSize = 128; - const size_t capacity_; - std::atomic_size_t head_, tail_; + alignas(kCacheLineSize) std::atomic_size_t head_; + alignas(kCacheLineSize) std::atomic_size_t tail_; Slot* slots_; size_t index(size_t x) const noexcept { @@ -26,7 +27,10 @@ class bounded_mpmc_queue final { } public: - bounded_mpmc_queue(size_t capacity) : capacity_(capacity), head_(0), tail_(0) { + explicit bounded_mpmc_queue(size_t capacity) + : capacity_(capacity) + , head_(0) + , tail_(0) { if (capacity < 1) { throw std::invalid_argument("capacity must be greater than 0"); } From 9cfc4aa5573de62a619a77b3c52e77cf3a5dd80f Mon Sep 17 00:00:00 2001 From: constroy Date: Tue, 31 Jul 2018 21:04:53 +0800 Subject: [PATCH 04/10] implement non-copy emplace, add strict type check --- include/clue/bounded_mpmc_queue.hpp | 63 ++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp index 08c1fd9..28fb3b7 100644 --- a/include/clue/bounded_mpmc_queue.hpp +++ b/include/clue/bounded_mpmc_queue.hpp @@ -4,17 +4,42 @@ #include #include #include +#include namespace clue { template class bounded_mpmc_queue final { +static_assert(std::is_nothrow_copy_assignable::value || + std::is_nothrow_move_assignable::value, + "T must be nothrow copy or move assignable"); + +static_assert(std::is_nothrow_destructible::value, + "T must be 
nothrow destructible"); + private: static constexpr size_t kCacheLineSize = 128; struct Slot { alignas(kCacheLineSize) std::atomic_size_t sequence; - T data; + typename std::aligned_storage::type storage; + + T&& move() noexcept { + return reinterpret_cast(storage); + } + + template + void construct(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + new (&storage) T(std::forward(args)...); + } + + void destroy() noexcept { + static_assert(std::is_nothrow_destructible::value, + "T must be nothrow destructible"); + reinterpret_cast(storage).~T(); + } }; const size_t capacity_; @@ -45,6 +70,12 @@ class bounded_mpmc_queue final { } ~bounded_mpmc_queue() { + for (size_t i = 0; i < capacity_; ++i) { + if (index(slots_[i].sequence.load(std::memory_order_relaxed)) != i) { + slots_[i].destroy(); + } + slots_[i].~Slot(); + } aligned_free(slots_); } @@ -64,10 +95,12 @@ class bounded_mpmc_queue final { template void emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); for (size_t head = head_.fetch_add(1, std::memory_order_relaxed); ;) { auto& slot = slots_[index(head)]; if (slot.sequence.load(std::memory_order_acquire) == head) { - slot.data = T(std::forward(args)...); + slot.construct(std::forward(args)...); slot.sequence.store(head + 1, std::memory_order_release); break; } @@ -76,6 +109,8 @@ class bounded_mpmc_queue final { template bool try_emplace(Args&&... 
args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); for (size_t head = head_.load(std::memory_order_relaxed); ;) { auto& slot = slots_[index(head)]; size_t seq = slot.sequence.load(std::memory_order_acquire); @@ -86,7 +121,7 @@ class bounded_mpmc_queue final { } else { if (head_.compare_exchange_weak(head, head + 1, std::memory_order_relaxed)) { - slot.data = T(std::forward(args)...); + slot.construct(std::forward(args)...); slot.sequence.store(head + 1, std::memory_order_release); return true; } @@ -96,18 +131,35 @@ class bounded_mpmc_queue final { } void push(const T& data) noexcept { + static_assert(std::is_nothrow_copy_constructible::value, + "T must be nothrow copy constructible"); emplace(data); } + void push(T&& data) noexcept { + static_assert(std::is_nothrow_move_constructible::value, + "T must be nothrow move constructible"); + emplace(std::move(data)); + } + bool try_push(const T& data) noexcept { + static_assert(std::is_nothrow_copy_constructible::value, + "T must be nothrow copy constructible"); return try_emplace(data); } + bool try_push(T&& data) noexcept { + static_assert(std::is_nothrow_move_constructible::value, + "T must be nothrow move constructible"); + return try_emplace(std::move(data)); + } + void pop(T& data) noexcept { for (size_t tail = tail_.fetch_add(1, std::memory_order_relaxed); ;) { auto& slot = slots_[index(tail)]; if (slot.sequence.load(std::memory_order_acquire) == tail + 1) { - data = std::move(slot.data); + data = slot.move(); + slot.destroy(); slot.sequence.store(tail + capacity_, std::memory_order_release); break; } @@ -125,7 +177,8 @@ class bounded_mpmc_queue final { } else { if (tail_.compare_exchange_weak(tail, tail + 1, std::memory_order_relaxed)) { - data = std::move(slot.data); + data = slot.move(); + slot.destroy(); slot.sequence.store(tail + capacity_, std::memory_order_release); return true; } From 4ebf32a91d43812247c6da28defc9bf516f72db2 Mon Sep 
17 00:00:00 2001 From: constroy Date: Tue, 31 Jul 2018 22:15:53 +0800 Subject: [PATCH 05/10] add move constructor, move assignment and swap --- include/clue/bounded_mpmc_queue.hpp | 32 +++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp index 28fb3b7..fae3588 100644 --- a/include/clue/bounded_mpmc_queue.hpp +++ b/include/clue/bounded_mpmc_queue.hpp @@ -42,11 +42,13 @@ static_assert(std::is_nothrow_destructible::value, } }; - const size_t capacity_; + size_t capacity_; + Slot* slots_; + alignas(kCacheLineSize) std::atomic_size_t head_; alignas(kCacheLineSize) std::atomic_size_t tail_; - Slot* slots_; +private: size_t index(size_t x) const noexcept { return x % capacity_; } @@ -69,6 +71,28 @@ static_assert(std::is_nothrow_destructible::value, } } + bounded_mpmc_queue(const bounded_mpmc_queue&) = delete; + + bounded_mpmc_queue(bounded_mpmc_queue&& rhs) noexcept + : capacity_(rhs.capacity_) + , slots_(rhs.slots_) + , head_(rhs.head_.load(std::memory_order_relaxed)) + , tail_(rhs.tail_.load(std::memory_order_relaxed)) {} + + bounded_mpmc_queue&& operator =(const bounded_mpmc_queue&) = delete; + + bounded_mpmc_queue&& operator =(bounded_mpmc_queue&& rhs) noexcept { + rhs.swap(*this); + return *this; + } + + void swap(bounded_mpmc_queue& other) noexcept { + std::swap(capacity_, other.capacity_); + std::swap(slots_, other.slots_); + head_.exchange(other.head_, std::memory_order_relaxed); + tail_.exchange(other.tail_, std::memory_order_relaxed); + } + ~bounded_mpmc_queue() { for (size_t i = 0; i < capacity_; ++i) { if (index(slots_[i].sequence.load(std::memory_order_relaxed)) != i) { @@ -188,6 +212,10 @@ static_assert(std::is_nothrow_destructible::value, } }; +template +inline void swap(bounded_mpmc_queue& lhs, bounded_mpmc_queue& rhs) noexcept { + lhs.swap(rhs); +} } // end namespace clue From 8c814c00e67c3940f7bc0e38b1b92a2c482eeca9 Mon Sep 17 00:00:00 2001 
From: constroy Date: Wed, 1 Aug 2018 15:42:20 +0800 Subject: [PATCH 06/10] add more function tests and fix some bugs --- CMakeLists.txt | 3 +- include/clue/bounded_mpmc_queue.hpp | 25 ++- tests/test_bounded_mpmc_queue.cpp | 180 +++++++++----------- tests/test_bounded_mpmc_queue_threading.cpp | 106 ++++++++++++ 4 files changed, 206 insertions(+), 108 deletions(-) create mode 100644 tests/test_bounded_mpmc_queue_threading.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 497c1e9..fe55fef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,6 +80,7 @@ set(UNIT_TESTS test_meta test_meta_seq test_textio + test_bounded_mpmc_queue test_include_all ) @@ -94,7 +95,7 @@ set(THREADING_TESTS test_shared_mutex test_concurrent_counter test_concurrent_queue - test_bounded_mpmc_queue + test_bounded_mpmc_queue_threading test_thread_pool ) diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp index fae3588..67f3f12 100644 --- a/include/clue/bounded_mpmc_queue.hpp +++ b/include/clue/bounded_mpmc_queue.hpp @@ -73,15 +73,20 @@ static_assert(std::is_nothrow_destructible::value, bounded_mpmc_queue(const bounded_mpmc_queue&) = delete; - bounded_mpmc_queue(bounded_mpmc_queue&& rhs) noexcept - : capacity_(rhs.capacity_) - , slots_(rhs.slots_) - , head_(rhs.head_.load(std::memory_order_relaxed)) - , tail_(rhs.tail_.load(std::memory_order_relaxed)) {} + bounded_mpmc_queue(bounded_mpmc_queue&& other) noexcept + : capacity_(other.capacity_) + , slots_(other.slots_) + , head_(other.head_.load(std::memory_order_relaxed)) + , tail_(other.tail_.load(std::memory_order_relaxed)) { + other.capacity_ = 0; + other.slots_ = nullptr; + other.head_.store(0, std::memory_order_relaxed); + other.tail_.store(0, std::memory_order_relaxed); + } - bounded_mpmc_queue&& operator =(const bounded_mpmc_queue&) = delete; + bounded_mpmc_queue& operator =(const bounded_mpmc_queue&) = delete; - bounded_mpmc_queue&& operator =(bounded_mpmc_queue&& rhs) noexcept { + 
bounded_mpmc_queue& operator =(bounded_mpmc_queue&& rhs) noexcept { rhs.swap(*this); return *this; } @@ -89,8 +94,10 @@ static_assert(std::is_nothrow_destructible::value, void swap(bounded_mpmc_queue& other) noexcept { std::swap(capacity_, other.capacity_); std::swap(slots_, other.slots_); - head_.exchange(other.head_, std::memory_order_relaxed); - tail_.exchange(other.tail_, std::memory_order_relaxed); + other.head_.store(head_.exchange(other.head_, std::memory_order_relaxed), + std::memory_order_relaxed); + other.tail_.store(tail_.exchange(other.tail_, std::memory_order_relaxed), + std::memory_order_relaxed); } ~bounded_mpmc_queue() { diff --git a/tests/test_bounded_mpmc_queue.cpp b/tests/test_bounded_mpmc_queue.cpp index 013d834..a5bd413 100644 --- a/tests/test_bounded_mpmc_queue.cpp +++ b/tests/test_bounded_mpmc_queue.cpp @@ -1,106 +1,90 @@ +#define private public #include -#include -#include -#include - -void test_push_then_pop(size_t nt) { - std::printf("testing push_then_pop ...\n"); - assert(nt > 0); - - int N = 10000; - clue::bounded_mpmc_queue Q(nt * N); - - assert(Q.capacity() == nt * N); - assert(Q.empty()); - - std::vector producers; - for (size_t t = 0; t < nt; ++t) { - producers.emplace_back([&Q,N](){ - for (int i = 0; i < N; ++i) { - assert(Q.try_push(i + 1)); - } - }); - } - - for (size_t t = 0; t < nt; ++t) { - producers.at(t).join(); - } - - assert(!Q.empty()); - assert(Q.full()); - - std::vector consumers; - std::vector sums(nt, 0); - for (size_t t = 0; t < nt; ++t) { - int& s = sums[t]; - consumers.emplace_back([&Q,&s]{ - int v = 0; - while (Q.try_pop(v)) { - s += v; - } - }); - } - - int total = 0; - for (size_t t = 0; t < nt; ++t) { - consumers.at(t).join(); - total += sums.at(t); - } - - int expect_total = nt * (N * (N + 1) / 2); - assert(total == expect_total); +#undef private + +#include +#include + +using clue::bounded_mpmc_queue; + +TEST(BoundedMpmcQueue, Construct) { + using int_queue = bounded_mpmc_queue; + + EXPECT_THROW(int_queue(0), 
std::invalid_argument); + int_queue que1(64); + EXPECT_EQ(que1.capacity(), 64); + EXPECT_TRUE(que1.empty()); + int_queue que2(std::move(que1)); + EXPECT_EQ(que2.capacity(), 64); + EXPECT_TRUE(que2.empty()); + int_queue que3(128); + que3 = std::move(que2); + EXPECT_EQ(que3.capacity(), 64); + EXPECT_TRUE(que3.empty()); + int_queue que4(128); + clue::swap(que3, que4); + EXPECT_EQ(que3.capacity(), 128); + EXPECT_EQ(que4.capacity(), 64); } -void test_push_and_pop(size_t nt) { - std::printf("push_and_pop with %lu threads ...\n", nt); - - assert(nt > 0); - - clue::bounded_mpmc_queue Q(100); - int N = 100; - - std::vector producers; - for (size_t t = 0; t < nt; ++t) { - producers.emplace_back([&Q,N](){ - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - for (int i = 0; i < N; ++i) { - Q.push(i + 1); - } - }); - } +struct Item { + int x, y; + Item(int x, int y) noexcept : x(x), y(y) {} +}; + +TEST(BoundedMpmcQueue, Emplace) { + using item_queue = bounded_mpmc_queue; + item_queue que(64); + EXPECT_TRUE(que.empty()); + int x = 0, y = 0; + que.emplace(x++, y--); + while (que.try_emplace(x++, y--)) continue; + EXPECT_TRUE(que.full()); + EXPECT_FALSE(que.try_push(Item(x, y))); + + item_queue swap_que(128); + EXPECT_TRUE(swap_que.empty()); + swap_que.swap(que); + EXPECT_TRUE(que.empty()); + EXPECT_TRUE(swap_que.full()); +} - std::vector consumers; - std::vector sums(nt, 0); - for (size_t t = 0; t < nt; ++t) { - int& s = sums[t]; - consumers.emplace_back([&Q,N,&s]{ - for (int i = 0; i < N; ++i) { - int v; - Q.pop(v); - s += v; - } - }); - } +TEST(BoundedMpmcQueue, Push) { + using item_queue = bounded_mpmc_queue; + item_queue que(64); + EXPECT_TRUE(que.empty()); + int x = 0, y = 0; + que.push(Item(x++, y--)); + while (que.try_push(Item(x++, y--))) continue; + EXPECT_TRUE(que.full()); + EXPECT_FALSE(que.try_emplace(x, y)); + + item_queue swap_que(128); + EXPECT_TRUE(swap_que.empty()); + swap_que.swap(que); + EXPECT_TRUE(que.empty()); + EXPECT_TRUE(swap_que.full()); +} - 
for (size_t t = 0; t < nt; ++t) { - producers.at(t).join(); +TEST(BoundedMpmcQueue, Pop) { + using int_queue = bounded_mpmc_queue; + const size_t n = 64; + int_queue push_que(n); + EXPECT_TRUE(push_que.empty()); + for (size_t i = 0; i < n; ++i) { + push_que.push(i); } - - int total = 0; - for (size_t t = 0; t < nt; ++t) { - consumers.at(t).join(); - total += sums.at(t); + EXPECT_TRUE(push_que.full()); + EXPECT_FALSE(push_que.try_push(n)); + + int_queue pop_que(std::move(push_que)); + EXPECT_TRUE(push_que.full()); + for (size_t i = 0; i < n; ++i) { + int x; + pop_que.pop(x); + EXPECT_EQ(x, i); } - - assert(Q.empty()); - - int expect_total = nt * (N * (N + 1) / 2); - assert(total == expect_total); -} - -int main() { - size_t nt = 4; - test_push_then_pop(nt); - test_push_and_pop(nt); - return 0; + EXPECT_TRUE(pop_que.empty()); + int x; + EXPECT_FALSE(pop_que.try_pop(x)); } diff --git a/tests/test_bounded_mpmc_queue_threading.cpp b/tests/test_bounded_mpmc_queue_threading.cpp new file mode 100644 index 0000000..013d834 --- /dev/null +++ b/tests/test_bounded_mpmc_queue_threading.cpp @@ -0,0 +1,106 @@ +#include +#include +#include +#include + +void test_push_then_pop(size_t nt) { + std::printf("testing push_then_pop ...\n"); + assert(nt > 0); + + int N = 10000; + clue::bounded_mpmc_queue Q(nt * N); + + assert(Q.capacity() == nt * N); + assert(Q.empty()); + + std::vector producers; + for (size_t t = 0; t < nt; ++t) { + producers.emplace_back([&Q,N](){ + for (int i = 0; i < N; ++i) { + assert(Q.try_push(i + 1)); + } + }); + } + + for (size_t t = 0; t < nt; ++t) { + producers.at(t).join(); + } + + assert(!Q.empty()); + assert(Q.full()); + + std::vector consumers; + std::vector sums(nt, 0); + for (size_t t = 0; t < nt; ++t) { + int& s = sums[t]; + consumers.emplace_back([&Q,&s]{ + int v = 0; + while (Q.try_pop(v)) { + s += v; + } + }); + } + + int total = 0; + for (size_t t = 0; t < nt; ++t) { + consumers.at(t).join(); + total += sums.at(t); + } + + int expect_total = nt * (N 
* (N + 1) / 2); + assert(total == expect_total); +} + +void test_push_and_pop(size_t nt) { + std::printf("push_and_pop with %lu threads ...\n", nt); + + assert(nt > 0); + + clue::bounded_mpmc_queue Q(100); + int N = 100; + + std::vector producers; + for (size_t t = 0; t < nt; ++t) { + producers.emplace_back([&Q,N](){ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (int i = 0; i < N; ++i) { + Q.push(i + 1); + } + }); + } + + std::vector consumers; + std::vector sums(nt, 0); + for (size_t t = 0; t < nt; ++t) { + int& s = sums[t]; + consumers.emplace_back([&Q,N,&s]{ + for (int i = 0; i < N; ++i) { + int v; + Q.pop(v); + s += v; + } + }); + } + + for (size_t t = 0; t < nt; ++t) { + producers.at(t).join(); + } + + int total = 0; + for (size_t t = 0; t < nt; ++t) { + consumers.at(t).join(); + total += sums.at(t); + } + + assert(Q.empty()); + + int expect_total = nt * (N * (N + 1) / 2); + assert(total == expect_total); +} + +int main() { + size_t nt = 4; + test_push_then_pop(nt); + test_push_and_pop(nt); + return 0; +} From d6f7b8780e05632bbcf8c83e41d85a94326057a5 Mon Sep 17 00:00:00 2001 From: constroy Date: Wed, 1 Aug 2018 15:59:48 +0800 Subject: [PATCH 07/10] update doc for bounded_mpmc_queue --- doc/source/bounded_mpmc_queue.rst | 34 ++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/doc/source/bounded_mpmc_queue.rst b/doc/source/bounded_mpmc_queue.rst index 8fd5a41..1bd76d2 100644 --- a/doc/source/bounded_mpmc_queue.rst +++ b/doc/source/bounded_mpmc_queue.rst @@ -10,8 +10,30 @@ a MPMC queue class, in header file ````. MPMC queue class. ``T`` is the element type. -This class has a default constructor, but it is not copyable or movable. The -class provides the following member functions: +This class has a move constructor, but it is not copyable. The class provides +the following member functions: + +Construction +------------- +.. 
cpp:function:: explicit bounded_mpmc_queue(size_t capacity) + Construct a ``bounded_mpmc_queue`` with capacity ``capacity``. + +.. cpp:function:: bounded_mpmc_queue(bounded_mpmc_queue&& other) + Move constructor. Constructs with the contents of ``other`` using + move semantics. After the move, ``other`` is guaranteed to be ``empty()``. + +.. cpp:function:: bounded_mpmc_queue(const bounded_mpmc_queue&) = delete + Copy constructor is disabled. + +.. cpp:function:: bounded_mpmc_queue& operator=(bounded_mpmc_queue&& other) + Move assignment operator. Replaces the contents with those of ``other`` + using move semantics. + +.. cpp:function:: bounded_mpmc_queue& operator=(const bounded_mpmc_queue&) = delete + Copy assignment operator is disabled. + +Capacity +--------- .. cpp:function:: size_t capacity() const @@ -25,11 +47,17 @@ class provides the following member functions: Get whether the queue is full (has no space left). + +Modifiers +--------- + .. cpp:function:: void push(const T& x) +.. cpp:function:: void push(T&& x) Spin till the queue is non-full, and push an element ``x`` to the back of the queue. +.. cpp:function:: bool try_push(const T& x) .. cpp:function:: bool try_push(T&& x) If the queue is not full, push an element ``x`` to the back of the queue @@ -59,6 +87,6 @@ class provides the following member functions: .. note:: - All updating methods, including ``push``, ``emplace``, ``pop``, + All modifiers, including ``push``, ``emplace``, ``pop``, ``try_push``, ``try_emplace`` and ``try_pop``, are thread-safe. It is safe to call these methods in concurrent threads. 
From f216ad83a264a882a0dbc82f8a0ab6df95f5606a Mon Sep 17 00:00:00 2001 From: constroy Date: Wed, 1 Aug 2018 16:35:51 +0800 Subject: [PATCH 08/10] minor fix --- tests/test_bounded_mpmc_queue.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_bounded_mpmc_queue.cpp b/tests/test_bounded_mpmc_queue.cpp index a5bd413..355568b 100644 --- a/tests/test_bounded_mpmc_queue.cpp +++ b/tests/test_bounded_mpmc_queue.cpp @@ -1,6 +1,4 @@ -#define private public #include -#undef private #include #include From 58fc472cc5f3c41b9cfa8784622613e230901652 Mon Sep 17 00:00:00 2001 From: constroy Date: Wed, 1 Aug 2018 20:50:03 +0800 Subject: [PATCH 09/10] add testcase for move semantics of element --- tests/test_bounded_mpmc_queue.cpp | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/test_bounded_mpmc_queue.cpp b/tests/test_bounded_mpmc_queue.cpp index 355568b..eff2651 100644 --- a/tests/test_bounded_mpmc_queue.cpp +++ b/tests/test_bounded_mpmc_queue.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -86,3 +87,34 @@ TEST(BoundedMpmcQueue, Pop) { int x; EXPECT_FALSE(pop_que.try_pop(x)); } + +struct MovableItem { + int x; + explicit MovableItem(int x) noexcept : x(x) {} + MovableItem(const MovableItem&) = delete; + MovableItem(MovableItem&& other) noexcept : x(other.x) {} + + MovableItem& operator =(const MovableItem&) = delete; + MovableItem& operator =(MovableItem&& rhs) noexcept { + x = rhs.x; + return *this; + } +}; + +TEST(BoundedMpmcQueue, Move) { + using item_queue = bounded_mpmc_queue; + const size_t n = 64; + item_queue que(n); + que.emplace(1); + MovableItem item(2); + que.push(std::move(item)); + item.x = 3; + que.try_push(std::move(item)); + + que.pop(item); + EXPECT_EQ(item.x, 1); + que.pop(item); + EXPECT_EQ(item.x, 2); + que.try_pop(item); + EXPECT_EQ(item.x, 3); +} From f7ce193bfbd627ae91914777372d149ba0a4f799 Mon Sep 17 00:00:00 2001 From: constroy Date: Thu, 9 Aug 2018 21:51:24 +0800 Subject: [PATCH 10/10] swap 
name of head_ and tail_ --- include/clue/bounded_mpmc_queue.hpp | 48 ++++++++++++++--------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp index 67f3f12..b3771dd 100644 --- a/include/clue/bounded_mpmc_queue.hpp +++ b/include/clue/bounded_mpmc_queue.hpp @@ -116,8 +116,8 @@ static_assert(std::is_nothrow_destructible::value, } bool full() const noexcept { - return head_.load(std::memory_order_relaxed) == - tail_.load(std::memory_order_relaxed) + capacity_; + return head_.load(std::memory_order_relaxed) + capacity_ == + tail_.load(std::memory_order_relaxed); } size_t capacity() const noexcept { @@ -128,11 +128,11 @@ static_assert(std::is_nothrow_destructible::value, void emplace(Args&&... args) noexcept { static_assert(std::is_nothrow_constructible::value, "T must be nothrow constructible with Args&&..."); - for (size_t head = head_.fetch_add(1, std::memory_order_relaxed); ;) { - auto& slot = slots_[index(head)]; - if (slot.sequence.load(std::memory_order_acquire) == head) { + for (size_t tail = tail_.fetch_add(1, std::memory_order_relaxed); ;) { + auto& slot = slots_[index(tail)]; + if (slot.sequence.load(std::memory_order_acquire) == tail) { slot.construct(std::forward(args)...); - slot.sequence.store(head + 1, std::memory_order_release); + slot.sequence.store(tail + 1, std::memory_order_release); break; } } @@ -142,18 +142,18 @@ static_assert(std::is_nothrow_destructible::value, bool try_emplace(Args&&... 
args) noexcept { static_assert(std::is_nothrow_constructible::value, "T must be nothrow constructible with Args&&..."); - for (size_t head = head_.load(std::memory_order_relaxed); ;) { - auto& slot = slots_[index(head)]; + for (size_t tail = tail_.load(std::memory_order_relaxed); ;) { + auto& slot = slots_[index(tail)]; size_t seq = slot.sequence.load(std::memory_order_acquire); - if (seq < head) { + if (seq < tail) { return false; - } else if (seq > head) { - head = head_.load(std::memory_order_relaxed); + } else if (seq > tail) { + tail = tail_.load(std::memory_order_relaxed); } else { - if (head_.compare_exchange_weak(head, head + 1, + if (tail_.compare_exchange_weak(tail, tail + 1, std::memory_order_relaxed)) { slot.construct(std::forward(args)...); - slot.sequence.store(head + 1, std::memory_order_release); + slot.sequence.store(tail + 1, std::memory_order_release); return true; } } @@ -186,31 +186,31 @@ static_assert(std::is_nothrow_destructible::value, } void pop(T& data) noexcept { - for (size_t tail = tail_.fetch_add(1, std::memory_order_relaxed); ;) { - auto& slot = slots_[index(tail)]; - if (slot.sequence.load(std::memory_order_acquire) == tail + 1) { + for (size_t head = head_.fetch_add(1, std::memory_order_relaxed); ;) { + auto& slot = slots_[index(head)]; + if (slot.sequence.load(std::memory_order_acquire) == head + 1) { data = slot.move(); slot.destroy(); - slot.sequence.store(tail + capacity_, std::memory_order_release); + slot.sequence.store(head + capacity_, std::memory_order_release); break; } } } bool try_pop(T& data) noexcept { - for (size_t tail = tail_.load(std::memory_order_relaxed); ;) { - auto& slot = slots_[index(tail)]; + for (size_t head = head_.load(std::memory_order_relaxed); ;) { + auto& slot = slots_[index(head)]; size_t seq = slot.sequence.load(std::memory_order_acquire); - if (seq < tail + 1) { + if (seq < head + 1) { return false; - } else if (seq > tail + 1){ - tail = tail_.load(std::memory_order_relaxed); + } else if (seq > 
head + 1){ + head = head_.load(std::memory_order_relaxed); } else { - if (tail_.compare_exchange_weak(tail, tail + 1, + if (head_.compare_exchange_weak(head, head + 1, std::memory_order_relaxed)) { data = slot.move(); slot.destroy(); - slot.sequence.store(tail + capacity_, std::memory_order_release); + slot.sequence.store(head + capacity_, std::memory_order_release); return true; } }