Skip to content

Commit

Permalink
Skip IcebergPlanOptimizer for non-DATA tables
Browse files Browse the repository at this point in the history
The rule was being incorrectly applied to the $changelog
system table. It would assume certain columns were
partition columns resulting in incorrect query output.
  • Loading branch information
ZacBlanco authored and tdcmeehan committed Apr 5, 2024
1 parent 6ff2a97 commit f024583
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -125,6 +126,9 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
}

TableScanNode tableScan = (TableScanNode) filter.getSource();
if (((IcebergTableHandle) tableScan.getTable().getConnectorHandle()).getIcebergTableName().getTableType() != DATA) {
return visitPlan(filter, context);
}

Map<String, IcebergColumnHandle> nameToColumnHandlesMapping = tableScan.getAssignments().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().getName(), e -> (IcebergColumnHandle) e.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -118,6 +120,36 @@ public void testSelectPredicateStaticColumns()
assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE operation = 'INSERT'", snapshots[0]));
}

@Test
public void testVerifyProjectAndFilterOutput()
{
assertQuerySucceeds("CREATE TABLE test_changelog (a int, b int) WITH (partitioning = ARRAY['a'], delete_mode = 'copy-on-write')");
assertQuerySucceeds("INSERT INTO test_changelog VALUES (1, 2)");
assertQuerySucceeds("INSERT INTO test_changelog VALUES (2, 2)");
assertQuerySucceeds("DELETE FROM test_changelog WHERE a = 2");
long[] testSnapshots = Lists.reverse(
getQueryRunner().execute("SELECT snapshot_id FROM \"test_changelog$snapshots\" ORDER BY committed_at desc").getOnlyColumn()
.collect(Collectors.toList()))
// skip the earliest snapshot since the changelog starts from there.
.stream().skip(1)
.mapToLong(Long.class::cast)
.toArray();
assertQuery("SELECT snapshotid FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES " + Joiner.on(", ").join(Arrays.stream(testSnapshots).iterator()));
// Verify correct projections for single columns
assertQuery("SELECT ordinal FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES 0, 1");
assertQuery("SELECT operation FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES 'INSERT', 'DELETE'");
assertQuery("SELECT rowdata.a, rowdata.b FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES (2, 2), (2, 2)"); // inserted then deleted
// Verify correct filters results on filters
assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE ordinal = 0 order by ordinal asc", "VALUES (0, 'INSERT')");
assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE ordinal = 1 order by ordinal asc", "VALUES (1, 'DELETE')");
assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE operation = 'INSERT' order by ordinal asc", "VALUES (0, 'INSERT')");
assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE operation = 'DELETE' order by ordinal asc", "VALUES (1, 'DELETE')");
assertQueryReturnsEmptyResult("SELECT * FROM \"test_changelog$changelog\" WHERE operation = 'AAABBBCCC'");
assertQuery(String.format("SELECT ordinal FROM \"test_changelog$changelog\" WHERE snapshotid = %d order by ordinal asc", testSnapshots[0]), "VALUES 0");
assertQuery(String.format("SELECT ordinal FROM \"test_changelog$changelog\" WHERE snapshotid = %d order by ordinal asc", testSnapshots[1]), "VALUES 1");
assertQuery("SELECT * FROM \"test_changelog$changelog\" WHERE rowdata.a = 2 order by ordinal asc", String.format("VALUES ('INSERT', 0, %d, (2, 2)), ('DELETE', 1, %d, (2, 2))", testSnapshots[0], testSnapshots[1]));
}

@Test
public void testSelectCount()
{
Expand Down

0 comments on commit f024583

Please sign in to comment.