Skip to content

Commit

Permalink
[improvement](balance) don't balance tablets which have unfinished alter …
Browse files Browse the repository at this point in the history
…job (#39121)

Improvement: don't balance tablets that have unfinished alter jobs.

Also fix a bug where the partition rebalancer may balance colocate tablets.
  • Loading branch information
yujun777 authored and dataroaring committed Aug 16, 2024
1 parent 6677024 commit 4433bdf
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 36 deletions.
23 changes: 23 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -89,6 +90,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class Alter {
private static final Logger LOG = LogManager.getLogger(Alter.class);
Expand Down Expand Up @@ -905,6 +907,27 @@ private void processModifyMinLoadReplicaNum(Database db, OlapTable olapTable, Al
}
}

/**
 * Collects the ids of all tables that still have an alter job which is not done.
 *
 * <p>Three job sources are scanned: the V2 alter jobs of the schema change handler,
 * the index change jobs of the schema change handler, and the V2 alter jobs of the
 * materialized view handler.
 *
 * @return a newly created set of table ids; empty when every job is done
 */
public Set<Long> getUnfinishedAlterTableIds() {
    Set<Long> pendingTableIds = Sets.newHashSet();

    // Schema change jobs.
    schemaChangeHandler.getAlterJobsV2().values().stream()
            .filter(alterJob -> !alterJob.isDone())
            .forEach(alterJob -> pendingTableIds.add(alterJob.getTableId()));

    // Index change jobs, only reachable through the concrete SchemaChangeHandler type.
    ((SchemaChangeHandler) schemaChangeHandler).getIndexChangeJobs().values().stream()
            .filter(indexJob -> !indexJob.isDone())
            .forEach(indexJob -> pendingTableIds.add(indexJob.getTableId()));

    // Materialized view (rollup) jobs.
    materializedViewHandler.getAlterJobsV2().values().stream()
            .filter(alterJob -> !alterJob.isDone())
            .forEach(alterJob -> pendingTableIds.add(alterJob.getTableId()));

    return pendingTableIds;
}

/**
 * @return the handler held in {@code schemaChangeHandler}
 */
public AlterHandler getSchemaChangeHandler() {
    return this.schemaChangeHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,10 @@ private void changeTableState(long dbId, long tableId, OlapTableState olapTableS
}
}

/**
 * Exposes the index change jobs tracked by this handler.
 *
 * <p>The backing map itself is returned, not a copy, so later changes to the
 * handler's jobs are visible through the returned reference.
 *
 * @return the {@code indexChangeJobs} map
 */
public Map<Long, IndexChangeJob> getIndexChangeJobs() {
    return this.indexChangeJobs;
}

public List<List<Comparable>> getAllIndexChangeJobInfos() {
List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.doris.clone;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
Expand All @@ -31,7 +28,6 @@
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
Expand Down Expand Up @@ -120,15 +116,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);

List<String> alternativeTabletInfos = Lists.newArrayList();

// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
List<Set<Long>> lowBETablets = lowBEs.stream()
.map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());
Expand Down Expand Up @@ -230,11 +218,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
long replicaDataSize = replica.getDataSize();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
continue;
}

if (colocateTableIndex.isColocateTable(tabletMeta.getTableId())) {
if (!canBalanceTablet(tabletMeta)) {
continue;
}

Expand All @@ -245,11 +229,6 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
continue;
}

if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
continue;
}

boolean isFit = lowBEs.stream().anyMatch(be -> be.isFit(replicaDataSize,
medium, null, false) == BalanceStatus.OK);
if (!isFit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.clone;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
Expand Down Expand Up @@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer {
/**
 * Creates a disk rebalancer on top of the shared {@link Rebalancer} state.
 *
 * @param infoService passed through to the parent constructor
 * @param invertedIndex passed through to the parent constructor
 * @param backendsWorkingSlots passed through to the parent constructor
 */
public DiskRebalancer(
        SystemInfoService infoService,
        TabletInvertedIndex invertedIndex,
        Map<Long, PathSlot> backendsWorkingSlots) {
    super(infoService, invertedIndex, backendsWorkingSlots);
    // Unlike the default (false), this rebalancer opts in to balancing tablets of colocate tables.
    this.canBalanceColocateTable = true;
}

public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
Expand Down Expand Up @@ -163,12 +163,6 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
return alternativeTablets;
}

// Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread)
// so in clone ut recycleBin need to set to null.
CatalogRecycleBin recycleBin = null;
if (!FeConstants.runningUnitTest) {
recycleBin = Env.getCurrentRecycleBin();
}
Set<Long> alternativeTabletIds = Sets.newHashSet();
Set<Long> unbalancedBEs = Sets.newHashSet();
// choose tablets from backends randomly.
Expand Down Expand Up @@ -243,11 +237,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
long replicaPathHash = replica.getPathHash();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
continue;
}
if (recycleBin != null && recycleBin.isRecyclePartition(tabletMeta.getDbId(),
tabletMeta.getTableId(), tabletMeta.getPartitionId())) {
if (!canBalanceTablet(tabletMeta)) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));

BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId, TabletMeta tabletMeta) -> {
return tabletMeta != null
return canBalanceTablet(tabletMeta)
&& tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId
&& !invalidIds.contains(tabletId)
Expand Down
30 changes: 29 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.doris.clone;

import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
Expand All @@ -29,13 +34,14 @@
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

import java.util.Set;

/*
* Rebalancer is responsible for
Expand All @@ -61,6 +67,9 @@ public abstract class Rebalancer {
// be id -> end time of prio
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();

protected boolean canBalanceColocateTable = false;
private Set<Long> alterTableIds = Sets.newHashSet();

// tag -> (medium, timestamp)
private Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create();

Expand Down Expand Up @@ -106,6 +115,21 @@ protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) {
return lastPickTime == null || now - lastPickTime >= Config.be_rebalancer_idle_seconds * 1000L;
}

/**
 * Decides whether the tablet described by {@code tabletMeta} may be picked for balancing.
 *
 * <p>A tablet is rejected when any of the following holds:
 * <ul>
 *   <li>{@code tabletMeta} is null;</li>
 *   <li>its table id is contained in {@code alterTableIds};</li>
 *   <li>its table is a colocate table and {@code canBalanceColocateTable} is false;</li>
 *   <li>the recycle bin reports its partition via {@code isRecyclePartition}
 *       (this check is skipped when running unit tests).</li>
 * </ul>
 *
 * @param tabletMeta meta of the candidate tablet, may be null
 * @return true if none of the rejection conditions applies
 */
protected boolean canBalanceTablet(TabletMeta tabletMeta) {
    ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
    // Clone ut mocked env, but CatalogRecycleBin is not mockable (it extends from Thread),
    // so in clone ut the recycle bin has to stay null.
    CatalogRecycleBin recycleBin = FeConstants.runningUnitTest ? null : Env.getCurrentRecycleBin();

    if (tabletMeta == null) {
        return false;
    }
    if (alterTableIds.contains(tabletMeta.getTableId())) {
        return false;
    }
    if (!canBalanceColocateTable && colocateIndex.isColocateTable(tabletMeta.getTableId())) {
        return false;
    }
    if (recycleBin == null) {
        return true;
    }
    return !recycleBin.isRecyclePartition(tabletMeta.getDbId(),
            tabletMeta.getTableId(), tabletMeta.getPartitionId());
}

public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
throws SchedException {
completeSchedCtx(tabletCtx);
Expand Down Expand Up @@ -139,6 +163,10 @@ public void updateLoadStatistic(Map<Tag, LoadStatisticForTag> statisticMap) {
this.statisticMap = statisticMap;
}

/**
 * Replaces the set of table ids consulted by {@code canBalanceTablet}.
 *
 * <p>The given set is stored by reference, not copied.
 *
 * @param alterTableIds ids of tables whose tablets must not be balanced;
 *                      NOTE(review): presumably never null, since it is dereferenced
 *                      without a null check in canBalanceTablet — confirm with callers
 */
public void updateAlterTableIds(Set<Long> alterTableIds) {
this.alterTableIds = alterTableIds;
}

public void addPrioBackends(List<Backend> backends, long timeoutS) {
long currentTimeMillis = System.currentTimeMillis();
for (Backend backend : backends) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ private void updateLoadStatistics() {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);

Set<Long> alterTableIds = Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds();
rebalancer.updateAlterTableIds(alterTableIds);
diskRebalancer.updateAlterTableIds(alterTableIds);

lastStatUpdateTime = System.currentTimeMillis();
}

Expand Down

0 comments on commit 4433bdf

Please sign in to comment.