Skip to content

Commit

Permalink
fix(core): fix distinct (#552)
Browse files Browse the repository at this point in the history
Co-authored-by: Xu Yihao <48053143+Yihao-Xu@users.noreply.github.com>
  • Loading branch information
jzl18thu and Yihao-Xu authored Jan 6, 2025
1 parent 963759a commit 80e7dce
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Bitmap;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -217,40 +215,18 @@ private RowStream executeProject(Project project, Table table) throws PhysicalEx
}

/**
 * Projects {@code table} onto the fields selected by {@code project} and returns the result as a
 * new {@link Table} built from freshly created rows.
 *
 * <p>NOTE(review): this listing appears to be a diff hunk rendered without +/- markers, so the
 * statements removed by the commit and the statements added by it are shown together (for
 * example {@code targetHeader} and {@code targetFields} are each declared twice). Confirm against
 * the real file which lines survive before relying on the exact statement sequence.
 *
 * @param project the Project operator; its patterns and remain-key flag drive field selection
 * @param table the input table whose rows are projected
 * @return a table whose header is the projected header and whose rows hold only the selected
 *     values
 */
private RowStream executeProjectFromOperator(Project project, Table table) {
List<String> patterns = project.getPatterns();
Header header = table.getHeader();
List<Field> targetFields = new ArrayList<>();

// Inline field selection: a field is kept when remain-key is requested and its name ends with
// KEY, or once for every pattern it matches (exact name, or regex for wildcard patterns).
for (Field field : header.getFields()) {
if (project.isRemainKey() && field.getName().endsWith(KEY)) {
targetFields.add(field);
continue;
}
for (String pattern : patterns) {
if (!StringUtils.isPattern(pattern)) {
if (pattern.equals(field.getName())) {
targetFields.add(field);
}
} else {
if (Pattern.matches(StringUtils.reformatPath(pattern), field.getName())) {
targetFields.add(field);
}
}
}
}
Header targetHeader = new Header(header.getKey(), targetFields);
// Field selection delegated to Header#projectedHeader, driven by the same two Project
// parameters.
Header targetHeader =
table.getHeader().projectedHeader(project.getPatterns(), project.isRemainKey());
List<Field> targetFields = targetHeader.getFields();
List<Row> targetRows = new ArrayList<>();
// presumably rewinds the table's cursor so the loop below sees every row; confirm in Table
table.reset();
while (table.hasNext()) {
Row row = table.next();
// Copy the selected values, in target-field order, out of the source row.
Object[] objects = new Object[targetFields.size()];
for (int i = 0; i < targetFields.size(); i++) {
objects[i] = row.getValue(targetFields.get(i));
}
// Variant that picks the Row constructor depending on whether the source header has a key.
if (header.hasKey()) {
targetRows.add(new Row(targetHeader, row.getKey(), objects));
} else {
targetRows.add(new Row(targetHeader, objects));
}
// Variant that always forwards the source row's key.
targetRows.add(new Row(targetHeader, row.getKey(), objects));
}
return new Table(targetHeader, targetRows);
}
Expand Down Expand Up @@ -307,6 +283,7 @@ private RowStream executeDownsample(Downsample downsample, Table table) throws P
FunctionParams params = functionCall.getParams();

Table functable = RowUtils.preRowTransform(table, rowTransformMap, functionCall);
Header tmpHeader = functable.getHeader();
TreeMap<Long, List<Row>> groups = RowUtils.computeDownsampleGroup(downsample, functable);

// <<window_start, window_end> row>
Expand All @@ -324,12 +301,20 @@ private RowStream executeDownsample(Downsample downsample, Table table) throws P
// min和max无需去重
if (!function.getIdentifier().equals(Max.MAX)
&& !function.getIdentifier().equals(Min.MIN)) {
group = removeDuplicateRows(group);
try (Table t = RowUtils.project(tmpHeader, group, params.getPaths())) {
group = removeDuplicateRows(t.getRows());
tmpHeader = t.getHeader();
} catch (PhysicalException e) {
LOGGER.error(
"encounter error when execute distinct in set mapping function {}",
function.getIdentifier(),
e);
}
}
}

try {
Row row = function.transform(new Table(functable.getHeader(), group), params);
Row row = function.transform(new Table(tmpHeader, group), params);
if (row != null) {
transformedRawRows.add(new Pair<>(new Pair<>(windowStartKey, windowEndKey), row));
}
Expand Down Expand Up @@ -396,7 +381,7 @@ private RowStream executeSetTransform(SetTransform setTransform, Table table)
functable = distinctMap.get(params.getPaths());
} else {
Distinct distinct = new Distinct(EmptySource.EMPTY_SOURCE, params.getPaths());
functable = transformToTable(executeDistinct(distinct, table));
functable = transformToTable(executeDistinct(distinct, functable));
distinctMap.put(params.getPaths(), functable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,15 @@ public static void seqApplyFunc(
// min和max无需去重
if (!function.getIdentifier().equals(Max.MAX)
&& !function.getIdentifier().equals(Min.MIN)) {
transformedGroup = removeDuplicateRows(transformedGroup);
try (Table t = RowUtils.project(tmpHeader, transformedGroup, params.getPaths())) {
transformedGroup = removeDuplicateRows(t.getRows());
tmpHeader = t.getHeader();
} catch (PhysicalException e) {
LOGGER.error(
"encounter error when execute distinct in set mapping function {}",
function.getIdentifier(),
e);
}
}
}

Expand Down Expand Up @@ -755,7 +763,7 @@ private static void parallelApplyFunc(
transformedGroup.addAll(tmp.getRows());
} catch (PhysicalException e) {
LOGGER.error(
"encounter error when execute set mapping function parameters");
"encounter error when execute set mapping function parameters", e);
}
} else {
transformedGroup = group;
Expand All @@ -769,11 +777,15 @@ private static void parallelApplyFunc(
// min和max无需去重
if (!function.getIdentifier().equals(Max.MAX)
&& !function.getIdentifier().equals(Min.MIN)) {
try {
transformedGroup = removeDuplicateRows(transformedGroup);
try (Table t =
RowUtils.project(tmpHeader, transformedGroup, params.getPaths())) {
transformedGroup = removeDuplicateRows(t.getRows());
tmpHeader = t.getHeader();
} catch (PhysicalException e) {
LOGGER.error(
"encounter error when execute distinct in set mapping function");
"encounter error when execute distinct in set mapping function {}",
function.getIdentifier(),
e);
}
}
}
Expand All @@ -788,7 +800,10 @@ private static void parallelApplyFunc(
}
}
} catch (Exception e) {
LOGGER.error("encounter error when execute set mapping function ");
LOGGER.error(
"encounter error when execute set mapping function {}.",
function.getIdentifier(),
e);
}
});
latch.countDown();
Expand Down Expand Up @@ -961,6 +976,20 @@ public static void sortRows(List<Row> rows, List<Boolean> ascendingList, List<St
});
}

/**
 * Projects a list of rows onto the fields of {@code header} selected by {@code patterns}.
 *
 * <p>The projected header is obtained from {@code header.projectedHeader(patterns, false)}, so
 * fields are selected by pattern matching only. Each input row yields one output row that holds
 * the values of the selected fields, in projected-header order, and carries the input row's key.
 *
 * @param header header describing the input rows
 * @param rows rows to project; the list itself is not modified
 * @param patterns path patterns choosing which fields to keep
 * @return a new table built from the projected header and the projected rows
 */
public static Table project(Header header, List<Row> rows, List<String> patterns) {
  Header projectedHeader = header.projectedHeader(patterns, false);
  List<Field> keptFields = projectedHeader.getFields();
  List<Row> projectedRows = new ArrayList<>();
  for (Row source : rows) {
    Object[] values = new Object[keptFields.size()];
    int column = 0;
    // Read each kept field out of the source row, preserving projected-header order.
    for (Field kept : keptFields) {
      values[column] = source.getValue(kept);
      column++;
    }
    Row projected = new Row(projectedHeader, source.getKey(), values);
    projectedRows.add(projected);
  }
  return new Table(projectedHeader, projectedRows);
}

public static List<Row> removeDuplicateRows(List<Row> rows) throws PhysicalException {
List<Row> targetRows = new ArrayList<>();
HashMap<Integer, List<Row>> rowsHashMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package cn.edu.tsinghua.iginx.engine.shared.data.read;

import static cn.edu.tsinghua.iginx.engine.shared.Constants.KEY;
import static cn.edu.tsinghua.iginx.engine.shared.Constants.RESERVED_COLS;

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
Expand Down Expand Up @@ -132,6 +133,35 @@ public boolean equals(Object o) {
&& Objects.equals(indexMap, header.indexMap);
}

/**
 * Computes the header that results from applying a Project operator to this header.
 *
 * <p>Fields are visited in their existing order. When {@code isRemainKey} is true, a field whose
 * name ends with {@code "." + KEY} is kept once and is not tested against the patterns. Every
 * other field is added once for each pattern it matches: a pattern for which {@code
 * StringUtils.isPattern} is false must equal the field name exactly, any other pattern is
 * converted with {@code StringUtils.reformatPath} and must match the whole field name as a
 * regular expression.
 *
 * @param patterns the Project operator's path patterns
 * @param isRemainKey the Project operator's remain-key flag
 * @return a new header built from this header's {@code key} and the selected fields
 */
public Header projectedHeader(List<String> patterns, boolean isRemainKey) {
  List<Field> selected = new ArrayList<>();
  for (Field candidate : fields) {
    boolean keptAsKeyColumn = isRemainKey && candidate.getName().endsWith("." + KEY);
    if (keptAsKeyColumn) {
      selected.add(candidate);
    } else {
      for (String pattern : patterns) {
        boolean matched =
            StringUtils.isPattern(pattern)
                ? Pattern.matches(StringUtils.reformatPath(pattern), candidate.getName())
                : pattern.equals(candidate.getName());
        if (matched) {
          selected.add(candidate);
        }
      }
    }
  }
  return new Header(key, selected);
}

/**
* 根据Rename算子的aliasList和ignorePatterns计算重命名后的header和要升级成key列的普通列的下标
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,27 +907,31 @@ public void testDistinct() {
"INSERT INTO test(key, a, b) values (1, 1, 1), (2, 2, 1), (3, 2, 2), (4, 3, 1), (5, 3, 2), (6, 3, 1), (7, 4, 1), (8, 4, 2), (9, 4, 3), (10, 4, 1);";
executor.execute(insert);

insert =
"INSERT INTO test(key, c) values (1, \"aa\"), (2, \"aa\"), (3, \"bb\"), (4, \"bb\"), (5, \"bb\"), (6, \"bb\"), (7, \"bb\"), (8, \"bb\"), (9, \"bb\"), (10, \"bb\");";
executor.execute(insert);

insert =
"INSERT INTO t(key, a, b) values (1, 1, 1), (2, 1, 1), (3, 1, 2), (4, 2, 1), (5, 2, 2), (6, 3, 1);";
executor.execute(insert);

String statement = "SELECT * FROM test;";
String expected =
"ResultSets:\n"
+ "+---+------+------+\n"
+ "|key|test.a|test.b|\n"
+ "+---+------+------+\n"
+ "| 1| 1| 1|\n"
+ "| 2| 2| 1|\n"
+ "| 3| 2| 2|\n"
+ "| 4| 3| 1|\n"
+ "| 5| 3| 2|\n"
+ "| 6| 3| 1|\n"
+ "| 7| 4| 1|\n"
+ "| 8| 4| 2|\n"
+ "| 9| 4| 3|\n"
+ "| 10| 4| 1|\n"
+ "+---+------+------+\n"
+ "+---+------+------+------+\n"
+ "|key|test.a|test.b|test.c|\n"
+ "+---+------+------+------+\n"
+ "| 1| 1| 1| aa|\n"
+ "| 2| 2| 1| aa|\n"
+ "| 3| 2| 2| bb|\n"
+ "| 4| 3| 1| bb|\n"
+ "| 5| 3| 2| bb|\n"
+ "| 6| 3| 1| bb|\n"
+ "| 7| 4| 1| bb|\n"
+ "| 8| 4| 2| bb|\n"
+ "| 9| 4| 3| bb|\n"
+ "| 10| 4| 1| bb|\n"
+ "+---+------+------+------+\n"
+ "Total line number = 10\n";
executor.executeAndCompare(statement, expected);

Expand Down Expand Up @@ -1019,6 +1023,17 @@ public void testDistinct() {
+ "Total line number = 1\n";
executor.executeAndCompare(statement, expected);

statement = "SELECT COUNT(DISTINCT a), COUNT(DISTINCT b) FROM test;";
expected =
"ResultSets:\n"
+ "+----------------------+----------------------+\n"
+ "|count(distinct test.a)|count(distinct test.b)|\n"
+ "+----------------------+----------------------+\n"
+ "| 4| 3|\n"
+ "+----------------------+----------------------+\n"
+ "Total line number = 1\n";
executor.executeAndCompare(statement, expected);

statement = "SELECT a, COUNT(b), AVG(b), SUM(b), MIN(b), MAX(b) FROM test GROUP BY a;";
expected =
"ResultSets:\n"
Expand Down Expand Up @@ -1048,6 +1063,18 @@ public void testDistinct() {
+ "Total line number = 4\n";
executor.executeAndCompare(statement, expected);

statement = "SELECT c, COUNT(DISTINCT a), COUNT(DISTINCT b) FROM test GROUP BY c;";
expected =
"ResultSets:\n"
+ "+------+----------------------+----------------------+\n"
+ "|test.c|count(distinct test.a)|count(distinct test.b)|\n"
+ "+------+----------------------+----------------------+\n"
+ "| bb| 3| 3|\n"
+ "| aa| 2| 1|\n"
+ "+------+----------------------+----------------------+\n"
+ "Total line number = 2\n";
executor.executeAndCompare(statement, expected);

statement =
"SELECT COUNT(a), AVG(a), SUM(a), MIN(a), MAX(a) FROM test OVER WINDOW (size 2 IN (0, 10]);";
expected =
Expand Down Expand Up @@ -1079,6 +1106,22 @@ public void testDistinct() {
+ "+---+------------+----------+----------------------+--------------------+--------------------+--------------------+--------------------+\n"
+ "Total line number = 5\n";
executor.executeAndCompare(statement, expected);

statement =
"SELECT COUNT(DISTINCT a), COUNT(DISTINCT b) FROM test OVER WINDOW (size 2 IN (0, 10]);";
expected =
"ResultSets:\n"
+ "+---+------------+----------+----------------------+----------------------+\n"
+ "|key|window_start|window_end|count(distinct test.a)|count(distinct test.b)|\n"
+ "+---+------------+----------+----------------------+----------------------+\n"
+ "| 1| 1| 2| 2| 1|\n"
+ "| 3| 3| 4| 2| 2|\n"
+ "| 5| 5| 6| 1| 2|\n"
+ "| 7| 7| 8| 1| 2|\n"
+ "| 9| 9| 10| 1| 2|\n"
+ "+---+------------+----------+----------------------+----------------------+\n"
+ "Total line number = 5\n";
executor.executeAndCompare(statement, expected);
}

@Test
Expand Down

0 comments on commit 80e7dce

Please sign in to comment.