From 09f462ce85bab739ce6aa72670d35fffdd67a250 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Sun, 21 Jul 2024 10:58:09 +0800 Subject: [PATCH 1/4] fix --- .../apache/doris/datasource/FileQueryScanNode.java | 13 ++++++++++++- .../doris/datasource/hudi/source/HudiScanNode.java | 9 +++++++-- .../datasource/iceberg/source/IcebergScanNode.java | 9 +-------- .../glue/translator/PhysicalPlanTranslator.java | 11 ++++------- .../java/org/apache/doris/planner/ScanNode.java | 3 +++ 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 89acd8b87afd5c..03f3a4b01bda09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -97,7 +97,6 @@ public abstract class FileQueryScanNode extends FileScanNode { protected String brokerName; - @Getter protected TableSnapshot tableSnapshot; /** @@ -594,4 +593,16 @@ public void stop() { } } } + + public void setQueryTableSnapshot(TableSnapshot tableSnapshot) { + this.tableSnapshot = tableSnapshot; + } + + public TableSnapshot getQueryTableSnapshot() { + TableSnapshot snapshot = desc.getRef().getTableSnapshot(); + if (snapshot != null) { + return snapshot; + } + return this.tableSnapshot; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 111f0877f3a82b..7ff23cc67e0821 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.hudi.source; import org.apache.doris.analysis.TableScanParams; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PartitionItem; @@ -203,8 +204,12 @@ protected void doInitialize() throws UserException { } timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); - if (desc.getRef().getTableSnapshot() != null) { - queryInstant = desc.getRef().getTableSnapshot().getTime(); + TableSnapshot tableSnapshot = getQueryTableSnapshot(); + if (tableSnapshot != null) { + if (tableSnapshot.getType() == TableSnapshot.VersionType.VERSION) { + throw new UserException("Hudi does not support `FOR VERSION AS OF`, please use `FOR TIME AS OF`"); + } + queryInstant = tableSnapshot.getTime(); snapshotTimestamp = Option.of(queryInstant); } else { Option snapshotInstant = timeline.lastInstant(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 85778d0ae40273..442f3dd17cf8cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -275,10 +275,7 @@ private List doGetSplits() throws UserException { } public Long getSpecifiedSnapshot() throws UserException { - TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot(); - if (tableSnapshot == null) { - tableSnapshot = this.tableSnapshot; - } + TableSnapshot tableSnapshot = 
getQueryTableSnapshot(); if (tableSnapshot != null) { TableSnapshot.VersionType type = tableSnapshot.getType(); try { @@ -441,8 +438,4 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return super.getNodeExplainString(prefix, detailLevel) + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb); } - - public void setTableSnapshot(TableSnapshot tableSnapshot) { - this.tableSnapshot = tableSnapshot; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index cc4d8301238229..0aed61e15ac04b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -49,6 +49,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.source.EsScanNode; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; @@ -576,10 +577,6 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla switch (((HMSExternalTable) table).getDlaType()) { case ICEBERG: scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); - IcebergScanNode icebergScanNode = (IcebergScanNode) scanNode; - if (fileScan.getTableSnapshot().isPresent()) { - icebergScanNode.setTableSnapshot(fileScan.getTableSnapshot().get()); - } break; case HIVE: scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false); @@ -595,9 +592,6 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla } } else if (table instanceof IcebergExternalTable) { scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); - if (fileScan.getTableSnapshot().isPresent()) { - ((IcebergScanNode) scanNode).setTableSnapshot(fileScan.getTableSnapshot().get()); - } } else if (table instanceof PaimonExternalTable) { scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false); } else if (table instanceof TrinoConnectorExternalTable) { @@ -609,6 +603,9 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla } else { throw new RuntimeException("do not support table type " + table.getType()); } + if (fileScan.getTableSnapshot().isPresent() && scanNode instanceof FileQueryScanNode) { + ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); + } return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 224f85b37a567f..608e60f4e44df7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; @@ -108,6 +109,8 @@ public abstract 
class ScanNode extends PlanNode implements SplitGenerator { // support multi topn filter protected final List topnFilterSortNodes = Lists.newArrayList(); + protected TableSnapshot tableSnapshot; + public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) { super(id, desc.getId().asList(), planNodeName, statisticalType); this.desc = desc; From 3dbf81d5316c4ca6fd254602014e5795f090965f Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Tue, 23 Jul 2024 10:57:05 +0800 Subject: [PATCH 2/4] fix --- .../org/apache/doris/datasource/hudi/source/HudiScanNode.java | 2 +- .../doris/nereids/glue/translator/PhysicalPlanTranslator.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 7ff23cc67e0821..66c14446845b6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -209,7 +209,7 @@ protected void doInitialize() throws UserException { if (tableSnapshot.getType() == TableSnapshot.VersionType.VERSION) { throw new UserException("Hudi does not support `FOR VERSION AS OF`, please use `FOR TIME AS OF`"); } - queryInstant = tableSnapshot.getTime(); + queryInstant = tableSnapshot.getTime().replaceAll("[-: ]", ""); snapshotTimestamp = Option.of(queryInstant); } else { Option snapshotInstant = timeline.lastInstant(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 0aed61e15ac04b..a0fc6ea58b35e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -668,7 +668,9 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, hudiScan.getScanParams(), hudiScan.getIncrementalRelation()); - + if (fileScan.getTableSnapshot().isPresent()) { + ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); + } return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } From 4fa65defa926114e469d43e84dfb7780bce91c1f Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 24 Jul 2024 17:01:04 +0800 Subject: [PATCH 3/4] fix --- .../hudi/source/HudiLocalEngineContext.java | 188 ++++++++++++++++++ .../hudi/source/HudiPartitionProcessor.java | 3 +- 2 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java new file mode 100644 index 00000000000000..26ef6fdfef7086 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiLocalEngineContext.java @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.EngineProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.function.FunctionWrapper;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableConsumer;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This file is copied from org.apache.hudi.common.engine.HoodieLocalEngineContext
+ * because we need to set the UGI in the thread pool.
+ * A Java-based engine context; use this implementation in query engine integrations if needed.
+ */
+public final class HudiLocalEngineContext extends HoodieEngineContext {
+
+    public HudiLocalEngineContext(Configuration conf) {
+        this(conf, new LocalTaskContextSupplier());
+    }
+
+    public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
+        super(new SerializableConfiguration(conf), taskContextSupplier);
+    }
+
+    @Override
+    public HoodieAccumulator newAccumulator() {
+        return HoodieAtomicLongAccumulator.create();
+    }
+
+    @Override
+    public <T> HoodieData<T> emptyHoodieData() {
+        return HoodieListData.eager(Collections.emptyList());
+    }
+
+    @Override
+    public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
+        return HoodieListData.eager(data);
+    }
+
+    @Override
+    public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+        return data.stream().parallel().map(v1 -> {
+            try {
+                return HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1));
+            } catch (Exception e) {
+                throw new HoodieException("Error occurs when executing map", e);
+            }
+        }).collect(Collectors.toList());
+    }
+
+    @Override
+    public <I, K, V> List<V> mapToPairAndReduceByKey(
+            List<I> data,
+            SerializablePairFunction<I, K, V> mapToPairFunc,
+            SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+        return data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
+                .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+                .map(list ->
+                        list.stream()
+                                .map(e -> e.getValue())
+                                .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+            Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+            SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+        return FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
+                .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+                .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
+                        Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
+                .filter(Objects::nonNull);
+    }
+
+    @Override
+    public <K, V> List<V> reduceByKey(
+            List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+        return data.stream().parallel()
+                .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+                .map(list ->
+                        list.stream()
+                                .map(e -> e.getValue())
+                                .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
+        return
+            data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList());
+    }
+
+    @Override
+    public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
+        data.stream().forEach(FunctionWrapper.throwingForeachWrapper(consumer));
+    }
+
+    @Override
+    public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
+        return data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
+                Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)
+        );
+    }
+
+    @Override
+    public void setProperty(EngineProperty key, String value) {
+        // no operation for now
+    }
+
+    @Override
+    public Option<String> getProperty(EngineProperty key) {
+        return Option.empty();
+    }
+
+    @Override
+    public void setJobStatus(String activeModule, String activityDescription) {
+        // no operation for now
+    }
+
+    @Override
+    public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+        // no operation for now
+    }
+
+    @Override
+    public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void cancelJob(String jobId) {
+        // no operation for now
+    }
+
+    @Override
+    public void cancelAllJobs() {
+        // no operation for now
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
index 4baa147704125c..738b2638588e03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiPartitionProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource.hudi.source;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -50,7 +49,7 @@ public List<String> getAllPartitionNames(HoodieTableMetaClient tableMetaClient)
                 .build();
 
         HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
-                new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
+                new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
                 tableMetaClient.getBasePathV2().toString(), true);
 
         return newTableMetadata.getAllPartitionPaths();

From eedf4cd664353b993dc2706d1ed83955cd282e60 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Wed, 24 Jul 2024 20:51:35 +0800
Subject: [PATCH 4/4] fix

---
 .../hudi/test_hudi_timetravel.out             | 125 ++++++++++++++++++
 .../hudi/test_hudi_timetravel.groovy          | 107 +++++++++++++++
 2 files changed, 232 insertions(+)
 create mode 100644 regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
 create mode 100644 regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy

diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
new file mode 100644
index 00000000000000..38b6ff7846f49a
--- /dev/null
+++ b/regression-test/data/external_table_p2/hudi/test_hudi_timetravel.out
@@ -0,0 +1,125 @@
+-- This file is automatically generated.
You should know what you did if you want to edit this +-- !q00 -- +20240724195843565 20240724195843565_0_0 20240724195843565_0_0 PAR1=para/par2=para 7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet 1 a b para para +20240724195845718 20240724195845718_0_0 20240724195845718_0_0 PAR1=para/par2=parb fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet 2 a b para parb +20240724195848377 20240724195848377_0_1 20240724195848377_0_0 PAR1=para/par2=para 7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet 3 a b para para +20240724195850799 20240724195850799_0_1 20240724195850799_0_0 PAR1=para/par2=parb fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet 4 a b para parb + +-- !q01 -- + +-- !q02 -- + +-- !q01 -- +20240724195843565 20240724195843565_0_0 20240724195843565_0_0 PAR1=para/par2=para 7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet 1 a b para para +20240724195845718 20240724195845718_0_0 20240724195845718_0_0 PAR1=para/par2=parb fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet 2 a b para parb +20240724195848377 20240724195848377_0_1 20240724195848377_0_0 PAR1=para/par2=para 7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet 3 a b para para +20240724195850799 20240724195850799_0_1 20240724195850799_0_0 PAR1=para/par2=parb fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet 4 a b para parb + +-- !q02 -- +20240724195843565 20240724195843565_0_0 20240724195843565_0_0 PAR1=para/par2=para 7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet 1 a b para para +20240724195845718 20240724195845718_0_0 20240724195845718_0_0 PAR1=para/par2=parb fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet 2 a b para parb +20240724195848377 20240724195848377_0_1 20240724195848377_0_0 PAR1=para/par2=para 7a788b37-9ef0-409a-bc42-6793e35fcad5-0_0-129-127_20240724195848377.parquet 3 a b para para +20240724195850799 20240724195850799_0_1 20240724195850799_0_0 PAR1=para/par2=parb fef19b36-4a18-4d8c-b204-1ed448f7de51-0_0-152-155_20240724195850799.parquet 4 a b para parb + +-- !q03 -- + +-- !q04 -- + +-- !q05 -- +1 a b para para + +-- !q06 -- +1 a b para para + +-- !q07 -- +1 a b para para +2 a b para parb + +-- !q08 -- +1 a b para para +2 a b para parb + +-- !q09 -- +1 a b para para +2 a b para parb +3 a b para para + +-- !q10 -- +1 a b para para +2 a b para parb +3 a b para para + +-- !q11 -- +1 a b para para +2 a b para parb +3 a b para para +4 a b para parb + +-- !q12 -- +1 a b para para +2 a b para parb +3 a b para para +4 a b para parb + +-- !q50 -- +20240724195853736 20240724195853736_0_0 Id:1 PAR1=para/par2=para c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet 1 a b para para +20240724195856338 20240724195856338_0_0 Id:2 PAR1=para/par2=parb 23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet 2 a b para parb +20240724195858450 20240724195858450_0_1 Id:3 PAR1=para/par2=para c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet 3 a b para para +20240724195902682 20240724195902682_0_1 Id:4 PAR1=para/par2=parb 23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet 4 a b para parb + +-- !q51 -- + +-- !q52 -- + +-- !q51 -- +20240724195853736 20240724195853736_0_0 Id:1 PAR1=para/par2=para c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet 1 a b para para +20240724195856338 
20240724195856338_0_0 Id:2 PAR1=para/par2=parb 23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet 2 a b para parb +20240724195858450 20240724195858450_0_1 Id:3 PAR1=para/par2=para c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet 3 a b para para +20240724195902682 20240724195902682_0_1 Id:4 PAR1=para/par2=parb 23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet 4 a b para parb + +-- !q52 -- +20240724195853736 20240724195853736_0_0 Id:1 PAR1=para/par2=para c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet 1 a b para para +20240724195856338 20240724195856338_0_0 Id:2 PAR1=para/par2=parb 23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet 2 a b para parb +20240724195858450 20240724195858450_0_1 Id:3 PAR1=para/par2=para c5a8ebb7-f929-43ba-9f8d-d733fae27605-0_0-203-210_20240724195858450.parquet 3 a b para para +20240724195902682 20240724195902682_0_1 Id:4 PAR1=para/par2=parb 23756678-cf81-481c-b559-85c0b47b0a80-0_0-219-228_20240724195902682.parquet 4 a b para parb + +-- !q53 -- + +-- !q54 -- + +-- !q55 -- +1 a b para para + +-- !q56 -- +1 a b para para + +-- !q57 -- +1 a b para para +2 a b para parb + +-- !q58 -- +1 a b para para +2 a b para parb + +-- !q59 -- +1 a b para para +2 a b para parb +3 a b para para + +-- !q60 -- +1 a b para para +2 a b para parb +3 a b para para + +-- !q61 -- +1 a b para para +2 a b para parb +3 a b para para +4 a b para parb + +-- !q62 -- +1 a b para para +2 a b para parb +3 a b para para +4 a b para parb + diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy new file mode 100644 index 00000000000000..e8c859698326b3 --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_timetravel.groovy @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
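+
+// This suite checks `FOR TIME AS OF` on Hudi COW and MOR tables. The instant can be given
+// either as "yyyy-MM-dd HH:mm:ss" (or just the date) or in the compact "yyyyMMddHHmmss"
+// form: the Hudi scan node strips '-', ':' and ' ' before matching the value against the
+// commit timeline, so each adjacent pair of queries below should return identical results.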
+
+suite("test_hudi_timetravel", "p2,external,hudi,external_remote,external_remote_hudi") {
+
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable hudi test")
+        return
+    }
+
+    String catalog_name = "test_hudi_timetravel"
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+    sql """drop catalog if exists ${catalog_name};"""
+    sql """
+        create catalog if not exists ${catalog_name} properties (
+            ${props}
+        );
+    """
+
+    sql """switch ${catalog_name};"""
+    sql """ use regression_hudi;"""
+    sql """ set enable_fallback_to_original_planner=false """
+
+    qt_q00 """select * from timetravel_cow order by id"""
+    qt_q01 """select * from timetravel_cow FOR TIME AS OF "2024-07-24" order by id""" // no data
+    qt_q02 """select * from timetravel_cow FOR TIME AS OF "20240724" order by id""" // no data
+    qt_q01 """select * from timetravel_cow FOR TIME AS OF "2024-07-25" order by id"""
+    qt_q02 """select * from timetravel_cow FOR TIME AS OF "20240725" order by id"""
+    qt_q03 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "2024-07-24 19:58:43" order by id """ // no data
+    qt_q04 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "20240724195843" order by id """ // no data
+    qt_q05 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "2024-07-24 19:58:44" order by id """ // one
+    qt_q06 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "20240724195844" order by id """ //one
+    qt_q07 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "2024-07-24 19:58:48" order by id """ // two
+    qt_q08 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "20240724195848" order by id """ // two
+    qt_q09 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "2024-07-24 19:58:49" order by id """ // three
+    qt_q10 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "20240724195849" order by id """ // three
+    qt_q11 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "2024-07-24 19:58:51" order by id """ // four
+    qt_q12 """ select id, val1,val2,par1,par2 from timetravel_cow FOR TIME AS OF "20240724195851" order by id """ // four
+
+    qt_q50 """select * from timetravel_mor order by id"""
+    qt_q51 """select * from timetravel_mor FOR TIME AS OF "2024-07-24" order by id""" // no data
+    qt_q52 """select * from timetravel_mor FOR TIME AS OF "20240724" order by id""" // no data
+    qt_q51 """select * from timetravel_mor FOR TIME AS OF "2024-07-25" order by id"""
+    qt_q52 """select * from timetravel_mor FOR TIME AS OF "20240725" order by id"""
+    qt_q53 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "2024-07-24 19:58:53" order by id """ // no data
+    qt_q54 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "20240724195853" order by id """ // no data
+    qt_q55 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "2024-07-24 19:58:54" order by id """ // one
+    qt_q56 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "20240724195854" order by id """ //one
+    qt_q57 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "2024-07-24 19:58:58" order by id """ // two
+    qt_q58 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "20240724195858" order by id """ // two
+    qt_q59 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "2024-07-24 19:58:59"
order by id """ // three + qt_q60 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "20240724195859" order by id """ // three + qt_q61 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "2024-07-24 19:59:03" order by id """ // four + qt_q62 """ select id, val1,val2,par1,par2 from timetravel_mor FOR TIME AS OF "20240724195903" order by id """ // four +} + + +/* + +create table timetravel_cow ( + Id int, + VAL1 string, + val2 string, + PAR1 string, + par2 string +) using hudi +partitioned by (par1, par2) +TBLPROPERTIES ( + 'type' = 'cow'); + +create table timetravel_mor ( + Id int, + VAL1 string, + val2 string, + PAR1 string, + par2 string +) using hudi +partitioned by (par1, par2) +TBLPROPERTIES ( + 'primaryKey' = 'Id', + 'type' = 'mor'); + +insert into timetravel_cow values (1, 'a','b','para','para'); +insert into timetravel_cow values (2, 'a','b','para','parb'); +insert into timetravel_cow values (3, 'a','b','para','para'); +insert into timetravel_cow values (4, 'a','b','para','parb'); + +insert into timetravel_mor values (1, 'a','b','para','para'); +insert into timetravel_mor values (2, 'a','b','para','parb'); +insert into timetravel_mor values (3, 'a','b','para','para'); +insert into timetravel_mor values (4, 'a','b','para','parb'); + +*/
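+
+/*
+Reading aid: the commit instants captured in test_hudi_timetravel.out use Hudi's
+yyyyMMddHHmmssSSS instant format:
+  timetravel_cow: 20240724195843565, 20240724195845718, 20240724195848377, 20240724195850799
+  timetravel_mor: 20240724195853736, 20240724195856338, 20240724195858450, 20240724195902682
+A time-travel query sees every commit at or before the requested time. For example,
+FOR TIME AS OF "2024-07-24 19:58:43" on timetravel_cow returns no data because the first
+commit landed at 19:58:43.565, while FOR TIME AS OF "2024-07-24 19:58:44" returns one row.
+*/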