Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](mtmv)create mtmv support refresh_partition_num #28566

Merged
merged 7 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ public long getGracePeriod() {
}
}

/**
 * Reads the {@code refresh_partition_num} property of this materialized view.
 *
 * @return the configured value, or {@link MTMVTask#DEFAULT_REFRESH_PARTITION_NUM} when the
 *         property is not set or its value is smaller than 1
 * @throws NumberFormatException if the stored property value is not a parsable integer
 */
public int getRefreshPartitionNum() {
    // property not configured: use the task default
    if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
        return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
    }
    int configured = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
    // zero or negative values are treated as "not configured"
    if (configured < 1) {
        return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
    }
    return configured;
}

public Set<String> getExcludedTriggerTables() {
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
return Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public ProcNodeInterface lookup(String entryName) throws AnalysisException {
throw new AnalysisException("Table[" + table.getName() + "] is not a OLAP or ELASTICSEARCH table");
}
} else if (entryName.equals(TEMP_PARTITIONS)) {
if (table.getType() == TableType.OLAP) {
if (table instanceof OlapTable) {
return new PartitionsProcDir((Database) db, (OlapTable) table, true);
} else {
throw new AnalysisException("Table[" + table.getName() + "] does not have temp partitions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public class PropertyAnalyzer {
"enable_duplicate_without_keys_by_default";
public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables";
public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num";
// For unique key data model, the feature Merge-on-Write will leverage a primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
// which can avoid the merging cost in read stage, and accelerate the aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,8 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause
}

PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST
&& !isTempPartition) {
throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
Expand All @@ -54,6 +56,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -62,7 +66,7 @@

public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
public static final Long MAX_HISTORY_TASKS_NUM = 100L;
public static final int DEFAULT_REFRESH_PARTITION_NUM = 1;

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("TaskId", ScalarType.createStringType()),
Expand All @@ -78,7 +82,9 @@ public class MTMVTask extends AbstractTask {
new Column("DurationMs", ScalarType.createStringType()),
new Column("TaskContext", ScalarType.createStringType()),
new Column("RefreshMode", ScalarType.createStringType()),
new Column("RefreshPartitions", ScalarType.createStringType()));
new Column("NeedRefreshPartitions", ScalarType.createStringType()),
new Column("CompletedPartitions", ScalarType.createStringType()),
new Column("Progress", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

Expand All @@ -97,7 +103,7 @@ public enum MTMVTaskTriggerMode {

public enum MTMVTaskRefreshMode {
COMPLETE,
PARTITION,
PARTIAL,
NOT_REFRESH
}

Expand All @@ -107,15 +113,16 @@ public enum MTMVTaskRefreshMode {
private long mtmvId;
@SerializedName("taskContext")
private MTMVTaskContext taskContext;
@SerializedName("refreshPartitions")
List<String> refreshPartitions;
@SerializedName("needRefreshPartitions")
List<String> needRefreshPartitions;
@SerializedName("completedPartitions")
List<String> completedPartitions;
@SerializedName("refreshMode")
MTMVTaskRefreshMode refreshMode;

private MTMV mtmv;
private MTMVRelation relation;
private StmtExecutor executor;
private Set<Long> refreshPartitionIds = Sets.newHashSet();

public MTMVTask() {
}
Expand All @@ -130,30 +137,51 @@ public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) {
public void run() throws JobException {
try {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
TUniqueId queryId = generateQueryId();
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
calculateRefreshInfo();
Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions();
this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
return;
} else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol());
}
refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds);
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, refreshPartitionIds, tableWithPartKey);
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setQueryId(queryId);
command.run(ctx, executor);
Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
this.completedPartitions = Lists.newArrayList();
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size()
% refreshPartitionNum) > 0 ? 1 : 0);
for (int i = 0; i < execNum; i++) {
int start = i * refreshPartitionNum;
int end = start + refreshPartitionNum;
Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds
.subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end));
// partition names must be fetched before exec
List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
}
} catch (Throwable e) {
LOG.warn("run task failed: ", e);
throw new JobException(e);
}
}

/**
 * Executes a single {@link UpdateMvByPartitionCommand} for the given partitions under a newly
 * generated query id.
 *
 * @param ctx connect context the command runs in; its query id is overwritten
 * @param refreshPartitionIds ids of the partitions to refresh in this execution
 * @param tableWithPartKey table to partition-column mapping handed to the command
 * @throws Exception if building or running the command fails, or a {@link JobException} when the
 *         context state is not {@code OK} after the command has run
 */
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey)
        throws Exception {
    TUniqueId queryId = generateQueryId();
    // A SELF_MANAGE mv only has the default partition and no partitionItem,
    // so an empty id set is passed in that case.
    boolean followsBaseTable =
            mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE;
    Set<Long> targetPartitionIds = followsBaseTable ? refreshPartitionIds : Sets.newHashSet();
    UpdateMvByPartitionCommand command =
            UpdateMvByPartitionCommand.from(mtmv, targetPartitionIds, tableWithPartKey);
    LogicalPlanAdapter planAdapter = new LogicalPlanAdapter(command, ctx.getStatementContext());
    executor = new StmtExecutor(ctx, planAdapter);
    ctx.setQueryId(queryId);
    command.run(ctx, executor);
    // the command reports failure through the context state rather than by throwing
    boolean succeeded = ctx.getState().getStateType() == MysqlStateType.OK;
    if (!succeeded) {
        throw new JobException(ctx.getState().getErrorMessage());
    }
}

@Override
public synchronized void onFail() throws JobException {
super.onFail();
Expand Down Expand Up @@ -214,10 +242,33 @@ public TRow getTvfInfo() {
new TCell().setStringVal(refreshMode == null ? FeConstants.null_string : refreshMode.toString()));
trow.addToColumnValue(
new TCell().setStringVal(
refreshPartitions == null ? FeConstants.null_string : new Gson().toJson(refreshPartitions)));
needRefreshPartitions == null ? FeConstants.null_string : new Gson().toJson(
needRefreshPartitions)));
trow.addToColumnValue(
new TCell().setStringVal(
completedPartitions == null ? FeConstants.null_string : new Gson().toJson(
completedPartitions)));
trow.addToColumnValue(
new TCell().setStringVal(getProgress()));
return trow;
}

/**
 * Renders the refresh progress of this task as a percentage with two decimals followed by the
 * completed/needed partition counts, e.g. {@code 50.00% (1/2)}.
 *
 * @return the progress text, or {@code FeConstants.null_string} when there are no partitions
 *         that need refreshing
 */
private String getProgress() {
    if (CollectionUtils.isEmpty(needRefreshPartitions)) {
        return FeConstants.null_string;
    }
    int total = needRefreshPartitions.size();
    int done = 0;
    if (!CollectionUtils.isEmpty(completedPartitions)) {
        done = completedPartitions.size();
    }
    // percentage = done * 100 / total, rounded half-up to two decimal places
    BigDecimal percent = new BigDecimal(done * 100)
            .divide(new BigDecimal(total), 2, RoundingMode.HALF_UP);
    return percent + "% (" + done + "/" + total + ")";
}

private TUniqueId generateQueryId() {
UUID taskId = UUID.randomUUID();
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
Expand All @@ -229,52 +280,53 @@ private void after() {
mtmv = null;
relation = null;
executor = null;
refreshPartitionIds = null;
}

private void calculateRefreshInfo() throws AnalysisException {
/**
 * Builds the table to partition-column mapping used for partition-wise refresh.
 *
 * @return a map holding the related table and its related column when the mv partition type is
 *         {@code FOLLOW_BASE_TABLE}; an empty map otherwise
 * @throws AnalysisException if the related table cannot be resolved
 */
private Map<OlapTable, String> getIncrementalTableMap() throws AnalysisException {
    Map<OlapTable, String> result = Maps.newHashMap();
    if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.FOLLOW_BASE_TABLE) {
        return result;
    }
    OlapTable followedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
    String followedColumn = mtmv.getMvPartitionInfo().getRelatedCol();
    result.put(followedTable, followedColumn);
    return result;
}


/**
 * Derives the refresh mode from the partitions that need refreshing.
 *
 * @param needRefreshPartitionIds ids of the partitions that need refreshing; may be null or empty
 * @return {@code NOT_REFRESH} when there is nothing to refresh, {@code COMPLETE} when the number
 *         of ids equals the number of partitions of the mv, {@code PARTIAL} otherwise
 */
private MTMVTaskRefreshMode generateRefreshMode(List<Long> needRefreshPartitionIds) {
    if (CollectionUtils.isEmpty(needRefreshPartitionIds)) {
        return MTMVTaskRefreshMode.NOT_REFRESH;
    }
    // only the counts are compared, not the ids themselves
    boolean coversAllPartitions = needRefreshPartitionIds.size() == mtmv.getPartitionIds().size();
    return coversAllPartitions ? MTMVTaskRefreshMode.COMPLETE : MTMVTaskRefreshMode.PARTIAL;
}

private List<Long> calculateNeedRefreshPartitions() throws AnalysisException {
// check whether the user manually triggers it
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
if (taskContext.isComplete()) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
return mtmv.getPartitionIds();
} else if (!CollectionUtils
.isEmpty(taskContext.getPartitions())) {
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
this.refreshPartitionIds = MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
return;
return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
}
}
// check if data is fresh
Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L);
if (fresh) {
this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH;
return;
return Lists.newArrayList();
}
// currently, if partitionType is SELF_MANAGE, we can only do a FULL refresh
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
return mtmv.getPartitionIds();
}
// if refreshMethod is COMPLETE, we only FULL refresh
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
}
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
excludedTriggerTables.add(relatedTable.getName());
// check if every table except relatedTable is fresh
Set<Long> mtmvNeedRefreshPartitions = MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
// if true, we can use `Partition`, otherwise must `FULL`
if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) {
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
this.refreshPartitionIds = mtmvNeedRefreshPartitions;
return;
} else {
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
return;
return mtmv.getPartitionIds();
}
return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
}

public MTMVTaskContext getTaskContext() {
Expand Down
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,16 @@ public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable
return ids;
}

public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long> ids) throws AnalysisException {
/**
 * Resolves partition ids of the given materialized view to partition names.
 *
 * @param mtmv materialized view owning the partitions
 * @param ids partition ids to look up
 * @return the partition names, in the iteration order of {@code ids}
 * @throws AnalysisException if any id does not resolve to a partition of {@code mtmv}
 */
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
    List<String> names = Lists.newArrayList();
    for (Long id : ids) {
        Partition partition = mtmv.getPartitionOrAnalysisException(id);
        names.add(partition.getName());
    }
    return names;
}

public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
Set<Long> res = Sets.newHashSet();
public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
List<Long> res = Lists.newArrayList();
for (String partitionName : partitions) {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
res.add(partition.getId());
Expand Down Expand Up @@ -286,9 +286,9 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
return res;
}

public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
Collection<Partition> allPartitions = mtmv.getPartitions();
Set<Long> res = Sets.newHashSet();
List<Long> res = Lists.newArrayList();
for (Partition partition : allPartitions) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ private void analyzeProperties() {
throw new org.apache.doris.nereids.exceptions.AnalysisException(
"valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
}
} else if (PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) {
String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
try {
Integer.parseInt(refreshPartitionNum);
} catch (NumberFormatException e) {
throw new AnalysisException(
"valid refresh_partition_num: " + properties
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
}
} else if (PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) {
// nothing
} else {
throw new org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ private void analyzeProperties() {
mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod);
properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
try {
Integer.parseInt(refreshPartitionNum);
} catch (NumberFormatException e) {
throw new AnalysisException(
"valid refresh_partition_num: " + properties
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
}
mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, refreshPartitionNum);
properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables);
Expand Down Expand Up @@ -228,9 +240,9 @@ private void analyzePartition(NereidsPlanner planner) {
if (!(followTable instanceof OlapTable)) {
throw new AnalysisException("base table for partitioning only can be OlapTable.");
}
Set<String> partitionColumnNames;
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames = ((OlapTable) followTable).getPartitionColumnNames();
partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
Expand Down
Loading