[Memtrie] Implement parallel loading algorithm for memtrie. (#11356)
This PR implements parallel loading of memtries, using a small number of
nodes in the State column to determine the split points. It massively
improves loading speed: on a 16-core GCP machine, memtrie loading time
drops from 122s to 13s.

From the top of parallel_loader.rs:
```
/// Logic to load a memtrie in parallel. It consists of three stages:
///  - First, we use the State column to visit the trie starting from the root. We recursively
///    expand the trie until all the unexpanded subtrees are small enough
///    (memory_usage <= `subtree_size`). The trie we have expanded is represented as a "plan",
///    which is a structure similar to the trie itself.
///  - Then, we load each small subtree (the keys under which all share a common prefix) in
///    parallel, by reading the FlatState column for keys that correspond to the prefix of that
///    subtree. The result of each construction is a `MemTrieNodeId` representing the root of that
///    subtree.
///  - Finally, we construct the final trie by using the loaded subtree roots and converting the
///    plan into a complete memtrie, returning the final root.
```
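
To make the three stages concrete, here is a minimal, self-contained sketch
(not the actual nearcore code): `PlanNode`, `load_subtree`, and `load` are
hypothetical stand-ins for the plan structure, the per-subtree FlatState
load, and the final assembly, and scoped threads stand in for the thread
pool used in the real implementation.

```
use std::thread;

/// Stage 1 output (hypothetical, simplified): a shallow "plan" trie whose
/// leaves are subtrees small enough to be loaded independently, identified
/// here just by their key prefix.
enum PlanNode {
    Interior(Vec<PlanNode>),
    SubtreeToLoad { prefix: Vec<u8> },
}

/// Stage 2 (stubbed): load one small subtree. The real code scans the
/// FlatState column under `prefix` and builds nodes in a per-thread arena,
/// returning a `MemTrieNodeId`; here we just return a fake root id.
fn load_subtree(prefix: &[u8]) -> u64 {
    prefix.iter().map(|b| *b as u64 + 1).product()
}

/// Stages 2+3: load the planned subtrees in parallel, then fold the plan
/// and the loaded subtree roots into a single root.
fn load(plan: &PlanNode) -> u64 {
    match plan {
        PlanNode::SubtreeToLoad { prefix } => load_subtree(prefix),
        PlanNode::Interior(children) => thread::scope(|s| {
            let handles: Vec<_> =
                children.iter().map(|child| s.spawn(move || load(child))).collect();
            // Combine child roots into a parent; the real code allocates a
            // branch node referencing the children's `MemTrieNodeId`s.
            handles.into_iter().map(|h| h.join().unwrap()).sum()
        }),
    }
}

fn main() {
    let plan = PlanNode::Interior(vec![
        PlanNode::SubtreeToLoad { prefix: vec![0, 1] },
        PlanNode::SubtreeToLoad { prefix: vec![1, 7] },
    ]);
    println!("root = {}", load(&plan));
}
```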

This PR includes the parallel loader, a concurrent arena to support the
parallel loading step, and changes to use parallel loading for the initial
load only. As a bonus, the previous code supporting deferred computation of
node hashes (which was complex and contained an unsafe block) is deleted:
deferring is no longer necessary, since its original purpose was to
parallelize the hash computation, and the whole load is now parallelized.
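
The key trick in the concurrent arena (core/store/src/trie/mem/arena/concurrent.rs,
shown below) is that per-thread allocators draw chunk indices from a single
shared atomic counter, so every (chunk, offset) position is globally unique and
the per-thread chunks can later be merged into one `STArena` without rewriting
any `ArenaPos`. A simplified sketch of just that idea (hypothetical names, not
the real API):

```
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const CHUNK_SIZE: usize = 4 * 1024 * 1024;

/// Per-thread allocator (simplified): chunk indices come from a shared
/// counter, so positions handed out by different threads never collide.
struct ThreadArena {
    next_chunk: Arc<AtomicUsize>,
    chunks: Vec<(usize, Vec<u8>)>, // (global chunk index, chunk bytes)
}

impl ThreadArena {
    fn add_chunk(&mut self) -> usize {
        let global = self.next_chunk.fetch_add(1, Ordering::Relaxed);
        self.chunks.push((global, vec![0u8; CHUNK_SIZE]));
        global
    }
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let threads: Vec<_> = (0..4)
        .map(|_| {
            let next_chunk = counter.clone();
            thread::spawn(move || {
                let mut arena = ThreadArena { next_chunk, chunks: Vec::new() };
                (arena.add_chunk(), arena.add_chunk())
            })
        })
        .collect();
    // Merging is then just placing each thread's chunks at their global index;
    // no positions need fixing up because the indices are disjoint.
    for t in threads {
        println!("chunk indices: {:?}", t.join().unwrap());
    }
}
```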
robin-near authored May 31, 2024
1 parent 9db9c61 commit a4d9742
Showing 19 changed files with 964 additions and 247 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
@@ -546,7 +546,7 @@ impl Chain {
})
.cloned()
.collect();
- runtime_adapter.get_tries().load_mem_tries_for_enabled_shards(&tracked_shards)?;
+ runtime_adapter.get_tries().load_mem_tries_for_enabled_shards(&tracked_shards, true)?;

info!(target: "chain", "Init: header head @ #{} {}; block head @ #{} {}",
header_head.height, header_head.last_block_hash,
2 changes: 1 addition & 1 deletion core/store/src/test_utils.rs
@@ -179,7 +179,7 @@ impl TestTriesBuilder {
}
update_for_chunk_extra.commit().unwrap();

- tries.load_mem_tries_for_enabled_shards(&shard_uids).unwrap();
+ tries.load_mem_tries_for_enabled_shards(&shard_uids, false).unwrap();
}
tries
}
30 changes: 26 additions & 4 deletions core/store/src/trie/mem/arena/alloc.rs
@@ -45,11 +45,11 @@ pub struct Allocator {
const MAX_ALLOC_SIZE: usize = 16 * 1024;
const ROUND_UP_TO_8_BYTES_UNDER: usize = 256;
const ROUND_UP_TO_64_BYTES_UNDER: usize = 1024;
- const CHUNK_SIZE: usize = 4 * 1024 * 1024;
+ pub(crate) const CHUNK_SIZE: usize = 4 * 1024 * 1024;

/// Calculates the allocation class (an index from 0 to NUM_ALLOCATION_CLASSES)
/// for the given size that we wish to allocate.
- const fn allocation_class(size: usize) -> usize {
+ pub(crate) const fn allocation_class(size: usize) -> usize {
if size <= ROUND_UP_TO_8_BYTES_UNDER {
(size + 7) / 8 - 1
} else if size <= ROUND_UP_TO_64_BYTES_UNDER {
@@ -61,7 +61,7 @@ const fn allocation_class(size: usize) -> usize {
}

/// Calculates the size of the actual allocation for the given size class.
- const fn allocation_size(size_class: usize) -> usize {
+ pub(crate) const fn allocation_size(size_class: usize) -> usize {
if size_class <= allocation_class(ROUND_UP_TO_8_BYTES_UNDER) {
(size_class + 1) * 8
} else if size_class <= allocation_class(ROUND_UP_TO_64_BYTES_UNDER) {
@@ -88,13 +88,30 @@ impl Allocator {
}
}

pub fn new_with_initial_stats(
name: String,
active_allocs_bytes: usize,
active_allocs_count: usize,
) -> Self {
let mut allocator = Self::new(name);
allocator.active_allocs_bytes = active_allocs_bytes;
allocator.active_allocs_count = active_allocs_count;
allocator.active_allocs_bytes_gauge.set(active_allocs_bytes as i64);
allocator.active_allocs_count_gauge.set(active_allocs_count as i64);
allocator
}

pub fn update_memory_usage_gauge(&self, memory: &STArenaMemory) {
self.memory_usage_gauge.set(memory.chunks.len() as i64 * CHUNK_SIZE as i64);
}

/// Adds a new chunk to the arena, and updates the next_alloc_pos to the beginning of
/// the new chunk.
fn new_chunk(&mut self, memory: &mut STArenaMemory) {
memory.chunks.push(vec![0; CHUNK_SIZE]);
self.next_alloc_pos =
ArenaPos { chunk: u32::try_from(memory.chunks.len() - 1).unwrap(), pos: 0 };
- self.memory_usage_gauge.set(memory.chunks.len() as i64 * CHUNK_SIZE as i64);
+ self.update_memory_usage_gauge(memory);
}

/// Allocates a slice of the given size in the arena.
Expand Down Expand Up @@ -145,6 +162,11 @@ impl Allocator {
pub fn num_active_allocs(&self) -> usize {
self.active_allocs_count
}

#[cfg(test)]
pub fn active_allocs_bytes(&self) -> usize {
self.active_allocs_bytes
}
}

#[cfg(test)]
261 changes: 261 additions & 0 deletions core/store/src/trie/mem/arena/concurrent.rs
@@ -0,0 +1,261 @@
use super::alloc::{allocation_class, allocation_size, CHUNK_SIZE};
use super::{Arena, ArenaMemory, ArenaPos, ArenaSliceMut, STArena};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Arena that can be allocated on from multiple threads, but still allowing conversion to a
/// single-threaded `STArena` afterwards.
///
/// The `ConcurrentArena` cannot be directly used; rather, for each thread wishing to use it,
/// `for_thread` must be called to get a `ConcurrentArenaForThread`, which then acts like a
/// normal arena, except that deallocation is not supported.
///
/// The only synchronization they need is the chunk counter. The purpose is so that after
/// multiple threads allocate on their own arenas, the resulting memory can still be combined
/// into a single arena while allowing the pointers (ArenaPos) to still be valid. This is what
/// allows a memtrie to be loaded in parallel.
pub struct ConcurrentArena {
/// Chunks allocated by each `ConcurrentArenaForThread` share the same "logical" memory
/// space. This counter is used to ensure that each `ConcurrentArenaForThread` gets unique
/// chunk positions, so that the allocations made by different threads do not conflict in
/// their positions.
///
/// The goal here is so that allocations coming from multiple threads can be merged into a
/// single arena at the end, without having to alter any arena pointers (ArenaPos).
next_chunk_pos: Arc<AtomicUsize>,
}

impl ConcurrentArena {
pub fn new() -> Self {
Self { next_chunk_pos: Arc::new(AtomicUsize::new(0)) }
}

/// Returns an arena that can be used for one thread.
pub fn for_thread(&self) -> ConcurrentArenaForThread {
ConcurrentArenaForThread::new(self.next_chunk_pos.clone())
}

/// Converts the arena to a single-threaded arena. All returned values of `for_thread` must be
/// passed in.
///
/// There is a caveat that may be fixed in the future if desired: the returned arena will have
/// some memory wasted. This is because the last chunk of each thread may not be full, and
/// the single-threaded arena is unable to make use of multiple partially filled chunks. The
/// maximum memory wasted is 4MB * number of threads; the average is 2MB * number of threads.
/// The wasted memory will be reclaimed when the memtrie shard is unloaded.
pub fn to_single_threaded(
self,
name: String,
threads: Vec<ConcurrentArenaForThread>,
) -> STArena {
let mut chunks = vec![Vec::new(); self.next_chunk_pos.load(Ordering::Relaxed)];
let mut active_allocs_bytes = 0;
let mut active_allocs_count = 0;
for thread in threads {
let memory = thread.memory;
for (pos, chunk) in memory.chunks.into_iter() {
assert!(
chunks[pos].is_empty(),
"Arena threads from the same ConcurrentArena passed in"
);
chunks[pos] = chunk;
}
active_allocs_bytes += thread.allocator.active_allocs_bytes;
active_allocs_count += thread.allocator.active_allocs_count;
}
for chunk in &chunks {
assert!(!chunk.is_empty(), "Not all arena threads are passed in");
assert_eq!(chunk.len(), CHUNK_SIZE); // may as well check this
}
STArena::new_from_existing_chunks(name, chunks, active_allocs_bytes, active_allocs_count)
}
}

/// Arena to be used for a single thread.
pub struct ConcurrentArenaForThread {
memory: ConcurrentArenaMemory,
allocator: ConcurrentArenaAllocator,
}

pub struct ConcurrentArenaMemory {
/// Chunks of memory allocated for this thread. The usize is the global chunk position.
chunks: Vec<(usize, Vec<u8>)>,
/// Index is global chunk position, value is local chunk position.
/// For a chunk position that does not belong to the thread, the value is `usize::MAX`.
/// This vector is as large as needed to contain the largest global chunk position used
/// by this thread, but might not be as large as the total number of chunks allocated
/// globally.
chunk_pos_global_to_local: Vec<usize>,
}

impl ConcurrentArenaMemory {
pub fn new() -> Self {
Self { chunks: Vec::new(), chunk_pos_global_to_local: Vec::new() }
}

pub fn add_chunk(&mut self, pos: usize) {
while self.chunk_pos_global_to_local.len() <= pos {
self.chunk_pos_global_to_local.push(usize::MAX);
}
self.chunk_pos_global_to_local[pos] = self.chunks.len();
self.chunks.push((pos, vec![0; CHUNK_SIZE]));
}

pub fn chunk(&self, pos: usize) -> &[u8] {
let index = self.chunk_pos_global_to_local[pos];
&self.chunks[index].1
}

pub fn chunk_mut(&mut self, pos: usize) -> &mut [u8] {
let index = self.chunk_pos_global_to_local[pos];
&mut self.chunks[index].1
}
}

impl ArenaMemory for ConcurrentArenaMemory {
fn raw_slice(&self, pos: ArenaPos, len: usize) -> &[u8] {
&self.chunk(pos.chunk())[pos.pos()..pos.pos() + len]
}

fn raw_slice_mut(&mut self, pos: ArenaPos, len: usize) -> &mut [u8] {
&mut self.chunk_mut(pos.chunk())[pos.pos()..pos.pos() + len]
}
}

/// Allocator for a single thread. Unlike the allocator for `STArena`, this one only supports
/// allocation and not deallocation, so it is substantially simpler.
pub struct ConcurrentArenaAllocator {
next_chunk_pos: Arc<AtomicUsize>,
next_pos: ArenaPos,

// Stats that will be transferred to the single-threaded arena.
active_allocs_bytes: usize,
active_allocs_count: usize,
}

impl ConcurrentArenaAllocator {
fn new(next_chunk_pos: Arc<AtomicUsize>) -> Self {
Self {
next_chunk_pos,
next_pos: ArenaPos::invalid(),
active_allocs_bytes: 0,
active_allocs_count: 0,
}
}

pub fn allocate<'a>(
&mut self,
arena: &'a mut ConcurrentArenaMemory,
size: usize,
) -> ArenaSliceMut<'a, ConcurrentArenaMemory> {
// We must allocate in the same kind of sizes as the single-threaded arena,
// so that after converting to `STArena`, these allocations can be properly
// reused.
let size_class = allocation_class(size);
let allocation_size = allocation_size(size_class);
if self.next_pos.is_invalid() || self.next_pos.pos() + allocation_size > CHUNK_SIZE {
let next_chunk_pos = self.next_chunk_pos.fetch_add(1, Ordering::Relaxed);
self.next_pos = ArenaPos { chunk: next_chunk_pos as u32, pos: 0 };
arena.add_chunk(next_chunk_pos);
}
let pos = self.next_pos;
self.next_pos = pos.offset_by(allocation_size);
self.active_allocs_bytes += allocation_size;
self.active_allocs_count += 1;
ArenaSliceMut::new(arena, pos, size)
}
}

impl ConcurrentArenaForThread {
fn new(next_chunk_pos: Arc<AtomicUsize>) -> Self {
Self {
memory: ConcurrentArenaMemory::new(),
allocator: ConcurrentArenaAllocator::new(next_chunk_pos),
}
}
}

impl Arena for ConcurrentArenaForThread {
type Memory = ConcurrentArenaMemory;

fn memory(&self) -> &Self::Memory {
&self.memory
}

fn memory_mut(&mut self) -> &mut Self::Memory {
&mut self.memory
}

fn alloc(&mut self, size: usize) -> ArenaSliceMut<Self::Memory> {
self.allocator.allocate(&mut self.memory, size)
}
}

#[cfg(test)]
mod tests {
use super::ConcurrentArena;
use crate::trie::mem::arena::alloc::CHUNK_SIZE;
use crate::trie::mem::arena::metrics::MEM_TRIE_ARENA_MEMORY_USAGE_BYTES;
use crate::trie::mem::arena::{Arena, ArenaMemory, ArenaWithDealloc};

#[test]
fn test_concurrent_arena() {
let arena = ConcurrentArena::new();
let mut thread1 = arena.for_thread();
let mut thread2 = arena.for_thread();
let mut thread3 = arena.for_thread();

let mut alloc1 = thread1.alloc(17);
let mut alloc2 = thread2.alloc(25);
let mut alloc3 = thread3.alloc(40);
alloc1.raw_slice_mut().copy_from_slice(&[1; 17]);
alloc2.raw_slice_mut().copy_from_slice(&[2; 25]);
alloc3.raw_slice_mut().copy_from_slice(&[3; 40]);
let ptr1 = alloc1.raw_pos();
let ptr2 = alloc2.raw_pos();
let ptr3 = alloc3.raw_pos();

let name = rand::random::<u64>().to_string();
let mut starena = arena.to_single_threaded(name.clone(), vec![thread1, thread2, thread3]);

assert_eq!(starena.num_active_allocs(), 3);
assert_eq!(starena.active_allocs_bytes(), 24 + 32 + 40);
assert_eq!(
MEM_TRIE_ARENA_MEMORY_USAGE_BYTES.get_metric_with_label_values(&[&name]).unwrap().get(),
3 * CHUNK_SIZE as i64
);

let mut alloc4 = starena.alloc(17);
alloc4.raw_slice_mut().copy_from_slice(&[4; 17]);
let ptr4 = alloc4.raw_pos();

assert_eq!(starena.memory().raw_slice(ptr1, 17), &[1; 17]);
assert_eq!(starena.memory().raw_slice(ptr2, 25), &[2; 25]);
assert_eq!(starena.memory().raw_slice(ptr3, 40), &[3; 40]);
assert_eq!(starena.memory().raw_slice(ptr4, 17), &[4; 17]);

// Allocations from the concurrent arena can be deallocated and reused in the converted STArena.
// Allocations of the same size class are reusable.
starena.dealloc(ptr1, 17);
let mut alloc5 = starena.alloc(23);
assert_eq!(alloc5.raw_pos(), ptr1);
alloc5.raw_slice_mut().copy_from_slice(&[5; 23]);
starena.dealloc(ptr2, 25);
let mut alloc6 = starena.alloc(32);
assert_eq!(alloc6.raw_pos(), ptr2);
alloc6.raw_slice_mut().copy_from_slice(&[6; 32]);
starena.dealloc(ptr3, 40);
let mut alloc7 = starena.alloc(37);
assert_eq!(alloc7.raw_pos(), ptr3);
alloc7.raw_slice_mut().copy_from_slice(&[7; 37]);
starena.dealloc(ptr4, 17);
let mut alloc8 = starena.alloc(24);
assert_eq!(alloc8.raw_pos(), ptr4);
alloc8.raw_slice_mut().copy_from_slice(&[8; 24]);

assert_eq!(starena.memory().raw_slice(ptr1, 23), &[5; 23]);
assert_eq!(starena.memory().raw_slice(ptr2, 32), &[6; 32]);
assert_eq!(starena.memory().raw_slice(ptr3, 37), &[7; 37]);
assert_eq!(starena.memory().raw_slice(ptr4, 24), &[8; 24]);
}
}
24 changes: 24 additions & 0 deletions core/store/src/trie/mem/arena/mod.rs
@@ -1,4 +1,5 @@
mod alloc;
pub mod concurrent;
mod metrics;

use self::alloc::Allocator;
@@ -153,11 +154,34 @@ impl STArena {
Self { memory: STArenaMemory::new(), allocator: Allocator::new(name) }
}

pub(crate) fn new_from_existing_chunks(
name: String,
chunks: Vec<Vec<u8>>,
active_allocs_bytes: usize,
active_allocs_count: usize,
) -> Self {
let arena = Self {
memory: STArenaMemory { chunks },
allocator: Allocator::new_with_initial_stats(
name,
active_allocs_bytes,
active_allocs_count,
),
};
arena.allocator.update_memory_usage_gauge(&arena.memory);
arena
}

/// Number of active allocations (alloc calls minus dealloc calls).
#[cfg(test)]
pub fn num_active_allocs(&self) -> usize {
self.allocator.num_active_allocs()
}

#[cfg(test)]
pub fn active_allocs_bytes(&self) -> usize {
self.allocator.active_allocs_bytes()
}
}

impl Arena for STArena {
3 changes: 1 addition & 2 deletions core/store/src/trie/mem/construction.rs
@@ -220,8 +220,7 @@ impl<'a, A: Arena> TrieConstructor<'a, A> {

/// Adds a leaf to the trie. The key must be greater than all previous keys
/// inserted.
- pub fn add_leaf(&mut self, key: &[u8], value: FlatStateValue) {
-     let mut nibbles = NibbleSlice::new(key);
+ pub fn add_leaf(&mut self, mut nibbles: NibbleSlice, value: FlatStateValue) {
let mut i = 0;
// We'll go down the segments to find where our nibbles deviate.
// If the deviation happens in the middle of a segment, we would split