diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fc1315..fe55fef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,6 +80,7 @@ set(UNIT_TESTS test_meta test_meta_seq test_textio + test_bounded_mpmc_queue test_include_all ) @@ -94,6 +95,7 @@ set(THREADING_TESTS test_shared_mutex test_concurrent_counter test_concurrent_queue + test_bounded_mpmc_queue_threading test_thread_pool ) diff --git a/doc/source/bounded_mpmc_queue.rst b/doc/source/bounded_mpmc_queue.rst new file mode 100644 index 0000000..1bd76d2 --- /dev/null +++ b/doc/source/bounded_mpmc_queue.rst @@ -0,0 +1,92 @@ +Bounded MPMC Queue +================= + +MPMC queue is very useful in concurrent programming, where MPMC stands for +Multi-Producer-Multi-Consumer. For example, task queue can be considered as +a special kind of MPMC queue. *CLUE* implements +a MPMC queue class, in header file ````. + +.. cpp:class:: template bounded_mpmc_queue + + MPMC queue class. ``T`` is the element type. + +This class has a move constructor, but it is not copyable. The class provides +the following member functions: + +Construction +------------- +.. cpp:function:: explicit bounded_mpmc_queue(size_t capacity) + Construct a ``bounded_mpmc_queue`` with capacity ``capacity``. + +.. cpp:function:: bounded_mpmc_queue(bounded_mpmc_queue&& other) + Move constructor. Constructs with the contents of ``other`` using + move semantics. After the move, ``other`` is guaranteed to be ``empty()``. + +.. cpp:function:: bounded_mpmc_queue(const bounded_mpmc_queue&) = delete + Copy constructor is disabled. + +.. cpp:function:: bounded_mpmc_queue(bounded_mpmc_queue&& other) + Move assignment operator. Replaces the contents with those of ``other`` + using move semantics. + +.. cpp:function:: bounded_mpmc_queue(const bounded_mpmc_queue&) = delete + Copy assignment operator is disabled. + +Capacity +--------- + +.. cpp:function:: size_t capacity() const + + Get the max number of elements the queue can contain. + +.. cpp:function:: bool empty() const + + Get whether the queue is empty (contains no elements). + +.. cpp:function:: bool full() const + + Get whether the queue is full (has no space left). + + +Modifiers +--------- + +.. cpp:function:: void push(const T& x) +.. cpp:function:: void push(T&& x) + + Spin till the queue is non-full, and push an element ``x`` to the back + of the queue. + +.. cpp:function:: bool try_push(const T& x) +.. cpp:function:: bool try_push(T&& x) + + If the queue is not full, push an element ``x`` to the back of the queue + and return ``true``. Otherwise, return ``false`` immediately. + +.. cpp:function:: void emplace(Args&&... args) + + If the queue is not full, construct an element using the given arguments + and push it to the back of the queue. + +.. cpp:function:: bool try_emplace(Args&&... args) + + Spin till the queue is non-full, construct an element using the given + arguments, push it to the back of the queue and return ``true``. + Otherwise, return ``false`` immediately. + +.. cpp:function:: void pop(T& dst) + + Spin until the queue is non-empty, and pop the element at the front and + store it to ``dst``. + +.. cpp:function:: bool try_pop(T& dst) + + If the queue is not empty, pop the element at the front, store it to + ``dst``, and return ``true``. Otherwise, return ``false`` immediately. + + +.. note:: + + All modifiers, including ``push``, ``emplace``, ``pop``, + ``try_push``, ``try_emplace`` and ``try_pop``, are thread-safe. + It is safe to call these methods in concurrent threads. diff --git a/include/clue/bounded_mpmc_queue.hpp b/include/clue/bounded_mpmc_queue.hpp new file mode 100644 index 0000000..b3771dd --- /dev/null +++ b/include/clue/bounded_mpmc_queue.hpp @@ -0,0 +1,229 @@ +#ifndef CLUE_MPMC_QUEUE__ +#define CLUE_MPMC_QUEUE__ + +#include +#include +#include +#include + +namespace clue { + +template +class bounded_mpmc_queue final { +static_assert(std::is_nothrow_copy_assignable::value || + std::is_nothrow_move_assignable::value, + "T must be nothrow copy or move assignable"); + +static_assert(std::is_nothrow_destructible::value, + "T must be nothrow destructible"); + +private: + static constexpr size_t kCacheLineSize = 128; + + struct Slot { + alignas(kCacheLineSize) std::atomic_size_t sequence; + typename std::aligned_storage::type storage; + + T&& move() noexcept { + return reinterpret_cast(storage); + } + + template + void construct(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + new (&storage) T(std::forward(args)...); + } + + void destroy() noexcept { + static_assert(std::is_nothrow_destructible::value, + "T must be nothrow destructible"); + reinterpret_cast(storage).~T(); + } + }; + + size_t capacity_; + Slot* slots_; + + alignas(kCacheLineSize) std::atomic_size_t head_; + alignas(kCacheLineSize) std::atomic_size_t tail_; + +private: + size_t index(size_t x) const noexcept { + return x % capacity_; + } + +public: + explicit bounded_mpmc_queue(size_t capacity) + : capacity_(capacity) + , head_(0) + , tail_(0) { + if (capacity < 1) { + throw std::invalid_argument("capacity must be greater than 0"); + } + slots_ = static_cast(aligned_alloc(capacity_ * sizeof(Slot), kCacheLineSize)); + if (!slots_) { + throw std::bad_alloc(); + } + for (size_t i = 0; i < capacity_; ++i) { + new (slots_ + i) Slot(); + slots_[i].sequence.store(i, std::memory_order_relaxed); + } + } + + bounded_mpmc_queue(const bounded_mpmc_queue&) = delete; + + bounded_mpmc_queue(bounded_mpmc_queue&& other) noexcept + : capacity_(other.capacity_) + , slots_(other.slots_) + , head_(other.head_.load(std::memory_order_relaxed)) + , tail_(other.tail_.load(std::memory_order_relaxed)) { + other.capacity_ = 0; + other.slots_ = nullptr; + other.head_.store(0, std::memory_order_relaxed); + other.tail_.store(0, std::memory_order_relaxed); + } + + bounded_mpmc_queue& operator =(const bounded_mpmc_queue&) = delete; + + bounded_mpmc_queue& operator =(bounded_mpmc_queue&& rhs) noexcept { + rhs.swap(*this); + return *this; + } + + void swap(bounded_mpmc_queue& other) noexcept { + std::swap(capacity_, other.capacity_); + std::swap(slots_, other.slots_); + other.head_.store(head_.exchange(other.head_, std::memory_order_relaxed), + std::memory_order_relaxed); + other.tail_.store(tail_.exchange(other.tail_, std::memory_order_relaxed), + std::memory_order_relaxed); + } + + ~bounded_mpmc_queue() { + for (size_t i = 0; i < capacity_; ++i) { + if (index(slots_[i].sequence.load(std::memory_order_relaxed)) != i) { + slots_[i].destroy(); + } + slots_[i].~Slot(); + } + aligned_free(slots_); + } + + bool empty() const noexcept { + return head_.load(std::memory_order_relaxed) == + tail_.load(std::memory_order_relaxed); + } + + bool full() const noexcept { + return head_.load(std::memory_order_relaxed) + capacity_ == + tail_.load(std::memory_order_relaxed); + } + + size_t capacity() const noexcept { + return capacity_; + } + + template + void emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + for (size_t tail = tail_.fetch_add(1, std::memory_order_relaxed); ;) { + auto& slot = slots_[index(tail)]; + if (slot.sequence.load(std::memory_order_acquire) == tail) { + slot.construct(std::forward(args)...); + slot.sequence.store(tail + 1, std::memory_order_release); + break; + } + } + } + + template + bool try_emplace(Args&&... args) noexcept { + static_assert(std::is_nothrow_constructible::value, + "T must be nothrow constructible with Args&&..."); + for (size_t tail = tail_.load(std::memory_order_relaxed); ;) { + auto& slot = slots_[index(tail)]; + size_t seq = slot.sequence.load(std::memory_order_acquire); + if (seq < tail) { + return false; + } else if (seq > tail) { + tail = tail_.load(std::memory_order_relaxed); + } else { + if (tail_.compare_exchange_weak(tail, tail + 1, + std::memory_order_relaxed)) { + slot.construct(std::forward(args)...); + slot.sequence.store(tail + 1, std::memory_order_release); + return true; + } + } + } + return false; + } + + void push(const T& data) noexcept { + static_assert(std::is_nothrow_copy_constructible::value, + "T must be nothrow copy constructible"); + emplace(data); + } + + void push(T&& data) noexcept { + static_assert(std::is_nothrow_move_constructible::value, + "T must be nothrow move constructible"); + emplace(std::move(data)); + } + + bool try_push(const T& data) noexcept { + static_assert(std::is_nothrow_copy_constructible::value, + "T must be nothrow copy constructible"); + return try_emplace(data); + } + + bool try_push(T&& data) noexcept { + static_assert(std::is_nothrow_move_constructible::value, + "T must be nothrow move constructible"); + return try_emplace(std::move(data)); + } + + void pop(T& data) noexcept { + for (size_t head = head_.fetch_add(1, std::memory_order_relaxed); ;) { + auto& slot = slots_[index(head)]; + if (slot.sequence.load(std::memory_order_acquire) == head + 1) { + data = slot.move(); + slot.destroy(); + slot.sequence.store(head + capacity_, std::memory_order_release); + break; + } + } + } + + bool try_pop(T& data) noexcept { + for (size_t head = head_.load(std::memory_order_relaxed); ;) { + auto& slot = slots_[index(head)]; + size_t seq = slot.sequence.load(std::memory_order_acquire); + if (seq < head + 1) { + return false; + } else if (seq > head + 1){ + head = head_.load(std::memory_order_relaxed); + } else { + if (head_.compare_exchange_weak(head, head + 1, + std::memory_order_relaxed)) { + data = slot.move(); + slot.destroy(); + slot.sequence.store(head + capacity_, std::memory_order_release); + return true; + } + } + } + return false; + } +}; + +template +inline void swap(bounded_mpmc_queue& lhs, bounded_mpmc_queue& rhs) noexcept { + lhs.swap(rhs); +} + +} // end namespace clue + +#endif // CLUE_MPMC_QUEUE__ diff --git a/tests/test_bounded_mpmc_queue.cpp b/tests/test_bounded_mpmc_queue.cpp new file mode 100644 index 0000000..eff2651 --- /dev/null +++ b/tests/test_bounded_mpmc_queue.cpp @@ -0,0 +1,120 @@ +#include + +#include +#include +#include + +using clue::bounded_mpmc_queue; + +TEST(BoundedMpmcQueue, Construct) { + using int_queue = bounded_mpmc_queue; + + EXPECT_THROW(int_queue(0), std::invalid_argument); + int_queue que1(64); + EXPECT_EQ(que1.capacity(), 64); + EXPECT_TRUE(que1.empty()); + int_queue que2(std::move(que1)); + EXPECT_EQ(que2.capacity(), 64); + EXPECT_TRUE(que2.empty()); + int_queue que3(128); + que3 = std::move(que2); + EXPECT_EQ(que3.capacity(), 64); + EXPECT_TRUE(que3.empty()); + int_queue que4(128); + clue::swap(que3, que4); + EXPECT_EQ(que3.capacity(), 128); + EXPECT_EQ(que4.capacity(), 64); +} + +struct Item { + int x, y; + Item(int x, int y) noexcept : x(x), y(y) {} +}; + +TEST(BoundedMpmcQueue, Emplace) { + using item_queue = bounded_mpmc_queue; + item_queue que(64); + EXPECT_TRUE(que.empty()); + int x = 0, y = 0; + que.emplace(x++, y--); + while (que.try_emplace(x++, y--)) continue; + EXPECT_TRUE(que.full()); + EXPECT_FALSE(que.try_push(Item(x, y))); + + item_queue swap_que(128); + EXPECT_TRUE(swap_que.empty()); + swap_que.swap(que); + EXPECT_TRUE(que.empty()); + EXPECT_TRUE(swap_que.full()); +} + +TEST(BoundedMpmcQueue, Push) { + using item_queue = bounded_mpmc_queue; + item_queue que(64); + EXPECT_TRUE(que.empty()); + int x = 0, y = 0; + que.push(Item(x++, y--)); + while (que.try_push(Item(x++, y--))) continue; + EXPECT_TRUE(que.full()); + EXPECT_FALSE(que.try_emplace(x, y)); + + item_queue swap_que(128); + EXPECT_TRUE(swap_que.empty()); + swap_que.swap(que); + EXPECT_TRUE(que.empty()); + EXPECT_TRUE(swap_que.full()); +} + +TEST(BoundedMpmcQueue, Pop) { + using int_queue = bounded_mpmc_queue; + const size_t n = 64; + int_queue push_que(n); + EXPECT_TRUE(push_que.empty()); + for (size_t i = 0; i < n; ++i) { + push_que.push(i); + } + EXPECT_TRUE(push_que.full()); + EXPECT_FALSE(push_que.try_push(n)); + + int_queue pop_que(std::move(push_que)); + EXPECT_TRUE(push_que.full()); + for (size_t i = 0; i < n; ++i) { + int x; + pop_que.pop(x); + EXPECT_EQ(x, i); + } + EXPECT_TRUE(pop_que.empty()); + int x; + EXPECT_FALSE(pop_que.try_pop(x)); +} + +struct MovableItem { + int x; + explicit MovableItem(int x) noexcept : x(x) {} + MovableItem(const MovableItem&) = delete; + MovableItem(MovableItem&& other) noexcept : x(other.x) {} + + MovableItem& operator =(const MovableItem&) = delete; + MovableItem& operator =(MovableItem&& rhs) noexcept { + x = rhs.x; + return *this; + } +}; + +TEST(BoundedMpmcQueue, Move) { + using item_queue = bounded_mpmc_queue; + const size_t n = 64; + item_queue que(n); + que.emplace(1); + MovableItem item(2); + que.push(std::move(item)); + item.x = 3; + que.try_push(std::move(item)); + + que.pop(item); + EXPECT_EQ(item.x, 1); + que.pop(item); + EXPECT_EQ(item.x, 2); + que.try_pop(item); + EXPECT_EQ(item.x, 3); +} diff --git a/tests/test_bounded_mpmc_queue_threading.cpp b/tests/test_bounded_mpmc_queue_threading.cpp new file mode 100644 index 0000000..013d834 --- /dev/null +++ b/tests/test_bounded_mpmc_queue_threading.cpp @@ -0,0 +1,106 @@ +#include +#include +#include +#include + +void test_push_then_pop(size_t nt) { + std::printf("testing push_then_pop ...\n"); + assert(nt > 0); + + int N = 10000; + clue::bounded_mpmc_queue Q(nt * N); + + assert(Q.capacity() == nt * N); + assert(Q.empty()); + + std::vector producers; + for (size_t t = 0; t < nt; ++t) { + producers.emplace_back([&Q,N](){ + for (int i = 0; i < N; ++i) { + assert(Q.try_push(i + 1)); + } + }); + } + + for (size_t t = 0; t < nt; ++t) { + producers.at(t).join(); + } + + assert(!Q.empty()); + assert(Q.full()); + + std::vector consumers; + std::vector sums(nt, 0); + for (size_t t = 0; t < nt; ++t) { + int& s = sums[t]; + consumers.emplace_back([&Q,&s]{ + int v = 0; + while (Q.try_pop(v)) { + s += v; + } + }); + } + + int total = 0; + for (size_t t = 0; t < nt; ++t) { + consumers.at(t).join(); + total += sums.at(t); + } + + int expect_total = nt * (N * (N + 1) / 2); + assert(total == expect_total); +} + +void test_push_and_pop(size_t nt) { + std::printf("push_and_pop with %lu threads ...\n", nt); + + assert(nt > 0); + + clue::bounded_mpmc_queue Q(100); + int N = 100; + + std::vector producers; + for (size_t t = 0; t < nt; ++t) { + producers.emplace_back([&Q,N](){ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (int i = 0; i < N; ++i) { + Q.push(i + 1); + } + }); + } + + std::vector consumers; + std::vector sums(nt, 0); + for (size_t t = 0; t < nt; ++t) { + int& s = sums[t]; + consumers.emplace_back([&Q,N,&s]{ + for (int i = 0; i < N; ++i) { + int v; + Q.pop(v); + s += v; + } + }); + } + + for (size_t t = 0; t < nt; ++t) { + producers.at(t).join(); + } + + int total = 0; + for (size_t t = 0; t < nt; ++t) { + consumers.at(t).join(); + total += sums.at(t); + } + + assert(Q.empty()); + + int expect_total = nt * (N * (N + 1) / 2); + assert(total == expect_total); +} + +int main() { + size_t nt = 4; + test_push_then_pop(nt); + test_push_and_pop(nt); + return 0; +}