From 9bee8fab04b9880aa483e95fedf6366b89a81fd3 Mon Sep 17 00:00:00 2001 From: ZhangTao Date: Mon, 18 Jan 2021 10:11:59 +0800 Subject: [PATCH] [StateService] Storage and P2P (#431) * make leveldb public * storage * rename folder * update neo * Fix end of line marker * file encoding * Update src/StateService/Network/StateRoot.cs Co-authored-by: Luchuan * fix some * recover levelDB * use neo LoadStore * clean project file * Update MPTNode.cs * add p2p * internal payload category * update neo and remove OnVerifiedInventory * null check and clean StateStore and clean settings * remove IP2PPlugin interface impl * clean settings * Update Settings.cs * Remove StatePlugin.Persistence.cs * clean actor system * Fix StatePlugin * Optimize * format * remove StoreDataCache and StoreMetaDataCache * remove StoreDataCache from mpt * Optimize * sync OnPersist Co-authored-by: Shargon Co-authored-by: Erik Zhang Co-authored-by: Luchuan Co-authored-by: Owen Zhang <38493437+superboyiii@users.noreply.github.com> --- src/StateService/MPT/MPTCache.cs | 71 +++++++- src/StateService/Settings.cs | 23 +++ src/StateService/StatePlugin.cs | 41 +++++ src/StateService/StateService.csproj | 7 + src/StateService/StateService/config.json | 6 + src/StateService/Storage/Prefixs.cs | 10 ++ src/StateService/Storage/StateRoot.cs | 130 ++++++++++++++ src/StateService/Storage/StateSnapshot.cs | 83 +++++++++ src/StateService/Storage/StateStore.cs | 169 ++++++++++++++++++ .../MPT/UT_MPTNode.cs | 2 +- 10 files changed, 532 insertions(+), 10 deletions(-) create mode 100644 src/StateService/Settings.cs create mode 100644 src/StateService/StatePlugin.cs create mode 100644 src/StateService/StateService/config.json create mode 100644 src/StateService/Storage/Prefixs.cs create mode 100644 src/StateService/Storage/StateRoot.cs create mode 100644 src/StateService/Storage/StateSnapshot.cs create mode 100644 src/StateService/Storage/StateStore.cs diff --git a/src/StateService/MPT/MPTCache.cs b/src/StateService/MPT/MPTCache.cs index 4e54c33b6..d6cac966e 100644 --- a/src/StateService/MPT/MPTCache.cs +++ b/src/StateService/MPT/MPTCache.cs @@ -1,20 +1,48 @@ -using Neo.IO.Caching; +using Neo.IO; using Neo.Persistence; +using System.Collections.Generic; namespace Neo.Plugins.MPT { public class MPTCache { - private readonly DataCache cache; + private enum TrackState : byte + { + None, + Added, + Changed, + Deleted + } + + private class Trackable + { + public MPTNode Node; + public TrackState State; + } + + private readonly ISnapshot store; + private readonly byte prefix; + private readonly Dictionary cache = new Dictionary(); public MPTCache(ISnapshot store, byte prefix) { - cache = new StoreDataCache(store, prefix); + this.store = store; + this.prefix = prefix; } public MPTNode Resolve(UInt256 hash) { - return cache.TryGet(hash)?.Clone(); + if (cache.TryGetValue(hash, out Trackable t)) + { + return t.Node?.Clone(); + } + var n = store.TryGet(prefix, hash.ToArray())?.AsSerializable(); + cache.Add(hash, new Trackable + { + Node = n, + State = TrackState.None, + }); + return n?.Clone(); } public void PutNode(MPTNode np) @@ -23,10 +51,16 @@ public void PutNode(MPTNode np) if (n is null) { np.Reference = 1; - cache.Add(np.Hash, np.Clone()); + cache[np.Hash] = new Trackable + { + Node = np.Clone(), + State = TrackState.Added, + }; return; } - cache.GetAndChange(np.Hash).Reference++; + var entry = cache[np.Hash]; + entry.Node.Reference++; + entry.State = TrackState.Changed; } public void DeleteNode(UInt256 hash) @@ -35,15 +69,34 @@ public void DeleteNode(UInt256 hash) if (n is null) return; if (1 < n.Reference) { - cache.GetAndChange(hash).Reference--; + var entry = cache[hash]; + entry.Node.Reference--; + entry.State = TrackState.Changed; return; } - cache.Delete(hash); + cache[hash] = new Trackable + { + Node = null, + State = TrackState.Deleted, + }; } public void Commit() { - cache.Commit(); + foreach (var item in cache) + { + switch (item.Value.State) + { + case TrackState.Added: + case TrackState.Changed: + store.Put(prefix, item.Key.ToArray(), item.Value.Node.ToArray()); + break; + case TrackState.Deleted: + store.Delete(prefix, item.Key.ToArray()); + break; + } + } + cache.Clear(); } } } diff --git a/src/StateService/Settings.cs b/src/StateService/Settings.cs new file mode 100644 index 000000000..0f64369dc --- /dev/null +++ b/src/StateService/Settings.cs @@ -0,0 +1,23 @@ +using Microsoft.Extensions.Configuration; + +namespace Neo.Plugins.StateService +{ + internal class Settings + { + public string Path { get; } + public bool FullState { get; } + + public static Settings Default { get; private set; } + + private Settings(IConfigurationSection section) + { + Path = string.Format(section.GetValue("Path", "Data_MPT_{0}"), ProtocolSettings.Default.Magic.ToString("X8")); + FullState = section.GetValue("FullState", false); + } + + public static void Load(IConfigurationSection section) + { + Default = new Settings(section); + } + } +} diff --git a/src/StateService/StatePlugin.cs b/src/StateService/StatePlugin.cs new file mode 100644 index 000000000..5d596b3c2 --- /dev/null +++ b/src/StateService/StatePlugin.cs @@ -0,0 +1,41 @@ +using Akka.Actor; +using Neo.IO.Caching; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.Plugins.StateService.Storage; +using System.Collections.Generic; +using System.Linq; +using static Neo.Ledger.Blockchain; + +namespace Neo.Plugins.StateService +{ + public class StatePlugin : Plugin, IPersistencePlugin + { + public const string StatePayloadCategory = "StateService"; + public override string Name => "StateService"; + public override string Description => "Enables MPT for the node"; + + private IActorRef store; + + protected override void Configure() + { + Settings.Load(GetConfiguration()); + } + + protected override void OnPluginsLoaded() + { + store = System.ActorSystem.ActorOf(StateStore.Props(System, Settings.Default.Path)); + } + + public override void Dispose() + { + base.Dispose(); + System.EnsureStoped(store); + } + + void IPersistencePlugin.OnPersist(Block block, StoreView snapshot, IReadOnlyList applicationExecutedList) + { + StateStore.Singleton.UpdateLocalStateRoot(block.Index, snapshot.Storages.GetChangeSet().Where(p => p.State != TrackState.None).ToList()); + } + } +} diff --git a/src/StateService/StateService.csproj b/src/StateService/StateService.csproj index 24ea71663..6c3585c7d 100644 --- a/src/StateService/StateService.csproj +++ b/src/StateService/StateService.csproj @@ -5,5 +5,12 @@ Neo.Plugins true + + + + PreserveNewest + PreserveNewest + + diff --git a/src/StateService/StateService/config.json b/src/StateService/StateService/config.json new file mode 100644 index 000000000..d930ced39 --- /dev/null +++ b/src/StateService/StateService/config.json @@ -0,0 +1,6 @@ +{ + "PluginConfiguration": { + "Path": "Data_MPT_{0}", + "FullState": false + } +} diff --git a/src/StateService/Storage/Prefixs.cs b/src/StateService/Storage/Prefixs.cs new file mode 100644 index 000000000..6e8f9dc72 --- /dev/null +++ b/src/StateService/Storage/Prefixs.cs @@ -0,0 +1,10 @@ + +namespace Neo.Plugins.StateService.Storage +{ + public static class Prefixs + { + public const byte StateRoot = 0x01; + public const byte CurrentLocalRootIndex = 0x02; + public const byte CurrentValidatedRootIndex = 0x04; + } +} diff --git a/src/StateService/Storage/StateRoot.cs b/src/StateService/Storage/StateRoot.cs new file mode 100644 index 000000000..6f23cc9e9 --- /dev/null +++ b/src/StateService/Storage/StateRoot.cs @@ -0,0 +1,130 @@ +using Neo.Cryptography; +using Neo.Cryptography.ECC; +using Neo.IO; +using Neo.IO.Json; +using Neo.Ledger; +using Neo.Network.P2P; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.SmartContract; +using Neo.SmartContract.Native; +using System; +using System.IO; + +namespace Neo.Plugins.StateService.Storage +{ + public class StateRoot : ICloneable, IVerifiable, ISerializable + { + public byte Version; + public uint Index; + public UInt256 RootHash; + public Witness Witness; + + private UInt256 _hash = null; + + public UInt256 Hash + { + get + { + if (_hash is null) + { + _hash = new UInt256(Crypto.Hash256(this.GetHashData())); + } + return _hash; + } + } + + Witness[] IVerifiable.Witnesses + { + get + { + return new[] { Witness }; + } + set + { + if (value is null || value.Length != 1) throw new ArgumentException(); + Witness = value[0]; + } + } + + public int Size => + sizeof(byte) + //Version + sizeof(uint) + //Index + UInt256.Length + //RootHash + (Witness is null ? 1 : 1 + Witness.Size); //Witness + + public StateRoot Clone() + { + return new StateRoot + { + Version = Version, + Index = Index, + RootHash = RootHash, + Witness = Witness, + }; + } + + public void FromReplica(StateRoot replica) + { + Version = replica.Version; + Index = replica.Index; + RootHash = replica.RootHash; + Witness = replica.Witness; + } + + public void Deserialize(BinaryReader reader) + { + this.DeserializeUnsigned(reader); + Witness[] arr = reader.ReadSerializableArray(); + if (arr.Length < 1) + Witness = null; + else + Witness = arr[0]; + } + + public void DeserializeUnsigned(BinaryReader reader) + { + Version = reader.ReadByte(); + Index = reader.ReadUInt32(); + RootHash = reader.ReadSerializable(); + } + + public void Serialize(BinaryWriter writer) + { + this.SerializeUnsigned(writer); + if (Witness is null) + writer.WriteVarInt(0); + else + writer.Write(new Witness[] { Witness }); + } + + public void SerializeUnsigned(BinaryWriter writer) + { + writer.Write(Version); + writer.Write(Index); + writer.Write(RootHash); + } + + public bool Verify(StoreView snapshot) + { + return this.VerifyWitnesses(snapshot, 1_00000000); + } + + public UInt160[] GetScriptHashesForVerifying(StoreView snapshot) + { + ECPoint[] validators = NativeContract.RoleManagement.GetDesignatedByRole(snapshot, Role.StateValidator, Index); + if (validators.Length < 1) throw new InvalidOperationException("No script hash for state root verifying"); + return new UInt160[] { Blockchain.GetConsensusAddress(validators) }; + } + + public JObject ToJson() + { + var json = new JObject(); + json["version"] = Version; + json["index"] = Index; + json["roothash"] = RootHash.ToString(); + json["witness"] = Witness?.ToJson(); + return json; + } + } +} diff --git a/src/StateService/Storage/StateSnapshot.cs b/src/StateService/Storage/StateSnapshot.cs new file mode 100644 index 000000000..6fc75ce78 --- /dev/null +++ b/src/StateService/Storage/StateSnapshot.cs @@ -0,0 +1,83 @@ +using Neo; +using Neo.IO; +using Neo.IO.Caching; +using Neo.Ledger; +using Neo.Persistence; +using Neo.Plugins.MPT; +using System; +using System.Text; + +namespace Neo.Plugins.StateService.Storage +{ + public class StateSnapshot : IDisposable + { + private readonly ISnapshot snapshot; + public MPTTrie Trie; + + public StateSnapshot(IStore store) + { + snapshot = store.GetSnapshot(); + Trie = new MPTTrie(snapshot, CurrentLocalRootHash(), Settings.Default.FullState); + } + + public StateRoot GetStateRoot(uint index) + { + return snapshot.TryGet(Prefixs.StateRoot, BitConverter.GetBytes(index))?.AsSerializable(); + } + + public void AddLocalStateRoot(StateRoot state_root) + { + snapshot.Put(Prefixs.StateRoot, BitConverter.GetBytes(state_root.Index), state_root.ToArray()); + snapshot.Put(Prefixs.CurrentLocalRootIndex, Array.Empty(), BitConverter.GetBytes(state_root.Index)); + } + + public uint CurrentLocalRootIndex() + { + var bytes = snapshot.TryGet(Prefixs.CurrentLocalRootIndex, Array.Empty()); + if (bytes is null) return uint.MaxValue; + return BitConverter.ToUInt32(bytes); + } + + public UInt256 CurrentLocalRootHash() + { + var index = CurrentLocalRootIndex(); + if (index == uint.MaxValue) return null; + return GetStateRoot(index)?.RootHash; + } + + public void AddValidatedStateRoot(StateRoot state_root) + { + if (state_root?.Witness is null) + throw new ArgumentException(nameof(state_root) + " missing witness in invalidated state root"); + snapshot.Put(Prefixs.StateRoot, BitConverter.GetBytes(state_root.Index), state_root.ToArray()); + snapshot.Put(Prefixs.CurrentValidatedRootIndex, Array.Empty(), BitConverter.GetBytes(state_root.Index)); + } + + public uint CurrentValidatedRootIndex() + { + var bytes = snapshot.TryGet(Prefixs.CurrentValidatedRootIndex, Array.Empty()); + if (bytes is null) return uint.MaxValue; + return BitConverter.ToUInt32(bytes); + } + + public UInt256 CurrentValidatedRootHash() + { + var index = CurrentLocalRootIndex(); + if (index == uint.MaxValue) return null; + var state_root = GetStateRoot(index); + if (state_root is null || state_root.Witness is null) + throw new InvalidOperationException(nameof(CurrentValidatedRootHash) + " could not get validated state root"); + return state_root.RootHash; + } + + public void Commit() + { + snapshot.Commit(); + } + + public void Dispose() + { + snapshot.Dispose(); + } + } +} diff --git a/src/StateService/Storage/StateStore.cs b/src/StateService/Storage/StateStore.cs new file mode 100644 index 000000000..c93811bc1 --- /dev/null +++ b/src/StateService/Storage/StateStore.cs @@ -0,0 +1,169 @@ +using Akka.Actor; +using Neo.IO; +using Neo.IO.Caching; +using Neo.Ledger; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.Plugins.MPT; +using System; +using System.Collections.Generic; +using System.Threading; +using Item = Neo.IO.Caching.DataCache.Trackable; + +namespace Neo.Plugins.StateService.Storage +{ + class StateStore : UntypedActor + { + private readonly NeoSystem core; + private readonly IStore store; + private const int MaxCacheCount = 100; + private readonly Dictionary cache = new Dictionary(); + private StateSnapshot currentSnapshot; + public UInt256 CurrentLocalRootHash => currentSnapshot.CurrentLocalRootHash(); + public uint LocalRootIndex => currentSnapshot.CurrentLocalRootIndex(); + public uint ValidatedRootIndex => currentSnapshot.CurrentValidatedRootIndex(); + + private static StateStore singleton; + public static StateStore Singleton + { + get + { + while (singleton is null) Thread.Sleep(10); + return singleton; + } + } + + public StateStore(NeoSystem core, string path) + { + if (singleton != null) throw new InvalidOperationException(nameof(StateStore)); + this.core = core; + this.store = core.LoadStore(path); + singleton = this; + core.ActorSystem.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult)); + UpdateCurrentSnapshot(); + } + + public void Dispose() + { + store.Dispose(); + } + + public StateSnapshot GetSnapshot() + { + return new StateSnapshot(store); + } + + public HashSet GetProof(UInt256 root, StorageKey skey) + { + using ISnapshot snapshot = store.GetSnapshot(); + var trie = new MPTTrie(snapshot, root); + return trie.GetProof(skey); + } + + protected override void OnReceive(object message) + { + switch (message) + { + case Blockchain.RelayResult rr: + if (rr.Result == VerifyResult.Succeed && rr.Inventory is ExtensiblePayload payload && payload.Category == StatePlugin.StatePayloadCategory) + OnStatePayload(payload); + break; + default: + break; + } + } + + private void OnStatePayload(ExtensiblePayload payload) + { + StateRoot state_root = null; + try + { + state_root = payload.Data?.AsSerializable(); + } + catch (Exception ex) + { + Utility.Log(nameof(StateStore), LogLevel.Warning, " invalid state root " + ex.Message); + return; + } + if (state_root != null) + OnNewStateRoot(state_root); + } + + private bool OnNewStateRoot(StateRoot state_root) + { + if (state_root?.Witness is null) return false; + if (state_root.Index <= ValidatedRootIndex) return false; + if (LocalRootIndex < state_root.Index && state_root.Index < LocalRootIndex + MaxCacheCount) + { + cache.Add(state_root.Index, state_root); + return true; + } + using var state_snapshot = Singleton.GetSnapshot(); + StateRoot local_root = state_snapshot.GetStateRoot(state_root.Index); + if (local_root is null || local_root.Witness != null) return false; + using var snapshot = Blockchain.Singleton.GetSnapshot(); + if (!state_root.Verify(snapshot)) return false; + if (local_root.RootHash != state_root.RootHash) return false; + state_snapshot.AddValidatedStateRoot(state_root); + state_snapshot.Commit(); + UpdateCurrentSnapshot(); + //Tell validation service + return true; + } + + public void UpdateLocalStateRoot(uint height, List change_set) + { + using StateSnapshot state_snapshot = Singleton.GetSnapshot(); + foreach (var item in change_set) + { + switch (item.State) + { + case TrackState.Added: + state_snapshot.Trie.Put(item.Key, item.Item); + break; + case TrackState.Changed: + state_snapshot.Trie.Put(item.Key, item.Item); + break; + case TrackState.Deleted: + state_snapshot.Trie.Delete(item.Key); + break; + } + } + UInt256 root_hash = state_snapshot.Trie.Root.Hash; + StateRoot state_root = new StateRoot + { + Index = height, + RootHash = root_hash, + Witness = null, + }; + state_snapshot.AddLocalStateRoot(state_root); + state_snapshot.Commit(); + UpdateCurrentSnapshot(); + CheckValidatedStateRoot(height); + } + + private void CheckValidatedStateRoot(uint index) + { + if (cache.TryGetValue(index, out StateRoot state_root)) + { + cache.Remove(index); + OnNewStateRoot(state_root); + } + } + + private void UpdateCurrentSnapshot() + { + Interlocked.Exchange(ref currentSnapshot, GetSnapshot())?.Dispose(); + } + + protected override void PostStop() + { + base.PostStop(); + } + + public static Props Props(NeoSystem core, string path) + { + return Akka.Actor.Props.Create(() => new StateStore(core, path)); + } + } +} diff --git a/tests/Neo.Plugins.StateService.Tests/MPT/UT_MPTNode.cs b/tests/Neo.Plugins.StateService.Tests/MPT/UT_MPTNode.cs index e1f83fcad..31bec7653 100644 --- a/tests/Neo.Plugins.StateService.Tests/MPT/UT_MPTNode.cs +++ b/tests/Neo.Plugins.StateService.Tests/MPT/UT_MPTNode.cs @@ -17,7 +17,7 @@ public class UT_MPTNode private byte[] NodeToArrayAsChild(MPTNode n) { using var ms = new MemoryStream(); - using var writer = new BinaryWriter(ms, Utility.StrictUTF8, true); + using var writer = new BinaryWriter(ms, Neo.Utility.StrictUTF8, true); n.SerializeAsChild(writer); writer.Flush();