[opt](merge-on-write) Skip the alignment process of some rowsets in partial update apache#38487 (apache#38686)

Picks apache#38487.

Regression cases are currently commented out.
bobhan1 committed Jan 20, 2025
1 parent 0019981 commit 74f1096
Showing 18 changed files with 731 additions and 4 deletions.
3 changes: 2 additions & 1 deletion be/src/olap/delta_writer.cpp
@@ -644,7 +644,8 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone());
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
_cur_max_version);
}

void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
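Note: the writer now passes `_cur_max_version` into `PartialUpdateInfo::init()`, where it is stored as `max_version_in_flush_phase`. The publish-phase code in `tablet.cpp` below compares each specified rowset's end version against this value to decide which rowsets were already visible when the load flushed.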
5 changes: 3 additions & 2 deletions be/src/olap/partial_update_info.h
@@ -25,10 +25,10 @@ namespace doris {
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone) {
int64_t timestamp_ms, const std::string& timezone, int64_t cur_max_version = -1) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;

max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
@@ -77,6 +77,7 @@ struct PartialUpdateInfo {

public:
bool is_partial_update {false};
int64_t max_version_in_flush_phase {-1};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
std::vector<uint32_t> update_cids;
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset.h
@@ -164,6 +164,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); }

// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?
6 changes: 6 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
@@ -302,6 +302,12 @@ class RowsetMeta {
return num_segments() > 1 && is_singleton_delta() && segments_overlap() != NONOVERLAPPING;
}

bool produced_by_compaction() const {
return has_version() &&
(start_version() < end_version() ||
(start_version() == end_version() && segments_overlap() == NONOVERLAPPING));
}

// get the compaction score of this rowset.
// if segments are overlapping, the score equals to the number of segments,
// otherwise, score is 1.
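The predicate reads as follows: a rowset whose version range spans more than one version can only have come from compaction, and a single-version rowset whose segments are marked NONOVERLAPPING is treated as compaction output as well. A minimal standalone sketch of the same logic, with a stand-in struct and hypothetical version ranges (not Doris code):

#include <cassert>
#include <cstdint>

struct MetaStub {
    bool has_version;
    int64_t start_version;
    int64_t end_version;
    bool nonoverlapping;  // stands in for segments_overlap() == NONOVERLAPPING
};

bool produced_by_compaction(const MetaStub& m) {
    return m.has_version &&
           (m.start_version < m.end_version ||
            (m.start_version == m.end_version && m.nonoverlapping));
}

int main() {
    assert(produced_by_compaction({true, 2, 5, true}));    // [2-5]: merged version range
    assert(!produced_by_compaction({true, 6, 6, false}));  // [6-6], overlapping: a fresh load
    assert(produced_by_compaction({true, 6, 6, true}));    // [6-6], non-overlapping: also treated as compaction output
}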
35 changes: 34 additions & 1 deletion be/src/olap/tablet.cpp
@@ -3579,7 +3579,9 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id) {
// When the new segment flush fails or the rowset build fails, the deletion marker for the
// duplicate key of the original segment should not remain in `txn_info->delete_bitmap`,
// so we need to make a copy of `txn_info->delete_bitmap` and make changes on it.
if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
bool is_partial_update =
txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update;
if (is_partial_update) {
delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
}

@@ -3613,6 +3615,37 @@
}
auto t3 = watch.get_elapse_time_us();

// If a rowset is produced by compaction before the commit phase of the partial update load
// and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset
// because data remains the same before and after compaction. But we still need to calculate
// the delete bitmap for that rowset.
std::vector<RowsetSharedPtr> rowsets_skip_alignment;
if (is_partial_update) {
int64_t max_version_in_flush_phase =
txn_info->partial_update_info->max_version_in_flush_phase;
DCHECK(max_version_in_flush_phase != -1);
std::vector<RowsetSharedPtr> remained_rowsets;
for (const auto& rowset : specified_rowsets) {
if (rowset->end_version() <= max_version_in_flush_phase &&
rowset->produced_by_compaction()) {
rowsets_skip_alignment.emplace_back(rowset);
} else {
remained_rowsets.emplace_back(rowset);
}
}
if (!rowsets_skip_alignment.empty()) {
specified_rowsets = std::move(remained_rowsets);
}
}

if (!rowsets_skip_alignment.empty()) {
auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
// set rowset_writer to nullptr to skip the alignment process
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, rowsets_skip_alignment, delete_bitmap,
cur_version - 1, token.get(), nullptr));
RETURN_IF_ERROR(token->wait());
}

auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, token.get(), rowset_writer.get()));
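To make the new publish-phase logic concrete, here is a minimal standalone sketch (stand-in types, not Doris code) of the partitioning rule: a rowset may skip alignment only if it was produced by compaction and its whole version range was already visible when the load flushed; its delete bitmap is still computed, but with a null rowset writer, so no new segment is written for it.

#include <cstdint>
#include <vector>

struct RowsetStub {
    int64_t end_version;
    bool produced_by_compaction;
};

// Split the rowsets that a partial-update load must inspect at publish time.
void split_for_publish(const std::vector<RowsetStub>& specified,
                       int64_t max_version_in_flush_phase,
                       std::vector<RowsetStub>* skip_alignment,
                       std::vector<RowsetStub>* remained) {
    for (const auto& rs : specified) {
        if (rs.end_version <= max_version_in_flush_phase && rs.produced_by_compaction) {
            // Compaction does not change data, so the alignment result would be
            // identical; only the delete bitmap still has to be computed.
            skip_alignment->push_back(rs);
        } else {
            remained->push_back(rs);
        }
    }
}

int main() {
    std::vector<RowsetStub> skip, rest;
    // Hypothetical state: the load flushed when the tablet was at version 5.
    split_for_publish({{5, true}, {5, false}, {6, true}}, 5, &skip, &rest);
    // skip: the compaction rowset ending at version 5;
    // rest: the non-compaction rowset, plus the rowset ending at version 6,
    //       which became visible only after the flush phase.
}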
15 changes: 15 additions & 0 deletions be/src/olap/task/engine_publish_version_task.cpp
@@ -107,6 +107,21 @@ Status EnginePublishVersionTask::finish() {
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
}
});
DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", {
auto token = dp->param<std::string>("token", "invalid_token");
while (DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block")) {
auto block_dp = DebugPoints::instance()->get_debug_point(
"EnginePublishVersionTask::execute.block");
if (block_dp) {
auto pass_token = block_dp->param<std::string>("pass_token", "");
if (pass_token == token) {
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});

std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
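The pair of debug points above implements a token handshake: every publish task spins while the "block" point is active, and a task escapes early only when the block point's pass_token equals the task's own token, which lets a test hold several publish tasks and release exactly one. A minimal standalone sketch of that pattern with plain C++ stand-ins (not the DebugPoints API):

#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

std::atomic<bool> block_enabled {true};
std::mutex mu;
std::string pass_token;  // which waiter may proceed while the block is still on

void spin_wait(const std::string& my_token) {
    while (block_enabled.load()) {
        {
            std::lock_guard<std::mutex> guard(mu);
            if (pass_token == my_token) break;  // selectively released
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

int main() {
    std::thread t1(spin_wait, "token1");
    std::thread t2(spin_wait, "token2");
    {
        std::lock_guard<std::mutex> guard(mu);
        pass_token = "token1";  // release only the first waiter
    }
    t1.join();
    block_enabled = false;  // lift the block entirely; the second waiter proceeds
    t2.join();
}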
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 666 666
2 888 888 2 2
3 777 777 555 555

@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 666 666
2 888 888 2 2
3 777 777 555 555

@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3

-- !sql --
1 999 999 1 1
2 888 888 2 2
3 777 777 3 3

7 changes: 7 additions & 0 deletions regression-test/framework/pom.xml
@@ -296,5 +296,12 @@ under the License.
<artifactId>hive-jdbc</artifactId>
<version>2.3.7</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.1</version>
<!--Regression tests need to include this jar-->
<scope>compile</scope>
</dependency>
</dependencies>
</project>
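The Awaitility dependency is what lets the new regression test below poll for compaction completion with `Awaitility.await()...until(...)` instead of sleeping for a fixed interval.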
@@ -0,0 +1,223 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.junit.Assert
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility

suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") {

def table1 = "test_partial_update_compaction_with_higher_version"
sql "DROP TABLE IF EXISTS ${table1} FORCE;"
sql """ CREATE TABLE IF NOT EXISTS ${table1} (
`k1` int NOT NULL,
`c1` int,
`c2` int,
`c3` int,
`c4` int
) UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"disable_auto_compaction" = "true",
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"); """

sql "insert into ${table1} values(1,1,1,1,1);"
sql "insert into ${table1} values(2,2,2,2,2);"
sql "insert into ${table1} values(3,3,3,3,3);"
sql "sync;"
order_qt_sql "select * from ${table1};"

def beNodes = sql_return_maparray("show backends;")
def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
def tabletBackendId = tabletStat.BackendId
def tabletId = tabletStat.TabletId
def tabletBackend;
for (def be : beNodes) {
if (be.BackendId == tabletBackendId) {
tabletBackend = be
break;
}
}
logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");

def check_rs_metas = { expected_rs_meta_size, check_func ->
if (isCloudMode()) {
return
}

def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl
def (code, out, err) = curl("GET", metaUrl)
Assert.assertEquals(code, 0)
def jsonMeta = parseJson(out.trim())

Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size)
for (def meta : jsonMeta.rs_metas) {
int startVersion = meta.start_version
int endVersion = meta.end_version
int numSegments = meta.num_segments
int numRows = meta.num_rows
String overlapPb = meta.segments_overlap_pb
logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}")
check_func(startVersion, endVersion, numSegments, numRows, overlapPb)
}
}

check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
if (startVersion == 0) {
// [0-1]
Assert.assertEquals(endVersion, 1)
Assert.assertEquals(numSegments, 0)
} else {
// [2-2], [3-3], [4-4]
Assert.assertEquals(startVersion, endVersion)
Assert.assertEquals(numSegments, 1)
}
})

def enable_publish_spin_wait = { tokenName ->
if (isCloudMode()) {
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait", [token: "${tokenName}"])
} else {
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait", [token: "${tokenName}"])
}
}

def disable_publish_spin_wait = {
if (isCloudMode()) {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
} else {
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
}
}

def enable_block_in_publish = { passToken ->
if (isCloudMode()) {
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block", [pass_token: "${passToken}"])
} else {
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block", [pass_token: "${passToken}"])
}
}

def disable_block_in_publish = {
if (isCloudMode()) {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
} else {
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
}
}

try {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()

// block the partial update in publish phase
enable_publish_spin_wait("token1")
enable_block_in_publish("-1")

// the first partial update load
def t1 = Thread.start {
sql "set enable_unique_key_partial_update=true;"
sql "sync;"
sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);"
}

Thread.sleep(600)

// the second partial update load that conflicts with the first one
enable_publish_spin_wait("token2")
def t2 = Thread.start {
sql "set enable_unique_key_partial_update=true;"
sql "sync;"
sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);"
}

Thread.sleep(400)

// let the first partial update load finish
enable_block_in_publish("token1")
t1.join()
Thread.sleep(200)
check_rs_metas(5, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
if (startVersion == 0) {
// [0-1]
Assert.assertEquals(endVersion, 1)
Assert.assertEquals(numSegments, 0)
} else {
// [2-2], [3-3], [4-4], [5-5]
Assert.assertEquals(startVersion, endVersion)
Assert.assertEquals(numSegments, 1)
}
})

// trigger full compaction on tablet
logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
Assert.assertEquals(code, 0)
def compactJson = parseJson(out.trim())
Assert.assertEquals("success", compactJson.status.toLowerCase())

// wait for full compaction to complete
Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(
{
(code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
Assert.assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
Assert.assertEquals("success", compactionStatus.status.toLowerCase())
return !compactionStatus.run_status
}
)

check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
// check the rowset produced by full compaction
// [0-5]
Assert.assertEquals(startVersion, 0)
Assert.assertEquals(endVersion, 5)
Assert.assertEquals(numRows, 3)
Assert.assertEquals(overlapPb, "NONOVERLAPPING")
})

// let the second partial update load publish
disable_block_in_publish()
t2.join()
Thread.sleep(300)

order_qt_sql "select * from ${table1};"

check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
if (startVersion == 6) {
// [6-6]
Assert.assertEquals(endVersion, 6)
// checks that the partial update did NOT skip the alignment process for the rowset produced by
// compaction (its end version is higher than the version seen in the flush phase) and thus
// generated a new segment in the publish phase
Assert.assertEquals(numSegments, 2)
Assert.assertEquals(numRows, 4) // 2 rows from the flush phase + 2 rows generated by alignment
}
})

} catch(Exception e) {
logger.info(e.getMessage())
throw e
} finally {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()
}

// sql "DROP TABLE IF EXISTS ${table1};"
}