-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Singleton class and settings based on current Akka Typed implementation #6050
Merged
Aaronontheweb
merged 2 commits into
akkadotnet:dev
from
ismaelhamed:clustersingleton-ext
Aug 12, 2022
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
133 changes: 133 additions & 0 deletions
133
src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonApiSpec.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="ClusterSingletonConfigSpec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Cluster.Tools.Singleton; | ||
using Akka.Configuration; | ||
using Akka.TestKit; | ||
using Akka.TestKit.Configs; | ||
using Akka.TestKit.Extensions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace Akka.Cluster.Tools.Tests.Singleton | ||
{ | ||
public class ClusterSingletonApiSpec : AkkaSpec | ||
{ | ||
#region Internal | ||
|
||
public sealed class Pong | ||
{ | ||
public static Pong Instance => new Pong(); | ||
private Pong() { } | ||
} | ||
|
||
public sealed class Ping | ||
{ | ||
public IActorRef RespondTo { get; } | ||
public Ping(IActorRef respondTo) => RespondTo = respondTo; | ||
} | ||
|
||
public sealed class Perish | ||
{ | ||
public static Perish Instance => new Perish(); | ||
private Perish() { } | ||
} | ||
|
||
public class PingPong : UntypedActor | ||
{ | ||
protected override void OnReceive(object message) | ||
{ | ||
switch (message) | ||
{ | ||
case Ping ping: | ||
ping.RespondTo.Tell(Pong.Instance); | ||
break; | ||
case Perish _: | ||
Context.Stop(Self); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
#endregion | ||
|
||
private readonly Cluster _clusterNode1; | ||
private readonly Cluster _clusterNode2; | ||
private readonly ActorSystem _system2; | ||
|
||
public static Config GetConfig() => ConfigurationFactory.ParseString(@" | ||
akka.loglevel = DEBUG | ||
akka.actor.provider = ""cluster"" | ||
akka.cluster.roles = [""singleton""] | ||
akka.remote { | ||
dot-netty.tcp { | ||
hostname = ""127.0.0.1"" | ||
port = 0 | ||
} | ||
}").WithFallback(TestConfigs.DefaultConfig); | ||
|
||
public ClusterSingletonApiSpec(ITestOutputHelper testOutput) | ||
: base(GetConfig(), testOutput) | ||
{ | ||
_clusterNode1 = Cluster.Get(Sys); | ||
|
||
_system2 = ActorSystem.Create( | ||
Sys.Name, | ||
ConfigurationFactory.ParseString("akka.cluster.roles = [\"singleton\"]").WithFallback(Sys.Settings.Config)); | ||
|
||
_clusterNode2 = Cluster.Get(_system2); | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_singleton_must_be_accessible_from_two_nodes_in_a_cluster() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
{ | ||
var node1UpProbe = CreateTestProbe(Sys); | ||
var node2UpProbe = CreateTestProbe(Sys); | ||
|
||
_clusterNode1.Join(_clusterNode1.SelfAddress); | ||
node1UpProbe.AwaitAssert(() => _clusterNode1.SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); | ||
|
||
_clusterNode2.Join(_clusterNode2.SelfAddress); | ||
node2UpProbe.AwaitAssert(() => _clusterNode2.SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3)); | ||
|
||
var cs1 = ClusterSingleton.Get(Sys); | ||
var cs2 = ClusterSingleton.Get(_system2); | ||
|
||
var settings = ClusterSingletonSettings.Create(Sys).WithRole("singleton"); | ||
var node1ref = cs1.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)); | ||
var node2ref = cs2.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)); | ||
|
||
// subsequent spawning returns the same refs | ||
cs1.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)).ShouldBe(node1ref); | ||
cs2.Init(SingletonActor.Create(Props.Create<PingPong>(), "ping-pong").WithStopMessage(Perish.Instance).WithSettings(settings)).ShouldBe(node2ref); | ||
|
||
var node1PongProbe = CreateTestProbe(Sys); | ||
var node2PongProbe = CreateTestProbe(_system2); | ||
|
||
node1PongProbe.AwaitAssert(() => | ||
{ | ||
node1ref.Tell(new Ping(node1PongProbe.Ref)); | ||
node1PongProbe.ExpectMsg<Pong>(); | ||
}, TimeSpan.FromSeconds(3)); | ||
|
||
node2PongProbe.AwaitAssert(() => | ||
{ | ||
node2ref.Tell(new Ping(node2PongProbe.Ref)); | ||
node2PongProbe.ExpectMsg<Pong>(); | ||
}, TimeSpan.FromSeconds(3)); | ||
} | ||
|
||
protected override async Task AfterAllAsync() | ||
{ | ||
await base.AfterAllAsync(); | ||
await _system2.Terminate().AwaitWithTimeout(TimeSpan.FromSeconds(3)); | ||
} | ||
} | ||
} |
131 changes: 131 additions & 0 deletions
131
src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingleton.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
//----------------------------------------------------------------------- | ||
// <copyright file="ClusterSingletonManager.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
//----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Collections.Concurrent; | ||
using Akka.Actor; | ||
using Akka.Annotations; | ||
using Akka.Util; | ||
|
||
namespace Akka.Cluster.Tools.Singleton | ||
{ | ||
/// <summary> | ||
/// This class is not intended for user extension other than for test purposes (e.g. stub implementation). | ||
/// More methods may be added in the future and that may break such implementations. | ||
/// </summary> | ||
[DoNotInherit] | ||
public class ClusterSingleton : IExtension | ||
{ | ||
private readonly ActorSystem _system; | ||
private readonly Lazy<Cluster> _cluster; | ||
private readonly ConcurrentDictionary<string, IActorRef> _proxies = new ConcurrentDictionary<string, IActorRef>(); | ||
|
||
public static ClusterSingleton Get(ActorSystem system) => | ||
system.WithExtension<ClusterSingleton, ClusterSingletonProvider>(); | ||
|
||
public ClusterSingleton(ExtendedActorSystem system) | ||
{ | ||
_system = system; | ||
_cluster = new Lazy<Cluster>(() => Cluster.Get(system)); | ||
} | ||
|
||
/// <summary> | ||
/// Start if needed and provide a proxy to a named singleton. | ||
/// | ||
/// <para>If there already is a manager running for the given `singletonName` on this node, no additional manager is started.</para> | ||
/// <para>If there already is a proxy running for the given `singletonName` on this node, an <see cref="IActorRef"/> to that is returned.</para> | ||
/// </summary> | ||
/// <returns>A proxy actor that can be used to communicate with the singleton in the cluster</returns> | ||
public IActorRef Init(SingletonActor singleton) | ||
{ | ||
var settings = singleton.Settings.GetOrElse(ClusterSingletonSettings.Create(_system)); | ||
if (settings.ShouldRunManager(_cluster.Value)) | ||
{ | ||
var managerName = ManagerNameFor(singleton.Name); | ||
try | ||
{ | ||
_system.ActorOf(ClusterSingletonManager.Props( | ||
singletonProps: singleton.Props, | ||
terminationMessage: singleton.StopMessage.GetOrElse(PoisonPill.Instance), | ||
settings: settings.ToManagerSettings(singleton.Name)), | ||
managerName); | ||
} | ||
catch (InvalidActorNameException ex) when (ex.Message.EndsWith("is not unique!")) | ||
{ | ||
// This is fine. We just wanted to make sure it is running and it already is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
} | ||
} | ||
|
||
return GetProxy(singleton.Name, settings); | ||
} | ||
|
||
private IActorRef GetProxy(string name, ClusterSingletonSettings settings) | ||
{ | ||
IActorRef ProxyCreator() | ||
{ | ||
var proxyName = $"singletonProxy{name}"; | ||
return _system.ActorOf(ClusterSingletonProxy.Props( | ||
singletonManagerPath: $"/user/{ManagerNameFor(name)}", | ||
settings: settings.ToProxySettings(name)), | ||
proxyName); | ||
} | ||
|
||
return _proxies.GetOrAdd(name, _ => ProxyCreator()); | ||
} | ||
|
||
|
||
private string ManagerNameFor(string singletonName) => $"singletonManager{singletonName}"; | ||
} | ||
|
||
public class ClusterSingletonProvider : ExtensionIdProvider<ClusterSingleton> | ||
{ | ||
public override ClusterSingleton CreateExtension(ExtendedActorSystem system) => new ClusterSingleton(system); | ||
} | ||
|
||
public class SingletonActor | ||
{ | ||
public string Name { get; } | ||
|
||
public Props Props { get; } | ||
|
||
public Option<object> StopMessage { get; } | ||
|
||
public Option<ClusterSingletonSettings> Settings { get; } | ||
|
||
public static SingletonActor Create(Props props, string name) => | ||
new SingletonActor(name, props, Option<object>.None, Option<ClusterSingletonSettings>.None); | ||
|
||
private SingletonActor(string name, Props props, Option<object> stopMessage, Option<ClusterSingletonSettings> settings) | ||
{ | ||
Name = name; | ||
Props = props; | ||
StopMessage = stopMessage; | ||
Settings = settings; | ||
} | ||
|
||
/// <summary> | ||
/// <see cref="Props"/> of the singleton actor, such as dispatcher settings. | ||
/// </summary> | ||
public SingletonActor WithProps(Props props) => Copy(props: props); | ||
|
||
/// <summary> | ||
/// Message sent to the singleton to tell it to stop, e.g. when being migrated. | ||
/// If this is not defined, a <see cref="PoisonPill"/> will be used instead. | ||
/// It can be useful to define a custom stop message if the singleton needs to | ||
/// perform some asynchronous cleanup or interactions before stopping. | ||
/// </summary> | ||
public SingletonActor WithStopMessage(object stopMessage) => Copy(stopMessage: stopMessage); | ||
|
||
/// <summary> | ||
/// Additional settings, typically loaded from configuration. | ||
/// </summary> | ||
public SingletonActor WithSettings(ClusterSingletonSettings settings) => Copy(settings: settings); | ||
|
||
private SingletonActor Copy(string name = null, Props props = null, Option<object> stopMessage = default, Option<ClusterSingletonSettings> settings = default) => | ||
new SingletonActor(name ?? Name, props ?? Props, stopMessage.HasValue ? stopMessage : StopMessage, settings.HasValue ? settings : Settings); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, we prefer to use DocFx's
[code]
blocks that reference a runnable source file instead of traditional markdown blocks - see https://getakka.net/community/contributing/documentation-guidelines.html#code-samples-must-use-code-references for examples on how to do this.Reason why we care: if the sample is updated, the documentation is also automatically updated too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to merge this in now and we can update those docs to reference your unit tests' code in a separate PR.