Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #7130: Contention scheduling actions in HashedWheelTimerScheduler #7144

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// -----------------------------------------------------------------------
// <copyright file="SchedulerHeavyUse.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Akka.Actor;
using Akka.Event;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Actor.Scheduler;

public class HashedWheelTimerSchedulerContentionSpec: TestKit.Xunit2.TestKit
{
private const int TotalActor = 5000;
private const int TotalThreads = 10;
private const int ActorsPerThread = TotalActor / TotalThreads;

public HashedWheelTimerSchedulerContentionSpec(ITestOutputHelper output) : base("{}", output)
{
}

[Fact]
public void SchedulerContentionTest()
{
var collector = CreateTestProbe();
foreach (var i in Enumerable.Range(0, TotalActor))
{
Sys.ActorOf(Props.Create(() => new DoStuffActor(TestActor, collector)), i.ToString());
}

Within(10.Seconds(), () =>
{
for (var x = 0; x < TotalActor; x++)
{
ExpectMsg<Done>();
}
});

object? received = null;
do
{
received = collector.ReceiveOne(TimeSpan.Zero);
if (received is long value)
{
value.Should().BeLessThan(200, "Scheduler should not experience resource contention");
}
} while (received is not null);

}

[Fact]
public void SchedulerContentionThreadedTest()
{
var collector = CreateTestProbe();
var threads = new List<Thread>();

foreach (var j in Enumerable.Range(0, TotalThreads))
{
threads.Add(new Thread(() => RunThread(j)));
}

foreach (var thread in threads)
{
thread.Start();
}

foreach (var thread in threads)
{
thread.Join();
}

Within(10.Seconds(), () =>
{
for (var x = 0; x < TotalActor; x++)
{
ExpectMsg<Done>();
}
});

object? received = null;
do
{
received = collector.ReceiveOne(TimeSpan.Zero);
if (received is long value)
{
value.Should().BeLessThan(200, "Scheduler should not experience resource contention");
}
} while (received is not null);

return;

void RunThread(int n)
{
n *= ActorsPerThread;
for (var i = 0; i < ActorsPerThread; i++)
{
Sys.ActorOf(Props.Create(() => new DoStuffActor(TestActor, collector)), (n + i).ToString());
}
}
}

public class DoStuffActor : ReceiveActor, IWithTimers
{
private readonly IActorRef _collector;
public ITimerScheduler Timers { get; set; }

public DoStuffActor(IActorRef probe, IActorRef collector)
{
_collector = collector;

Receive<Done>(d =>
{
Context.Stop(Self);
probe.Tell(d);
});
}

protected override void PreStart()
{
base.PreStart();
var sw = Stopwatch.StartNew();
Timers.StartSingleTimer("Test", Done.Instance, TimeSpan.FromSeconds(3));
sw.Stop();

if (sw.ElapsedMilliseconds > 0)
{
Context.GetLogger().Info($"{sw.ElapsedMilliseconds}");
_collector.Tell(sw.ElapsedMilliseconds);
}
}
}
}
35 changes: 18 additions & 17 deletions src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace Akka.Actor
public sealed class HashedWheelTimerScheduler : SchedulerBase, IDateTimeOffsetNowTimeProvider, IDisposable
{
private readonly TimeSpan _shutdownTimeout;
private readonly TimeSpan _timerDuration;
private readonly long _tickDuration; // a timespan expressed as ticks

static HashedWheelTimerScheduler()
Expand Down Expand Up @@ -65,17 +66,17 @@ public HashedWheelTimerScheduler(Config scheduler, ILoggingAdapter log) : base(s
"Cannot be greater than 2^30.");
// ReSharper restore NotResolvedInText

var tickDuration = SchedulerConfig.GetTimeSpan("akka.scheduler.tick-duration", TimeSpan.Zero);
if (tickDuration.TotalMilliseconds < 10.0d)
_timerDuration = SchedulerConfig.GetTimeSpan("akka.scheduler.tick-duration", TimeSpan.Zero);
if (_timerDuration.TotalMilliseconds < 10.0d)
{
// ReSharper disable NotResolvedInText
throw new ArgumentOutOfRangeException("akka.scheduler.tick-duration", tickDuration.TotalMilliseconds,
throw new ArgumentOutOfRangeException("akka.scheduler.tick-duration", _timerDuration.TotalMilliseconds,
"minimum supported akka.scheduler.tick-duration on Windows is 10ms");
// ReSharper restore NotResolvedInText
}

// convert tick-duration to ticks
_tickDuration = tickDuration.Ticks;
_tickDuration = _timerDuration.Ticks;

// Normalize ticks per wheel to power of two and create the wheel
_wheel = CreateWheel(ticksPerWheel, log);
Expand All @@ -96,7 +97,11 @@ public HashedWheelTimerScheduler(Config scheduler, ILoggingAdapter log) : base(s
private long _startTime;
private long _tick;
private readonly int _mask;
#if NET6_0_OR_GREATER
private readonly TaskCompletionSource _workerInitialized = new();
#else
private readonly CountdownEvent _workerInitialized = new(1);
#endif
private readonly ConcurrentQueue<SchedulerRegistration> _registrations = new();
private readonly Bucket[] _wheel;

Expand Down Expand Up @@ -144,15 +149,13 @@ private void Start()
{
if (_workerState == WORKER_STATE_STARTED)
{
} // do nothing
// do nothing
}
else if (_workerState == WORKER_STATE_INIT)
{
if (Interlocked.CompareExchange(ref _workerState, WORKER_STATE_STARTED, WORKER_STATE_INIT) ==
WORKER_STATE_INIT)
if (Interlocked.CompareExchange(ref _workerState, WORKER_STATE_STARTED, WORKER_STATE_INIT) == WORKER_STATE_INIT)
{
var t = TimeSpan.FromTicks(_tickDuration);
_timer = new PeriodicTimer(t);

_timer ??= new PeriodicTimer(_timerDuration);
Task.Run(() => RunAsync(_cts.Token)); // start the clock
}
}
Expand All @@ -164,11 +167,9 @@ private void Start()
{
throw new InvalidOperationException($"Worker in invalid state: {_workerState}");
}

while (_startTime == 0)
{
_workerInitialized.Wait();
}

if(_startTime == 0)
_workerInitialized.Task.Wait();
}

private async Task RunAsync(CancellationToken token)
Expand All @@ -184,7 +185,7 @@ private async Task RunAsync(CancellationToken token)
_startTime = 1;
}

_workerInitialized.Signal();
_workerInitialized.SetResult();

try
{
Expand Down Expand Up @@ -248,7 +249,7 @@ private void Start()
if (_workerState == WORKER_STATE_STARTED) { } // do nothing
else if (_workerState == WORKER_STATE_INIT)
{
_worker = new Thread(Run) { IsBackground = true };
_worker ??= new Thread(Run) { IsBackground = true };
if (Interlocked.CompareExchange(ref _workerState, WORKER_STATE_STARTED, WORKER_STATE_INIT) ==
WORKER_STATE_INIT)
{
Expand Down
Loading