Skip to content

Commit

Permalink
[improve] Concat doris.filter.query option when push down (apache#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Feb 11, 2025
1 parent 7f09d02 commit 3c7bd66
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,12 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws DorisExcept
}

if (queryPlan == null) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
logger.error(SHOULD_NOT_HAPPEN_MESSAGE + " res: " + response);
throw new ShouldNeverHappenException();
}

if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus();
String errMsg = "Doris FE's response is not OK, res: " + response;
logger.error(errMsg);
throw new DorisException(errMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
if (!resolvedFilterQuery.isEmpty()) {
String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
if (!StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
filterQuery =
String.format("(%s) AND (%s)", readOptions.getFilterQuery(), filterQuery);
}
readOptions.setFilterQuery(filterQuery);
}

Expand Down Expand Up @@ -195,8 +199,7 @@ public Result applyFilters(List<ResolvedExpression> filters) {
DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor();
for (ResolvedExpression filter : filters) {
String filterQuery = filter.accept(expressionVisitor);
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())
&& !StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
acceptedFilters.add(filter);
this.resolvedFilterQuery.add(filterQuery);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
Expand All @@ -68,6 +70,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
static final String TABLE_CSV_TM = "tbl_csv_tm_source";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER =
"tbl_read_tbl_push_down_with_union_all_not_eq_filter";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY =
"tbl_read_tbl_push_down_with_filter_query";

@Rule
public final MiniClusterWithClientResource miniClusterResource =
Expand Down Expand Up @@ -490,6 +494,80 @@ public void testTableSourceFilterWithUnionAll() throws Exception {
checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray());
}

@Test
public void testTableSourceFilterWithFilterQuery() throws Exception {
LOG.info("starting to execute testTableSourceFilterWithFilterQuery case.");
// init doris table
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
String.format(
"DROP TABLE IF EXISTS %s.%s",
DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
String.format(
"CREATE TABLE %s.%s ( \n"
+ "`name` varchar(256),\n"
+ "`dt` date,\n"
+ "`age` int\n"
+ ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
String.format(
"insert into %s.%s values ('doris',date_sub(now(),INTERVAL 7 DAY), 18)",
DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
String.format(
"insert into %s.%s values ('flink','2025-02-10', 10)",
DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
String.format(
"insert into %s.%s values ('apache',now(), 12)",
DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY));

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
String.format(
"CREATE TABLE doris_source_filter_with_filter_query ("
+ " name STRING,"
+ " dt DATE,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'doris.filter.query' = ' (dt = DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) '"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY,
getDorisUsername(),
getDorisPassword());
tEnv.executeSql(sourceDDL);
String querySql =
" SELECT * FROM doris_source_filter_with_filter_query where name = 'doris' and age > 2";
TableResult tableResult = tEnv.executeSql(querySql);

List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
}

String nowDate =
LocalDate.now().minusDays(7).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));

String[] expected = new String[] {"+I[doris, " + nowDate + ", 18]"};
checkResultInAnyOrder("testTableSourceFilterWithFilterQuery", expected, actual.toArray());
}

@Test
public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception {
LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case.");
Expand Down

0 comments on commit 3c7bd66

Please sign in to comment.