diff --git a/src/MPMCQueue.NET/MPMCQueue.cs b/src/MPMCQueue.NET/MPMCQueue.cs index e0abb23..fae1b1a 100644 --- a/src/MPMCQueue.NET/MPMCQueue.cs +++ b/src/MPMCQueue.NET/MPMCQueue.cs @@ -4,48 +4,65 @@ namespace MPMCQueue.NET { - [StructLayout(LayoutKind.Explicit, Size = 192, CharSet = CharSet.Ansi)] + [StructLayout(LayoutKind.Explicit, Size = 336)] public class MPMCQueue { - [FieldOffset(0)] - private readonly Cell[] _buffer; - [FieldOffset(8)] - private readonly int _bufferMask; - [FieldOffset(64)] - private int _enqueuePos; - [FieldOffset(128)] - private int _dequeuePos; + /// + /// 128 bytes cache line already exists in some CPUs. + /// + /// + /// Also "the spatial prefetcher strives to keep pairs of cache lines in the L2 cache." + /// https://stackoverflow.com/questions/29199779/false-sharing-and-128-byte-alignment-padding + /// + internal const int SAFE_CACHE_LINE = 128; + [FieldOffset(SAFE_CACHE_LINE)] + private readonly Cell[] _enqueueBuffer; + + [FieldOffset(SAFE_CACHE_LINE + 8)] + private volatile int _enqueuePos; + + // Separate access to buffers from enqueue and dequeue. + // This removes false sharing and accessing a buffer + // reference also prefetches the following Pos with [(64 - (8 + 4 + 4)) = 52]/64 probability. + + [FieldOffset(SAFE_CACHE_LINE * 2)] + private readonly Cell[] _dequeueBuffer; + + [FieldOffset(SAFE_CACHE_LINE * 2 + 8)] + private volatile int _dequeuePos; public MPMCQueue(int bufferSize) { if (bufferSize < 2) throw new ArgumentException($"{nameof(bufferSize)} should be greater than or equal to 2"); if ((bufferSize & (bufferSize - 1)) != 0) throw new ArgumentException($"{nameof(bufferSize)} should be a power of 2"); - _bufferMask = bufferSize - 1; - _buffer = new Cell[bufferSize]; + _enqueueBuffer = new Cell[bufferSize]; for (var i = 0; i < bufferSize; i++) { - _buffer[i] = new Cell(i, null); + _enqueueBuffer[i] = new Cell(i, null); } + _dequeueBuffer = _enqueueBuffer; _enqueuePos = 0; _dequeuePos = 0; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryEnqueue(object item) { + var spinner = new SpinWait(); do { - var buffer = _buffer; + var buffer = _enqueueBuffer; var pos = _enqueuePos; - var index = pos & _bufferMask; - var cell = buffer[index]; + var index = pos & (buffer.Length - 1); + ref var cell = ref buffer[index]; if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos) { - buffer[index].Element = item; - Volatile.Write(ref buffer[index].Sequence, pos + 1); + cell.Element = item; + cell.Sequence = pos + 1; return true; } @@ -53,39 +70,47 @@ public bool TryEnqueue(object item) { return false; } + + spinner.SpinOnce(); } while (true); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryDequeue(out object result) { + result = null; + var spinner = new SpinWait(); do { - var buffer = _buffer; - var bufferMask = _bufferMask; + var buffer = _dequeueBuffer; var pos = _dequeuePos; - var index = pos & bufferMask; - var cell = buffer[index]; + var index = pos & (buffer.Length - 1); + ref var cell = ref buffer[index]; if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos) { result = cell.Element; - buffer[index].Element = null; - Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1); - return true; + cell.Element = null; + cell.Sequence = pos + buffer.Length; + break; } if (cell.Sequence < pos + 1) { - result = default(object); - return false; + break; } + + spinner.SpinOnce(); } while (true); + + return result != null; } - [StructLayout(LayoutKind.Explicit, Size = 16, CharSet = CharSet.Ansi)] + [StructLayout(LayoutKind.Explicit, Size = 16)] private struct Cell { [FieldOffset(0)] - public int Sequence; + public volatile int Sequence; + [FieldOffset(8)] public object Element; @@ -96,4 +121,4 @@ public Cell(int sequence, object element) } } } -} \ No newline at end of file +}