Skip to content

Commit

Permalink
[Enhancement] Optimize vacuum logic by adding vacuum version. (#55764)
Browse files Browse the repository at this point in the history
## Why I'm doing:
The FE (Frontend) will continuously issue vacuum requests to the CN (Compute Node) without any intelligent control, until the `lake_autovacuum_stale_partition_threshold` is reached, which is set to 12 hours by default. This approach presents two issues:

1. **Incomplete Vacuuming**:
   It is possible that after this time threshold has passed, the partition may not have completed the vacuum process. However, it will no longer be scheduled for vacuuming afterward, leaving some data unprocessed.

2. **Redundant Requests**:
   It is also possible that the partition has already been vacuumed, but the FE continues to issue vacuum requests indiscriminately. This results in unnecessary requests being sent, which not only wastes resources but also increases S3 API costs.

These issues highlight the need for a more intelligent scheduling mechanism to optimize vacuuming efficiency and reduce unnecessary costs.

## What I'm doing:
Add  `lastSuccVacuumVersion` as the last success vacuum version, which means all garbage files before this version have been vacuumed and FE does not need to resend vacuum request if the `lastSuccVacuumVersion` is catch up the latest version.

Signed-off-by: sevev <qiangzh95@gmail.com>
(cherry picked from commit 1e54252)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/catalog/PhysicalPartition.java
  • Loading branch information
sevev authored and mergify[bot] committed Feb 24, 2025
1 parent 611fc09 commit 7d563f0
Show file tree
Hide file tree
Showing 7 changed files with 736 additions and 13 deletions.
19 changes: 14 additions & 5 deletions be/src/storage/lake/vacuum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ static Status collect_garbage_files(const TabletMetadataPB& metadata, const std:
static Status collect_files_to_vacuum(TabletManager* tablet_mgr, std::string_view root_dir, int64_t tablet_id,
int64_t grace_timestamp, int64_t min_retain_version,
AsyncFileDeleter* datafile_deleter, AsyncFileDeleter* metafile_deleter,
int64_t* total_datafile_size) {
int64_t* total_datafile_size, int64_t* vacuumed_version) {
auto t0 = butil::gettimeofday_ms();
auto meta_dir = join_path(root_dir, kMetadataDirectoryName);
auto data_dir = join_path(root_dir, kSegmentDirectoryName);
Expand Down Expand Up @@ -348,6 +348,7 @@ static Status collect_files_to_vacuum(TabletManager* tablet_mgr, std::string_vie
auto t1 = butil::gettimeofday_ms();
g_metadata_travel_latency << (t1 - t0);

*vacuumed_version = final_retain_version;
if (!skip_check_grace_timestamp) {
// All tablet metadata files encountered were created after the grace timestamp, there were no files to delete
return Status::OK();
Expand All @@ -371,7 +372,8 @@ static void erase_tablet_metadata_from_metacache(TabletManager* tablet_mgr, cons

static Status vacuum_tablet_metadata(TabletManager* tablet_mgr, std::string_view root_dir,
const std::vector<int64_t>& tablet_ids, int64_t min_retain_version,
int64_t grace_timestamp, int64_t* vacuumed_files, int64_t* vacuumed_file_size) {
int64_t grace_timestamp, int64_t* vacuumed_files, int64_t* vacuumed_file_size,
int64_t* vacuumed_version) {
DCHECK(tablet_mgr != nullptr);
DCHECK(std::is_sorted(tablet_ids.begin(), tablet_ids.end()));
DCHECK(min_retain_version >= 0);
Expand All @@ -382,14 +384,19 @@ static Status vacuum_tablet_metadata(TabletManager* tablet_mgr, std::string_view
auto metafile_delete_cb = [=](const std::vector<std::string>& files) {
erase_tablet_metadata_from_metacache(tablet_mgr, files);
};

for (auto tablet_id : tablet_ids) {
int64_t tablet_vacuumed_version = 0;
AsyncFileDeleter datafile_deleter(config::lake_vacuum_min_batch_delete_size);
AsyncFileDeleter metafile_deleter(INT64_MAX, metafile_delete_cb);
RETURN_IF_ERROR(collect_files_to_vacuum(tablet_mgr, root_dir, tablet_id, grace_timestamp, min_retain_version,
&datafile_deleter, &metafile_deleter, vacuumed_file_size));
&datafile_deleter, &metafile_deleter, vacuumed_file_size,
&tablet_vacuumed_version));
RETURN_IF_ERROR(datafile_deleter.finish());
RETURN_IF_ERROR(metafile_deleter.finish());
if (*vacuumed_version == 0 || *vacuumed_version > tablet_vacuumed_version) {
// set partition vacuumed_version to min tablet vacuumed version
*vacuumed_version = tablet_vacuumed_version;
}
(*vacuumed_files) += datafile_deleter.delete_count();
(*vacuumed_files) += metafile_deleter.delete_count();
}
Expand Down Expand Up @@ -466,16 +473,18 @@ Status vacuum_impl(TabletManager* tablet_mgr, const VacuumRequest& request, Vacu

int64_t vacuumed_files = 0;
int64_t vacuumed_file_size = 0;
int64_t vacuumed_version = 0;

std::sort(tablet_ids.begin(), tablet_ids.end());

RETURN_IF_ERROR(vacuum_tablet_metadata(tablet_mgr, root_loc, tablet_ids, min_retain_version, grace_timestamp,
&vacuumed_files, &vacuumed_file_size));
&vacuumed_files, &vacuumed_file_size, &vacuumed_version));
if (request.delete_txn_log()) {
RETURN_IF_ERROR(vacuum_txn_log(root_loc, min_active_txn_id, &vacuumed_files, &vacuumed_file_size));
}
response->set_vacuumed_files(vacuumed_files);
response->set_vacuumed_file_size(vacuumed_file_size);
response->set_vacuumed_version(vacuumed_version);
return Status::OK();
}

Expand Down
105 changes: 105 additions & 0 deletions be/test/storage/lake/vacuum_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ TEST_P(LakeVacuumTest, test_vacuum_3) {
ASSERT_NE(0, response.status().status_code());
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());
EXPECT_EQ(0, response.vacuumed_version());

ensure_all_files_exist();
}
Expand All @@ -529,6 +530,7 @@ TEST_P(LakeVacuumTest, test_vacuum_3) {
<< response.status().error_msgs(0);
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());
EXPECT_EQ(0, response.vacuumed_version());

ensure_all_files_exist();
}
Expand All @@ -548,6 +550,7 @@ TEST_P(LakeVacuumTest, test_vacuum_3) {
<< response.status().error_msgs(0);
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());
EXPECT_EQ(0, response.vacuumed_version());

ensure_all_files_exist();
}
Expand All @@ -568,6 +571,7 @@ TEST_P(LakeVacuumTest, test_vacuum_3) {
<< response.status().error_msgs(0);
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());
EXPECT_EQ(0, response.vacuumed_version());

ensure_all_files_exist();
}
Expand All @@ -586,6 +590,7 @@ TEST_P(LakeVacuumTest, test_vacuum_3) {
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());
EXPECT_EQ(2, response.vacuumed_version());

ensure_all_files_exist();
}
Expand All @@ -609,6 +614,7 @@ TEST_P(LakeVacuumTest, test_vacuum_3) {
// 1 txn slog file
EXPECT_EQ(15, response.vacuumed_files());
EXPECT_GT(response.vacuumed_file_size(), 0);
EXPECT_EQ(5, response.vacuumed_version());

EXPECT_FALSE(file_exist(tablet_metadata_filename(100, 2)));
EXPECT_FALSE(file_exist(tablet_metadata_filename(100, 3)));
Expand Down Expand Up @@ -1338,6 +1344,105 @@ TEST_P(LakeVacuumTest, test_vacuum_combined_txn_log) {
}
}

TEST_P(LakeVacuumTest, test_vacuumed_version) {
ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 10001,
"version": 2,
"rowsets": [
{
"segments": [
"00000000000059e4_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat"
],
"data_size": 4096
}
],
"commit_time": 1687331159
}
)DEL")));

ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 10001,
"version": 3,
"rowsets": [
{
"segments": [
"00000000000059e5_5b3c5f4b-2675-4b7a-b5e0-4006cc285815.dat"
],
"data_size": 100
}
],
"prev_garbage_version": 2,
"commit_time": 1687331160
}
)DEL")));

ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 10001,
"version": 4,
"rowsets": [
{
"segments": [
"00000000000059e5_5b3c5f4b-2675-4b7a-b5e0-4006cc285815.dat"
],
"data_size": 4096
}
],
"prev_garbage_version": 3,
"commit_time": 1687331161
}
)DEL")));

ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 10002,
"version": 4,
"rowsets": [
{
"segments": [
"00000000000059e5_5b3c5f4b-2675-4b7a-b5e0-4006cc285815.dat"
],
"data_size": 4096
}
],
"prev_garbage_version": 3,
"commit_time": 1687331162
}
)DEL")));

{
VacuumRequest request;
VacuumResponse response;
request.set_delete_txn_log(true);
request.add_tablet_ids(10001);
request.add_tablet_ids(10002);
request.set_min_retain_version(4);
request.set_grace_timestamp(1687331161);
request.set_min_active_txn_id(12344);
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(3, response.vacuumed_version());
}

{
VacuumRequest request;
VacuumResponse response;
request.set_delete_txn_log(true);
request.add_tablet_ids(10001);
request.add_tablet_ids(10002);
request.set_min_retain_version(4);
request.set_grace_timestamp(1687331162);
request.set_min_active_txn_id(12344);
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(4, response.vacuumed_version());
}
}

INSTANTIATE_TEST_SUITE_P(LakeVacuumTest, LakeVacuumTest,
::testing::Values(VacuumTestArg{1}, VacuumTestArg{3}, VacuumTestArg{100}));

Expand Down
Loading

0 comments on commit 7d563f0

Please sign in to comment.