Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](mtmv)mtmv support paimon partition refresh #43959

Merged
merged 29 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,8 @@ public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
mtmv.alterMvProperties(alterMTMV.getMvProperties());
break;
case ADD_TASK:
mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots());
mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots(),
isReplay);
break;
default:
throw new RuntimeException("Unknown type value: " + alterMTMV.getOpType());
Expand Down
10 changes: 7 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,19 @@ public MTMVStatus alterStatus(MTMVStatus newStatus) {
}

public void addTaskResult(MTMVTask task, MTMVRelation relation,
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots, boolean isReplay) {
MTMVCache mtmvCache = null;
boolean needUpdateCache = false;
if (task.getStatus() == TaskStatus.SUCCESS && !Env.isCheckpointThread()
&& !Config.enable_check_compatibility_mode) {
needUpdateCache = true;
try {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
// The replay thread may not have initialized the catalog yet to avoid getting stuck due
// to connection issues such as S3, so it is directly set to null
if (!isReplay) {
// shouldn't do this while holding mvWriteLock
mtmvCache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this), true);
}
} catch (Throwable e) {
mtmvCache = null;
LOG.warn("generate cache failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,22 @@
package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
Expand All @@ -30,25 +42,35 @@
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
import org.apache.paimon.table.system.SnapshotsTable;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PaimonExternalTable extends ExternalTable {
public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf {

private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);

Expand All @@ -73,18 +95,95 @@ public Table getPaimonTable() {
return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null);
}

public PaimonPartitionInfo getPartitionInfoFromCache() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
return new PaimonPartitionInfo();
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
}

public List<Column> getPartitionColumnsFromCache() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
return Lists.newArrayList();
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPartitionColumns();
}

public long getLatestSnapshotIdFromCache() throws AnalysisException {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
if (!schemaCacheValue.isPresent()) {
throw new AnalysisException("not present");
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getSnapshootId();
}

@Override
public Optional<SchemaCacheValue> initSchema() {
Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
TableSchema schema = ((FileStoreTable) paimonTable).schema();
List<DataField> columns = schema.fields();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
Set<String> partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(),
Column column = new Column(field.name().toLowerCase(),
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
field.id()));
field.id());
tmpSchema.add(column);
if (partitionColumnNames.contains(field.name())) {
partitionColumns.add(column);
}
}
try {
// after 0.9.0 paimon will support table.getLatestSnapshotId()
long latestSnapshotId = loadLatestSnapshotId();
PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns);
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId,
partitionInfo));
} catch (IOException | AnalysisException e) {
LOG.warn(e);
return Optional.empty();
}
}

private long loadLatestSnapshotId() throws IOException {
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS);
// snapshotId
List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}});
long latestSnapshotId = 0L;
for (InternalRow row : rows) {
long snapshotId = row.getLong(0);
if (snapshotId > latestSnapshotId) {
latestSnapshotId = snapshotId;
}
}
return latestSnapshotId;
}

private PaimonPartitionInfo loadPartitionInfo(List<Column> partitionColumns) throws IOException, AnalysisException {
if (CollectionUtils.isEmpty(partitionColumns)) {
return new PaimonPartitionInfo();
}
List<PaimonPartition> paimonPartitions = loadPartitions();
return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions);
}

private List<PaimonPartition> loadPartitions()
throws IOException {
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
List<InternalRow> rows = PaimonUtil.read(table, null);
List<PaimonPartition> res = Lists.newArrayListWithCapacity(rows.size());
for (InternalRow row : rows) {
res.add(PaimonUtil.rowToPartition(row));
}
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
return res;
}

private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
Expand Down Expand Up @@ -205,4 +304,56 @@ public long fetchRowCount() {
}
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}

@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
}

@Override
public PartitionType getPartitionType() {
return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
}

@Override
public Set<String> getPartitionColumnNames() {
return getPartitionColumnsFromCache().stream()
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
}

@Override
public List<Column> getPartitionColumns() {
return getPartitionColumnsFromCache();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
if (paimonPartition == null) {
throw new AnalysisException("can not find partition: " + partitionName);
}
return new MTMVTimestampSnapshot(paimonPartition.getLastUpdateTime());
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
}

@Override
public boolean isPartitionColumnAllowNull() {
// Paimon will write to the 'null' partition regardless of whether it is' null or 'null'.
// The logic is inconsistent with Doris' empty partition logic, so it needs to return false.
// However, when Spark creates Paimon tables, specifying 'not null' does not take effect.
// In order to successfully create the materialized view, false is returned here.
// The cost is that Paimon partition writes a null value, and the materialized view cannot detect this data.
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

public class PaimonPartition {
private String partitionValues;
private long recordCount;
private long fileSizeInBytes;
private long fileCount;
private long lastUpdateTime;

public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
long lastUpdateTime) {
this.partitionValues = partitionValues;
this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
this.fileCount = fileCount;
this.lastUpdateTime = lastUpdateTime;
}

public String getPartitionValues() {
return partitionValues;
}

public long getRecordCount() {
return recordCount;
}

public long getFileSizeInBytes() {
return fileSizeInBytes;
}

public long getFileCount() {
return fileCount;
}

public long getLastUpdateTime() {
return lastUpdateTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.PartitionItem;

import com.google.common.collect.Maps;

import java.util.Map;

public class PaimonPartitionInfo {
private Map<String, PartitionItem> nameToPartitionItem;
private Map<String, PaimonPartition> nameToPartition;

public PaimonPartitionInfo() {
this.nameToPartitionItem = Maps.newHashMap();
this.nameToPartition = Maps.newHashMap();
}

public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
Map<String, PaimonPartition> nameToPartition) {
this.nameToPartitionItem = nameToPartitionItem;
this.nameToPartition = nameToPartition;
}

public Map<String, PartitionItem> getNameToPartitionItem() {
return nameToPartitionItem;
}

public Map<String, PaimonPartition> getNameToPartition() {
return nameToPartition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,34 @@
public class PaimonSchemaCacheValue extends SchemaCacheValue {

private Table paimonTable;
private List<Column> partitionColumns;
private PaimonPartitionInfo partitionInfo;

public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
private long snapshootId;

public PaimonSchemaCacheValue(List<Column> schema, List<Column> partitionColumns, Table paimonTable,
long snapshootId,
PaimonPartitionInfo partitionInfo) {
super(schema);
this.partitionColumns = partitionColumns;
this.paimonTable = paimonTable;
this.snapshootId = snapshootId;
this.partitionInfo = partitionInfo;
}

public Table getPaimonTable() {
return paimonTable;
}

public List<Column> getPartitionColumns() {
return partitionColumns;
}

public PaimonPartitionInfo getPartitionInfo() {
return partitionInfo;
}

public long getSnapshootId() {
return snapshootId;
}
}
Loading
Loading