From 3c7bd664ee52361a91df85fa24336c335a4dcceb Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 11 Feb 2025 14:28:54 +0800 Subject: [PATCH] [improve] Concat doris.filter.query option when push down (#552) --- .../apache/doris/flink/rest/RestService.java | 4 +- .../flink/table/DorisDynamicTableSource.java | 9 ++- .../doris/flink/source/DorisSourceITCase.java | 78 +++++++++++++++++++ 3 files changed, 86 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 75c1e27d3..523a39e22 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -615,12 +615,12 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws DorisExcept } if (queryPlan == null) { - logger.error(SHOULD_NOT_HAPPEN_MESSAGE); + logger.error(SHOULD_NOT_HAPPEN_MESSAGE + " res: " + response); throw new ShouldNeverHappenException(); } if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) { - String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus(); + String errMsg = "Doris FE's response is not OK, res: " + response; logger.error(errMsg); throw new DorisException(errMsg); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 9763a888a..a68cf1892 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -90,8 +90,12 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { + if (!resolvedFilterQuery.isEmpty()) { String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); + if (!StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { + filterQuery = + String.format("(%s) AND (%s)", readOptions.getFilterQuery(), filterQuery); + } readOptions.setFilterQuery(filterQuery); } @@ -195,8 +199,7 @@ public Result applyFilters(List filters) { DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor(); for (ResolvedExpression filter : filters) { String filterQuery = filter.accept(expressionVisitor); - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery()) - && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) { + if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) { acceptedFilters.add(filter); this.resolvedFilterQuery.add(filterQuery); } else { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 18de700ed..3eb965978 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -44,6 +44,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -68,6 +70,8 @@ public class DorisSourceITCase extends AbstractITCaseService { static final String TABLE_CSV_TM = "tbl_csv_tm_source"; private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER = "tbl_read_tbl_push_down_with_union_all_not_eq_filter"; + private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY = + "tbl_read_tbl_push_down_with_filter_query"; @Rule public final MiniClusterWithClientResource miniClusterResource = @@ -490,6 +494,80 @@ public void testTableSourceFilterWithUnionAll() throws Exception { checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray()); } + @Test + public void testTableSourceFilterWithFilterQuery() throws Exception { + LOG.info("starting to execute testTableSourceFilterWithFilterQuery case."); + // init doris table + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format( + "DROP TABLE IF EXISTS %s.%s", + DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`name` varchar(256),\n" + + "`dt` date,\n" + + "`age` int\n" + + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY), + String.format( + "insert into %s.%s values ('doris',date_sub(now(),INTERVAL 7 DAY), 18)", + DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY), + String.format( + "insert into %s.%s values ('flink','2025-02-10', 10)", + DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY), + String.format( + "insert into %s.%s values ('apache',now(), 12)", + DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY)); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sourceDDL = + String.format( + "CREATE TABLE doris_source_filter_with_filter_query (" + + " name STRING," + + " dt DATE," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'doris.filter.query' = ' (dt = DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) '" + + ")", + getFenodes(), + DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + String querySql = + " SELECT * FROM doris_source_filter_with_filter_query where name = 'doris' and age > 2"; + TableResult tableResult = tEnv.executeSql(querySql); + + List actual = new ArrayList<>(); + try (CloseableIterator iterator = tableResult.collect()) { + while (iterator.hasNext()) { + actual.add(iterator.next().toString()); + } + } + + String nowDate = + LocalDate.now().minusDays(7).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + + String[] expected = new String[] {"+I[doris, " + nowDate + ", 18]"}; + checkResultInAnyOrder("testTableSourceFilterWithFilterQuery", expected, actual.toArray()); + } + @Test public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception { LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case.");