Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](ScanNode) Move the finalize phase of ScanNode to after the end of the Physical Translate phase. #37565

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
Expand All @@ -40,7 +39,6 @@
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
Expand Down Expand Up @@ -80,7 +78,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* FileQueryScanNode for querying the file access type of catalog, now only support
Expand Down Expand Up @@ -183,16 +180,6 @@ protected void initSchemaParams() throws UserException {
params.setSrcTupleId(-1);
}

/**
 * Reset required_slots in contexts. This is called after the Nereids planner does the projection.
 * In the projection process, some slots may be removed, so call this to update the slot info.
 *
 * <p>Both arguments are ignored here: the method simply delegates to the private no-arg
 * {@code updateRequiredSlots()} overload of this class.
 *
 * @param planTranslatorContext unused by this implementation
 * @param requiredByProjectSlotIdSet unused by this implementation
 * @throws UserException if the no-arg {@code updateRequiredSlots()} overload throws it
 */
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
updateRequiredSlots();
}

private void updateRequiredSlots() throws UserException {
params.unsetRequiredSlots();
for (SlotDescriptor slot : desc.getSlots()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
Expand All @@ -39,7 +38,6 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
Expand All @@ -59,7 +57,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class JdbcScanNode extends ExternalScanNode {
Expand Down Expand Up @@ -259,12 +256,6 @@ public void finalizeForNereids() throws UserException {
createScanRangeLocations();
}

/**
 * Slot-update hook for the JDBC scan node: ignores both arguments and calls
 * {@code createJdbcColumns()}.
 *
 * <p>NOTE(review): presumably this rebuilds the column list from the slots that remain after
 * projection; confirm against {@code createJdbcColumns()}.
 *
 * @param context unused by this implementation
 * @param requiredByProjectSlotIdSet unused by this implementation
 * @throws UserException declared by the overridden method
 */
@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createJdbcColumns();
}

@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
Expand All @@ -33,7 +32,6 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
Expand All @@ -53,7 +51,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -118,12 +115,6 @@ public void finalizeForNereids() throws UserException {
createScanRangeLocations();
}

/**
 * Slot-update hook for the ODBC scan node: ignores both arguments and calls
 * {@code createOdbcColumns()}.
 *
 * <p>NOTE(review): presumably this rebuilds the column list from the slots that remain after
 * projection; confirm against {@code createOdbcColumns()}.
 *
 * @param context unused by this implementation
 * @param requiredByProjectSlotIdSet unused by this implementation
 * @throws UserException declared by the overridden method
 */
@Override
public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet)
throws UserException {
createOdbcColumns();
}

@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.datasource.paimon.source;

import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
Expand All @@ -27,7 +26,6 @@
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
Expand All @@ -38,7 +36,6 @@
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -279,22 +276,6 @@ private boolean supportNativeReader() {
}
}

/**
 * Rewrites the Paimon column-name list carried by every file range of this scan node.
 *
 * <p>When 'setPaimonParams' and 'getSplits' are called, column trimming has not been performed
 * yet, so paimon_column_names is reset here from the slots currently in the tuple descriptor.
 *
 * @param planTranslatorContext forwarded unchanged to the parent implementation
 * @param requiredByProjectSlotIdSet forwarded unchanged to the parent implementation
 * @throws UserException if the parent implementation throws it
 */
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
        Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
    super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
    // Comma-separated names of the columns behind the current slots.
    final String joinedColumnNames = desc.getSlots().stream()
            .map(slotDesc -> slotDesc.getColumn().getName())
            .collect(Collectors.joining(","));
    // Overwrite the column list on every file range of every scan-range location.
    for (TScanRangeLocations location : scanRangeLocations) {
        location.scan_range.ext_scan_range.file_scan_range.ranges.forEach(
                range -> range.table_format_params.paimon_params.setPaimonColumnNames(joinedColumnNames));
    }
}

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.datasource.trinoconnector.source;

import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
Expand All @@ -28,15 +27,13 @@
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.doris.thrift.TTrinoConnectorFileDesc;
import org.apache.doris.trinoconnector.TrinoColumnMetadata;
Expand Down Expand Up @@ -321,31 +318,6 @@ private <T> String encodeObjectToString(T t, ObjectMapperProvider objectMapperPr
}
}

/**
 * Rewrites the encoded Trino column handles carried by every file range of this scan node.
 *
 * <p>When 'setTrinoConnectorParams' and 'getSplits' are called, column trimming has not been
 * performed yet, so the trino connector column handles are reset here from the slots currently
 * in the tuple descriptor. Slots whose column name has no entry in the target table's column
 * metadata map are left out.
 *
 * @param planTranslatorContext forwarded unchanged to the parent implementation
 * @param requiredByProjectSlotIdSet forwarded unchanged to the parent implementation
 * @throws UserException if the parent implementation throws it
 */
@Override
public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext,
        Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
    super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet);
    Map<String, ColumnMetadata> metadataByName = source.getTargetTable().getColumnMetadataMap();
    Map<String, ColumnHandle> handleByName = source.getTargetTable().getColumnHandleMap();

    // Keep only handles of columns the target table knows about, in slot order.
    List<ColumnHandle> selectedHandles = new ArrayList<>();
    for (SlotDescriptor slot : desc.getSlots()) {
        String name = slot.getColumn().getName();
        if (!metadataByName.containsKey(name)) {
            continue;
        }
        selectedHandles.add(handleByName.get(name));
    }

    // The handles are encoded once per file range, exactly as before.
    for (TScanRangeLocations location : scanRangeLocations) {
        for (TFileRangeDesc range : location.scan_range.ext_scan_range.file_scan_range.ranges) {
            range.table_format_params.trino_connector_params.setTrinoConnectorColumnHandles(
                    encodeObjectToString(selectedHandles, objectMapperProvider));
        }
    }
}

@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
Expand Down Expand Up @@ -275,6 +273,9 @@ public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
throw new AnalysisException("tables with unknown column stats: " + builder);
}
}
for (ScanNode scanNode : context.getScanNodes()) {
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
}
return rootFragment;
}

Expand Down Expand Up @@ -646,7 +647,6 @@ public PlanFragment visitPhysicalEsScan(PhysicalEsScan esScan, PlanTranslatorCon
)
);
context.getTopnFilterContext().translateTarget(esScan, esScanNode, context);
Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition);
context.addPlanFragment(planFragment);
Expand Down Expand Up @@ -697,7 +697,6 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca
)
);
context.getTopnFilterContext().translateTarget(fileScan, scanNode, context);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan);
Expand All @@ -723,7 +722,6 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla
)
);
context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode, context);
Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
Expand All @@ -748,7 +746,6 @@ public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTransla
)
);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
Expand Down Expand Up @@ -827,8 +824,6 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
);
context.getTopnFilterContext().translateTarget(olapScan, olapScanNode, context);
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
// TODO: we need to remove all finalizeForNereids
olapScanNode.finalizeForNereids();
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
DataPartition dataPartition = DataPartition.RANDOM;
Expand Down Expand Up @@ -915,7 +910,6 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
.translateRuntimeFilterTarget(expr, finalScanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode, schemaScan);
PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
Expand All @@ -937,7 +931,6 @@ public PlanFragment visitPhysicalTVFRelation(PhysicalTVFRelation tvfRelation, Pl
.forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode, tvfRelation);

// TODO: it is weird to update the label in this way
Expand Down Expand Up @@ -1997,6 +1990,7 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
}
requiredSlotIdSet.add(lastSlot.getId());
}
((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet,
requiredByProjectSlotIdSet, slotIdsByOrder, context);
Expand Down Expand Up @@ -2446,22 +2440,16 @@ private void updateScanSlotsMaterialization(ScanNode scanNode,
if (scanNode.getTupleDesc().getSlots().isEmpty()) {
scanNode.getTupleDesc().getSlots().add(smallest);
}
try {
if (context.getSessionVariable() != null
&& context.getSessionVariable().forbidUnknownColStats
&& !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
for (SlotId slotId : requiredByProjectSlotIdSet) {
if (context.isColumnStatsUnknown(scanNode, slotId)) {
String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
throw new AnalysisException("meet unknown column stats: " + colName);
}
if (context.getSessionVariable() != null
&& context.getSessionVariable().forbidUnknownColStats
&& !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
for (SlotId slotId : requiredByProjectSlotIdSet) {
if (context.isColumnStatsUnknown(scanNode, slotId)) {
String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
throw new AnalysisException("meet unknown column stats: " + colName);
}
context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet);
} catch (UserException e) {
Util.logAndThrowRuntimeException(LOG,
"User Exception while reset external file scan node contexts.", e);
context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,11 @@ Set<String> getDistributionColumnNames() {
: Sets.newTreeSet();
}

@Override
/**
* Update required_slots in scan node contexts. This is called after the Nereids planner does the projection.
* In the projection process, some slots may be removed, so call this to update the slot info.
* Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean.
*/
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) {
outputColumnUniqueIds.clear();
Expand Down
10 changes: 0 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.StatisticalType;
Expand Down Expand Up @@ -174,15 +173,6 @@ protected boolean isKeySearch() {
return false;
}

/**
 * Update required_slots in scan node contexts. This is called after the Nereids planner does the projection.
 * In the projection process, some slots may be removed, so call this to update the slot info.
 *
 * <p>The base implementation is intentionally empty; scan nodes that need to react override it.
 * The original note says it is currently only used by ExternalFileScanNode and that the hook
 * lives here to keep the Nereids code clean.
 * NOTE(review): confirm that remark is still accurate; other scan nodes may override this too.
 *
 * @param context the plan translator context; unused by this base implementation
 * @param requiredByProjectSlotIdSet slot ids required by the projection; unused by this base implementation
 * @throws UserException never thrown by this base implementation; declared for overriding methods
 */
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
}

private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and
Expand Down
Loading