Branch data Line data Source code
1 : : // MIT License - see LICENSE file for details.
2 : : //
3 : : // Lock-free Single-Producer Single-Consumer ring buffer.
4 : : //
5 : : // Generic template: works with any trivially copyable type T.
6 : : // For byte-level IPC / shared memory use, instantiate as RingBuffer<uint8_t, N>.
7 : : //
8 : : // Cache-friendly design:
9 : : // - ControlBlock is aligned to the cache line size (default 64 bytes).
10 : : // - Producer (head) and consumer (tail) atomics live on separate cache lines,
11 : : // eliminating false sharing between the two cores.
12 : : // - The CacheLineSize template parameter can be tuned for non-standard
13 : : // architectures (e.g. 128 bytes on Apple M-series / ARM big cores).
14 : : //
15 : : // Designed to live in shared memory. The control block (head/tail offsets)
16 : : // and data region are laid out contiguously so the entire buffer can be
17 : : // placed in a single mmap'd region.
18 : :
19 : : #pragma once
20 : :
21 : : #include <atomic>
22 : : #include <cstddef>
23 : : #include <cstdint>
24 : : #include <cstring>
25 : : #include <type_traits>
26 : :
27 : : namespace ouroboros::spsc
28 : : {
29 : :
30 : : // Library version — single source of truth.
31 : : struct Version
32 : : {
33 : : static constexpr uint8_t major = 1;
34 : : static constexpr uint8_t minor = 0;
35 : : static constexpr uint8_t patch = 0;
36 : : static constexpr uint32_t packed = (major << 16) | (minor << 8) | patch;
37 : : };
38 : :
39 : : // Lock-free Single-Producer Single-Consumer ring buffer.
40 : : //
41 : : // Template parameters:
42 : : // T - Element type. Must be trivially copyable.
43 : : // Capacity - Number of elements. Must be a power of 2.
44 : : // CacheLineSize - Cache line size in bytes (default: 64).
45 : : // Controls alignment and padding of the control block
46 : : // to prevent false sharing between producer and consumer.
47 : : // Use 128 for Apple M-series / ARM big cores.
48 : : template <typename T, uint32_t Capacity, uint32_t CacheLineSize = 64>
49 : : class RingBuffer
50 : : {
51 : : static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
52 : : "RingBuffer capacity must be a power of 2");
53 : : static_assert(std::is_trivially_copyable_v<T>,
54 : : "RingBuffer element type must be trivially copyable");
55 : : static_assert(CacheLineSize > sizeof(std::atomic<uint32_t>),
56 : : "CacheLineSize must be greater than sizeof(atomic<uint32_t>)");
57 : : static_assert(CacheLineSize > 0 && (CacheLineSize & (CacheLineSize - 1)) == 0,
58 : : "CacheLineSize must be a power of 2");
59 : : static_assert(std::atomic<uint32_t>::is_always_lock_free,
60 : : "uint32_t atomics must be lock-free on this platform");
61 : :
62 : : public:
63 : : using value_type = T;
64 : : static constexpr uint32_t Mask = Capacity - 1;
65 : :
66 : : // Control block - lives at the start of the shared memory region.
67 : : // Offsets are monotonically increasing; masked when indexing into m_data.
68 : : // Each atomic sits on its own cache line to prevent false sharing.
69 : : struct alignas(CacheLineSize) ControlBlock
70 : : {
71 : : std::atomic<uint32_t> head{0};
72 : : char pad1[CacheLineSize - sizeof(std::atomic<uint32_t>)];
73 : : std::atomic<uint32_t> tail{0};
74 : : char pad2[CacheLineSize - sizeof(std::atomic<uint32_t>)];
75 : : };
76 : :
77 : 155 : RingBuffer() = default;
78 : :
79 : : // Not copyable or movable (lives in shared memory).
80 : : RingBuffer(const RingBuffer &) = delete;
81 : : RingBuffer &operator=(const RingBuffer &) = delete;
82 : : RingBuffer(RingBuffer &&) = delete;
83 : : RingBuffer &operator=(RingBuffer &&) = delete;
84 : :
85 : : // Resets the buffer to empty state.
86 : 9 : void reset()
87 : : {
88 : 9 : m_ctrl.head.store(0, std::memory_order_relaxed);
89 : 9 : m_ctrl.tail.store(0, std::memory_order_relaxed);
90 : 9 : }
91 : :
92 : : // ---------------------------------------------------------------------------
93 : : // Producer API
94 : : // ---------------------------------------------------------------------------
95 : :
96 : : // Returns the number of elements available for writing.
97 : 25405 : [[nodiscard]] uint32_t writeAvailable() const
98 : : {
99 : 25405 : uint32_t head = m_ctrl.head.load(std::memory_order_relaxed);
100 : 25405 : uint32_t tail = m_ctrl.tail.load(std::memory_order_acquire);
101 : 25405 : return Capacity - (head - tail);
102 : : }
103 : :
104 : : // Pushes a single element into the ring buffer.
105 : : // Returns true on success, false if the buffer is full.
106 : 4805559 : [[nodiscard]] bool push(const T &item)
107 : : {
108 : 4805559 : return write(&item, 1);
109 : : }
110 : :
111 : : // Writes `count` elements from `data` into the ring buffer.
112 : : // Returns true on success, false if insufficient space.
113 : 4939401 : [[nodiscard]] bool write(const T *data, uint32_t count)
114 : : {
115 [ + + ]: 4939401 : if (count == 0)
116 : 1 : return true;
117 : :
118 : 4939400 : uint32_t head = m_ctrl.head.load(std::memory_order_relaxed);
119 : 4939400 : uint32_t tail = m_ctrl.tail.load(std::memory_order_acquire);
120 : :
121 [ + + ]: 4939400 : if (Capacity - (head - tail) < count)
122 : : {
123 : 1804336 : return false;
124 : : }
125 : :
126 : 3135064 : uint32_t offset = head & Mask;
127 : 3135064 : uint32_t firstChunk = Capacity - offset;
128 : :
129 [ + + ]: 3135064 : if (firstChunk >= count)
130 : : {
131 : 3135055 : std::memcpy(m_data + offset, data, count * sizeof(T));
132 : : }
133 : : else
134 : : {
135 : 9 : std::memcpy(m_data + offset, data, firstChunk * sizeof(T));
136 : 9 : std::memcpy(m_data, data + firstChunk,
137 : 9 : (count - firstChunk) * sizeof(T));
138 : : }
139 : :
140 : 3135064 : m_ctrl.head.store(head + count, std::memory_order_release);
141 : 3135064 : return true;
142 : : }
143 : :
144 : : // ---------------------------------------------------------------------------
145 : : // Consumer API
146 : : // ---------------------------------------------------------------------------
147 : :
148 : : // Returns the number of elements available for reading.
149 : 31289 : [[nodiscard]] uint32_t readAvailable() const
150 : : {
151 : 31289 : uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
152 : 31289 : uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
153 : 31289 : return head - tail;
154 : : }
155 : :
156 : : // Pops a single element from the ring buffer.
157 : : // Returns true on success, false if the buffer is empty.
158 : 3233659 : [[nodiscard]] bool pop(T &item)
159 : : {
160 : 3233659 : return read(&item, 1);
161 : : }
162 : :
163 : : // Peeks at the next `count` elements without consuming them.
164 : : // Returns true if `count` elements are available, false otherwise.
165 : 22 : [[nodiscard]] bool peek(T *dest, uint32_t count) const
166 : : {
167 [ + + ]: 22 : if (count == 0)
168 : 1 : return true;
169 : :
170 : 21 : uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
171 : 21 : uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
172 : :
173 [ + + ]: 21 : if (head - tail < count)
174 : : {
175 : 1 : return false;
176 : : }
177 : :
178 : 20 : uint32_t offset = tail & Mask;
179 : 20 : uint32_t firstChunk = Capacity - offset;
180 : :
181 [ + - ]: 20 : if (firstChunk >= count)
182 : : {
183 : 20 : std::memcpy(dest, m_data + offset, count * sizeof(T));
184 : : }
185 : : else
186 : : {
187 : 0 : std::memcpy(dest, m_data + offset, firstChunk * sizeof(T));
188 : 0 : std::memcpy(dest + firstChunk, m_data,
189 : 0 : (count - firstChunk) * sizeof(T));
190 : : }
191 : 20 : return true;
192 : : }
193 : :
194 : : // Reads `count` elements from the ring buffer into `dest`.
195 : : // Returns true on success, false if insufficient data.
196 : 3276666 : [[nodiscard]] bool read(T *dest, uint32_t count)
197 : : {
198 [ + + ]: 3276666 : if (count == 0)
199 : 1 : return true;
200 : :
201 : 3276665 : uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
202 : 3276665 : uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
203 : :
204 [ + + ]: 3276665 : if (head - tail < count)
205 : : {
206 : 231247 : return false;
207 : : }
208 : :
209 : 3045418 : uint32_t offset = tail & Mask;
210 : 3045418 : uint32_t firstChunk = Capacity - offset;
211 : :
212 [ + + ]: 3045418 : if (firstChunk >= count)
213 : : {
214 : 3045244 : std::memcpy(dest, m_data + offset, count * sizeof(T));
215 : : }
216 : : else
217 : : {
218 : 174 : std::memcpy(dest, m_data + offset, firstChunk * sizeof(T));
219 : 174 : std::memcpy(dest + firstChunk, m_data,
220 : 174 : (count - firstChunk) * sizeof(T));
221 : : }
222 : :
223 : 3045418 : m_ctrl.tail.store(tail + count, std::memory_order_release);
224 : 3045418 : return true;
225 : : }
226 : :
227 : : // Skips `count` elements without reading them.
228 : : // Returns true on success, false if insufficient data.
229 : 22 : [[nodiscard]] bool skip(uint32_t count)
230 : : {
231 [ + + ]: 22 : if (count == 0)
232 : 1 : return true;
233 : :
234 : 21 : uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
235 : 21 : uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
236 : :
237 [ + + ]: 21 : if (head - tail < count)
238 : : {
239 : 1 : return false;
240 : : }
241 : :
242 : 20 : m_ctrl.tail.store(tail + count, std::memory_order_release);
243 : 20 : return true;
244 : : }
245 : :
246 : : // ---------------------------------------------------------------------------
247 : : // Capacity / Info
248 : : // ---------------------------------------------------------------------------
249 : :
250 : 9 : static constexpr uint32_t capacity() { return Capacity; }
251 : 2 : static constexpr uint32_t cacheLineSize() { return CacheLineSize; }
252 : :
253 : 127 : [[nodiscard]] bool isEmpty() const { return readAvailable() == 0; }
254 : 117 : [[nodiscard]] bool isFull() const { return writeAvailable() == 0; }
255 : :
256 : : private:
257 : : ControlBlock m_ctrl;
258 : : T m_data[Capacity];
259 : : };
260 : :
261 : : // Convenience alias for byte-oriented IPC / shared memory use.
262 : : template <uint32_t Size>
263 : : using ByteRingBuffer = RingBuffer<uint8_t, Size>;
264 : :
265 : : } // namespace ouroboros::spsc
|