Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ set(UNIT_TESTS
test_meta
test_meta_seq
test_textio
test_bounded_mpmc_queue
test_include_all
)

Expand All @@ -94,6 +95,7 @@ set(THREADING_TESTS
test_shared_mutex
test_concurrent_counter
test_concurrent_queue
test_bounded_mpmc_queue_threading
test_thread_pool
)

Expand Down
92 changes: 92 additions & 0 deletions doc/source/bounded_mpmc_queue.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
Bounded MPMC Queue
=================

MPMC queue is very useful in concurrent programming, where MPMC stands for
Multi-Producer-Multi-Consumer. For example, task queue can be considered as
a special kind of MPMC queue. *CLUE* implements
a MPMC queue class, in header file ``<clue/bounded_mpmc_queue.hpp>``.

.. cpp:class:: template<T> bounded_mpmc_queue

MPMC queue class. ``T`` is the element type.

This class has a move constructor, but it is not copyable. The class provides
the following member functions:

Construction
-------------
.. cpp:function:: explicit bounded_mpmc_queue(size_t capacity)
Construct a ``bounded_mpmc_queue`` with capacity ``capacity``.

.. cpp:function:: bounded_mpmc_queue(bounded_mpmc_queue&& other)
Move constructor. Constructs with the contents of ``other`` using
move semantics. After the move, ``other`` is guaranteed to be ``empty()``.

.. cpp:function:: bounded_mpmc_queue(const bounded_mpmc_queue&) = delete
Copy constructor is disabled.

.. cpp:function:: bounded_mpmc_queue(bounded_mpmc_queue&& other)
Move assignment operator. Replaces the contents with those of ``other``
using move semantics.

.. cpp:function:: bounded_mpmc_queue(const bounded_mpmc_queue&) = delete
Copy assignment operator is disabled.

Capacity
---------

.. cpp:function:: size_t capacity() const

Get the max number of elements the queue can contain.

.. cpp:function:: bool empty() const

Get whether the queue is empty (contains no elements).

.. cpp:function:: bool full() const

Get whether the queue is full (has no space left).


Modifiers
---------

.. cpp:function:: void push(const T& x)
.. cpp:function:: void push(T&& x)

Spin till the queue is non-full, and push an element ``x`` to the back
of the queue.

.. cpp:function:: bool try_push(const T& x)
.. cpp:function:: bool try_push(T&& x)

If the queue is not full, push an element ``x`` to the back of the queue
and return ``true``. Otherwise, return ``false`` immediately.

.. cpp:function:: void emplace(Args&&... args)

If the queue is not full, construct an element using the given arguments
and push it to the back of the queue.

.. cpp:function:: bool try_emplace(Args&&... args)

Spin till the queue is non-full, construct an element using the given
arguments, push it to the back of the queue and return ``true``.
Otherwise, return ``false`` immediately.

.. cpp:function:: void pop(T& dst)

Spin until the queue is non-empty, and pop the element at the front and
store it to ``dst``.

.. cpp:function:: bool try_pop(T& dst)

If the queue is not empty, pop the element at the front, store it to
``dst``, and return ``true``. Otherwise, return ``false`` immediately.


.. note::

All modifiers, including ``push``, ``emplace``, ``pop``,
``try_push``, ``try_emplace`` and ``try_pop``, are thread-safe.
It is safe to call these methods in concurrent threads.
229 changes: 229 additions & 0 deletions include/clue/bounded_mpmc_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
#ifndef CLUE_MPMC_QUEUE__
#define CLUE_MPMC_QUEUE__

#include <clue/memory.hpp>
#include <atomic>
#include <stdexcept>
#include <utility>

namespace clue {

template <typename T>
class bounded_mpmc_queue final {
static_assert(std::is_nothrow_copy_assignable<T>::value ||
std::is_nothrow_move_assignable<T>::value,
"T must be nothrow copy or move assignable");

static_assert(std::is_nothrow_destructible<T>::value,
"T must be nothrow destructible");

private:
static constexpr size_t kCacheLineSize = 128;

struct Slot {
alignas(kCacheLineSize) std::atomic_size_t sequence;
typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;

T&& move() noexcept {
return reinterpret_cast<T&&>(storage);
}

template <typename... Args>
void construct(Args&&... args) noexcept {
static_assert(std::is_nothrow_constructible<T, Args&&...>::value,
"T must be nothrow constructible with Args&&...");
new (&storage) T(std::forward<Args>(args)...);
}

void destroy() noexcept {
static_assert(std::is_nothrow_destructible<T>::value,
"T must be nothrow destructible");
reinterpret_cast<T&>(storage).~T();
}
};

size_t capacity_;
Slot* slots_;

alignas(kCacheLineSize) std::atomic_size_t head_;
alignas(kCacheLineSize) std::atomic_size_t tail_;

private:
size_t index(size_t x) const noexcept {
return x % capacity_;
}

public:
explicit bounded_mpmc_queue(size_t capacity)
: capacity_(capacity)
, head_(0)
, tail_(0) {
if (capacity < 1) {
throw std::invalid_argument("capacity must be greater than 0");
}
slots_ = static_cast<Slot*>(aligned_alloc(capacity_ * sizeof(Slot), kCacheLineSize));
if (!slots_) {
throw std::bad_alloc();
}
for (size_t i = 0; i < capacity_; ++i) {
new (slots_ + i) Slot();
slots_[i].sequence.store(i, std::memory_order_relaxed);
}
}

bounded_mpmc_queue(const bounded_mpmc_queue&) = delete;

bounded_mpmc_queue(bounded_mpmc_queue&& other) noexcept
: capacity_(other.capacity_)
, slots_(other.slots_)
, head_(other.head_.load(std::memory_order_relaxed))
, tail_(other.tail_.load(std::memory_order_relaxed)) {
other.capacity_ = 0;
other.slots_ = nullptr;
other.head_.store(0, std::memory_order_relaxed);
other.tail_.store(0, std::memory_order_relaxed);
}

bounded_mpmc_queue& operator =(const bounded_mpmc_queue&) = delete;

bounded_mpmc_queue& operator =(bounded_mpmc_queue&& rhs) noexcept {
rhs.swap(*this);
return *this;
}

void swap(bounded_mpmc_queue& other) noexcept {
std::swap(capacity_, other.capacity_);
std::swap(slots_, other.slots_);
other.head_.store(head_.exchange(other.head_, std::memory_order_relaxed),
std::memory_order_relaxed);
other.tail_.store(tail_.exchange(other.tail_, std::memory_order_relaxed),
std::memory_order_relaxed);
}

~bounded_mpmc_queue() {
for (size_t i = 0; i < capacity_; ++i) {
if (index(slots_[i].sequence.load(std::memory_order_relaxed)) != i) {
slots_[i].destroy();
}
slots_[i].~Slot();
}
aligned_free(slots_);
}

bool empty() const noexcept {
return head_.load(std::memory_order_relaxed) ==
tail_.load(std::memory_order_relaxed);
}

bool full() const noexcept {
return head_.load(std::memory_order_relaxed) + capacity_ ==
tail_.load(std::memory_order_relaxed);
}

size_t capacity() const noexcept {
return capacity_;
}

template <typename... Args>
void emplace(Args&&... args) noexcept {
static_assert(std::is_nothrow_constructible<T, Args&&...>::value,
"T must be nothrow constructible with Args&&...");
for (size_t tail = tail_.fetch_add(1, std::memory_order_relaxed); ;) {
auto& slot = slots_[index(tail)];
if (slot.sequence.load(std::memory_order_acquire) == tail) {
slot.construct(std::forward<Args>(args)...);
slot.sequence.store(tail + 1, std::memory_order_release);
break;
}
}
}

template <typename... Args>
bool try_emplace(Args&&... args) noexcept {
static_assert(std::is_nothrow_constructible<T, Args&&...>::value,
"T must be nothrow constructible with Args&&...");
for (size_t tail = tail_.load(std::memory_order_relaxed); ;) {
auto& slot = slots_[index(tail)];
size_t seq = slot.sequence.load(std::memory_order_acquire);
if (seq < tail) {
return false;
} else if (seq > tail) {
tail = tail_.load(std::memory_order_relaxed);
} else {
if (tail_.compare_exchange_weak(tail, tail + 1,
std::memory_order_relaxed)) {
slot.construct(std::forward<Args>(args)...);
slot.sequence.store(tail + 1, std::memory_order_release);
return true;
}
}
}
return false;
}

void push(const T& data) noexcept {
static_assert(std::is_nothrow_copy_constructible<T>::value,
"T must be nothrow copy constructible");
emplace(data);
}

void push(T&& data) noexcept {
static_assert(std::is_nothrow_move_constructible<T>::value,
"T must be nothrow move constructible");
emplace(std::move(data));
}

bool try_push(const T& data) noexcept {
static_assert(std::is_nothrow_copy_constructible<T>::value,
"T must be nothrow copy constructible");
return try_emplace(data);
}

bool try_push(T&& data) noexcept {
static_assert(std::is_nothrow_move_constructible<T>::value,
"T must be nothrow move constructible");
return try_emplace(std::move(data));
}

void pop(T& data) noexcept {
for (size_t head = head_.fetch_add(1, std::memory_order_relaxed); ;) {
auto& slot = slots_[index(head)];
if (slot.sequence.load(std::memory_order_acquire) == head + 1) {
data = slot.move();
slot.destroy();
slot.sequence.store(head + capacity_, std::memory_order_release);
break;
}
}
}

bool try_pop(T& data) noexcept {
for (size_t head = head_.load(std::memory_order_relaxed); ;) {
auto& slot = slots_[index(head)];
size_t seq = slot.sequence.load(std::memory_order_acquire);
if (seq < head + 1) {
return false;
} else if (seq > head + 1){
head = head_.load(std::memory_order_relaxed);
} else {
if (head_.compare_exchange_weak(head, head + 1,
std::memory_order_relaxed)) {
data = slot.move();
slot.destroy();
slot.sequence.store(head + capacity_, std::memory_order_release);
return true;
}
}
}
return false;
}
};

template<typename T>
inline void swap(bounded_mpmc_queue<T>& lhs, bounded_mpmc_queue<T>& rhs) noexcept {
lhs.swap(rhs);
}

} // end namespace clue

#endif // CLUE_MPMC_QUEUE__
Loading