-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSPMCLockFreeQueue.h
More file actions
119 lines (99 loc) · 3.87 KB
/
SPMCLockFreeQueue.h
File metadata and controls
119 lines (99 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#pragma once
#include <cstddef>
#include <array>
#include <atomic>
#include <utility>
#include <optional>
#include <thread>
#include <new>
namespace JacobVW {

// Padding size (bytes) used to keep hot members on separate cache lines and
// avoid false sharing. 128 covers the common 64-byte line plus the adjacent
// line pulled in by hardware prefetchers.
constexpr auto CACHED_LINE_SIZE = 128;

// Bounded Single-Producer / Multi-Consumer lock-free queue.
//
// Slot state is tracked with per-slot sequence numbers (Vyukov-style):
//   - sequenceNumber == i          : slot is free for the write at index i
//   - sequenceNumber == i + 1      : slot holds the element written at index i
//   - sequenceNumber == r + SIZE   : slot popped at index r, re-armed for the
//                                    producer's next lap
//
// Thread-safety contract: exactly ONE thread may call Emplace/Push; any number
// of threads may call Pop concurrently. Construction and destruction must be
// single-threaded (no concurrent producers or consumers).
template <typename T, std::size_t SIZE>
class SPMCLockFreeQueue {
    static_assert((SIZE != 0) && ((SIZE & (SIZE - 1)) == 0), "SIZE must be a power of 2");
public:
    SPMCLockFreeQueue()
    {
        // Arm every slot as "free for its own index" (lap 0). Relaxed is
        // sufficient: construction is single-threaded by contract.
        for (std::size_t i = 0; i < SIZE; ++i)
        {
            m_buffer[i].sequenceNumber.store(i, std::memory_order_relaxed);
        }
    }

    // Destroys any elements still enqueued. Must run with no concurrent
    // producers/consumers, so plain (relaxed) reads of the cursors suffice.
    ~SPMCLockFreeQueue()
    {
        // The live elements are exactly the indices [m_readIndex, m_writeIndex).
        // Iterating that half-open range is exact for every SIZE — unlike the
        // per-slot "sequence mod SIZE" test, which cannot distinguish a full
        // slot from an empty one when SIZE == 1 and would then call ~T() on
        // raw uninitialized storage (undefined behavior).
        const auto readIndex = m_readIndex.load(std::memory_order_relaxed);
        for (auto i = readIndex; i != m_writeIndex; ++i)
        {
            T* ptr = std::launder(reinterpret_cast<T*>(&m_buffer[i & (SIZE - 1)].storage));
            ptr->~T();
        }
    }

    // Producer-only. Constructs T in place from args.
    // Returns false when the queue is full (oldest slot not yet consumed).
    template <typename... Args>
    bool Emplace(Args&&... args)
    {
        const auto writeIndex = m_writeIndex & (SIZE - 1);
        // acquire pairs with the consumer's release store in Pop(), so the
        // producer observes the slot's storage as fully released before reuse.
        const auto sequence = m_buffer[writeIndex].sequenceNumber.load(std::memory_order_acquire);
        if (m_writeIndex == sequence)
        {
            // Slot is free for this lap: construct, then publish with release
            // so a consumer that acquires the sequence sees the completed T.
            T* ptr = reinterpret_cast<T*>(&m_buffer[writeIndex].storage);
            new (ptr) T(std::forward<Args>(args)...);
            m_buffer[writeIndex].sequenceNumber.store(m_writeIndex + 1, std::memory_order_release);
            m_writeIndex++;
            return true;
        } else {
            // Queue full - consumers are behind and have not read this slot.
            return false;
        }
    }

    // Producer-only convenience wrappers over Emplace().
    bool Push(const T& value) { return Emplace(value); }
    bool Push(T&& value) { return Emplace(std::move(value)); }

    // Consumer-safe. Removes and returns the oldest element, or std::nullopt
    // when the queue is empty. Safe to call from multiple threads concurrently.
    std::optional<T> Pop()
    {
        auto readIndex = m_readIndex.load(std::memory_order_relaxed);
        do {
            const auto readPos = readIndex & (SIZE - 1);
            // acquire pairs with the producer's release store in Emplace().
            const auto sequence = m_buffer[readPos].sequenceNumber.load(std::memory_order_acquire);
            // dif == 0 : slot holds the element for readIndex
            // dif <  0 : producer has not written this index yet -> empty
            // dif >  0 : other consumers already advanced past us -> catch up
            intptr_t dif = (intptr_t)sequence - (intptr_t)(readIndex + 1);
            if (dif == 0) {
                // There is data to read; try to claim this index exclusively.
                // No ABA hazard: m_readIndex is monotonically increasing, so a
                // stale expected value always fails the CAS.
                if (m_readIndex.compare_exchange_weak(
                    readIndex,
                    readIndex + 1,
                    std::memory_order_release,
                    std::memory_order_relaxed))
                {
                    T* ptr = std::launder(reinterpret_cast<T*>(&m_buffer[readPos].storage));
                    auto value = std::move(*ptr);
                    ptr->~T();
                    // Re-arm the slot for the producer's next lap; release
                    // makes the vacated storage visible before it is reused.
                    m_buffer[readPos].sequenceNumber.store(readIndex + SIZE, std::memory_order_release);
                    return value;
                }
                // CAS failure loaded the current index into readIndex; retry.
            } else if (dif < 0) {
                // empty
                return std::nullopt;
            } else {
                // if we are way behind, this can help catch up by yielding and skipping ahead
                std::this_thread::yield();
                readIndex = m_readIndex.load(std::memory_order_relaxed);
            }
        } while (true);
    }

private:
    // One ring slot: the state-tracking sequence number plus raw, suitably
    // aligned storage in which T is placement-constructed.
    struct Data
    {
        std::atomic<std::size_t> sequenceNumber {};
        alignas(T) std::byte storage[sizeof(T)];
    };
private:
    // Producer cursor — non-atomic on purpose: exactly one writer thread.
    alignas(CACHED_LINE_SIZE) std::size_t m_writeIndex {};
    // Shared consumer cursor, claimed via CAS in Pop().
    alignas(CACHED_LINE_SIZE) std::atomic<std::size_t> m_readIndex {};
    alignas(CACHED_LINE_SIZE) std::array<Data, SIZE> m_buffer {};
};
}