Commit 7f237d7
refactor(storage): remove unused struct used in deleted hummock v1 (#…
wenym1 authored Feb 24, 2023
1 parent a173a29 commit 7f237d7
Showing 13 changed files with 113 additions and 2,008 deletions.
17 changes: 4 additions & 13 deletions src/storage/benches/bench_compactor.rs
```diff
@@ -27,8 +27,7 @@ use risingwave_storage::hummock::compactor::{
     Compactor, ConcatSstableIterator, DummyCompactionFilter, TaskConfig,
 };
 use risingwave_storage::hummock::iterator::{
-    ConcatIterator, Forward, HummockIterator, HummockIteratorUnion, MultiSstIterator,
-    UnorderedMergeIteratorInner,
+    ConcatIterator, Forward, HummockIterator, UnorderedMergeIteratorInner,
 };
 use risingwave_storage::hummock::multi_builder::{
     CapacitySplitTableBuilder, LocalTableBuilderFactory,
@@ -214,18 +213,10 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) {
         b.to_async(FuturesExecutor).iter(|| {
             let sstable_store1 = sstable_store.clone();
             let sub_iters = vec![
-                HummockIteratorUnion::First(ConcatIterator::new(
-                    level1.clone(),
-                    sstable_store.clone(),
-                    read_options.clone(),
-                )),
-                HummockIteratorUnion::First(ConcatIterator::new(
-                    level2.clone(),
-                    sstable_store.clone(),
-                    read_options.clone(),
-                )),
+                ConcatIterator::new(level1.clone(), sstable_store.clone(), read_options.clone()),
+                ConcatIterator::new(level2.clone(), sstable_store.clone(), read_options.clone()),
             ];
-            let iter = MultiSstIterator::for_compactor(sub_iters);
+            let iter = UnorderedMergeIteratorInner::for_compactor(sub_iters);
             async move { compact(iter, sstable_store1).await }
         });
     });
```
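The bench now feeds plain `ConcatIterator`s straight into `UnorderedMergeIteratorInner::for_compactor` instead of wrapping them in the removed `HummockIteratorUnion`/`MultiSstIterator`. For intuition, a minimal standalone sketch of the k-way merge pattern such an iterator implements — illustrative only, sharing no code with RisingWave:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several ascending-sorted vectors into one ascending stream.
fn merge_sorted<T: Ord>(inputs: Vec<Vec<T>>) -> Vec<T> {
    let mut iters: Vec<_> = inputs.into_iter().map(Vec::into_iter).collect();
    // Min-heap of (next value, source index); Reverse flips the max-heap.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        // Refill the heap from whichever source just yielded the minimum.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![vec![1, 4, 7], vec![2, 5], vec![3, 6, 8]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7, 8]);
}
```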
114 changes: 33 additions & 81 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
```diff
@@ -30,15 +30,12 @@ use risingwave_pb::hummock::compact_task;
 use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
 use crate::hummock::compactor::context::CompactorContext;
 use crate::hummock::compactor::{CompactOutput, Compactor};
-use crate::hummock::iterator::{Forward, HummockIterator};
-use crate::hummock::shared_buffer::shared_buffer_uploader::UploadTaskPayload;
-use crate::hummock::shared_buffer::{build_ordered_merge_iter, UncommittedData};
-use crate::hummock::sstable::{DeleteRangeAggregatorBuilder, SstableIteratorReadOptions};
+use crate::hummock::event_handler::uploader::UploadTaskPayload;
+use crate::hummock::iterator::{Forward, HummockIterator, OrderedMergeIteratorInner};
+use crate::hummock::sstable::DeleteRangeAggregatorBuilder;
 use crate::hummock::{
-    CachePolicy, ForwardIter, HummockError, HummockResult, RangeTombstonesCollector,
-    SstableBuilderOptions,
+    CachePolicy, HummockError, HummockResult, RangeTombstonesCollector, SstableBuilderOptions,
 };
-use crate::monitor::StoreLocalStatistic;
 
 const GC_DELETE_KEYS_FOR_FLUSH: bool = false;
 const GC_WATERMARK_FOR_FLUSH: u64 = 0;
@@ -50,37 +47,22 @@ pub async fn compact(
     compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
 ) -> HummockResult<Vec<LocalSstableInfo>> {
     let mut grouped_payload: HashMap<CompactionGroupId, UploadTaskPayload> = HashMap::new();
-    for uncommitted_list in payload {
-        let mut next_inner = HashSet::new();
-        for uncommitted in uncommitted_list {
-            let compaction_group_id = match &uncommitted {
-                UncommittedData::Sst(LocalSstableInfo {
-                    compaction_group_id,
-                    ..
-                }) => *compaction_group_id,
-                UncommittedData::Batch(batch) => {
-                    match compaction_group_index.get(&batch.table_id) {
-                        // compaction group id is used only as a hint for grouping different data.
-                        // If the compaction group id is not found for the table id, we can assign a
-                        // default compaction group id for the batch.
-                        //
-                        // On meta side, when we commit a new epoch, it is acceptable that the
-                        // compaction group id provided from CN does not match the latest compaction
-                        // group config.
-                        None => StaticCompactionGroupId::StateDefault as CompactionGroupId,
-                        Some(group_id) => *group_id,
-                    }
-                }
-            };
-            let group = grouped_payload
-                .entry(compaction_group_id)
-                .or_insert_with(std::vec::Vec::new);
-            if !next_inner.contains(&compaction_group_id) {
-                group.push(vec![]);
-                next_inner.insert(compaction_group_id);
-            }
-            group.last_mut().unwrap().push(uncommitted);
-        }
+    for imm in payload {
+        let compaction_group_id = match compaction_group_index.get(&imm.table_id) {
+            // compaction group id is used only as a hint for grouping different data.
+            // If the compaction group id is not found for the table id, we can assign a
+            // default compaction group id for the batch.
+            //
+            // On meta side, when we commit a new epoch, it is acceptable that the
+            // compaction group id provided from CN does not match the latest compaction
+            // group config.
+            None => StaticCompactionGroupId::StateDefault as CompactionGroupId,
+            Some(group_id) => *group_id,
+        };
+        grouped_payload
+            .entry(compaction_group_id)
+            .or_insert_with(std::vec::Vec::new)
+            .push(imm);
     }
 
     let mut futures = vec![];
```
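A hedged sketch of the grouping logic above, with simplified stand-in types (`Imm`, `DEFAULT_GROUP`, and the function name are all hypothetical): each immutable memtable is bucketed by the compaction group its table id maps to, falling back to a default group when the index has no entry.

```rust
use std::collections::HashMap;

type CompactionGroupId = u64;
// Hypothetical stand-in for StaticCompactionGroupId::StateDefault.
const DEFAULT_GROUP: CompactionGroupId = 2;

struct Imm {
    table_id: u32,
    // payload fields elided
}

fn group_by_compaction_group(
    payload: Vec<Imm>,
    index: &HashMap<u32, CompactionGroupId>,
) -> HashMap<CompactionGroupId, Vec<Imm>> {
    let mut grouped: HashMap<CompactionGroupId, Vec<Imm>> = HashMap::new();
    for imm in payload {
        // The index is only a hint: an unknown table id falls back to the
        // default group, which the meta side tolerates at commit time.
        let group_id = *index.get(&imm.table_id).unwrap_or(&DEFAULT_GROUP);
        grouped.entry(group_id).or_default().push(imm);
    }
    grouped
}

fn main() {
    let index = HashMap::from([(1u32, 10u64)]);
    let grouped =
        group_by_compaction_group(vec![Imm { table_id: 1 }, Imm { table_id: 9 }], &index);
    assert_eq!(grouped[&10].len(), 1); // known table id
    assert_eq!(grouped[&DEFAULT_GROUP].len(), 1); // fallback
}
```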
```diff
@@ -113,31 +95,18 @@ async fn compact_shared_buffer(
     payload: UploadTaskPayload,
 ) -> HummockResult<Vec<LocalSstableInfo>> {
     // Local memory compaction looks at all key ranges.
-    let sstable_store = context.sstable_store.clone();
-    let mut local_stats = StoreLocalStatistic::default();
     let mut size_and_start_user_keys = vec![];
     let mut compact_data_size = 0;
     let mut builder = DeleteRangeAggregatorBuilder::default();
-    for data_list in &payload {
-        for data in data_list {
-            let data_size = match data {
-                UncommittedData::Sst(LocalSstableInfo { sst_info, .. }) => {
-                    let table = sstable_store.sstable(sst_info, &mut local_stats).await?;
-                    // TODO: use reference to avoid memory allocation.
-                    let tombstones = table.value().meta.range_tombstone_list.clone();
-                    builder.add_tombstone(tombstones);
-                    sst_info.file_size
-                }
-                UncommittedData::Batch(batch) => {
-                    let tombstones = batch.get_delete_range_tombstones();
-                    builder.add_tombstone(tombstones);
-                    // calculate encoded bytes of key var length
-                    (batch.get_payload().len() * 8 + batch.size()) as u64
-                }
-            };
-            compact_data_size += data_size;
-            size_and_start_user_keys.push((data_size, data.start_user_key()));
-        }
+    for imm in &payload {
+        let data_size = {
+            let tombstones = imm.get_delete_range_tombstones();
+            builder.add_tombstone(tombstones);
+            // calculate encoded bytes of key var length
+            (imm.get_payload().len() * 8 + imm.size()) as u64
+        };
+        compact_data_size += data_size;
+        size_and_start_user_keys.push((data_size, imm.start_user_key()));
     }
     size_and_start_user_keys.sort();
     let mut splits = Vec::with_capacity(size_and_start_user_keys.len());
```
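The `* 8` in the retained comment charges each entry a fixed allowance for variable-length key encoding on top of the batch's in-memory size. A toy version of that estimate, under my assumption (not stated in the commit) that 8 bytes per entry is the intended overhead:

```rust
// Illustrative helper (hypothetical name): the flush-size estimate used above.
// Each key-value entry is charged 8 extra bytes for var-length key encoding.
fn estimated_encoded_size(entry_count: usize, in_memory_size: usize) -> u64 {
    (entry_count * 8 + in_memory_size) as u64
}

fn main() {
    // e.g. an imm with 100 entries occupying 4096 bytes in memory
    assert_eq!(estimated_encoded_size(100, 4096), 4896);
}
```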
```diff
@@ -182,18 +151,7 @@
 
     let existing_table_ids: HashSet<u32> = payload
         .iter()
-        .flat_map(|data_list| {
-            data_list
-                .iter()
-                .flat_map(|uncommitted_data| match uncommitted_data {
-                    UncommittedData::Sst(local_sst_info) => {
-                        local_sst_info.sst_info.table_ids.clone()
-                    }
-                    UncommittedData::Batch(shared_buffer_write_batch) => {
-                        vec![shared_buffer_write_batch.table_id.table_id()]
-                    }
-                })
-        })
+        .map(|imm| imm.table_id.table_id)
         .dedup()
         .collect();
 
```
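One subtlety worth noting: `Itertools::dedup` only collapses *adjacent* equal items, so it is the `HashSet` collect that guarantees global uniqueness; the `dedup()` is presumably just a cheap pre-filter for runs of imms from the same table. A small demonstration (assuming the `itertools` crate, which the surrounding code already uses):

```rust
use std::collections::HashSet;

use itertools::Itertools;

fn main() {
    let table_ids = vec![1u32, 1, 2, 1];
    // dedup() removes only consecutive repeats...
    let deduped: Vec<_> = table_ids.iter().copied().dedup().collect();
    assert_eq!(deduped, vec![1, 2, 1]);
    // ...while collecting into a HashSet yields truly unique ids.
    let unique: HashSet<_> = table_ids.into_iter().collect();
    assert_eq!(unique.len(), 2);
}
```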
```diff
@@ -204,7 +162,6 @@
         .acquire(existing_table_ids)
         .await;
     let multi_filter_key_extractor = Arc::new(multi_filter_key_extractor);
-    let compactor_metrics = context.compactor_metrics.clone();
 
     let parallelism = splits.len();
     let mut compact_success = true;
@@ -219,13 +176,9 @@
             context.clone(),
             sub_compaction_sstable_size as usize,
         );
-        let iter = build_ordered_merge_iter::<ForwardIter>(
-            &payload,
-            sstable_store.clone(),
-            &mut local_stats,
-            Arc::new(SstableIteratorReadOptions::default()),
-        )
-        .await?;
+        let iter = OrderedMergeIteratorInner::new(
+            payload.iter().map(|imm| imm.clone().into_forward_iter()),
+        );
         let compaction_executor = context.compaction_executor.clone();
         let multi_filter_key_extractor = multi_filter_key_extractor.clone();
         let del_range_agg = agg.clone();
@@ -236,7 +189,6 @@
         });
         compaction_futures.push(handle);
     }
-    local_stats.report_compactor(compactor_metrics.as_ref());
 
     let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
     let mut err = None;
```
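With SSTable payloads gone, the merge input is built synchronously from the in-memory batches alone — no sstable store, read options, or IO stats are needed, which is why the async `build_ordered_merge_iter` could be dropped. Below is a rough sketch of what distinguishes an *ordered* merge: ties on equal keys resolve by source position, so if the sources are arranged newest-first, the newest version of a key surfaces first. Hypothetical toy code, not RisingWave's `OrderedMergeIteratorInner`:

```rust
fn ordered_merge<T: Ord + Clone>(sources: Vec<Vec<T>>) -> Vec<(T, usize)> {
    let mut iters: Vec<_> = sources
        .into_iter()
        .map(|v| v.into_iter().peekable())
        .collect();
    let mut out = Vec::new();
    loop {
        // Pick the smallest current head; on ties the lowest source index
        // wins, because `<` is strict and earlier sources are scanned first.
        let mut best: Option<(T, usize)> = None;
        for (idx, it) in iters.iter_mut().enumerate() {
            if let Some(head) = it.peek() {
                let take = match &best {
                    None => true,
                    Some((b, _)) => head < b,
                };
                if take {
                    best = Some((head.clone(), idx));
                }
            }
        }
        match best {
            Some((_, idx)) => {
                let v = iters[idx].next().unwrap();
                out.push((v, idx));
            }
            None => break,
        }
    }
    out
}

fn main() {
    // Index 0 plays the "newer" source: its 5 is emitted before index 1's.
    let merged = ordered_merge(vec![vec![1, 5], vec![2, 5, 7]]);
    assert_eq!(merged, vec![(1, 0), (2, 1), (5, 0), (5, 1), (7, 1)]);
}
```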
14 changes: 3 additions & 11 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
```diff
@@ -37,7 +37,6 @@ use crate::hummock::event_handler::uploader::{
 };
 use crate::hummock::event_handler::HummockEvent;
 use crate::hummock::local_version::pinned_version::PinnedVersion;
-use crate::hummock::shared_buffer::UncommittedData;
 use crate::hummock::store::version::{
     HummockReadVersion, StagingData, StagingSstableInfo, VersionUpdate,
 };
@@ -124,16 +123,9 @@ async fn flush_imms(
             error!("unable to set watermark sst id. epoch: {}, {:?}", epoch, e);
         });
     }
-    compact(
-        compactor_context,
-        payload
-            .into_iter()
-            .map(|imm| vec![UncommittedData::Batch(imm)])
-            .collect(),
-        task_info.compaction_group_index,
-    )
-    .verbose_stack_trace("shared_buffer_compact")
-    .await
+    compact(compactor_context, payload, task_info.compaction_group_index)
+        .verbose_stack_trace("shared_buffer_compact")
+        .await
 }
 
 impl HummockEventHandler {
```
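Taken together, the three files collapse the shared-buffer payload from hummock v1's nested `Vec<Vec<UncommittedData>>` to the uploader's flat list of immutable memtables: the `UncommittedData` enum with its `Sst`/`Batch` split only served the deleted v1 upload path, so `flush_imms` can now forward the payload to `compact` unchanged. (The remaining ten changed files are not loaded in this view.)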