diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index ebf99ce2e9ef30..12166bfe4b7e35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -78,6 +78,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,6 +87,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; public class Alter { private static final Logger LOG = LogManager.getLogger(Alter.class); @@ -892,6 +894,27 @@ private void processModifyMinLoadReplicaNum(Database db, OlapTable olapTable, Al } } + public Set getUnfinishedAlterTableIds() { + Set unfinishedTableIds = Sets.newHashSet(); + for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) { + if (!job.isDone()) { + unfinishedTableIds.add(job.getTableId()); + } + } + for (IndexChangeJob job : ((SchemaChangeHandler) schemaChangeHandler).getIndexChangeJobs().values()) { + if (!job.isDone()) { + unfinishedTableIds.add(job.getTableId()); + } + } + for (AlterJobV2 job : materializedViewHandler.getAlterJobsV2().values()) { + if (!job.isDone()) { + unfinishedTableIds.add(job.getTableId()); + } + } + + return unfinishedTableIds; + } + public AlterHandler getSchemaChangeHandler() { return schemaChangeHandler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 86ffb55ea76785..09e8c984c070cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -1734,6 +1734,10 @@ private void changeTableState(long dbId, long tableId, OlapTableState olapTableS } } + public Map getIndexChangeJobs() { + return indexChangeJobs; + } + public List> getAllIndexChangeJobInfos() { List> indexChangeJobInfos = new LinkedList<>(); for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java index 0da7428e422521..78452000ca50dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java @@ -17,9 +17,6 @@ package org.apache.doris.clone; -import org.apache.doris.catalog.CatalogRecycleBin; -import org.apache.doris.catalog.ColocateTableIndex; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; @@ -31,7 +28,6 @@ import org.apache.doris.clone.TabletSchedCtx.Priority; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -120,15 +116,7 @@ protected List selectAlternativeTabletsForCluster( LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium); List alternativeTabletInfos = Lists.newArrayList(); - - // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) - // so in clone ut recycleBin need to set to null. - CatalogRecycleBin recycleBin = null; - if (!FeConstants.runningUnitTest) { - recycleBin = Env.getCurrentRecycleBin(); - } int clusterAvailableBEnum = infoService.getAllBackendIds(true).size(); - ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); List> lowBETablets = lowBEs.stream() .map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId()))) .collect(Collectors.toList()); @@ -230,11 +218,7 @@ protected List selectAlternativeTabletsForCluster( long replicaDataSize = replica.getDataSize(); if (remainingPaths.containsKey(replicaPathHash)) { TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta == null) { - continue; - } - - if (colocateTableIndex.isColocateTable(tabletMeta.getTableId())) { + if (!canBalanceTablet(tabletMeta)) { continue; } @@ -245,11 +229,6 @@ protected List selectAlternativeTabletsForCluster( continue; } - if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), - tabletMeta.getTableId(), tabletMeta.getPartitionId())) { - continue; - } - boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replicaDataSize, medium, null, false) == BalanceStatus.OK); if (!isFit) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java index 96eef52d597870..a8448b8ffd2f94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java @@ -17,7 +17,6 @@ package org.apache.doris.clone; -import org.apache.doris.catalog.CatalogRecycleBin; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; @@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer { public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex, Map backendsWorkingSlots) { super(infoService, invertedIndex, backendsWorkingSlots); + canBalanceColocateTable = true; } public List filterByPrioBackends(List bes) { @@ -163,12 +163,6 @@ protected List selectAlternativeTabletsForCluster( return alternativeTablets; } - // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) - // so in clone ut recycleBin need to set to null. - CatalogRecycleBin recycleBin = null; - if (!FeConstants.runningUnitTest) { - recycleBin = Env.getCurrentRecycleBin(); - } Set alternativeTabletIds = Sets.newHashSet(); Set unbalancedBEs = Sets.newHashSet(); // choose tablets from backends randomly. @@ -243,11 +237,7 @@ protected List selectAlternativeTabletsForCluster( long replicaPathHash = replica.getPathHash(); if (remainingPaths.containsKey(replicaPathHash)) { TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta == null) { - continue; - } - if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(), - tabletMeta.getTableId(), tabletMeta.getPartitionId())) { + if (!canBalanceTablet(tabletMeta)) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java index 7095ad8dc54315..5af920c74fdde2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java @@ -138,7 +138,7 @@ protected List selectAlternativeTabletsForCluster( invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium)); BiPredicate canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> { - return tabletMeta != null + return canBalanceTablet(tabletMeta) && tabletMeta.getPartitionId() == move.partitionId && tabletMeta.getIndexId() == move.indexId && !invalidIds.contains(tabletId) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java index 682c2915989895..af8bc6d67fc9d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java @@ -17,9 +17,14 @@ package org.apache.doris.clone; +import org.apache.doris.catalog.CatalogRecycleBin; +import org.apache.doris.catalog.ColocateTableIndex; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.TabletMeta; import org.apache.doris.clone.TabletScheduler.PathSlot; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -29,13 +34,14 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.collect.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; - +import java.util.Set; /* * Rebalancer is responsible for @@ -61,6 +67,9 @@ public abstract class Rebalancer { // be id -> end time of prio protected Map prioBackends = Maps.newConcurrentMap(); + protected boolean canBalanceColocateTable = false; + private Set alterTableIds = Sets.newHashSet(); + // tag -> (medium, timestamp) private Table lastPickTimeTable = HashBasedTable.create(); @@ -106,6 +115,21 @@ protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) { return lastPickTime == null || now - lastPickTime >= Config.be_rebalancer_idle_seconds * 1000L; } + protected boolean canBalanceTablet(TabletMeta tabletMeta) { + // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread) + // so in clone ut recycleBin need to set to null. + ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex(); + CatalogRecycleBin recycleBin = null; + if (!FeConstants.runningUnitTest) { + recycleBin = Env.getCurrentRecycleBin(); + } + return tabletMeta != null + && !alterTableIds.contains(tabletMeta.getTableId()) + && (canBalanceColocateTable || !colocateTableIndex.isColocateTable(tabletMeta.getTableId())) + && (recycleBin == null || !recycleBin.isRecyclePartition(tabletMeta.getDbId(), + tabletMeta.getTableId(), tabletMeta.getPartitionId())); + } + public AgentTask createBalanceTask(TabletSchedCtx tabletCtx) throws SchedException { completeSchedCtx(tabletCtx); @@ -139,6 +163,10 @@ public void updateLoadStatistic(Map statisticMap) { this.statisticMap = statisticMap; } + public void updateAlterTableIds(Set alterTableIds) { + this.alterTableIds = alterTableIds; + } + public void addPrioBackends(List backends, long timeoutS) { long currentTimeMillis = System.currentTimeMillis(); for (Backend backend : backends) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index a3a3a93e0fabab..10a21c2f30556d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -352,6 +352,10 @@ private void updateLoadStatistics() { rebalancer.updateLoadStatistic(statisticMap); diskRebalancer.updateLoadStatistic(statisticMap); + Set alterTableIds = Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds(); + rebalancer.updateAlterTableIds(alterTableIds); + diskRebalancer.updateAlterTableIds(alterTableIds); + lastStatUpdateTime = System.currentTimeMillis(); }