Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Akka.Persistence.MongoDb compatibility with Akka.Cluster.Sharding #259

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Akka.Persistence.MongoDb.Tests/Bug25FixSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class Bug25FixSpec : Akka.TestKit.Xunit2.TestKit, IClassFixture<DatabaseFixture>
{
class MyJournalActor : ReceivePersistentActor
Expand Down Expand Up @@ -111,7 +112,8 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}";

return ConfigurationFactory.ParseString(specString);
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}
}
}
7 changes: 7 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbAllEventsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
collection = ""EventJournal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
Expand Down
7 changes: 7 additions & 0 deletions src/Akka.Persistence.MongoDb.Tests/MongoDbEventsByTagSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + id + @"""
}
}
query {
mongodb {
class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persistence.MongoDb""
Expand Down
3 changes: 2 additions & 1 deletion src/Akka.Persistence.MongoDb.Tests/MongoDbJournalPerfSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}";

return ConfigurationFactory.ParseString(specString);
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}
public static readonly AtomicCounter Counter = new AtomicCounter(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbJournalSetupSpec : JournalSpec, IClassFixture<DatabaseFixture>
{
// TEST: MongoDb journal plugin set using Setup should behave exactly like when it is
Expand Down Expand Up @@ -55,7 +56,8 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}";

return ConfigurationFactory.ParseString(specString);
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}
}
}
3 changes: 2 additions & 1 deletion src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}";

return ConfigurationFactory.ParseString(specString);
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}";

return ConfigurationFactory.ParseString(specString);
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,36 @@

namespace Akka.Persistence.MongoDb.Tests
{
// TODO: enable this spec once https://github.com/akkadotnet/akka.net/pull/4190 is available via Akka.NET v1.4.0-beta5 or higher
//[Collection("MongoDbSpec")]
//public class MongoDbLegacySerializationSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
//{
// protected override bool SupportsSerialization => false;
[Collection("MongoDbSpec")]
public class MongoDbLegacySerializationSnapshotStoreSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
{
protected override bool SupportsSerialization => false;

// public MongoDbLegacySerializationSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec")
// {
// Initialize();
// }
public MongoDbLegacySerializationSnapshotStoreSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbSnapshotStoreSpec")
{
Initialize();
}

// private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
// {
// var specString = @"
// akka.test.single-expect-default = 3s
// akka.persistence {
// publish-plugin-commands = on
// snapshot-store {
// plugin = ""akka.persistence.snapshot-store.mongodb""
// mongodb {
// class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
// connection-string = """ + databaseFixture.ConnectionString + @"""
// auto-initialize = on
// collection = ""SnapshotStore""
// legacy-serialization = on
// }
// }
// }";
private static Config CreateSpecConfig(DatabaseFixture databaseFixture)
{
var specString = @"
akka.test.single-expect-default = 3s
akka.persistence {
publish-plugin-commands = on
snapshot-store {
plugin = ""akka.persistence.snapshot-store.mongodb""
mongodb {
class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb""
connection-string = """ + databaseFixture.ConnectionString + @"""
auto-initialize = on
collection = ""SnapshotStore""
legacy-serialization = on
}
}
}";

// return ConfigurationFactory.ParseString(specString);
// }
//}
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace Akka.Persistence.MongoDb.Tests
{
[Collection("MongoDbSpec")]
public class MongoDbSnapshotStoreSetupSpec : SnapshotStoreSpec, IClassFixture<DatabaseFixture>
{
// TEST: MongoDb snapshot plugin set using Setup should behave exactly like when it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong
}
}";

return ConfigurationFactory.ParseString(specString);
return ConfigurationFactory.ParseString(specString)
.WithFallback(MongoDbPersistence.DefaultConfiguration());
}


Expand Down
15 changes: 10 additions & 5 deletions src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using Akka.Actor;
using Akka.Persistence.Journal;
using Akka.Persistence.MongoDb.Query;
using Akka.Streams.Dsl;
using Akka.Util;
using MongoDB.Bson;
using MongoDB.Driver;
Expand All @@ -19,6 +18,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;

namespace Akka.Persistence.MongoDb.Journal
{
Expand All @@ -43,12 +43,17 @@ public class MongoDbJournal : AsyncWriteJournal

private readonly Akka.Serialization.Serialization _serialization;

public MongoDbJournal()
{
_settings = MongoDbPersistence.Get(Context.System).JournalSettings;
public MongoDbJournal() : this(MongoDbPersistence.Get(Context.System).JournalSettings)
{ }

_serialization = Context.System.Serialization;
// This constructor is needed because config can come from both Akka.Persistence and Akka.Cluster.Sharding
public MongoDbJournal(Config config) : this(new MongoDbJournalSettings(config))
{ }

private MongoDbJournal(MongoDbJournalSettings settings)
{
_settings = settings;
_serialization = Context.System.Serialization;
}

protected override void PreStart()
Expand Down
12 changes: 9 additions & 3 deletions src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Persistence.Snapshot;
using Akka.Util;
using MongoDB.Driver;
Expand All @@ -26,10 +27,15 @@ public class MongoDbSnapshotStore : SnapshotStore

private readonly Akka.Serialization.Serialization _serialization;

public MongoDbSnapshotStore()
public MongoDbSnapshotStore() : this(MongoDbPersistence.Get(Context.System).SnapshotStoreSettings)
{ }

public MongoDbSnapshotStore(Config config) : this(new MongoDbSnapshotSettings(config))
{ }

public MongoDbSnapshotStore(MongoDbSnapshotSettings settings)
{
_settings = MongoDbPersistence.Get(Context.System).SnapshotStoreSettings;

_settings = settings;
_serialization = Context.System.Serialization;
}

Expand Down