Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GetEntityLocation use wrong actor path #6120

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<PropertyGroup>
<AssemblyTitle>Akka.Cluster.Sharding.Tests</AssemblyTitle>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//-----------------------------------------------------------------------
// <copyright file="ShardRegionQueriesSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardRegionQueriesHashCodeSpecs : AkkaSpec
{
private class MessageExtractor: HashCodeMessageExtractor
{
public MessageExtractor() : base(10)
{ }

public override string EntityId(object message)
{
return message switch
{
int i => i.ToString(),
_ => throw new NotSupportedException()
};
}
}

private readonly Cluster _cluster;
private readonly ClusterSharding _clusterSharding;
private readonly IActorRef _shardRegion;

private readonly ActorSystem _proxySys;

public ShardRegionQueriesHashCodeSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper)
{
_clusterSharding = ClusterSharding.Get(Sys);
_cluster = Cluster.Get(Sys);
_shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true),
ClusterShardingSettings.Create(Sys).WithRole("shard"), new MessageExtractor());

var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]")
.WithFallback(Sys.Settings.Config);
_proxySys = ActorSystem.Create(Sys.Name, proxySysConfig);

_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { _cluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });

// form a 2-node cluster
var proxyCluster = Cluster.Get(_proxySys);
proxyCluster.Join(_cluster.SelfAddress);
AwaitAssert(() => { proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up); });
}

protected override void AfterAll()
{
Shutdown(_proxySys);
base.AfterAll();
}

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = WARNING
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.cluster.roles = [shard]")
.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

[Fact]
public async Task ShardRegion_GetEntityLocation_test()
{
// creates an entity with entityId="1"
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));

// determine where entity with "entityId=1" is located in cluster
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));

q1.EntityId.Should().Be("1");

// have a valid ShardId
q1.ShardId.Should().NotBeEmpty();

// have valid address for node that will / would host entity
q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address

// if entity actor is alive, will retrieve a reference to it
q1.EntityRef.HasValue.Should().BeTrue();
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")]
public async Task ShardRegion_should_support_GetEntityLocation_query_locally()
{
// arrange
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
await _shardRegion.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries remotely")]
public async Task ShardRegion_should_support_GetEntityLocation_query_remotely()
{
// arrange
var sharding2 = ClusterSharding.Get(_proxySys);
var shardRegionProxy = await sharding2.StartProxyAsync("entity", "shard", new MessageExtractor());

await shardRegionProxy.Ask<int>(1, TimeSpan.FromSeconds(3));
await shardRegionProxy.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await shardRegionProxy.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ private string ExtractShardId(object message)
switch (message)
{
case int i:
return (i % 10).ToString();
return (i % 10 + 1).ToString();
// must support ShardRegion.StartEntity in order for
// GetEntityLocation to work properly
case ShardRegion.StartEntity se:
return se.EntityId;
return (int.Parse(se.EntityId) % 10 + 1).ToString();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the test IMessageExtractor so that the ShardId does not equal to EntityId to catch actor path bugs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doh - that's a good catch

}

throw new NotSupportedException();
Expand Down
5 changes: 3 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ Address GetNodeAddress(IActorRef shardOrRegionRef)

try
{
var entityId = getEntityLocation.EntityId;
var shardId = ExtractShardId(new StartEntity(getEntityLocation.EntityId));
if (string.IsNullOrEmpty(shardId))
{
Expand Down Expand Up @@ -1046,15 +1047,15 @@ async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath)
// NOTE: in the event that we're querying a shard's location from a ShardRegionProxy
// the shard may not be technically "homed" inside the proxy, but it does exist.
#pragma warning disable CS4014
ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / shardId); // needs to run as a detached task
ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / entityId); // needs to run as a detached task
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actor path should be shardRegionRef.Path / shardId / entityId not shardRegionRef.Path / shardId / shardId

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, my mistake

#pragma warning restore CS4014
}

return;
}

#pragma warning disable CS4014
ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / shardId); // needs to run as a detached task
ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / entityId); // needs to run as a detached task
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, actor path should be shardActorRef.Path / entityId, not shardActorRef.Path / shardId

#pragma warning restore CS4014
}
catch (Exception ex)
Expand Down