LCOV - code coverage report
Current view: top level - mpsc - RingBuffer.h (source / functions) Coverage Total Hit
Test: ouroboros Lines: 97.9 % 47 46
Test Date: 2026-03-28 04:44:39 Functions: 100.0 % 46 46
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 92.9 % 14 13

             Branch data     Line data    Source code
       1                 :             : // MIT License - see LICENSE file for details.
       2                 :             : //
       3                 :             : // Lock-free Multiple-Producer Single-Consumer ring buffer.
       4                 :             : //
       5                 :             : // Uses per-slot sequence counters (Vyukov-style bounded queue) for safe
       6                 :             : // multi-producer coordination. Producers CAS on head to reserve slots;
       7                 :             : // the single consumer reads without CAS.
       8                 :             : //
       9                 :             : // Same cache-line isolation strategy as the SPSC variant.
      10                 :             : 
      11                 :             : #pragma once
      12                 :             : 
      13                 :             : #include <atomic>
      14                 :             : #include <cstddef>
      15                 :             : #include <cstdint>
      16                 :             : #include <cstring>
      17                 :             : #include <type_traits>
      18                 :             : 
      19                 :             : namespace ouroboros::mpsc
      20                 :             : {
      21                 :             : 
      22                 :             :     // ─── Slot with sequence counter ─────────────────────────────────
      23                 :             : 
      24                 :             :     template <typename T>
      25                 :             :     struct Slot
      26                 :             :     {
      27                 :             :         std::atomic<uint32_t> sequence;
      28                 :             :         T data;
      29                 :             :     };
      30                 :             : 
      31                 :             :     // ─── RingBuffer ─────────────────────────────────────────────────
      32                 :             : 
      33                 :             :     template <typename T, uint32_t Capacity, uint32_t CacheLineSize = 64>
      34                 :             :     class RingBuffer
      35                 :             :     {
      36                 :             :         static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
      37                 :             :                       "RingBuffer capacity must be a power of 2");
      38                 :             :         static_assert(std::is_trivially_copyable_v<T>,
      39                 :             :                       "RingBuffer element type must be trivially copyable");
      40                 :             :         static_assert(CacheLineSize > sizeof(std::atomic<uint32_t>),
      41                 :             :                       "CacheLineSize must be greater than sizeof(atomic<uint32_t>)");
      42                 :             :         static_assert(CacheLineSize > 0 && (CacheLineSize & (CacheLineSize - 1)) == 0,
      43                 :             :                       "CacheLineSize must be a power of 2");
      44                 :             :         static_assert(Capacity < (uint32_t{1} << 31),
      45                 :             :                       "Capacity must be less than 2^31 for signed-difference logic");
      46                 :             :         static_assert(std::atomic<uint32_t>::is_always_lock_free,
      47                 :             :                       "uint32_t atomics must be lock-free on this platform");
      48                 :             : 
      49                 :             :         static constexpr uint32_t Mask = Capacity - 1;
      50                 :             : 
      51                 :             :     public:
      52                 :             :         using value_type = T;
      53                 :             :         // ── Layout types ────────────────────────────────────────────
      54                 :             : 
      55                 :             :         struct alignas(CacheLineSize) ControlBlock
      56                 :             :         {
      57                 :             :             std::atomic<uint32_t> head{0};
      58                 :             :             char pad1[CacheLineSize - sizeof(std::atomic<uint32_t>)];
      59                 :             :             std::atomic<uint32_t> tail{0};
      60                 :             :             char pad2[CacheLineSize - sizeof(std::atomic<uint32_t>)];
      61                 :             :         };
      62                 :             : 
      63                 :             :         // ── Lifecycle ───────────────────────────────────────────────
      64                 :             : 
      65                 :          18 :         RingBuffer()
      66         [ +  + ]:        1983 :         {
      67         [ +  + ]:        1983 :             for (uint32_t i = 0; i < Capacity; ++i)
      68                 :        1965 :                 m_slots[i].sequence.store(i, std::memory_order_relaxed);
      69                 :          18 :         }
      70                 :             : 
      71                 :             :         RingBuffer(const RingBuffer &) = delete;
      72                 :             :         RingBuffer &operator=(const RingBuffer &) = delete;
      73                 :             :         RingBuffer(RingBuffer &&) = delete;
      74                 :             :         RingBuffer &operator=(RingBuffer &&) = delete;
      75                 :             : 
      76                 :             :         // ── Producer (multiple threads) ─────────────────────────────
      77                 :             : 
      78                 :     3662881 :         [[nodiscard]] bool push(const T &item)
      79                 :             :         {
      80                 :     7284474 :             uint32_t pos = m_ctrl.head.load(std::memory_order_relaxed);
      81                 :             : 
      82                 :      491095 :             for (;;)
      83                 :             :             {
      84                 :     4112688 :                 Slot<T> &slot = m_slots[pos & Mask];
      85                 :     4112688 :                 uint32_t seq = slot.sequence.load(std::memory_order_acquire);
      86                 :     4246423 :                 auto diff = static_cast<int32_t>(seq - pos);
      87                 :             : 
      88         [ +  + ]:     4246423 :                 if (diff == 0)
      89                 :             :                 {
      90         [ +  + ]:     4242155 :                     if (m_ctrl.head.compare_exchange_weak(
      91                 :             :                             pos, pos + 1,
      92                 :             :                             std::memory_order_acq_rel,
      93                 :             :                             std::memory_order_relaxed))
      94                 :             :                     {
      95                 :     1640034 :                         std::memcpy(&slot.data, &item, sizeof(T));
      96                 :     1640034 :                         slot.sequence.store(pos + 1, std::memory_order_release);
      97                 :     1634548 :                         return true;
      98                 :             :                     }
      99                 :             :                     // CAS failed — pos reloaded by compare_exchange_weak
     100                 :             :                 }
     101         [ +  - ]:     2118066 :                 else if (diff < 0)
     102                 :             :                 {
     103                 :     2202756 :                     return false; // full
     104                 :             :                 }
     105                 :             :                 else
     106                 :             :                 {
     107                 :             :                     // Another producer advanced head; reload
     108                 :           0 :                     pos = m_ctrl.head.load(std::memory_order_relaxed);
     109                 :             :                 }
     110                 :             :             }
     111                 :             :         }
     112                 :             : 
     113                 :             :         // ── Consumer (single thread) ────────────────────────────────
     114                 :             : 
     115                 :     1679474 :         [[nodiscard]] bool pop(T &item)
     116                 :             :         {
     117                 :     1679474 :             uint32_t pos = m_ctrl.tail.load(std::memory_order_relaxed);
     118                 :     1679474 :             Slot<T> &slot = m_slots[pos & Mask];
     119                 :     1679474 :             uint32_t seq = slot.sequence.load(std::memory_order_acquire);
     120                 :             : 
     121         [ +  + ]:     1679474 :             if (seq != pos + 1)
     122                 :       39334 :                 return false;
     123                 :             : 
     124                 :     1640140 :             std::memcpy(&item, &slot.data, sizeof(T));
     125                 :     1640140 :             slot.sequence.store(pos + Capacity, std::memory_order_release);
     126                 :     1640140 :             m_ctrl.tail.store(pos + 1, std::memory_order_release);
     127                 :     1640140 :             return true;
     128                 :             :         }
     129                 :             : 
     130                 :             :         // ── Queries (approximate — producers may have reserved but not published) ─
     131                 :             : 
     132                 :         453 :         [[nodiscard]] uint32_t readAvailable() const
     133                 :             :         {
     134                 :         453 :             uint32_t head = m_ctrl.head.load(std::memory_order_acquire);
     135                 :         453 :             uint32_t tail = m_ctrl.tail.load(std::memory_order_relaxed);
     136                 :         453 :             return head - tail;
     137                 :             :         }
     138                 :             : 
     139                 :         104 :         [[nodiscard]] uint32_t writeAvailable() const
     140                 :             :         {
     141                 :         104 :             uint32_t head = m_ctrl.head.load(std::memory_order_relaxed);
     142                 :         104 :             uint32_t tail = m_ctrl.tail.load(std::memory_order_acquire);
     143                 :         104 :             return Capacity - (head - tail);
     144                 :             :         }
     145                 :             : 
     146                 :         451 :         [[nodiscard]] bool isEmpty() const { return readAvailable() == 0; }
     147                 :         102 :         [[nodiscard]] bool isFull() const { return writeAvailable() == 0; }
     148                 :             : 
     149                 :           2 :         static constexpr uint32_t capacity() { return Capacity; }
     150                 :           2 :         static constexpr uint32_t cacheLineSize() { return CacheLineSize; }
     151                 :             : 
     152                 :             :         // ── Reset (NOT thread-safe) ─────────────────────────────────
     153                 :             : 
     154                 :           1 :         void reset()
     155                 :             :         {
     156                 :           1 :             m_ctrl.head.store(0, std::memory_order_relaxed);
     157                 :           1 :             m_ctrl.tail.store(0, std::memory_order_relaxed);
     158         [ +  + ]:           9 :             for (uint32_t i = 0; i < Capacity; ++i)
     159                 :           8 :                 m_slots[i].sequence.store(i, std::memory_order_relaxed);
     160                 :           1 :         }
     161                 :             : 
     162                 :             :     private:
     163                 :             :         ControlBlock m_ctrl;
     164                 :             :         Slot<T> m_slots[Capacity];
     165                 :             :     };
     166                 :             : 
     167                 :             : } // namespace ouroboros::mpsc
        

Generated by: LCOV version 2.0-1