[Bug](cache) Fix query cache report error message (#47883)
The plan:
```
QUERY_CACHE:
  CACHE_NODE_ID: 2
  DIGEST: 557413bec3209f50fd640eb3b0534c12ccd9df35f9d4c86620416936985e2679

STREAM DATA SINK
  EXCHANGE ID: 03
  UNPARTITIONED

2:VAGGREGATE (merge finalize)(273)
|  output: min(partial_min(pk)[#6])[#9], max(partial_max(pk)[#7])[#10]
|  group by: pk[#5]
|  sortByGroupKey:false
|  cardinality=0
|  final projections: field1[#9], field2[#10], pk[#8]
|  final project output tuple id: 4
|  distribute expr lists: pk[#5]
|
1:VAGGREGATE (update serialize)(268)
|  output: partial_min(pk[#4])[#6], partial_max(pk[#4])[#7]
|  group by: pk[#4]
|  sortByGroupKey:false
|  cardinality=0
|  distribute expr lists: pk[#4]
|
```

The FE chooses agg node 2 as the cache node, but the BE still uses agg node 1. The BE should support this case.
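
For context, a minimal sketch of the gating condition the patch introduces in `_create_operator`: the query cache operator should wrap only the agg node whose id matches the FE-selected `query_cache_param.node_id`, instead of every agg node built while `enable_query_cache` is on. The struct definitions below are simplified stand-ins for the real Thrift/plan types, not the actual Doris implementation.

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-in for the FE-provided cache parameters.
struct QueryCacheParam {
    int32_t node_id = -1;  // CACHE_NODE_ID chosen by the FE (2 in the plan above)
};

// Simplified stand-in for the plan node the BE is currently building.
struct PlanNode {
    int32_t node_id = -1;
};

bool need_create_cache_op(bool enable_query_cache, const PlanNode& tnode,
                          const QueryCacheParam& cache_param) {
    // Old behavior was effectively `return enable_query_cache;`, which also
    // wrapped agg node 1. New behavior: only the FE-selected node qualifies.
    return enable_query_cache && tnode.node_id == cache_param.node_id;
}

int main() {
    QueryCacheParam cache_param{2};
    std::cout << need_create_cache_op(true, PlanNode{1}, cache_param)    // prints 0
              << need_create_cache_op(true, PlanNode{2}, cache_param);   // prints 1
}
```

In the diff below, the branches of `_create_operator` that previously tested `enable_query_cache` now test `need_create_cache_op`.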
HappenLee authored Feb 14, 2025
1 parent 731a07e commit c58fc18
Showing 3 changed files with 191 additions and 5 deletions.
11 changes: 6 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -1256,7 +1256,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
": group by and output is empty");
}

bool need_create_cache_op =
enable_query_cache && tnode.node_id == request.fragment.query_cache_param.node_id;
auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first;
auto cache_source_id = next_operator_id();
@@ -1289,7 +1290,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
if (enable_query_cache) {
if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));

@@ -1313,7 +1314,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
!tnode.agg_node.grouping_exprs.empty()) {
if (enable_query_cache) {
if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));

@@ -1330,7 +1331,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else {
// create new pipeline to add query cache operator
PipelinePtr new_pipe;
if (enable_query_cache) {
if (need_create_cache_op) {
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
}

@@ -1339,7 +1340,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else {
op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
}
if (enable_query_cache) {
if (need_create_cache_op) {
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
RETURN_IF_ERROR(new_pipe->add_operator(
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
22 changes: 22 additions & 0 deletions regression-test/data/query_p0/cache/query_cache.out
@@ -0,0 +1,22 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query_cache1 --

-- !query_cache2 --
0 0 0
1 1 1
2 2 2

-- !query_cache3 --

-- !query_cache4 --
0 0 0
1 1 1
2 2 2

-- !query_cache5 --

-- !query_cache6 --
0 0 0
1 1 1
2 2 2

163 changes: 163 additions & 0 deletions regression-test/suites/query_p0/cache/query_cache.groovy
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.stream.Collectors

suite("query_cache") {
def tableName = "table_3_undef_partitions2_keys3_properties4_distributed_by53"

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`pk` int NULL,
`col_varchar_10__undef_signed` varchar(10) NULL,
`col_int_undef_signed` int NULL,
`col_varchar_1024__undef_signed` varchar(1024) NULL
) ENGINE=OLAP
DUPLICATE KEY(`pk`, `col_varchar_10__undef_signed`)
DISTRIBUTED BY HASH(`pk`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V3",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728"
)
"""

sql """
INSERT INTO ${tableName}(pk, col_varchar_10__undef_signed, col_int_undef_signed, col_varchar_1024__undef_signed)
VALUES
(0, "mean", null, "p"),
(1, "is", 6, "what"),
(2, "one", null, "e")
"""

// First complex query - Run without cache
order_qt_query_cache1 """
SELECT
MIN(`pk`) AS field1,
MAX(`pk`) AS field2,
`pk` AS field3
FROM ${tableName} AS alias1
WHERE (
alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%')
AND (
(alias1.`pk` = 154 OR (
alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%')
AND alias1.`pk` = 111
))
AND (
alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
AND alias1.col_varchar_1024__undef_signed > 'with'
)
AND alias1.`pk` IS NULL
)
AND alias1.col_int_undef_signed < 7
)
GROUP BY field3
"""

// Simple query - Run without cache
order_qt_query_cache2 """
SELECT
MIN(`pk`) AS field1,
MAX(`pk`) AS field2,
`pk` AS field3
FROM ${tableName}
GROUP BY field3
"""

// Enable query cache
sql "set enable_query_cache=true"

// Run the same complex query with cache enabled
order_qt_query_cache3 """
SELECT
MIN(`pk`) AS field1,
MAX(`pk`) AS field2,
`pk` AS field3
FROM ${tableName} AS alias1
WHERE (
alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%')
AND (
(alias1.`pk` = 154 OR (
alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%')
AND alias1.`pk` = 111
))
AND (
alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
AND alias1.col_varchar_1024__undef_signed > 'with'
)
AND alias1.`pk` IS NULL
)
AND alias1.col_int_undef_signed < 7
)
GROUP BY field3
"""

// Run the same simple query with cache enabled
order_qt_query_cache4 """
SELECT
MIN(`pk`) AS field1,
MAX(`pk`) AS field2,
`pk` AS field3
FROM ${tableName}
GROUP BY field3
"""

// Run both queries again to test cache hit
order_qt_query_cache5 """
SELECT
MIN(`pk`) AS field1,
MAX(`pk`) AS field2,
`pk` AS field3
FROM ${tableName} AS alias1
WHERE (
alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%')
AND (
(alias1.`pk` = 154 OR (
alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%')
AND alias1.`pk` = 111
))
AND (
alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
AND alias1.col_varchar_1024__undef_signed > 'with'
)
AND alias1.`pk` IS NULL
)
AND alias1.col_int_undef_signed < 7
)
GROUP BY field3
"""

order_qt_query_cache6 """
SELECT
MIN(`pk`) AS field1,
MAX(`pk`) AS field2,
`pk` AS field3
FROM ${tableName}
GROUP BY field3
"""

}
