Branch data Line data Source code
1 : : // MIT License - see LICENSE file for details.
2 : : //
3 : : // Lock-free Multiple-Producer Single-Consumer ring buffer.
4 : : //
5 : : // Uses per-slot sequence counters (Vyukov-style bounded queue) for safe
6 : : // multi-producer coordination. Producers CAS on head to reserve slots;
7 : : // the single consumer reads without CAS.
8 : : //
9 : : // Same cache-line isolation strategy as the SPSC variant.
10 : :
11 : : #pragma once
12 : :
13 : : #include <atomic>
14 : : #include <cstddef>
15 : : #include <cstdint>
16 : : #include <cstring>
17 : : #include <type_traits>
18 : :
19 : : namespace ouroboros::mpsc
20 : : {
21 : :
22 : : // ─── Slot with sequence counter ─────────────────────────────────
23 : :
24 : : template <typename T>
25 : : struct Slot
26 : : {
27 : : std::atomic<uint32_t> sequence;
28 : : T data;
29 : : };
30 : :
31 : : // ─── RingBuffer ─────────────────────────────────────────────────
32 : :
33 : : template <typename T, uint32_t Capacity, uint32_t CacheLineSize = 64>
34 : : class RingBuffer
35 : : {
36 : : static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
37 : : "RingBuffer capacity must be a power of 2");
38 : : static_assert(std::is_trivially_copyable_v<T>,
39 : : "RingBuffer element type must be trivially copyable");
40 : : static_assert(CacheLineSize > sizeof(std::atomic<uint32_t>),
41 : : "CacheLineSize must be greater than sizeof(atomic<uint32_t>)");
42 : : static_assert(CacheLineSize > 0 && (CacheLineSize & (CacheLineSize - 1)) == 0,
43 : : "CacheLineSize must be a power of 2");
44 : : static_assert(Capacity < (uint32_t{1} << 31),
45 : : "Capacity must be less than 2^31 for signed-difference logic");
46 : : static_assert(std::atomic<uint32_t>::is_always_lock_free,
47 : : "uint32_t atomics must be lock-free on this platform");
48 : :
49 : : static constexpr uint32_t Mask = Capacity - 1;
50 : :
51 : : public:
52 : : using value_type = T;
53 : : // ── Layout types ────────────────────────────────────────────
54 : :
55 : : struct alignas(CacheLineSize) ControlBlock
56 : : {
57 : : std::atomic<uint32_t> head{0};
58 : : char pad1[CacheLineSize - sizeof(std::atomic<uint32_t>)];
59 : : std::atomic<uint32_t> tail{0};
60 : : char pad2[CacheLineSize - sizeof(std::atomic<uint32_t>)];
61 : : };
62 : :
63 : : // ── Lifecycle ───────────────────────────────────────────────
64 : :
65 : 18 : RingBuffer()
66 [ + + ]: 1983 : {
67 [ + + ]: 1983 : for (uint32_t i = 0; i < Capacity; ++i)
68 : 1965 : m_slots[i].sequence.store(i, std::memory_order_relaxed);
69 : 18 : }
70 : :
71 : : RingBuffer(const RingBuffer &) = delete;
72 : : RingBuffer &operator=(const RingBuffer &) = delete;
73 : : RingBuffer(RingBuffer &&) = delete;
74 : : RingBuffer &operator=(RingBuffer &&) = delete;
75 : :
76 : : // ── Producer (multiple threads) ─────────────────────────────
77 : :
78 : 3662881 : [[nodiscard]] bool push(const T &item)
79 : : {
80 : 7284474 : uint32_t pos = m_ctrl.head.load(std::memory_order_relaxed);
81 : :
82 : 491095 : for (;;)
83 : : {
84 : 4112688 : Slot<T> &slot = m_slots[pos & Mask];
85 : 4112688 : uint32_t seq = slot.sequence.load(std::memory_order_acquire);
86 : 4246423 : auto diff = static_cast<int32_t>(seq - pos);
87 : :
88 [ + + ]: 4246423 : if (diff == 0)
89 : : {
90 [ + + ]: 4242155 : if (m_ctrl.head.compare_exchange_weak(
91 : : pos, pos + 1,
92 : : std::memory_order_acq_rel,
93 : : std::memory_order_relaxed))
94 : : {
95 : 1640034 : std::memcpy(&slot.data, &item, sizeof(T));
96 : 1640034 : slot.sequence.store(pos + 1, std::memory_order_release);
97 : 1634548 : return true;
98 : : }
99 : : // CAS failed — pos reloaded by compare_exchange_weak
100 : : }
101 [ + - ]: 2118066 : else if (diff < 0)
102 : : {
103 : 2202756 : return false; // full
104 : : }
105 : : else
106 : : {
107 : : // Another producer advanced head; reload
108 : 0 : pos = m_ctrl.head.load(std::memory_order_relaxed);
109 : : }
110 : : }
111 : : }
112 : :
113 : : // ── Consumer (single thread) ────────────────────────────────
114 : :
115 : 1679474 : [[nodiscard]] bool pop(T &item)
116 : : {
117 : 1679474 : uint32_t pos = m_ctrl.tail.load(std::memory_order_relaxed);
118 : 1679474 : Slot<T> &slot = m_slots[pos & Mask];
119 : 1679474 : uint32_t seq = slot.sequence.load(std::memory_order_acquire);
120 : :
121 [ + + ]: 1679474 : if (seq != pos + 1)
122 : 39334 : return false;
123 : :
124 : 1640140 : std::memcpy(&item, &slot.data, sizeof(T));
125 : 1640140 : slot.sequence.store(pos + Capacity, std::memory_order_release);
126 : 1640140 : m_ctrl.tail.store(pos + 1, std::memory_order_release);
127 : 1640140 : return true;
128 : : }
129 : :
130 : : // ── Queries (approximate — producers may have reserved but not published) ─
131 : :
132 : 453 : [[nodiscard]] uint32_t readAvailable() const
133 : : {
134 : 453 : uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
135 : 453 : uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
136 : 453 : return head - tail;
137 : : }
138 : :
139 : 104 : [[nodiscard]] uint32_t writeAvailable() const
140 : : {
141 : 104 : uint32_t head = m_ctrl.head.load(std::memory_order_relaxed);
142 : 104 : uint32_t tail = m_ctrl.tail.load(std::memory_order_acquire);
143 : 104 : return Capacity - (head - tail);
144 : : }
145 : :
146 : 451 : [[nodiscard]] bool isEmpty() const { return readAvailable() == 0; }
147 : 102 : [[nodiscard]] bool isFull() const { return writeAvailable() == 0; }
148 : :
149 : 2 : static constexpr uint32_t capacity() { return Capacity; }
150 : 2 : static constexpr uint32_t cacheLineSize() { return CacheLineSize; }
151 : :
152 : : // ── Reset (NOT thread-safe) ─────────────────────────────────
153 : :
154 : 1 : void reset()
155 : : {
156 : 1 : m_ctrl.head.store(0, std::memory_order_relaxed);
157 : 1 : m_ctrl.tail.store(0, std::memory_order_relaxed);
158 [ + + ]: 9 : for (uint32_t i = 0; i < Capacity; ++i)
159 : 8 : m_slots[i].sequence.store(i, std::memory_order_relaxed);
160 : 1 : }
161 : :
162 : : private:
163 : : ControlBlock m_ctrl;
164 : : Slot<T> m_slots[Capacity];
165 : : };
166 : :
167 : : } // namespace ouroboros::mpsc
|