Skip to content

Commit

Permalink
close akkadotnet#80 - fixed EventsByTag pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Oct 4, 2019
1 parent 9215115 commit c6261af
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,35 +181,53 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per
/// <returns>TBD</returns>
private async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
{
/*
* NOTE: limit is used like a pagination value, not a cap on the amount
* of data returned by a query. This was at the root of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
*/
// Limit allows only integer
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;
var fromSequenceNr = replay.FromOffset;
var toSequenceNr = replay.ToOffset;
var tag = replay.Tag;

// Do not replay messages if limit equal zero
if (limitValue == 0) return 0;

var builder = Builders<JournalEntry>.Filter;
var filter = builder.AnyEq(x => x.Tags, tag);
var seqNoFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));
seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));


// Need to know what the highest seqNo of this query will be
// and return that as part of the RecoverySuccess message
var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter)
.SortByDescending(x => x.Ordering)
.Limit(1)
.SingleOrDefaultAsync();

if (maxSeqNoEntry == null)
return 0L; // recovered nothing

var maxOrderingId = maxSeqNoEntry.Ordering.Value;
var toSeqNo = Math.Min(toSequenceNr, maxOrderingId);

var readFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo));
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);

long maxOrderingId = 0;
await _journalCollection.Value
.Find(filter)
.Find(readFilter)
.Sort(sort)
.Limit(limitValue)
.ForEachAsync(entry => {
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value),
ActorRefs.NoSender);
maxOrderingId = entry.Ordering.Value;
});

return maxOrderingId;
Expand Down

0 comments on commit c6261af

Please sign in to comment.