Skip to content

Commit

Permalink
Merge pull request alexandrnikitin#4 from alexandrnikitin/volatile-write
Browse files Browse the repository at this point in the history
Volatile Write around Sequence field
  • Loading branch information
alexandrnikitin authored Jan 24, 2018
2 parents 042dfc9 + 6196cd7 commit 3750330
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 116 deletions.
286 changes: 174 additions & 112 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This is an attempt to port [the famous Bounded MPMC queue algorithm by Dmitry Vy
### Implementation

The queue class layout is shown below. The `_buffer` field stores enqueued elements and their sequences. It has size that is a power of two. The `_bufferMask` field is used to avoid the expensive modulo operation and use `AND` instead. There's padding applied to avoid [false sharing][false-sharing] of `_enqueuePos` and `_dequeuePos` counters.
The queue class layout is shown below. The `_buffer` field stores enqueued elements and their sequences. It has size that is a power of two. The `_bufferMask` field is used to avoid the expensive modulo operation and use `AND` instead. There's padding applied to avoid [false sharing][false-sharing] of `_enqueuePos` and `_dequeuePos` counters. And [Volatile.Read/Write to suppress memory instructions reordering when read/write cell.Sequence][memory-barriers-in-dot-net].

```csharp
[StructLayout(LayoutKind.Explicit, Size = 192, CharSet = CharSet.Ansi)]
Expand Down Expand Up @@ -47,9 +47,9 @@ public bool TryEnqueue(object item)
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
buffer[index].Element = item;
// bump the sequence
buffer[index].Sequence = pos + 1;
Volatile.Write(ref buffer[index].Sequence, pos + 1);
return true;
}

Expand Down Expand Up @@ -81,9 +81,11 @@ public bool TryDequeue(out object result)
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
result = cell.Element;
// no more reference the dequeue data
buffer[index].Element = null;
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
return true;
}

Expand Down Expand Up @@ -136,120 +138,180 @@ _`MPMCQueue` shows worse than `ConcurrentQueue` results on many core and multi s
`MPMCQueue.NET.MPMCQueue.TryEnqueue(System.Object)`

```
00007ffe`162b0d40 57 push rdi
00007ffe`162b0d41 56 push rsi
00007ffe`162b0d42 55 push rbp
00007ffe`162b0d43 53 push rbx
00007ffe`162b0d44 4883ec28 sub rsp,28h
00007ffe`162b0d48 488b7108 mov rsi,qword ptr [rcx+8]
00007ffe`162b0d4c 8b7948 mov edi,dword ptr [rcx+48h]
00007ffe`162b0d4f 8b4110 mov eax,dword ptr [rcx+10h]
00007ffe`162b0d52 23c7 and eax,edi
00007ffe`162b0d54 8bd8 mov ebx,eax
00007ffe`162b0d56 8b4608 mov eax,dword ptr [rsi+8]
00007ffe`162b0d59 3bd8 cmp ebx,eax
00007ffe`162b0d5b 735c jae 00007ffe`162b0db9
00007ffe`162b0d5d 4863c3 movsxd rax,ebx
00007ffe`162b0d60 48c1e004 shl rax,4
00007ffe`162b0d64 4c8d440610 lea r8,[rsi+rax+10h]
00007ffe`162b0d69 498bc0 mov rax,r8
00007ffe`162b0d6c 8b28 mov ebp,dword ptr [rax]
00007ffe`162b0d6e 3bef cmp ebp,edi
00007ffe`162b0d70 7538 jne 00007ffe`162b0daa
00007ffe`162b0d72 4c8d4948 lea r9,[rcx+48h]
00007ffe`162b0d76 448d5701 lea r10d,[rdi+1]
00007ffe`162b0d7a 8bc7 mov eax,edi
00007ffe`162b0d7c f0450fb111 lock cmpxchg dword ptr [r9],r10d
00007ffe`162b0d81 3bc7 cmp eax,edi
00007ffe`162b0d83 7525 jne 00007ffe`162b0daa
00007ffe`162b0d85 498d4808 lea rcx,[r8+8]
00007ffe`162b0d89 e872305f5f call clr!JIT_CheckedWriteBarrier (00007ffe`758a3e00)
00007ffe`162b0d8e 8d4701 lea eax,[rdi+1]
00007ffe`162b0d91 4863d3 movsxd rdx,ebx
00007ffe`162b0d94 48c1e204 shl rdx,4
00007ffe`162b0d98 89441610 mov dword ptr [rsi+rdx+10h],eax
00007ffe`162b0d9c b801000000 mov eax,1
00007ffe`162b0da1 4883c428 add rsp,28h
00007ffe`162b0da5 5b pop rbx
00007ffe`162b0da6 5d pop rbp
00007ffe`162b0da7 5e pop rsi
00007ffe`162b0da8 5f pop rdi
00007ffe`162b0da9 c3 ret
00007ffe`162b0daa 3bef cmp ebp,edi
00007ffe`162b0dac 7d9a jge 00007ffe`162b0d48
00007ffe`162b0dae 33c0 xor eax,eax
00007ffe`162b0db0 4883c428 add rsp,28h
00007ffe`162b0db4 5b pop rbx
00007ffe`162b0db5 5d pop rbp
00007ffe`162b0db6 5e pop rsi
00007ffe`162b0db7 5f pop rdi
00007ffe`162b0db8 c3 ret
00007ffe`162b0db9 e8226ea95f call clr!JIT_RngChkFail (00007ffe`75d47be0)
00007ffe`162b0dbe cc int 3
var buffer = _buffer;
>>>
00007ffb`3a790660 57 push rdi
00007ffb`3a790661 56 push rsi
00007ffb`3a790662 53 push rbx
00007ffb`3a790663 4883ec20 sub rsp,20h
00007ffb`3a790667 4c8b4108 mov r8,qword ptr [rcx+8]
var pos = _enqueuePos;
00007ffb`3a79066b 8b7148 mov esi,dword ptr [rcx+48h]
var index = pos & _bufferMask;
00007ffb`3a79066e 448bce mov r9d,esi
00007ffb`3a790671 44234910 and r9d,dword ptr [rcx+10h]
var cell = buffer[index];
00007ffb`3a790675 453b4808 cmp r9d,dword ptr [r8+8]
00007ffb`3a790679 735e jae 00007ffb`3a7906d9
00007ffb`3a79067b 4963c1 movsxd rax,r9d
00007ffb`3a79067e 48c1e004 shl rax,4
00007ffb`3a790682 498d7c0010 lea rdi,[r8+rax+10h]
00007ffb`3a790687 488bc7 mov rax,rdi
00007ffb`3a79068a 8b5808 mov ebx,dword ptr [rax+8]
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
00007ffb`3a79068d 3bde cmp ebx,esi
00007ffb`3a79068f 753a jne 00007ffb`3a7906cb
00007ffb`3a790691 4c8d5148 lea r10,[rcx+48h]
00007ffb`3a790695 448d5e01 lea r11d,[rsi+1]
00007ffb`3a790699 8bc6 mov eax,esi
00007ffb`3a79069b f0450fb11a lock cmpxchg dword ptr [r10],r11d
00007ffb`3a7906a0 3bc6 cmp eax,esi
00007ffb`3a7906a2 7527 jne 00007ffb`3a7906cb
buffer[index].Element = item;
00007ffb`3a7906a4 4963c9 movsxd rcx,r9d
00007ffb`3a7906a7 48c1e104 shl rcx,4
00007ffb`3a7906ab 498d4c0810 lea rcx,[r8+rcx+10h]
00007ffb`3a7906b0 e83b37605f call clr+0x3df0 (00007ffb`99d93df0) (JitHelp: CORINFO_HELP_ASSIGN_REF)
Volatile.Write(ref buffer[index].Sequence, pos + 1);
00007ffb`3a7906b5 488d4708 lea rax,[rdi+8]
00007ffb`3a7906b9 8d5601 lea edx,[rsi+1]
00007ffb`3a7906bc 8910 mov dword ptr [rax],edx
return true;
00007ffb`3a7906be b801000000 mov eax,1
src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906c3 4883c420 add rsp,20h
00007ffb`3a7906c7 5b pop rbx
00007ffb`3a7906c8 5e pop rsi
00007ffb`3a7906c9 5f pop rdi
00007ffb`3a7906ca c3 ret
if (cell.Sequence < pos)
00007ffb`3a7906cb 3bde cmp ebx,esi
00007ffb`3a7906cd 7d98 jge 00007ffb`3a790667
src\MPMCQueue.NET\MPMCQueue.cs @ 54: (return false;)
00007ffb`3a7906cf 33c0 xor eax,eax
00007ffb`3a7906d1 4883c420 add rsp,20h
00007ffb`3a7906d5 5b pop rbx
00007ffb`3a7906d6 5e pop rsi
00007ffb`3a7906d7 5f pop rdi
00007ffb`3a7906d8 c3 ret
src\MPMCQueue.NET\MPMCQueue.cs @ 41:
00007ffb`3a7906d9 e8e21caa5f call clr!TranslateSecurityAttributes+0x88050 (00007ffb`9a2323c0) (JitHelp: CORINFO_HELP_RNGCHKFAIL)
00007ffb`3a7906de cc int 3
```

`MPMCQueue.NET.MPMCQueue.TryDequeue(System.Object ByRef)`

```
00007ffe`162b0b10 4156 push r14
00007ffe`162b0b12 57 push rdi
00007ffe`162b0b13 56 push rsi
00007ffe`162b0b14 55 push rbp
00007ffe`162b0b15 53 push rbx
00007ffe`162b0b16 4883ec20 sub rsp,20h
00007ffe`162b0b1a 4c8bc2 mov r8,rdx
00007ffe`162b0b1d 488b4108 mov rax,qword ptr [rcx+8]
00007ffe`162b0b21 8b7110 mov esi,dword ptr [rcx+10h]
00007ffe`162b0b24 8bb988000000 mov edi,dword ptr [rcx+88h]
00007ffe`162b0b2a 8bd7 mov edx,edi
00007ffe`162b0b2c 23d6 and edx,esi
00007ffe`162b0b2e 448b4808 mov r9d,dword ptr [rax+8]
00007ffe`162b0b32 413bd1 cmp edx,r9d
00007ffe`162b0b35 736b jae 00007ffe`162b0ba2
00007ffe`162b0b37 4863d2 movsxd rdx,edx
00007ffe`162b0b3a 48c1e204 shl rdx,4
00007ffe`162b0b3e 488d5c1010 lea rbx,[rax+rdx+10h]
00007ffe`162b0b43 488bc3 mov rax,rbx
00007ffe`162b0b46 8b28 mov ebp,dword ptr [rax]
00007ffe`162b0b48 488b5008 mov rdx,qword ptr [rax+8]
00007ffe`162b0b4c 448d7701 lea r14d,[rdi+1]
00007ffe`162b0b50 413bee cmp ebp,r14d
00007ffe`162b0b53 7536 jne 00007ffe`162b0b8b
00007ffe`162b0b55 4c8d8988000000 lea r9,[rcx+88h]
00007ffe`162b0b5c 8bc7 mov eax,edi
00007ffe`162b0b5e f0450fb131 lock cmpxchg dword ptr [r9],r14d
00007ffe`162b0b63 3bc7 cmp eax,edi
00007ffe`162b0b65 7524 jne 00007ffe`162b0b8b
00007ffe`162b0b67 498bc8 mov rcx,r8
00007ffe`162b0b6a e891325f5f call clr!JIT_CheckedWriteBarrier (00007ffe`758a3e00)
00007ffe`162b0b6f 8d443701 lea eax,[rdi+rsi+1]
00007ffe`162b0b73 33d2 xor edx,edx
00007ffe`162b0b75 8903 mov dword ptr [rbx],eax
00007ffe`162b0b77 48895308 mov qword ptr [rbx+8],rdx
00007ffe`162b0b7b b801000000 mov eax,1
00007ffe`162b0b80 4883c420 add rsp,20h
00007ffe`162b0b84 5b pop rbx
00007ffe`162b0b85 5d pop rbp
00007ffe`162b0b86 5e pop rsi
00007ffe`162b0b87 5f pop rdi
00007ffe`162b0b88 415e pop r14
00007ffe`162b0b8a c3 ret
00007ffe`162b0b8b 413bee cmp ebp,r14d
00007ffe`162b0b8e 7d8d jge 00007ffe`162b0b1d
00007ffe`162b0b90 33c0 xor eax,eax
00007ffe`162b0b92 498900 mov qword ptr [r8],rax
00007ffe`162b0b95 33c0 xor eax,eax
00007ffe`162b0b97 4883c420 add rsp,20h
00007ffe`162b0b9b 5b pop rbx
00007ffe`162b0b9c 5d pop rbp
00007ffe`162b0b9d 5e pop rsi
00007ffe`162b0b9e 5f pop rdi
00007ffe`162b0b9f 415e pop r14
00007ffe`162b0ba1 c3 ret
00007ffe`162b0ba2 e83970a95f call clr!JIT_RngChkFail (00007ffe`75d47be0)
00007ffe`162b0ba7 cc int 3
var buffer = _buffer;
>>>
00007ffb`3a790700 4157 push r15
00007ffb`3a790702 4156 push r14
00007ffb`3a790704 4154 push r12
00007ffb`3a790706 57 push rdi
00007ffb`3a790707 56 push rsi
00007ffb`3a790708 55 push rbp
00007ffb`3a790709 53 push rbx
00007ffb`3a79070a 4883ec20 sub rsp,20h
00007ffb`3a79070e 4c8bc2 mov r8,rdx
00007ffb`3a790711 488b7108 mov rsi,qword ptr [rcx+8]
var bufferMask = _bufferMask;
00007ffb`3a790715 8b7910 mov edi,dword ptr [rcx+10h]
var pos = _dequeuePos;
00007ffb`3a790718 8b9988000000 mov ebx,dword ptr [rcx+88h]
var index = pos & bufferMask;
00007ffb`3a79071e 8beb mov ebp,ebx
00007ffb`3a790720 23ef and ebp,edi
var cell = buffer[index];
00007ffb`3a790722 3b6e08 cmp ebp,dword ptr [rsi+8]
00007ffb`3a790725 0f8384000000 jae 00007ffb`3a7907af
00007ffb`3a79072b 4863c5 movsxd rax,ebp
00007ffb`3a79072e 48c1e004 shl rax,4
00007ffb`3a790732 4c8d740610 lea r14,[rsi+rax+10h]
00007ffb`3a790737 498bc6 mov rax,r14
00007ffb`3a79073a 488b10 mov rdx,qword ptr [rax]
00007ffb`3a79073d 448b7808 mov r15d,dword ptr [rax+8]
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
00007ffb`3a790741 448d6301 lea r12d,[rbx+1]
00007ffb`3a790745 453bfc cmp r15d,r12d
00007ffb`3a790748 7546 jne 00007ffb`3a790790
00007ffb`3a79074a 4c8d8988000000 lea r9,[rcx+88h]
00007ffb`3a790751 8bc3 mov eax,ebx
00007ffb`3a790753 f0450fb121 lock cmpxchg dword ptr [r9],r12d
00007ffb`3a790758 3bc3 cmp eax,ebx
00007ffb`3a79075a 7534 jne 00007ffb`3a790790
result = cell.Element;
00007ffb`3a79075c 498bc8 mov rcx,r8
00007ffb`3a79075f e85c36605f call clr+0x3dc0 (00007ffb`99d93dc0) (JitHelp: CORINFO_HELP_CHECKED_ASSIGN_REF)
buffer[index].Element = null;
00007ffb`3a790764 4863c5 movsxd rax,ebp
00007ffb`3a790767 48c1e004 shl rax,4
00007ffb`3a79076b 33d2 xor edx,edx
00007ffb`3a79076d 4889540610 mov qword ptr [rsi+rax+10h],rdx
Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
00007ffb`3a790772 498d4608 lea rax,[r14+8]
00007ffb`3a790776 03df add ebx,edi
00007ffb`3a790778 ffc3 inc ebx
00007ffb`3a79077a 8918 mov dword ptr [rax],ebx
return true;
00007ffb`3a79077c b801000000 mov eax,1
src\MPMCQueue.NET\MPMCQueue.cs @ 79: (return false;)
00007ffb`3a790781 4883c420 add rsp,20h
00007ffb`3a790785 5b pop rbx
00007ffb`3a790786 5d pop rbp
00007ffb`3a790787 5e pop rsi
00007ffb`3a790788 5f pop rdi
00007ffb`3a790789 415c pop r12
00007ffb`3a79078b 415e pop r14
00007ffb`3a79078d 415f pop r15
00007ffb`3a79078f c3 ret
if (cell.Sequence < pos + 1)
00007ffb`3a790790 453bfc cmp r15d,r12d
00007ffb`3a790793 0f8d78ffffff jge 00007ffb`3a790711
result = default(object);
00007ffb`3a790799 33c0 xor eax,eax
00007ffb`3a79079b 498900 mov qword ptr [r8],rax
src\MPMCQueue.NET\MPMCQueue.cs @ 79: (return false;)
00007ffb`3a79079e 33c0 xor eax,eax
00007ffb`3a7907a0 4883c420 add rsp,20h
00007ffb`3a7907a4 5b pop rbx
00007ffb`3a7907a5 5d pop rbp
00007ffb`3a7907a6 5e pop rsi
00007ffb`3a7907a7 5f pop rdi
00007ffb`3a7907a8 415c pop r12
00007ffb`3a7907aa 415e pop r14
00007ffb`3a7907ac 415f pop r15
00007ffb`3a7907ae c3 ret
src\MPMCQueue.NET\MPMCQueue.cs @ 63:
00007ffb`3a7907af e80c1caa5f call clr!TranslateSecurityAttributes+0x88050 (00007ffb`9a2323c0) (JitHelp: CORINFO_HELP_RNGCHKFAIL)
00007ffb`3a7907b4 cc int 3
```


[1024-mpmc]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
[false-sharing]: http://mechanical-sympathy.blogspot.lt/2011/07/false-sharing.html
[memory-barriers-in-dot-net]: http://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
9 changes: 5 additions & 4 deletions src/MPMCQueue.NET/MPMCQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public bool TryEnqueue(object item)
var cell = buffer[index];
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
Volatile.Write(ref buffer[index].Element, item);
buffer[index].Sequence = pos + 1;
buffer[index].Element = item;
Volatile.Write(ref buffer[index].Sequence, pos + 1);
return true;
}

Expand All @@ -67,8 +67,9 @@ public bool TryDequeue(out object result)
var cell = buffer[index];
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
result = Volatile.Read(ref cell.Element);
buffer[index] = new Cell(pos + bufferMask + 1, null);
result = cell.Element;
buffer[index].Element = null;
Volatile.Write(ref buffer[index].Sequence, pos + bufferMask + 1);
return true;
}

Expand Down

0 comments on commit 3750330

Please sign in to comment.