Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](balance) don't balance tablet which has unfinish alter job #39121 #39202

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -86,6 +87,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class Alter {
private static final Logger LOG = LogManager.getLogger(Alter.class);
Expand Down Expand Up @@ -892,6 +894,27 @@ private void processModifyMinLoadReplicaNum(Database db, OlapTable olapTable, Al
}
}

public Set<Long> getUnfinishedAlterTableIds() {
Set<Long> unfinishedTableIds = Sets.newHashSet();
for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
for (IndexChangeJob job : ((SchemaChangeHandler) schemaChangeHandler).getIndexChangeJobs().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}
for (AlterJobV2 job : materializedViewHandler.getAlterJobsV2().values()) {
if (!job.isDone()) {
unfinishedTableIds.add(job.getTableId());
}
}

return unfinishedTableIds;
}

public AlterHandler getSchemaChangeHandler() {
return schemaChangeHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,10 @@ private void changeTableState(long dbId, long tableId, OlapTableState olapTableS
}
}

public Map<Long, IndexChangeJob> getIndexChangeJobs() {
return indexChangeJobs;
}

public List<List<Comparable>> getAllIndexChangeJobInfos() {
List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.doris.clone;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
Expand All @@ -31,7 +28,6 @@
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
Expand Down Expand Up @@ -120,15 +116,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);

List<String> alternativeTabletInfos = Lists.newArrayList();

// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
List<Set<Long>> lowBETablets = lowBEs.stream()
.map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());
Expand Down Expand Up @@ -230,11 +218,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
long replicaDataSize = replica.getDataSize();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
continue;
}

if (colocateTableIndex.isColocateTable(tabletMeta.getTableId())) {
if (!canBalanceTablet(tabletMeta)) {
continue;
}

Expand All @@ -245,11 +229,6 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
continue;
}

if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
continue;
}

boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replicaDataSize,
medium, null, false) == BalanceStatus.OK);
if (!isFit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.clone;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
Expand Down Expand Up @@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer {
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
super(infoService, invertedIndex, backendsWorkingSlots);
canBalanceColocateTable = true;
}

public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
Expand Down Expand Up @@ -163,12 +163,6 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
return alternativeTablets;
}

// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
Set<Long> alternativeTabletIds = Sets.newHashSet();
Set<Long> unbalancedBEs = Sets.newHashSet();
// choose tablets from backends randomly.
Expand Down Expand Up @@ -243,11 +237,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
long replicaPathHash = replica.getPathHash();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
continue;
}
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
if (!canBalanceTablet(tabletMeta)) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));

BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> {
return tabletMeta != null
return canBalanceTablet(tabletMeta)
&& tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId
&& !invalidIds.contains(tabletId)
Expand Down
30 changes: 29 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.doris.clone;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
Expand All @@ -29,13 +34,14 @@
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

import java.util.Set;

/*
* Rebalancer is responsible for
Expand All @@ -61,6 +67,9 @@ public abstract class Rebalancer {
// be id -> end time of prio
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();

protected boolean canBalanceColocateTable = false;
private Set<Long> alterTableIds = Sets.newHashSet();

// tag -> (medium, timestamp)
private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create();

Expand Down Expand Up @@ -106,6 +115,21 @@ protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) {
return lastPickTime == null || now - lastPickTime >= Config.be_rebalancer_idle_seconds * 1000L;
}

protected boolean canBalanceTablet(TabletMeta tabletMeta) {
// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
return tabletMeta != null
&& !alterTableIds.contains(tabletMeta.getTableId())
&& (canBalanceColocateTable || !colocateTableIndex.isColocateTable(tabletMeta.getTableId()))
&& (recycleBin == null || !recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId()));
}

public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
throws SchedException {
completeSchedCtx(tabletCtx);
Expand Down Expand Up @@ -139,6 +163,10 @@ public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) {
this.statisticMap = statisticMap;
}

public void updateAlterTableIds(Set<Long> alterTableIds) {
this.alterTableIds = alterTableIds;
}

public void addPrioBackends(List<Backend> backends, long timeoutS) {
long currentTimeMillis = System.currentTimeMillis();
for (Backend backend : backends) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ private void updateLoadStatistics() {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);

Set<Long> alterTableIds = Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds();
rebalancer.updateAlterTableIds(alterTableIds);
diskRebalancer.updateAlterTableIds(alterTableIds);

lastStatUpdateTime = System.currentTimeMillis();
}

Expand Down
Loading