Skip to content

Commit

Permalink
added reproduction spec for #80 (#84)
Browse files Browse the repository at this point in the history
* added reproduction spec for #80

* close #80 - fixed EventsByTag pagination

* added an additional spec
  • Loading branch information
Aaronontheweb authored Oct 4, 2019
1 parent 67eb3d8 commit e3fecc9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
41 changes: 41 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
eventsByTag.All(x => x.Event is RealMsg).Should().BeTrue("Expected all events by tag to be RealMsg");
}

/// <summary>
/// Reproduction spec for https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
/// </summary>
[Fact]
public void Bug80_CurrentEventsByTag_should_Recover_until_end()
{
var actor = Sys.ActorOf(TagActor.Props("y"));
var msgCount = 1200;
actor.Tell(msgCount);
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));

var eventsByTag = ReadJournal.CurrentEventsByTag(typeof(RealMsg).Name)
.RunForeach(e => TestActor.Tell(e), Materializer);

ReceiveN(msgCount);
}

/// <summary>
/// Making sure EventsByTag didn't break during implementation of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
/// </summary>
[Fact]
public void Bug80_AllEventsByTag_should_Recover_all_messages()
{
var actor = Sys.ActorOf(TagActor.Props("y"));
var msgCount = 1200;
actor.Tell(msgCount);
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));

var eventsByTag = ReadJournal.EventsByTag(typeof(RealMsg).Name)
.RunForeach(e => TestActor.Tell(e), Materializer);

// can't do this because Offset isn't IComparable
// ReceiveN(msgCount).Cast<EventEnvelope>().Select(x => x.Offset).Should().BeInAscendingOrder();
ReceiveN(msgCount);

// should receive more messages after the fact
actor.Tell(msgCount);
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));
ReceiveN(msgCount);
}

private class TagActor : ReceivePersistentActor
{
public static Props Props(string id)
Expand Down
36 changes: 27 additions & 9 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,35 +181,53 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per
/// <returns>TBD</returns>
private async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
{
/*
* NOTE: limit is used like a pagination value, not a cap on the amount
* of data returned by a query. This was at the root of https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/80
*/
// Limit allows only integer
var limitValue = replay.Max >= int.MaxValue ? int.MaxValue : (int)replay.Max;
var fromSequenceNr = replay.FromOffset;
var toSequenceNr = replay.ToOffset;
var tag = replay.Tag;

// Do not replay messages if limit equal zero
if (limitValue == 0) return 0;

var builder = Builders<JournalEntry>.Filter;
var filter = builder.AnyEq(x => x.Tags, tag);
var seqNoFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
filter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
seqNoFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
filter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));
seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr));


// Need to know what the highest seqNo of this query will be
// and return that as part of the RecoverySuccess message
var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter)
.SortByDescending(x => x.Ordering)
.Limit(1)
.SingleOrDefaultAsync();

if (maxSeqNoEntry == null)
return 0L; // recovered nothing

var maxOrderingId = maxSeqNoEntry.Ordering.Value;
var toSeqNo = Math.Min(toSequenceNr, maxOrderingId);

var readFilter = builder.AnyEq(x => x.Tags, tag);
if (fromSequenceNr > 0)
readFilter &= builder.Gt(x => x.Ordering, new BsonTimestamp(fromSequenceNr));
if (toSequenceNr != long.MaxValue)
readFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSeqNo));
var sort = Builders<JournalEntry>.Sort.Ascending(x => x.Ordering);

long maxOrderingId = 0;
await _journalCollection.Value
.Find(filter)
.Find(readFilter)
.Sort(sort)
.Limit(limitValue)
.ForEachAsync(entry => {
var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender);
foreach (var adapted in AdaptFromJournal(persistent))
replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value),
ActorRefs.NoSender);
maxOrderingId = entry.Ordering.Value;
});

return maxOrderingId;
Expand Down

0 comments on commit e3fecc9

Please sign in to comment.