Update to latest releases #159

Merged — 30 commits, Nov 26, 2020

Commits (30)
bfe7f26
Update dependencies
eaba Nov 12, 2020
d545a03
Revert changes to Tests Project file
eaba Nov 13, 2020
80ff357
Merge branch 'dev' into dev
eaba Nov 13, 2020
4dd1e85
Merge branch 'dev' of https://github.com/eaba/Akka.Persistence.MongoD…
eaba Nov 13, 2020
b574679
Update AkkaVersion in commons.props. Out dated version references wit…
eaba Nov 13, 2020
79a9b22
Tests passed with 1.14.12 local package that had https://github.com/a…
eaba Nov 13, 2020
688e97c
Removed ReadJournal_should_deallocate_AllPersistenceIds_publisher_whe…
eaba Nov 13, 2020
a44b111
Changed to $(AkkaVersion)
eaba Nov 14, 2020
b713621
Updated to changes made in Akka.Persistence.TCK
eaba Nov 16, 2020
056db96
Fix xunit version
eaba Nov 16, 2020
2e2b01f
Made 'AddNewEventsSubscriber' private and deleted 'Timestamp' from Jo…
eaba Nov 17, 2020
27aa509
Removed comment
eaba Nov 17, 2020
f7e45c2
MongoDbAllEventsSpec
eaba Nov 17, 2020
f84cb07
Attempt to fix `MongoDbAllEventsSpec` and `MongoDbCurrentAllEventsSpe…
eaba Nov 17, 2020
6112831
Fix tests
eaba Nov 18, 2020
72cb4e3
stackoverflow link
eaba Nov 18, 2020
d5cf2e1
First Attempt to fix `Bug61FixSpec` fails on Linux machine
eaba Nov 18, 2020
16aa086
Copyright
eaba Nov 18, 2020
dde5736
Rename ActorSystemName to `MongoDbCurrentAllEventsSpec`
eaba Nov 18, 2020
967e738
Set Collection
eaba Nov 18, 2020
7ef4811
Comments added
eaba Nov 18, 2020
92f5465
Deleted staled code
eaba Nov 18, 2020
bcdbbd3
Fixed build errors caused by "deleting staled code"
eaba Nov 18, 2020
5ddf8be
Improve code
eaba Nov 18, 2020
d79b177
Improve filter creation blocks
eaba Nov 19, 2020
f524e29
First attempt at documenting "AllEvents" and "CurrentAllEvents"
eaba Nov 19, 2020
923a4ac
Added "MongoDbJournalPerfSpec"
eaba Nov 19, 2020
c2daaf9
Improve code
eaba Nov 19, 2020
74f2681
Don't use 'ReaderWriterLockSlim'. Make 'RemoveSubscriber' private
eaba Nov 24, 2020
698bce3
Removed unused fields
eaba Nov 26, 2020
Files changed
10 changes: 0 additions & 10 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbAllCurrentEventsSpec.cs

This file was deleted.

7 changes: 2 additions & 5 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbAllEventsSpec.cs
@@ -13,6 +13,7 @@ public class MongoDbAllEventsSpec: AllEventsSpec, IClassFixture<DatabaseFixture>
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
// akka.test.single-expect-default = 10s
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
@@ -37,13 +38,9 @@ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persis
return ConfigurationFactory.ParseString(specString);
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;

public MongoDbAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture)
: base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output)
public MongoDbAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output)
{
_output = output;
output.WriteLine(databaseFixture.ConnectionString + Counter.Current);
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
}
49 changes: 49 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbCurrentAllEventsSpec.cs
@@ -0,0 +1,49 @@
using Akka.Configuration;
using Akka.Persistence.MongoDb.Query;
using Akka.Persistence.Query;
using Akka.Persistence.TCK.Query;
using Akka.Util.Internal;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.MongoDb.Tests
{
public class MongoDbCurrentAllEventsSpec : CurrentAllEventsSpec, IClassFixture<DatabaseFixture>
{
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
// akka.test.single-expect-default = 10s
var specString = @"
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
auto-initialize = on
collection = ""EventJournal""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
refresh-interval = 1s
}
}
}";

return ConfigurationFactory.ParseString(specString);
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);

public MongoDbCurrentAllEventsSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), "MongoDbAllEventsSpec", output)
{
ReadJournal = Sys.ReadJournalFor<MongoDbReadJournal>(MongoDbReadJournal.Identifier);
}
}
}
17 changes: 17 additions & 0 deletions src/Akka.Persistence.MongoDb/Auto/AutoSequence.cs
@@ -0,0 +1,17 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;

namespace Akka.Persistence.MongoDb.Auto
{
public class AutoSequence
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
[BsonElement("_id")]
public string Id { get; set; }

public string SequenceName { get; set; }

public int SequenceValue { get; set; }
}
}
26 changes: 26 additions & 0 deletions src/Akka.Persistence.MongoDb/Auto/SequenceRepository.cs
@@ -0,0 +1,26 @@
using MongoDB.Driver;

namespace Akka.Persistence.MongoDb.Auto
{
public class SequenceRepository
{
protected readonly IMongoDatabase _database;
protected readonly IMongoCollection<AutoSequence> _collection;

public SequenceRepository(IMongoDatabase database)
{
_database = database;
_collection = _database.GetCollection<AutoSequence>(typeof(AutoSequence).Name);
}

public long GetSequenceValue(string sequenceName)
{
var filter = Builders<AutoSequence>.Filter.Eq(s => s.SequenceName, sequenceName);
var update = Builders<AutoSequence>.Update.Inc(s => s.SequenceValue, 1);

var result = _collection.FindOneAndUpdate(filter, update, new FindOneAndUpdateOptions<AutoSequence, AutoSequence> { IsUpsert = true, ReturnDocument = ReturnDocument.After });

return result.SequenceValue;
}
}
}
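The new SequenceRepository implements an atomic, monotonically increasing counter: FindOneAndUpdate combines an $inc update with IsUpsert = true, so the counter document is created on first use and concurrent writers each receive a distinct post-increment value in a single round trip. A minimal usage sketch, assuming a reachable MongoDB instance (the connection string and database name below are illustrative only, not taken from the PR):

using Akka.Persistence.MongoDb.Auto;
using MongoDB.Driver;

class SequenceRepositoryDemo
{
    static void Main()
    {
        // Illustrative connection string and database name.
        var client = new MongoClient("mongodb://localhost:27017");
        var database = client.GetDatabase("akka-persistence-demo");

        var sequences = new SequenceRepository(database);

        // Each call atomically increments the "journalentry" counter document
        // (creating it on first use) and returns the new value.
        var first = sequences.GetSequenceValue("journalentry");   // 1 on a fresh database
        var second = sequences.GetSequenceValue("journalentry");  // 2

        System.Console.WriteLine($"{first}, {second}");
    }
}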
2 changes: 1 addition & 1 deletion src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs
@@ -36,7 +36,7 @@ public class JournalEntry
public string Manifest { get; set; }

[BsonElement("Ordering")]
public BsonTimestamp Ordering { get; set; }
public long Ordering { get; set; }

[BsonElement("Tags")]
public ICollection<string> Tags { get; set; } = new HashSet<string>();
62 changes: 38 additions & 24 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
@@ -7,10 +7,10 @@

using Akka.Actor;
using Akka.Persistence.Journal;
using Akka.Persistence.MongoDb.Auto;
using Akka.Persistence.MongoDb.Query;
using Akka.Streams.Dsl;
using Akka.Util;
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
@@ -31,6 +31,7 @@ public class MongoDbJournal : AsyncWriteJournal
private Lazy<IMongoDatabase> _mongoDatabase;
private Lazy<IMongoCollection<JournalEntry>> _journalCollection;
private Lazy<IMongoCollection<MetadataEntry>> _metadataCollection;
private SequenceRepository _sequenceRepository;

private readonly HashSet<string> _allPersistenceIds = new HashSet<string>();
private readonly HashSet<IActorRef> _allPersistenceIdSubscribers = new HashSet<IActorRef>();
@@ -61,7 +62,6 @@ protected override void PreStart()

return client.GetDatabase(connectionString.DatabaseName);
});

_journalCollection = new Lazy<IMongoCollection<JournalEntry>>(() =>
{
var collection = _mongoDatabase.Value.GetCollection<JournalEntry>(_settings.Collection);
@@ -107,6 +107,7 @@ protected override void PreStart()

return collection;
});
_sequenceRepository = new SequenceRepository(_mongoDatabase.Value);
}

public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback)
@@ -160,9 +161,9 @@ private async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
var builder = Builders<JournalEntry>.Filter;
var seqNoFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
seqNoFilter &= builder.Gt(x => x.Ordering, fromSequenceNr);
if (toSequenceNr != long.MaxValue)
seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));
seqNoFilter &= builder.Lte(x => x.Ordering, toSequenceNr);


// Need to know what the highest seqNo of this query will be
@@ -175,14 +176,14 @@ private async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
if (maxSeqNoEntry == null)
return 0L; // recovered nothing

var maxOrderingId = maxSeqNoEntry.Ordering.Value;
var maxOrderingId = maxSeqNoEntry.Ordering;
var toSeqNo = Math.Min(toSequenceNr, maxOrderingId);

var readFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
readFilter &= builder.Gt(x => x.Ordering, fromSequenceNr);
if (toSequenceNr != long.MaxValue)
readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo));
readFilter &= builder.Lte(x => x.Ordering, toSeqNo);
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);

await _journalCollection.Value
@@ -193,7 +194,7 @@ await _journalCollection.Value
{
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value),
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering),
ActorRefs.NoSender);
});

@@ -319,7 +320,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
return new JournalEntry
{
Id = message.PersistenceId + "_" + message.SequenceNr,
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
Ordering = _sequenceRepository.GetSequenceValue("journalentry"), // Auto-populates with timestamp
IsDeleted = message.IsDeleted,
Payload = payload,
PersistenceId = message.PersistenceId,
@@ -338,7 +339,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
return new JournalEntry
{
Id = message.PersistenceId + "_" + message.SequenceNr,
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
Ordering = _sequenceRepository.GetSequenceValue("journalentry"), // Auto-populates with timestamp
IsDeleted = message.IsDeleted,
Payload = binary,
PersistenceId = message.PersistenceId,
@@ -475,27 +476,32 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
{
// Limit allows only integer
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;

var maxOrdering = 0L;
var take = Math.Min(replay.ToOffset - replay.FromOffset, replay.Max);
var maxOrdering = await GetHighestOrdering();
var builder = Builders<JournalEntry>.Filter;
var filter = builder.Gte(x => x.SequenceNr, replay.FromOffset);
filter &= builder.Lte(x => x.SequenceNr, replay.ToOffset);
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.SequenceNr);
await _journalCollection.Value
var filter = builder.Gt(x => x.Ordering, replay.FromOffset);
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);

var j = await _journalCollection.Value
.Find(filter)
.Sort(sort)
.Limit(limitValue)
.Limit(limitValue).ToListAsync();/*
.ForEachAsync(entry =>
{
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
{
var currentSequence = persistent.SequenceNr;
replay.ReplyTo.Tell(new ReplayedEvent(adapted, currentSequence), ActorRefs.NoSender);
if(currentSequence > maxOrdering)
maxOrdering = currentSequence;
replay.ReplyTo.Tell(new ReplayedEvent(adapted, entry.Ordering), ActorRefs.NoSender);
}
});
});*/
foreach(var t in j)
{
var persistent = ToPersistenceRepresentation(t, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
{
replay.ReplyTo.Tell(new ReplayedEvent(adapted, t.Ordering), ActorRefs.NoSender);
}
}
return maxOrdering;
}
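The rewritten ReplayAllEventsAsync now filters and sorts on Ordering rather than SequenceNr. The commented-out ForEachAsync block and the foreach loop below it do the same work; as a sketch only (reusing the surrounding filter, sort, limitValue, maxOrdering and replay locals, not part of the merged diff), the loop could be collapsed back into a single streaming call:

await _journalCollection.Value
    .Find(filter)
    .Sort(sort)
    .Limit(limitValue)
    .ForEachAsync(entry =>
    {
        // Stream each entry to the subscriber without materializing the whole page first.
        var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
        foreach (var adapted in AdaptFromJournal(persistent))
            replay.ReplyTo.Tell(new ReplayedEvent(adapted, entry.Ordering), ActorRefs.NoSender);
    });

return maxOrdering;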

@@ -524,12 +530,20 @@ private async Task<long> GetHighestOrdering()
return await Task.Run(() =>
Review comment (Member):

Same comment as below - just use the MaxAsync call

Review comment (Member):

Also, this query can be rewritten in a much less expensive way:

var modelWithOrdering = new CreateIndexModel<JournalEntry>(
Builders<JournalEntry>
.IndexKeys
.Ascending(entry => entry.Ordering));

We have a sorted index built on top of the Ordering column. Just select the maximum value from it. Don't do an entire table scan. That turns this into an O(1) query rather than an O(n) query.
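Following that suggestion with the standard driver API might look like the sketch below (an illustration, not the code that was merged): sort descending on the indexed Ordering field and take a single document, so only the tail of the index is read.

private async Task<long> GetHighestOrdering()
{
    var sort = Builders<JournalEntry>.Sort.Descending(x => x.Ordering);

    // Walks the Ordering index from its highest key instead of scanning the collection.
    var highest = await _journalCollection.Value
        .Find(Builders<JournalEntry>.Filter.Empty)
        .Sort(sort)
        .Limit(1)
        .Project(x => x.Ordering)
        .FirstOrDefaultAsync();

    return highest; // 0 when the journal is empty
}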

{
return _journalCollection.Value.AsQueryable()
.Select(je => je.SequenceNr)
.Select(je => je.Ordering)
.Distinct()
.Max();
});
}

private async Task<long> CountTotalRows()
{
return await Task.Run(() =>
Review comment (Member):

Rather than do this, why not just use CountAsync on the MongoDb driver itself?

Reply (Contributor, author):

I have now deleted that... I only created it while I was trying to understand the issue with ReplayAllEventsAsync.
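For reference, the driver-side count the reviewer refers to is a one-liner; a sketch using CountDocumentsAsync, the non-deprecated form of the CountAsync call mentioned above (the helper itself no longer exists in the PR):

private Task<long> CountTotalRows() =>
    _journalCollection.Value.CountDocumentsAsync(Builders<JournalEntry>.Filter.Empty);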

{
return _journalCollection.Value.AsQueryable()
.Select(je => je.SequenceNr)
.Count();
});
}
private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId)
{
if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions))
10 changes: 7 additions & 3 deletions src/Akka.Persistence.MongoDb/Query/DeliveryBuffer.cs
@@ -35,14 +35,17 @@ public void AddRange(IEnumerable<T> elements)

public void DeliverBuffer(long demand)
{
if (!Buffer.IsEmpty && demand > 0) {
if (!Buffer.IsEmpty && demand > 0)
{
var totalDemand = Math.Min((int)demand, Buffer.Length);
if (Buffer.Length == 1) {
if (Buffer.Length == 1)
{
// optimize for this common case
_onNext(Buffer[0]);
Buffer = ImmutableArray<T>.Empty;
}
else if (demand <= int.MaxValue) {
else if (demand <= int.MaxValue)
{
for (var i = 0; i < totalDemand; i++)
_onNext(Buffer[i]);

@@ -55,6 +58,7 @@ public void DeliverBuffer(long demand)
Buffer = ImmutableArray<T>.Empty;
}
}

}

}