Skip to content

Commit

Permalink
Event envelope updates (#170)
Browse files Browse the repository at this point in the history
* Bump AkkaVersion to 1.4.14
* Add FluentAssertionsVersion to Common.Props and set the version to 4.14.0
* Backwards compatibility for akkadotnet/akka.net#4680
  • Loading branch information
eaba authored Jan 7, 2021
1 parent 4bd8ca1 commit ac1da6a
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</PackageReference>
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
<PackageReference Include="Mongo2Go" Version="2.2.16" />
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
</ItemGroup>
Expand Down
8 changes: 5 additions & 3 deletions src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public async Task Bug61_Events_Recovered_By_Id_Should_Match_Tag()
public void Bug80_CurrentEventsByTag_should_Recover_until_end()
{
var actor = Sys.ActorOf(TagActor.Props("y"));
var msgCount = 1200;
//increased this to test for non-collision with the generated timestamps
var msgCount = 5000;
actor.Tell(msgCount);
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));

Expand All @@ -96,7 +97,8 @@ public void Bug80_CurrentEventsByTag_should_Recover_until_end()
public void Bug80_AllEventsByTag_should_Recover_all_messages()
{
var actor = Sys.ActorOf(TagActor.Props("y"));
var msgCount = 1200;
//increased this to test for non-collision with the generated timestamps
var msgCount = 5000;
actor.Tell(msgCount);
ExpectMsg($"{msgCount}-done", TimeSpan.FromSeconds(20));

Expand Down Expand Up @@ -176,7 +178,7 @@ public object ToJournal(object evt)
private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.test.single-expect-default = 10s
akka.persistence {
publish-plugin-commands = on
journal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
Expand Down
1 change: 0 additions & 1 deletion src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class JournalEntry
[BsonElement("Manifest")]
public string Manifest { get; set; }


[BsonElement("Ordering")]
public BsonTimestamp Ordering { get; set; }

Expand Down
24 changes: 15 additions & 9 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
return new JournalEntry
{
Id = message.PersistenceId + "_" + message.SequenceNr,
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
//Timestamp = new BsonTimestamp(0),
IsDeleted = message.IsDeleted,
Payload = payload,
PersistenceId = message.PersistenceId,
Expand All @@ -338,9 +336,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
return new JournalEntry
{
Id = message.PersistenceId + "_" + message.SequenceNr,
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
//Timestamp = new BsonTimestamp(0),
IsDeleted = message.IsDeleted,
Payload = binary,
PersistenceId = message.PersistenceId,
Expand All @@ -363,14 +359,24 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
entry.PersistenceId,
manifest,
entry.IsDeleted,
sender);
sender,
timestamp: entry.Ordering.Timestamp);
}

var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest);
if (!legacy)
{
var ser = _serialization.FindSerializerForType(typeof(Persistent));
return ser.FromBinary<Persistent>((byte[]) entry.Payload);
var output = ser.FromBinary<Persistent>((byte[])entry.Payload);

// backwards compatibility for https://github.com/akkadotnet/akka.net/pull/4680
// it the timestamp is not defined in the binary payload
if (output.Timestamp == 0L)
{
output = (Persistent)output.WithTimestamp(entry.Ordering.Timestamp);
}

return output;
}

int? serializerId = null;
Expand All @@ -396,14 +402,14 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
}

if (deserialized is Persistent p)
return p;
return (Persistent)p.WithTimestamp(entry.Ordering.Timestamp);

return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender);
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
}
else // backwards compat for object serialization - Payload was already deserialized by BSON
{
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
entry.IsDeleted, sender);
entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
}

}
Expand Down
1 change: 1 addition & 0 deletions src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected bool Replaying(object message)
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));

CurrentOffset = replayed.Offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ protected Receive Replaying(int limit)
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
Expand Down
1 change: 1 addition & 0 deletions src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ protected Receive Replaying(int limit)
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));

CurrentOffset = replayed.Offset;
Expand Down
14 changes: 10 additions & 4 deletions src/common.props
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
<Project>
<PropertyGroup>
<Copyright>Copyright © 2013-2020 Akka.NET Project</Copyright>
<Copyright>Copyright © 2013-2021 Akka.NET Project</Copyright>
<Authors>Akka.NET Contrib</Authors>
<VersionPrefix>1.4.1</VersionPrefix>
<VersionPrefix>1.4.14</VersionPrefix>
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/master/LICENSE.md</PackageLicenseUrl>
<PackageReleaseNotes>Bump Akka version to 1.4.1</PackageReleaseNotes>
<PackageReleaseNotes>
Bump Akka version to 1.4.14
Corrected `CurrentPersistentIds` query and `AllPersistentIds` queries to be more memory efficient and query entity ID data directly from Mongo
Introduced `AllEvents` and `CurrentEvents` query to read the entire MongoDb journal
Deprecated previous `GetMaxSeqNo` behavior - we no longer query the max sequence number directly from the journal AND the metadata collection. We only get that data directly from the metadata collection itself, which should make this query an O(1) operation rather than O(n)
</PackageReleaseNotes>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Description>Akka Persistence journal and snapshot store backed by MongoDB database.</Description>
<NoWarn>$(NoWarn);CS1591</NoWarn>
</PropertyGroup>
<PropertyGroup>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>16.8.3</TestSdkVersion>
<AkkaVersion>1.4.12</AkkaVersion>
<AkkaVersion>1.4.14</AkkaVersion>
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down

0 comments on commit ac1da6a

Please sign in to comment.