Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding commands to help print out Akka.Cluster.Sharding data #7

Merged
merged 12 commits into from
Aug 5, 2021
Merged
6 changes: 6 additions & 0 deletions Akka.Cluster.Sharding.RepairTool.sln
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Petabridge.Cmd.Cluster.Shar
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RepairTool", "src\RepairTool\RepairTool.csproj", "{0AF4A4EB-0FAE-4CE7-8F2A-805CA88A1ABA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RepairTool.End2End.Tests", "src\RepairTool.End2End.Tests\RepairTool.End2End.Tests.csproj", "{62BCF978-E3A5-4281-B47B-B2F0C9B4936B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -36,6 +38,10 @@ Global
{0AF4A4EB-0FAE-4CE7-8F2A-805CA88A1ABA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0AF4A4EB-0FAE-4CE7-8F2A-805CA88A1ABA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0AF4A4EB-0FAE-4CE7-8F2A-805CA88A1ABA}.Release|Any CPU.Build.0 = Release|Any CPU
{62BCF978-E3A5-4281-B47B-B2F0C9B4936B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{62BCF978-E3A5-4281-B47B-B2F0C9B4936B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{62BCF978-E3A5-4281-B47B-B2F0C9B4936B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{62BCF978-E3A5-4281-B47B-B2F0C9B4936B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<TestSdkVersion>16.10.0</TestSdkVersion>
<AkkaVersion>1.4.21</AkkaVersion>
<PbmVersion>1.0.1</PbmVersion>
<DockerVersion>3.125.4</DockerVersion>
<FluentAssertionsVersion>5.10.3</FluentAssertionsVersion>
<MicrosoftExtensionsVersion>5.0.0</MicrosoftExtensionsVersion>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.IO;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Setup;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Linq;
using Akka.Actor;
using Akka.Persistence.Query;
using Akka.Streams;
using Akka.Streams.Dsl;

namespace Petabridge.Cmd.Cluster.Sharding.Repair
{
/// <summary>
/// Responsible for printing out all of the Akka.Persistence entities detected inside the ShardRegion
/// </summary>
internal class ClusterShardingEntityPrinter : ReceiveActor
{
private class PrintComplete
{
public static readonly PrintComplete Instance = new PrintComplete();
private PrintComplete(){}
}

private readonly IActorRef _reporter;
private readonly ICurrentPersistenceIdsQuery _readJournal;
private bool _regionsOnly;

public ClusterShardingEntityPrinter(ICurrentPersistenceIdsQuery readJournal, bool regionsOnly, IActorRef reporter)
{
_readJournal = readJournal;
_regionsOnly = regionsOnly;
_reporter = reporter;

Receive<string>(str =>
{
if (_regionsOnly && str.Contains("Coordinator"))
{
var stub = "/system/sharding/";
var (startPos, endPos) = (str.IndexOf(stub, StringComparison.Ordinal), str.IndexOf("Coordinator", StringComparison.Ordinal));
var regionName = str.Substring(startPos + stub.Length, endPos - stub.Length - startPos);
_reporter.Tell(new CommandResponse(regionName, final:false));
return;
}

_reporter.Tell(new CommandResponse(str, false));
});

Receive<PrintComplete>(_ =>
{
// terminate response stream
_reporter.Tell(CommandResponse.Empty);
Context.Stop(Self);
});
}

protected override void PreStart()
{
var source = _readJournal.CurrentPersistenceIds().Where(x => x.StartsWith("/system/sharding"));
var sink = Sink.ActorRef<string>(Self, PrintComplete.Instance);
source.RunWith(sink, Context.Materializer());
}

protected override void PreRestart(Exception reason, object message)
{
_reporter.Tell(new ErroredCommandResponse(reason.Message + Environment.NewLine + reason.StackTrace, true));
base.PreRestart(reason, message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@

namespace Petabridge.Cmd.Cluster.Sharding.Repair
{
internal static class ClusterShardingRepairCmd
public static class ClusterShardingRepairCmd
{
public static readonly CommandDefinition PrintInternalClusterShardingData = new CommandDefinitionBuilder()
.WithName("print-sharding-data")
.WithDescription(
"Lists all of the Akka.Persistence entity ids that will be deleted by Akka.Cluster.Sharding")
.Build();

public static readonly CommandDefinition PrintShardRegionNameData = new CommandDefinitionBuilder()
.WithName("print-sharding-regions")
.WithDescription(
"Lists all of the shardRegion names currently stored inside Akka.Cluster.Sharding persistence storage.")
.Build();

public static readonly CommandDefinition RemoveInternalClusterShardingData = new CommandDefinitionBuilder()
.WithName("delete-sharding-data")
.WithDescription("Lists all Akka.Cluster.Sharding regions known to the current node.")
.WithDescription("Delete all Akka.Cluster.Sharding regions known to the current node.")
.WithArgument(b => b.WithName("typeName")
.WithSwitch("-t").WithSwitch("-T").WithDescription("The name of the entity type.")
.AllowMultiple(true)
Expand All @@ -26,6 +38,6 @@ internal static class ClusterShardingRepairCmd
.Build();

public static readonly CommandPalette ClusterShardingRepairCommandPalette = new CommandPalette("cluster-sharding-repair",
new[] {RemoveInternalClusterShardingData});
new[] {RemoveInternalClusterShardingData, PrintShardRegionNameData, PrintInternalClusterShardingData });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

using System.Collections.Generic;
using System.Linq;
using Akka.DependencyInjection;
using Petabridge.Cmd.Host;
using static Petabridge.Cmd.Cluster.Sharding.Repair.ClusterShardingRepairCmd;

Expand All @@ -17,8 +18,26 @@ namespace Petabridge.Cmd.Cluster.Sharding.Repair
/// </summary>
internal sealed class ClusterShardingRepairCmdHandler: CommandHandlerActor
{
private int _printCounter = 0;

public ClusterShardingRepairCmdHandler() : base(ClusterShardingRepairCommandPalette)
{
Process(PrintInternalClusterShardingData.Name, cmd =>
{
var sp = DependencyResolver.For(Context.System);
var sender = Sender;
var props = sp.Props<ClusterShardingEntityPrinter>(sender, false);
Context.ActorOf(props, "printer" + _printCounter++);
});

Process(PrintShardRegionNameData.Name, cmd =>
{
var sp = DependencyResolver.For(Context.System);
var sender = Sender;
var props = sp.Props<ClusterShardingEntityPrinter>(sender, true);
Context.ActorOf(props, "printer" + _printCounter++);
});

Process(RemoveInternalClusterShardingData.Name, command =>
{
var journalPluginId = command.Arguments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public Result(Try<Removals> removals)

private readonly IActorRef _replyTo;

private bool _hasSnapshots = false;

public RemoveOnePersistenceId(string journalPluginId, string snapshotPluginId, string persistenceId, IActorRef replyTo)
{
JournalPluginId = journalPluginId;
Expand All @@ -59,40 +57,28 @@ public RemoveOnePersistenceId(string journalPluginId, string snapshotPluginId, s
}

public override string PersistenceId { get; }


public override Recovery Recovery { get; } = Recovery.None;

protected override bool ReceiveRecover(object message)
{
switch (message)
{
case ClusterEvent.IClusterDomainEvent _:
return true;

case SnapshotOffer _:
_hasSnapshots = true;
return true;

case RecoveryCompleted _:
DeleteMessages(long.MaxValue);
if(_hasSnapshots)
DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue, DateTime.MaxValue, 0, DateTime.MinValue));
else
Context.Become(WaitDeleteMessagesSuccess);
return true;

default:
return false;
}
return false;
}

protected override bool ReceiveCommand(object message)
{
switch (message)
{
case Start _: // we've recovered and loaded our LastSeqNr
DeleteMessages(long.MaxValue);
DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue, DateTime.MaxValue, 0, DateTime.MinValue));
return true;

case DeleteSnapshotsSuccess _:
Context.Become(WaitDeleteMessagesSuccess);
return true;

case DeleteMessagesSuccess _:
case DeleteMessagesSuccess d:
Context.Become(WaitDeleteSnapshotsSuccess);
return true;

Expand All @@ -118,7 +104,7 @@ private bool WaitDeleteMessagesSuccess(object message)
{
switch (message)
{
case DeleteMessagesSuccess _:
case DeleteMessagesSuccess m:
Done();
return true;

Expand Down Expand Up @@ -146,7 +132,7 @@ private bool HandleFailure(object message)

private void Done()
{
_replyTo.Tell(new Result(new Try<Removals>(new Removals(LastSequenceNr > 0, _hasSnapshots))));
_replyTo.Tell(new Result(new Try<Removals>(new Removals(LastSequenceNr > 0, false))));
Context.Stop(Self);
}

Expand All @@ -155,6 +141,18 @@ private void Failure(Exception cause)
_replyTo.Tell(new Result(new Try<Removals>(cause)));
Context.Stop(Self);
}

protected override void PreStart()
{
Self.Tell(Start.Instance);
base.PreStart();
}

private class Start
{
public static readonly Start Instance = new Start();
private Start(){}
}
}

public static Props Props(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@
// </copyright>
// -----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.DependencyInjection;
using Akka.Persistence.Query;
using Petabridge.Cmd.Host;
using static Petabridge.Cmd.Cluster.Sharding.Repair.ClusterShardingRepairCmd;

namespace Petabridge.Cmd.Cluster.Sharding.Repair
{
/// <summary>
/// Cluster.Sharding repair commands.
///
/// NOTE: this plugin requires you to configure Akka.DependencyInjection to provide support for <see cref="ICurrentPersistenceIdsQuery"/>.
/// </summary>
public class ClusterShardingRepairCommands : CommandPaletteHandler
{
public static ClusterShardingRepairCommands Instance = new ClusterShardingRepairCommands();
public static readonly ClusterShardingRepairCommands Instance = new ClusterShardingRepairCommands();

private ClusterShardingRepairCommands() : base(ClusterShardingRepairCommandPalette)
{
Expand All @@ -21,5 +29,29 @@ private ClusterShardingRepairCommands() : base(ClusterShardingRepairCommandPalet

public override Props HandlerProps { get; }

// public override void OnRegister(PetabridgeCmd plugin)
// {
// // need to validate that end-user configured the plugin correctly
// try
// {
// var dr = DependencyResolver.For(plugin.Sys);
// var persistenceIdsQuery = dr.Resolver.GetService<ICurrentPersistenceIdsQuery>();
//
// if (persistenceIdsQuery == default(ICurrentPersistenceIdsQuery))
// throw new InvalidOperationException(
// "No ICurrentPersistenceIdsQuery implementation bound to IServiceProvider. Can't start repair.");
// }
// catch (InvalidOperationException)
// {
// throw;
// }
// catch (Exception ex)
// {
// throw new InvalidOperationException(
// "No ICurrentPersistenceIdsQuery implementation bound to IServiceProvider. Can't start repair.", ex);
// }
//
// base.OnRegister(plugin);
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

<ItemGroup>
<PackageReference Include="Akka.Cluster.Sharding" Version="$(AkkaVersion)" />
<PackageReference Include="Petabridge.Cmd.Host" Version="$(PbmVersion)"/>
<PackageReference Include="Petabridge.Cmd.Host" Version="$(PbmVersion)" />
<PackageReference Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.DependencyInjection" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.Persistence.Query.Sql" Version="$(AkkaVersion)" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using Akka;
using Akka.Persistence.Query;
using Akka.Streams.Dsl;

namespace Petabridge.Cmd.Cluster.Sharding.Repair
{
/// <summary>
/// Used to make the compiler happy, but still blow up if the end-user did it wrong.
/// </summary>
public sealed class PlaceholderReadJournal : ICurrentPersistenceIdsQuery
{
public Source<string, NotUsed> CurrentPersistenceIds()
{
return Source.Failed<string>(new NotImplementedException(
"This is a placeholder ReadJournal implementation. Replace it with a real one in your service configuration"));
}
}
}
Loading