Skip to content

Commit

Permalink
[feat](mtmv)mtmv support paimon partition refresh (#43959)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
Previously, when an MTMV was created on a Paimon table, changes to the
partition list and to the data could not be detected, so the only option was
to force a full refresh with `refresh materialized view mv1 complete`.

This PR obtains the partition list of Paimon, the last update time of
the partition, and the latest snapshotId of the table.

Therefore, an MTMV can now be partitioned based on a Paimon table, detect
changes in its data, and automatically refresh the affected partitions.

### Release note
MTMV supports Paimon partition refresh.
  • Loading branch information
zddr authored Nov 21, 2024
1 parent 3aa6d04 commit 95e9765
Show file tree
Hide file tree
Showing 8 changed files with 718 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,22 @@
package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
Expand All @@ -30,25 +42,35 @@
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
import org.apache.paimon.table.system.SnapshotsTable;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PaimonExternalTable extends ExternalTable {
public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf {

private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);

Expand All @@ -73,18 +95,95 @@ public Table getPaimonTable() {
return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null);
}

/**
 * Returns the partition info stored in this table's schema cache entry.
 *
 * @return the cached partition info, or a fresh empty {@link PaimonPartitionInfo} when no
 *         schema cache entry is present
 */
private PaimonPartitionInfo getPartitionInfoFromCache() {
    makeSureInitialized();
    Optional<SchemaCacheValue> cached = getSchemaCacheValue();
    return cached.isPresent()
            ? ((PaimonSchemaCacheValue) cached.get()).getPartitionInfo()
            : new PaimonPartitionInfo();
}

/**
 * Returns the partition columns stored in this table's schema cache entry.
 *
 * @return the cached partition columns, or a new empty list when no schema cache entry is present
 */
private List<Column> getPartitionColumnsFromCache() {
    makeSureInitialized();
    Optional<SchemaCacheValue> cached = getSchemaCacheValue();
    if (cached.isPresent()) {
        return ((PaimonSchemaCacheValue) cached.get()).getPartitionColumns();
    }
    return Lists.newArrayList();
}

/**
 * Returns the snapshot id stored in this table's schema cache entry.
 *
 * @return the cached snapshot id
 * @throws AnalysisException if no schema cache entry is present
 */
public long getLatestSnapshotIdFromCache() throws AnalysisException {
    makeSureInitialized();
    SchemaCacheValue cached = getSchemaCacheValue()
            .orElseThrow(() -> new AnalysisException("not present"));
    return ((PaimonSchemaCacheValue) cached).getSnapshootId();
}

/**
 * Builds the schema cache entry for this table: the Doris column list, the partition columns,
 * the Paimon table handle, the latest snapshot id and the partition info.
 *
 * @return the populated cache value, or {@code Optional.empty()} when loading the snapshot id or
 *         the partition info throws {@code IOException} or {@code AnalysisException}
 */
@Override
public Optional<SchemaCacheValue> initSchema() {
Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name);
TableSchema schema = ((FileStoreTable) paimonTable).schema();
List<DataField> columns = schema.fields();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
// Partition key names as reported by Paimon; used below to pick out the partition columns.
Set<String> partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
// NOTE(review): the next two lines both open a Column construction and two alternative closing
// lines follow; this looks like the old and new sides of a diff shown together — confirm which
// pair is current before compiling.
tmpSchema.add(new Column(field.name().toLowerCase(),
Column column = new Column(field.name().toLowerCase(),
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
field.id()));
field.id());
tmpSchema.add(column);
// The lookup uses the original Paimon field name, while the Doris column name is lower-cased.
if (partitionColumnNames.contains(field.name())) {
partitionColumns.add(column);
}
}
try {
// after 0.9.0 paimon will support table.getLatestSnapshotId()
long latestSnapshotId = loadLatestSnapshotId();
PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns);
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId,
partitionInfo));
} catch (IOException | AnalysisException e) {
// The failure is only logged; the caller receives an empty Optional.
LOG.warn(e);
return Optional.empty();
}
}

/**
 * Reads this table's Paimon snapshots system table and returns the largest snapshot id found.
 *
 * @return the maximum snapshot id, or 0 when the system table yields no rows
 * @throws IOException if reading the system table fails
 */
private long loadLatestSnapshotId() throws IOException {
    String snapshotsTableName = name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS;
    Table snapshotsTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, snapshotsTableName);
    // Project only column 0, which holds the snapshot id.
    List<InternalRow> snapshotRows = PaimonUtil.read(snapshotsTable, new int[][] {{0}});
    long maxSnapshotId = 0L;
    for (InternalRow snapshotRow : snapshotRows) {
        maxSnapshotId = Math.max(maxSnapshotId, snapshotRow.getLong(0));
    }
    return maxSnapshotId;
}

/**
 * Builds the partition info for this table from its partition columns.
 *
 * @param partitionColumns the table's partition columns; may be null or empty
 * @return an empty {@link PaimonPartitionInfo} when there are no partition columns, otherwise the
 *         info generated from the loaded Paimon partitions
 * @throws IOException if loading the partitions fails
 * @throws AnalysisException if the partition info cannot be generated
 */
private PaimonPartitionInfo loadPartitionInfo(List<Column> partitionColumns) throws IOException, AnalysisException {
    if (!CollectionUtils.isEmpty(partitionColumns)) {
        return PaimonUtil.generatePartitionInfo(partitionColumns, loadPartitions());
    }
    return new PaimonPartitionInfo();
}

/**
 * Reads the rows of this table's Paimon partitions system table and converts each row into a
 * {@link PaimonPartition} via {@code PaimonUtil.rowToPartition}.
 *
 * @return one {@link PaimonPartition} per row read
 * @throws IOException declared for the read — presumably raised by {@code PaimonUtil.read}; confirm
 */
private List<PaimonPartition> loadPartitions()
throws IOException {
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
// null is passed as the second argument — presumably "no projection"; confirm against PaimonUtil.read.
List<InternalRow> rows = PaimonUtil.read(table, null);
List<PaimonPartition> res = Lists.newArrayListWithCapacity(rows.size());
for (InternalRow row : rows) {
res.add(PaimonUtil.rowToPartition(row));
}
// NOTE(review): the next return references tmpSchema and paimonTable, which are not defined in
// this method, and is followed by a second return; it looks like a removed diff line — confirm.
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
return res;
}

private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
Expand Down Expand Up @@ -205,4 +304,56 @@ public long fetchRowCount() {
}
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}

/**
 * Refreshes this table through the environment's refresh manager before an MTMV refresh runs.
 *
 * @param mtmv the materialized view about to be refreshed (not used by this implementation)
 * @throws DdlException if the table refresh fails
 */
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
    String catalogName = getCatalog().getName();
    String databaseName = getDbName();
    String tableName = getName();
    Env.getCurrentEnv().getRefreshManager().refreshTable(catalogName, databaseName, tableName, true);
}

/**
 * Returns a copy of the cached partition-name to partition-item mapping.
 *
 * @return a new {@code HashMap} holding the cached entries
 */
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
    Map<String, PartitionItem> cachedItems = getPartitionInfoFromCache().getNameToPartitionItem();
    return new HashMap<>(cachedItems);
}

/**
 * Reports how this table is partitioned.
 *
 * @return {@code LIST} when the table has at least one cached partition column, otherwise
 *         {@code UNPARTITIONED}
 */
@Override
public PartitionType getPartitionType() {
    if (getPartitionColumnsFromCache().isEmpty()) {
        return PartitionType.UNPARTITIONED;
    }
    return PartitionType.LIST;
}

/**
 * Returns the lower-cased names of the cached partition columns.
 *
 * @return a set of lower-cased partition column names
 */
@Override
public Set<String> getPartitionColumnNames() {
    Set<String> lowerCasedNames = Sets.newHashSet();
    for (Column partitionColumn : getPartitionColumnsFromCache()) {
        lowerCasedNames.add(partitionColumn.getName().toLowerCase());
    }
    return lowerCasedNames;
}

/**
 * Returns the partition columns held in the schema cache.
 *
 * @return the cached partition columns, or an empty list when nothing is cached
 */
@Override
public List<Column> getPartitionColumns() {
    List<Column> cachedPartitionColumns = getPartitionColumnsFromCache();
    return cachedPartitionColumns;
}

/**
 * Builds a timestamp snapshot for one partition from its cached last update time.
 *
 * @param partitionName name of the partition to look up in the cached partition info
 * @param context refresh context (not used by this implementation)
 * @return a {@link MTMVTimestampSnapshot} carrying the partition's last update time
 * @throws AnalysisException if the partition name is not in the cached partition info
 */
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
        throws AnalysisException {
    Map<String, PaimonPartition> partitionsByName = getPartitionInfoFromCache().getNameToPartition();
    PaimonPartition target = partitionsByName.get(partitionName);
    if (target != null) {
        return new MTMVTimestampSnapshot(target.getLastUpdateTime());
    }
    throw new AnalysisException("can not find partition: " + partitionName);
}

/**
 * Builds a version snapshot for the whole table from the cached snapshot id.
 *
 * @param context refresh context (not used by this implementation)
 * @return a {@link MTMVVersionSnapshot} carrying the cached snapshot id
 * @throws AnalysisException if no schema cache entry is present
 */
@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
    long latestSnapshotId = getLatestSnapshotIdFromCache();
    return new MTMVVersionSnapshot(latestSnapshotId);
}

@Override
public boolean isPartitionColumnAllowNull() {
// Paimon will write to the 'null' partition regardless of whether the value is null or 'null'.
// That logic is inconsistent with Doris' empty partition logic, so strictly this should return false.
// However, when Spark creates Paimon tables, specifying 'not null' does not take effect.
// In order to successfully create the materialized view, true is returned here.
// The cost is that when a Paimon partition is written with a null value, the materialized view
// cannot detect this data.
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
/**
 * Immutable value holder for one row of Paimon's partitions system table.
 */
public class PaimonPartition {
    /** Partition values, for example: [1, dd]. */
    private final String partitionValues;
    /** The amount of data in the partition. */
    private final long recordCount;
    /** Partition file size. */
    private final long fileSizeInBytes;
    /** Number of partition files. */
    private final long fileCount;
    /** Last update time of the partition. */
    private final long lastUpdateTime;

    /**
     * Creates a partition descriptor from the individual column values.
     *
     * @param partitionValues partition values, for example: [1, dd]
     * @param recordCount the amount of data in the partition
     * @param fileSizeInBytes partition file size
     * @param fileCount number of partition files
     * @param lastUpdateTime last update time of the partition
     */
    public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
            long lastUpdateTime) {
        this.partitionValues = partitionValues;
        this.recordCount = recordCount;
        this.fileSizeInBytes = fileSizeInBytes;
        this.fileCount = fileCount;
        this.lastUpdateTime = lastUpdateTime;
    }

    /** @return the partition values */
    public String getPartitionValues() {
        return this.partitionValues;
    }

    /** @return the amount of data in the partition */
    public long getRecordCount() {
        return this.recordCount;
    }

    /** @return the partition file size */
    public long getFileSizeInBytes() {
        return this.fileSizeInBytes;
    }

    /** @return the number of partition files */
    public long getFileCount() {
        return this.fileCount;
    }

    /** @return the last update time of the partition */
    public long getLastUpdateTime() {
        return this.lastUpdateTime;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.PartitionItem;

import com.google.common.collect.Maps;

import java.util.Map;

/**
 * Holds two lookups keyed by partition name: the Doris {@link PartitionItem} and the
 * {@link PaimonPartition} metadata.
 */
public class PaimonPartitionInfo {
    private Map<String, PartitionItem> nameToPartitionItem;
    private Map<String, PaimonPartition> nameToPartition;

    /** Creates an instance backed by two new, empty hash maps. */
    public PaimonPartitionInfo() {
        this(Maps.newHashMap(), Maps.newHashMap());
    }

    /**
     * Creates an instance that stores the given maps as-is (no copy is made).
     *
     * @param nameToPartitionItem partition name to Doris partition item
     * @param nameToPartition partition name to Paimon partition metadata
     */
    public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
            Map<String, PaimonPartition> nameToPartition) {
        this.nameToPartition = nameToPartition;
        this.nameToPartitionItem = nameToPartitionItem;
    }

    /** @return the stored partition name to partition item map */
    public Map<String, PartitionItem> getNameToPartitionItem() {
        return this.nameToPartitionItem;
    }

    /** @return the stored partition name to Paimon partition map */
    public Map<String, PaimonPartition> getNameToPartition() {
        return this.nameToPartition;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,34 @@
/**
 * Schema cache entry for a Paimon table: besides the column schema passed to the superclass, it
 * carries the Paimon table handle, the partition columns, the partition info and a snapshot id.
 */
public class PaimonSchemaCacheValue extends SchemaCacheValue {

// Paimon table handle this entry was built from.
private Table paimonTable;
// Columns of the table that are partition columns.
private List<Column> partitionColumns;
// Partition lookups for the table.
private PaimonPartitionInfo partitionInfo;

// NOTE(review): the next line opens a two-argument constructor that is never closed and is
// followed by a field declaration; it looks like a removed diff line — confirm.
public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
// Snapshot id supplied at construction time ("snapshoot" spelling is part of the public getter name).
private long snapshootId;

/**
 * @param schema column schema, forwarded to the superclass
 * @param partitionColumns partition columns of the table
 * @param paimonTable Paimon table handle
 * @param snapshootId snapshot id to cache
 * @param partitionInfo partition info to cache
 */
public PaimonSchemaCacheValue(List<Column> schema, List<Column> partitionColumns, Table paimonTable,
long snapshootId,
PaimonPartitionInfo partitionInfo) {
super(schema);
this.partitionColumns = partitionColumns;
this.paimonTable = paimonTable;
this.snapshootId = snapshootId;
this.partitionInfo = partitionInfo;
}

// Returns the cached Paimon table handle.
public Table getPaimonTable() {
return paimonTable;
}

// Returns the cached partition columns.
public List<Column> getPartitionColumns() {
return partitionColumns;
}

// Returns the cached partition info.
public PaimonPartitionInfo getPartitionInfo() {
return partitionInfo;
}

// Returns the cached snapshot id.
public long getSnapshootId() {
return snapshootId;
}
}
Loading

0 comments on commit 95e9765

Please sign in to comment.