From 23810d3ae61716e53e95113fa175e10c2930b229 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 21 Dec 2023 19:15:52 +0800 Subject: [PATCH] [bug](pipelineX) Fix pipelineX bug on multiple BE (#28792) --- .../java/org/apache/doris/qe/Coordinator.java | 36 ++++++++++++++----- .../org/apache/doris/qe/SessionVariable.java | 3 +- .../correctness_p0/test_runtime_filter.groovy | 1 + .../join/test_bitmap_filter_nereids.groovy | 1 + 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0a89d6f5afb0dfb..d448091676f9fae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1585,10 +1585,14 @@ private void computeFragmentExecParams() throws Exception { dest.server = dummyServer; dest.setBrpcServer(dummyServer); - int parallelTasksNum = destParams.ignoreDataDistribution - ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); - for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) { + Set hostSet = new HashSet<>(); + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (destParams.ignoreDataDistribution + && hostSet.contains(instanceExecParams.host)) { + continue; + } + hostSet.add(instanceExecParams.host); if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { dest.fragment_instance_id = instanceExecParams.instanceId; dest.server = toRpcHost(instanceExecParams.host); @@ -1623,10 +1627,14 @@ private void computeFragmentExecParams() throws Exception { } }); } else { + Set hostSet = new HashSet<>(); // add destination host to this fragment's destination - int parallelTasksNum = destParams.ignoreDataDistribution - ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); - for (int j = 0; j < parallelTasksNum; ++j) { + for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + if (destParams.ignoreDataDistribution + && hostSet.contains(destParams.instanceExecParams.get(j).host)) { + continue; + } + hostSet.add(destParams.instanceExecParams.get(j).host); TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); @@ -1698,10 +1706,14 @@ private void computeMultiCastFragmentParams() throws Exception { dest.server = dummyServer; dest.setBrpcServer(dummyServer); - int parallelTasksNum = destParams.ignoreDataDistribution - ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); - for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) { + Set hostSet = new HashSet<>(); + for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) { FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); + if (destParams.ignoreDataDistribution + && hostSet.contains(instanceExecParams.host)) { + continue; + } + hostSet.add(instanceExecParams.host); if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { dest.fragment_instance_id = instanceExecParams.instanceId; dest.server = toRpcHost(instanceExecParams.host); @@ -1736,7 +1748,13 @@ private void computeMultiCastFragmentParams() throws Exception { } }); } else { + Set hostSet = new HashSet<>(); for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + if (destParams.ignoreDataDistribution + && hostSet.contains(destParams.instanceExecParams.get(j).host)) { + continue; + } + hostSet.add(destParams.instanceExecParams.get(j).host); TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 182cfc4424aff3d..2fbeda323904db6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -3175,7 +3175,8 @@ public boolean isMaterializedViewRewriteEnableContainForeignTable() { } public boolean isIgnoreStorageDataDistribution() { - return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle; + return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle + && enableNereidsPlanner; } public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) { diff --git a/regression-test/suites/correctness_p0/test_runtime_filter.groovy b/regression-test/suites/correctness_p0/test_runtime_filter.groovy index f691bd160606c61..6cdc53042f37f52 100644 --- a/regression-test/suites/correctness_p0/test_runtime_filter.groovy +++ b/regression-test/suites/correctness_p0/test_runtime_filter.groovy @@ -61,6 +61,7 @@ suite("test_runtime_filter") { sql "set enable_fallback_to_original_planner=false" sql "set disable_join_reorder=true" + sql "set ignore_storage_data_distribution=false" explain{ sql ("""select * from rf_tblA join rf_tblB on a < b""") contains "runtime filters: RF000[max] -> a" diff --git a/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy b/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy index e8e5d33d167c68f..e219ce03f27872c 100644 --- a/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy +++ b/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy @@ -81,6 +81,7 @@ suite("test_bitmap_filter_nereids") { exception "Doris hll, bitmap, array, map, struct, jsonb, variant column must use with specific function, and don't support filter" } + sql "set ignore_storage_data_distribution=false" explain{ sql "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) order by k1;" contains "RF000[bitmap]"