Skip to content

Commit

Permalink
Convert Akka.Remote.Tests to async - Transport.GenericTransportSpec (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Apr 29, 2022
1 parent b231fad commit 87f9601
Showing 1 changed file with 66 additions and 73 deletions.
139 changes: 66 additions & 73 deletions src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,37 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.TestKit.Extensions;
using Google.Protobuf;
using Xunit;

namespace Akka.Remote.Tests.Transport
{
public abstract class GenericTransportSpec : AkkaSpec
{
private Address addressATest = new Address("test", "testsytemA", "testhostA", 4321);
private Address addressBTest = new Address("test", "testsytemB", "testhostB", 5432);
private readonly Address _addressATest = new Address("test", "testsytemA", "testhostA", 4321);
private readonly Address _addressBTest = new Address("test", "testsytemB", "testhostB", 5432);

private Address addressA;
private Address addressB;
private Address nonExistingAddress;
private bool withAkkaProtocol;
private Address _addressA;
private Address _addressB;
private Address _nonExistingAddress;
private readonly bool _withAkkaProtocol;

public GenericTransportSpec(bool withAkkaProtocol = false)
protected GenericTransportSpec(bool withAkkaProtocol = false)
: base("akka.actor.provider = \"Akka.Remote.RemoteActorRefProvider, Akka.Remote\" ")
{
this.withAkkaProtocol = withAkkaProtocol;
_withAkkaProtocol = withAkkaProtocol;
}

addressA = addressATest.WithProtocol(string.Format("{0}.{1}", SchemeIdentifier, addressATest.Protocol));
addressB = addressBTest.WithProtocol(string.Format("{0}.{1}", SchemeIdentifier, addressBTest.Protocol));
nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0);
public override async Task InitializeAsync()
{
await base.InitializeAsync();

_addressA = _addressATest.WithProtocol($"{SchemeIdentifier}.{_addressATest.Protocol}");
_addressB = _addressBTest.WithProtocol($"{SchemeIdentifier}.{_addressBTest.Protocol}");
_nonExistingAddress = new Address(SchemeIdentifier + ".test", "nosystem", "nohost", 0);
}

private TimeSpan DefaultTimeout { get { return Dilated(TestKitSettings.DefaultTimeout); } }
Expand All @@ -45,7 +50,7 @@ public GenericTransportSpec(bool withAkkaProtocol = false)

private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Transport transport)
{
if (withAkkaProtocol) {
if (_withAkkaProtocol) {
var provider = (RemoteActorRefProvider)((ExtendedActorSystem)Sys).Provider;

return new AkkaProtocolTransport(transport, Sys, new AkkaProtocolSettings(provider.RemoteSettings.Config), new AkkaPduProtobuffCodec(Sys));
Expand All @@ -56,165 +61,153 @@ private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Tran

private Akka.Remote.Transport.Transport NewTransportA(AssociationRegistry registry)
{
return WrapTransport(FreshTransport(new TestTransport(addressATest, registry)));
return WrapTransport(FreshTransport(new TestTransport(_addressATest, registry)));
}

private Akka.Remote.Transport.Transport NewTransportB(AssociationRegistry registry)
{
return WrapTransport(FreshTransport(new TestTransport(addressBTest, registry)));
return WrapTransport(FreshTransport(new TestTransport(_addressBTest, registry)));
}

[Fact]
public void Transport_must_return_an_Address_and_promise_when_listen_is_called()
public async Task Transport_must_return_an_Address_and_promise_when_listen_is_called()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);

var result = AwaitResult(transportA.Listen());
var result = await transportA.Listen().WithTimeout(DefaultTimeout);

Assert.Equal(addressA, result.Item1);
Assert.Equal(_addressA, result.Item1);
Assert.NotNull(result.Item2);

Assert.Contains(registry.LogSnapshot().OfType<ListenAttempt>(), x => x.BoundAddress == addressATest);
Assert.Contains(registry.LogSnapshot().OfType<ListenAttempt>(), x => x.BoundAddress == _addressATest);
}

[Fact]
public void Transport_must_associate_successfully_with_another_transport_of_its_kind()
public async Task Transport_must_associate_successfully_with_another_transport_of_its_kind()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

// Must complete the returned promise to receive events
AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest));

transportA.Associate(addressB);
ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
// task is not awaited deliberately
var task = transportA.Associate(_addressB);
await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var inbound = o as InboundAssociation;

if (inbound != null && inbound.Association.RemoteAddress == addressA)
if (o is InboundAssociation inbound && inbound.Association.RemoteAddress == _addressA)
return inbound.Association;

return null;
});

Assert.Contains(registry.LogSnapshot().OfType<AssociateAttempt>(), x => x.LocalAddress == addressATest && x.RemoteAddress == addressBTest);
AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
Assert.Contains(registry.LogSnapshot().OfType<AssociateAttempt>(), x => x.LocalAddress == _addressATest && x.RemoteAddress == _addressBTest);
await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest));
}

[Fact]
public void Transport_must_fail_to_associate_with_nonexisting_address()
public async Task Transport_must_fail_to_associate_with_non_existing_address()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitCondition(() => registry.TransportsReady(addressATest));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest));

// Transport throws InvalidAssociationException when trying to associate with non-existing system
XAssert.Throws<InvalidAssociationException>(() =>
AwaitResult(transportA.Associate(nonExistingAddress))
await Assert.ThrowsAsync<InvalidAssociationException>(async () =>
await transportA.Associate(_nonExistingAddress).WithTimeout(DefaultTimeout)
);
}

[Fact]
public void Transport_must_successfully_send_PDUs()
public async Task Transport_must_successfully_send_PDUs()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest));

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
var associate = transportA.Associate(_addressB);
var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress == addressA)
if (o is InboundAssociation handle && handle.Association.RemoteAddress == _addressA)
return handle.Association;

return null;
});

var handleA = AwaitResult(associate);
var handleA = await associate.WithTimeout(DefaultTimeout);

// Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

var payload = ByteString.CopyFromUtf8("PDU");
var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload;
var pdu = _withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload;

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest));

handleA.Write(payload);
ExpectMsgPf(DefaultTimeout, "Expect InboundPayload from A", o =>
await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundPayload from A", o =>
{
var inboundPayload = o as InboundPayload;

if (inboundPayload != null && inboundPayload.Payload.Equals(pdu))
if (o is InboundPayload inboundPayload && inboundPayload.Payload.Equals(pdu))
return inboundPayload.Payload;

return null;
});

Assert.Contains(registry.LogSnapshot().OfType<WriteAttempt>(), x => x.Sender == addressATest && x.Recipient == addressBTest && x.Payload.Equals(pdu));
Assert.Contains(registry.LogSnapshot().OfType<WriteAttempt>(), x => x.Sender == _addressATest && x.Recipient == _addressBTest && x.Payload.Equals(pdu));
}

[Fact]
public void Transport_must_successfully_disassociate()
public async Task Transport_must_successfully_disassociate()
{
var registry = new AssociationRegistry();
var transportA = NewTransportA(registry);
var transportB = NewTransportB(registry);

AwaitResult(transportA.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
AwaitResult(transportB.Listen()).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportA.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));
(await transportB.Listen().WithTimeout(DefaultTimeout)).Item2.SetResult(new ActorAssociationEventListener(TestActor));

AwaitCondition(() => registry.TransportsReady(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.TransportsReady(_addressATest, _addressBTest));

var associate = transportA.Associate(addressB);
var handleB = ExpectMsgPf(DefaultTimeout, "Expect InboundAssociation from A", o =>
var associate = transportA.Associate(_addressB);
var handleB = await ExpectMsgOfAsync(DefaultTimeout, "Expect InboundAssociation from A", o =>
{
var handle = o as InboundAssociation;
if (handle != null && handle.Association.RemoteAddress == addressA)
if (o is InboundAssociation handle && handle.Association.RemoteAddress == _addressA)
return handle.Association;

return null;
});

var handleA = AwaitResult(associate);
var handleA = await associate.WithTimeout(DefaultTimeout);

// Initialize handles
handleA.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));
await AwaitConditionAsync(() => registry.ExistsAssociation(_addressATest, _addressBTest));

handleA.Disassociate();
handleA.Disassociate("Disassociation test", Log);

ExpectMsgPf(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated);
await ExpectMsgOfAsync(DefaultTimeout, "Should receive Disassociated", o => o as Disassociated);

AwaitCondition(() => !registry.ExistsAssociation(addressATest, addressBTest));
await AwaitConditionAsync(() => !registry.ExistsAssociation(_addressATest, _addressBTest));

AwaitCondition(() =>
registry.LogSnapshot().OfType<DisassociateAttempt>().Any(x => x.Requestor == addressATest && x.Remote == addressBTest)
await AwaitConditionAsync(() =>
registry.LogSnapshot().OfType<DisassociateAttempt>().Any(x => x.Requestor == _addressATest && x.Remote == _addressBTest)
);
}

private T AwaitResult<T>(Task<T> task)
{
task.Wait(DefaultTimeout);

return task.Result;
}
}
}

0 comments on commit 87f9601

Please sign in to comment.