-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update to latest releases #159
Conversation
src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
Show resolved
Hide resolved
src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj
Outdated
Show resolved
Hide resolved
…h missing Tests not in the latest version of 1.4.11
…n_the_last_subscriber_left()
All in all, the codes looks good, just need some minor tweaks. |
Ok @Arkatufus. Can you direct me to what blocks of codes need tweaking? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get rid of the Timestamp
field. Ordering
already does this and the clock is set by the MongoDb server by default, which is a better practice than setting it on the client.
@eaba One last changes from my side: |
|
||
namespace Akka.Persistence.MongoDb.Tests | ||
{ | ||
class MongoDbAllCurrentEventsSpec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a spec you're still working on?
The current test failures with the "AllEventsSpec" - is this because the events are being returned out of order? I only briefly checked the error messages but haven't looked at the detailed logs. |
It is being caused by Message mismatch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good so far but we have some major changes that need to be made to the MongoDb queries. Avoid table scans and use the indices we have.
@@ -113,8 +110,6 @@ protected override void PreStart() | |||
|
|||
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback) | |||
{ | |||
NotifyNewPersistenceIdAdded(persistenceId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) | ||
{ | ||
NotifyNewPersistenceIdAdded(persistenceId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
} | ||
} | ||
|
||
internal sealed class LivePersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} | ||
private async Task<long> CountTotalRows() | ||
{ | ||
return await Task.Run(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than do this, why now just use CountAsync
on the MongoDb driver itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have now deleted that...I only created it while I was trying to understand the issue with ReplayAllEventsAsyn
} | ||
|
||
private async Task<long> GetHighestOrdering() | ||
{ | ||
return await Task.Run(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as below - just use the MaxAsync
call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this query can be rewritten in a much less expensive way:
Akka.Persistence.MongoDB/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Lines 82 to 85 in 38a0c49
var modelWithOrdering = new CreateIndexModel<JournalEntry>( | |
Builders<JournalEntry> | |
.IndexKeys | |
.Ascending(entry => entry.Ordering)); |
We have a sorted index built on top of the Ordering
column. Just select the maximum value from it. Don't do an entire table scan. That turns this into a O(1) query rather than an O(n) query.
@@ -400,7 +423,7 @@ private async Task SetHighSequenceId(IList<AtomicWrite> messages) | |||
SequenceNr = highSequenceId | |||
}; | |||
|
|||
await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new UpdateOptions() { IsUpsert = true }); | |||
await _metadataCollection.Value.ReplaceOneAsync(filter, metadataEntry, new ReplaceOptions() { IsUpsert = true }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, since we are replacing the metadataentry outright
{ | ||
_allPersistenceIdSubscribers.Add(subscriber); | ||
} | ||
subscriber.Tell(new CurrentPersistenceIds(GetAllPersistenceIds())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good
readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo)); | ||
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering); | ||
|
||
await _journalCollection.Value.Find(readFilter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} | ||
|
||
subscriptions.Add(subscriber); | ||
} | ||
|
||
private IEnumerable<string> GetAllPersistenceIds() | ||
{ | ||
return _journalCollection.Value.AsQueryable() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this to query the _metadataCollection
instead - that will be a tremendously less expensive query than scanning the entire journal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
nice work @eaba |
We have Akka.Persistence.MongoDB plugin stuck on older releases of Akka.NET - this is because of some changes we made to the Akka.Persistence.TCK in Akka.NET v1.4.10.