From 85b3c2e1b9959f11b598c7f191c8526e13963195 Mon Sep 17 00:00:00 2001 From: Aramik Date: Thu, 9 Nov 2023 16:17:06 -0800 Subject: [PATCH] Messages v2 included migration (#1738) # Goal The goal of this PR is to propose and implement messages v2 compatible with PoV Closes #198 # Discussion - Refactored Messages to minimize used PoV - Added storage migration (single block) # Migration Details - Based on data used in rococo and main-net and calculations we don't need to do a multi-block migration. (only around 15%) of the block is being used. - Was not able to test with upgrading on local due to getting errors when running relay nodes - Was able to successfully run try-run-time cli tool against rococo # Checklist - [x] Chain spec updated - [x] Design doc(s) updated - [x] Tests added - [x] Benchmarks added - [x] Weights updated --- Cargo.lock | 1 + designdocs/message_storage_v2.md | 47 +++++ e2e/capacity/transactions.test.ts | 4 +- e2e/messages/addIPFSMessage.test.ts | 8 +- e2e/package-lock.json | 2 +- e2e/scaffolding/extrinsicHelpers.ts | 4 +- node/cli/Cargo.toml | 2 + node/cli/src/command.rs | 32 +-- pallets/messages/src/benchmarking.rs | 25 +-- pallets/messages/src/lib.rs | 112 +++++----- pallets/messages/src/migration/mod.rs | 2 + pallets/messages/src/migration/v2.rs | 143 +++++++++++++ pallets/messages/src/tests/mock.rs | 3 +- pallets/messages/src/tests/other_tests.rs | 237 +++++++++++----------- pallets/messages/src/types.rs | 11 +- runtime/common/src/constants.rs | 3 - runtime/frequency/src/lib.rs | 13 +- 17 files changed, 407 insertions(+), 242 deletions(-) create mode 100644 designdocs/message_storage_v2.md create mode 100644 pallets/messages/src/migration/mod.rs create mode 100644 pallets/messages/src/migration/v2.rs diff --git a/Cargo.lock b/Cargo.lock index efc307189b..5b97f3b262 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3715,6 +3715,7 @@ dependencies = [ "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", "derive_more", + "frame-benchmarking", "frame-benchmarking-cli", "frame-support", "frame-system", diff --git a/designdocs/message_storage_v2.md b/designdocs/message_storage_v2.md new file mode 100644 index 0000000000..bb2b8c3ec5 --- /dev/null +++ b/designdocs/message_storage_v2.md @@ -0,0 +1,47 @@ +# On Chain Message Storage + +## Context and Scope +The proposed feature consists of changes that is going to be one (or more) pallet(s) in runtime of a +Substrate based blockchain, and it will be used in all environments including production. + +## Problem Statement +After introduction of **Proof of Validity** or **PoV** in runtime weights, all pallets should be +re-evaluated and refactored if necessary to minimize the usage of **PoV**. This is to ensure all +important operations are scalable. +This document tries to propose some changes on **Messages** pallet to optimize the **PoV** size. + +## Goals +- Minimizing Weights including **execution times** and **PoV** size. + +## Proposal +Storing messages on chain using **BlockNumber** and **SchemaId** and **MessageIndex** as main and secondary + and tertiary keys using [StorageNMap](https://paritytech.github.io/substrate/master/frame_support/storage/trait.StorageNMap.html) data structure provided in Substrate. + +### Main Storage types +- **MessagesV2** + - _Type_: `StorageNMap<(BlockNumber, SchemaId, MessageIndex), Message>` + - _Purpose_: Main structure To store all messages for a certain block number and schema id and + index + + +### On Chain Structure +Following is a proposed data structure for storing a Message on chain. +```rust +/// only `index` is removed from old structure +pub struct Message { + pub payload: Vec, // Serialized data in a user-defined schemas format + pub provider_key: AccountId, // Signature of the signer + pub msa_id: u64, // Message source account id (the original source of the message) +} +``` +## Description + +The idea is to use existing **whitelisted** storage with `BlockMessageIndex` type to store and get +the index of each message to be able to use it as our third key for `StorageNMap`. + +We would store each message separately into `StorageNMap` with following keys +- primary key would be `block_number` +- secondary key would be `schema_id` +- tertiary key would be the `index` of the message for current block which starts from 0 + + diff --git a/e2e/capacity/transactions.test.ts b/e2e/capacity/transactions.test.ts index 1d25e85780..9fd536f6e6 100644 --- a/e2e/capacity/transactions.test.ts +++ b/e2e/capacity/transactions.test.ts @@ -218,7 +218,7 @@ describe('Capacity Transactions', function () { const { eventMap } = await call.payWithCapacity(); assertEvent(eventMap, 'capacity.CapacityWithdrawn'); - assertEvent(eventMap, 'messages.MessagesStored'); + assertEvent(eventMap, 'messages.MessagesInBlock'); }); it('successfully pays with Capacity for eligible transaction - addOnchainMessage', async function () { @@ -227,7 +227,7 @@ describe('Capacity Transactions', function () { const call = ExtrinsicHelper.addOnChainMessage(capacityKeys, dummySchemaId, '0xdeadbeef'); const { eventMap } = await call.payWithCapacity(); assertEvent(eventMap, 'capacity.CapacityWithdrawn'); - assertEvent(eventMap, 'messages.MessagesStored'); + assertEvent(eventMap, 'messages.MessagesInBlock'); const get = await ExtrinsicHelper.apiPromise.rpc.messages.getBySchemaId(dummySchemaId, { from_block: starting_block, from_index: 0, diff --git a/e2e/messages/addIPFSMessage.test.ts b/e2e/messages/addIPFSMessage.test.ts index 02ebf64482..f88d028cec 100644 --- a/e2e/messages/addIPFSMessage.test.ts +++ b/e2e/messages/addIPFSMessage.test.ts @@ -107,9 +107,7 @@ describe('Add Offchain Message', function () { const f = ExtrinsicHelper.addIPFSMessage(keys, schemaId, ipfs_cid_64, ipfs_payload_len); const { target: event } = await f.fundAndSend(fundingSource); - assert.notEqual(event, undefined, 'should have returned a MessagesStored event'); - assert.deepEqual(event?.data.schemaId, schemaId, 'schema ids should be equal'); - assert.notEqual(event?.data.blockNumber, undefined, 'should have a block number'); + assert.notEqual(event, undefined, 'should have returned a MessagesInBlock event'); }); it('should successfully retrieve added message and returned CID should have Base32 encoding', async function () { @@ -130,9 +128,7 @@ describe('Add Offchain Message', function () { const f = ExtrinsicHelper.addOnChainMessage(keys, dummySchemaId, '0xdeadbeef'); const { target: event } = await f.fundAndSend(fundingSource); - assert.notEqual(event, undefined, 'should have returned a MessagesStored event'); - assert.deepEqual(event?.data.schemaId, dummySchemaId, 'schema ids should be equal'); - assert.notEqual(event?.data.blockNumber, undefined, 'should have a block number'); + assert.notEqual(event, undefined, 'should have returned a MessagesInBlock event'); const get = await ExtrinsicHelper.apiPromise.rpc.messages.getBySchemaId(dummySchemaId, { from_block: starting_block, diff --git a/e2e/package-lock.json b/e2e/package-lock.json index 6c3f0f1131..f30e18810d 100644 --- a/e2e/package-lock.json +++ b/e2e/package-lock.json @@ -260,7 +260,7 @@ "node_modules/@frequency-chain/api-augment": { "version": "0.0.0", "resolved": "file:../js/api-augment/dist/frequency-chain-api-augment-0.0.0.tgz", - "integrity": "sha512-wcyYIFMu8I2RiqEs664Acp+IdltgIXSi/5VL7WB64YYt3b9krI6CkXst757sBd0aDuQhIjHX35UOaaOyNdZMvw==", + "integrity": "sha512-y5oeksTwmIpVJgZCWj7D+yVoN4TZggsMA5Gv9YmV5DCgCdpXiF/JQ/DcfEs4JUYIlB/P/ccLJkj4x+TJCYhPoA==", "license": "Apache-2.0", "dependencies": { "@polkadot/api": "^10.9.1", diff --git a/e2e/scaffolding/extrinsicHelpers.ts b/e2e/scaffolding/extrinsicHelpers.ts index c794d3eb6b..3ab94997b1 100644 --- a/e2e/scaffolding/extrinsicHelpers.ts +++ b/e2e/scaffolding/extrinsicHelpers.ts @@ -483,7 +483,7 @@ export class ExtrinsicHelper { return new Extrinsic( () => ExtrinsicHelper.api.tx.messages.addIpfsMessage(schemaId, cid, payload_length), keys, - ExtrinsicHelper.api.events.messages.MessagesStored + ExtrinsicHelper.api.events.messages.MessagesInBlock ); } @@ -668,7 +668,7 @@ export class ExtrinsicHelper { return new Extrinsic( () => ExtrinsicHelper.api.tx.messages.addOnchainMessage(null, schemaId, payload), keys, - ExtrinsicHelper.api.events.messages.MessagesStored + ExtrinsicHelper.api.events.messages.MessagesInBlock ); } diff --git a/node/cli/Cargo.toml b/node/cli/Cargo.toml index cfe2e5b224..4e111f3857 100644 --- a/node/cli/Cargo.toml +++ b/node/cli/Cargo.toml @@ -29,6 +29,7 @@ cli-opt = { default-features = false, path = "../cli-opt" } # Substrate frame-benchmarking-cli = { git = "https://github.com/paritytech/polkadot-sdk", optional = true, branch = "release-polkadot-v1.1.0" } +frame-benchmarking = { git = "https://github.com/paritytech/polkadot-sdk", optional = true, branch = "release-polkadot-v1.1.0" } frame-support = { git = "https://github.com/paritytech/polkadot-sdk", default-features = false, branch = "release-polkadot-v1.1.0" } frame-system = { git = "https://github.com/paritytech/polkadot-sdk", default-features = false, branch = "release-polkadot-v1.1.0" } pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk", default-features = false, branch = "release-polkadot-v1.1.0" } @@ -70,6 +71,7 @@ cli = [ "sc-cli", "sc-service", "frame-benchmarking-cli", + "frame-benchmarking", "try-runtime-cli" ] default = ["std", "cli"] diff --git a/node/cli/src/command.rs b/node/cli/src/command.rs index 3912022cc7..22536f0e0e 100644 --- a/node/cli/src/command.rs +++ b/node/cli/src/command.rs @@ -371,22 +371,24 @@ pub fn run() -> Result<()> { #[cfg(feature = "try-runtime")] Some(Subcommand::TryRuntime(cmd)) => { - use sc_executor::{sp_wasm_interface::ExtendedHostFunctions, NativeExecutionDispatch}; + use common_runtime::constants::MILLISECS_PER_BLOCK; + use try_runtime_cli::block_building_info::timestamp_with_aura_info; + let runner = cli.create_runner(cmd)?; - runner.async_run(|config| { - // we don't need any of the components of new_partial, just a runtime, or a task - // manager to do `async_run`. - let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); - let task_manager = - sc_service::TaskManager::new(config.tokio_handle.clone(), registry) - .map_err(|e| sc_cli::Error::Service(sc_service::Error::Prometheus(e)))?; - Ok(( - cmd.run::::ExtendHostFunctions, - >>(), - task_manager, - )) + + type HostFunctions = + (sp_io::SubstrateHostFunctions, frame_benchmarking::benchmarking::HostFunctions); + + // grab the task manager. + let registry = &runner.config().prometheus_config.as_ref().map(|cfg| &cfg.registry); + let task_manager = + sc_service::TaskManager::new(runner.config().tokio_handle.clone(), *registry) + .map_err(|e| format!("Error: {:?}", e))?; + + let info_provider = timestamp_with_aura_info(MILLISECS_PER_BLOCK); + + runner.async_run(|_| { + Ok((cmd.run::(Some(info_provider)), task_manager)) }) }, Some(Subcommand::ExportRuntimeVersion(cmd)) => { diff --git a/pallets/messages/src/benchmarking.rs b/pallets/messages/src/benchmarking.rs index 44ef4466a1..632bad471c 100644 --- a/pallets/messages/src/benchmarking.rs +++ b/pallets/messages/src/benchmarking.rs @@ -15,6 +15,7 @@ use sp_runtime::traits::One; const IPFS_SCHEMA_ID: u16 = 50; const IPFS_PAYLOAD_LENGTH: u32 = 10; +const MAX_MESSAGES_IN_BLOCK: u32 = 500; fn onchain_message(schema_id: SchemaId) -> DispatchResult { let message_source_id = DelegatorId(1); @@ -62,8 +63,6 @@ fn create_schema(location: PayloadLocation) -> DispatchResult { } benchmarks! { - // this is temporary to avoid massive PoV sizes which will break the chain until rework on messages - #[pov_mode = Measured] add_onchain_message { let n in 0 .. T::MessagesMaxPayloadSizeBytes::get() - 1; let message_source_id = DelegatorId(2); @@ -78,21 +77,17 @@ benchmarks! { assert_ok!(T::MsaBenchmarkHelper::set_delegation_relationship(ProviderId(1), message_source_id.into(), [schema_id].to_vec())); let payload = vec![1; n as usize]; - let average_messages_per_block: u32 = T::MaxMessagesPerBlock::get() / 2; - for j in 1 .. average_messages_per_block { + for j in 1 .. MAX_MESSAGES_IN_BLOCK { assert_ok!(onchain_message::(schema_id)); } }: _ (RawOrigin::Signed(caller), Some(message_source_id.into()), schema_id, payload) verify { - assert_eq!( - MessagesPallet::::get_messages( - BlockNumberFor::::one(), schema_id).len(), - average_messages_per_block as usize + assert_eq!(MessagesPallet::::get_messages_by_schema_and_block( + schema_id, PayloadLocation::OnChain, BlockNumberFor::::one()).len(), + MAX_MESSAGES_IN_BLOCK as usize ); } - // this is temporary to avoid massive PoV sizes which will break the chain until rework on messages - #[pov_mode = Measured] add_ipfs_message { let caller: T::AccountId = whitelisted_caller(); let cid = "bafkreidgvpkjawlxz6sffxzwgooowe5yt7i6wsyg236mfoks77nywkptdq".as_bytes().to_vec(); @@ -102,16 +97,14 @@ benchmarks! { assert_ok!(create_schema::(PayloadLocation::IPFS)); } assert_ok!(T::MsaBenchmarkHelper::add_key(ProviderId(1).into(), caller.clone())); - let average_messages_per_block: u32 = T::MaxMessagesPerBlock::get() / 2; - for j in 1 .. average_messages_per_block { + for j in 1 .. MAX_MESSAGES_IN_BLOCK { assert_ok!(ipfs_message::(IPFS_SCHEMA_ID)); } }: _ (RawOrigin::Signed(caller),IPFS_SCHEMA_ID, cid, IPFS_PAYLOAD_LENGTH) verify { - assert_eq!( - MessagesPallet::::get_messages( - BlockNumberFor::::one(), IPFS_SCHEMA_ID).len(), - average_messages_per_block as usize + assert_eq!(MessagesPallet::::get_messages_by_schema_and_block( + IPFS_SCHEMA_ID, PayloadLocation::IPFS, BlockNumberFor::::one()).len(), + MAX_MESSAGES_IN_BLOCK as usize ); } diff --git a/pallets/messages/src/lib.rs b/pallets/messages/src/lib.rs index e89010827f..e87677cc8b 100644 --- a/pallets/messages/src/lib.rs +++ b/pallets/messages/src/lib.rs @@ -46,6 +46,8 @@ #[cfg(feature = "runtime-benchmarks")] mod benchmarking; +/// migration module +pub mod migration; #[cfg(test)] mod tests; @@ -77,11 +79,16 @@ pub use weights::*; use cid::Cid; use frame_system::pallet_prelude::*; +const LOG_TARGET: &str = "runtime::messages"; + #[frame_support::pallet] pub mod pallet { use super::*; use frame_support::pallet_prelude::*; + /// The current storage version. + pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(2); + #[pallet::config] pub trait Config: frame_system::Config { /// The overarching event type. @@ -99,10 +106,6 @@ pub mod pallet { /// A type that will supply schema related information. type SchemaProvider: SchemaProvider; - /// The maximum number of messages in a block. - #[pallet::constant] - type MaxMessagesPerBlock: Get; - /// The maximum size of a message payload bytes. #[pallet::constant] type MessagesMaxPayloadSizeBytes: Get + Clone + Debug + MaxEncodedLen; @@ -117,33 +120,32 @@ pub mod pallet { } #[pallet::pallet] + #[pallet::storage_version(STORAGE_VERSION)] pub struct Pallet(_); - /// A permanent storage for messages mapped by block number and schema id. - /// - Keys: BlockNumber, Schema Id - /// - Value: List of Messages - #[pallet::storage] - #[pallet::getter(fn get_messages)] - pub(super) type Messages = StorageDoubleMap< - _, - Twox64Concat, - BlockNumberFor, - Twox64Concat, - SchemaId, - BoundedVec, T::MaxMessagesPerBlock>, - ValueQuery, - >; - /// A temporary storage for getting the index for messages /// At the start of the next block this storage is set to 0 #[pallet::storage] #[pallet::whitelist_storage] #[pallet::getter(fn get_message_index)] - pub(super) type BlockMessageIndex = StorageValue<_, u16, ValueQuery>; + pub(super) type BlockMessageIndex = StorageValue<_, MessageIndex, ValueQuery>; + + #[pallet::storage] + #[pallet::getter(fn get_messages_v2)] + pub(super) type MessagesV2 = StorageNMap< + _, + ( + storage::Key>, + storage::Key, + storage::Key, + ), + Message, + OptionQuery, + >; #[pallet::error] pub enum Error { - /// Too many messages are added to existing block + /// Deprecated: Too many messages are added to existing block TooManyMessagesInBlock, /// Message payload size is too large @@ -174,6 +176,7 @@ pub mod pallet { #[pallet::event] #[pallet::generate_deposit(pub(super) fn deposit_event)] pub enum Event { + /// Deprecated: please use [`Event::MessagesInBlock`] /// Messages are stored for a specified schema id and block number MessagesStored { /// The schema for these messages @@ -181,6 +184,8 @@ pub mod pallet { /// The block number for these messages block_number: BlockNumberFor, }, + /// Messages stored in the current block + MessagesInBlock, } #[pallet::hooks] @@ -202,14 +207,13 @@ pub mod pallet { /// The actual message content will be on IPFS. /// /// # Events - /// * [`Event::MessagesStored`] - In the next block + /// * [`Event::MessagesInBlock`] - Messages Stored in the block /// /// # Errors /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large /// * [`Error::InvalidSchemaId`] - Schema not found /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA - /// * [`Error::TooManyMessagesInBlock`] - Block is full of messages already /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full /// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0) /// * [`Error::InvalidCid`] - Unable to parse provided CID @@ -245,10 +249,7 @@ pub mod pallet { schema_id, current_block, )? { - Self::deposit_event(Event::MessagesStored { - schema_id, - block_number: current_block, - }); + Self::deposit_event(Event::MessagesInBlock); } Ok(()) } else { @@ -259,7 +260,7 @@ pub mod pallet { /// Add an on-chain message for a given schema id. /// /// # Events - /// * [`Event::MessagesStored`] - In the next block + /// * [`Event::MessagesInBlock`] - In the next block /// /// # Errors /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large @@ -267,7 +268,6 @@ pub mod pallet { /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA /// * [`Error::UnAuthorizedDelegate`] - Trying to add a message without a proper delegation between the origin and the on_behalf_of MSA - /// * [`Error::TooManyMessagesInBlock`] - Block is full of messages already /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full /// #[pallet::call_index(1)] @@ -316,10 +316,7 @@ pub mod pallet { schema_id, current_block, )? { - Self::deposit_event(Event::MessagesStored { - schema_id, - block_number: current_block, - }); + Self::deposit_event(Event::MessagesInBlock); } Ok(()) @@ -334,7 +331,6 @@ impl Pallet { /// Stores a message for a given schema id. /// returns true if it needs to emit an event /// # Errors - /// * [`Error::TooManyMessagesInBlock`] /// * [`Error::TypeConversionOverflow`] /// pub fn add_message( @@ -344,28 +340,17 @@ impl Pallet { schema_id: SchemaId, current_block: BlockNumberFor, ) -> Result { - >::try_mutate( - current_block, - schema_id, - |existing_messages| -> Result { - // first message for any schema_id is going to trigger an event - let need_event = existing_messages.len() == 0; - let index = BlockMessageIndex::::get(); - let msg = Message { - payload, // size is checked on top of extrinsic - provider_msa_id, - msa_id, - index, - }; - - existing_messages - .try_push(msg) - .map_err(|_| Error::::TooManyMessagesInBlock)?; - - BlockMessageIndex::::put(index.saturating_add(1)); - Ok(need_event) - }, - ) + let index = BlockMessageIndex::::get(); + let first = index == 0; + let msg = Message { + payload, // size is checked on top of extrinsic + provider_msa_id, + msa_id, + }; + + >::insert((current_block, schema_id, index), msg); + BlockMessageIndex::::set(index.saturating_add(1)); + Ok(first) } /// Resolve an MSA from an account key(key) @@ -394,12 +379,15 @@ impl Pallet { match schema_payload_location { PayloadLocation::Itemized | PayloadLocation::Paginated => return Vec::new(), - _ => - return >::get(block_number, schema_id) - .into_inner() - .iter() - .map(|msg| msg.map_to_response(block_number_value, schema_payload_location)) - .collect(), + _ => { + let mut messages: Vec<_> = >::iter_prefix((block_number, schema_id)) + .map(|(index, msg)| { + msg.map_to_response(block_number_value, schema_payload_location, index) + }) + .collect(); + messages.sort_by(|a, b| a.index.cmp(&b.index)); + return messages + }, } } diff --git a/pallets/messages/src/migration/mod.rs b/pallets/messages/src/migration/mod.rs new file mode 100644 index 0000000000..c34354a101 --- /dev/null +++ b/pallets/messages/src/migration/mod.rs @@ -0,0 +1,2 @@ +/// migrations to v2 +pub mod v2; diff --git a/pallets/messages/src/migration/v2.rs b/pallets/messages/src/migration/v2.rs new file mode 100644 index 0000000000..ff33b5f418 --- /dev/null +++ b/pallets/messages/src/migration/v2.rs @@ -0,0 +1,143 @@ +use crate::{BlockNumberFor, Config, Message, MessagesV2, Pallet, SchemaId, LOG_TARGET}; +use frame_support::{pallet_prelude::*, storage_alias, traits::OnRuntimeUpgrade, weights::Weight}; +use log; +use sp_runtime::Saturating; + +#[cfg(feature = "try-runtime")] +use sp_runtime::TryRuntimeError; +#[cfg(feature = "try-runtime")] +use sp_std::vec::Vec; + +/// old structures and storages +pub mod old { + use super::*; + use common_primitives::msa::MessageSourceId; + use sp_std::fmt::Debug; + + /// old message structure that was stored + #[derive(Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] + #[scale_info(skip_type_params(MaxDataSize))] + #[codec(mel_bound(MaxDataSize: MaxEncodedLen))] + pub struct OldMessage + where + MaxDataSize: Get + Debug, + { + /// Data structured by the associated schema's model. + pub payload: BoundedVec, + /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, + /// indicating that the original source MSA is acting as its own provider. An id differing from that + /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on + /// its behalf. + pub provider_msa_id: MessageSourceId, + /// Message source account id (the original source). + pub msa_id: Option, + /// Stores index of message in block to keep total order. + pub index: u16, + } + + /// old permanent storage for messages mapped by block number and schema id. + #[storage_alias] + pub(crate) type Messages = StorageDoubleMap< + Pallet, + Twox64Concat, + BlockNumberFor, + Twox64Concat, + SchemaId, + BoundedVec< + OldMessage<::MessagesMaxPayloadSizeBytes>, + ConstU32<200>, + >, + ValueQuery, + >; +} +/// migration to v2 implementation +pub struct MigrateToV2(PhantomData); + +impl OnRuntimeUpgrade for MigrateToV2 { + fn on_runtime_upgrade() -> Weight { + migrate_to_v2::() + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result, TryRuntimeError> { + use frame_support::storage::generator::StorageDoubleMap; + log::info!(target: LOG_TARGET, "Running pre_upgrade..."); + + let pallet_prefix = old::Messages::::module_prefix(); + let storage_prefix = old::Messages::::storage_prefix(); + assert_eq!(&b"Messages"[..], pallet_prefix); + assert_eq!(&b"Messages"[..], storage_prefix); + + let mut count = 0u32; + for (_, _, messages) in old::Messages::::iter() { + count += messages.len() as u32; + } + log::info!(target: LOG_TARGET, "Finish pre_upgrade for {:?}", count); + Ok(count.encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(state: Vec) -> Result<(), TryRuntimeError> { + log::info!(target: LOG_TARGET, "Running post_upgrade..."); + + let old_count: u32 = Decode::decode(&mut state.as_slice()) + .expect("the state parameter should be something that was generated by pre_upgrade"); + + let count = old::Messages::::iter().count(); + let moved_count = MessagesV2::::iter().count(); + + log::info!(target: LOG_TARGET, "Finish post_upgrade for {:?}", moved_count); + let onchain_version = Pallet::::on_chain_storage_version(); + + assert_eq!(count, 0usize); + assert_eq!(moved_count, old_count as usize); + assert_eq!(onchain_version, crate::pallet::STORAGE_VERSION); + Ok(()) + } +} +/// migrating to v2 +pub fn migrate_to_v2() -> Weight { + log::info!(target: LOG_TARGET, "Running storage migration..."); + let onchain_version = Pallet::::on_chain_storage_version(); + let current_version = Pallet::::current_storage_version(); + log::info!(target: LOG_TARGET, "onchain_version= {:?}, current_version={:?}", onchain_version, current_version); + + if onchain_version < 2 { + let mut reads = 1u64; + let mut writes = 0u64; + let mut bytes = 0u64; + for (block_number, schema_id, messages) in old::Messages::::drain() { + bytes = bytes.saturating_add(messages.encode().len() as u64); + + for message in &messages { + let new_msg = Message { + provider_msa_id: message.provider_msa_id, + msa_id: message.msa_id, + payload: message.payload.clone(), + }; + bytes = bytes.saturating_add(new_msg.encode().len() as u64); + MessagesV2::::insert((block_number, schema_id, message.index), new_msg); + } + + reads.saturating_inc(); + writes = writes.saturating_add(messages.len() as u64 + 1); + } + + // Set storage version to `2`. + StorageVersion::new(2).put::>(); + writes.saturating_inc(); + + log::info!(target: LOG_TARGET, "Storage migrated to version 2 read={:?}, write={:?}, bytes={:?}", reads, writes, bytes); + let weights = T::DbWeight::get().reads_writes(reads, writes).add_proof_size(bytes); + log::info!(target: LOG_TARGET, "Migration Calculated weights={:?}",weights); + weights + } else { + log::info!( + target: LOG_TARGET, + "Migration did not execute. This probably should be removed onchain:{:?}, current:{:?}", + onchain_version, + current_version + ); + T::DbWeight::get().reads(1) + } +} diff --git a/pallets/messages/src/tests/mock.rs b/pallets/messages/src/tests/mock.rs index afcafdbde5..9842e59016 100644 --- a/pallets/messages/src/tests/mock.rs +++ b/pallets/messages/src/tests/mock.rs @@ -67,7 +67,6 @@ impl system::Config for Test { type MaxConsumers = ConstU32<16>; } -pub type MaxMessagesPerBlock = ConstU32<500>; pub type MaxSchemaGrantsPerDelegation = ConstU32<30>; // Needs parameter_types! for the impls below @@ -79,6 +78,7 @@ parameter_types! { // Take care when adding new tests for on-chain (not IPFS) messages that the payload // is not too big. pub const MessagesMaxPayloadSizeBytes: u32 = 73; + } impl std::fmt::Debug for MessagesMaxPayloadSizeBytes { @@ -221,7 +221,6 @@ impl pallet_messages::Config for Test { type SchemaGrantValidator = SchemaGrantValidationHandler; type SchemaProvider = SchemaHandler; type WeightInfo = (); - type MaxMessagesPerBlock = MaxMessagesPerBlock; type MessagesMaxPayloadSizeBytes = MessagesMaxPayloadSizeBytes; /// A set of helper functions for benchmarking. diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index 9546f2fb28..8ccc31dde2 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -1,16 +1,23 @@ use crate::{ - tests::mock::*, BlockMessageIndex, Config, Error, Event as MessageEvent, Message, Messages, + migration::{v2, v2::old::OldMessage}, + tests::mock::*, + BlockMessageIndex, Error, Event as MessageEvent, Message, MessagesV2, }; use codec::Encode; use common_primitives::{messages::MessageResponse, schema::*}; -use frame_support::{assert_err, assert_noop, assert_ok, traits::OnInitialize, BoundedVec}; +use frame_support::{ + assert_err, assert_noop, assert_ok, + pallet_prelude::{GetStorageVersion, StorageVersion}, + traits::OnInitialize, + BoundedVec, +}; use frame_system::{EventRecord, Phase}; use multibase::Base; #[allow(unused_imports)] use pretty_assertions::{assert_eq, assert_ne, assert_str_eq}; use rand::Rng; use serde::Serialize; -use sp_core::{ConstU32, Get}; +use sp_core::ConstU32; use sp_std::vec::Vec; #[derive(Serialize)] @@ -54,18 +61,17 @@ fn populate_messages( let mut counter = 0; for (idx, count) in message_per_block.iter().enumerate() { - let mut list = BoundedVec::default(); for _ in 0..*count { - list.try_push(Message { - msa_id: Some(10), - payload: payload.clone().try_into().unwrap(), - index: counter, - provider_msa_id: 1, - }) - .unwrap(); + MessagesV2::::set( + (idx as u32, schema_id, counter), + Some(Message { + msa_id: Some(10), + payload: payload.clone().try_into().unwrap(), + provider_msa_id: 1, + }), + ); counter += 1; } - Messages::::insert(idx as u32, schema_id, list); } } @@ -127,61 +133,46 @@ fn add_message_should_store_message_in_storage() { )); // assert messages - let list1 = Messages::::get(1, schema_id_1).into_inner(); - let list2 = Messages::::get(1, schema_id_2).into_inner(); - assert_eq!(list1.len(), 1); - assert_eq!(list2.len(), 2); + let msg1 = MessagesV2::::get((1, schema_id_1, 0u16)); + let msg2 = MessagesV2::::get((1, schema_id_2, 1u16)); + let msg3 = MessagesV2::::get((1, schema_id_2, 2u16)); assert_eq!( - list1[0], - Message { + msg1, + Some(Message { msa_id: Some(get_msa_from_account(caller_1)), payload: message_payload_1.try_into().unwrap(), - index: 0, provider_msa_id: get_msa_from_account(caller_1) - } + }) ); assert_eq!( - list2, - vec![ - Message { - msa_id: Some(get_msa_from_account(caller_2)), - payload: message_payload_2.try_into().unwrap(), - index: 1, - provider_msa_id: get_msa_from_account(caller_2) - }, - Message { - msa_id: Some(get_msa_from_account(caller_2)), - payload: message_payload_3.try_into().unwrap(), - index: 2, - provider_msa_id: get_msa_from_account(caller_2) - }, - ] + msg2, + Some(Message { + msa_id: Some(get_msa_from_account(caller_2)), + payload: message_payload_2.try_into().unwrap(), + provider_msa_id: get_msa_from_account(caller_2) + }) + ); + + assert_eq!( + msg3, + Some(Message { + msa_id: Some(get_msa_from_account(caller_2)), + payload: message_payload_3.try_into().unwrap(), + provider_msa_id: get_msa_from_account(caller_2) + }) ); // assert events - let events_occured = System::events(); + let events_occurred = System::events(); assert_eq!( - events_occured, - vec![ - EventRecord { - phase: Phase::Initialization, - event: RuntimeEvent::MessagesPallet(MessageEvent::MessagesStored { - block_number: 1, - schema_id: schema_id_1, - }), - topics: vec![] - }, - EventRecord { - phase: Phase::Initialization, - event: RuntimeEvent::MessagesPallet(MessageEvent::MessagesStored { - block_number: 1, - schema_id: schema_id_2, - }), - topics: vec![] - }, - ] + events_occurred, + vec![EventRecord { + phase: Phase::Initialization, + event: RuntimeEvent::MessagesPallet(MessageEvent::MessagesInBlock), + topics: vec![] + },] ); }); } @@ -248,63 +239,6 @@ fn add_ipfs_message_with_invalid_msa_account_errors() { }); } -#[test] -fn add_message_with_maxed_out_storage_errors() { - new_test_ext().execute_with(|| { - // arrange - let caller_1 = 5; - let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(1, None); - - // act - for _ in 0..::MaxMessagesPerBlock::get() { - assert_ok!(MessagesPallet::add_onchain_message( - RuntimeOrigin::signed(caller_1), - None, - schema_id_1, - message_payload_1.clone() - )); - } - assert_noop!( - MessagesPallet::add_onchain_message( - RuntimeOrigin::signed(caller_1), - None, - schema_id_1, - message_payload_1 - ), - Error::::TooManyMessagesInBlock - ); - }); -} - -#[test] -fn add_ipfs_message_with_maxed_out_storage_errors() { - new_test_ext().execute_with(|| { - // arrange - let caller_1 = 5; - let schema_id_1: SchemaId = IPFS_SCHEMA_ID; - - // act - for _ in 0..::MaxMessagesPerBlock::get() { - assert_ok!(MessagesPallet::add_ipfs_message( - RuntimeOrigin::signed(caller_1), - schema_id_1, - DUMMY_CID_BASE32.to_vec(), - 15 - )); - } - assert_noop!( - MessagesPallet::add_ipfs_message( - RuntimeOrigin::signed(caller_1), - schema_id_1, - DUMMY_CID_BASE32.to_vec(), - 15 - ), - Error::::TooManyMessagesInBlock - ); - }); -} - /// Assert that MessageResponse for IPFS messages returns the payload_length of the offchain message. #[test] fn get_messages_by_schema_with_ipfs_payload_location_should_return_offchain_payload_length() { @@ -369,9 +303,8 @@ fn get_messages_by_schema_with_ipfs_payload_location_should_fail_bad_schema() { .unwrap(), msa_id: Some(0), provider_msa_id: 1, - index: 0, }; - let mapped_response = bad_message.map_to_response(0, PayloadLocation::IPFS); + let mapped_response = bad_message.map_to_response(0, PayloadLocation::IPFS, 0); assert_eq!( mapped_response.cid, Some(multibase::encode(Base::Base32Lower, Vec::new()).as_bytes().to_vec()) @@ -399,8 +332,8 @@ fn add_message_via_non_delegate_should_fail() { ); // assert - let list = Messages::::get(1, schema_id_1).into_inner(); - assert_eq!(list.len(), 0); + let msg = MessagesV2::::get((1, schema_id_1, 0)); + assert_eq!(msg, None); }); } @@ -671,8 +604,7 @@ fn validate_cid_unwrap_panics() { fn map_to_response_on_chain() { let payload_vec = b"123456789012345678901234567890".to_vec(); let payload_bounded = BoundedVec::>::try_from(payload_vec.clone()).unwrap(); - let msg = - Message { payload: payload_bounded, provider_msa_id: 10u64, msa_id: None, index: 1u16 }; + let msg = Message { payload: payload_bounded, provider_msa_id: 10u64, msa_id: None }; let expected = MessageResponse { provider_msa_id: 10u64, index: 1u16, @@ -682,7 +614,7 @@ fn map_to_response_on_chain() { cid: None, payload_length: None, }; - assert_eq!(msg.map_to_response(42, PayloadLocation::OnChain), expected); + assert_eq!(msg.map_to_response(42, PayloadLocation::OnChain, 1), expected); } #[test] @@ -690,7 +622,7 @@ fn map_to_response_ipfs() { let cid = DUMMY_CID_SHA512; let payload_tuple: crate::OffchainPayloadType = (multibase::decode(cid).unwrap().1, 10); let payload = BoundedVec::>::try_from(payload_tuple.encode()).unwrap(); - let msg = Message { payload, provider_msa_id: 10u64, msa_id: None, index: 1u16 }; + let msg = Message { payload, provider_msa_id: 10u64, msa_id: None }; let expected = MessageResponse { provider_msa_id: 10u64, index: 1u16, @@ -700,5 +632,68 @@ fn map_to_response_ipfs() { cid: Some(cid.as_bytes().to_vec()), payload_length: Some(10), }; - assert_eq!(msg.map_to_response(42, PayloadLocation::IPFS), expected); + assert_eq!(msg.map_to_response(42, PayloadLocation::IPFS, 1), expected); +} + +#[test] +fn migration_to_v2_should_work_as_expected() { + new_test_ext().execute_with(|| { + // Setup + let schema_id: SchemaId = IPFS_SCHEMA_ID; + let cid = &DUMMY_CID_BASE32[..]; + let message_per_block = vec![3, 4, 5, 6]; + let payload = ( + multibase::decode(sp_std::str::from_utf8(cid).unwrap()).unwrap().1, + IPFS_PAYLOAD_LENGTH, + ) + .encode(); + + let mut counter = 0; + for (idx, count) in message_per_block.iter().enumerate() { + let mut list = BoundedVec::default(); + for _ in 0..*count { + list.try_push(OldMessage { + msa_id: Some(10), + payload: payload.clone().try_into().unwrap(), + index: counter, + provider_msa_id: 1, + }) + .unwrap(); + counter += 1; + } + v2::old::Messages::::insert(idx as u32, schema_id, list); + } + + let _ = v2::migrate_to_v2::(); + + let old_count = v2::old::Messages::::iter().count(); + let new_count = MessagesV2::::iter().count(); + let current_version = MessagesPallet::current_storage_version(); + + assert_eq!(old_count, 0); + assert_eq!(new_count, message_per_block.iter().sum::()); + assert_eq!(current_version, StorageVersion::new(2)); + + let mut total_index = 0u16; + for (block, count) in message_per_block.iter().enumerate() { + for _ in 0..*count { + assert!(MessagesV2::::get((block as u32, schema_id, total_index)).is_some()); + total_index += 1; + } + // should not exist + assert!(MessagesV2::::get((block as u32, schema_id, total_index)).is_none()); + } + }); +} + +#[test] +fn migration_to_v2_should_have_correct_prefix() { + new_test_ext().execute_with(|| { + use frame_support::storage::generator::StorageDoubleMap; + let pallet_prefix = v2::old::Messages::::module_prefix(); + let storage_prefix = v2::old::Messages::::storage_prefix(); + + assert_eq!(&b"MessagesPallet"[..], pallet_prefix); + assert_eq!(&b"Messages"[..], storage_prefix); + }); } diff --git a/pallets/messages/src/types.rs b/pallets/messages/src/types.rs index bf5deab6b5..a63a8139ff 100644 --- a/pallets/messages/src/types.rs +++ b/pallets/messages/src/types.rs @@ -9,6 +9,8 @@ use sp_std::{fmt::Debug, prelude::*}; /// Payloads stored offchain contain a tuple of (bytes(the payload reference), payload length). pub type OffchainPayloadType = (Vec, u32); +/// Index of message in the block +pub type MessageIndex = u16; /// A single message type definition. #[derive(Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] @@ -27,8 +29,6 @@ where pub provider_msa_id: MessageSourceId, /// Message source account id (the original source). pub msa_id: Option, - /// Stores index of message in block to keep total order. - pub index: u16, } impl Message @@ -40,11 +40,12 @@ where &self, block_number: BlockNumber, payload_location: PayloadLocation, + index: u16, ) -> MessageResponse { match payload_location { PayloadLocation::OnChain => MessageResponse { provider_msa_id: self.provider_msa_id, - index: self.index, + index, block_number, msa_id: self.msa_id, payload: Some(self.payload.to_vec()), @@ -56,7 +57,7 @@ where OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); MessageResponse { provider_msa_id: self.provider_msa_id, - index: self.index, + index, block_number, cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), payload_length: Some(payload_length), @@ -66,7 +67,7 @@ where }, // Message types of Itemized and Paginated are retrieved differently _ => MessageResponse { provider_msa_id: self.provider_msa_id, - index: self.index, + index, block_number, msa_id: None, payload: None, diff --git a/runtime/common/src/constants.rs b/runtime/common/src/constants.rs index 3d86b63bd5..24cec370f4 100644 --- a/runtime/common/src/constants.rs +++ b/runtime/common/src/constants.rs @@ -268,9 +268,6 @@ parameter_types! { // -end- Collator Selection Pallet --- // --- Messages Pallet --- -/// The maximum number of messages per block -pub type MessagesMaxPerBlock = ConstU32<200>; - impl Clone for MessagesMaxPayloadSizeBytes { fn clone(&self) -> Self { MessagesMaxPayloadSizeBytes {} diff --git a/runtime/frequency/src/lib.rs b/runtime/frequency/src/lib.rs index b552628a46..0f78d6a7ed 100644 --- a/runtime/frequency/src/lib.rs +++ b/runtime/frequency/src/lib.rs @@ -79,7 +79,7 @@ pub use common_runtime::{ use frame_support::traits::Contains; #[cfg(feature = "try-runtime")] -use frame_support::traits::TryStateSelect; +use frame_support::traits::{TryStateSelect, UpgradeCheckSelect}; /// Interface to collective pallet to propose a proposal. pub struct CouncilProposalProvider; @@ -220,6 +220,7 @@ pub type Executive = frame_executive::Executive< frame_system::ChainContext, Runtime, AllPalletsWithSystem, + (pallet_messages::migration::v2::MigrateToV2,), >; /// Opaque types. These are used by the CLI to instantiate machinery that don't need to know @@ -257,7 +258,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("frequency"), impl_name: create_runtime_str!("frequency"), authoring_version: 1, - spec_version: 60, + spec_version: 61, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 1, @@ -271,7 +272,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("frequency-rococo"), impl_name: create_runtime_str!("frequency"), authoring_version: 1, - spec_version: 60, + spec_version: 61, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 1, @@ -920,8 +921,6 @@ impl pallet_messages::Config for Runtime { type SchemaGrantValidator = Msa; // The type that provides schema info type SchemaProvider = Schemas; - // The maximum number of messages per block - type MaxMessagesPerBlock = MessagesMaxPerBlock; // The maximum message payload in bytes type MessagesMaxPayloadSizeBytes = MessagesMaxPayloadSizeBytes; @@ -1321,9 +1320,9 @@ impl_runtime_apis! { #[cfg(feature = "try-runtime")] impl frame_try_runtime::TryRuntime for Runtime { - fn on_runtime_upgrade(_checks: bool) -> (Weight, Weight) { + fn on_runtime_upgrade(checks: UpgradeCheckSelect) -> (Weight, Weight) { log::info!("try-runtime::on_runtime_upgrade frequency."); - let weight = Executive::try_runtime_upgrade(true).unwrap(); + let weight = Executive::try_runtime_upgrade(checks).unwrap(); (weight, RuntimeBlockWeights::get().max_block) }