Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest releases #159

Merged
merged 30 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bfe7f26
Update dependencies
eaba Nov 12, 2020
d545a03
Revert changes to Tests Project file
eaba Nov 13, 2020
80ff357
Merge branch 'dev' into dev
eaba Nov 13, 2020
4dd1e85
Merge branch 'dev' of https://github.com/eaba/Akka.Persistence.MongoD…
eaba Nov 13, 2020
b574679
Update AkkaVersion in commons.props. Out dated version references wit…
eaba Nov 13, 2020
79a9b22
Tests passed with 1.14.12 local package that had https://github.com/a…
eaba Nov 13, 2020
688e97c
Removed ReadJournal_should_deallocate_AllPersistenceIds_publisher_whe…
eaba Nov 13, 2020
a44b111
Changed to $(AkkaVersion)
eaba Nov 14, 2020
b713621
Updated to changes made in Akka.Persistence.TCK
eaba Nov 16, 2020
056db96
Fix xunit version
eaba Nov 16, 2020
2e2b01f
Made 'AddNewEventsSubscriber' private and deleted 'Timestamp' from Jo…
eaba Nov 17, 2020
27aa509
Removed comment
eaba Nov 17, 2020
f7e45c2
MongoDbAllEventsSpec
eaba Nov 17, 2020
f84cb07
Attempt to fix `MongoDbAllEventsSpec` and `MongoDbCurrentAllEventsSpe…
eaba Nov 17, 2020
6112831
Fix tests
eaba Nov 18, 2020
72cb4e3
stackoverflow link
eaba Nov 18, 2020
d5cf2e1
First Attempt to fix `Bug61FixSpec` fails on Linux machine
eaba Nov 18, 2020
16aa086
Copyright
eaba Nov 18, 2020
dde5736
Rename ActorSystemName to `MongoDbCurrentAllEventsSpec`
eaba Nov 18, 2020
967e738
Set Collection
eaba Nov 18, 2020
7ef4811
Comments added
eaba Nov 18, 2020
92f5465
Deleted staled code
eaba Nov 18, 2020
bcdbbd3
Fixed build errors caused by "deleting staled code"
eaba Nov 18, 2020
5ddf8be
Improve code
eaba Nov 18, 2020
d79b177
Improve filter creation blocks
eaba Nov 19, 2020
f524e29
First attempt at documenting "AllEvents" and "CurrentAllEvents"
eaba Nov 19, 2020
923a4ac
Added "MongoDbJournalPerfSpec"
eaba Nov 19, 2020
c2daaf9
Improve code
eaba Nov 19, 2020
74f2681
Don't use 'ReaderWriterLockSlim'. Make 'RemoveSubscriber' private
eaba Nov 24, 2020
698bce3
Removed unused fields
eaba Nov 26, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="5.10.3" />
Expand Down
43 changes: 43 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbPersistenceIdsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
using Akka.Streams.TestKit;
using System.Linq;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Reflection;
using Reactive.Streams;

namespace Akka.Persistence.MongoDb.Tests
{
Expand Down Expand Up @@ -59,7 +62,47 @@ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persis

return ConfigurationFactory.ParseString(specString);
}
/*public override async Task ReadJournal_should_deallocate_AllPersistenceIds_publisher_when_the_last_subscriber_left()
{
if (AllocatesAllPersistenceIDsPublisher)
{
var journal = ReadJournal.AsInstanceOf<IPersistenceIdsQuery>();

Setup("a", 1);
Setup("b", 1);

var source = journal.PersistenceIds();
var probe =
source.RunWith(this.SinkProbe<string>(), Materializer);
var probe2 =
source.RunWith(this.SinkProbe<string>(), Materializer);

var fieldInfo = journal.GetType()
.GetField("_persistenceIdsPublisher",
BindingFlags.NonPublic | BindingFlags.Instance);
Assert.True(fieldInfo != null);

// Assert that publisher is running.
probe.Within(TimeSpan.FromSeconds(10), () => probe.Request(10)
.ExpectNextUnordered("a", "b")
.ExpectNoMsg(TimeSpan.FromMilliseconds(200)));

probe.Cancel();

// Assert that publisher is still alive when it still have a subscriber
Assert.True(fieldInfo.GetValue(journal) is IPublisher<string>);

probe2.Within(TimeSpan.FromSeconds(10), () => probe2.Request(4)
.ExpectNextUnordered("a", "b")
.ExpectNoMsg(TimeSpan.FromMilliseconds(200)));

// Assert that publisher is de-allocated when the last subscriber left
probe2.Cancel();
await Task.Delay(400);
Assert.True(fieldInfo.GetValue(journal) is null);
}
}
*/
[Fact]
public void ReadJournal_ConcurrentMessaging_should_work()
{
Expand Down
3 changes: 2 additions & 1 deletion src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
<EmbeddedResource Include="reference.conf" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.Persistence.Query" Version="1.4.11" />
<PackageReference Include="akka.streams" Version="1.4.11" />
<PackageReference Include="MongoDB.Driver" Version="2.11.4" />
</ItemGroup>
</Project>
160 changes: 97 additions & 63 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence.Journal;
using Akka.Persistence.MongoDb.Query;
using Akka.Streams;
using Akka.Streams.Dsl;
using MongoDB.Bson;
using Akka.Serialization;
using Akka.Util;
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Persistence.MongoDb.Journal
{
Expand All @@ -37,10 +34,11 @@ public class MongoDbJournal : AsyncWriteJournal

private readonly HashSet<string> _allPersistenceIds = new HashSet<string>();
private readonly HashSet<IActorRef> _allPersistenceIdSubscribers = new HashSet<IActorRef>();
private readonly Dictionary<string, ISet<IActorRef>> _tagSubscribers =
new Dictionary<string, ISet<IActorRef>>();
private readonly Dictionary<string, ISet<IActorRef>> _persistenceIdSubscribers
= new Dictionary<string, ISet<IActorRef>>();
private ImmutableDictionary<string, IImmutableSet<IActorRef>> _persistenceIdSubscribers = ImmutableDictionary.Create<string, IImmutableSet<IActorRef>>();
private ImmutableDictionary<string, IImmutableSet<IActorRef>> _tagSubscribers = ImmutableDictionary.Create<string, IImmutableSet<IActorRef>>();
private readonly HashSet<IActorRef> _newEventsSubscriber = new HashSet<IActorRef>();
private IImmutableDictionary<string, long> _tagSequenceNr = ImmutableDictionary<string, long>.Empty;


private readonly Akka.Serialization.Serialization _serialization;

Expand Down Expand Up @@ -113,8 +111,6 @@ protected override void PreStart()

public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback)
{
NotifyNewPersistenceIdAdded(persistenceId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


// Limit allows only integer
var limitValue = max >= int.MaxValue ? int.MaxValue : (int)max;

Expand Down Expand Up @@ -205,7 +201,6 @@ await _journalCollection.Value

public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
{
NotifyNewPersistenceIdAdded(persistenceId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm


var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);
Expand Down Expand Up @@ -271,8 +266,6 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu

protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
{
NotifyNewPersistenceIdAdded(persistenceId);

var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

Expand Down Expand Up @@ -400,7 +393,7 @@ private async Task SetHighSequenceId(IList<AtomicWrite> messages)
SequenceNr = highSequenceId
};

await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new UpdateOptions() { IsUpsert = true });
await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new ReplaceOptions() { IsUpsert = true });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, since we are replacing the metadataentry outright

}

protected override bool ReceivePluginInternal(object message)
Expand All @@ -410,47 +403,83 @@ protected override bool ReceivePluginInternal(object message)
case ReplayTaggedMessages replay:
ReplayTaggedMessagesAsync(replay)
.PipeTo(replay.ReplyTo, success: h => new RecoverySuccess(h), failure: e => new ReplayMessagesFailure(e));
break;
return true;
case ReplayAllEvents replay:
ReplayAllEventsAsync(replay)
.PipeTo(replay.ReplyTo, success: h => new EventReplaySuccess(h),
failure: e => new EventReplayFailure(e));
return true;
case SubscribePersistenceId subscribe:
AddPersistenceIdSubscriber(Sender, subscribe.PersistenceId);
Context.Watch(Sender);
break;
case SubscribeAllPersistenceIds subscribe:
AddAllPersistenceIdSubscriber(Sender);
Context.Watch(Sender);
break;
return true;
case SelectCurrentPersistenceIds request:
SelectAllPersistenceIdsAsync(request.Offset)
.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, request.Offset));
return true;
case SubscribeTag subscribe:
AddTagSubscriber(Sender, subscribe.Tag);
Context.Watch(Sender);
break;
return true;
case SubscribeNewEvents _:
AddNewEventsSubscriber(Sender);
Context.Watch(Sender);
return true;
case Terminated terminated:
RemoveSubscriber(terminated.ActorRef);
break;
return true;
default:
return false;
}

return true;
}
public void AddNewEventsSubscriber(IActorRef subscriber)
{
_newEventsSubscriber.Add(subscriber);
}
protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
{
var lastOrdering = await GetHighestPersistenceId();
return (GetAllPersistenceIds(), lastOrdering);
}

private void AddAllPersistenceIdSubscriber(IActorRef subscriber)
protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
{
lock (_allPersistenceIdSubscribers)
{
_allPersistenceIdSubscribers.Add(subscriber);
}
subscriber.Tell(new CurrentPersistenceIds(GetAllPersistenceIds()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good

// Limit allows only integer
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;

var maxOrdering = 0L;
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Gte(x => x.SequenceNr, replay.FromOffset);
filter &= builder.Lte(x => x.SequenceNr, replay.ToOffset);
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.SequenceNr);
await _journalCollection.Value
.Find(filter)
.Sort(sort)
.Limit(limitValue)
.ForEachAsync(entry =>
{
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
{
var currentSequence = persistent.SequenceNr;
replay.ReplyTo.Tell(new ReplayedEvent(adapted, currentSequence), ActorRefs.NoSender);
if(currentSequence > maxOrdering)
maxOrdering = currentSequence;
}
});
return maxOrdering;
}

private void AddTagSubscriber(IActorRef subscriber, string tag)
{
if (!_tagSubscribers.TryGetValue(tag, out var subscriptions))
{
subscriptions = new HashSet<IActorRef>();
_tagSubscribers.Add(tag, subscriptions);
_tagSubscribers = _tagSubscribers.Add(tag, ImmutableHashSet.Create(subscriber));
}
else
{
_tagSubscribers = _tagSubscribers.SetItem(tag, subscriptions.Add(subscriber));
}

subscriptions.Add(subscriber);
}

private IEnumerable<string> GetAllPersistenceIds()
Expand All @@ -461,45 +490,50 @@ private IEnumerable<string> GetAllPersistenceIds()
.ToList();
}

private async Task<long> GetHighestPersistenceId()
{
return await Task.Run(() =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than do this, why now just use CountAsync on the MongoDb driver itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have now deleted that...I only created it while I was trying to understand the issue with ReplayAllEventsAsyn

{
return _journalCollection.Value.AsQueryable()
.Select(je => je.SequenceNr)
.Distinct()
.Max();
});
}

private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId)
{
if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions))
{
subscriptions = new HashSet<IActorRef>();
_persistenceIdSubscribers.Add(persistenceId, subscriptions);
_persistenceIdSubscribers = _persistenceIdSubscribers.Add(persistenceId, ImmutableHashSet.Create(subscriber));
}
else
{
_persistenceIdSubscribers = _persistenceIdSubscribers.SetItem(persistenceId, subscriptions.Add(subscriber));
}

subscriptions.Add(subscriber);
}

private void RemoveSubscriber(IActorRef subscriber)
/// <summary>
/// TBD
/// </summary>
/// <param name="subscriber">TBD</param>
public void RemoveSubscriber(IActorRef subscriber)
{
var pidSubscriptions = _persistenceIdSubscribers.Values.Where(x => x.Contains(subscriber));
foreach (var subscription in pidSubscriptions)
subscription.Remove(subscriber);
_persistenceIdSubscribers = _persistenceIdSubscribers.SetItems(_persistenceIdSubscribers
.Where(kv => kv.Value.Contains(subscriber))
.Select(kv => new KeyValuePair<string, IImmutableSet<IActorRef>>(kv.Key, kv.Value.Remove(subscriber))));

var tagSubscriptions = _tagSubscribers.Values.Where(x => x.Contains(subscriber));
foreach (var subscription in tagSubscriptions)
subscription.Remove(subscriber);
_tagSubscribers = _tagSubscribers.SetItems(_tagSubscribers
.Where(kv => kv.Value.Contains(subscriber))
.Select(kv => new KeyValuePair<string, IImmutableSet<IActorRef>>(kv.Key, kv.Value.Remove(subscriber))));

_allPersistenceIdSubscribers.Remove(subscriber);
_newEventsSubscriber.Remove(subscriber);
}

protected bool HasAllPersistenceIdSubscribers => _allPersistenceIdSubscribers.Count != 0;
protected bool HasTagSubscribers => _tagSubscribers.Count != 0;
protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0;

private void NotifyNewPersistenceIdAdded(string persistenceId)
{
var isNew = TryAddPersistenceId(persistenceId);
if (isNew && HasAllPersistenceIdSubscribers)
{
var added = new PersistenceIdAdded(persistenceId);
foreach (var subscriber in _allPersistenceIdSubscribers)
subscriber.Tell(added);
}
}

private bool TryAddPersistenceId(string persistenceId)
{
lock (_allPersistenceIds)
Expand Down
Loading