Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Akka.Tests.Dispatch tests to async/await - MailboxesSpec #5790

Merged
merged 6 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions src/core/Akka.Tests/Actor/InboxSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Akka.Actor.Internal;
using Akka.Event;
using Akka.TestKit;
using Akka.Tests.Util;
using Xunit;

namespace Akka.Tests.Actor
Expand All @@ -29,13 +28,13 @@ public InboxSpec()
}

[Fact]
public async Task Inbox_support_watch()
public void Inbox_support_watch()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this file reverted to sync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That changes belonged to #5780. Some how after I fetched the changes from the origin, that was counted as a new change to the branch. I wanted to avoid conflicts, so I un-staged the commit! That was what happened.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to fix this then

{
_inbox.Watch(TestActor);

// check watch
TestActor.Tell(PoisonPill.Instance);
var received = await _inbox.ReceiveAsync(TimeSpan.FromSeconds(1));
var received = _inbox.Receive(TimeSpan.FromSeconds(1));

received.GetType().ShouldBe(typeof(Terminated));
var terminated = (Terminated)received;
Expand Down Expand Up @@ -83,22 +82,22 @@ public void Inbox_support_selective_receives()
}

[Fact]
public async Task Inbox_have_maximum_queue_size()
public void Inbox_have_maximum_queue_size()
{
try
{
//Fill the inbox (it can hold 1000) messages
foreach (var zero in Enumerable.Repeat(0, 1000))
_inbox.Receiver.Tell(zero);

await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
ExpectNoMsg(TimeSpan.FromSeconds(1));

//The inbox is full. Sending another message should result in a Warning message
await EventFilter.Warning(start:"Dropping message").ExpectOneAsync(() => _inbox.Receiver.Tell(42));
EventFilter.Warning(start:"Dropping message").ExpectOne(() => _inbox.Receiver.Tell(42));

//The inbox is still full. But since the warning message has already been sent, no more warnings should be sent
_inbox.Receiver.Tell(42);
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
ExpectNoMsg(TimeSpan.FromSeconds(1));

//Receive all messages from the inbox
var gotit = Enumerable.Repeat(0, 1000).Select(_ => _inbox.Receive());
Expand All @@ -108,7 +107,7 @@ public async Task Inbox_have_maximum_queue_size()
}

//The inbox should be empty now, so receiving should result in a timeout
Assert.Throws<TimeoutException>(() =>
Intercept<TimeoutException>(() =>
{
var received = _inbox.Receive(TimeSpan.FromSeconds(1));
Log.Error("Received " + received);
Expand All @@ -121,17 +120,18 @@ public async Task Inbox_have_maximum_queue_size()
}

[Fact]
public async Task Inbox_have_a_default_and_custom_timeouts()
public void Inbox_have_a_default_and_custom_timeouts()
{
await WithinAsync(TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(6), () =>
Within(TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(6), () =>
{
Assert.Throws<TimeoutException>(() => _inbox.Receive());
Intercept<TimeoutException>(() => _inbox.Receive());
return true;
});

await WithinAsync(TimeSpan.FromSeconds(1), () =>
Within(TimeSpan.FromSeconds(1), () =>
{
Assert.Throws<TimeoutException>(() => _inbox.Receive(TimeSpan.FromMilliseconds(100)));
Intercept<TimeoutException>(() => _inbox.Receive(TimeSpan.FromMilliseconds(100)));
return true;
});
}

Expand All @@ -152,10 +152,11 @@ public void Select_WithClient_should_update_Client_and_copy_the_rest_of_the_prop
}

[Fact]
public async Task Inbox_Receive_will_timeout_gracefully_if_timeout_is_already_expired()
public void Inbox_Receive_will_timeout_gracefully_if_timeout_is_already_expired()
{
var task = _inbox.ReceiveAsync(TimeSpan.FromSeconds(-1));
Assert.True(await task.AwaitWithTimeout(TimeSpan.FromMilliseconds(1000)), "Receive did not complete in time.");

Assert.True(task.Wait(1000), "Receive did not complete in time.");
Assert.IsType<Status.Failure>(task.Result);
}
}
Expand Down
68 changes: 34 additions & 34 deletions src/core/Akka.Tests/Dispatch/MailboxesSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ public Property UnboundedPriorityQueue_should_sort_items_in_expected_order(int[]
#endif

[Fact]
public void Can_use_unbounded_priority_mailbox()
public async Task Can_use_unbounded_priority_mailbox()
{
var actor = (IInternalActorRef)Sys.ActorOf(EchoActor.Props(this).WithMailbox("string-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

// wait until we can confirm that the mailbox is suspended before we begin sending messages
AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());
await AwaitConditionAsync(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());

actor.Tell(true);
for (var i = 0; i < 30; i++)
Expand All @@ -222,27 +222,27 @@ public void Can_use_unbounded_priority_mailbox()
//resume mailbox, this prevents the mailbox from running to early
//priority mailbox is best effort only

ExpectMsg("a");
ExpectMsg(true);
await ExpectMsgAsync("a");
await ExpectMsgAsync(true);
for (var i = 0; i < 60; i++)
{
ExpectMsg(1);
await ExpectMsgAsync(1);
}
ExpectMsg(2.0);
await ExpectMsgAsync(2.0);

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(0.3));
}

[Fact]
public void Can_use_unbounded_stable_priority_mailbox()
public async Task Can_use_unbounded_stable_priority_mailbox()
{
var actor = (IInternalActorRef)Sys.ActorOf(EchoActor.Props(this).WithMailbox("stable-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

// wait until we can confirm that the mailbox is suspended before we begin sending messages
AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());
await AwaitConditionAsync(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());

actor.Tell(true);
for (var i = 0; i < 30; i++)
Expand All @@ -260,26 +260,26 @@ public void Can_use_unbounded_stable_priority_mailbox()
//resume mailbox, this prevents the mailbox from running to early
//priority mailbox is best effort only

ExpectMsg("a");
ExpectMsg(true);
await ExpectMsgAsync("a");
await ExpectMsgAsync(true);
for (var i = 0; i < 60; i++)
{
ExpectMsg(i);
await ExpectMsgAsync(i);
}
ExpectMsg(2.0);
await ExpectMsgAsync(2.0);

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(0.3));
}

[Fact]
public void Priority_mailbox_keeps_ordering_with_many_priority_values()
public async Task Priority_mailbox_keeps_ordering_with_many_priority_values()
{
var actor = (IInternalActorRef)Sys.ActorOf(EchoActor.Props(this).WithMailbox("int-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

AwaitCondition(()=> (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());
await AwaitConditionAsync(()=> (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());
// creates 50 messages with values spanning from Int32.MinValue to Int32.MaxValue
var values = new int[50];
var increment = (int)(UInt32.MaxValue / values.Length);
Expand All @@ -301,23 +301,23 @@ public void Priority_mailbox_keeps_ordering_with_many_priority_values()
// expect the messages in the correct order
foreach (var value in values)
{
ExpectMsg(value);
ExpectMsg(value);
ExpectMsg(value);
await ExpectMsgAsync(value);
await ExpectMsgAsync(value);
await ExpectMsgAsync(value);
}

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(0.3));
}

[Fact]
public void Unbounded_Priority_Mailbox_Supports_Unbounded_Stashing()
public async Task Unbounded_Priority_Mailbox_Supports_Unbounded_Stashing()
{
var actor = (IInternalActorRef)Sys.ActorOf(StashingActor.Props(this).WithMailbox("int-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());
await AwaitConditionAsync(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());

var values = new int[10];
var increment = (int)(UInt32.MaxValue / values.Length);
Expand All @@ -338,29 +338,29 @@ public void Unbounded_Priority_Mailbox_Supports_Unbounded_Stashing()
//resume mailbox, this prevents the mailbox from running to early
actor.SendSystemMessage(new Resume(null));

this.Within(5.Seconds(), () =>
await WithinAsync(5.Seconds(), async() =>
{
// expect the messages in the correct order
foreach (var value in values)
{
ExpectMsg(value);
ExpectMsg(value);
ExpectMsg(value);
await ExpectMsgAsync(value);
await ExpectMsgAsync(value);
await ExpectMsgAsync(value);
}
});

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(0.3));
}

[Fact]
public void Unbounded_Stable_Priority_Mailbox_Supports_Unbounded_Stashing()
public async Task Unbounded_Stable_Priority_Mailbox_Supports_Unbounded_Stashing()
{
var actor = (IInternalActorRef)Sys.ActorOf(StashingActor.Props(this).WithMailbox("stable-prio-mailbox"), "echo");

//pause mailbox until all messages have been told
actor.SendSystemMessage(new Suspend());

AwaitCondition(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());
await AwaitConditionAsync(() => (((ActorRefWithCell)actor).Underlying is ActorCell) && ((ActorRefWithCell)actor).Underlying.AsInstanceOf<ActorCell>().Mailbox.IsSuspended());

var values = new int[10];
var increment = (int)(UInt32.MaxValue / values.Length);
Expand All @@ -381,18 +381,18 @@ public void Unbounded_Stable_Priority_Mailbox_Supports_Unbounded_Stashing()
//resume mailbox, this prevents the mailbox from running to early
actor.SendSystemMessage(new Resume(null));

this.Within(5.Seconds(), () =>
await WithinAsync(5.Seconds(), async() =>
{
// expect the messages in the original order
foreach (var value in values)
{
ExpectMsg(value);
ExpectMsg(value);
ExpectMsg(value);
await ExpectMsgAsync(value);
await ExpectMsgAsync(value);
await ExpectMsgAsync(value);
}
});

ExpectNoMsg(TimeSpan.FromSeconds(0.3));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(0.3));
}
}
}
Expand Down