[bugfix](hudi)add timetravel for nereids (apache#38324)
## Proposed changes

1. Add time travel (`FOR TIME AS OF`) support for the Nereids planner:

```
select * from tb FOR TIME AS OF "2024-07-24 19:58:43";
select * from tb FOR TIME AS OF "20240724195843";
```
2. Add UGI authentication to the thread pool, so pooled tasks run under the caller's Hadoop identity (a hedged sketch of the pattern follows below).
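
Both timestamp forms above resolve to the same Hudi instant: in this patch the Hudi scan node strips `-`, `:`, and spaces from the supplied time via `replaceAll("[-: ]", "")`, so `"2024-07-24 19:58:43"` becomes `"20240724195843"`.

For item 2, the diff wraps the work submitted through the engine context's parallel streams in `HiveMetaStoreClientHelper.ugiDoAs`. The snippet below is only a minimal sketch of that general pattern using plain Hadoop `UserGroupInformation`; the `withUgi` helper, the class name, and the pool size are illustrative and not part of this change.

```java
import org.apache.hadoop.security.UserGroupInformation;

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class UgiThreadPoolSketch {
    // Wrap a task so it runs under the given Hadoop identity even on a pooled
    // worker thread that carries no UGI context of its own.
    static <T> Callable<T> withUgi(UserGroupInformation ugi, Callable<T> task) {
        return () -> ugi.doAs((PrivilegedExceptionAction<T>) task::call);
    }

    public static void main(String[] args) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Stand-in for work that touches HDFS/HMS and therefore needs the caller's UGI.
            Future<String> user = pool.submit(withUgi(ugi,
                    () -> UserGroupInformation.getCurrentUser().getShortUserName()));
            System.out.println("task ran as: " + user.get());
        } finally {
            pool.shutdown();
        }
    }
}
```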

(cherry picked from commit 446d2a0)
wuwenchi committed Jul 31, 2024
1 parent c011060 commit a8d1a39
Showing 9 changed files with 563 additions and 48 deletions.
@@ -97,7 +97,6 @@ public abstract class FileQueryScanNode extends FileScanNode {

    protected String brokerName;

    @Getter
    protected TableSnapshot tableSnapshot;

    /**
@@ -595,4 +594,16 @@ public void stop() {
            }
        }
    }

    public void setQueryTableSnapshot(TableSnapshot tableSnapshot) {
        this.tableSnapshot = tableSnapshot;
    }

    public TableSnapshot getQueryTableSnapshot() {
        TableSnapshot snapshot = desc.getRef().getTableSnapshot();
        if (snapshot != null) {
            return snapshot;
        }
        return this.tableSnapshot;
    }
}
@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi.source;

import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.LocalTaskContextSupplier;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.function.FunctionWrapper;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * This file is copied from org.apache.hudi.common.engine.HoodieLocalEngineContext,
 * because we need to set the UGI in the thread pool.
 * A Java-based engine context; use this implementation on the query engine integrations if needed.
 */
public final class HudiLocalEngineContext extends HoodieEngineContext {

    public HudiLocalEngineContext(Configuration conf) {
        this(conf, new LocalTaskContextSupplier());
    }

    public HudiLocalEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
        super(new SerializableConfiguration(conf), taskContextSupplier);
    }

    @Override
    public HoodieAccumulator newAccumulator() {
        return HoodieAtomicLongAccumulator.create();
    }

    @Override
    public <T> HoodieData<T> emptyHoodieData() {
        return HoodieListData.eager(Collections.emptyList());
    }

    @Override
    public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
        return HoodieListData.eager(data);
    }

    @Override
    public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
        return data.stream().parallel().map(v1 -> {
            try {
                return HiveMetaStoreClientHelper.ugiDoAs(getHadoopConf().get(), () -> func.apply(v1));
            } catch (Exception e) {
                throw new HoodieException("Error occurs when executing map", e);
            }
        }).collect(Collectors.toList());
    }

    @Override
    public <I, K, V> List<V> mapToPairAndReduceByKey(
            List<I> data,
            SerializablePairFunction<I, K, V> mapToPairFunc,
            SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
        return data.stream().parallel().map(FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc))
                .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
                .map(list ->
                        list.stream()
                                .map(e -> e.getValue())
                                .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).get())
                .collect(Collectors.toList());
    }

    @Override
    public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
            Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
            SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
        return FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.parallel().iterator())
                .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
                .map(entry -> new ImmutablePair<>(entry.getKey(), entry.getValue().stream().map(
                        Pair::getValue).reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
                .filter(Objects::nonNull);
    }

    @Override
    public <I, K, V> List<V> reduceByKey(
            List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
        return data.stream().parallel()
                .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
                .map(list ->
                        list.stream()
                                .map(e -> e.getValue())
                                .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    @Override
    public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
        return
            data.stream().parallel().flatMap(FunctionWrapper.throwingFlatMapWrapper(func)).collect(Collectors.toList());
    }

    @Override
    public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
        data.stream().forEach(FunctionWrapper.throwingForeachWrapper(consumer));
    }

    @Override
    public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
        return data.stream().map(FunctionWrapper.throwingMapToPairWrapper(func)).collect(
                Collectors.toMap(Pair::getLeft, Pair::getRight, (oldVal, newVal) -> newVal)
        );
    }

    @Override
    public void setProperty(EngineProperty key, String value) {
        // no operation for now
    }

    @Override
    public Option<String> getProperty(EngineProperty key) {
        return Option.empty();
    }

    @Override
    public void setJobStatus(String activeModule, String activityDescription) {
        // no operation for now
    }

    @Override
    public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
        // no operation for now
    }

    @Override
    public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
        return Collections.emptyList();
    }

    @Override
    public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
        return Collections.emptyList();
    }

    @Override
    public void cancelJob(String jobId) {
        // no operation for now
    }

    @Override
    public void cancelAllJobs() {
        // no operation for now
    }
}
@@ -18,7 +18,6 @@
package org.apache.doris.datasource.hudi.source;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -50,7 +49,7 @@ public List<String> getAllPartitionNames(HoodieTableMetaClient tableMetaClient)
                .build();

        HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
                new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
                new HudiLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig,
                tableMetaClient.getBasePathV2().toString(), true);

        return newTableMetadata.getAllPartitionPaths();
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.hudi.source;

import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
@@ -203,8 +204,12 @@ protected void doInitialize() throws UserException {
        }

        timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
        if (desc.getRef().getTableSnapshot() != null) {
            queryInstant = desc.getRef().getTableSnapshot().getTime();
        TableSnapshot tableSnapshot = getQueryTableSnapshot();
        if (tableSnapshot != null) {
            if (tableSnapshot.getType() == TableSnapshot.VersionType.VERSION) {
                throw new UserException("Hudi does not support `FOR VERSION AS OF`, please use `FOR TIME AS OF`");
            }
            queryInstant = tableSnapshot.getTime().replaceAll("[-: ]", "");
            snapshotTimestamp = Option.of(queryInstant);
        } else {
            Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
@@ -275,10 +275,7 @@ private List<Split> doGetSplits() throws UserException {
    }

    public Long getSpecifiedSnapshot() throws UserException {
        TableSnapshot tableSnapshot = source.getDesc().getRef().getTableSnapshot();
        if (tableSnapshot == null) {
            tableSnapshot = this.tableSnapshot;
        }
        TableSnapshot tableSnapshot = getQueryTableSnapshot();
        if (tableSnapshot != null) {
            TableSnapshot.VersionType type = tableSnapshot.getType();
            try {
@@ -441,8 +438,4 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        return super.getNodeExplainString(prefix, detailLevel)
                + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
    }

    public void setTableSnapshot(TableSnapshot tableSnapshot) {
        this.tableSnapshot = tableSnapshot;
    }
}
@@ -49,6 +49,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
@@ -571,10 +572,6 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
            switch (((HMSExternalTable) table).getDlaType()) {
                case ICEBERG:
                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
                    IcebergScanNode icebergScanNode = (IcebergScanNode) scanNode;
                    if (fileScan.getTableSnapshot().isPresent()) {
                        icebergScanNode.setTableSnapshot(fileScan.getTableSnapshot().get());
                    }
                    break;
                case HIVE:
                    scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
@@ -590,16 +587,16 @@
            }
        } else if (table instanceof IcebergExternalTable) {
            scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
            if (fileScan.getTableSnapshot().isPresent()) {
                ((IcebergScanNode) scanNode).setTableSnapshot(fileScan.getTableSnapshot().get());
            }
        } else if (table instanceof PaimonExternalTable) {
            scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
        } else if (table instanceof MaxComputeExternalTable) {
            scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
        } else {
            throw new RuntimeException("do not support table type " + table.getType());
        }
        if (fileScan.getTableSnapshot().isPresent() && scanNode instanceof FileQueryScanNode) {
            ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
        }
        return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
    }

@@ -661,7 +658,9 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla
        PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
        ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
                hudiScan.getScanParams(), hudiScan.getIncrementalRelation());

        if (fileScan.getTableSnapshot().isPresent()) {
            ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
        }
        return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
    }
