diff --git a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs index 6558a97c6ef..2cbd839a688 100644 --- a/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/IO/FileSinkSpec.cs @@ -31,7 +31,7 @@ namespace Akka.Streams.Tests.IO public class FileSinkSpec : AkkaSpec { private readonly ActorMaterializer _materializer; - private readonly List _testLines = new List(); + private readonly List _testLines = new(); private readonly List _testByteStrings; private readonly TimeSpan _expectTimeout = TimeSpan.FromSeconds(10); @@ -263,6 +263,7 @@ await TargetFileAsync(f => // haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever Utils.AssertDispatcher(actorRef, ActorAttributes.IODispatcher.Name); + return Task.CompletedTask; } finally { @@ -296,6 +297,7 @@ await TargetFileAsync(f => ((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); var actorRef = ExpectMsg().Refs.First(@ref => @ref.Path.ToString().Contains("File")); Utils.AssertDispatcher(actorRef, "akka.actor.default-dispatcher"); + return Task.CompletedTask; } finally { @@ -331,7 +333,7 @@ await AwaitAssertAsync( [Fact] public async Task SynchronousFileSink_should_complete_materialized_task_with_an_exception_when_upstream_fails() { - await TargetFileAsync(f => + await TargetFileAsync(async f => { var completion = Source.From(_testByteStrings) .Select(bytes => @@ -344,18 +346,24 @@ await TargetFileAsync(f => var ex = Intercept(() => completion.Wait(TimeSpan.FromSeconds(3))); ex.IoResult.Count.ShouldBe(1001); CheckFileContent(f, string.Join("", _testLines.TakeWhile(s => !s.Contains('b')))); + await Task.CompletedTask; }, _materializer); } [Fact] public async Task SynchronousFileSink_should_complete_with_failure_when_file_cannot_be_open() { - await TargetFileAsync(f => + await TargetFileAsync(async f => { var completion = Source.Single(ByteString.FromString("42")) .RunWith(FileIO.ToFile(new FileInfo("I-hope-this-file-doesnt-exist.txt"), FileMode.Open), _materializer); - AssertThrows(completion.Wait); + async Task Exec() + { + await completion; + } + + await Exec().ShouldThrowWithin(RemainingOrDefault); }, _materializer); } @@ -377,9 +385,10 @@ await TargetFileAsync(async f => actor.Tell("a\n"); actor.Tell("b\n"); - await AwaitAssertAsync(async() => + await AwaitAssertAsync(() => { CheckFileContent(f, "a\nb\n"); + return Task.CompletedTask; }, Remaining); actor.Tell("a\n"); @@ -390,7 +399,7 @@ await AwaitAssertAsync(async() => // We still have to wait for the task to complete, because the signal // came from the FileSink actor, not the source actor. await task.ShouldCompleteWithin(Remaining); - ExpectTerminated(actor, Remaining); + await ExpectTerminatedAsync(actor, Remaining); f.Length.ShouldBe(8); CheckFileContent(f, "a\nb\na\nb\n"); @@ -398,12 +407,12 @@ await AwaitAssertAsync(async() => }); } - [Fact(Skip = "Skipped for async_testkit conversion build")] + [Fact] public void SynchronousFileSink_should_write_buffered_element_if_manual_flush_is_called() { this.AssertAllStagesStopped(async() => { - await TargetFileAsync(f => + await TargetFileAsync(async f => { var flusher = new FlushSignaler(); var (actor, task) = Source.ActorRef(64, OverflowStrategy.DropNew) @@ -412,23 +421,23 @@ await TargetFileAsync(f => FileIO.ToFile(f, fileMode: FileMode.OpenOrCreate, startPosition: 0, flushSignaler:flusher), (a, t) => (a, t)) .Run(_materializer); - Thread.Sleep(100); // wait for stream to catch up + await Task.Delay(100); // wait for stream to catch up actor.Tell("a\n"); actor.Tell("b\n"); - Thread.Sleep(200); // wait for stream to catch up + await Task.Delay(200); // wait for stream to catch up flusher.Flush(); - Thread.Sleep(100); // wait for flush + await Task.Delay(100); // wait for flush CheckFileContent(f, "a\nb\n"); // file should be flushed actor.Tell("c\n"); actor.Tell("d\n"); - Thread.Sleep(200); // wait for stream to catch up + await Task.Delay(200); // wait for stream to catch up CheckFileContent(f, "a\nb\n"); // file content should not change flusher.Flush(); - Thread.Sleep(100); // wait for flush + await Task.Delay(100); // wait for flush CheckFileContent(f, "a\nb\nc\nd\n"); // file content should all be flushed actor.Tell(new Status.Success(NotUsed.Instance)); @@ -440,11 +449,11 @@ await TargetFileAsync(f => } private async Task TargetFileAsync( - Action block, + Func block, ActorMaterializer materializer, bool create = true) { - var targetFile = new FileInfo(Path.Combine(Path.GetTempPath(), "synchronous-file-sink.tmp")); + var targetFile = new FileInfo(Path.Combine(Path.GetTempPath(), $"synchronous-file-sink-{Guid.NewGuid()}.tmp")); if (!create) targetFile.Delete(); @@ -453,7 +462,7 @@ private async Task TargetFileAsync( try { - block(targetFile); + await block(targetFile); } finally {