Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event envelope updates #170

Merged
merged 37 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
bfe7f26
Update dependencies
eaba Nov 12, 2020
d545a03
Revert changes to Tests Project file
eaba Nov 13, 2020
80ff357
Merge branch 'dev' into dev
eaba Nov 13, 2020
4dd1e85
Merge branch 'dev' of https://github.com/eaba/Akka.Persistence.MongoD…
eaba Nov 13, 2020
b574679
Update AkkaVersion in commons.props. Out dated version references wit…
eaba Nov 13, 2020
79a9b22
Tests passed with 1.14.12 local package that had https://github.com/a…
eaba Nov 13, 2020
688e97c
Removed ReadJournal_should_deallocate_AllPersistenceIds_publisher_whe…
eaba Nov 13, 2020
a44b111
Changed to $(AkkaVersion)
eaba Nov 14, 2020
b713621
Updated to changes made in Akka.Persistence.TCK
eaba Nov 16, 2020
056db96
Fix xunit version
eaba Nov 16, 2020
2e2b01f
Made 'AddNewEventsSubscriber' private and deleted 'Timestamp' from Jo…
eaba Nov 17, 2020
27aa509
Removed comment
eaba Nov 17, 2020
f7e45c2
MongoDbAllEventsSpec
eaba Nov 17, 2020
f84cb07
Attempt to fix `MongoDbAllEventsSpec` and `MongoDbCurrentAllEventsSpe…
eaba Nov 17, 2020
6112831
Fix tests
eaba Nov 18, 2020
72cb4e3
stackoverflow link
eaba Nov 18, 2020
d5cf2e1
First Attempt to fix `Bug61FixSpec` fails on Linux machine
eaba Nov 18, 2020
16aa086
Copyright
eaba Nov 18, 2020
dde5736
Rename ActorSystemName to `MongoDbCurrentAllEventsSpec`
eaba Nov 18, 2020
967e738
Set Collection
eaba Nov 18, 2020
7ef4811
Comments added
eaba Nov 18, 2020
92f5465
Deleted staled code
eaba Nov 18, 2020
bcdbbd3
Fixed build errors caused by "deleting staled code"
eaba Nov 18, 2020
5ddf8be
Improve code
eaba Nov 18, 2020
d79b177
Improve filter creation blocks
eaba Nov 19, 2020
f524e29
First attempt at documenting "AllEvents" and "CurrentAllEvents"
eaba Nov 19, 2020
923a4ac
Added "MongoDbJournalPerfSpec"
eaba Nov 19, 2020
c2daaf9
Improve code
eaba Nov 19, 2020
74f2681
Don't use 'ReaderWriterLockSlim'. Make 'RemoveSubscriber' private
eaba Nov 24, 2020
698bce3
Removed unused fields
eaba Nov 26, 2020
4170d28
Merge upstream
eaba Jan 6, 2021
80c8b1e
* Bump AkkaVersion to 1.4.14
eaba Jan 6, 2021
e042e13
Make use of `Ordering` as Timestamp
eaba Jan 6, 2021
2f4d67d
Add Timestamp when reading messages
eaba Jan 6, 2021
41de830
Manual-populate Timestamp for `Ordering` and `Payload`
eaba Jan 7, 2021
f2cf8c4
Backwards compatibility for https://github.com/akkadotnet/akka.net/pu…
eaba Jan 7, 2021
cb9719e
Add WithTimestamp
eaba Jan 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</PackageReference>
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
<PackageReference Include="Mongo2Go" Version="2.2.16" />
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbCurrentEventsByPersistenceIdsSpec : Akka.Persistence.TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
public class MongoDbCurrentEventsByPersistenceIdsSpec : TCK.Query.CurrentEventsByPersistenceIdSpec, IClassFixture<DatabaseFixture>
{
public static readonly AtomicCounter Counter = new AtomicCounter(0);
private readonly ITestOutputHelper _output;
Expand Down
1 change: 0 additions & 1 deletion src/Akka.Persistence.MongoDb/Journal/JournalEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class JournalEntry
[BsonElement("Manifest")]
public string Manifest { get; set; }


[BsonElement("Ordering")]
public BsonTimestamp Ordering { get; set; }

Expand Down
17 changes: 11 additions & 6 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque

private JournalEntry ToJournalEntry(IPersistentRepresentation message)
{
//var timeStamp = DateTimeOffset.Now.ToUnixTimeMilliseconds();
object payload = message.Payload;
//message = message.WithTimestamp(timeStamp);
if (message.Payload is Tagged tagged)
{
payload = tagged.Payload;
Expand All @@ -319,7 +321,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
Id = message.PersistenceId + "_" + message.SequenceNr,
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
//Timestamp = new BsonTimestamp(0),
//Timestamp = new BsonTimestamp(timeStamp),
IsDeleted = message.IsDeleted,
Payload = payload,
PersistenceId = message.PersistenceId,
Expand All @@ -340,7 +342,7 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message)
Id = message.PersistenceId + "_" + message.SequenceNr,
//Ordering = _sequenceRepository.GetSequenceValue("journalentry"),
Ordering = new BsonTimestamp(0), // Auto-populates with timestamp
//Timestamp = new BsonTimestamp(0),
//Timestamp = new BsonTimestamp(timeStamp),
IsDeleted = message.IsDeleted,
Payload = binary,
PersistenceId = message.PersistenceId,
Expand All @@ -363,14 +365,17 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
entry.PersistenceId,
manifest,
entry.IsDeleted,
sender);
sender,
timestamp: entry.Ordering.Timestamp);
}

var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest);
if (!legacy)
{
var ser = _serialization.FindSerializerForType(typeof(Persistent));
return ser.FromBinary<Persistent>((byte[]) entry.Payload);
var serPersistent = ser.FromBinary<Persistent>((byte[]) entry.Payload);
serPersistent.WithTimestamp(entry.Ordering.Timestamp);
return serPersistent;
}

int? serializerId = null;
Expand Down Expand Up @@ -398,12 +403,12 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen
if (deserialized is Persistent p)
return p;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add WithTimestamp here


return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender);
return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
}
else // backwards compat for object serialization - Payload was already deserialized by BSON
{
return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest,
entry.IsDeleted, sender);
entry.IsDeleted, sender, timestamp: entry.Ordering.Timestamp);
}

}
Expand Down
1 change: 1 addition & 0 deletions src/Akka.Persistence.MongoDb/Query/AllEventsPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected bool Replaying(object message)
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));

CurrentOffset = replayed.Offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ protected Receive Replaying(int limit)
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
Expand Down
1 change: 1 addition & 0 deletions src/Akka.Persistence.MongoDb/Query/EventsByTagPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ protected Receive Replaying(int limit)
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
timestamp: replayed.Persistent.Timestamp,
@event: replayed.Persistent.Payload));

CurrentOffset = replayed.Offset;
Expand Down
9 changes: 5 additions & 4 deletions src/common.props
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
<Project>
<PropertyGroup>
<Copyright>Copyright © 2013-2020 Akka.NET Project</Copyright>
<Copyright>Copyright © 2013-2021 Akka.NET Project</Copyright>
<Authors>Akka.NET Contrib</Authors>
<VersionPrefix>1.4.1</VersionPrefix>
<VersionPrefix>1.4.14</VersionPrefix>
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/master/LICENSE.md</PackageLicenseUrl>
<PackageReleaseNotes>Bump Akka version to 1.4.1</PackageReleaseNotes>
<PackageReleaseNotes>Bump Akka version to 1.4.14</PackageReleaseNotes>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Description>Akka Persistence journal and snapshot store backed by MongoDB database.</Description>
<NoWarn>$(NoWarn);CS1591</NoWarn>
</PropertyGroup>
<PropertyGroup>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>16.8.3</TestSdkVersion>
<AkkaVersion>1.4.12</AkkaVersion>
<AkkaVersion>1.4.14</AkkaVersion>
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down