From f505967ba4bce5a7edb01ef6e83ccfd6f7b656fc Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 18 Dec 2023 14:29:47 +0800 Subject: [PATCH 1/7] support refresh num --- .../java/org/apache/doris/catalog/MTMV.java | 9 ++ .../doris/common/util/PropertyAnalyzer.java | 1 + .../doris/job/extensions/mtmv/MTMVTask.java | 136 ++++++++++++------ .../java/org/apache/doris/mtmv/MTMVUtil.java | 10 +- .../commands/info/AlterMTMVPropertyInfo.java | 11 ++ .../plans/commands/info/CreateMTMVInfo.java | 12 ++ 6 files changed, 128 insertions(+), 51 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index a2c87581fc8885..fbdc213f1ff259 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -185,6 +185,15 @@ public long getGracePeriod() { } } + public int getRefreshPartitionNum() { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { + int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); + return value < 1 ? 1 : value; + } else { + return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM; + } + } + public Set getExcludedTriggerTables() { if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { return Sets.newHashSet(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 32ba1c8306d6ef..36c9d2b5dcd195 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -159,6 +159,7 @@ public class PropertyAnalyzer { "enable_duplicate_without_keys_by_default"; public static final String PROPERTIES_GRACE_PERIOD = "grace_period"; public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables"; + public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num"; // For unique key data model, the feature Merge-on-Write will leverage a primary // key index and a delete-bitmap to mark duplicate keys as deleted in load stage, // which can avoid the merging cost in read stage, and accelerate the aggregation diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 8b2d2835127484..e10aba23c772b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -46,6 +46,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.Gson; @@ -54,6 +55,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.List; import java.util.Map; import java.util.Objects; @@ -62,7 +65,7 @@ public class MTMVTask extends AbstractTask { private static final Logger LOG = LogManager.getLogger(MTMVTask.class); - public static final Long MAX_HISTORY_TASKS_NUM = 100L; + public static final Integer DEFAULT_REFRESH_PARTITION_NUM = 1; public static final ImmutableList SCHEMA = ImmutableList.of( new Column("TaskId", ScalarType.createStringType()), @@ -78,7 +81,9 @@ public class MTMVTask extends AbstractTask { new Column("DurationMs", ScalarType.createStringType()), new Column("TaskContext", ScalarType.createStringType()), new Column("RefreshMode", ScalarType.createStringType()), - new Column("RefreshPartitions", ScalarType.createStringType())); + new Column("NeedRefreshPartitions", ScalarType.createStringType()), + new Column("CompletedPartitions", ScalarType.createStringType()), + new Column("Progress", ScalarType.createStringType())); public static final ImmutableMap COLUMN_TO_INDEX; @@ -97,7 +102,7 @@ public enum MTMVTaskTriggerMode { public enum MTMVTaskRefreshMode { COMPLETE, - PARTITION, + PARTIAL, NOT_REFRESH } @@ -107,15 +112,16 @@ public enum MTMVTaskRefreshMode { private long mtmvId; @SerializedName("taskContext") private MTMVTaskContext taskContext; - @SerializedName("refreshPartitions") - List refreshPartitions; + @SerializedName("needRefreshPartitions") + List needRefreshPartitions; + @SerializedName("completedPartitions") + List completedPartitions; @SerializedName("refreshMode") MTMVTaskRefreshMode refreshMode; private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; - private Set refreshPartitionIds = Sets.newHashSet(); public MTMVTask() { } @@ -130,30 +136,42 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) { public void run() throws JobException { try { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); - TUniqueId queryId = generateQueryId(); // Every time a task is run, the relation is regenerated because baseTables and baseViews may change, // such as deleting a table and creating a view with the same name - relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); - calculateRefreshInfo(); - Map tableWithPartKey = Maps.newHashMap(); + this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); + List needRefreshPartitionIds = calculateNeedRefreshPartitions(); + this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); + this.refreshMode = generateRefreshMode(needRefreshPartitionIds); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; - } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) { - OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); - tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol()); } - refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds); - UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand - .from(mtmv, refreshPartitionIds, tableWithPartKey); - executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); - ctx.setQueryId(queryId); - command.run(ctx, executor); + Map tableWithPartKey = getIncrementalTableMap(); + int refreshPartitionNum = mtmv.getRefreshPartitionNum(); + long execNums = needRefreshPartitionIds.size() / refreshPartitionNum; + for (int i = 0; i < execNums; i++) { + int start = i * refreshPartitionNum; + int end = start + refreshPartitionNum; + Set execPartitionIds = Sets.newHashSet(needRefreshPartitionIds + .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); + doRefresh(ctx, execPartitionIds, tableWithPartKey); + completedPartitions.addAll(MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds)); + } } catch (Throwable e) { LOG.warn("run task failed: ", e); throw new JobException(e); } } + public void doRefresh(ConnectContext ctx, Set refreshPartitionIds, Map tableWithPartKey) + throws Exception { + TUniqueId queryId = generateQueryId(); + UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand + .from(mtmv, refreshPartitionIds, tableWithPartKey); + executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); + ctx.setQueryId(queryId); + command.run(ctx, executor); + } + @Override public synchronized void onFail() throws JobException { super.onFail(); @@ -214,10 +232,35 @@ public TRow getTvfInfo() { new TCell().setStringVal(refreshMode == null ? FeConstants.null_string : refreshMode.toString())); trow.addToColumnValue( new TCell().setStringVal( - refreshPartitions == null ? FeConstants.null_string : new Gson().toJson(refreshPartitions))); + needRefreshPartitions == null ? FeConstants.null_string : new Gson().toJson( + needRefreshPartitions))); + trow.addToColumnValue( + new TCell().setStringVal( + completedPartitions == null ? FeConstants.null_string : new Gson().toJson( + completedPartitions))); + trow.addToColumnValue( + new TCell().setStringVal(getProgress())); return trow; } + private String getProgress() { + if (CollectionUtils.isEmpty(needRefreshPartitions)) { + return FeConstants.null_string; + } + int completedSize = CollectionUtils.isEmpty(needRefreshPartitions) ? 0 : needRefreshPartitions.size(); + BigDecimal result = new BigDecimal(completedSize * 100) + .divide(new BigDecimal(needRefreshPartitions.size()), 2, RoundingMode.HALF_UP); + StringBuilder builder = new StringBuilder(result.toString()); + builder.append("%"); + builder.append(" "); + builder.append("("); + builder.append(completedSize); + builder.append("/"); + builder.append(needRefreshPartitions.size()); + builder.append(")"); + return builder.toString(); + } + private TUniqueId generateQueryId() { UUID taskId = UUID.randomUUID(); return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); @@ -229,52 +272,53 @@ private void after() { mtmv = null; relation = null; executor = null; - refreshPartitionIds = null; } - private void calculateRefreshInfo() throws AnalysisException { + private Map getIncrementalTableMap() throws AnalysisException { + Map tableWithPartKey = Maps.newHashMap(); + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); + tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol()); + } + return tableWithPartKey; + } + + + private MTMVTaskRefreshMode generateRefreshMode(List needRefreshPartitionIds) { + if (CollectionUtils.isEmpty(needRefreshPartitionIds)) { + return MTMVTaskRefreshMode.NOT_REFRESH; + } else if (needRefreshPartitionIds.size() == mtmv.getPartitionIds().size()) { + return MTMVTaskRefreshMode.COMPLETE; + } else { + return MTMVTaskRefreshMode.PARTIAL; + } + } + + private List calculateNeedRefreshPartitions() throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { if (taskContext.isComplete()) { - this.refreshMode = MTMVTaskRefreshMode.COMPLETE; - return; + return mtmv.getPartitionIds(); } else if (!CollectionUtils .isEmpty(taskContext.getPartitions())) { - this.refreshMode = MTMVTaskRefreshMode.PARTITION; - this.refreshPartitionIds = MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); - return; + return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); } } // check if data is fresh Set excludedTriggerTables = mtmv.getExcludedTriggerTables(); boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L); if (fresh) { - this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH; - return; + return Lists.newArrayList(); } // current, if partitionType is SELF_MANAGE, we can only FULL refresh if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) { - this.refreshMode = MTMVTaskRefreshMode.COMPLETE; - return; + return mtmv.getPartitionIds(); } // if refreshMethod is COMPLETE, we only FULL refresh if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) { - this.refreshMode = MTMVTaskRefreshMode.COMPLETE; - return; - } - OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); - excludedTriggerTables.add(relatedTable.getName()); - // check if every table except relatedTable is fresh - Set mtmvNeedRefreshPartitions = MTMVUtil.getMTMVNeedRefreshPartitions(mtmv); - // if true, we can use `Partition`, otherwise must `FULL` - if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) { - this.refreshMode = MTMVTaskRefreshMode.PARTITION; - this.refreshPartitionIds = mtmvNeedRefreshPartitions; - return; - } else { - this.refreshMode = MTMVTaskRefreshMode.COMPLETE; - return; + return mtmv.getPartitionIds(); } + return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv); } public MTMVTaskContext getTaskContext() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 81da2946ff1223..308138157cf1b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -172,7 +172,7 @@ public static Set getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable return ids; } - public static List getPartitionNamesByIds(MTMV mtmv, Set ids) throws AnalysisException { + public static List getPartitionNamesByIds(MTMV mtmv, Collection ids) throws AnalysisException { List res = Lists.newArrayList(); for (Long partitionId : ids) { res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); @@ -180,8 +180,8 @@ public static List getPartitionNamesByIds(MTMV mtmv, Set ids) thro return res; } - public static Set getPartitionsIdsByNames(MTMV mtmv, List partitions) throws AnalysisException { - Set res = Sets.newHashSet(); + public static List getPartitionsIdsByNames(MTMV mtmv, List partitions) throws AnalysisException { + List res = Lists.newArrayList(); for (String partitionName : partitions) { Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); res.add(partition.getId()); @@ -286,9 +286,9 @@ public static Collection getMTMVCanRewritePartitions(MTMV mtmv, Conne return res; } - public static Set getMTMVNeedRefreshPartitions(MTMV mtmv) { + public static List getMTMVNeedRefreshPartitions(MTMV mtmv) { Collection allPartitions = mtmv.getPartitions(); - Set res = Sets.newHashSet(); + List res = Lists.newArrayList(); for (Partition partition : allPartitions) { try { if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java index 5f9be47fc70c3c..d90e2e8f1beca7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVPropertyInfo.java @@ -60,6 +60,17 @@ private void analyzeProperties() { throw new org.apache.doris.nereids.exceptions.AnalysisException( "valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)); } + } else if (PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) { + String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM); + try { + Integer.parseInt(refreshPartitionNum); + } catch (NumberFormatException e) { + throw new AnalysisException( + "valid refresh_partition_num: " + properties + .get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); + } + } else if (PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) { + // nothing } else { throw new org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index 66bc1b22107969..bc6b1354901d0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -174,6 +174,18 @@ private void analyzeProperties() { mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod); properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD); } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { + String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM); + try { + Integer.parseInt(refreshPartitionNum); + } catch (NumberFormatException e) { + throw new AnalysisException( + "valid refresh_partition_num: " + properties + .get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); + } + mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, refreshPartitionNum); + properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM); + } if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES); mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables); From aa57205102611581bfeca5dad7ac65190e8d036e Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 18 Dec 2023 16:03:06 +0800 Subject: [PATCH 2/7] support refresh num --- .../doris/job/extensions/mtmv/MTMVTask.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index e10aba23c772b0..4ef8d58f207164 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -146,15 +146,19 @@ public void run() throws JobException { return; } Map tableWithPartKey = getIncrementalTableMap(); + this.completedPartitions = Lists.newArrayList(); int refreshPartitionNum = mtmv.getRefreshPartitionNum(); - long execNums = needRefreshPartitionIds.size() / refreshPartitionNum; - for (int i = 0; i < execNums; i++) { + long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size() + % refreshPartitionNum) > 0 ? 1 : 0); + for (int i = 0; i < execNum; i++) { int start = i * refreshPartitionNum; int end = start + refreshPartitionNum; Set execPartitionIds = Sets.newHashSet(needRefreshPartitionIds .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); - doRefresh(ctx, execPartitionIds, tableWithPartKey); - completedPartitions.addAll(MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds)); + // need get names before exec + List execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds); + exec(ctx, execPartitionIds, tableWithPartKey); + completedPartitions.addAll(execPartitionNames); } } catch (Throwable e) { LOG.warn("run task failed: ", e); @@ -162,11 +166,13 @@ public void run() throws JobException { } } - public void doRefresh(ConnectContext ctx, Set refreshPartitionIds, Map tableWithPartKey) + public void exec(ConnectContext ctx, Set refreshPartitionIds, Map tableWithPartKey) throws Exception { TUniqueId queryId = generateQueryId(); + // if SELF_MANAGE, will not have partitionItem, so we give empty set UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand - .from(mtmv, refreshPartitionIds, tableWithPartKey); + .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE + ? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); ctx.setQueryId(queryId); command.run(ctx, executor); @@ -247,7 +253,7 @@ private String getProgress() { if (CollectionUtils.isEmpty(needRefreshPartitions)) { return FeConstants.null_string; } - int completedSize = CollectionUtils.isEmpty(needRefreshPartitions) ? 0 : needRefreshPartitions.size(); + int completedSize = CollectionUtils.isEmpty(completedPartitions) ? 0 : completedPartitions.size(); BigDecimal result = new BigDecimal(completedSize * 100) .divide(new BigDecimal(needRefreshPartitions.size()), 2, RoundingMode.HALF_UP); StringBuilder builder = new StringBuilder(result.toString()); From f38c2cbf676d0108a83bd4c96fea52fa495a5067 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 18 Dec 2023 16:52:17 +0800 Subject: [PATCH 3/7] support refresh num --- .../main/java/org/apache/doris/common/proc/TableProcDir.java | 2 +- .../java/org/apache/doris/datasource/InternalCatalog.java | 3 ++- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TableProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TableProcDir.java index a06d8b4f78e028..dc1b80c48adc7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TableProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TableProcDir.java @@ -86,7 +86,7 @@ public ProcNodeInterface lookup(String entryName) throws AnalysisException { throw new AnalysisException("Table[" + table.getName() + "] is not a OLAP or ELASTICSEARCH table"); } } else if (entryName.equals(TEMP_PARTITIONS)) { - if (table.getType() == TableType.OLAP) { + if (table instanceof OlapTable) { return new PartitionsProcDir((Database) db, (OlapTable) table, true); } else { throw new AnalysisException("Table[" + table.getName() + "] does not have temp partitions"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 2ca083c73fb1b1..dc332b96d0377e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1707,7 +1707,8 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause } PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) { + if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST + && !isTempPartition) { throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 4ef8d58f207164..ef1edc84947c49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; @@ -176,6 +177,9 @@ public void exec(ConnectContext ctx, Set refreshPartitionIds, Map Date: Mon, 18 Dec 2023 17:34:59 +0800 Subject: [PATCH 4/7] support refresh num --- .../doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index bc6b1354901d0d..ab17401797cac2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -247,7 +247,7 @@ private void analyzePartition(NereidsPlanner planner) { throw new AnalysisException(e.getMessage(), e); } - if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) { + if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn().toLowerCase())) { throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn()); } if (partitionColumnNames.size() != 1) { From c6902b13da79d0ff199a21df7f223705467fc8a9 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 18 Dec 2023 20:03:45 +0800 Subject: [PATCH 5/7] 1 --- fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java | 2 +- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 +--- .../nereids/trees/plans/commands/info/CreateMTMVInfo.java | 6 +++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index fbdc213f1ff259..caab7d428fed22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -188,7 +188,7 @@ public long getGracePeriod() { public int getRefreshPartitionNum() { if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); - return value < 1 ? 1 : value; + return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value; } else { return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index ef1edc84947c49..439268fa0b92e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -261,9 +261,7 @@ private String getProgress() { BigDecimal result = new BigDecimal(completedSize * 100) .divide(new BigDecimal(needRefreshPartitions.size()), 2, RoundingMode.HALF_UP); StringBuilder builder = new StringBuilder(result.toString()); - builder.append("%"); - builder.append(" "); - builder.append("("); + builder.append("% ("); builder.append(completedSize); builder.append("/"); builder.append(needRefreshPartitions.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index ab17401797cac2..c8a4bfeebb1c68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -240,14 +240,14 @@ private void analyzePartition(NereidsPlanner planner) { if (!(followTable instanceof OlapTable)) { throw new AnalysisException("base table for partitioning only can be OlapTable."); } - Set partitionColumnNames; + Set partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); try { - partitionColumnNames = ((OlapTable) followTable).getPartitionColumnNames(); + partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames()); } catch (DdlException e) { throw new AnalysisException(e.getMessage(), e); } - if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn().toLowerCase())) { + if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) { throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn()); } if (partitionColumnNames.size() != 1) { From bbdba262f7d78b04498c1658ea60e1175c7d65f3 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 18 Dec 2023 20:07:01 +0800 Subject: [PATCH 6/7] 1 --- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 439268fa0b92e1..713ac0f332d546 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -66,7 +66,7 @@ public class MTMVTask extends AbstractTask { private static final Logger LOG = LogManager.getLogger(MTMVTask.class); - public static final Integer DEFAULT_REFRESH_PARTITION_NUM = 1; + public static final int DEFAULT_REFRESH_PARTITION_NUM = 1; public static final ImmutableList SCHEMA = ImmutableList.of( new Column("TaskId", ScalarType.createStringType()), From 5b4cf0a10165c8a8ed8ce3982f6555d1455c640f Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Tue, 19 Dec 2023 10:39:17 +0800 Subject: [PATCH 7/7] 1 --- .../java/org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 713ac0f332d546..e13b45885d275e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -167,10 +167,10 @@ public void run() throws JobException { } } - public void exec(ConnectContext ctx, Set refreshPartitionIds, Map tableWithPartKey) + private void exec(ConnectContext ctx, Set refreshPartitionIds, Map tableWithPartKey) throws Exception { TUniqueId queryId = generateQueryId(); - // if SELF_MANAGE, will not have partitionItem, so we give empty set + // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE ? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey);