Skip to content

Commit

Permalink
feat(storage): avoid using box dyn for storage table merge sort (#19713)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 10, 2024
1 parent bb73593 commit 9209dbc
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 63 deletions.
52 changes: 19 additions & 33 deletions src/storage/src/table/merge_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,47 @@ use futures_async_stream::try_stream;

use super::{KeyedChangeLogRow, KeyedRow};

/// A peeked item that exposes the key bytes `Node` compares.
///
/// NOTE(review): this span is a rendered diff without +/- markers, so both the generic
/// `NodePeek<R>` header and the non-generic `NodePeek` header appear, together with `into_row`.
/// Presumably the generic header and `into_row` are the removed side — confirm against the commit.
pub trait NodePeek<R> {
pub trait NodePeek {
/// Returns the key bytes used when two `Node`s are compared.
fn vnode_key(&self) -> &[u8];
/// Consumes the boxed implementor and returns it as the row type `R`.
fn into_row(self: Box<Self>) -> R;
}

// `KeyedRow` takes part in the merge sort through the bytes returned by its `key()` accessor.
// NOTE(review): two `impl` headers and the `into_row` method appear because this is a diff
// rendered without +/- markers; presumably the `NodePeek<KeyedRow<K>>` header and `into_row`
// are the removed side — confirm against the commit.
impl<K: AsRef<[u8]>> NodePeek<KeyedRow<K>> for KeyedRow<K> {
impl<K: AsRef<[u8]>> NodePeek for KeyedRow<K> {
fn vnode_key(&self) -> &[u8] {
// Delegates to `self.key()`.
self.key()
}

// Moves the row out of its `Box`.
fn into_row(self: Box<Self>) -> KeyedRow<K> {
*self
}
}

// `KeyedChangeLogRow` takes part in the merge sort through the bytes returned by its `key()`
// accessor.
// NOTE(review): two `impl` headers and the `into_row` method appear because this is a diff
// rendered without +/- markers; presumably the `NodePeek<KeyedChangeLogRow<K>>` header and
// `into_row` are the removed side — confirm against the commit.
impl<K: AsRef<[u8]>> NodePeek<KeyedChangeLogRow<K>> for KeyedChangeLogRow<K> {
impl<K: AsRef<[u8]>> NodePeek for KeyedChangeLogRow<K> {
fn vnode_key(&self) -> &[u8] {
// Delegates to `self.key()`.
self.key()
}

// Moves the row out of its `Box`.
fn into_row(self: Box<Self>) -> KeyedChangeLogRow<K> {
*self
}
}

/// A heap entry pairing one input stream with the item most recently polled from it.
///
/// NOTE(review): both the lifetime-parameterised header with a boxed `dyn NodePeek` field and
/// the plain generic header with an `R` field appear, because this is a diff rendered without
/// +/- markers. Presumably the boxed form is the removed side — confirm against the commit.
struct Node<'a, S, R> {
struct Node<S, R: NodePeek> {
/// The stream that later items are pulled from.
stream: S,

/// The next item polled from `stream` previously. Since the `eq` and `cmp` must be synchronous
/// functions, we need to implement peeking manually.
peeked: Box<dyn NodePeek<R> + 'a + Send + Sync>,
peeked: R,
}

// Equality for heap entries compares the peeked items' `vnode_key` bytes.
// Two nodes never compare equal: differing keys return `false`, and matching keys hit
// `unreachable!` (a panic), since keys coming from different iterators are expected to be unique.
// NOTE(review): duplicate `impl` headers are the old/new sides of a diff rendered without +/-
// markers — confirm which is current against the commit.
impl<S, R> PartialEq for Node<'_, S, R> {
impl<S, R: NodePeek> PartialEq for Node<S, R> {
fn eq(&self, other: &Self) -> bool {
match self.peeked.vnode_key() == other.peeked.vnode_key() {
true => unreachable!("primary key from different iters should be unique"),
false => false,
}
}
}
// Marker impl: `Eq` adds no methods beyond `PartialEq`.
impl<S, R> Eq for Node<'_, S, R> {}
impl<S, R: NodePeek> Eq for Node<S, R> {}

// `partial_cmp` always returns `Some`, forwarding to the `Ord::cmp` implementation.
// NOTE(review): duplicate `impl` headers are the old/new sides of a diff rendered without +/-
// markers — confirm which is current against the commit.
impl<S, R> PartialOrd for Node<'_, S, R> {
impl<S, R: NodePeek> PartialOrd for Node<S, R> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl<S, R> Ord for Node<'_, S, R> {
impl<S, R: NodePeek> Ord for Node<S, R> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// The heap is a max heap, so we need to reverse the order.
self.peeked
Expand All @@ -81,39 +72,33 @@ impl<S, R> Ord for Node<'_, S, R> {
}

/// Merges several keyed streams into one by repeatedly yielding the peeked item of the heap's
/// top `Node` and refilling that node from its own stream.
///
/// Streams that yield nothing on their first poll never enter the heap. Any `Err` produced by an
/// input stream is propagated to the caller via `?`.
///
/// NOTE(review): this span is a rendered diff without +/- markers, so the pre-change lines (the
/// `'a`/`NP` signature and bounds, `Box::new`, `into_row`) sit next to their replacements —
/// confirm which side is current against the commit. The output order depends on `Node`'s `Ord`
/// impl, which is only partially visible here; presumably ascending by `vnode_key` — confirm.
#[try_stream(ok=KO, error=E)]
pub async fn merge_sort<'a, KO, E, NP, R>(streams: Vec<R>)
pub async fn merge_sort<E, KO, R>(streams: Vec<R>)
where
// K: AsRef<[u8]> + 'a,
KO: 'a,
NP: NodePeek<KO> + 'a + Send + Sync,
E: Error + 'a,
R: Stream<Item = Result<NP, E>> + 'a + Unpin,
KO: NodePeek + Send + Sync,
E: Error,
R: Stream<Item = Result<KO, E>> + Unpin,
{
// Seed the heap with the first item of every stream; `transpose()?` surfaces a stream error
// immediately, and a stream whose first poll returns `None` is dropped.
let mut heap = BinaryHeap::new();
for mut stream in streams {
if let Some(peeked) = stream.next().await.transpose()? {
heap.push(Node {
stream,
peeked: Box::new(peeked),
});
heap.push(Node { stream, peeked });
}
}
// Work on the top node in place through `peek_mut`, so a refilled node stays in the heap
// instead of being popped and pushed again.
while let Some(mut node) = heap.peek_mut() {
// Note: If the `next` returns `Err`, we'll fail to yield the previous item.
yield match node.stream.next().await.transpose()? {
// There still remains data in the stream, take and update the peeked value.
Some(new_peeked) => {
std::mem::replace(&mut node.peeked, Box::new(new_peeked)).into_row()
}
Some(new_peeked) => std::mem::replace(&mut node.peeked, new_peeked),
// This stream is exhausted, remove it from the heap.
None => PeekMut::pop(node).peeked.into_row(),
None => PeekMut::pop(node).peeked,
};
}
}

#[cfg(test)]
mod tests {
use futures_async_stream::for_await;
use rand::random;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::ScalarImpl;
Expand All @@ -123,7 +108,8 @@ mod tests {
use crate::error::StorageResult;

fn gen_pk_and_row(i: u8) -> StorageResult<KeyedRow<Vec<u8>>> {
let mut key = VirtualNode::ZERO.to_be_bytes().to_vec();
let vnode = VirtualNode::from_index(random::<usize>() % VirtualNode::COUNT_FOR_TEST);
let mut key = vnode.to_be_bytes().to_vec();
key.extend(vec![i]);
Ok(KeyedRow::new(
TableKey(key),
Expand Down
39 changes: 9 additions & 30 deletions src/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,24 @@ pub fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
}

/// A row value stored together with its `TableKey`.
///
/// NOTE(review): both the single-parameter header with an `OwnedRow` field and the
/// two-parameter header (`R` defaulting to `OwnedRow`) with an `R` field appear, because this
/// is a diff rendered without +/- markers. Presumably the generic-`R` form is the added side —
/// confirm against the commit.
#[derive(Debug)]
pub struct KeyedRow<T: AsRef<[u8]>> {
pub struct KeyedRow<T: AsRef<[u8]>, R = OwnedRow> {
/// The table key for this row; per the field name it carries the vnode prefix.
vnode_prefixed_key: TableKey<T>,
/// The row payload.
row: OwnedRow,
row: R,
}

impl<T: AsRef<[u8]>> KeyedRow<T> {
pub fn new(table_key: TableKey<T>, row: OwnedRow) -> Self {
impl<T: AsRef<[u8]>, R> KeyedRow<T, R> {
pub fn new(table_key: TableKey<T>, row: R) -> Self {
Self {
vnode_prefixed_key: table_key,
row,
}
}

pub fn into_owned_row(self) -> OwnedRow {
pub fn into_owned_row(self) -> R {
self.row
}

pub fn into_owned_row_key(self) -> (TableKey<T>, OwnedRow) {
pub fn into_owned_row_key(self) -> (TableKey<T>, R) {
(self.vnode_prefixed_key, self.row)
}

Expand All @@ -131,11 +131,11 @@ impl<T: AsRef<[u8]>> KeyedRow<T> {
self.vnode_prefixed_key.key_part()
}

pub fn row(&self) -> &OwnedRow {
pub fn row(&self) -> &R {
&self.row
}

pub fn into_parts(self) -> (TableKey<T>, OwnedRow) {
pub fn into_parts(self) -> (TableKey<T>, R) {
(self.vnode_prefixed_key, self.row)
}
}
Expand All @@ -148,28 +148,7 @@ impl<T: AsRef<[u8]>> Deref for KeyedRow<T> {
}
}

/// A `ChangeLogRow` stored together with its `TableKey`.
///
/// NOTE(review): the commit page also shows a `KeyedChangeLogRow` type alias right after this
/// struct's impl, so this struct is presumably the removed side of the diff — confirm against
/// the commit.
#[derive(Debug)]
pub struct KeyedChangeLogRow<T: AsRef<[u8]>> {
/// The table key for this row; per the field name it carries the vnode prefix.
vnode_prefixed_key: TableKey<T>,
/// The change-log payload.
row: ChangeLogRow,
}

// Constructor and accessors for the struct form of `KeyedChangeLogRow`.
// NOTE(review): presumably the removed side of the diff, since a type alias with the same name
// follows on the page — confirm against the commit.
impl<T: AsRef<[u8]>> KeyedChangeLogRow<T> {
/// Builds a keyed change-log row from a table key and its payload.
pub fn new(table_key: TableKey<T>, row: ChangeLogRow) -> Self {
Self {
vnode_prefixed_key: table_key,
row,
}
}

/// Consumes `self` and returns the `ChangeLogRow` payload, discarding the key.
pub fn into_owned_row(self) -> ChangeLogRow {
self.row
}

/// Returns the bytes produced by `key_part()` on the stored table key.
pub fn key(&self) -> &[u8] {
self.vnode_prefixed_key.key_part()
}
}
/// A `KeyedRow` whose row payload is a `ChangeLogRow`.
pub type KeyedChangeLogRow<T> = KeyedRow<T, ChangeLogRow>;

/// `ChangeLogValue` specialised to `OwnedRow`.
pub type ChangeLogRow = ChangeLogValue<OwnedRow>;

Expand Down

0 comments on commit 9209dbc

Please sign in to comment.