From 7a220ae4865742c6518ae4efb75c22dff855089e Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Tue, 30 Jul 2024 12:36:02 +0800
Subject: [PATCH] add injection points and a skip-compaction-rowsets case

---
 be/src/olap/txn_manager.cpp                   |   6 +
 .../test_partial_update_skip_compaction.out   |  11 ++
 ...test_partial_update_skip_compaction.groovy | 158 +++++------
 3 files changed, 78 insertions(+), 97 deletions(-)
 create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out

diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 3d33b8c16f426f1..cf68ddfe1ee6bd2 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -402,6 +402,12 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                                TTransactionId transaction_id, TTabletId tablet_id,
                                TabletUid tablet_uid, const Version& version,
                                TabletPublishStatistics* stats) {
+    DBUG_EXECUTE_IF("TxnManager::publish_txn.enable_spin_wait", {
+        auto block_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        if (tablet_id == block_tablet_id) {
+            DBUG_EXECUTE_IF("TxnManager::publish_txn.block_publish_txn", DBUG_BLOCK);
+        }
+    });
     auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
     if (tablet == nullptr) {
         return Status::OK();
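
The two points above work as a pair: "TxnManager::publish_txn.enable_spin_wait" is a per-tablet gate (only a publish on the tablet whose id matches the tablet_id parameter enters it), and "TxnManager::publish_txn.block_publish_txn" then parks that publish thread via DBUG_BLOCK until the point is disabled. A minimal sketch of the intended drive sequence, using the regression-framework helpers exactly as they appear in the suite below; the tabletId value is whatever tablet the test targets:

    // gate publish for one tablet, then arm the blocking point
    GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.enable_spin_wait", [tablet_id: "${tabletId}"])
    GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")

    // ... start the load whose publish should stall, trigger compaction meanwhile ...

    // disabling the blocking point releases the parked publish thread
    GetDebugPoint().disableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")
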
diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out
new file mode 100644
index 000000000000000..6c7fe443a894faf
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	1	1	1	1
+2	2	2	2	2
+3	3	3	3	3
+
+-- !sql --
+1	999	999	1	1
+2	888	888	2	2
+3	777	777	3	3
+
diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy
index 9e04a3664f78072..9f2a6f441d41816 100644
--- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy
+++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy
@@ -69,7 +69,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") {
     }
     logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
 
-    def get_rs_metas = {
+    def check_rs_metas = { expected_rs_meta_size, check_func ->
         def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl
         def command = "curl ${metaUrl}"
         log.info("get_rs_metas execute command: ${command}")
@@ -78,15 +78,20 @@
         out = process.text
         def jsonMeta = parseJson(out)
         assertEquals(code, 0)
-        return jsonMeta.rs_metas
+
+        assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size)
+        for (def meta : jsonMeta.rs_metas) {
+            int startVersion = meta.start_version.toInteger()
+            int endVersion = meta.end_version.toInteger()
+            int numSegments = meta.num_segments.toInteger()
+            int numRows = meta.num_rows.toInteger()
+            String overlapPb = meta.segments_overlap_pb.toString()
+            logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${numSegments} ${numRows} ${meta.rowset_id_v2}")
+            check_func(startVersion, endVersion, numSegments, numRows, overlapPb)
+        }
     }
 
-    def metas = get_rs_metas()
-    assertEquals(metas.size(), 4)
-    for (def meta : metas) {
-        int startVersion = meta.start_version.toInteger()
-        int endVersion = meta.end_version.toInteger()
-        int numSegments = meta.num_segments.toInteger()
+    check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
         if (startVersion == 0) {
             assertEquals(endVersion, 1)
             assertEquals(numSegments, 0)
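
Reconstructed from the assertions in this suite, the rowset layout the test drives the tablet through (the hunk below contains the corresponding check_rs_metas calls):

    // version layout asserted by the suite:
    //   after the initial loads:   [0-1] (0 segments), [2-2], [3-3], [4-4]  -> 4 rowsets
    //   after full compaction:     [0-4] NONOVERLAPPING                     -> 1 rowset
    //   after the blocked partial update publishes: [0-4], [5-5]            -> 2 rowsets
    //   the [5-5] rowset keeps exactly 1 segment, i.e. publish did not
    //   rewrite segments to re-align against the compacted rowsets
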
@@ -94,99 +99,58 @@
         assertEquals(startVersion, endVersion)
         assertEquals(numSegments, 1)
     }
-        logger.info("[${startVersion}-${endVersion}] ${meta.num_segments} ${meta.rowset_id_v2}")
-    }
+    })
+
+    try {
+        GetDebugPoint().clearDebugPointsForAllBEs()
 
-    // trigger full compaction on tablet
-    logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
-    def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
-    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
-    assertEquals(code, 0)
-    def compactJson = parseJson(out.trim())
-    assertEquals("success", compactJson.status.toLowerCase())
+        // block the partial update before the publish phase
+        GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.enable_spin_wait", [tablet_id: "${tabletId}"])
+        GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")
 
-    Thread.sleep(2000)
+        def t1 = Thread.start {
+            sql "set enable_unique_key_partial_update=true;"
+            sql "sync;"
+            sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);"
+        }
 
-    metas = get_rs_metas()
-    assertEquals(metas.size(), 1)
-    for (def meta : metas) {
-        logger.info("[${meta.start_version}-${meta.end_version}] ${meta.num_segments} ${meta.rowset_id_v2}")
+        // trigger full compaction on tablet
+        logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+        def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def compactJson = parseJson(out.trim())
+        assertEquals("success", compactJson.status.toLowerCase())
+
+        // wait for full compaction to complete
+        Thread.sleep(1500)
+
+        check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+            assertEquals(startVersion, 0)
+            assertEquals(endVersion, 4)
+            assertEquals(overlapPb, "NONOVERLAPPING")
+        })
+
+        GetDebugPoint().disableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")
+        t1.join()
+
+        order_qt_sql "select * from ${table1};"
+
+        check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+            if (startVersion == 5) {
+                assertEquals(endVersion, 5)
+                // check that the partial update skips the alignment process for rowsets produced
+                // by compaction and doesn't generate a new segment in the publish phase
+                assertEquals(numSegments, 1)
+            }
+        })
+
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
     }
 
-    // def wait_for_publish = {txnId, waitSecond ->
-    //     String st = "PREPARE"
-    //     while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
-    //         Thread.sleep(1000)
-    //         waitSecond -= 1
-    //         def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
-    //         assertNotNull(result)
-    //         st = result[0].TransactionStatus
-    //     }
-    //     log.info("Stream load with txn ${txnId} is ${st}")
-    //     assertEquals(st, "VISIBLE")
-    // }
-
-    // String txnId1
-    // streamLoad {
-    //     table "${table1}"
-    //     set 'column_separator', ','
-    //     set 'format', 'csv'
-    //     set 'partial_columns', 'true'
-    //     set 'columns', 'k1,c1,c2'
-    //     set 'strict_mode', "false"
-    //     set 'two_phase_commit', 'true'
-    //     file 'data1.csv'
-    //     time 10000 // limit inflight 10s
-    //     check { result, exception, startTime, endTime ->
-    //         if (exception != null) {
-    //             throw exception
-    //         }
-    //         log.info("Stream load result: ${result}".toString())
-    //         def json = parseJson(result)
-    //         txnId1 = json.TxnId
-    //         assertEquals("success", json.Status.toLowerCase())
-    //     }
-    // }
-    // sql "sync;"
-    // order_qt_sql "select * from ${table1};"
-
-    // // another partial update that conflicts with the previous load and publishes successfully
-    // sql "set enable_unique_key_partial_update=true;"
-    // sql "sync;"
-    // sql "insert into ${table1}(k1,c3,c4) values(1, 99, 99),(2,88,88),(3,77,77);"
-    // sql "set enable_unique_key_partial_update=false;"
-    // sql "sync;"
-    // order_qt_sql "select * from ${table1};"
-
-    // // restart backend
-    // cluster.restartBackends()
-    // Thread.sleep(5000)
-
-    // // wait for be restart
-    // boolean ok = false
-    // int cnt = 0
-    // for (; cnt < 10; cnt++) {
-    //     def be = sql_return_maparray("show backends").get(0)
-    //     if (be.Alive.toBoolean()) {
-    //         ok = true
-    //         break;
-    //     }
-    //     logger.info("wait for BE restart...")
-    //     Thread.sleep(1000)
-    // }
-    // if (!ok) {
-    //     logger.info("BE failed to restart")
-    //     assertTrue(false)
-    // }
-
-    // Thread.sleep(5000)
-
-    // do_streamload_2pc_commit(txnId1)
-    // wait_for_publish(txnId1, 10)
-
-
-    // sql "sync;"
     order_qt_sql "select * from ${table1};"
 
     sql "DROP TABLE IF EXISTS ${table1};"
-
 }
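
To drive these injection points outside the regression framework: the GetDebugPoint() helpers used above go through the BE debug-point HTTP interface, which is only active when the BE runs with enable_debug_points = true. A minimal sketch, assuming that endpoint layout; the BE address and tablet id below are placeholders:

    def be = "127.0.0.1:8040"   // hypothetical BE webserver address
    def tabletId = 10015        // hypothetical target tablet

    // arm the per-tablet gate and the blocking point
    ["curl", "-X", "POST", "http://${be}/api/debug_point/add/TxnManager::publish_txn.enable_spin_wait?tablet_id=${tabletId}"].execute().waitFor()
    ["curl", "-X", "POST", "http://${be}/api/debug_point/add/TxnManager::publish_txn.block_publish_txn"].execute().waitFor()

    // ... run the partial update (it stalls in publish), then the compaction ...

    // release the publish thread, then drop all points
    ["curl", "-X", "POST", "http://${be}/api/debug_point/remove/TxnManager::publish_txn.block_publish_txn"].execute().waitFor()
    ["curl", "-X", "POST", "http://${be}/api/debug_point/clear"].execute().waitFor()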