-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathHelios.Concurrency.DedicatedThreadPool.cs
762 lines (646 loc) · 26.9 KB
/
Helios.Concurrency.DedicatedThreadPool.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
//-----------------------------------------------------------------------
// <copyright file="Helios.Concurrency.DedicatedThreadPool.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
/*
* Copyright 2015 Roger Alsing, Aaron Stannard
* Helios.DedicatedThreadPool - https://github.com/helios-io/DedicatedThreadPool
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Dispatch;
namespace Helios.Concurrency
{
/// <summary>
/// The type of threads to use - either foreground or background threads.
/// </summary>
internal enum ThreadType
{
    /// <summary>
    /// Foreground threads keep the process alive until they exit
    /// (mapped to <c>Thread.IsBackground = false</c> by <c>PoolWorker</c>).
    /// </summary>
    Foreground,

    /// <summary>
    /// Background threads do not prevent process shutdown
    /// (mapped to <c>Thread.IsBackground = true</c>; this is the pool default).
    /// </summary>
    Background
}
/// <summary>
/// Provides settings for a dedicated thread pool
/// </summary>
/// <summary>
/// Provides settings for a <see cref="DedicatedThreadPool"/>.
/// </summary>
internal class DedicatedThreadPoolSettings
{
    /// <summary>
    /// Background threads are the default thread type
    /// </summary>
    public const ThreadType DefaultThreadType = ThreadType.Background;

    /// <summary>
    /// Creates settings using <see cref="DefaultThreadType"/>.
    /// </summary>
    public DedicatedThreadPoolSettings(int numThreads, string name = null, TimeSpan? deadlockTimeout = null)
        : this(numThreads, DefaultThreadType, name, deadlockTimeout)
    { }

    /// <summary>
    /// Creates settings for a dedicated thread pool.
    /// </summary>
    /// <param name="numThreads">Total number of threads to run in the pool; must be at least 1.</param>
    /// <param name="threadType">Whether pool threads are foreground or background threads.</param>
    /// <param name="name">Optional pool name; a unique default is generated when null.</param>
    /// <param name="deadlockTimeout">Optional deadlock-detection interval; must be null or at least 1ms.</param>
    /// <param name="exceptionHandler">
    /// Optional callback invoked by pool workers when a queued work item throws.
    /// Defaults to a no-op so workers never dereference a null handler.
    /// </param>
    /// <param name="threadMaxStackSize">Thread stack size in bytes; 0 uses the runtime default.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown when <paramref name="deadlockTimeout"/> is non-positive or
    /// <paramref name="numThreads"/> is less than 1.
    /// </exception>
    public DedicatedThreadPoolSettings(
        int numThreads,
        ThreadType threadType,
        string name = null,
        TimeSpan? deadlockTimeout = null,
        Action<Exception> exceptionHandler = null,
        int threadMaxStackSize = 0)
    {
        Name = name ?? ("DedicatedThreadPool-" + Guid.NewGuid());
        ThreadType = threadType;
        NumThreads = numThreads;
        DeadlockTimeout = deadlockTimeout;
        // Guarantee a non-null handler: PoolWorker.RunThread invokes ExceptionHandler
        // unconditionally, so a null here would turn any work-item exception into a
        // NullReferenceException that kills the worker thread.
        ExceptionHandler = exceptionHandler ?? (_ => { });
        ThreadMaxStackSize = threadMaxStackSize;
        if (deadlockTimeout.HasValue && deadlockTimeout.Value.TotalMilliseconds <= 0)
            throw new ArgumentOutOfRangeException(nameof(deadlockTimeout), string.Format("deadlockTimeout must be null or at least 1ms. Was {0}.", deadlockTimeout));
        if (numThreads <= 0)
            throw new ArgumentOutOfRangeException(nameof(numThreads), string.Format("numThreads must be at least 1. Was {0}", numThreads));
    }

    /// <summary>
    /// The total number of threads to run in this thread pool.
    /// </summary>
    public int NumThreads { get; private set; }

    /// <summary>
    /// The type of threads to run in this thread pool.
    /// </summary>
    public ThreadType ThreadType { get; private set; }

    /// <summary>
    /// Interval to check for thread deadlocks.
    ///
    /// If a thread takes longer than <see cref="DeadlockTimeout"/> it will be aborted
    /// and replaced.
    /// </summary>
    public TimeSpan? DeadlockTimeout { get; private set; }

    /// <summary>
    /// Name of the pool; used as the prefix for worker thread names.
    /// </summary>
    public string Name { get; private set; }

    /// <summary>
    /// Callback invoked when a queued work item throws. Never null; defaults to a no-op.
    /// </summary>
    public Action<Exception> ExceptionHandler { get; private set; }

    /// <summary>
    /// Gets the thread stack size, 0 represents the default stack size.
    /// </summary>
    public int ThreadMaxStackSize { get; private set; }
}
/// <summary>
/// TaskScheduler for working with a <see cref="DedicatedThreadPool"/> instance
/// </summary>
/// <summary>
/// TaskScheduler for working with a <see cref="DedicatedThreadPool"/> instance.
/// Tasks are held in a FIFO linked list and drained by worker items queued onto the pool.
/// </summary>
internal class DedicatedThreadPoolTaskScheduler : TaskScheduler
{
    // Indicates whether the current thread is processing work items.
    // [ThreadStatic] so inlining is only permitted on threads already draining this scheduler.
    [ThreadStatic]
    private static bool _currentThreadIsRunningTasks;

    /// <summary>
    /// Number of worker requests currently outstanding against the pool.
    /// Bounded above by the pool's thread count (see <see cref="EnsureWorkerRequested"/>).
    /// </summary>
    private volatile int _parallelWorkers = 0;

    // Pending tasks, guarded by lock(_tasks). LinkedList gives O(1) removal for TryDequeue.
    private readonly LinkedList<Task> _tasks = new();
    private readonly DedicatedThreadPool _pool;

    /// <summary>
    /// Creates a scheduler that executes its tasks on the given pool.
    /// </summary>
    /// <param name="pool">The dedicated thread pool that supplies worker threads.</param>
    public DedicatedThreadPoolTaskScheduler(DedicatedThreadPool pool)
    {
        _pool = pool;
    }

    /// <summary>
    /// Appends <paramref name="task"/> to the pending queue and requests a worker if
    /// fewer than the maximum are already outstanding.
    /// </summary>
    /// <param name="task">The task to schedule.</param>
    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
        }
        EnsureWorkerRequested();
    }

    /// <summary>
    /// Executes <paramref name="task"/> on the calling thread, but only when that thread
    /// is already one of this scheduler's worker threads.
    /// </summary>
    /// <param name="task">The task to run inline.</param>
    /// <param name="taskWasPreviouslyQueued">True if the task was already queued via <see cref="QueueTask"/>.</param>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        //current thread isn't running any tasks, can't execute inline
        if (!_currentThreadIsRunningTasks) return false;
        //remove the task from the queue if it was previously added;
        //if it is no longer in the queue another worker already claimed it, so report failure
        if (taskWasPreviouslyQueued)
            if (TryDequeue(task))
                return TryExecuteTask(task);
            else
                return false;
        return TryExecuteTask(task);
    }

    /// <summary>
    /// Removes a previously queued task from the pending list.
    /// </summary>
    /// <param name="task">The task to remove.</param>
    /// <returns>True if the task was found and removed; otherwise false.</returns>
    protected override bool TryDequeue(Task task)
    {
        lock (_tasks) return _tasks.Remove(task);
    }

    /// <summary>
    /// Level of concurrency is directly equal to the number of threads
    /// in the <see cref="DedicatedThreadPool"/>.
    /// </summary>
    public override int MaximumConcurrencyLevel
    {
        get { return _pool.Settings.NumThreads; }
    }

    /// <summary>
    /// Debugger support: returns the pending tasks when the lock can be taken without blocking.
    /// </summary>
    /// <exception cref="NotSupportedException">
    /// This exception is thrown if can't ensure a thread-safe return of the list of tasks.
    /// </exception>
    /// <returns>The live pending-task collection (not a snapshot).</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        var lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            // NOTE(review): returns the live list after the lock is released below;
            // a snapshot would be safer, but this matches the original behavior.
            if (lockTaken) return _tasks;
            else throw new NotSupportedException();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }

    // CAS-increments _parallelWorkers and queues exactly one RequestWorkerTask per
    // successful increment; no-op once NumThreads requests are outstanding.
    private void EnsureWorkerRequested()
    {
        var count = _parallelWorkers;
        while (count < _pool.Settings.NumThreads)
        {
            var prev = Interlocked.CompareExchange(ref _parallelWorkers, count + 1, count);
            if (prev == count)
            {
                RequestWorker();
                break;
            }
            count = prev;
        }
    }

    // CAS-decrements _parallelWorkers (floor of zero); called when a worker drains the queue.
    private void ReleaseWorker()
    {
        var count = _parallelWorkers;
        while (count > 0)
        {
            var prev = Interlocked.CompareExchange(ref _parallelWorkers, count - 1, count);
            if (prev == count)
            {
                break;
            }
            count = prev;
        }
    }

    // Work item submitted to the pool; drains the scheduler's task list until empty.
    private sealed class RequestWorkerTask : IRunnable
    {
        private readonly DedicatedThreadPoolTaskScheduler _scheduler;

        public RequestWorkerTask(DedicatedThreadPoolTaskScheduler scheduler)
        {
            _scheduler = scheduler;
        }

        public void Run()
        {
            // this thread is now available for inlining
            _currentThreadIsRunningTasks = true;
            try
            {
                // Process all available items in the queue.
                while (true)
                {
                    Task item;
                    lock (_scheduler._tasks)
                    {
                        // done processing — release our worker slot while still holding the
                        // lock, so a concurrent QueueTask is guaranteed to request a new worker
                        if (_scheduler._tasks.Count == 0)
                        {
                            _scheduler.ReleaseWorker();
                            break;
                        }
                        // Get the next item from the queue
                        item = _scheduler._tasks.First.Value;
                        _scheduler._tasks.RemoveFirst();
                    }
                    // Execute the task we pulled out of the queue
                    _scheduler.TryExecuteTask(item);
                }
            }
            // We're done processing items on the current thread
            finally { _currentThreadIsRunningTasks = false; }
        }

        public void Execute()
        {
            Run();
        }
    }

    // Hands one drain-the-queue work item to the underlying pool.
    private void RequestWorker()
    {
        _pool.QueueUserWorkItem(new RequestWorkerTask(this));
    }
}
/// <summary>
/// An instanced, dedicated thread pool.
/// </summary>
/// <summary>
/// An instanced, dedicated thread pool. Work items implementing <see cref="IRunnable"/>
/// are executed on a fixed set of dedicated threads rather than the shared CLR pool.
/// </summary>
internal sealed class DedicatedThreadPool : IDisposable
{
    /// <summary>
    /// Creates the pool and immediately starts <see cref="DedicatedThreadPoolSettings.NumThreads"/>
    /// worker threads (each <see cref="PoolWorker"/> starts its thread in its constructor).
    /// </summary>
    /// <param name="settings">Thread count, thread type, and naming configuration.</param>
    public DedicatedThreadPool(DedicatedThreadPoolSettings settings)
    {
        _workQueue = new ThreadPoolWorkQueue();
        Settings = settings;
        _workers = Enumerable.Range(1, settings.NumThreads).Select(workerId => new PoolWorker(this, workerId)).ToArray();

        // Note:
        // The DedicatedThreadPoolSupervisor was removed because aborting thread could lead to unexpected behavior
        // If a new implementation is done, it should spawn a new thread when a worker is not making progress and
        // try to keep {settings.NumThreads} active threads.
    }

    /// <summary>
    /// The settings this pool was created with.
    /// </summary>
    public DedicatedThreadPoolSettings Settings { get; private set; }

    // Shared work queue drained by all PoolWorker threads.
    private readonly ThreadPoolWorkQueue _workQueue;
    private readonly PoolWorker[] _workers;

    /// <summary>
    /// Enqueues a work item for execution on one of the pool's threads.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// This exception is thrown if the given <paramref name="work"/> item is undefined.
    /// </exception>
    /// <returns>
    /// True if the item is guaranteed to execute; false if the pool has already been
    /// disposed (adding completed) and the item is guaranteed NOT to execute.
    /// </returns>
    public bool QueueUserWorkItem<T>(T work) where T:IRunnable
    {
        if (work == null)
            throw new ArgumentNullException(nameof(work), "Work item cannot be null.");

        return _workQueue.TryAdd(work);
    }

    /// <summary>
    /// Stops accepting new work; worker threads exit once the queue drains.
    /// Does not block — use <see cref="WaitForThreadsExit()"/> to wait.
    /// </summary>
    public void Dispose()
    {
        _workQueue.CompleteAdding();
    }

    /// <summary>
    /// Blocks indefinitely until all worker threads have exited.
    /// </summary>
    public void WaitForThreadsExit()
    {
        WaitForThreadsExit(Timeout.InfiniteTimeSpan);
    }

    /// <summary>
    /// Blocks until all worker threads have exited or the timeout elapses.
    /// </summary>
    /// <param name="timeout">Maximum time to wait.</param>
    public void WaitForThreadsExit(TimeSpan timeout)
    {
        Task.WaitAll(_workers.Select(worker => worker.ThreadExit).ToArray(), timeout);
    }

    #region Pool worker implementation

    // One dedicated thread that loops over the shared work queue until adding completes.
    private class PoolWorker
    {
        private readonly DedicatedThreadPool _pool;

        // Completed (in RunThread's finally) when the worker thread exits.
        private readonly TaskCompletionSource<object> _threadExit;

        // Task that completes when this worker's thread has exited.
        public Task ThreadExit
        {
            get { return _threadExit.Task; }
        }

        public PoolWorker(DedicatedThreadPool pool, int workerId)
        {
            _pool = pool;
            _threadExit = new TaskCompletionSource<object>();

            var thread = new Thread(RunThread)
            {
                IsBackground = pool.Settings.ThreadType == ThreadType.Background,
            };

            if (pool.Settings.Name != null)
                thread.Name = string.Format("{0}_{1}", pool.Settings.Name, workerId);

            thread.Start();
        }

        private void RunThread()
        {
            try
            {
                // Blocks inside GetConsumingEnumerable until work arrives or adding completes.
                foreach (var action in _pool._workQueue.GetConsumingEnumerable())
                {
                    try
                    {
                        action.Run();
                    }
                    catch (Exception ex)
                    {
                        // NOTE(review): Settings.ExceptionHandler is never assigned anywhere in
                        // this file — if it is not populated elsewhere, this call throws
                        // NullReferenceException and kills the worker thread. Confirm it is set.
                        _pool.Settings.ExceptionHandler(ex);
                    }
                }
            }
            finally
            {
                // Signal thread exit even if the loop terminated via an exception.
                _threadExit.TrySetResult(null);
            }
        }
    }

    #endregion

    #region WorkQueue implementation

    // MPMC queue: producers enqueue + request a thread; consumers wait on an UnfairSemaphore.
    private class ThreadPoolWorkQueue
    {
        private static readonly int ProcessorCount = Environment.ProcessorCount;
        private const int CompletedState = 1;

        private readonly ConcurrentQueue<IRunnable> _queue = new();
        private readonly UnfairSemaphore _semaphore = new();

        // Count of thread requests outstanding; mirrors the semaphore but capped at
        // ProcessorCount (see EnsureThreadRequested).
        private int _outstandingRequests;
        private int _isAddingCompleted;

        // True once CompleteAdding() has been called; no further items are accepted.
        public bool IsAddingCompleted
        {
            get { return Volatile.Read(ref _isAddingCompleted) == CompletedState; }
        }

        public bool TryAdd<T>(T work) where T:IRunnable
        {
            // If TryAdd returns true, it's guaranteed the work item will be executed.
            // If it returns false, it's also guaranteed the work item won't be executed.
            if (IsAddingCompleted)
                return false;

            _queue.Enqueue(work);
            EnsureThreadRequested();

            return true;
        }

        // Yields work items to a consuming worker thread, blocking on the semaphore
        // when the queue is empty; terminates after CompleteAdding once fully drained.
        public IEnumerable<IRunnable> GetConsumingEnumerable()
        {
            while (true)
            {
                if (_queue.TryDequeue(out var work))
                {
                    yield return work;
                }
                else if (IsAddingCompleted)
                {
                    // Drain any stragglers enqueued before the completion flag was observed.
                    while (_queue.TryDequeue(out work))
                        yield return work;

                    break;
                }
                else
                {
                    _semaphore.Wait();
                    MarkThreadRequestSatisfied();
                }
            }
        }

        public void CompleteAdding()
        {
            int previousCompleted = Interlocked.Exchange(ref _isAddingCompleted, CompletedState);

            if (previousCompleted == CompletedState)
                return;

            // When CompleteAdding() is called, we fill up the _outstandingRequests and the semaphore
            // This will ensure that all threads will unblock and try to execute the remaining item in
            // the queue. When IsAddingCompleted is set, all threads will exit once the queue is empty.
            while (true)
            {
                int count = Volatile.Read(ref _outstandingRequests);
                int countToRelease = UnfairSemaphore.MaxWorker - count;

                int prev = Interlocked.CompareExchange(ref _outstandingRequests, UnfairSemaphore.MaxWorker, count);

                if (prev == count)
                {
                    _semaphore.Release((short)countToRelease);
                    break;
                }
            }
        }

        private void EnsureThreadRequested()
        {
            // There is a double counter here (_outstandingRequest and _semaphore)
            // Unfair semaphore does not support value bigger than short.MaxValue,
            // trying to Release more than short.MaxValue could fail miserably.
            // The _outstandingRequest counter ensure that we only request a
            // maximum of {ProcessorCount} to the semaphore.
            // It's also more efficient to have two counter, _outstandingRequests is
            // more lightweight than the semaphore.
            // This trick is borrowed from the .Net ThreadPool
            // https://github.com/dotnet/coreclr/blob/bc146608854d1db9cdbcc0b08029a87754e12b49/src/mscorlib/src/System/Threading/ThreadPool.cs#L568
            int count = Volatile.Read(ref _outstandingRequests);
            while (count < ProcessorCount)
            {
                int prev = Interlocked.CompareExchange(ref _outstandingRequests, count + 1, count);
                if (prev == count)
                {
                    _semaphore.Release();
                    break;
                }
                count = prev;
            }
        }

        // CAS-decrements _outstandingRequests (floor of zero) after a worker wakes from the semaphore.
        private void MarkThreadRequestSatisfied()
        {
            int count = Volatile.Read(ref _outstandingRequests);
            while (count > 0)
            {
                int prev = Interlocked.CompareExchange(ref _outstandingRequests, count - 1, count);
                if (prev == count)
                {
                    break;
                }
                count = prev;
            }
        }
    }

    #endregion

    #region UnfairSemaphore implementation

    // This class has been translated from:
    // https://github.com/dotnet/coreclr/blob/97433b9d153843492008652ff6b7c3bf4d9ff31c/src/vm/win32threadpool.h#L124
    // UnfairSemaphore is a more scalable semaphore than Semaphore. It prefers to release threads that have more recently begun waiting,
    // to preserve locality. Additionally, very recently-waiting threads can be released without an addition kernel transition to unblock
    // them, which reduces latency.
    //
    // UnfairSemaphore is only appropriate in scenarios where the order of unblocking threads is not important, and where threads frequently
    // need to be woken.
    [StructLayout(LayoutKind.Sequential)]
    private sealed class UnfairSemaphore
    {
        public const int MaxWorker = 0x7FFF;

        private static readonly int ProcessorCount = Environment.ProcessorCount;

        // We track everything we care about in a single 64-bit struct to allow us to
        // do CompareExchanges on this for atomic updates.
        // The four shorts alias RawData via explicit field offsets.
        [StructLayout(LayoutKind.Explicit)]
        private struct SemaphoreState
        {
            //how many threads are currently spin-waiting for this semaphore?
            [FieldOffset(0)]
            public short Spinners;

            //how much of the semaphore's count is available to spinners?
            [FieldOffset(2)]
            public short CountForSpinners;

            //how many threads are blocked in the OS waiting for this semaphore?
            [FieldOffset(4)]
            public short Waiters;

            //how much count is available to waiters?
            [FieldOffset(6)]
            public short CountForWaiters;

            [FieldOffset(0)]
            public long RawData;
        }

        [StructLayout(LayoutKind.Explicit, Size = 64)]
        private struct CacheLinePadding
        { }

        // Kernel semaphore used only for the "waiter" slow path.
        private readonly Semaphore m_semaphore;

        // padding to ensure we get our own cache line
        #pragma warning disable 169
        private readonly CacheLinePadding m_padding1;
        private SemaphoreState m_state;
        private readonly CacheLinePadding m_padding2;
        #pragma warning restore 169

        public UnfairSemaphore()
        {
            m_semaphore = new Semaphore(0, short.MaxValue);
        }

        public bool Wait()
        {
            return Wait(Timeout.InfiniteTimeSpan);
        }

        // Three-phase wait: (1) grab available spinner count; (2) spin-wait with
        // Sleep(0) yields up to a dynamic spin limit; (3) block on the kernel semaphore.
        public bool Wait(TimeSpan timeout)
        {
            while (true)
            {
                SemaphoreState currentCounts = GetCurrentState();
                SemaphoreState newCounts = currentCounts;

                // First, just try to grab some count.
                if (currentCounts.CountForSpinners > 0)
                {
                    --newCounts.CountForSpinners;
                    if (TryUpdateState(newCounts, currentCounts))
                        return true;
                }
                else
                {
                    // No count available, become a spinner
                    ++newCounts.Spinners;
                    if (TryUpdateState(newCounts, currentCounts))
                        break;
                }
            }

            //
            // Now we're a spinner.
            //
            int numSpins = 0;
            const int spinLimitPerProcessor = 50;
            while (true)
            {
                SemaphoreState currentCounts = GetCurrentState();
                SemaphoreState newCounts = currentCounts;

                if (currentCounts.CountForSpinners > 0)
                {
                    --newCounts.CountForSpinners;
                    --newCounts.Spinners;
                    if (TryUpdateState(newCounts, currentCounts))
                        return true;
                }
                else
                {
                    // Spin budget shrinks as more threads spin on the same processors.
                    double spinnersPerProcessor = (double)currentCounts.Spinners / ProcessorCount;
                    int spinLimit = (int)((spinLimitPerProcessor / spinnersPerProcessor) + 0.5);
                    if (numSpins >= spinLimit)
                    {
                        --newCounts.Spinners;
                        ++newCounts.Waiters;
                        if (TryUpdateState(newCounts, currentCounts))
                            break;
                    }
                    else
                    {
                        //
                        // We yield to other threads using Thread.Sleep(0) rather than the more traditional Thread.Yield().
                        // This is because Thread.Yield() does not yield to threads currently scheduled to run on other
                        // processors. On a 4-core machine, for example, this means that Thread.Yield() is only ~25% likely
                        // to yield to the correct thread in some scenarios.
                        // Thread.Sleep(0) has the disadvantage of not yielding to lower-priority threads. However, this is ok because
                        // once we've called this a few times we'll become a "waiter" and wait on the Semaphore, and that will
                        // yield to anything that is runnable.
                        //
                        Thread.Sleep(0);
                        numSpins++;
                    }
                }
            }

            //
            // Now we're a waiter
            //
            bool waitSucceeded = m_semaphore.WaitOne(timeout);

            while (true)
            {
                SemaphoreState currentCounts = GetCurrentState();
                SemaphoreState newCounts = currentCounts;

                --newCounts.Waiters;
                if (waitSucceeded)
                    --newCounts.CountForWaiters;

                if (TryUpdateState(newCounts, currentCounts))
                    return waitSucceeded;
            }
        }

        public void Release()
        {
            Release(1);
        }

        // Distributes `count` between spinners (cheap, no kernel call) and waiters
        // (requires releasing the kernel semaphore); surplus goes to future spinners.
        public void Release(short count)
        {
            while (true)
            {
                SemaphoreState currentState = GetCurrentState();
                SemaphoreState newState = currentState;

                short remainingCount = count;

                // First, prefer to release existing spinners,
                // because a) they're hot, and b) we don't need a kernel
                // transition to release them.
                short spinnersToRelease = Math.Max((short)0, Math.Min(remainingCount, (short)(currentState.Spinners - currentState.CountForSpinners)));
                newState.CountForSpinners += spinnersToRelease;
                remainingCount -= spinnersToRelease;

                // Next, prefer to release existing waiters
                short waitersToRelease = Math.Max((short)0, Math.Min(remainingCount, (short)(currentState.Waiters - currentState.CountForWaiters)));
                newState.CountForWaiters += waitersToRelease;
                remainingCount -= waitersToRelease;

                // Finally, release any future spinners that might come our way
                newState.CountForSpinners += remainingCount;

                // Try to commit the transaction
                if (TryUpdateState(newState, currentState))
                {
                    // Now we need to release the waiters we promised to release
                    if (waitersToRelease > 0)
                        m_semaphore.Release(waitersToRelease);

                    break;
                }
            }
        }

        // Single CompareExchange on the aliased 64-bit RawData commits all four counters atomically.
        private bool TryUpdateState(SemaphoreState newState, SemaphoreState currentState)
        {
            if (Interlocked.CompareExchange(ref m_state.RawData, newState.RawData, currentState.RawData) == currentState.RawData)
            {
                Debug.Assert(newState.CountForSpinners <= MaxWorker, "CountForSpinners is greater than MaxWorker");
                Debug.Assert(newState.CountForSpinners >= 0, "CountForSpinners is lower than zero");
                Debug.Assert(newState.Spinners <= MaxWorker, "Spinners is greater than MaxWorker");
                Debug.Assert(newState.Spinners >= 0, "Spinners is lower than zero");
                Debug.Assert(newState.CountForWaiters <= MaxWorker, "CountForWaiters is greater than MaxWorker");
                Debug.Assert(newState.CountForWaiters >= 0, "CountForWaiters is lower than zero");
                Debug.Assert(newState.Waiters <= MaxWorker, "Waiters is greater than MaxWorker");
                Debug.Assert(newState.Waiters >= 0, "Waiters is lower than zero");
                Debug.Assert(newState.CountForSpinners + newState.CountForWaiters <= MaxWorker, "CountForSpinners + CountForWaiters is greater than MaxWorker");

                return true;
            }

            return false;
        }

        private SemaphoreState GetCurrentState()
        {
            // Volatile.Read of a long can get a partial read in x86 but the invalid
            // state will be detected in TryUpdateState with the CompareExchange.
            SemaphoreState state = new SemaphoreState();
            state.RawData = Volatile.Read(ref m_state.RawData);
            return state;
        }
    }

    #endregion
}
}