Skip to content

Commit

Permalink
[fix](regression) Fix unstable compaction-related test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
qidaye committed Jan 13, 2025
1 parent 565edd9 commit feecdc8
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 38 deletions.
17 changes: 17 additions & 0 deletions regression-test/plugins/plugin_compaction.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,20 @@ Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compac

assert !running: "wait compaction timeout, be host: ${be_host}"
}

// Retry wrapper around trigger_and_wait_compaction: re-triggers the given
// compaction type on table_name until it completes, or until max_retries
// attempts have failed, pausing delay_ms between attempts.
// Throws an Exception (with the last failure as cause) once retries are exhausted.
Suite.metaClass.trigger_compaction_with_retry = { String table_name, String compaction_type, int max_retries=10, int delay_ms=2000, int timeout_seconds=300 ->
    for (int retry_count = 1; ; retry_count++) {
        try {
            trigger_and_wait_compaction(table_name, compaction_type, timeout_seconds)
            return // compaction finished successfully
        } catch (Exception e) {
            if (retry_count >= max_retries) {
                // Give up: surface the last error with full context for the test log.
                throw new Exception("Failed to complete ${compaction_type} compaction after ${max_retries} attempts", e)
            }
            logger.warn("Compaction attempt ${retry_count} failed: ${e.getMessage()}")
            Thread.sleep(delay_ms)
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,49 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") {
}
}

    // Trigger a full compaction on every tablet through its BE's HTTP API,
    // retrying each tablet up to 10 times with a 2s pause between attempts.
    // NOTE(review): "e-6010" is treated as an acceptable terminal status
    // alongside "success" — presumably "no suitable rowsets/already compacted";
    // confirm against the BE full-compaction error codes.
    def trigger_full_compaction_on_tablets = { tablets ->
        for (def tablet : tablets) {
            String tablet_id = tablet.TabletId
            String backend_id = tablet.BackendId
            int times = 1

            String compactionStatus;
            do{
                // Issue the full-compaction request to the BE that owns this tablet.
                def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
                ++times
                sleep(2000)
                compactionStatus = parseJson(out.trim()).status.toLowerCase();
            } while (compactionStatus!="success" && times<=10 && compactionStatus!="e-6010")


            // "fail" is only tolerated when auto compaction is enabled — the BE may
            // have already compacted this tablet on its own before our request.
            if (compactionStatus == "fail") {
                assertEquals(disableAutoCompaction, false)
                logger.info("Compaction was done automatically!")
            }
            // With auto compaction disabled, the manual trigger must have succeeded
            // (unless it ended with the tolerated "e-6010" status).
            if (disableAutoCompaction && compactionStatus!="e-6010") {
                assertEquals("success", compactionStatus)
            }
        }
    }

def wait_full_compaction_done = { tablets ->
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}

def get_rowset_count = { tablets ->
int rowsetCount = 0
for (def tablet in tablets) {
Expand Down Expand Up @@ -135,13 +178,15 @@ suite("test_skip_index_compaction_fault_injection", "nonConcurrent") {
assert (rowsetCount == 11 * replicaNum)

// first
trigger_and_wait_compaction(tableName, "full")
trigger_full_compaction_on_tablets.call(tablets)
wait_full_compaction_done.call(tablets)

rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 11 * replicaNum)

// second
trigger_and_wait_compaction(tableName, "full")
trigger_full_compaction_on_tablets.call(tablets)
wait_full_compaction_done.call(tablets)

rowsetCount = get_rowset_count.call(tablets);
if (isCloudMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ suite("test_index_compaction_unique_keys_arr", "array_contains_inverted_index, n
assert (rowsetCount == 7 * replicaNum)

// trigger full compactions for all tablets in ${table_name}
trigger_and_wait_compaction(table_name, "full")
trigger_compaction_with_retry(table_name, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
Expand Down Expand Up @@ -136,7 +136,7 @@ suite("test_index_compaction_unique_keys_arr", "array_contains_inverted_index, n
}

// trigger full compactions for all tablets in ${table_name}
trigger_and_wait_compaction(table_name, "full")
trigger_compaction_with_retry(table_name, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ suite("test_index_compaction_with_multi_index_segments_arr", "nonConcurrent") {
assert (rowsetCount == 3 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down Expand Up @@ -213,7 +213,7 @@ suite("test_index_compaction_with_multi_index_segments_arr", "nonConcurrent") {
assert (rowsetCount == 2 * replicaNum)
}
// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down Expand Up @@ -292,7 +292,7 @@ suite("test_index_compaction_with_multi_index_segments_arr", "nonConcurrent") {
assert (rowsetCount == 3 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down Expand Up @@ -331,7 +331,7 @@ suite("test_index_compaction_with_multi_index_segments_arr", "nonConcurrent") {
assert (rowsetCount == 2 * replicaNum)
}
// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,6 @@ suite("test_index_change_with_cumulative_compaction", "nonConcurrent") {
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}

def trigger_compaction_with_retry = {table_name, compaction_type = "cumulative", max_retries = 10, delay_ms = 2000 ->
def retry_count = 0
while (true) {
try {
trigger_and_wait_compaction(table_name, compaction_type)
return // Success
} catch (Exception e) {
retry_count++
if (retry_count >= max_retries) {
throw new Exception("Failed to complete ${compaction_type} compaction after ${max_retries} attempts", e)
}
logger.warn("Compaction attempt ${retry_count} failed: ${e.getMessage()}")
Thread.sleep(delay_ms)
}
}
}

try {
//BackendId,Cluster,IP,HeartbeatPort,BePort,HttpPort,BrpcPort,LastStartTime,LastHeartbeat,Alive,SystemDecommissioned,ClusterDecommissioned,TabletNum,DataUsedCapacity,AvailCapacity,TotalCapacity,UsedPct,MaxDiskUsedPct,Tag,ErrMsg,Version,Status
String[][] backends = sql """ show backends; """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
Expand Down Expand Up @@ -173,7 +173,7 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)
}
// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
Expand Down Expand Up @@ -179,7 +179,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
}

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 3 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down Expand Up @@ -208,7 +208,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 2 * replicaNum)
}
// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down Expand Up @@ -288,7 +288,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 3 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
if (isCloudMode) {
Expand Down Expand Up @@ -326,7 +326,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 2 * replicaNum)
}
// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ suite("test_index_index_V2_file_size", "nonConcurrent") {
qt_sql """ select * from ${tableName} where score < 100 order by id, name, hobbies, score """

// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

def dedup_tablets = deduplicate_tablets(tablets)

Expand Down Expand Up @@ -135,7 +135,7 @@ suite("test_index_index_V2_file_size", "nonConcurrent") {

set_be_config.call("inverted_index_compaction_enable", "false")
// trigger full compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
trigger_compaction_with_retry(tableName, "full")

// after full compaction, there is only 1 rowset.
count = get_rowset_count.call(tablets);
Expand Down
6 changes: 3 additions & 3 deletions regression-test/suites/variant_p0/nested.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ suite("regression_test_variant_nested", "p0"){
sql """insert into var_nested values (${i}, '{"nested${i}" : {"nested": [{"yyyxxxx" : "11111"},{"ax1111" : "1111"},{"axxxb": 100, "xxxy111": 111}, {"ddsss":1024, "aaa" : "11"}, {"xx" : 10}]}, "not nested" : 1024, "not nested2" : {"llll" : 123}}');"""
}

trigger_and_wait_compaction("var_nested", "full")
trigger_compaction_with_retry("var_nested", "full")

qt_sql """
select * from var_nested order by k limit 101
"""
sql """INSERT INTO var_nested SELECT *, '{"k1":1, "k2": "some", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]], "nested1" : {"nested2" : [{"a" : 10, "b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "1000") where number > 200 limit 100;"""
sql """INSERT INTO var_nested SELECT *, '{"k2":1, "k3": "nice", "k4" : [1234], "k5" : 1.10000, "k6" : [[123]], "nested2" : {"nested1" : [{"a" : 10, "b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "5013") where number >= 400 limit 1024;"""
trigger_and_wait_compaction("var_nested", "full")
trigger_compaction_with_retry("var_nested", "full")

qt_sql """select /*+SET_VAR(batch_size=1024,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_
parallel_pipeline_task_num=7,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16
Expand All @@ -101,7 +101,7 @@ parallel_pipeline_task_num=7,profile_level=1,enable_pipeline_engine=true,enable_
// type change case
sql """INSERT INTO var_nested SELECT *, '{"k1":"1", "k2": 1.1, "k3" : [1234.0], "k4" : 1.10000, "k5" : [["123"]], "nested1" : {"nested2" : [{"a" : "10", "b" : "1.1", "c" : 1111.111}]}}' FROM numbers("number" = "8000") where number > 7000 limit 100;"""
qt_sql """select * from var_nested where v['k2'] = 'what' and array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order by k limit 1;"""
trigger_and_wait_compaction("var_nested", "full")
trigger_compaction_with_retry("var_nested", "full")
qt_sql """select * from var_nested where v['k2'] = 'nested' and array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order by k limit 1;"""
sql """select * from var_nested where v['k2'] = 'some' or v['k3'] = 'nice' limit 100;"""

Expand Down

0 comments on commit feecdc8

Please sign in to comment.