Skip to content

Commit

Permalink
[bug](pipelineX) Fix pipelineX bug on multiple BE (apache#28792)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and stephen committed Dec 28, 2023
1 parent 2b8b354 commit 23810d3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
36 changes: 27 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1585,10 +1585,14 @@ private void computeFragmentExecParams() throws Exception {
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
Expand Down Expand Up @@ -1623,10 +1627,14 @@ private void computeFragmentExecParams() throws Exception {
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
// add destination host to this fragment's destination
int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int j = 0; j < parallelTasksNum; ++j) {
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
Expand Down Expand Up @@ -1698,10 +1706,14 @@ private void computeMultiCastFragmentParams() throws Exception {
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);

int parallelTasksNum = destParams.ignoreDataDistribution
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (destParams.ignoreDataDistribution
&& hostSet.contains(instanceExecParams.host)) {
continue;
}
hostSet.add(instanceExecParams.host);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
Expand Down Expand Up @@ -1736,7 +1748,13 @@ private void computeMultiCastFragmentParams() throws Exception {
}
});
} else {
Set<TNetworkAddress> hostSet = new HashSet<>();
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
if (destParams.ignoreDataDistribution
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
continue;
}
hostSet.add(destParams.instanceExecParams.get(j).host);
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3175,7 +3175,8 @@ public boolean isMaterializedViewRewriteEnableContainForeignTable() {
}

public boolean isIgnoreStorageDataDistribution() {
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle
&& enableNereidsPlanner;
}

public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ suite("test_runtime_filter") {
sql "set enable_fallback_to_original_planner=false"
sql "set disable_join_reorder=true"

sql "set ignore_storage_data_distribution=false"
explain{
sql ("""select * from rf_tblA join rf_tblB on a < b""")
contains "runtime filters: RF000[max] -> a"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ suite("test_bitmap_filter_nereids") {
exception "Doris hll, bitmap, array, map, struct, jsonb, variant column must use with specific function, and don't support filter"
}

sql "set ignore_storage_data_distribution=false"
explain{
sql "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) order by k1;"
contains "RF000[bitmap]"
Expand Down

0 comments on commit 23810d3

Please sign in to comment.