From 87f9601234d78280f7d7cea6bbf882526ec26b1c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 30 Apr 2022 00:14:41 +0700 Subject: [PATCH] Convert Akka.Remote.Tests to async - Transport.GenericTransportSpec (#5898) --- .../Transport/GenericTransportSpec.cs | 139 +++++++++--------- 1 file changed, 66 insertions(+), 73 deletions(-) diff --git a/src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs b/src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs index a6cbbd098b9..885b9f1e22b 100644 --- a/src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs +++ b/src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs @@ -9,9 +9,9 @@ using System.Linq; using System.Threading.Tasks; using Akka.Actor; -using Akka.Remote.Serialization; using Akka.Remote.Transport; using Akka.TestKit; +using Akka.TestKit.Extensions; using Google.Protobuf; using Xunit; @@ -19,22 +19,27 @@ namespace Akka.Remote.Tests.Transport { public abstract class GenericTransportSpec : AkkaSpec { - private Address addressATest = new Address("test", "testsytemA", "testhostA", 4321); - private Address addressBTest = new Address("test", "testsytemB", "testhostB", 5432); + private readonly Address _addressATest = new Address("test", "testsytemA", "testhostA", 4321); + private readonly Address _addressBTest = new Address("test", "testsytemB", "testhostB", 5432); - private Address addressA; - private Address addressB; - private Address nonExistingAddress; - private bool withAkkaProtocol; + private Address _addressA; + private Address _addressB; + private Address _nonExistingAddress; + private readonly bool _withAkkaProtocol; - public GenericTransportSpec(bool withAkkaProtocol = false) + protected GenericTransportSpec(bool withAkkaProtocol = false) : base("akka.actor.provider = \"Akka.Remote.RemoteActorRefProvider, Akka.Remote\" ") { - this.withAkkaProtocol = withAkkaProtocol; + _withAkkaProtocol = withAkkaProtocol; + } - addressA = addressATest.WithProtocol(string.Format("{0}.{1}", SchemeIdentifier, addressATest.Protocol)); - addressB = addressBTest.WithProtocol(string.Format("{0}.{1}", SchemeIdentifier, addressBTest.Protocol)); - nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0); + public override async Task InitializeAsync() + { + await base.InitializeAsync(); + + _addressA = _addressATest.WithProtocol($"{SchemeIdentifier}.{_addressATest.Protocol}"); + _addressB = _addressBTest.WithProtocol($"{SchemeIdentifier}.{_addressBTest.Protocol}"); + _nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0); } private TimeSpan DefaultTimeout { get { return Dilated(TestKitSettings.DefaultTimeout); } } @@ -45,7 +50,7 @@ public GenericTransportSpec(bool withAkkaProtocol = false) private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Transport transport) { - if (withAkkaProtocol) { + if (_withAkkaProtocol) { var provider = (RemoteActorRefProvider)((ExtendedActorSystem)Sys).Provider; return new AkkaProtocolTransport(transport, Sys, new AkkaProtocolSettings(provider.RemoteSettings.Config), new AkkaPduProtobuffCodec(Sys)); @@ -56,165 +61,153 @@ private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Tran private Akka.Remote.Transport.Transport NewTransportA(AssociationRegistry registry) { - return WrapTransport(FreshTransport(new TestTransport(addressATest, registry))); + return WrapTransport(FreshTransport(new TestTransport(_addressATest, registry))); } private Akka.Remote.Transport.Transport NewTransportB(AssociationRegistry registry) { - return WrapTransport(FreshTransport(new TestTransport(addressBTest, registry))); + return WrapTransport(FreshTransport(new TestTransport(_addressBTest, registry))); } [Fact] - public void Transport_must_return_an_Address_and_promise_when_listen_is_called() + public async Task Transport_must_return_an_Address_and_promise_when_listen_is_called() { var registry = new AssociationRegistry(); var transportA = NewTransportA(registry); - var result = AwaitResult(transportA.Listen()); + var result = await transportA.Listen().WithTimeout(DefaultTimeout); - Assert.Equal(addressA, result.Item1); + Assert.Equal(_addressA, result.Item1); Assert.NotNull(result.Item2); - Assert.Contains(registry.LogSnapshot().OfType(), x => x.BoundAddress == addressATest); + Assert.Contains(registry.LogSnapshot().OfType(), x => x.BoundAddress == _addressATest); } [Fact] - public void Transport_must_associate_successfully_with_another_transport_of_its_kind() + public async Task Transport_must_associate_successfully_with_another_transport_of_its_kind() { var registry = new AssociationRegistry(); var transportA = NewTransportA(registry); var transportB = NewTransportB(registry); // Must complete the returned promise to receive events - AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + (await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + (await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest)); + await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest)); - transportA.Associate(addressB); - ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o => + // task is not awaited deliberately + var task = transportA.Associate(_addressB); + await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o => { - var inbound = o as InboundAssociation; - - if (inbound != null && inbound.Association.RemoteAddress == addressA) + if (o is InboundAssociation inbound && inbound.Association.RemoteAddress == _addressA) return inbound.Association; return null; }); - Assert.Contains(registry.LogSnapshot().OfType(), x => x.LocalAddress == addressATest && x.RemoteAddress == addressBTest); - AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest)); + Assert.Contains(registry.LogSnapshot().OfType(), x => x.LocalAddress == _addressATest && x.RemoteAddress == _addressBTest); + await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest)); } [Fact] - public void Transport_must_fail_to_associate_with_nonexisting_address() + public async Task Transport_must_fail_to_associate_with_non_existing_address() { var registry = new AssociationRegistry(); var transportA = NewTransportA(registry); - AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitCondition(() => registry.TransportsReady(addressATest)); + (await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + await AwaitConditionAsync(() => registry.TransportsReady(_addressATest)); // Transport throws InvalidAssociationException when trying to associate with non-existing system - XAssert.Throws(() => - AwaitResult(transportA.Associate(nonExistingAddress)) + await Assert.ThrowsAsync(async () => + await transportA.Associate(_nonExistingAddress).WithTimeout(DefaultTimeout) ); } [Fact] - public void Transport_must_successfully_send_PDUs() + public async Task Transport_must_successfully_send_PDUs() { var registry = new AssociationRegistry(); var transportA = NewTransportA(registry); var transportB = NewTransportB(registry); - AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + (await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + (await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest)); + await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest)); - var associate = transportA.Associate(addressB); - var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o => + var associate = transportA.Associate(_addressB); + var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o => { - var handle = o as InboundAssociation; - if (handle != null && handle.Association.RemoteAddress == addressA) + if (o is InboundAssociation handle && handle.Association.RemoteAddress == _addressA) return handle.Association; return null; }); - var handleA = AwaitResult(associate); + var handleA = await associate.WithTimeout(DefaultTimeout); // Initialize handles handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor)); handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor)); var payload = ByteString.CopyFromUtf8("PDU"); - var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload; + var pdu = _withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload; - AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest)); + await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest)); handleA.Write(payload); - ExpectMsgPf(DefaultTimeout, "Expect InboundPayload from A", o => + await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundPayload from A", o => { - var inboundPayload = o as InboundPayload; - - if (inboundPayload != null && inboundPayload.Payload.Equals(pdu)) + if (o is InboundPayload inboundPayload && inboundPayload.Payload.Equals(pdu)) return inboundPayload.Payload; return null; }); - Assert.Contains(registry.LogSnapshot().OfType(), x => x.Sender == addressATest && x.Recipient == addressBTest && x.Payload.Equals(pdu)); + Assert.Contains(registry.LogSnapshot().OfType(), x => x.Sender == _addressATest && x.Recipient == _addressBTest && x.Payload.Equals(pdu)); } [Fact] - public void Transport_must_successfully_disassociate() + public async Task Transport_must_successfully_disassociate() { var registry = new AssociationRegistry(); var transportA = NewTransportA(registry); var transportB = NewTransportB(registry); - AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + (await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); + (await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor)); - AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest)); + await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest)); - var associate = transportA.Associate(addressB); - var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o => + var associate = transportA.Associate(_addressB); + var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o => { - var handle = o as InboundAssociation; - if (handle != null && handle.Association.RemoteAddress == addressA) + if (o is InboundAssociation handle && handle.Association.RemoteAddress == _addressA) return handle.Association; return null; }); - var handleA = AwaitResult(associate); + var handleA = await associate.WithTimeout(DefaultTimeout); // Initialize handles handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor)); handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor)); - AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest)); + await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest)); - handleA.Disassociate(); + handleA.Disassociate("Disassociation test", Log); - ExpectMsgPf(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated); + await ExpectMsgOfAsync(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated); - AwaitCondition(() => !registry.ExistsAssociation(addressATest, addressBTest)); + await AwaitConditionAsync(() => !registry.ExistsAssociation(_addressATest, _addressBTest)); - AwaitCondition(() => - registry.LogSnapshot().OfType().Any(x => x.Requestor == addressATest && x.Remote == addressBTest) + await AwaitConditionAsync(() => + registry.LogSnapshot().OfType().Any(x => x.Requestor == _addressATest && x.Remote == _addressBTest) ); } - - private T AwaitResult(Task task) - { - task.Wait(DefaultTimeout); - - return task.Result; - } } }