diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java index 979fee85dcdf7..7af2455f25655 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java @@ -54,6 +54,7 @@ import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED; import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled; +import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith; import static com.google.common.base.Preconditions.checkArgument; @@ -125,6 +126,9 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext context) } TableScanNode tableScan = (TableScanNode) filter.getSource(); + if (((IcebergTableHandle) tableScan.getTable().getConnectorHandle()).getIcebergTableName().getTableType() != DATA) { + return visitPlan(filter, context); + } Map nameToColumnHandlesMapping = tableScan.getAssignments().entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().getName(), e -> (IcebergColumnHandle) e.getValue())); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java index 57a7201bc090d..e5c5a65a86e02 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergTableChangelog.java @@ -15,11 +15,13 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; @@ -118,6 +120,36 @@ public void testSelectPredicateStaticColumns() assertQuerySucceeds(String.format("SELECT * FROM \"ctas_orders@%d$changelog\" WHERE operation = 'INSERT'", snapshots[0])); } + @Test + public void testVerifyProjectAndFilterOutput() + { + assertQuerySucceeds("CREATE TABLE test_changelog (a int, b int) WITH (partitioning = ARRAY['a'], delete_mode = 'copy-on-write')"); + assertQuerySucceeds("INSERT INTO test_changelog VALUES (1, 2)"); + assertQuerySucceeds("INSERT INTO test_changelog VALUES (2, 2)"); + assertQuerySucceeds("DELETE FROM test_changelog WHERE a = 2"); + long[] testSnapshots = Lists.reverse( + getQueryRunner().execute("SELECT snapshot_id FROM \"test_changelog$snapshots\" ORDER BY committed_at desc").getOnlyColumn() + .collect(Collectors.toList())) + // skip the earliest snapshot since the changelog starts from there. + .stream().skip(1) + .mapToLong(Long.class::cast) + .toArray(); + assertQuery("SELECT snapshotid FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES " + Joiner.on(", ").join(Arrays.stream(testSnapshots).iterator())); + // Verify correct projections for single columns + assertQuery("SELECT ordinal FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES 0, 1"); + assertQuery("SELECT operation FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES 'INSERT', 'DELETE'"); + assertQuery("SELECT rowdata.a, rowdata.b FROM \"test_changelog$changelog\" order by ordinal asc", "VALUES (2, 2), (2, 2)"); // inserted then deleted + // Verify correct filters results on filters + assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE ordinal = 0 order by ordinal asc", "VALUES (0, 'INSERT')"); + assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE ordinal = 1 order by ordinal asc", "VALUES (1, 'DELETE')"); + assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE operation = 'INSERT' order by ordinal asc", "VALUES (0, 'INSERT')"); + assertQuery("SELECT ordinal, operation FROM \"test_changelog$changelog\" WHERE operation = 'DELETE' order by ordinal asc", "VALUES (1, 'DELETE')"); + assertQueryReturnsEmptyResult("SELECT * FROM \"test_changelog$changelog\" WHERE operation = 'AAABBBCCC'"); + assertQuery(String.format("SELECT ordinal FROM \"test_changelog$changelog\" WHERE snapshotid = %d order by ordinal asc", testSnapshots[0]), "VALUES 0"); + assertQuery(String.format("SELECT ordinal FROM \"test_changelog$changelog\" WHERE snapshotid = %d order by ordinal asc", testSnapshots[1]), "VALUES 1"); + assertQuery("SELECT * FROM \"test_changelog$changelog\" WHERE rowdata.a = 2 order by ordinal asc", String.format("VALUES ('INSERT', 0, %d, (2, 2)), ('DELETE', 1, %d, (2, 2))", testSnapshots[0], testSnapshots[1])); + } + @Test public void testSelectCount() {