Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick](branch-2.1) Pick "[Enhancement](group commit)Optimize be select for group commit #35558" #37830

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
request.__set_db_id(db_id);
request.__set_table_id(table_id);
request.__set_txnId(txn_id);
request.__set_groupCommit(true);
request.__set_receiveBytes(state->num_bytes_load_total());
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backendId(_exec_env->master_info()->backend_id);
} else {
LOG(WARNING) << "_exec_env->master_info not set backend_id";
}
if (state) {
request.__set_commitInfos(state->tablet_commit_infos());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import java.util.concurrent.atomic.AtomicLongArray;

/**
 * A lock-free sliding-window counter with one-second resolution.
 *
 * <p>The window is divided into one bucket per second. {@link #add(long)} accumulates into the
 * bucket for the current second; {@link #get()} sums every bucket whose timestamp still falls
 * inside the window. Buckets are lazily reset when the wall clock wraps back onto their index.
 *
 * <p>Thread-safety: all state lives in {@link AtomicLongArray}s and the lazy reset is performed
 * with a compare-and-set on the bucket timestamp, so at most one thread zeroes a recycled bucket.
 * (A concurrent {@code add} that lands between the winning CAS and the zeroing can still be lost;
 * for this class's use — approximate load-pressure accounting — that is acceptable.)
 */
public class SlidingWindowCounter {
    private final int windowSizeInSeconds;
    private final int numberOfBuckets;
    // buckets[i] holds the amount accumulated during the second recorded in bucketTimestamps[i].
    private final AtomicLongArray buckets;
    private final AtomicLongArray bucketTimestamps;

    /**
     * @param windowSizeInSeconds length of the sliding window; must be positive because it also
     *                            determines the bucket count (one bucket per second).
     */
    public SlidingWindowCounter(int windowSizeInSeconds) {
        if (windowSizeInSeconds <= 0) {
            throw new IllegalArgumentException("windowSizeInSeconds must be > 0, got " + windowSizeInSeconds);
        }
        this.windowSizeInSeconds = windowSizeInSeconds;
        this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second
        this.buckets = new AtomicLongArray(numberOfBuckets);
        this.bucketTimestamps = new AtomicLongArray(numberOfBuckets);
    }

    private int getCurrentBucketIndex() {
        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
        return (int) (currentTime % numberOfBuckets);
    }

    /**
     * Lazily recycles the bucket owned by the current second. A bucket's timestamp can only map
     * back to the same index after a full window has elapsed, so any stale timestamp means the
     * bucket's contents are expired. The CAS guarantees exactly one thread performs the reset,
     * fixing the check-then-act race of a plain get/set sequence.
     */
    private void updateCurrentBucket() {
        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
        int currentBucketIndex = getCurrentBucketIndex();
        long bucketTimestamp = bucketTimestamps.get(currentBucketIndex);

        if (bucketTimestamp != currentTime
                && bucketTimestamps.compareAndSet(currentBucketIndex, bucketTimestamp, currentTime)) {
            buckets.set(currentBucketIndex, 0);
        }
    }

    /** Adds {@code value} to the bucket for the current second. */
    public void add(long value) {
        updateCurrentBucket();
        int bucketIndex = getCurrentBucketIndex();
        buckets.addAndGet(bucketIndex, value);
    }

    /** Returns the sum of everything added within the last {@code windowSizeInSeconds} seconds. */
    public long get() {
        updateCurrentBucket();
        long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds
        long count = 0;

        for (int i = 0; i < numberOfBuckets; i++) {
            // Only count buckets whose last write is still inside the window.
            if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) {
                count += buckets.get(i);
            }
        }
        return count;
    }

    @Override
    public String toString() {
        return String.valueOf(get());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand Down Expand Up @@ -128,11 +129,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
Expand All @@ -150,8 +156,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
}

String label = request.getHeader(LABEL_KEY);
TNetworkAddress redirectAddr;
redirectAddr = selectRedirectBackend(groupCommit);
TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId);

LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);
Expand Down Expand Up @@ -274,7 +279,9 @@ private Object executeWithoutPassword(HttpServletRequest request,
return new RestBaseResult(e.getMessage());
}
} else {
redirectAddr = selectRedirectBackend(groupCommit);
long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName)
.get()).getTable(tableName).get()).getId();
redirectAddr = selectRedirectBackend(request, groupCommit, tableId);
}

LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
Expand Down Expand Up @@ -305,7 +312,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) {
return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
}

TNetworkAddress redirectAddr = selectRedirectBackend(false);
TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);
LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);

Expand All @@ -323,12 +330,18 @@ private final synchronized int getLastSelectedBackendIndexAndUpdate() {
return index;
}

private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException {
/**
 * Resolves the backend address a load request should be redirected to.
 *
 * @param request     the incoming HTTP load request (forwarded for remote-IP bookkeeping)
 * @param groupCommit whether this is a group-commit load (routes through the group commit
 *                    BE selection instead of round-robin)
 * @param tableId     target table id, or -1 when not applicable (e.g. 2PC operations)
 * @throws LoadException if no suitable backend is available
 */
private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId)
        throws LoadException {
    // Debug hook: a debug point can pin the redirect target to a specific backend id.
    long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
    if (debugBackendId != -1L) {
        // NOTE(review): getBackend() may return null for a bogus debug id, which would NPE
        // below — acceptable for a debug-only path, but worth confirming.
        Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
        return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
    }
    return selectLocalRedirectBackend(groupCommit, request, tableId);
}

private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId)
throws LoadException {
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Expand All @@ -348,12 +361,17 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
for (Long backendId : backendIds) {
Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId);
if (!candidateBe.isDecommissioned()) {
backend = candidateBe;
break;
}
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
ctx.setThreadLocalInfo();

try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, false);
} catch (DdlException e) {
throw new RuntimeException(e);
}
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
Expand Down Expand Up @@ -416,10 +434,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
return new RestBaseResult("No label selected.");
}

TNetworkAddress redirectAddr = selectRedirectBackend(false);
TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1);

LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);

URI urlObj = null;
Expand Down
160 changes: 160 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,46 @@
package org.apache.doris.load;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.SlidingWindowCounter;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class GroupCommitManager {

private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class);

private Set<Long> blockedTableIds = new HashSet<>();

// Table id to BE id map. Only for group commit.
private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
// BE id to pressure map. Only for group commit.
private Map<Long, SlidingWindowCounter> tablePressureMap = new ConcurrentHashMap<>();

public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
}
Expand Down Expand Up @@ -163,4 +181,146 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
return size;
}

/**
 * Selects the backend that a group commit load for {@code tableId} should be sent to.
 *
 * <p>Only the master FE owns the table-to-BE routing state, so a follower forwards the
 * selection to the master and resolves the returned BE id locally.
 *
 * @param context the connect context used to forward the request to the master FE
 * @param isCloud cloud-mode flag; not used in the code path visible here — TODO confirm intent
 * @throws LoadException if forwarding to the master fails or no backend is available
 * @throws DdlException  propagated from the local selection on the master
 */
public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud)
        throws LoadException, DdlException {
    // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE
    // can select a BE and return this BE id to follower FE.
    if (!Env.getCurrentEnv().isMaster()) {
        try {
            long backendId = new MasterOpExecutor(context)
                    .getGroupCommitLoadBeId(tableId);
            return Env.getCurrentSystemInfo().getBackend(backendId);
        } catch (Exception e) {
            // NOTE(review): wrapping only e.getMessage() discards the original stack trace;
            // consider a cause-preserving constructor if LoadException offers one — TODO confirm.
            throw new LoadException(e.getMessage());
        }
    } else {
        // Master FE will select BE by itself.
        return Env.getCurrentSystemInfo()
                .getBackend(selectBackendForGroupCommitInternal(tableId));
    }
}

/**
 * Picks the backend id a group commit load for {@code tableId} should target (master FE only).
 *
 * <p>Why routing matters: group commit batches all data arriving for the same table on the same
 * BE into one version per flush interval, which keeps the version count (and thus compaction
 * load) low. Spreading one table's traffic over N BEs produces N versions per interval instead
 * of one, so the selection strategy is sticky: keep sending a table's data to its current BE
 * while that BE is healthy and not overloaded, otherwise pick a new BE at random and remember
 * the table-to-BE mapping together with its load counter. See the Group Commit Manual
 * (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) for the
 * user-facing description of the feature.
 *
 * @throws LoadException if no suitable backend exists
 * @throws DdlException  propagated from the underlying selection
 */
public long selectBackendForGroupCommitInternal(long tableId)
        throws LoadException, DdlException {
    return selectBackendForLocalGroupCommitInternal(tableId);
}

/**
 * Local (master-side) BE selection for group commit: reuse the cached BE for this table if it
 * is still healthy and under pressure limits, otherwise pick a random healthy BE.
 *
 * @throws LoadException if there is no alive, non-decommissioned backend
 */
private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException {
    if (LOG.isDebugEnabled()) {
        // Pass the maps directly: parameterized logging stringifies them lazily, only when
        // debug is actually enabled (eager .toString() here would pay the cost on every call).
        LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap,
                tablePressureMap);
    }
    // Fast path: the table already has a healthy, not-overloaded BE assigned.
    Long cachedBackendId = getCachedBackend(tableId);
    if (cachedBackendId != null) {
        return cachedBackendId;
    }

    List<Backend> backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends());
    if (backends.isEmpty()) {
        throw new LoadException("No alive backend");
    }

    // If the cached backend is not active or decommissioned, select a random new backend.
    Long randomBackendId = getRandomBackend(tableId, backends);
    if (randomBackendId != null) {
        return randomBackendId;
    }
    List<String> backendsInfo = backends.stream()
            .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive()
                    + ", decommission=" + be.isDecommissioned() + " }")
            .collect(Collectors.toList());
    throw new LoadException("No suitable backend, backends = " + backendsInfo);
}

/**
 * Returns the BE id previously assigned to {@code tableId}, or null when the caller should pick
 * a new one (no mapping, table dropped, pressure over the table's group-commit byte limit, or
 * the cached BE is dead/decommissioned). Stale mappings are removed as a side effect.
 *
 * <p>Every lookup is null-guarded: the two maps are updated independently by concurrent
 * callers, so a {@code tableToBeMap} entry may be visible before (or without) its
 * {@code tablePressureMap} counter, and the table or backend may disappear between lookups —
 * the original unguarded chain could NPE in any of those windows.
 */
@Nullable
private Long getCachedBackend(long tableId) {
    Long cachedBeId = tableToBeMap.get(tableId);
    if (cachedBeId == null) {
        return null;
    }
    OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
    if (table == null) {
        // Table dropped since the mapping was cached — presumably getTableByTableId returns
        // null for unknown ids; TODO confirm it does not throw instead.
        tableToBeMap.remove(tableId);
        return null;
    }
    SlidingWindowCounter pressure = tablePressureMap.get(tableId);
    if (pressure == null || pressure.get() >= table.getGroupCommitDataBytes()) {
        // Overloaded (or counter not yet published) — force a fresh selection.
        tableToBeMap.remove(tableId);
        return null;
    }
    Backend backend = Env.getCurrentSystemInfo().getBackend(cachedBeId);
    if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
        tableToBeMap.remove(tableId);
        return null;
    }
    return backend.getId();
}

/**
 * Assigns a random alive, non-decommissioned backend to {@code tableId}, records the mapping
 * and a fresh pressure counter, and returns the BE id — or null if no candidate qualifies.
 *
 * <p>Note the publication order: the pressure counter is installed BEFORE the table-to-BE
 * mapping. {@code getCachedBackend} dereferences {@code tablePressureMap.get(tableId)} as soon
 * as it sees the mapping, so the reverse order (mapping first) opens an NPE window for a
 * concurrent reader.
 */
@Nullable
private Long getRandomBackend(long tableId, List<Backend> backends) {
    OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
    if (table == null) {
        // Table dropped concurrently; nothing to route. Caller reports "no suitable backend".
        return null;
    }
    Collections.shuffle(backends);
    for (Backend backend : backends) {
        if (backend.isAlive() && !backend.isDecommissioned()) {
            // Window slightly exceeds the flush interval so a full interval's bytes are counted.
            tablePressureMap.put(tableId,
                    new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
            tableToBeMap.put(tableId, backend.getId());
            return backend.getId();
        }
    }
    return null;
}

/**
 * Accounts {@code receiveData} bytes against the pressure counter of {@code tableId}.
 *
 * <p>Only the master FE holds the counters, so a follower forwards the update; the master
 * applies it directly via {@link #updateLoadDataInternal(long, long)}.
 *
 * @param tableId     target table id; -1 is treated as invalid and ignored
 * @param receiveData number of bytes received for this load
 */
public void updateLoadData(long tableId, long receiveData) {
    if (tableId == -1) {
        // Nothing to account against — forwarding to the master would be a useless RPC.
        LOG.warn("invalid table id: {}", tableId);
        return;
    }
    if (!Env.getCurrentEnv().isMaster()) {
        ConnectContext ctx = new ConnectContext();
        ctx.setEnv(Env.getCurrentEnv());
        // set user to ADMIN_USER, so that we can get the proper resource tag
        ctx.setQualifiedUser(Auth.ADMIN_USER);
        ctx.setThreadLocalInfo();
        try {
            new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    } else {
        updateLoadDataInternal(tableId, receiveData);
    }
}

/**
 * Master-side bookkeeping: folds {@code receiveData} bytes into the sliding-window pressure
 * counter for {@code tableId}. A missing counter (table never routed, or mapping already
 * evicted) is logged and ignored — the next selection will recreate it.
 */
public void updateLoadDataInternal(long tableId, long receiveData) {
    SlidingWindowCounter counter = tablePressureMap.get(tableId);
    if (counter != null) {
        counter.add(receiveData);
        LOG.info("Update load data for table {}, receiveData {}, tablePressureMap {}", tableId, receiveData,
                tablePressureMap);
    } else {
        // The key of tablePressureMap is a table id, not a backend id — the old message said
        // "backend id", which was misleading.
        LOG.warn("can not find table id: {}", tableId);
    }
}
}
Loading
Loading