Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Persistence.MongoDb v1.4.17 Release #189

Merged
merged 9 commits into from
Mar 18, 2021
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Both journal and snapshot store share the same configuration keys (however they
```hocon
akka.persistence {
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
# qualified type name of the MongoDb persistence journal actor
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
Expand Down Expand Up @@ -58,6 +59,7 @@ akka.persistence {
}

snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
# qualified type name of the MongoDB persistence snapshot actor
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
Expand Down
5 changes: 3 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#### 1.4.14 January 13 2021 ####
#### 1.4.17 March 17 2021 ####

* Bump [Akka.NET version to 1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14), which adds the `Timestamp` to the `EventEnvelope` data structure - so it can be used for sorting / ordering after the fact in an Akka.Persistence.Query
* Bump [Akka.NET version to 1.4.17](https://github.com/akkadotnet/akka.net/releases/tag/1.4.17)
* [Resolve MongoDb write atomicity issues on write by not updating metadata collection](https://github.com/akkadotnet/Akka.Persistence.MongoDB/pull/186) - this is an important change that makes all writes atomic for an individual persistentId in MongoDb. We don't update the meta-data collection on write anymore - it's only done when the most recent items in the journal are deleted, and thus we store the highest recorded sequence number in the meta-data collection during deletes. All of the Akka.Persistence.Sql plugins operate this way as well.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
</PackageReference>
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
<PackageReference Include="Mongo2Go" Version="2.2.16" />
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
<ItemGroup>
<PackageReference Include="Akka.Persistence.Query" Version="$(AkkaVersion)" />
<PackageReference Include="akka.streams" Version="$(AkkaVersion)" />
<PackageReference Include="MongoDB.Driver" Version="2.11.5" />
<PackageReference Include="MongoDB.Driver" Version="2.12.0" />
</ItemGroup>
</Project>
54 changes: 32 additions & 22 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,15 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

//Following the SqlJournal implementation
//I have tried MongoDb lookup query and that caused some deadlocks in some tests!

var metadataHighestSequenceNr = await _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr).FirstOrDefaultAsync();
var metadataHighestSequenceNrTask = _metadataCollection.Value.Find(filter).Project(x => x.SequenceNr).FirstOrDefaultAsync();

//var journalHighestSequenceNr = await _journalCollection.Value.Find(Builders<JournalEntry>.Filter.Eq(x => x.PersistenceId, persistenceId)).Project(x => x.SequenceNr).FirstOrDefaultAsync();
var journalHighestSequenceNrTask = _journalCollection.Value.Find(Builders<JournalEntry>.Filter.Eq(x => x.PersistenceId, persistenceId)).Project(x => x.SequenceNr).FirstOrDefaultAsync();

//if (metadataHighestSequenceNr > journalHighestSequenceNr)
//return metadataHighestSequenceNr;
// journal data is usually good enough, except in cases when it's been deleted.
await Task.WhenAll(metadataHighestSequenceNrTask, journalHighestSequenceNrTask);

//return journalHighestSequenceNr;
return metadataHighestSequenceNr;
return Math.Max(journalHighestSequenceNrTask.Result, metadataHighestSequenceNrTask.Result);
}

protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
Expand Down Expand Up @@ -252,8 +249,6 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
persistentIds.Add(message.PersistenceId);
});

await SetHighSequenceId(messageList);

var result = await Task<IImmutableList<Exception>>
.Factory
.ContinueWhenAll(writeTasks.ToArray(),
Expand Down Expand Up @@ -289,15 +284,25 @@ private void NotifyNewEventAppended()
}
}
}
protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
{
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

// read highest sequence number before we start
var highestSeqNo = await ReadHighestSequenceNrAsync(persistenceId, 0L);

if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.SequenceNr, toSequenceNr);

return _journalCollection.Value.DeleteManyAsync(filter);
// only update the sequence number of the top of the journal
// is about to be deleted.
if (highestSeqNo <= toSequenceNr)
{
await SetHighSequenceId(persistenceId, highestSeqNo);
}

await _journalCollection.Value.DeleteManyAsync(filter);
}

private JournalEntry ToJournalEntry(IPersistentRepresentation message)
Expand Down Expand Up @@ -431,18 +436,16 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen

}

private async Task SetHighSequenceId(IList<AtomicWrite> messages)
private async Task SetHighSequenceId(string persistenceId, long maxSeqNo)
{
var persistenceId = messages.Select(c => c.PersistenceId).First();
var highSequenceId = messages.Max(c => c.HighestSequenceNr);
var builder = Builders<MetadataEntry>.Filter;
var filter = builder.Eq(x => x.PersistenceId, persistenceId);

var metadataEntry = new MetadataEntry
{
Id = persistenceId,
PersistenceId = persistenceId,
SequenceNr = highSequenceId
SequenceNr = maxSeqNo
};

await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new ReplaceOptions() { IsUpsert = true });
Expand Down Expand Up @@ -489,9 +492,9 @@ private void AddNewEventsSubscriber(IActorRef subscriber)
_newEventsSubscriber.Add(subscriber);
}
protected virtual async Task<(IEnumerable<string> Ids, long LastOrdering)> SelectAllPersistenceIdsAsync(long offset)
{
{
var ids = await GetAllPersistenceIds(offset);
var lastOrdering = await GetHighestOrdering();
var ids = await GetAllPersistenceIds();
return (ids, lastOrdering);
}

Expand Down Expand Up @@ -556,17 +559,24 @@ private void AddTagSubscriber(IActorRef subscriber, string tag)
}
}

private async Task<IEnumerable<string>> GetAllPersistenceIds()
private async Task<IEnumerable<string>> GetAllPersistenceIds(long offset)
{
var ids = await _metadataCollection.Value.Find(_=> true).ToListAsync();
return ids.Distinct().Select(x => x.PersistenceId);
var ids = await _journalCollection.Value
.DistinctAsync(x => x.PersistenceId, entry => entry.Ordering > new BsonTimestamp(offset));

var hashset = new List<string>();
while (await ids.MoveNextAsync())
{
hashset.AddRange(ids.Current);
}
return hashset;
}

private async Task<long> GetHighestOrdering()
{
var max = await _journalCollection.Value.AsQueryable()
.Select(je => je.Ordering)
.Distinct().MaxAsync();
.MaxAsync();

return max.Value;
}
Expand Down
5 changes: 2 additions & 3 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
</PropertyGroup>
<PropertyGroup>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>16.8.3</TestSdkVersion>
<AkkaVersion>1.4.14</AkkaVersion>
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
<TestSdkVersion>16.9.1</TestSdkVersion>
<AkkaVersion>1.4.17</AkkaVersion>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down