Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Async TestKit] Convert Akka.Remote.Tests to async - Transport.GenericTransportSpec #5898

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 66 additions & 73 deletions src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,37 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Google.Protobuf;
using Xunit;

namespace Akka.Remote.Tests.Transport
{
public abstract class GenericTransportSpec : AkkaSpec
{
private Address addressATest = new Address("test", "testsytemA", "testhostA", 4321);
private Address addressBTest = new Address("test", "testsytemB", "testhostB", 5432);
private readonly Address _addressATest = new Address("test", "testsytemA", "testhostA", 4321);
private readonly Address _addressBTest = new Address("test", "testsytemB", "testhostB", 5432);

private Address addressA;
private Address addressB;
private Address nonExistingAddress;
private bool withAkkaProtocol;
private Address _addressA;
private Address _addressB;
private Address _nonExistingAddress;
private readonly bool _withAkkaProtocol;

public GenericTransportSpec(bool withAkkaProtocol = false)
protected GenericTransportSpec(bool withAkkaProtocol = false)
: base("akka.actor.provider = \"Akka.Remote.RemoteActorRefProvider, Akka.Remote\" ")
{
this.withAkkaProtocol = withAkkaProtocol;
_withAkkaProtocol = withAkkaProtocol;
}

addressA = addressATest.WithProtocol(string.Format("{0}.{1}", SchemeIdentifier, addressATest.Protocol));
addressB = addressBTest.WithProtocol(string.Format("{0}.{1}", SchemeIdentifier, addressBTest.Protocol));
nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0);
public override async Task InitializeAsync()
{
await base.InitializeAsync();

_addressA = _addressATest.WithProtocol($"{SchemeIdentifier}.{_addressATest.Protocol}");
_addressB = _addressBTest.WithProtocol($"{SchemeIdentifier}.{_addressBTest.Protocol}");
_nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0);
}

private TimeSpan DefaultTimeout { get { return Dilated(TestKitSettings.DefaultTimeout); } }
Expand All @@ -45,7 +50,7 @@ public GenericTransportSpec(bool withAkkaProtocol = false)

private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Transport transport)
{
if (withAkkaProtocol) {
if (_withAkkaProtocol) {
var provider = (RemoteActorRefProvider)((ExtendedActorSystem)Sys).Provider;

return new AkkaProtocolTransport(transport, Sys, new AkkaProtocolSettings(provider.RemoteSettings.Config), new AkkaPduProtobuffCodec(Sys));
Expand All @@ -56,165 +61,153 @@ private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Tran

private Akka.Remote.Transport.Transport NewTransportA(AssociationRegistry registry)
{
return WrapTransport(FreshTransport(new TestTransport(addressATest, registry)));
return WrapTransport(FreshTransport(new TestTransport(_addressATest, registry)));
}

private Akka.Remote.Transport.Transport NewTransportB(AssociationRegistry registry)
{
return WrapTransport(FreshTransport(new TestTransport(addressBTest, registry)));
return WrapTransport(FreshTransport(new TestTransport(_addressBTest, registry)));
}

[Fact]
public void Transport_must_return_an_Address_and_promise_when_listen_is_called()
public async Task Transport_must_return_an_Address_and_promise_when_listen_is_called()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);

var result = AwaitResult(transportA.Listen());
var result = await transportA.Listen().WithTimeout(DefaultTimeout);

Assert.Equal(addressA, result.Item1);
Assert.Equal(_addressA, result.Item1);
Assert.NotNull(result.Item2);

Assert.Contains(registry.LogSnapshot().OfType<ListenAttempt>(), x => x.BoundAddress == addressATest);
Assert.Contains(registry.LogSnapshot().OfType<ListenAttempt>(), x => x.BoundAddress == _addressATest);
}

[Fact]
public void Transport_must_associate_successfully_with_another_transport_of_its_kind()
public async Task Transport_must_associate_successfully_with_another_transport_of_its_kind()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

// Must complete the returned promise to receive events
AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest));

transportA.Associate(addressB);
ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
// task is not awaited deliberately
var task = transportA.Associate(_addressB);
await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var inbound = o as InboundAssociation;

if (inbound != null && inbound.Association.RemoteAddress == addressA)
if (o is InboundAssociation inbound && inbound.Association.RemoteAddress == _addressA)
return inbound.Association;

return null;
});

Assert.Contains(registry.LogSnapshot().OfType<AssociateAttempt>(), x => x.LocalAddress == addressATest && x.RemoteAddress == addressBTest);
AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
Assert.Contains(registry.LogSnapshot().OfType<AssociateAttempt>(), x => x.LocalAddress == _addressATest && x.RemoteAddress == _addressBTest);
await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest));
}

[Fact]
public void Transport_must_fail_to_associate_with_nonexisting_address()
public async Task Transport_must_fail_to_associate_with_non_existing_address()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitCondition(() => registry.TransportsReady(addressATest));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest));

// Transport throws InvalidAssociationException when trying to associate with non-existing system
XAssert.Throws<InvalidAssociationException>(() =>
AwaitResult(transportA.Associate(nonExistingAddress))
await Assert.ThrowsAsync<InvalidAssociationException>(async () =>
await transportA.Associate(_nonExistingAddress).WithTimeout(DefaultTimeout)
);
}

[Fact]
public void Transport_must_successfully_send_PDUs()
public async Task Transport_must_successfully_send_PDUs()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest));

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
var associate = transportA.Associate(_addressB);
var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress == addressA)
if (o is InboundAssociation handle && handle.Association.RemoteAddress == _addressA)
return handle.Association;

return null;
});

var handleA = AwaitResult(associate);
var handleA = await associate.WithTimeout(DefaultTimeout);

// Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

var payload = ByteString.CopyFromUtf8("PDU");
var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload;
var pdu = _withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload;

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest));

handleA.Write(payload);
ExpectMsgPf(DefaultTimeout, "Expect InboundPayload from A", o =>
await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundPayload from A", o =>
{
var inboundPayload = o as InboundPayload;

if (inboundPayload != null && inboundPayload.Payload.Equals(pdu))
if (o is InboundPayload inboundPayload && inboundPayload.Payload.Equals(pdu))
return inboundPayload.Payload;

return null;
});

Assert.Contains(registry.LogSnapshot().OfType<WriteAttempt>(), x => x.Sender == addressATest && x.Recipient == addressBTest && x.Payload.Equals(pdu));
Assert.Contains(registry.LogSnapshot().OfType<WriteAttempt>(), x => x.Sender == _addressATest && x.Recipient == _addressBTest && x.Payload.Equals(pdu));
}

[Fact]
public void Transport_must_successfully_disassociate()
public async Task Transport_must_successfully_disassociate()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest));

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
var associate = transportA.Associate(_addressB);
var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress == addressA)
if (o is InboundAssociation handle && handle.Association.RemoteAddress == _addressA)
return handle.Association;

return null;
});

var handleA = AwaitResult(associate);
var handleA = await associate.WithTimeout(DefaultTimeout);

// Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest));

handleA.Disassociate();
handleA.Disassociate("Disassociation test", Log);

ExpectMsgPf(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated);
await ExpectMsgOfAsync(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated);

AwaitCondition(() => !registry.ExistsAssociation(addressATest, addressBTest));
await AwaitConditionAsync(() => !registry.ExistsAssociation(_addressATest, _addressBTest));

AwaitCondition(() =>
registry.LogSnapshot().OfType<DisassociateAttempt>().Any(x => x.Requestor == addressATest && x.Remote == addressBTest)
await AwaitConditionAsync(() =>
registry.LogSnapshot().OfType<DisassociateAttempt>().Any(x => x.Requestor == _addressATest && x.Remote == _addressBTest)
);
}

private T AwaitResult<T>(Task<T> task)
{
task.Wait(DefaultTimeout);

return task.Result;
}
}
}