Skip to content

Commit

Permalink
Revert changes on EventRead, add legacy data read test
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Nov 7, 2022
1 parent 3cf2eee commit b1daa66
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property will always return null")]
public string DefaultSerializer => null;
[Obsolete(message: "This property should never be used, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }

/// <summary>
/// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.
Expand Down Expand Up @@ -221,7 +221,7 @@ public QueryConfiguration(
string orderingColumnName,
string serializerIdColumnName,
TimeSpan timeout,
string defaultSerializer, // This is being ignored now
string defaultSerializer,
bool useSequentialAccess)
{
SchemaName = schemaName;
Expand All @@ -236,6 +236,7 @@ public QueryConfiguration(
Timeout = timeout;
TagsColumnName = tagsColumnName;
OrderingColumnName = orderingColumnName;
DefaultSerializer = defaultSerializer;
SerializerIdColumnName = serializerIdColumnName;
UseSequentialAccess = useSequentialAccess;
}
Expand Down Expand Up @@ -846,7 +847,10 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
{
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
var deserializer = Serialization.FindSerializerForType(type);
#pragma warning disable CS0618
// Backward compatibility code, we still need to use the old default serializer on read to support legacy data
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
#pragma warning restore CS0618
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(
Serialization.System, (deserializer, (byte[])payload, type),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
</ItemGroup>

<ItemGroup>
<None Update="data\Sqlite.CustomObject.db">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ namespace Akka.Persistence.Sqlite.Tests
{
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
{
private static readonly Config Config = ConfigurationFactory.ParseString($@"
private static readonly string ConnectionString;
private static readonly Config Config;
static CustomObjectSerializerSpec()
{
var filename = $"AkkaSqlite-{Guid.NewGuid()}.db";
File.Copy("./data/Sqlite.CustomObject.db", $"{filename}.db");

ConnectionString = $"DataSource={filename}.db";
Config = ConfigurationFactory.ParseString($@"
akka.actor {{
serializers {{
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
Expand All @@ -35,18 +43,19 @@ public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLif
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite {{
connection-string = ""DataSource=AkkaJournal.db""
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sqlite""
sqlite {{
connection-string = ""DataSource=AkkaSnapshot.db""
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
}

public CustomObjectSerializerSpec(ITestOutputHelper helper)
: base(Config, nameof(CustomObjectSerializerSpec), helper)
Expand All @@ -62,12 +71,13 @@ public async Task CustomSerializerTest()
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
serializer.Should().BeOfType<MySerializer>();

var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a")));
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a", probe)));
probe.ExpectMsg("recovered");
actor.Tell(new Persisted("a"), probe);
probe.ExpectMsg(new Persisted("a"));

// Read the database directly, make sure that we're using the correct object type serializer
var conn = new SqliteConnection("DataSource=AkkaJournal.db");
var conn = new SqliteConnection(ConnectionString);
conn.Open();
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
await using var cmd = new SqliteCommand(sql, conn);
Expand All @@ -78,10 +88,19 @@ public async Task CustomSerializerTest()
record[0].Should().Be(9999);
}

[Fact(DisplayName = "Persistence.Sql should be able to read legacy data")]
public void LegacyDataTest()
{
var probe = CreateTestProbe();
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("old", probe)));
probe.ExpectMsg(new Persisted("old"));
probe.ExpectMsg("recovered");
}

public Task InitializeAsync()
{
if(File.Exists("AkkaJournal.db"))
File.Delete("AkkaJournal.db");
if(File.Exists("AkkaSqlite.db"))
File.Delete("AkkaSqlite.db");
return Task.CompletedTask;
}

Expand Down Expand Up @@ -140,9 +159,12 @@ public override object FromBinary(byte[] bytes, Type type)

internal sealed class PersistedActor : UntypedPersistentActor
{
public PersistedActor(string persistenceId)
private readonly IActorRef _probe;

public PersistedActor(string persistenceId, IActorRef probe)
{
PersistenceId = persistenceId;
_probe = probe;
}

public override string PersistenceId { get; }
Expand All @@ -158,6 +180,15 @@ protected override void OnCommand(object message)

protected override void OnRecover(object message)
{
switch (message)
{
case Persisted msg:
_probe.Tell(msg);
break;
case RecoveryCompleted _:
_probe.Tell("recovered");
break;
}
}
}
}
Binary file not shown.

0 comments on commit b1daa66

Please sign in to comment.