Skip to content

Commit

Permalink
[feature](mtmv)create mtmv support refresh_partition_num (apache#28566)
Browse files Browse the repository at this point in the history
- create/alter mtmv supports the `refresh_partition_num` property
- an mtmv task runs its refresh in batches of `refresh_partition_num` partitions
- the `tasks` tvf adds the columns `CompletedPartitions` and `Progress`
- fix: `show temp partition` and `drop temp partition` did not work on an mtmv
- fix: a task could not get the error message when insert overwrite failed
- fix: creating an mtmv failed validation when the partition field was capitalized
  • Loading branch information
zddr authored and HappenLee committed Jan 12, 2024
1 parent b65f6bd commit f1144e3
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 55 deletions.
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ public long getGracePeriod() {
}
}

/**
 * Returns the number of partitions a single refresh batch of this view should cover.
 *
 * <p>The value is read from the {@code refresh_partition_num} entry of {@code mvProperties}.
 * When that entry is missing, or parses to a value smaller than 1,
 * {@link MTMVTask#DEFAULT_REFRESH_PARTITION_NUM} is returned instead.
 *
 * @return the configured batch size, or the default when unset or non-positive
 * @throws NumberFormatException if the stored value cannot be parsed as an integer
 */
public int getRefreshPartitionNum() {
    // Property not set at all: use the default batch size.
    if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
        return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
    }
    int configured = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
    // Zero and negative values are not usable as a batch size, so they also map to the default.
    if (configured < 1) {
        return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
    }
    return configured;
}

public Set<String> getExcludedTriggerTables() {
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
return Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public ProcNodeInterface lookup(String entryName) throws AnalysisException {
throw new AnalysisException("Table[" + table.getName() + "] is not a OLAP or ELASTICSEARCH table");
}
} else if (entryName.equals(TEMP_PARTITIONS)) {
if (table.getType() == TableType.OLAP) {
if (table instanceof OlapTable) {
return new PartitionsProcDir((Database) db, (OlapTable) table, true);
} else {
throw new AnalysisException("Table[" + table.getName() + "] does not have temp partitions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public class PropertyAnalyzer {
"enable_duplicate_without_keys_by_default";
public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables";
public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num";
// For unique key data model, the feature Merge-on-Write will leverage a primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
// which can avoid the merging cost in read stage, and accelerate the aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,8 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause
}

PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST
&& !isTempPartition) {
throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
Expand All @@ -54,6 +56,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -62,7 +66,7 @@

public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
public static final Long MAX_HISTORY_TASKS_NUM = 100L;
public static final int DEFAULT_REFRESH_PARTITION_NUM = 1;

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("TaskId", ScalarType.createStringType()),
Expand All @@ -78,7 +82,9 @@ public class MTMVTask extends AbstractTask {
new Column("DurationMs", ScalarType.createStringType()),
new Column("TaskContext", ScalarType.createStringType()),
new Column("RefreshMode", ScalarType.createStringType()),
new Column("RefreshPartitions", ScalarType.createStringType()));
new Column("NeedRefreshPartitions", ScalarType.createStringType()),
new Column("CompletedPartitions", ScalarType.createStringType()),
new Column("Progress", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

Expand All @@ -97,7 +103,7 @@ public enum MTMVTaskTriggerMode {

public enum MTMVTaskRefreshMode {
COMPLETE,
PARTITION,
PARTIAL,
NOT_REFRESH
}

Expand All @@ -107,15 +113,16 @@ public enum MTMVTaskRefreshMode {
private long mtmvId;
@SerializedName("taskContext")
private MTMVTaskContext taskContext;
@SerializedName("refreshPartitions")
List<String> refreshPartitions;
@SerializedName("needRefreshPartitions")
List<String> needRefreshPartitions;
@SerializedName("completedPartitions")
List<String> completedPartitions;
@SerializedName("refreshMode")
MTMVTaskRefreshMode refreshMode;

private MTMV mtmv;
private MTMVRelation relation;
private StmtExecutor executor;
private Set<Long> refreshPartitionIds = Sets.newHashSet();

public MTMVTask() {
}
Expand All @@ -130,30 +137,51 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) {
public void run() throws JobException {
try {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
TUniqueId queryId = generateQueryId();
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
calculateRefreshInfo();
Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions();
this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
} else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol());
}
refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds);
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, refreshPartitionIds, tableWithPartKey);
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setQueryId(queryId);
command.run(ctx, executor);
Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
this.completedPartitions = Lists.newArrayList();
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size()
% refreshPartitionNum) > 0 ? 1 : 0);
for (int i = 0; i < execNum; i++) {
int start = i * refreshPartitionNum;
int end = start + refreshPartitionNum;
Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds
.subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end));
// need get names before exec
List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
}
} catch (Throwable e) {
LOG.warn("run task failed: ", e);
throw new JobException(e);
}
}

/**
 * Executes one refresh batch for the given partitions.
 *
 * <p>Builds an {@link UpdateMvByPartitionCommand}, wraps it in a new {@link StmtExecutor}
 * (stored in the {@code executor} field), assigns a freshly generated query id to the context
 * and runs the command. If the context state is not {@code OK} afterwards, the state's error
 * message is rethrown as a {@link JobException}.
 *
 * @param ctx context the command is planned and executed with
 * @param refreshPartitionIds ids of the mtmv partitions that belong to this batch
 * @param tableWithPartKey handed unchanged to {@code UpdateMvByPartitionCommand.from}
 * @throws Exception anything raised while building or running the command, or a
 *         {@link JobException} when the resulting state is not {@code OK}
 */
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey)
        throws Exception {
    TUniqueId batchQueryId = generateQueryId();
    // Only a FOLLOW_BASE_TABLE mv passes its partition ids on to the command. For every other
    // partition type an empty set is given (original note: a SELF_MANAGE mv only has the default
    // partition and will not have a partitionItem).
    Set<Long> commandPartitionIds;
    if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
        commandPartitionIds = refreshPartitionIds;
    } else {
        commandPartitionIds = Sets.newHashSet();
    }
    UpdateMvByPartitionCommand command =
            UpdateMvByPartitionCommand.from(mtmv, commandPartitionIds, tableWithPartKey);
    executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
    ctx.setQueryId(batchQueryId);
    command.run(ctx, executor);
    // The outcome is checked on the context state after the run; anything other than OK is
    // turned into a task failure carrying the state's message.
    if (ctx.getState().getStateType() == MysqlStateType.OK) {
        return;
    }
    throw new JobException(ctx.getState().getErrorMessage());
}

@Override
public synchronized void onFail() throws JobException {
super.onFail();
Expand Down Expand Up @@ -214,10 +242,33 @@ public TRow getTvfInfo() {
new TCell().setStringVal(refreshMode == null ? FeConstants.null_string : refreshMode.toString()));
trow.addToColumnValue(
new TCell().setStringVal(
refreshPartitions == null ? FeConstants.null_string : new Gson().toJson(refreshPartitions)));
needRefreshPartitions == null ? FeConstants.null_string : new Gson().toJson(
needRefreshPartitions)));
trow.addToColumnValue(
new TCell().setStringVal(
completedPartitions == null ? FeConstants.null_string : new Gson().toJson(
completedPartitions)));
trow.addToColumnValue(
new TCell().setStringVal(getProgress()));
return trow;
}

/**
 * Formats the refresh progress of this task for the {@code Progress} column.
 *
 * <p>The result has the form {@code "<percent>% (<completed>/<total>)"}, where the percentage
 * is rounded half-up to two decimal places. When there are no partitions that need refreshing,
 * {@code FeConstants.null_string} is returned.
 *
 * @return the formatted progress, or {@code FeConstants.null_string} if nothing needs refreshing
 */
private String getProgress() {
    if (CollectionUtils.isEmpty(needRefreshPartitions)) {
        return FeConstants.null_string;
    }
    int total = needRefreshPartitions.size();
    // completedPartitions may still be null or empty; both count as zero completed.
    int done = 0;
    if (!CollectionUtils.isEmpty(completedPartitions)) {
        done = completedPartitions.size();
    }
    BigDecimal percent = new BigDecimal(done * 100)
            .divide(new BigDecimal(total), 2, RoundingMode.HALF_UP);
    return percent.toString() + "% (" + done + "/" + total + ")";
}

private TUniqueId generateQueryId() {
UUID taskId = UUID.randomUUID();
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
Expand All @@ -229,52 +280,53 @@ private void after() {
mtmv = null;
relation = null;
executor = null;
refreshPartitionIds = null;
}

private void calculateRefreshInfo() throws AnalysisException {
/**
 * Builds the table-to-partition-column map that is handed to the refresh command.
 *
 * <p>For an mv whose partition type is {@code FOLLOW_BASE_TABLE} the map holds a single entry:
 * the related base table mapped to the related column. For any other partition type the map
 * is empty.
 *
 * @return a new map, possibly empty
 * @throws AnalysisException propagated from {@code MTMVUtil.getTable}
 */
private Map<OlapTable, String> getIncrementalTableMap() throws AnalysisException {
    Map<OlapTable, String> incrementalTables = Maps.newHashMap();
    if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.FOLLOW_BASE_TABLE) {
        return incrementalTables;
    }
    OlapTable baseTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
    incrementalTables.put(baseTable, mtmv.getMvPartitionInfo().getRelatedCol());
    return incrementalTables;
}


/**
 * Derives the refresh mode from the partitions that need refreshing.
 *
 * <ul>
 *   <li>{@code NOT_REFRESH} when the list is null or empty;</li>
 *   <li>{@code COMPLETE} when its size equals the number of partition ids of the mtmv;</li>
 *   <li>{@code PARTIAL} otherwise.</li>
 * </ul>
 *
 * @param needRefreshPartitionIds ids of the partitions that need refreshing
 * @return the refresh mode matching the rules above
 */
private MTMVTaskRefreshMode generateRefreshMode(List<Long> needRefreshPartitionIds) {
    if (CollectionUtils.isEmpty(needRefreshPartitionIds)) {
        return MTMVTaskRefreshMode.NOT_REFRESH;
    }
    // Only the counts are compared, not the ids themselves.
    boolean coversEveryPartition = needRefreshPartitionIds.size() == mtmv.getPartitionIds().size();
    return coversEveryPartition ? MTMVTaskRefreshMode.COMPLETE : MTMVTaskRefreshMode.PARTIAL;
}

private List<Long> calculateNeedRefreshPartitions() throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
return mtmv.getPartitionIds();
} else if (!CollectionUtils
.isEmpty(taskContext.getPartitions())) {
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
this.refreshPartitionIds = MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
return;
return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
}
}
// check if data is fresh
Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L);
if (fresh) {
this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH;
return;
return Lists.newArrayList();
}
// current, if partitionType is SELF_MANAGE, we can only FULL refresh
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
return mtmv.getPartitionIds();
}
// if refreshMethod is COMPLETE, we only FULL refresh
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
}
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
excludedTriggerTables.add(relatedTable.getName());
// check if every table except relatedTable is fresh
Set<Long> mtmvNeedRefreshPartitions = MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
// if true, we can use `Partition`, otherwise must `FULL`
if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) {
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
this.refreshPartitionIds = mtmvNeedRefreshPartitions;
return;
} else {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
return mtmv.getPartitionIds();
}
return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
}

public MTMVTaskContext getTaskContext() {
Expand Down
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,16 @@ public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable
return ids;
}

public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long> ids) throws AnalysisException {
/**
 * Resolves partition ids of the given mtmv to partition names.
 *
 * <p>The names are returned in the iteration order of {@code ids}.
 *
 * @param mtmv materialized view the partitions are looked up in
 * @param ids partition ids to resolve
 * @return a new list with one name per id
 * @throws AnalysisException propagated from {@code mtmv.getPartitionOrAnalysisException}
 */
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
    List<String> partitionNames = Lists.newArrayList();
    for (Long id : ids) {
        Partition partition = mtmv.getPartitionOrAnalysisException(id);
        partitionNames.add(partition.getName());
    }
    return partitionNames;
}

public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
Set<Long> res = Sets.newHashSet();
public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
List<Long> res = Lists.newArrayList();
for (String partitionName : partitions) {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
res.add(partition.getId());
Expand Down Expand Up @@ -286,9 +286,9 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
return res;
}

public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
Collection<Partition> allPartitions = mtmv.getPartitions();
Set<Long> res = Sets.newHashSet();
List<Long> res = Lists.newArrayList();
for (Partition partition : allPartitions) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ private void analyzeProperties() {
throw new org.apache.doris.nereids.exceptions.AnalysisException(
"valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
}
} else if (PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) {
String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
try {
Integer.parseInt(refreshPartitionNum);
} catch (NumberFormatException e) {
throw new AnalysisException(
"valid refresh_partition_num: " + properties
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
}
} else if (PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) {
// nothing
} else {
throw new org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ private void analyzeProperties() {
mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod);
properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
try {
Integer.parseInt(refreshPartitionNum);
} catch (NumberFormatException e) {
throw new AnalysisException(
"valid refresh_partition_num: " + properties
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
}
mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, refreshPartitionNum);
properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables);
Expand Down Expand Up @@ -228,9 +240,9 @@ private void analyzePartition(NereidsPlanner planner) {
if (!(followTable instanceof OlapTable)) {
throw new AnalysisException("base table for partitioning only can be OlapTable.");
}
Set<String> partitionColumnNames;
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames = ((OlapTable) followTable).getPartitionColumnNames();
partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
Expand Down

0 comments on commit f1144e3

Please sign in to comment.