Skip to content

Commit

Permalink
Fix broken object type serializer in QueryExecutor (#6528)
Browse files Browse the repository at this point in the history
* Reproduction unit test for akkadotnet/Akka.Hosting#127

(cherry picked from commit 75e24b3)

* Clarify the unit test

(cherry picked from commit a0b5e8f)

* Remove persistence default serializer feature

(cherry picked from commit 3cf2eee)

* Revert changes on EventRead, add legacy data read test

(cherry picked from commit b1daa66)

* Revert persistence.conf changes, modify sqlite database to mimic legacy data

(cherry picked from commit f3fd0e9)

* Resolve conflicts

* Update API Verify list

* Add legacy data test

* Add sqlite data and fix tests
  • Loading branch information
Arkatufus authored Mar 17, 2023
1 parent f7c30f7 commit 7ca22af
Show file tree
Hide file tree
Showing 20 changed files with 511 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public abstract class BatchingSqlJournalSetup
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }

/// <summary>
Expand Down Expand Up @@ -1228,7 +1229,7 @@ private async Task<WriteMessagesResult> HandleWriteMessages(WriteMessages req, T
protected virtual void WriteEvent(TCommand command, IPersistentRepresentation persistent, string tags = "")
{
var payloadType = persistent.Payload.GetType();
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);
var serializer = _serialization.FindSerializerForType(payloadType);

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
Akka.Serialization.Serialization.WithTransport(_serialization.System, () =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; }

/// <summary>
Expand Down Expand Up @@ -780,7 +781,7 @@ protected DbCommand GetCommand(DbConnection connection, string sql)
protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{

var serializer = Serialization.FindSerializerForType(e.Payload.GetType(), Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(e.Payload.GetType());

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var (binary,manifest) = Akka.Serialization.Serialization.WithTransport(Serialization.System,(e.Payload,serializer) ,(state) =>
Expand Down Expand Up @@ -846,7 +847,10 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
{
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
#pragma warning disable CS0618
// Backward compatibility code, we still need to use the old default serializer on read to support legacy data
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
#pragma warning restore CS0618
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(
Serialization.System, (deserializer, (byte[])payload, type),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class SnapshotStoreSettings
/// <summary>
/// The default serializer being used if no type match override is specified
/// </summary>
[Obsolete(message: "This property should never be used, use the default `System.Object` serializer instead")]
public string DefaultSerializer { get; private set; }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class QueryConfiguration
/// <summary>
/// The default serializer used when not type override matching is found
/// </summary>
[Obsolete(message: "This property should never be used for writes, use the default `System.Object` serializer instead")]
public readonly string DefaultSerializer;

/// <summary>
Expand Down Expand Up @@ -336,7 +337,7 @@ DELETE FROM {Configuration.FullSnapshotTableName}
protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(snapshotType);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot));
AddParameter(command, "@Payload", DbType.Binary, binary);
Expand All @@ -350,7 +351,7 @@ protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
protected virtual void SetManifestParameters(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
var serializer = Serialization.FindSerializerForType(snapshotType);

string manifest = "";
if (serializer is SerializerWithStringManifest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,22 +224,5 @@ protected override async Task DeleteAsync(string persistenceId, SnapshotSelectio
await QueryExecutor.DeleteBatchAsync(connection, nestedCancellationTokenSource.Token, persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp);
}
}

private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot)
{
var snapshotType = snapshot.GetType();
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);

var binary = Akka.Serialization.Serialization.WithTransport(_actorSystem,
() => serializer.ToBinary(snapshot));


return new SnapshotEntry(
persistenceId: metadata.PersistenceId,
sequenceNr: metadata.SequenceNr,
timestamp: metadata.Timestamp,
manifest: snapshotType.TypeQualifiedName(),
payload: binary);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
</ItemGroup>

<ItemGroup>
<None Update="data\Sqlite.CustomObject.db">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="data\Sqlite.v1.3.0.db">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// -----------------------------------------------------------------------
// <copyright file="CustomObjectSerializerSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Serialization;
using FluentAssertions;
using Microsoft.Data.Sqlite;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests
{
public class CustomObjectSerializerSpec : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime
{
private static readonly string ConnectionString;
private static readonly Config Config;
static CustomObjectSerializerSpec()
{
var filename = $"AkkaSqlite-{Guid.NewGuid()}.db";
File.Copy("./data/Sqlite.CustomObject.db", $"{filename}.db");

ConnectionString = $"DataSource={filename}.db";
Config = ConfigurationFactory.ParseString($@"
akka.actor {{
serializers {{
mySerializer = ""{typeof(MySerializer).AssemblyQualifiedName}""
}}
serialization-bindings {{
""System.Object"" = mySerializer
}}
}}
akka.persistence {{
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite {{
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.sqlite""
sqlite {{
connection-string = ""{ConnectionString}""
auto-initialize = on
}}
}}
}}").WithFallback(SqlitePersistence.DefaultConfiguration());
}

public CustomObjectSerializerSpec(ITestOutputHelper helper)
: base(Config, nameof(CustomObjectSerializerSpec), helper)
{
}

[Fact(DisplayName = "Persistence.Sql should use custom serializer for object type")]
public async Task CustomSerializerTest()
{
var probe = CreateTestProbe();

// Sanity check to see that the system should serialize object type using MySerializer
var serializer = Sys.Serialization.FindSerializerForType(typeof(Persisted));
serializer.Should().BeOfType<MySerializer>();

var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("a", probe)));
probe.ExpectMsg("recovered");
actor.Tell(new Persisted("a"), probe);
probe.ExpectMsg(new Persisted("a"));

// Read the database directly, make sure that we're using the correct object type serializer
var conn = new SqliteConnection(ConnectionString);
conn.Open();
const string sql = "SELECT ej.serializer_id FROM event_journal ej WHERE ej.persistence_id = 'a'";
await using var cmd = new SqliteCommand(sql, conn);
var record = await cmd.ExecuteReaderAsync();
await record.ReadAsync();

// In the bug this fails, the serializer id is JSON id instead of MySerializer id
record[0].Should().Be(9999);
}

[Fact(DisplayName = "Persistence.Sql should be able to read legacy data")]
public void LegacyDataTest()
{
var probe = CreateTestProbe();
var actor = Sys.ActorOf(Props.Create(() => new PersistedActor("old", probe)));
probe.ExpectMsg(new Persisted("old"));
probe.ExpectMsg("recovered");
}

public Task InitializeAsync()
{
if(File.Exists("AkkaSqlite.db"))
File.Delete("AkkaSqlite.db");
return Task.CompletedTask;
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}
}

internal sealed class Persisted: IEquatable<Persisted>
{
public Persisted(string payload)
{
Payload = payload;
}

public string Payload { get; }

public bool Equals(Persisted other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Payload == other.Payload;
}

public override bool Equals(object obj)
{
return ReferenceEquals(this, obj) || obj is Persisted other && Equals(other);
}

public override int GetHashCode()
{
return (Payload != null ? Payload.GetHashCode() : 0);
}
}

internal class MySerializer : Serializer
{
public MySerializer(ExtendedActorSystem system) : base(system)
{
}

public override bool IncludeManifest { get { return true; } }
public override int Identifier { get { return 9999; } }

public override byte[] ToBinary(object obj)
{
return Encoding.UTF8.GetBytes(obj.ToString());
}

public override object FromBinary(byte[] bytes, Type type)
{
return Encoding.UTF8.GetString(bytes);
}
}

internal sealed class PersistedActor : UntypedPersistentActor
{
private readonly IActorRef _probe;

public PersistedActor(string persistenceId, IActorRef probe)
{
PersistenceId = persistenceId;
_probe = probe;
}

public override string PersistenceId { get; }

protected override void OnCommand(object message)
{
var sender = Sender;
Persist(message, _ =>
{
sender.Tell(message);
});
}

protected override void OnRecover(object message)
{
switch (message)
{
case Persisted msg:
_probe.Tell(msg);
break;
case RecoveryCompleted _:
_probe.Tell("recovered");
break;
}
}
}
}
Loading

0 comments on commit 7ca22af

Please sign in to comment.