Skip to content

Commit

Permalink
Delete unnecessary / bad Sql.Common.Journal subscriptions (#6412)
Browse files Browse the repository at this point in the history
* removing ineffective Akka.Persistence.Query.Sql commands

* removed all deprecated queries from Akka.Persistence.Query.Sql

* added support for querying the database for persistent ids directly

* added API approvals
  • Loading branch information
Aaronontheweb authored Feb 18, 2023
1 parent 8fb39e5 commit 797894c
Show file tree
Hide file tree
Showing 12 changed files with 44 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<TargetFrameworks>$(NetStandardLibVersion);$(NetLibVersion)</TargetFrameworks>
<PackageTags>$(AkkaPackageTags);persistence;eventsource;sql;reactive;streams</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected AbstractAllEventsPublisher(long fromOffset, int maxBufferSize, string
JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
protected IActorRef JournalRef { get; }
protected DeliveryBuffer<EventEnvelope> Buffer { get; }
protected long FromOffset { get; }
Expand Down Expand Up @@ -81,7 +81,6 @@ protected bool Idle(object message)
switch (message)
{
case AllEventsPublisher.Continue _:
case NewEventAppended _:
if (IsTimeForReplay) Replay();
return true;
case Request _:
Expand Down Expand Up @@ -138,7 +137,6 @@ protected bool Replaying( object message )
Context.Stop(Self);
return true;
case AllEventsPublisher.Continue _:
case NewEventAppended _:
return true;
default:
return false;
Expand Down Expand Up @@ -166,7 +164,6 @@ protected override void PostStop()

protected override void ReceiveInitialRequest()
{
JournalRef.Tell(SubscribeNewEvents.Instance);
Replay();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Streams.Actors;

namespace Akka.Persistence.Query.Sql
Expand Down Expand Up @@ -53,7 +52,7 @@ protected AbstractEventsByPersistenceIdPublisher(string persistenceId, long from
JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
protected string PersistenceId { get; }
protected long FromSequenceNr { get; }
protected long ToSequenceNr { get; set; }
Expand Down Expand Up @@ -96,9 +95,6 @@ protected bool Idle(object message)
case EventsByPersistenceIdPublisher.Continue _:
if (IsTimeForReplay) Replay();
return true;
case EventAppended _:
if (IsTimeForReplay) Replay();
return true;
case Request _:
ReceiveIdleRequest();
return true;
Expand Down Expand Up @@ -157,10 +153,6 @@ protected Receive Replaying(int limit)
// skip during replay
return true;

case EventAppended _:
// skip during replay
return true;

case Cancel _:
Context.Stop(Self);
return true;
Expand Down Expand Up @@ -190,7 +182,6 @@ protected override void PostStop()

protected override void ReceiveInitialRequest()
{
JournalRef.Tell(new SubscribePersistenceId(PersistenceId));
Replay();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected AbstractEventsByTagPublisher(string tag, long fromOffset, int maxBuffe
JournalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected ILoggingAdapter Log => _log ?? (_log = Context.GetLogger());
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
protected string Tag { get; }
protected long FromOffset { get; }
protected abstract long ToOffset { get; }
Expand Down Expand Up @@ -88,9 +88,6 @@ protected bool Idle(object message)
case EventsByTagPublisher.Continue _:
if (IsTimeForReplay) Replay();
return true;
case TaggedEventAppended _:
if (IsTimeForReplay) Replay();
return true;
case Request _:
ReceiveIdleRequest();
return true;
Expand Down Expand Up @@ -146,11 +143,6 @@ protected Receive Replaying(int limit)
case EventsByTagPublisher.Continue _:
// no-op
return true;

case TaggedEventAppended _:
// no-op
return true;

case Cancel _:
Context.Stop(Self);
return true;
Expand Down Expand Up @@ -182,7 +174,6 @@ protected override void PostStop()

protected override void ReceiveInitialRequest()
{
JournalRef.Tell(new SubscribeTag(Tag));
Replay();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<TargetFrameworks>$(NetStandardLibVersion);$(NetLibVersion)</TargetFrameworks>
<PackageTags>$(AkkaPackageTags);persistence;eventsource;sql</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/CSharpLanguageProject/LanguageLevel/@EntryValue">CSharp80</s:String></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
Expand Up @@ -514,21 +514,6 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
/// </summary>
protected BatchingSqlJournalSetup Setup { get; }

/// <summary>
/// Flag determining if current journal has any subscribers for <see cref="EventAppended"/> events.
/// </summary>
protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0;

/// <summary>
/// Flag determining if current journal has any subscribers for <see cref="TaggedEventAppended"/> events.
/// </summary>
protected bool HasTagSubscribers => _tagSubscribers.Count != 0;

/// <summary>
/// Flag determining if current journal has any subscribers for <see cref="NewEventAppended"/> and
/// </summary>
protected bool HasNewEventsSubscribers => _newEventSubscriber.Count != 0;

/// <summary>
/// Flag determining if incoming journal requests should be published in current actor system event stream.
/// Useful mostly for tests.
Expand Down Expand Up @@ -559,10 +544,6 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)

private readonly AtomicCounterLong _bufferIdCounter;

private readonly Dictionary<string, HashSet<IActorRef>> _persistenceIdSubscribers;
private readonly Dictionary<string, HashSet<IActorRef>> _tagSubscribers;
private readonly HashSet<IActorRef> _newEventSubscriber;

private readonly Akka.Serialization.Serialization _serialization;
private readonly CircuitBreaker _circuitBreaker;
private int _remainingOperations;
Expand All @@ -575,11 +556,7 @@ protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
{
Setup = setup;
CanPublish = Persistence.Instance.Apply(Context.System).Settings.Internal.PublishPluginCommands;
TimestampProvider = TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context);

_persistenceIdSubscribers = new Dictionary<string, HashSet<IActorRef>>();
_tagSubscribers = new Dictionary<string, HashSet<IActorRef>>();
_newEventSubscriber = new HashSet<IActorRef>();
TimestampProvider = TimestampProviderProvider.GetTimestampProvider(setup.TimestampProviderTypeName, Context);

_remainingOperations = Setup.MaxConcurrentOperations;
_buffers = new[]
Expand Down Expand Up @@ -739,18 +716,6 @@ protected sealed override bool Receive(object message)
case BatchComplete msg:
CompleteBatch(msg);
return true;
case SubscribePersistenceId msg:
AddPersistenceIdSubscriber(msg);
return true;
case SubscribeTag msg:
AddTagSubscriber(msg);
return true;
case SubscribeNewEvents msg:
AddNewEventsSubscriber(msg);
return true;
case Terminated msg:
RemoveSubscriber(msg.ActorRef);
return true;
case ChunkExecutionFailure msg:
FailChunkExecution(msg);
return true;
Expand Down Expand Up @@ -834,68 +799,6 @@ private void FailChunkExecution(ChunkExecutionFailure message)
TryProcess();
}

#region subscriptions
private void RemoveSubscriber(IActorRef subscriberRef)
{
_persistenceIdSubscribers.RemoveItem(subscriberRef);
_tagSubscribers.RemoveItem(subscriberRef);
_newEventSubscriber.Remove(subscriberRef);
}

private void AddNewEventsSubscriber(SubscribeNewEvents message)
{
var subscriber = Sender;
_newEventSubscriber.Add(subscriber);
Context.Watch(subscriber);
}

private void AddTagSubscriber(SubscribeTag message)
{
var subscriber = Sender;
_tagSubscribers.AddItem(message.Tag, subscriber);
Context.Watch(subscriber);
}

private void AddPersistenceIdSubscriber(SubscribePersistenceId message)
{
var subscriber = Sender;
_persistenceIdSubscribers.AddItem(message.PersistenceId, subscriber);
Context.Watch(subscriber);
}

private void NotifyNewEventAppended()
{
if (HasNewEventsSubscribers)
{
foreach (var subscriber in _newEventSubscriber)
{
subscriber.Tell(NewEventAppended.Instance);
}
}
}

private void NotifyTagChanged(string tag)
{
if (_tagSubscribers.TryGetValue(tag, out var bucket))
{
var changed = new TaggedEventAppended(tag);
foreach (var subscriber in bucket)
subscriber.Tell(changed);
}
}

private void NotifyPersistenceIdChanged(string persistenceId)
{
if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var bucket))
{
var changed = new EventAppended(persistenceId);
foreach (var subscriber in bucket)
subscriber.Tell(changed);
}
}

#endregion

/// <summary>
/// Tries to add incoming <paramref name="message"/> to <see cref="Buffer"/>.
/// Also checks if any DB connection has been released and next batch can be processed.
Expand Down Expand Up @@ -1505,28 +1408,6 @@ public void FinalizeSuccess(BatchingSqlJournal<TConnection, TCommand> journal)
aRef.Tell(new WriteMessageSuccess(unadapted, actorInstanceId), unadapted.Sender);
}
}

if (journal.HasTagSubscribers && _tags.Count != 0)
{
foreach (var tag in _tags)
{
journal.NotifyTagChanged(tag);
}
}

if (journal.HasPersistenceIdSubscribers)
{
foreach (var persistenceId in _persistenceIds)
{
journal.NotifyPersistenceIdChanged(persistenceId);
}
}

if (journal.HasNewEventsSubscribers)
{
journal.NotifyNewEventAppended();
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@

namespace Akka.Persistence.Sql.Common.Journal
{
/// <summary>
/// TBD
/// </summary>
[Obsolete("Query is not implemented.")]
public interface ISubscriptionCommand { }

/// <summary>
Expand All @@ -26,6 +24,7 @@ public interface ISubscriptionCommand { }
/// the subscriber when <see cref="AsyncWriteJournal.WriteMessagesAsync"/> has been called.
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class SubscribePersistenceId : ISubscriptionCommand
{
/// <summary>
Expand All @@ -47,6 +46,7 @@ public SubscribePersistenceId(string persistenceId)
/// TBD
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class EventAppended : IDeadLetterSuppression
{
/// <summary>
Expand Down Expand Up @@ -108,6 +108,7 @@ public CurrentPersistenceIds(IEnumerable<string> allPersistenceIds, long highest
/// the subscriber when `asyncWriteMessages` has been called.
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class SubscribeNewEvents : ISubscriptionCommand
{
public static SubscribeNewEvents Instance = new SubscribeNewEvents();
Expand All @@ -116,6 +117,7 @@ private SubscribeNewEvents() { }
}

[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class NewEventAppended : IDeadLetterSuppression
{
public static NewEventAppended Instance = new NewEventAppended();
Expand All @@ -131,6 +133,7 @@ private NewEventAppended() { }
/// via an <see cref="IEventAdapter"/>.
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class SubscribeTag : ISubscriptionCommand
{
/// <summary>
Expand All @@ -152,6 +155,7 @@ public SubscribeTag(string tag)
/// TBD
/// </summary>
[Serializable]
[Obsolete("Query is not implemented.", true)]
public sealed class TaggedEventAppended : IDeadLetterSuppression
{
/// <summary>
Expand Down
Loading

0 comments on commit 797894c

Please sign in to comment.