From b61f84bf02eff064debd2f1bd2e597fd667aa408 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 26 Dec 2023 22:07:19 +0800 Subject: [PATCH] Cherry-picked Move to mimalloc --- Cargo.lock | 29 ++++++++++ Cargo.toml | 14 ++++- kashd/Cargo.toml | 4 +- kashd/src/main.rs | 3 + simpa/Cargo.toml | 5 +- simpa/src/main.rs | 3 + testing/integration/Cargo.toml | 1 + .../src/consensus_integration_tests.rs | 23 ++++++++ .../src/consensus_pipeline_tests.rs | 5 +- .../src/daemon_integration_tests.rs | 4 ++ utils/alloc/Cargo.toml | 14 +++++ utils/alloc/src/lib.rs | 57 +++++++++++++++++++ 12 files changed, 156 insertions(+), 6 deletions(-) create mode 100644 utils/alloc/Cargo.toml create mode 100644 utils/alloc/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 056599f885..2aec8cb8f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2082,6 +2082,13 @@ dependencies = [ "tokio", ] +[[package]] +name = "kash-alloc" +version = "0.13.1" +dependencies = [ + "mimalloc", +] + [[package]] name = "kash-bip32" version = "0.13.1" @@ -2859,6 +2866,7 @@ dependencies = [ "indexmap 2.1.0", "itertools 0.11.0", "kash-addresses", + "kash-alloc", "kash-bip32", "kash-consensus", "kash-consensus-core", @@ -3216,6 +3224,7 @@ dependencies = [ "futures-util", "kash-addresses", "kash-addressmanager", + "kash-alloc", "kash-consensus", "kash-consensus-core", "kash-consensus-notify", @@ -3297,6 +3306,16 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libmimalloc-sys" +version = "0.1.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3979b5c37ece694f1f5e51e7ecc871fdb0f517ed04ee45f88d15d6d553cb9664" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "libredox" version = "0.0.1" @@ -3527,6 +3546,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mimalloc" +version = "0.1.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa01922b5ea280a911e323e4d2fd24b7fe5cc4042e0d2cda3c40775cdc4bdc9c" +dependencies = [ + "libmimalloc-sys", +] + [[package]] name = "mime" version = "0.3.17" @@ -4818,6 +4846,7 @@ dependencies = [ "futures-util", "indexmap 2.1.0", "itertools 0.11.0", + "kash-alloc", "kash-consensus", "kash-consensus-core", "kash-consensus-notify", diff --git a/Cargo.toml b/Cargo.toml index 9d57c63d84..b6902962ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "rothschild", "metrics/perf_monitor", "metrics/core", + "utils/alloc", ] [workspace.package] @@ -73,6 +74,9 @@ include = [ ] [workspace.dependencies] +mimalloc = { version = "0.1.39", default-features = false, features = [ + 'override', +] } # kash-testing-integration = { version = "0.13.1", path = "testing/integration" } kash-addresses = { version = "0.13.1", path = "crypto/addresses" } kash-addressmanager = { version = "0.13.1", path = "components/addressmanager" } @@ -124,6 +128,7 @@ kash-wrpc-proxy = { version = "0.13.1", path = "rpc/wrpc/proxy" } kash-wrpc-server = { version = "0.13.1", path = "rpc/wrpc/server" } kash-wrpc-wasm = { version = "0.13.1", path = "rpc/wrpc/wasm" } kashd = { version = "0.13.1", path = "kashd" } +kash-alloc = { version = "0.13.1", path = "utils/alloc" } # external aes = "0.8.3" @@ -162,7 +167,9 @@ faster-hex = "0.6.1" # TODO "0.8.1" - fails unit tests fixedstr = { version = "0.5.4", features = ["serde"] } flate2 = "1.0.28" futures = { version = "0.3.29" } -futures-util = { version = "0.3.29", default-features = false, features = [ "alloc", ] } +futures-util = { version = "0.3.29", default-features = false, features = [ + "alloc", +] } getrandom = { version = "0.2.10", features = ["js"] } h2 = "0.3.21" heapless = "0.7.16" @@ -237,7 +244,10 @@ web-sys = "=0.3.64" xxhash-rust = { version = "0.8.7", features = ["xxh3"] } zeroize = { version = "1.6.0", default-features = false, features = ["alloc"] } pin-project-lite = "0.2.13" -tower-http = { version = "0.4.4", features = ["map-response-body", "map-request-body"] } +tower-http = { version = "0.4.4", features = [ + "map-response-body", + "map-request-body", +] } tower = "0.4.7" hyper = "0.14.27" # workflow dependencies that are not a part of core libraries diff --git a/kashd/Cargo.toml b/kashd/Cargo.toml index 5852589ae9..4607441561 100644 --- a/kashd/Cargo.toml +++ b/kashd/Cargo.toml @@ -13,6 +13,8 @@ name = "kashd_lib" crate-type = ["cdylib", "lib"] [dependencies] +kash-alloc.workspace = true # This changes the global allocator for all of the next dependencies so should be kept first + kash-addresses.workspace = true kash-addressmanager.workspace = true kash-consensus-core.workspace = true @@ -50,5 +52,5 @@ tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } workflow-log.workspace = true [features] -heap = ["dhat"] +heap = ["dhat", "kash-alloc/heap"] devnet-prealloc = ["kash-consensus/devnet-prealloc"] diff --git a/kashd/src/main.rs b/kashd/src/main.rs index 4232c1d17e..69e4cd1a41 100644 --- a/kashd/src/main.rs +++ b/kashd/src/main.rs @@ -4,6 +4,7 @@ extern crate kash_hashes; use std::sync::Arc; +use kash_alloc::init_allocator_with_default_settings; use kash_core::{info, signals::Signals}; use kash_utils::fd_budget; use kashd_lib::{ @@ -19,6 +20,8 @@ pub fn main() { #[cfg(feature = "heap")] let _profiler = dhat::Profiler::builder().file_name("kashd-heap.json").build(); + init_allocator_with_default_settings(); + let args = parse_args(); match fd_budget::try_set_fd_limit(DESIRED_DAEMON_SOFT_FD_LIMIT) { diff --git a/simpa/Cargo.toml b/simpa/Cargo.toml index 09bc8bf52e..a5ff9fcb30 100644 --- a/simpa/Cargo.toml +++ b/simpa/Cargo.toml @@ -9,6 +9,7 @@ include.workspace = true license.workspace = true [dependencies] +kash-alloc.workspace = true # This changes the global allocator for all of the next dependencies so should be kept first kash-consensus-core.workspace = true kash-consensus-notify.workspace = true kash-consensus.workspace = true @@ -20,7 +21,7 @@ kash-utils.workspace = true async-channel.workspace = true clap.workspace = true -dhat = {workspace = true, optional = true} +dhat = { workspace = true, optional = true } futures-util.workspace = true futures.workspace = true indexmap.workspace = true @@ -34,4 +35,4 @@ secp256k1.workspace = true tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } [features] -heap = ["dhat"] +heap = ["dhat", "kash-alloc/heap"] diff --git a/simpa/src/main.rs b/simpa/src/main.rs index 37a9dbb6c9..edc457e8de 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -2,6 +2,7 @@ use async_channel::unbounded; use clap::Parser; use futures::{future::try_join_all, Future}; use itertools::Itertools; +use kash_alloc::init_allocator_with_default_settings; use kash_consensus::{ config::ConfigBuilder, consensus::Consensus, @@ -118,6 +119,8 @@ fn main() { #[cfg(feature = "heap")] let _profiler = dhat::Profiler::builder().file_name("simpa-heap.json").build(); + init_allocator_with_default_settings(); + // Get CLI arguments let args = Args::parse(); diff --git a/testing/integration/Cargo.toml b/testing/integration/Cargo.toml index 3b559241b2..3f4bf5f6a9 100644 --- a/testing/integration/Cargo.toml +++ b/testing/integration/Cargo.toml @@ -8,6 +8,7 @@ include.workspace = true license.workspace = true [dependencies] +kash-alloc.workspace = true # This changes the global allocator for all of the next dependencies so should be kept first kash-addresses.workspace = true kash-consensus-core.workspace = true kash-consensus-notify.workspace = true diff --git a/testing/integration/src/consensus_integration_tests.rs b/testing/integration/src/consensus_integration_tests.rs index 892cc50415..819478a583 100644 --- a/testing/integration/src/consensus_integration_tests.rs +++ b/testing/integration/src/consensus_integration_tests.rs @@ -3,6 +3,7 @@ //! use async_channel::unbounded; +use kash_alloc::init_allocator_with_default_settings; use kash_consensus::config::genesis::GENESIS; use kash_consensus::config::{Config, ConfigBuilder}; use kash_consensus::consensus::factory::Factory as ConsensusFactory; @@ -176,16 +177,19 @@ fn reachability_stretch_test(use_attack_json: bool) { #[test] fn test_attack_json() { + init_allocator_with_default_settings(); reachability_stretch_test(true); } #[test] fn test_noattack_json() { + init_allocator_with_default_settings(); reachability_stretch_test(false); } #[tokio::test] async fn consensus_sanity_test() { + init_allocator_with_default_settings(); let genesis_child: Hash = 2.into(); let config = ConfigBuilder::new(MAINNET_PARAMS).skip_proof_of_work().build(); let consensus = TestConsensus::new(&config); @@ -235,6 +239,7 @@ struct GhostdagTestBlock { #[tokio::test] async fn ghostdag_test() { + init_allocator_with_default_settings(); let mut path_strings: Vec = common::read_dir("testdata/dags").map(|f| f.unwrap().path().to_str().unwrap().to_owned()).collect(); path_strings.sort(); @@ -319,6 +324,7 @@ fn strings_to_hashes(strings: &Vec) -> Vec { #[tokio::test] async fn block_window_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS) .skip_proof_of_work() .edit_consensus_params(|p| { @@ -388,6 +394,7 @@ async fn block_window_test() { #[tokio::test] async fn header_in_isolation_validation_test() { + init_allocator_with_default_settings(); let config = Config::new(MAINNET_PARAMS); let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); @@ -456,6 +463,7 @@ async fn header_in_isolation_validation_test() { #[tokio::test] async fn incest_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS).skip_proof_of_work().build(); let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); @@ -484,6 +492,7 @@ async fn incest_test() { #[tokio::test] async fn missing_parents_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS).skip_proof_of_work().build(); let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); @@ -508,6 +517,7 @@ async fn missing_parents_test() { // as a known invalid. #[tokio::test] async fn known_invalid_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS).skip_proof_of_work().build(); let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); @@ -533,6 +543,7 @@ async fn known_invalid_test() { #[tokio::test] async fn median_time_test() { + init_allocator_with_default_settings(); struct Test { name: &'static str, config: Config, @@ -606,6 +617,7 @@ async fn median_time_test() { #[tokio::test] async fn mergeset_size_limit_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS).skip_proof_of_work().build(); let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); @@ -832,32 +844,38 @@ impl KashdGoParams { #[tokio::test] async fn goref_custom_pruning_depth_test() { + init_allocator_with_default_settings(); json_test("testdata/dags_for_json_tests/goref_custom_pruning_depth", false).await } #[tokio::test] async fn goref_notx_test() { + init_allocator_with_default_settings(); json_test("testdata/dags_for_json_tests/goref-notx-5000-blocks", false).await } #[tokio::test] async fn goref_notx_concurrent_test() { + init_allocator_with_default_settings(); json_test("testdata/dags_for_json_tests/goref-notx-5000-blocks", true).await } #[tokio::test] async fn goref_tx_small_test() { + init_allocator_with_default_settings(); json_test("testdata/dags_for_json_tests/goref-905-tx-265-blocks", false).await } #[tokio::test] async fn goref_tx_small_concurrent_test() { + init_allocator_with_default_settings(); json_test("testdata/dags_for_json_tests/goref-905-tx-265-blocks", true).await } #[ignore] #[tokio::test] async fn goref_tx_big_test() { + init_allocator_with_default_settings(); // TODO: add this directory to a data repo and fetch dynamically json_test("testdata/dags_for_json_tests/goref-1.6M-tx-10K-blocks", false).await } @@ -865,6 +883,7 @@ async fn goref_tx_big_test() { #[ignore] #[tokio::test] async fn goref_tx_big_concurrent_test() { + init_allocator_with_default_settings(); // TODO: add this file to a data repo and fetch dynamically json_test("testdata/dags_for_json_tests/goref-1.6M-tx-10K-blocks", true).await } @@ -1230,6 +1249,7 @@ fn hex_decode(src: &str) -> Vec { #[tokio::test] async fn bounded_merge_depth_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(DEVNET_PARAMS) .skip_proof_of_work() .edit_consensus_params(|p| { @@ -1309,6 +1329,7 @@ async fn bounded_merge_depth_test() { #[tokio::test] async fn difficulty_test() { + init_allocator_with_default_settings(); async fn add_block(consensus: &TestConsensus, block_time: Option, parents: Vec) -> Header { let selected_parent = consensus.ghostdag_manager().find_selected_parent(parents.iter().copied()); let block_time = block_time.unwrap_or_else(|| { @@ -1627,6 +1648,7 @@ async fn difficulty_test() { #[tokio::test] async fn selected_chain_test() { + init_allocator_with_default_settings(); kash_core::log::try_init_logger("info"); let config = ConfigBuilder::new(MAINNET_PARAMS) @@ -1695,6 +1717,7 @@ fn selected_chain_store_iterator(consensus: &TestConsensus, pruning_point: Hash) #[tokio::test] async fn staging_consensus_test() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS).build(); let db_tempdir = get_kash_tempdir(); diff --git a/testing/integration/src/consensus_pipeline_tests.rs b/testing/integration/src/consensus_pipeline_tests.rs index 43eb77384d..e6dd8b36ce 100644 --- a/testing/integration/src/consensus_pipeline_tests.rs +++ b/testing/integration/src/consensus_pipeline_tests.rs @@ -1,4 +1,5 @@ use futures_util::future::try_join_all; +use kash_alloc::init_allocator_with_default_settings; use kash_consensus::{ config::ConfigBuilder, consensus::test_consensus::TestConsensus, params::MAINNET_PARAMS, processes::reachability::tests::StoreValidationExtensions, @@ -11,6 +12,7 @@ use tokio::join; #[tokio::test] async fn test_concurrent_pipeline() { + init_allocator_with_default_settings(); let config = ConfigBuilder::new(MAINNET_PARAMS).skip_proof_of_work().edit_consensus_params(|p| p.genesis.hash = 1.into()).build(); let consensus = TestConsensus::new(&config); let wait_handles = consensus.init(); @@ -31,7 +33,7 @@ async fn test_concurrent_pipeline() { for (hash, parents) in blocks { // Submit to consensus twice to make sure duplicates are handled - let b = consensus.build_block_with_parents(hash, parents).to_immutable(); + let b: kash_consensus_core::block::Block = consensus.build_block_with_parents(hash, parents).to_immutable(); let results = join!( consensus.validate_and_insert_block(b.clone()).virtual_state_task, consensus.validate_and_insert_block(b).virtual_state_task @@ -76,6 +78,7 @@ async fn test_concurrent_pipeline() { #[tokio::test] async fn test_concurrent_pipeline_random() { + init_allocator_with_default_settings(); let genesis: Hash = blockhash::new_unique(); let bps = 8; let delay = 2; diff --git a/testing/integration/src/daemon_integration_tests.rs b/testing/integration/src/daemon_integration_tests.rs index 9944abee57..2b58c3bc85 100644 --- a/testing/integration/src/daemon_integration_tests.rs +++ b/testing/integration/src/daemon_integration_tests.rs @@ -1,4 +1,5 @@ use kash_addresses::Address; +use kash_alloc::init_allocator_with_default_settings; use kash_consensusmanager::ConsensusManager; use kash_core::task::runtime::AsyncRuntime; use kash_notify::scope::{Scope, VirtualDaaScoreChangedScope}; @@ -10,6 +11,7 @@ use std::{sync::Arc, time::Duration}; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn daemon_sanity_test() { + init_allocator_with_default_settings(); kash_core::log::try_init_logger("INFO"); // let total_fd_limit = kash_utils::fd_budget::get_limit() / 2 - 128; @@ -32,6 +34,7 @@ async fn daemon_sanity_test() { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn daemon_mining_test() { + init_allocator_with_default_settings(); kash_core::log::try_init_logger("INFO"); let args = Args { @@ -103,6 +106,7 @@ async fn daemon_mining_test() { // The following test runtime parameters are required for a graceful shutdown of the gRPC server #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn daemon_cleaning_test() { + init_allocator_with_default_settings(); kash_core::log::try_init_logger("info,kash_grpc_core=trace,kash_grpc_server=trace,kash_grpc_client=trace,kash_core=trace"); let args = Args { devnet: true, ..Default::default() }; let consensus_manager; diff --git a/utils/alloc/Cargo.toml b/utils/alloc/Cargo.toml new file mode 100644 index 0000000000..75fde1adf5 --- /dev/null +++ b/utils/alloc/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "kash-alloc" +description = "Kash allocator wrapper" +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true +include.workspace = true + +[dependencies] +mimalloc.workspace = true + +[features] +heap = [] diff --git a/utils/alloc/src/lib.rs b/utils/alloc/src/lib.rs new file mode 100644 index 0000000000..3cd40d3955 --- /dev/null +++ b/utils/alloc/src/lib.rs @@ -0,0 +1,57 @@ +#[cfg(not(feature = "heap"))] +#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))] +extern "C" { + fn mi_option_set_enabled(_: mi_option_e, val: bool); +} + +#[cfg(not(feature = "heap"))] +#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))] +#[allow(non_camel_case_types)] +#[allow(dead_code)] +#[repr(C)] +enum mi_option_e { + // stable options + mi_option_show_errors, // print error messages + mi_option_show_stats, // print statistics on termination + mi_option_verbose, // print verbose messages + // the following options are experimental (see src/options.h) + mi_option_eager_commit, // eager commit segments? (after `eager_commit_delay` segments) (=1) + mi_option_arena_eager_commit, // eager commit arenas? Use 2 to enable just on overcommit systems (=2) + mi_option_purge_decommits, // should a memory purge decommit (or only reset) (=1) + mi_option_allow_large_os_pages, // allow large (2MiB) OS pages, implies eager commit + mi_option_reserve_huge_os_pages, // reserve N huge OS pages (1GiB/page) at startup + mi_option_reserve_huge_os_pages_at, // reserve huge OS pages at a specific NUMA node + mi_option_reserve_os_memory, // reserve specified amount of OS memory in an arena at startup + mi_option_deprecated_segment_cache, + mi_option_deprecated_page_reset, + mi_option_abandoned_page_purge, // immediately purge delayed purges on thread termination + mi_option_deprecated_segment_reset, + mi_option_eager_commit_delay, + mi_option_purge_delay, // memory purging is delayed by N milli seconds; use 0 for immediate purging or -1 for no purging at all. + mi_option_use_numa_nodes, // 0 = use all available numa nodes, otherwise use at most N nodes. + mi_option_limit_os_alloc, // 1 = do not use OS memory for allocation (but only programmatically reserved arenas) + mi_option_os_tag, // tag used for OS logging (macOS only for now) + mi_option_max_errors, // issue at most N error messages + mi_option_max_warnings, // issue at most N warning messages + mi_option_max_segment_reclaim, + mi_option_destroy_on_exit, // if set, release all memory on exit; sometimes used for dynamic unloading but can be unsafe. + mi_option_arena_reserve, // initial memory size in KiB for arena reservation (1GiB on 64-bit) + mi_option_arena_purge_mult, + mi_option_purge_extend_delay, + _mi_option_last, +} + +#[cfg(not(feature = "heap"))] +use mimalloc::MiMalloc; +#[cfg(not(feature = "heap"))] +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +pub fn init_allocator_with_default_settings() { + #[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))] + #[cfg(not(feature = "heap"))] + unsafe { + // Empirical tests show that this option results in the smallest RSS. + mi_option_set_enabled(mi_option_e::mi_option_purge_decommits, false) + }; +}