Skip to content

Commit

Permalink
[74-74] InputStreamSourceSpec (#6622)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
eaba and Aaronontheweb authored Mar 27, 2023
1 parent 3a75d0c commit fab1288
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/core/Akka.Streams.Tests/IO/InputStreamSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Akka.IO;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
Expand Down Expand Up @@ -174,23 +175,22 @@ public void InputStreamSource_must_not_signal_when_no_demand()
}

[Fact]
public void InputStreamSource_must_read_bytes_from_InputStream()
public async Task InputStreamSource_must_read_bytes_from_InputStream()
{
this.AssertAllStagesStopped(() =>
{
var f = StreamConverters.FromInputStream(() => new ListInputStream(new[] {"a", "b", "c"}))
.RunWith(Sink.First<ByteString>(), _materializer);
await this.AssertAllStagesStoppedAsync(() => {
var f = StreamConverters.FromInputStream(() => new ListInputStream(new[] { "a", "b", "c" }))
.RunWith(Sink.First<ByteString>(), _materializer);

f.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
f.Result.Should().BeEquivalentTo(ByteString.FromString("abc"));
return Task.CompletedTask;
}, _materializer);
}

[Fact]
public void InputStreamSource_must_emit_as_soon_as_read()
public async Task InputStreamSource_must_emit_as_soon_as_read()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var latch = new TestLatch(1);
var probe = StreamConverters.FromInputStream(() => new EmittedInputStream(latch), chunkSize: 1)
.RunWith(this.SinkProbe<ByteString>(), _materializer);
Expand All @@ -199,6 +199,7 @@ public void InputStreamSource_must_emit_as_soon_as_read()
probe.ExpectNext(ByteString.FromString("M"));
latch.CountDown();
probe.ExpectComplete();
return Task.CompletedTask;
}, _materializer);
}
}
Expand Down

0 comments on commit fab1288

Please sign in to comment.