Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[DNM] Wrapper allocator PoC #7206

Draft
wants to merge 39 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
d86bd79
Implement wrapper allocator -- draft
s0me0ne-unkn0wn May 6, 2023
998a6ff
Minor fixes
s0me0ne-unkn0wn May 9, 2023
dbd40a7
Backlog tracking allocator
s0me0ne-unkn0wn May 18, 2023
c8e9d1c
Try spinlock approach
s0me0ne-unkn0wn May 19, 2023
b841129
Rename things
s0me0ne-unkn0wn May 19, 2023
d3e3e72
Merge remote-tracking branch 'origin/master' into s0me0ne/wrapper-all…
s0me0ne-unkn0wn May 19, 2023
089e6d8
Fix feature name
s0me0ne-unkn0wn May 22, 2023
818699a
Add a benchmark to measure Kusama runtime preparation time
s0me0ne-unkn0wn May 22, 2023
052b096
Merge remote-tracking branch 'origin/master' into s0me0ne/wrapper-all…
s0me0ne-unkn0wn Jun 5, 2023
66e8a8b
Merge remote-tracking branch 'origin/master' into s0me0ne/wrapper-all…
s0me0ne-unkn0wn Aug 1, 2023
dbdeb52
".git/.scripts/commands/fmt/fmt.sh"
Aug 1, 2023
1dfd9ca
` XcmContext` to `buy_weight / refund_weight` (#7563)
bkontur Aug 1, 2023
152888f
Take into account size as well in weight limiting. (#7369)
eskimor Aug 1, 2023
70aed93
[companion] Get rid of `Peerset` compatibility layer (#7355)
dmitry-markin Aug 2, 2023
88c1a70
Companion for Substrate#14373 (#7572)
drskalman Aug 2, 2023
b137472
[xcm] `GlobalConsensusConvertsFor` for remote relay chain (based on p…
bkontur Aug 3, 2023
b810ce4
Fix flaky reputation change test (#7550)
AndreiEres Aug 4, 2023
314e519
Add license to crates (#7578)
Morganamilo Aug 4, 2023
62b489f
Remove xcm on_runtime_upgrade pallet hook (#7235)
pgherveou Aug 5, 2023
14e6605
Document non-uniqueness of SetTopic IDs (#7579)
KiChjang Aug 7, 2023
929e2d4
PVF: Add missing crate descriptions (#7587)
mrcnski Aug 8, 2023
06ccdf2
update weight file template (#7589)
xlc Aug 8, 2023
aaea117
Companion for #14412 (#7547)
davxy Aug 9, 2023
c7bbfba
Remove unused code in runtime/polkadot/src/lib.rs (#7540)
liamaharon Aug 10, 2023
08f4333
Companion for substrate#12970 (#6807)
gpestana Aug 10, 2023
0f27b6c
Add counter for unapproved candidates (#7491)
AndreiEres Aug 10, 2023
e813323
Publish RC container images (#7556)
chevdor Aug 11, 2023
12fdcba
companion for 14754: cli: move no-beefy flag to sc-cli (#7600)
acatangiu Aug 11, 2023
0f57383
pvf: use test-utils feature to export test only (#7538)
jpserrat Aug 14, 2023
2dda590
RC container image fixes (#7607)
chevdor Aug 14, 2023
730a1c8
Fix the user used to login to Docker hub (#7610)
chevdor Aug 14, 2023
04ae532
Remove ParityDb migration tests (#7612)
altonen Aug 14, 2023
6f9fe26
Use same `fmt` and `clippy` configs as in Substrate (#7611)
ggwpez Aug 14, 2023
ffb8d15
Disable validation/collation protocols for normal full nodes (#7601)
altonen Aug 14, 2023
74b2fec
Don't publish test crates (#7588)
Morganamilo Aug 14, 2023
2a2393f
PVF workers: some fixes for cargo run and cargo install (#7608)
mrcnski Aug 14, 2023
ed8f0f8
XCM: Rename Instruction instructions to Command instructions (#7593)
KiChjang Aug 14, 2023
4f47d3c
Remove superflous parameter `overseer_enable_anyways` and make parach…
bkchr Aug 15, 2023
d25f550
Merge remote-tracking branch 'origin/master' into s0me0ne/wrapper-all…
s0me0ne-unkn0wn Aug 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tikv-jemallocator = "0.5.0"
polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ] }
polkadot-node-core-pvf-worker = { path = "node/core/pvf/worker" }
polkadot-overseer = { path = "node/overseer" }
wrapper-allocator = { path = "node/wrapper-allocator", optional = true }

[dev-dependencies]
assert_cmd = "2.0.4"
Expand Down Expand Up @@ -101,6 +102,7 @@ members = [
"node/subsystem-types",
"node/subsystem-test-helpers",
"node/subsystem-util",
"node/wrapper-allocator",
"node/jaeger",
"node/gum",
"node/gum/proc-macro",
Expand Down Expand Up @@ -208,6 +210,7 @@ fast-runtime = [ "polkadot-cli/fast-runtime" ]
runtime-metrics = [ "polkadot-cli/runtime-metrics" ]
pyroscope = ["polkadot-cli/pyroscope"]
jemalloc-allocator = ["polkadot-node-core-pvf-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"]
wrapper-allocator = ["jemalloc-allocator", "dep:wrapper-allocator", "polkadot-node-core-pvf-worker/wrapper-allocator"]

# Configuration for building a .deb package - for use with `cargo-deb`
[package.metadata.deb]
Expand Down
2 changes: 2 additions & 0 deletions node/core/pvf/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rayon = "1.5.1"
tempfile = "3.3.0"
tikv-jemalloc-ctl = { version = "0.5.0", optional = true }
tokio = "1.24.2"
wrapper-allocator = { path = "../../../wrapper-allocator", optional = true }

parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] }

Expand Down Expand Up @@ -47,3 +48,4 @@ tempfile = "3.3.0"

[features]
jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
wrapper-allocator = ["dep:wrapper-allocator"]
16 changes: 16 additions & 0 deletions node/core/pvf/worker/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
#[cfg(feature = "wrapper-allocator")]
use wrapper_allocator::ALLOCATOR_DATA;
use crate::{
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
prepare, prevalidate, LOG_TARGET,
Expand Down Expand Up @@ -109,8 +111,22 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
// Spawn another thread for preparation.
let prepare_fut = rt_handle
.spawn_blocking(move || {
#[cfg(feature = "wrapper-allocator")]
ALLOCATOR_DATA.checkpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few questions:

  1. This will measure only the dynamically allocated memory; it will fail to monitor any unexpected increase in the stack. Is that what we want?
  2. How is this different from the below get_max_rss_thread and why isn't that enough?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. How is this different from the below get_max_rss_thread and why isn't that enough?

The issue with that is it is not deterministic enough. (Is mainly being used for gathering metrics right now.) Different kernel configurations/versions may manage memory differently (simple example is some validators may have swap enabled and some not). So they may get different values for the resident memory (how much is actually held in RAM). So some validators may reach the limit and others not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This will measure only the dynamically allocated memory; it will fail to monitor any unexpected increase in the stack. Is that what we want?

I hope yes, in the preparation phase we're not executing any untrusted code so we just presume that Wasmtime guys know what they're doing and won't abuse the stack usage. We only bother about malicious Wasm code that could force the compiler to allocate a lot of memory.

  1. How is this different from the below get_max_rss_thread and why isn't that enough?

A lot of concerns here.

  • Not supported on every platform
  • Allocated memory is not necessarily resident
  • Wasmtime may spawn its own threads and we'd lose their allocations in the per-thread accounting

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So some validators may reach the limit and others not.

We don't have disputes for preparation, but what can happen is the attacker gets lucky and gets the PVF through pre-checking without hitting the limits, but then the limits are actually hit when preparing for execution causing no-shows (since we don't dispute on preparation errors). I guess we can have a lower, stricter limit for pre-checking, which we should have anyway.

Wasmtime may spawn its own threads and we'd lose their allocations in the per-thread accounting

Good point. We can't use RUSAGE_SELF instead of RUSAGE_THREAD because there's no way to "reset" the max from a previous job.


let result = prepare_artifact(pvf);

#[cfg(feature = "wrapper-allocator")]
{
let peak = ALLOCATOR_DATA.checkpoint();
gum::debug!(
target: LOG_TARGET,
%worker_pid,
"prepare job peak allocation is {} bytes",
peak,
);
}

// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
let result = result.map(|artifact| (artifact, get_max_rss_thread()));
Expand Down
9 changes: 9 additions & 0 deletions node/wrapper-allocator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "wrapper-allocator"
s0me0ne-unkn0wn marked this conversation as resolved.
Show resolved Hide resolved
description = "Wrapper allocator to control amount of memory consumed by PVF preparation process"
version.workspace = true
authors.workspace = true
edition.workspace = true

[dependencies]
tikv-jemallocator = "0.5.0"
78 changes: 78 additions & 0 deletions node/wrapper-allocator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use core::sync::atomic::{ Ordering::SeqCst, AtomicUsize };
use core::alloc::{GlobalAlloc, Layout};
use tikv_jemallocator::Jemalloc;

/// Shared bookkeeping for the wrapper allocator: a set of atomic counters
/// updated on every allocation/deallocation path and sampled by `checkpoint()`.
pub struct WrapperAllocatorData {
// Running total of bytes currently accounted as allocated (sum of the
// requested `Layout` sizes, not the underlying allocator's actual
// reservations).
allocated: AtomicUsize,
// Value of `allocated` captured at the last `checkpoint()` call; the
// baseline that peak figures are reported against.
checkpoint: AtomicUsize,
// Highest value `allocated` has reached since the last `checkpoint()`.
peak: AtomicUsize,
// limit: AtomicUsize, // Should we introduce a checkpoint limit and fail allocation if the limit is hit?
}

impl WrapperAllocatorData {
/// Marks a new checkpoint. Returns peak allocation, in bytes, since the last checkpoint.
///
/// Reads the current `allocated` total, stores it as the new `checkpoint`
/// baseline, resets `peak` to that total, and returns the old peak minus the
/// old baseline (saturating, so concurrent drift below the old baseline
/// yields 0 instead of underflowing).
///
/// NOTE(review): the three atomics are read/updated one by one rather than
/// under a common lock, so concurrent `alloc`/`dealloc` calls interleaved
/// between these operations can make the returned figure slightly off — see
/// the discussion below. Per that discussion this is intended to be called
/// from a single thread only; confirm that assumption holds at call sites.
pub fn checkpoint(&self) -> usize {
// Snapshot the running total first...
let allocated = ALLOCATOR_DATA.allocated.load(SeqCst);
// ...make it the new baseline, remembering the previous baseline...
let old_cp = ALLOCATOR_DATA.checkpoint.swap(allocated, SeqCst);
// ...then reset the peak to "now" and report how far the old peak rose
// above the old baseline.
ALLOCATOR_DATA.peak.swap(allocated, SeqCst).saturating_sub(old_cp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While access to each field of WrapperAllocatorData is atomic, this method (as others too) loads them one by one, and you can expect anything to happen between these loads. For example:

  1. Read let allocated = ALLOCATOR_DATA.allocated.load(SeqCst);. Scheduler puts our thread to sleep.
  2. Another thread calls alloc and then checkpoint.
  3. We wake up and checkpoint goes back in time due to allocated being outdated.

Either these values shouldn't depend on each other and work as plain counters or something, or the whole struct should go under mutex.

I mean, I understand this is our best effort, and thus it depends on what is it we're trying to achieve.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While access to each field of WrapperAllocatorData is atomic, this method (as others too) loads them one by one, and you can expect anything to happen between these loads.

Yes, that's exactly the concern I'm worried about! And I'm trying to convince myself it is okay. Not sure at all.

It's okay to have a global lock in the checkpoint(). It's definitely not okay to have a global lock in the alloc(). I've omitted a mutex in the checkpoint() for now as it is supposed to be called from a single thread only, but we can add it there, checkpoint is a rare event. alloc() is much more concerning.

Say we have two threads, as in your example, and they're allocating at nearly the same point in time. I'm indexing the values below for clarity.

  • Thread1 executes old_alloc1 = allocated.fetch_add(layout_size1). At this point allocated is updated and represents the new size.
  • Then the context switches to another thread
  • Thread2 executes old_alloc2 = allocated.fetch_add2(layout_size2). At this point, allocated is updated again and represents the true allocated value. Both old_alloc1 and old_alloc2 are set to proper values.
  • Thread2 executes peak.fetch_max(old_alloc2 + layout_size2). The peak value is properly updated.
  • The context switches back and now thread1 executes peak.fetch_max(old_alloc1 + layout_size1). Here's the tricky part. Although thread1 is not aware of the allocated value has already been updated by another thread, it already has proper old_alloc1, and the peak already stores the proper peak value, including thread2's allocation. What is improper here is the assumption that for thread1 old_alloc1 + layout_size1 is the new allocated value that should be compared with the peak. But do we care? I believe we don't because thread2 has already updated the peak. I mean, by definition, old_alloc1 + layout_size1 < old_alloc2 + layout_size2 because old_alloc2 == old_alloc1 + layout_size1. So we have no chance of missing the new peak value.

Thus it seems we can avoid a global lock inside the alloc() and its siblings, but I'm just afraid of missing something. It's the case where ten eyes are much better than two.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a spinlock should probably be fine here as the critical section here's going to be very short anyway.

Another more complicated alternative that would help with thread contention would be to make this per thread: put the state in thread local storage and have one spinlock per TLS, and only when grabbing a checkpoint lock them all and collate the data. I don't think it's worth it though.

}
}

/// The single global instance of the allocator bookkeeping, shared between the
/// `#[global_allocator]` wrapper below and external callers of `checkpoint()`.
pub static ALLOCATOR_DATA: WrapperAllocatorData = WrapperAllocatorData { allocated: AtomicUsize::new(0), checkpoint: AtomicUsize::new(0), peak: AtomicUsize::new(0) };

/// Newtype wrapping an underlying allocator `A`; every `GlobalAlloc` call is
/// forwarded to `A` while its effect is recorded in `ALLOCATOR_DATA`.
struct WrapperAllocator<A: GlobalAlloc>(A);

unsafe impl<A: GlobalAlloc> GlobalAlloc for WrapperAllocator<A> {

// SAFETY: The wrapped methods are as safe as the underlying allocator implementation is.

#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
// Account for the requested size up front and remember the previous
// running total.
let old_alloc = ALLOCATOR_DATA.allocated.fetch_add(layout.size(), SeqCst);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This two lines of code could be refactored out as method on WrapperAllocatorData

// Raise the recorded peak if this allocation pushed the total above it.
// NOTE(review): counters are updated before delegating, so a failed
// (null) allocation still inflates `allocated` permanently — `dealloc`
// is never called for null pointers. Confirm whether that drift is an
// acceptable part of the best-effort accounting.
ALLOCATOR_DATA.peak.fetch_max(old_alloc + layout.size(), SeqCst);
self.0.alloc(layout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the memory ordering can just be Relaxed right now, since the atomics are being used as counters and not to synchronize other memory operations. Relaxed still gives you a total ordering between accesses to the same atomic.

Locks would make this slightly more deterministic but it doesn't seem worth the cost. Say you have a situation like thread1 calling alloc and getting old_alloc, thread2 stepping in and calling dealloc, and then thread1 setting the max peak to something higher than was actually allocated. You can prevent this with a lock but not totally solve it. There is still some indeterminacy where a worker may call the dealloc thread first and not hit the max at all while other workers would hit it. It's an edge case and not sure how feasible it would be to craft an attack around that.

But I think we decided on using cgroups to set the limit though which seems the way to go given the above. We can instead use this wrapper for detecting OOM by setting a flag before allocs and clearing it after.

}

#[inline]
/// Zero-initialized allocation, forwarded to the wrapped allocator with the
/// same accounting as `alloc`.
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
	// Delegate first so that a failed (null) allocation does not inflate the
	// accounting: `dealloc` is never called for null pointers, so counting
	// before knowing the outcome would leak bookkeeping bytes and skew every
	// later peak measurement.
	let ptr = self.0.alloc_zeroed(layout);
	if !ptr.is_null() {
		// Track the requested size and raise the peak if this allocation
		// pushed the running total above it.
		let old_alloc = ALLOCATOR_DATA.allocated.fetch_add(layout.size(), SeqCst);
		ALLOCATOR_DATA.peak.fetch_max(old_alloc + layout.size(), SeqCst);
	}
	ptr
}

#[inline]
/// Deallocation, forwarded to the wrapped allocator after decrementing the
/// running total by the same `layout.size()` that `alloc` added (the
/// `GlobalAlloc` contract requires the caller to pass the original layout).
/// Idiom fix: dropped the explicit `-> ()` return type (clippy
/// `unused_unit`); behavior is unchanged.
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
	ALLOCATOR_DATA.allocated.fetch_sub(layout.size(), SeqCst);
	self.0.dealloc(ptr, layout)
}

#[inline]
// Reallocation: adjust the running total by the size delta, then forward to
// the wrapped allocator.
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
// Growing: account for the extra bytes and raise the peak if needed.
if new_size > layout.size() {
let old_alloc = ALLOCATOR_DATA.allocated.fetch_add(new_size - layout.size(), SeqCst);
ALLOCATOR_DATA.peak.fetch_max(old_alloc + new_size - layout.size(), SeqCst);
} else {
// Shrinking (or same size): the running total can only go down, so the
// peak cannot change and is deliberately left untouched.
ALLOCATOR_DATA.allocated.fetch_sub(layout.size() - new_size, SeqCst);
}
Copy link
Contributor

@koute koute May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if's unnecessary if you make the counters isize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right, I'm already thinking about making them isize anyway so I can bother less about negative overflows. But I also don't want even try to update peak when it's definitely not necessary.

// NOTE(review): counters are adjusted before delegating, so a failed (null)
// reallocation leaves the accounting skewed — confirm whether that
// best-effort behaviour is acceptable.
self.0.realloc(ptr, layout, new_size)
}
}

// Install the accounting wrapper around jemalloc as the process-wide global
// allocator, so every heap allocation in this binary flows through the
// bookkeeping above.
#[global_allocator]
static ALLOC: WrapperAllocator<Jemalloc> = WrapperAllocator(Jemalloc);
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use color_eyre::eyre;

/// Global allocator. Changing it to another allocator will require changing
/// `memory_stats::MemoryAllocationTracker`.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
#[cfg(all(any(target_os = "linux", feature = "jemalloc-allocator"), not(feature = "wrapper-allocator")))]
#[global_allocator]
pub static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

Expand Down