Branch data Line data Source code
1 : : // MIT License - see LICENSE file for details.
2 : : //
3 : : // Lock-free Single-Producer Multiple-Consumer ring buffer.
4 : : //
5 : : // Structural mirror of MPSC: single producer writes without CAS,
6 : : // multiple consumers CAS on tail to claim slots. Per-slot sequence
7 : : // counters for safe coordination.
8 : :
9 : : #pragma once
10 : :
11 : : #include <atomic>
12 : : #include <cstddef>
13 : : #include <cstdint>
14 : : #include <cstring>
15 : : #include <type_traits>
16 : :
17 : : namespace ouroboros::spmc
18 : : {
19 : :
20 : : // ─── Slot with sequence counter ─────────────────────────────────
21 : :
22 : : template <typename T>
23 : : struct Slot
24 : : {
25 : : std::atomic<uint32_t> sequence;
26 : : T data;
27 : : };
28 : :
29 : : // ─── RingBuffer ─────────────────────────────────────────────────
30 : :
31 : : template <typename T, uint32_t Capacity, uint32_t CacheLineSize = 64>
32 : : class RingBuffer
33 : : {
34 : : static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
35 : : "RingBuffer capacity must be a power of 2");
36 : : static_assert(std::is_trivially_copyable_v<T>,
37 : : "RingBuffer element type must be trivially copyable");
38 : : static_assert(CacheLineSize > sizeof(std::atomic<uint32_t>),
39 : : "CacheLineSize must be greater than sizeof(atomic<uint32_t>)");
40 : : static_assert(CacheLineSize > 0 && (CacheLineSize & (CacheLineSize - 1)) == 0,
41 : : "CacheLineSize must be a power of 2");
42 : : static_assert(Capacity < (uint32_t{1} << 31),
43 : : "Capacity must be less than 2^31 for signed-difference logic");
44 : : static_assert(std::atomic<uint32_t>::is_always_lock_free,
45 : : "uint32_t atomics must be lock-free on this platform");
46 : :
47 : : static constexpr uint32_t Mask = Capacity - 1;
48 : :
49 : : public:
50 : : using value_type = T;
51 : : // ── Layout types ────────────────────────────────────────────
52 : :
53 : : struct alignas(CacheLineSize) ControlBlock
54 : : {
55 : : std::atomic<uint32_t> head{0};
56 : : char pad1[CacheLineSize - sizeof(std::atomic<uint32_t>)];
57 : : std::atomic<uint32_t> tail{0};
58 : : char pad2[CacheLineSize - sizeof(std::atomic<uint32_t>)];
59 : : };
60 : :
61 : : // ── Lifecycle ───────────────────────────────────────────────
62 : :
63 : 18 : RingBuffer()
64 [ + + ]: 1983 : {
65 [ + + ]: 1983 : for (uint32_t i = 0; i < Capacity; ++i)
66 : 1965 : m_slots[i].sequence.store(i, std::memory_order_relaxed);
67 : 18 : }
68 : :
69 : : RingBuffer(const RingBuffer &) = delete;
70 : : RingBuffer &operator=(const RingBuffer &) = delete;
71 : : RingBuffer(RingBuffer &&) = delete;
72 : : RingBuffer &operator=(RingBuffer &&) = delete;
73 : :
74 : : // ── Producer (single thread) ────────────────────────────────
75 : :
76 : 2122433 : [[nodiscard]] bool push(const T &item)
77 : : {
78 : 2122433 : uint32_t pos = m_ctrl.head.load(std::memory_order_relaxed);
79 : 2122433 : Slot<T> &slot = m_slots[pos & Mask];
80 : 2122433 : uint32_t seq = slot.sequence.load(std::memory_order_acquire);
81 : :
82 [ + + ]: 2122433 : if (seq != pos)
83 : 482281 : return false; // full
84 : :
85 : 1640152 : std::memcpy(&slot.data, &item, sizeof(T));
86 : 1640152 : slot.sequence.store(pos + 1, std::memory_order_release);
87 : 1640152 : m_ctrl.head.store(pos + 1, std::memory_order_release);
88 : 1640152 : return true;
89 : : }
90 : :
91 : : // ── Consumer (multiple threads) ─────────────────────────────
92 : :
93 : 2131713 : [[nodiscard]] bool pop(T &item)
94 : : {
95 : 4274279 : uint32_t pos = m_ctrl.tail.load(std::memory_order_relaxed);
96 : :
97 : 726258 : for (;;)
98 : : {
99 : 2868824 : Slot<T> &slot = m_slots[pos & Mask];
100 : 2868824 : uint32_t seq = slot.sequence.load(std::memory_order_acquire);
101 : 3081292 : auto diff = static_cast<int32_t>(seq - (pos + 1));
102 : :
103 [ + + ]: 3081292 : if (diff == 0)
104 : : {
105 [ + + ]: 4744752 : if (m_ctrl.tail.compare_exchange_weak(
106 : : pos, pos + 1,
107 : : std::memory_order_acq_rel,
108 : : std::memory_order_relaxed))
109 : : {
110 : 1636827 : std::memcpy(&item, &slot.data, sizeof(T));
111 : 1636827 : slot.sequence.store(pos + Capacity, std::memory_order_release);
112 : 1563193 : return true;
113 : : }
114 : : // CAS failed — pos reloaded
115 : : }
116 [ + + ]: 684207 : else if (diff < 0)
117 : : {
118 : 641444 : return false; // empty
119 : : }
120 : : else
121 : : {
122 : : // Another consumer advanced tail; reload
123 : 61228 : pos = m_ctrl.tail.load(std::memory_order_relaxed);
124 : : }
125 : : }
126 : : }
127 : :
128 : : // ── Queries (approximate — consumers may have claimed but not freed) ──
129 : :
130 : 967 : [[nodiscard]] uint32_t readAvailable() const
131 : : {
132 : 967 : uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
133 : 987 : uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
134 : 987 : return head - tail;
135 : : }
136 : :
137 : 104 : [[nodiscard]] uint32_t writeAvailable() const
138 : : {
139 : 104 : uint32_t head = m_ctrl.head.load(std::memory_order_relaxed);
140 : 104 : uint32_t tail = m_ctrl.tail.load(std::memory_order_acquire);
141 : 104 : return Capacity - (head - tail);
142 : : }
143 : :
144 : 972 : [[nodiscard]] bool isEmpty() const { return readAvailable() == 0; }
145 : 102 : [[nodiscard]] bool isFull() const { return writeAvailable() == 0; }
146 : :
147 : 2 : static constexpr uint32_t capacity() { return Capacity; }
148 : 2 : static constexpr uint32_t cacheLineSize() { return CacheLineSize; }
149 : :
150 : : // ── Reset (NOT thread-safe) ─────────────────────────────────
151 : :
152 : 1 : void reset()
153 : : {
154 : 1 : m_ctrl.head.store(0, std::memory_order_relaxed);
155 : 1 : m_ctrl.tail.store(0, std::memory_order_relaxed);
156 [ + + ]: 9 : for (uint32_t i = 0; i < Capacity; ++i)
157 : 8 : m_slots[i].sequence.store(i, std::memory_order_relaxed);
158 : 1 : }
159 : :
160 : : private:
161 : : ControlBlock m_ctrl;
162 : : Slot<T> m_slots[Capacity];
163 : : };
164 : :
165 : : } // namespace ouroboros::spmc
|