From 4cfa5bddf105feb0c16f9023bb88ec50fcdc053f Mon Sep 17 00:00:00 2001
From: Stein Somers
Date: Thu, 22 Oct 2020 20:57:11 +0200
Subject: [PATCH] BTreeMap: avoid aliasing while handling underfull nodes

---
 library/alloc/src/collections/btree/append.rs |  11 +-
 library/alloc/src/collections/btree/mem.rs    |   1 +
 .../alloc/src/collections/btree/navigate.rs   |  14 -
 library/alloc/src/collections/btree/node.rs   | 247 +++++++++++++-----
 .../alloc/src/collections/btree/node/tests.rs |   4 +-
 library/alloc/src/collections/btree/remove.rs | 230 ++++++++--------
 library/alloc/src/collections/btree/split.rs  |  16 +-
 7 files changed, 320 insertions(+), 203 deletions(-)

diff --git a/library/alloc/src/collections/btree/append.rs b/library/alloc/src/collections/btree/append.rs
index e0362b2f37df7..a4253a732c30f 100644
--- a/library/alloc/src/collections/btree/append.rs
+++ b/library/alloc/src/collections/btree/append.rs
@@ -89,20 +89,15 @@ impl<K, V> Root<K, V> {
         let mut cur_node = self.node_as_mut();
         while let Internal(internal) = cur_node.force() {
             // Check if right-most child is underfull.
-            let mut last_edge = internal.last_edge();
-            let right_child_len = last_edge.reborrow().descend().len();
+            let mut last_kv = internal.last_kv().consider_for_balancing();
+            let right_child_len = last_kv.right_child_len();
             if right_child_len < MIN_LEN {
                 // We need to steal.
-                let mut last_kv = match last_edge.left_kv() {
-                    Ok(left) => left,
-                    Err(_) => unreachable!(),
-                };
                 last_kv.bulk_steal_left(MIN_LEN - right_child_len);
-                last_edge = last_kv.right_edge();
             }

             // Go further down.
-            cur_node = last_edge.descend();
+            cur_node = last_kv.into_right_child();
         }
     }
 }
diff --git a/library/alloc/src/collections/btree/mem.rs b/library/alloc/src/collections/btree/mem.rs
index 5e7d9fa3f91ba..e1363d1ae1f6b 100644
--- a/library/alloc/src/collections/btree/mem.rs
+++ b/library/alloc/src/collections/btree/mem.rs
@@ -6,6 +6,7 @@ use core::ptr;
 /// relevant function.
 ///
 /// If a panic occurs in the `change` closure, the entire process will be aborted.
+#[allow(dead_code)] // keep as illustration and for future use
 #[inline]
 pub fn take_mut<T>(v: &mut T, change: impl FnOnce(T) -> T) {
     replace(v, |value| (change(value), ()))
diff --git a/library/alloc/src/collections/btree/navigate.rs b/library/alloc/src/collections/btree/navigate.rs
index de78148fc82be..ef6f888693fc8 100644
--- a/library/alloc/src/collections/btree/navigate.rs
+++ b/library/alloc/src/collections/btree/navigate.rs
@@ -362,20 +362,6 @@ impl<'a, K, V> Handle, K, V, marker::Leaf>, marker::E
     }
 }

-impl<'a, K, V> Handle<NodeRef<marker::Mut<'a>, K, V, marker::Leaf>, marker::Edge> {
-    /// Moves the leaf edge handle to the next leaf edge.
-    ///
-    /// # Safety
-    /// There must be another KV in the direction travelled.
-    pub unsafe fn move_next_unchecked(&mut self) {
-        super::mem::take_mut(self, |leaf_edge| {
-            let kv = leaf_edge.next_kv();
-            let kv = unsafe { unwrap_unchecked(kv.ok()) };
-            kv.next_leaf_edge()
-        })
-    }
-}
-
 impl<K, V> Handle<NodeRef<marker::Owned, K, V, marker::Leaf>, marker::Edge> {
     /// Moves the leaf edge handle to the next leaf edge and returns the key and value
     /// in between, deallocating any node left behind while leaving the corresponding
diff --git a/library/alloc/src/collections/btree/node.rs b/library/alloc/src/collections/btree/node.rs
index dbf9031620ee1..5feab9201382e 100644
--- a/library/alloc/src/collections/btree/node.rs
+++ b/library/alloc/src/collections/btree/node.rs
@@ -498,6 +498,12 @@ impl NodeRef {
     }
 }

 impl<'a, K, V, Type> NodeRef<marker::Mut<'a>, K, V, Type> {
+    /// Unsafely asserts to the compiler the static information that this node is a `Leaf`.
+ unsafe fn cast_to_leaf_unchecked(self) -> NodeRef, K, V, marker::Leaf> { + debug_assert!(self.height == 0); + NodeRef { height: self.height, node: self.node, _marker: PhantomData } + } + /// Unsafely asserts to the compiler the static information that this node is an `Internal`. unsafe fn cast_to_internal_unchecked(self) -> NodeRef, K, V, marker::Internal> { debug_assert!(self.height > 0); @@ -922,6 +928,14 @@ impl } impl<'a, K, V, NodeType, HandleType> Handle, K, V, NodeType>, HandleType> { + /// Unsafely asserts to the compiler the static information that the handle's node is a `Leaf`. + pub unsafe fn cast_to_leaf_unchecked( + self, + ) -> Handle, K, V, marker::Leaf>, HandleType> { + let node = unsafe { self.node.cast_to_leaf_unchecked() }; + Handle { node, idx: self.idx, _marker: PhantomData } + } + /// Temporarily takes out another, mutable handle on the same location. Beware, as /// this method is very dangerous, doubly so since it may not immediately appear /// dangerous. @@ -961,9 +975,9 @@ impl Handle, mar } } -enum InsertionPlace { - Left(usize), - Right(usize), +pub enum LeftOrRight { + Left(T), + Right(T), } /// Given an edge index where we want to insert into a node filled to capacity, @@ -971,14 +985,14 @@ enum InsertionPlace { /// The goal of the split point is for its key and value to end up in a parent node; /// the keys, values and edges to the left of the split point become the left child; /// the keys, values and edges to the right of the split point become the right child. -fn splitpoint(edge_idx: usize) -> (usize, InsertionPlace) { +fn splitpoint(edge_idx: usize) -> (usize, LeftOrRight) { debug_assert!(edge_idx <= CAPACITY); // Rust issue #74834 tries to explain these symmetric rules. match edge_idx { - 0..EDGE_IDX_LEFT_OF_CENTER => (KV_IDX_CENTER - 1, InsertionPlace::Left(edge_idx)), - EDGE_IDX_LEFT_OF_CENTER => (KV_IDX_CENTER, InsertionPlace::Left(edge_idx)), - EDGE_IDX_RIGHT_OF_CENTER => (KV_IDX_CENTER, InsertionPlace::Right(0)), - _ => (KV_IDX_CENTER + 1, InsertionPlace::Right(edge_idx - (KV_IDX_CENTER + 1 + 1))), + 0..EDGE_IDX_LEFT_OF_CENTER => (KV_IDX_CENTER - 1, LeftOrRight::Left(edge_idx)), + EDGE_IDX_LEFT_OF_CENTER => (KV_IDX_CENTER, LeftOrRight::Left(edge_idx)), + EDGE_IDX_RIGHT_OF_CENTER => (KV_IDX_CENTER, LeftOrRight::Right(0)), + _ => (KV_IDX_CENTER + 1, LeftOrRight::Right(edge_idx - (KV_IDX_CENTER + 1 + 1))), } } @@ -1016,10 +1030,10 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Leaf>, mark let middle = unsafe { Handle::new_kv(self.node, middle_kv_idx) }; let (mut left, k, v, mut right) = middle.split(); let mut insertion_edge = match insertion { - InsertionPlace::Left(insert_idx) => unsafe { + LeftOrRight::Left(insert_idx) => unsafe { Handle::new_edge(left.reborrow_mut(), insert_idx) }, - InsertionPlace::Right(insert_idx) => unsafe { + LeftOrRight::Right(insert_idx) => unsafe { Handle::new_edge(right.leaf_node_as_mut(), insert_idx) }, }; @@ -1080,10 +1094,10 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, let middle = unsafe { Handle::new_kv(self.node, middle_kv_idx) }; let (mut left, k, v, mut right) = middle.split(); let mut insertion_edge = match insertion { - InsertionPlace::Left(insert_idx) => unsafe { + LeftOrRight::Left(insert_idx) => unsafe { Handle::new_edge(left.reborrow_mut(), insert_idx) }, - InsertionPlace::Right(insert_idx) => unsafe { + LeftOrRight::Right(insert_idx) => unsafe { Handle::new_edge(right.internal_node_as_mut(), insert_idx) }, }; @@ -1250,18 +1264,6 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Leaf>, mark } } 
-impl<'a, K, V> Handle<NodeRef<marker::Mut<'a>, K, V, marker::Internal>, marker::KV> {
-    /// Returns `true` if it is valid to call `.merge()`, i.e., whether there is enough room in
-    /// a node to hold the combination of the nodes to the left and right of this handle along
-    /// with the key/value pair at this handle.
-    pub fn can_merge(&self) -> bool {
-        (self.reborrow().left_edge().descend().len()
-            + self.reborrow().right_edge().descend().len()
-            + 1)
-            <= CAPACITY
-    }
-}
-
 impl<'a, K: 'a, V: 'a> Handle<NodeRef<marker::Mut<'a>, K, V, marker::Internal>, marker::KV> {
     /// Splits the underlying node into three parts:
     ///
@@ -1290,28 +1292,118 @@ impl<'a, K: 'a, V: 'a> Handle<NodeRef<marker::Mut<'a>, K, V, marker::Internal>,
             (self.node, k, v, right)
         }
     }
+}
+
+/// Represents a session for evaluating and performing a balancing operation
+/// around an internal key/value pair.
+pub struct BalancingContext<'a, K, V> {
+    parent: Handle<NodeRef<marker::Mut<'a>, K, V, marker::Internal>, marker::KV>,
+    left_child: NodeRef<marker::Mut<'a>, K, V, marker::LeafOrInternal>,
+    right_child: NodeRef<marker::Mut<'a>, K, V, marker::LeafOrInternal>,
+}
+
+impl<'a, K, V> Handle<NodeRef<marker::Mut<'a>, K, V, marker::Internal>, marker::KV> {
+    pub fn consider_for_balancing(self) -> BalancingContext<'a, K, V> {
+        let self1 = unsafe { ptr::read(&self) };
+        let self2 = unsafe { ptr::read(&self) };
+        BalancingContext {
+            parent: self,
+            left_child: self1.left_edge().descend(),
+            right_child: self2.right_edge().descend(),
+        }
+    }
+}

-    /// Combines the node immediately to the left of this handle, the key/value pair pointed
-    /// to by this handle, and the node immediately to the right of this handle into one new
-    /// child of the underlying node, returning an edge referencing that new child.
+impl<'a, K, V> NodeRef<marker::Mut<'a>, K, V, marker::LeafOrInternal> {
+    /// Chooses a balancing context involving the node as a child, thus between
+    /// the KV immediately to the left or to the right in the parent node.
+    /// Returns an `Err` if there is no parent.
     ///
-    /// Panics unless this edge `.can_merge()`.
+    /// This method optimizes for a node that has fewer elements than its left
+    /// and right siblings, if they exist, by preferring the left parent KV.
+    /// Merging with the left sibling is faster, since we only need to move
+    /// the node's N elements, instead of shifting them to the right and moving
+    /// more than N elements in front. Stealing from the left sibling is also
+    /// typically faster, since we only need to shift the node's N elements to
+    /// the right, instead of shifting at least N of the sibling's elements to
+    /// the left.
+    pub fn choose_parent_kv(self) -> Result<LeftOrRight<BalancingContext<'a, K, V>>, Self> {
+        match unsafe { ptr::read(&self) }.ascend() {
+            Ok(parent) => match parent.left_kv() {
+                Ok(left_parent_kv) => Ok(LeftOrRight::Left(BalancingContext {
+                    parent: unsafe { ptr::read(&left_parent_kv) },
+                    left_child: left_parent_kv.left_edge().descend(),
+                    right_child: self,
+                })),
+                Err(parent) => match parent.right_kv() {
+                    Ok(right_parent_kv) => Ok(LeftOrRight::Right(BalancingContext {
+                        parent: unsafe { ptr::read(&right_parent_kv) },
+                        left_child: self,
+                        right_child: right_parent_kv.right_edge().descend(),
+                    })),
+                    Err(_) => unreachable!("empty non-root node"),
+                },
+            },
+            Err(root) => Err(root),
+        }
+    }
+}
+
+impl<'a, K, V> BalancingContext<'a, K, V> {
+    pub fn left_child_len(&self) -> usize {
+        self.left_child.len()
+    }
+
+    pub fn right_child_len(&self) -> usize {
+        self.right_child.len()
+    }
+
+    pub fn into_left_child(self) -> NodeRef<marker::Mut<'a>, K, V, marker::LeafOrInternal> {
+        self.left_child
+    }
+
+    pub fn into_right_child(self) -> NodeRef<marker::Mut<'a>, K, V, marker::LeafOrInternal> {
+        self.right_child
+    }
+
+    /// Returns `true` if it is valid to call `.merge()` in the balancing context,
+    /// i.e., whether there is enough room in a node to hold the combination of
+    /// both adjacent child nodes, along with the key/value pair in the parent.
+    pub fn can_merge(&self) -> bool {
+        self.left_child.len() + 1 + self.right_child.len() <= CAPACITY
+    }
+}
+
+impl<'a, K: 'a, V: 'a> BalancingContext<'a, K, V> {
+    /// Merges the parent's key/value pair and both adjacent child nodes into
+    /// the left node and returns an edge handle in that expanded left node.
+    /// If `track_edge_idx` is given some value, the returned edge corresponds
+    /// to where the edge in that child node ended up.
+    ///
+    /// Panics unless we `.can_merge()`.
pub fn merge( mut self, - ) -> Handle, K, V, marker::Internal>, marker::Edge> { - let self1 = unsafe { ptr::read(&self) }; - let self2 = unsafe { ptr::read(&self) }; - let mut left_node = self1.left_edge().descend(); + track_edge_idx: Option>, + ) -> Handle, K, V, marker::LeafOrInternal>, marker::Edge> { + let mut left_node = self.left_child; let left_len = left_node.len(); - let right_node = self2.right_edge().descend(); + let right_node = self.right_child; let right_len = right_node.len(); assert!(left_len + right_len < CAPACITY); + assert!(match track_edge_idx { + None => true, + Some(LeftOrRight::Left(idx)) => idx <= left_len, + Some(LeftOrRight::Right(idx)) => idx <= right_len, + }); unsafe { *left_node.reborrow_mut().into_len_mut() += right_len as u16 + 1; - let parent_key = slice_remove(self.node.reborrow_mut().into_key_area_slice(), self.idx); + let parent_key = slice_remove( + self.parent.node.reborrow_mut().into_key_area_slice(), + self.parent.idx, + ); left_node.reborrow_mut().into_key_area_mut_at(left_len).write(parent_key); ptr::copy_nonoverlapping( right_node.reborrow().key_area().as_ptr(), @@ -1319,7 +1411,10 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, right_len, ); - let parent_val = slice_remove(self.node.reborrow_mut().into_val_area_slice(), self.idx); + let parent_val = slice_remove( + self.parent.node.reborrow_mut().into_val_area_slice(), + self.parent.idx, + ); left_node.reborrow_mut().into_val_area_mut_at(left_len).write(parent_val); ptr::copy_nonoverlapping( right_node.reborrow().val_area().as_ptr(), @@ -1327,15 +1422,18 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, right_len, ); - slice_remove(&mut self.node.reborrow_mut().into_edge_area_slice(), self.idx + 1); - let self_len = self.node.len(); - self.node.correct_childrens_parent_links(self.idx + 1..self_len); - *self.node.reborrow_mut().into_len_mut() -= 1; + slice_remove( + &mut self.parent.node.reborrow_mut().into_edge_area_slice(), + self.parent.idx + 1, + ); + let parent_old_len = self.parent.node.len(); + self.parent.node.correct_childrens_parent_links(self.parent.idx + 1..parent_old_len); + *self.parent.node.reborrow_mut().into_len_mut() -= 1; - if self.node.height > 1 { + if self.parent.node.height > 1 { // SAFETY: the height of the nodes being merged is one below the height // of the node of this edge, thus above zero, so they are internal. - let mut left_node = left_node.cast_to_internal_unchecked(); + let mut left_node = left_node.reborrow_mut().cast_to_internal_unchecked(); let right_node = right_node.cast_to_internal_unchecked(); ptr::copy_nonoverlapping( right_node.reborrow().edge_area().as_ptr(), @@ -1350,50 +1448,67 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, Global.dealloc(right_node.node.cast(), Layout::new::>()); } - Handle::new_edge(self.node, self.idx) + let new_idx = match track_edge_idx { + None => 0, + Some(LeftOrRight::Left(idx)) => idx, + Some(LeftOrRight::Right(idx)) => left_len + 1 + idx, + }; + Handle::new_edge(left_node, new_idx) } } - /// This removes a key/value pair from the left child and places it in the key/value storage - /// pointed to by this handle while pushing the old key/value pair of this handle into the right - /// child. - pub fn steal_left(&mut self) { + /// Removes a key/value pair from the left child and places it in the key/value storage + /// of the parent, while pushing the old parent key/value pair into the right child. 
+ /// Returns a handle to the edge in the right child corresponding to where the original + /// edge specified by `track_right_edge_idx` ended up. + pub fn steal_left( + mut self, + track_right_edge_idx: usize, + ) -> Handle, K, V, marker::LeafOrInternal>, marker::Edge> { unsafe { - let (k, v, edge) = self.reborrow_mut().left_edge().descend().pop(); + let (k, v, edge) = self.left_child.pop(); - let k = mem::replace(self.kv_mut().0, k); - let v = mem::replace(self.kv_mut().1, v); + let k = mem::replace(self.parent.kv_mut().0, k); + let v = mem::replace(self.parent.kv_mut().1, v); - match self.reborrow_mut().right_edge().descend().force() { + match self.right_child.reborrow_mut().force() { ForceResult::Leaf(mut leaf) => leaf.push_front(k, v), ForceResult::Internal(mut internal) => internal.push_front(k, v, edge.unwrap()), } + + Handle::new_edge(self.right_child, 1 + track_right_edge_idx) } } - /// This removes a key/value pair from the right child and places it in the key/value storage - /// pointed to by this handle while pushing the old key/value pair of this handle into the left - /// child. - pub fn steal_right(&mut self) { + /// Removes a key/value pair from the right child and places it in the key/value storage + /// of the parent, while pushing the old parent key/value pair onto the left child. + /// Returns a handle to the edge in the left child specified by `track_left_edge_idx`, + /// which didn't move. + pub fn steal_right( + mut self, + track_left_edge_idx: usize, + ) -> Handle, K, V, marker::LeafOrInternal>, marker::Edge> { unsafe { - let (k, v, edge) = self.reborrow_mut().right_edge().descend().pop_front(); + let (k, v, edge) = self.right_child.pop_front(); - let k = mem::replace(self.kv_mut().0, k); - let v = mem::replace(self.kv_mut().1, v); + let k = mem::replace(self.parent.kv_mut().0, k); + let v = mem::replace(self.parent.kv_mut().1, v); - match self.reborrow_mut().left_edge().descend().force() { + match self.left_child.reborrow_mut().force() { ForceResult::Leaf(mut leaf) => leaf.push(k, v), ForceResult::Internal(mut internal) => internal.push(k, v, edge.unwrap()), } + + Handle::new_edge(self.left_child, track_left_edge_idx) } } /// This does stealing similar to `steal_left` but steals multiple elements at once. pub fn bulk_steal_left(&mut self, count: usize) { unsafe { - let mut left_node = ptr::read(self).left_edge().descend(); + let left_node = &mut self.left_child; let left_len = left_node.len(); - let mut right_node = ptr::read(self).right_edge().descend(); + let right_node = &mut self.right_child; let right_len = right_node.len(); // Make sure that we may steal safely. @@ -1407,7 +1522,7 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, let left_kv = left_node.reborrow_mut().into_kv_pointers_mut(); let right_kv = right_node.reborrow_mut().into_kv_pointers_mut(); let parent_kv = { - let kv = self.kv_mut(); + let kv = self.parent.kv_mut(); (kv.0 as *mut K, kv.1 as *mut V) }; @@ -1428,7 +1543,7 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, *left_node.reborrow_mut().into_len_mut() -= count as u16; *right_node.reborrow_mut().into_len_mut() += count as u16; - match (left_node.force(), right_node.force()) { + match (left_node.reborrow_mut().force(), right_node.reborrow_mut().force()) { (ForceResult::Internal(left), ForceResult::Internal(mut right)) => { // Make room for stolen edges. let left = left.reborrow(); @@ -1447,9 +1562,9 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, /// The symmetric clone of `bulk_steal_left`. 
pub fn bulk_steal_right(&mut self, count: usize) { unsafe { - let mut left_node = ptr::read(self).left_edge().descend(); + let left_node = &mut self.left_child; let left_len = left_node.len(); - let mut right_node = ptr::read(self).right_edge().descend(); + let right_node = &mut self.right_child; let right_len = right_node.len(); // Make sure that we may steal safely. @@ -1463,7 +1578,7 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, let left_kv = left_node.reborrow_mut().into_kv_pointers_mut(); let right_kv = right_node.reborrow_mut().into_kv_pointers_mut(); let parent_kv = { - let kv = self.kv_mut(); + let kv = self.parent.kv_mut(); (kv.0 as *mut K, kv.1 as *mut V) }; @@ -1484,7 +1599,7 @@ impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, *left_node.reborrow_mut().into_len_mut() += count as u16; *right_node.reborrow_mut().into_len_mut() -= count as u16; - match (left_node.force(), right_node.force()) { + match (left_node.reborrow_mut().force(), right_node.reborrow_mut().force()) { (ForceResult::Internal(left), ForceResult::Internal(mut right)) => { move_edges(right.reborrow(), 0, left, left_len + 1, count); diff --git a/library/alloc/src/collections/btree/node/tests.rs b/library/alloc/src/collections/btree/node/tests.rs index 38c75de34eeeb..ac61c1df8e2e4 100644 --- a/library/alloc/src/collections/btree/node/tests.rs +++ b/library/alloc/src/collections/btree/node/tests.rs @@ -54,11 +54,11 @@ fn test_splitpoint() { let mut left_len = middle_kv_idx; let mut right_len = CAPACITY - middle_kv_idx - 1; match insertion { - InsertionPlace::Left(edge_idx) => { + LeftOrRight::Left(edge_idx) => { assert!(edge_idx <= left_len); left_len += 1; } - InsertionPlace::Right(edge_idx) => { + LeftOrRight::Right(edge_idx) => { assert!(edge_idx <= right_len); right_len += 1; } diff --git a/library/alloc/src/collections/btree/remove.rs b/library/alloc/src/collections/btree/remove.rs index 99655d3e2bf64..c4253d4221b3b 100644 --- a/library/alloc/src/collections/btree/remove.rs +++ b/library/alloc/src/collections/btree/remove.rs @@ -1,133 +1,153 @@ use super::map::MIN_LEN; -use super::node::{marker, ForceResult, Handle, NodeRef}; +use super::node::{marker, ForceResult::*, Handle, LeftOrRight::*, NodeRef}; use super::unwrap_unchecked; use core::mem; -use core::ptr; impl<'a, K: 'a, V: 'a> Handle, K, V, marker::LeafOrInternal>, marker::KV> { - /// Removes a key/value-pair from the map, and returns that pair, as well as - /// the leaf edge corresponding to that former pair. + /// Removes a key/value-pair from the tree, and returns that pair, as well as + /// the leaf edge corresponding to that former pair. It's possible this empties + /// a root node that is internal, which the caller should pop from the map + /// holding the tree. The caller should also decrement the map's length. pub fn remove_kv_tracking( self, handle_emptied_internal_root: F, ) -> ((K, V), Handle, K, V, marker::Leaf>, marker::Edge>) { - let (old_kv, mut pos, was_internal) = match self.force() { - ForceResult::Leaf(leaf) => { - let (old_kv, pos) = leaf.remove(); - (old_kv, pos, false) - } - ForceResult::Internal(mut internal) => { - // Replace the location freed in the internal node with an - // adjacent KV, and remove that adjacent KV from its leaf. - // Always choose the adjacent KV on the left side because - // it is typically faster to pop an element from the end - // of the KV arrays without needing to shift other elements. 
- - let key_loc = internal.kv_mut().0 as *mut K; - let val_loc = internal.kv_mut().1 as *mut V; - - let to_remove = internal.left_edge().descend().last_leaf_edge().left_kv().ok(); - let to_remove = unsafe { unwrap_unchecked(to_remove) }; - - let (kv, pos) = to_remove.remove(); - - let old_key = unsafe { mem::replace(&mut *key_loc, kv.0) }; - let old_val = unsafe { mem::replace(&mut *val_loc, kv.1) }; - - ((old_key, old_val), pos, true) - } - }; - - // Handle underflow - let mut cur_node = unsafe { ptr::read(&pos).into_node().forget_type() }; - let mut at_leaf = true; - while cur_node.len() < MIN_LEN { - match handle_underfull_node(cur_node) { - UnderflowResult::AtRoot => break, - UnderflowResult::Merged(edge, merged_with_left, offset) => { - // If we merged with our right sibling then our tracked - // position has not changed. However if we merged with our - // left sibling then our tracked position is now dangling. - if at_leaf && merged_with_left { - let idx = pos.idx() + offset; - let node = match unsafe { ptr::read(&edge).descend().force() } { - ForceResult::Leaf(leaf) => leaf, - ForceResult::Internal(_) => unreachable!(), - }; - pos = unsafe { Handle::new_edge(node, idx) }; - } + match self.force() { + Leaf(node) => node.remove_leaf_kv(handle_emptied_internal_root), + Internal(node) => node.remove_internal_kv(handle_emptied_internal_root), + } + } +} - let parent = edge.into_node(); - if parent.len() == 0 { - // The parent that was just emptied must be the root, - // because nodes on a lower level would not have been - // left with a single child. - handle_emptied_internal_root(); - break; +impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Leaf>, marker::KV> { + fn remove_leaf_kv( + self, + handle_emptied_internal_root: F, + ) -> ((K, V), Handle, K, V, marker::Leaf>, marker::Edge>) { + let (old_kv, mut pos) = self.remove(); + let len = pos.reborrow().into_node().len(); + if len < MIN_LEN { + let idx = pos.idx(); + // We have to temporarily forget the child type, because there is no + // distinct node type for the immediate parents of a leaf. + let new_pos = match pos.into_node().forget_type().choose_parent_kv() { + Ok(Left(left_parent_kv)) => { + debug_assert!(left_parent_kv.right_child_len() == MIN_LEN - 1); + if left_parent_kv.can_merge() { + left_parent_kv.merge(Some(Right(idx))) } else { - cur_node = parent.forget_type(); - at_leaf = false; + debug_assert!(left_parent_kv.left_child_len() > MIN_LEN); + left_parent_kv.steal_left(idx) } } - UnderflowResult::Stole(stole_from_left) => { - // Adjust the tracked position if we stole from a left sibling - if stole_from_left && at_leaf { - // SAFETY: This is safe since we just added an element to our node. - unsafe { - pos.move_next_unchecked(); - } + Ok(Right(right_parent_kv)) => { + debug_assert!(right_parent_kv.left_child_len() == MIN_LEN - 1); + if right_parent_kv.can_merge() { + right_parent_kv.merge(Some(Left(idx))) + } else { + debug_assert!(right_parent_kv.right_child_len() > MIN_LEN); + right_parent_kv.steal_right(idx) } - break; } - } - } + Err(pos) => unsafe { Handle::new_edge(pos, idx) }, + }; + // SAFETY: `new_pos` is the leaf we started from or a sibling. + pos = unsafe { new_pos.cast_to_leaf_unchecked() }; - // If we deleted from an internal node then we need to compensate for - // the earlier swap and adjust the tracked position to point to the - // next element. 
- if was_internal { - pos = unsafe { unwrap_unchecked(pos.next_kv().ok()).next_leaf_edge() }; + // Only if we merged, the parent (if any) has shrunk, but skipping + // the following step does not pay off in benchmarks. + // + // SAFETY: We won't destroy or rearrange the leaf where `pos` is at + // by handling its parent recursively; at worst we will destroy or + // rearrange the parent through the grandparent, thus change the + // leaf's parent pointer. + if let Ok(parent) = unsafe { pos.reborrow_mut() }.into_node().ascend() { + parent.into_node().handle_shrunk_node_recursively(handle_emptied_internal_root); + } } - (old_kv, pos) } } -enum UnderflowResult<'a, K, V> { - AtRoot, - Merged(Handle, K, V, marker::Internal>, marker::Edge>, bool, usize), - Stole(bool), -} +impl<'a, K: 'a, V: 'a> Handle, K, V, marker::Internal>, marker::KV> { + fn remove_internal_kv( + self, + handle_emptied_internal_root: F, + ) -> ((K, V), Handle, K, V, marker::Leaf>, marker::Edge>) { + // Remove an adjacent KV from its leaf and then put it back in place of + // the element we were asked to remove. Prefer the left adjacent KV, + // for the reasons listed in `choose_parent_kv`. + let left_leaf_kv = self.left_edge().descend().last_leaf_edge().left_kv(); + let left_leaf_kv = unsafe { unwrap_unchecked(left_leaf_kv.ok()) }; + let (left_kv, left_hole) = left_leaf_kv.remove_leaf_kv(handle_emptied_internal_root); -fn handle_underfull_node<'a, K: 'a, V: 'a>( - node: NodeRef, K, V, marker::LeafOrInternal>, -) -> UnderflowResult<'_, K, V> { - let parent = match node.ascend() { - Ok(parent) => parent, - Err(_) => return UnderflowResult::AtRoot, - }; + // The internal node may have been stolen from or merged. Go back right + // to find where the original KV ended up. + let mut internal = unsafe { unwrap_unchecked(left_hole.next_kv().ok()) }; + let old_key = mem::replace(internal.kv_mut().0, left_kv.0); + let old_val = mem::replace(internal.kv_mut().1, left_kv.1); + let pos = internal.next_leaf_edge(); + ((old_key, old_val), pos) + } +} - // Prefer the left KV if it exists. Merging with the left side is faster, - // since merging happens towards the left and `node` has fewer elements. - // Stealing from the left side is faster, since we can pop from the end of - // the KV arrays. - let (is_left, mut handle) = match parent.left_kv() { - Ok(left) => (true, left), - Err(parent) => { - let right = unsafe { unwrap_unchecked(parent.right_kv().ok()) }; - (false, right) +impl<'a, K: 'a, V: 'a> NodeRef, K, V, marker::Internal> { + /// Stocks up a possibly underfull internal node, recursively. + /// Climbs up until it reaches an ancestor that has elements to spare or the root. + fn handle_shrunk_node_recursively(mut self, handle_emptied_internal_root: F) { + loop { + self = match self.len() { + 0 => { + // An empty node must be the root, because length is only + // reduced by one, and non-root underfull nodes are stocked up, + // so non-root nodes never have fewer than MIN_LEN - 1 elements. 
+ debug_assert!(self.ascend().is_err()); + handle_emptied_internal_root(); + return; + } + 1..MIN_LEN => { + if let Some(parent) = self.handle_underfull_node_locally() { + parent + } else { + return; + } + } + _ => return, + } } - }; + } - if handle.can_merge() { - let offset = if is_left { handle.reborrow().left_edge().descend().len() + 1 } else { 0 }; - UnderflowResult::Merged(handle.merge(), is_left, offset) - } else { - if is_left { - handle.steal_left(); - } else { - handle.steal_right(); + /// Stocks up an underfull internal node, possibly at the cost of shrinking + /// its parent instead, which is then returned. + fn handle_underfull_node_locally( + self, + ) -> Option, K, V, marker::Internal>> { + match self.forget_type().choose_parent_kv() { + Ok(Left(left_parent_kv)) => { + debug_assert!(left_parent_kv.right_child_len() == MIN_LEN - 1); + if left_parent_kv.can_merge() { + let pos = left_parent_kv.merge(None); + let parent_edge = unsafe { unwrap_unchecked(pos.into_node().ascend().ok()) }; + Some(parent_edge.into_node()) + } else { + debug_assert!(left_parent_kv.left_child_len() > MIN_LEN); + left_parent_kv.steal_left(0); + None + } + } + Ok(Right(right_parent_kv)) => { + debug_assert!(right_parent_kv.left_child_len() == MIN_LEN - 1); + if right_parent_kv.can_merge() { + let pos = right_parent_kv.merge(None); + let parent_edge = unsafe { unwrap_unchecked(pos.into_node().ascend().ok()) }; + Some(parent_edge.into_node()) + } else { + debug_assert!(right_parent_kv.right_child_len() > MIN_LEN); + right_parent_kv.steal_right(0); + None + } + } + Err(_) => None, } - UnderflowResult::Stole(is_left) } } diff --git a/library/alloc/src/collections/btree/split.rs b/library/alloc/src/collections/btree/split.rs index 5f00a5a25abad..cd9615a330df6 100644 --- a/library/alloc/src/collections/btree/split.rs +++ b/library/alloc/src/collections/btree/split.rs @@ -60,17 +60,17 @@ impl Root { let mut cur_node = self.node_as_mut(); while let Internal(node) = cur_node.force() { - let mut last_kv = node.last_kv(); + let mut last_kv = node.last_kv().consider_for_balancing(); if last_kv.can_merge() { - cur_node = last_kv.merge().descend(); + cur_node = last_kv.merge(None).into_node(); } else { - let right_len = last_kv.reborrow().right_edge().descend().len(); + let right_len = last_kv.right_child_len(); // `MIN_LEN + 1` to avoid readjust if merge happens on the next level. if right_len < MIN_LEN + 1 { last_kv.bulk_steal_left(MIN_LEN + 1 - right_len); } - cur_node = last_kv.right_edge().descend(); + cur_node = last_kv.into_right_child(); } } } @@ -86,17 +86,17 @@ impl Root { let mut cur_node = self.node_as_mut(); while let Internal(node) = cur_node.force() { - let mut first_kv = node.first_kv(); + let mut first_kv = node.first_kv().consider_for_balancing(); if first_kv.can_merge() { - cur_node = first_kv.merge().descend(); + cur_node = first_kv.merge(None).into_node(); } else { - let left_len = first_kv.reborrow().left_edge().descend().len(); + let left_len = first_kv.left_child_len(); // `MIN_LEN + 1` to avoid readjust if merge happens on the next level. if left_len < MIN_LEN + 1 { first_kv.bulk_steal_right(MIN_LEN + 1 - left_len); } - cur_node = first_kv.left_edge().descend(); + cur_node = first_kv.into_left_child(); } } }
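
Aside (not part of the patch): a minimal, self-contained sketch of the edge-tracking arithmetic that `BalancingContext::merge` applies to `track_edge_idx`. Plain `Vec`s stand in for B-tree nodes, keys stand in for key/value pairs, and the helper name `merge_tracking` is made up for illustration; the point is only the index rule, where an edge in the left child keeps its index and an edge at `idx` in the right child ends up at `left_len + 1 + idx`.

// Illustrative only: Vec-based stand-in for merging two children plus the parent KV.
enum LeftOrRight<T> {
    Left(T),
    Right(T),
}

// Appends the parent key and the right child's keys to the left child's keys and
// returns the merged position of a tracked edge, mirroring `new_idx` in `merge`.
fn merge_tracking(
    left: &mut Vec<u32>,
    parent_key: u32,
    right: Vec<u32>,
    track_edge_idx: Option<LeftOrRight<usize>>,
) -> usize {
    let left_len = left.len();
    left.push(parent_key);
    left.extend(right);
    match track_edge_idx {
        None => 0,
        Some(LeftOrRight::Left(idx)) => idx,
        Some(LeftOrRight::Right(idx)) => left_len + 1 + idx,
    }
}

fn main() {
    // Edge i sits immediately to the left of key i.
    let mut left = vec![1, 2, 3];
    let right = vec![7, 8];
    let new_idx = merge_tracking(&mut left, 5, right, Some(LeftOrRight::Right(1)));
    assert_eq!(left, [1, 2, 3, 5, 7, 8]);
    // Edge 1 of the old right child sat left of key 8; after the merge it is edge 5,
    // still left of key 8: 3 left keys + 1 parent key + 1.
    assert_eq!(new_idx, 5);
    assert_eq!(left[new_idx], 8);

    // A tracked edge in the left child does not move.
    let mut left2 = vec![1, 2];
    let idx2 = merge_tracking(&mut left2, 4, vec![6], Some(LeftOrRight::Left(2)));
    assert_eq!((idx2, left2[idx2]), (2, 4));
}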