From 3081c905f6a7fabb6914ecd0e97da2d7cec90629 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Sat, 20 Jul 2024 15:46:49 +0800 Subject: [PATCH 1/2] [Enhancement](group commit)Optimize be select for group commit (#35558) 1. Streamload and insert into, if batched and sent to the master FE, should use a consistent BE strategy (previously, insert into reused the first selected BE, while streamload used round robin). First, a map records a fixed be id for a certain table. The first time a table is imported, a BE is randomly selected, and this table id and be id are recorded in the map permanently. Subsequently, all data imported into this table will select the BE corresponding to the table id recorded in the map. This ensures that batching is maximized to a single BE. To address the issue of excessive load on a single BE, a variable similar to a bvar window is used to monitor the total data volume sent to a specific BE for a specific table during the batch interval (default 10 seconds). A second map is used to track this. If a new import finds that its corresponding BE's window variable is less than a certain value (e.g., 1G), the new import continues to be sent to the corresponding BE according to map1. If it exceeds this value, the new import is sent to another BE with the smallest window variable value, and map1 is updated. If every BE exceeds this value, the one with the smallest value is still chosen. This helps to alleviate excessive pressure on a single BE. 2. For streamload, if batched and sent to a BE, it will batch directly on this BE and will commit the transaction at the end of the import. At this point, a request is sent to the FE, which records the size of this import and adds it to the window variable. 3. Streamload sent to observer FE, as well as insert into sent to observer FE, follow the logic in 1 by RPC, passing the table id to the master FE to obtain the selected be id. --- be/src/runtime/group_commit_mgr.cpp | 7 + .../common/util/SlidingWindowCounter.java | 73 +++++++ .../apache/doris/httpv2/rest/LoadAction.java | 66 ++++-- .../apache/doris/load/GroupCommitManager.java | 196 ++++++++++++++++++ .../doris/planner/GroupCommitPlanner.java | 10 +- .../java/org/apache/doris/qe/Coordinator.java | 25 ++- .../org/apache/doris/qe/MasterOpExecutor.java | 68 ++++++ .../doris/service/FrontendServiceImpl.java | 31 +++ gensrc/thrift/FrontendService.thrift | 46 ++++ 9 files changed, 505 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index fd9b96b0c1fe9b..6bbbe88d028b04 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -397,6 +397,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ request.__set_db_id(db_id); request.__set_table_id(table_id); request.__set_txnId(txn_id); + request.__set_groupCommit(true); + request.__set_receiveBytes(state->num_bytes_load_total()); + if (_exec_env->master_info()->__isset.backend_id) { + request.__set_backendId(_exec_env->master_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } if (state) { request.__set_commitInfos(state->tablet_commit_infos()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java new file mode 100644 index 00000000000000..787fbb06a2f7bb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.concurrent.atomic.AtomicLongArray; + +public class SlidingWindowCounter { + private final int windowSizeInSeconds; + private final int numberOfBuckets; + private final AtomicLongArray buckets; + private final AtomicLongArray bucketTimestamps; + + public SlidingWindowCounter(int windowSizeInSeconds) { + this.windowSizeInSeconds = windowSizeInSeconds; + this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second + this.buckets = new AtomicLongArray(numberOfBuckets); + this.bucketTimestamps = new AtomicLongArray(numberOfBuckets); + } + + private int getCurrentBucketIndex() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + return (int) (currentTime % numberOfBuckets); + } + + private void updateCurrentBucket() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + int currentBucketIndex = getCurrentBucketIndex(); + long bucketTimestamp = bucketTimestamps.get(currentBucketIndex); + + if (currentTime - bucketTimestamp >= 1) { + buckets.set(currentBucketIndex, 0); + bucketTimestamps.set(currentBucketIndex, currentTime); + } + } + + public void add(long value) { + updateCurrentBucket(); + int bucketIndex = getCurrentBucketIndex(); + buckets.addAndGet(bucketIndex, value); + } + + public long get() { + updateCurrentBucket(); + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + long count = 0; + + for (int i = 0; i < numberOfBuckets; i++) { + if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) { + count += buckets.get(i); + } + } + return count; + } + + public String toString() { + return String.valueOf(get()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 259062a828b8c6..26fcd047c49946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -128,11 +129,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse String sql = request.getHeader("sql"); LOG.info("streaming load sql={}", sql); boolean groupCommit = false; + long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { groupCommit = true; try { String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); if (isGroupCommitBlock(pair[0], pair[1])) { String msg = "insert table " + pair[1] + " is blocked on schema change"; return new RestBaseResult(msg); @@ -150,8 +156,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse } String label = request.getHeader(LABEL_KEY); - TNetworkAddress redirectAddr; - redirectAddr = selectRedirectBackend(groupCommit); + TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId); LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label); @@ -274,7 +279,9 @@ private Object executeWithoutPassword(HttpServletRequest request, return new RestBaseResult(e.getMessage()); } } else { - redirectAddr = selectRedirectBackend(groupCommit); + long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName) + .get()).getTable(tableName).get()).getId(); + redirectAddr = selectRedirectBackend(request, groupCommit, tableId); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -305,7 +312,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -323,12 +330,40 @@ private final synchronized int getLastSelectedBackendIndexAndUpdate() { return index; } - private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException { + private String getCloudClusterName(HttpServletRequest request) { + String cloudClusterName = request.getHeader(SessionVariable.CLOUD_CLUSTER); + if (!Strings.isNullOrEmpty(cloudClusterName)) { + return cloudClusterName; + } + + cloudClusterName = ConnectContext.get().getCloudCluster(); + if (!Strings.isNullOrEmpty(cloudClusterName)) { + return cloudClusterName; + } + + return ""; + } + + private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId) + throws LoadException { long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); if (debugBackendId != -1L) { Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } + if (Config.isCloudMode()) { + String cloudClusterName = getCloudClusterName(request); + if (Strings.isNullOrEmpty(cloudClusterName)) { + throw new LoadException("No cloud cluster name selected."); + } + return selectCloudRedirectBackend(cloudClusterName, request, groupCommit); + } else { + return selectLocalRedirectBackend(groupCommit, request, tableId); + } + } + + private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId) + throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); @@ -348,12 +383,17 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - for (Long backendId : backendIds) { - Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); - if (!candidateBe.isDecommissioned()) { - backend = candidateBe; - break; - } + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(request.getRemoteAddr()); + ctx.setThreadLocalInfo(); + + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, false); + } catch (DdlException e) { + throw new RuntimeException(e); } } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); @@ -416,10 +456,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, return new RestBaseResult("No label selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("Redirect load action with auth token to destination={}," - + "stream: {}, db: {}, tbl: {}, label: {}", + + "stream: {}, db: {}, tbl: {}, label: {}", redirectAddr.toString(), isStreamLoad, dbName, tableName, label); URI urlObj = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index c4bf1e03c9cf58..ac365f9166f7b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -18,21 +18,38 @@ package org.apache.doris.load; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.SlidingWindowCounter; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterOpExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.stream.Collectors; public class GroupCommitManager { @@ -40,6 +57,11 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); + // Table id to BE id map. Only for group commit. + private Map tableToBeMap = new ConcurrentHashMap<>(); + // BE id to pressure map. Only for group commit. + private Map tablePressureMap = new ConcurrentHashMap<>(); + public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); } @@ -163,4 +185,178 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) + throws LoadException, DdlException { + // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE + // can select a BE and return this BE id to follower FE. + if (!Env.getCurrentEnv().isMaster()) { + try { + long backendId = new MasterOpExecutor(context) + .getGroupCommitLoadBeId(tableId, context.getCloudCluster(), isCloud); + return Env.getCurrentSystemInfo().getBackend(backendId); + } catch (Exception e) { + throw new LoadException(e.getMessage()); + } + } else { + // Master FE will select BE by itself. + return Env.getCurrentSystemInfo() + .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud)); + } + } + + public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) + throws LoadException, DdlException { + // Understanding Group Commit and Backend Selection Logic + // + // Group commit is a server-side technique used for batching data imports. + // The primary purpose of group commit is to enhance import performance by + // reducing the number of versions created for high-frequency, small-batch imports. + // Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree, + // which can consume significant compaction resources and degrade system performance. + // By batching data, fewer versions are generated from the same amount of data, + // thus minimizing compaction and improving performance. For detailed usage, + // you can refer to the Group Commit Manual + // (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) . + // + // The specific backend (BE) selection logic for group commits aims to + // direct data belonging to the same table to the same BE for batching. + // This is because group commit batches data imported to the same table + // on the same BE into a single version, which is then flushed periodically. + // For example, if data for the same table is distributed across three BEs, + // it will result in three versions. + // Conversely, if data for four different tables is directed to the same BE, + // it will create four versions. However, + // directing all data for the same table to a single BE will only produce one version. + // + // To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows: + // + // If a BE is already handling imports for table A and is not under significant load, + // the data is sent to this BE. + // If the BE is overloaded or if there is no existing record of a BE handling imports for table A, + // a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level. + // This approach ensures that group commits can effectively batch data together + // while managing the load on each BE efficiently. + return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster) + : selectBackendForLocalGroupCommitInternal(tableId); + } + + private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster) + throws DdlException, LoadException { + LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + if (Strings.isNullOrEmpty(cluster)) { + ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); + } + + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } + + List backends = new ArrayList<>( + ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) + .values()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); + } + + private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { + LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } + + List backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend " + ", backends = " + backendsInfo); + } + + @Nullable + private Long getCachedBackend(long tableId) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + if (tableToBeMap.containsKey(tableId)) { + if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); + if (backend.isActive() && !backend.isDecommissioned()) { + return backend.getId(); + } else { + tableToBeMap.remove(tableId); + } + } else { + tableToBeMap.remove(tableId); + } + } + return null; + } + + @Nullable + private Long getRandomBackend(long tableId, List backends) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isActive() && !backend.isDecommissioned()) { + tableToBeMap.put(tableId, backend.getId()); + tablePressureMap.put(tableId, + new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); + return backend.getId(); + } + } + return null; + } + + public void updateLoadData(long tableId, long receiveData) { + if (tableId == -1) { + LOG.warn("invalid table id: " + tableId); + } + if (!Env.getCurrentEnv().isMaster()) { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + try { + new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + updateLoadDataInternal(tableId, receiveData); + } + } + + public void updateLoadDataInternal(long tableId, long receiveData) { + if (tablePressureMap.containsKey(tableId)) { + tablePressureMap.get(tableId).add(receiveData); + LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, + tablePressureMap.toString()); + } else { + LOG.warn("can not find backend id: {}", tableId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 6e235443bc3a4d..07b2bc74971939 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; @@ -61,7 +62,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -202,6 +202,14 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r } } + protected void selectBackends(ConnectContext ctx) throws DdlException { + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx, false); + } catch (LoadException e) { + throw new DdlException("No suitable backend"); + } + public Backend getBackend() { return backend; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0f0c0b799f61d1..2904cfdb5e47f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -273,6 +273,8 @@ public class Coordinator implements CoordInterface { private boolean enablePipelineXEngine = false; private boolean useNereids = false; + private Backend groupCommitBackend; + // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -298,6 +300,10 @@ public class Coordinator implements CoordInterface { // fragmentid -> backendid private MarkedCountDownLatch fragmentsDoneLatch = null; + public void setGroupCommitBe(Backend backend) { + this.groupCommitBackend = backend; + } + public void setTWorkloadGroups(List tWorkloadGroups) { this.tWorkloadGroups = tWorkloadGroups; } @@ -1955,8 +1961,11 @@ private void computeFragmentHosts() throws Exception { if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan - && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || ( + isAllExternalScan + && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { // 2 cases: // case 1: user set resource tag, we need to use the BE with the specified resource tags. // case 2: All scan nodes are external scan node, @@ -2148,7 +2157,9 @@ private void computeFragmentHosts() throws Exception { if (params.instanceExecParams.isEmpty()) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() && !addressToBackendID.isEmpty()) { // In this case, we only use the BE where the replica selected by the tag is located to // execute this query. Otherwise, except for the scan node, the rest of the execution nodes @@ -2172,6 +2183,14 @@ private void computeFragmentHosts() throws Exception { } } + private TNetworkAddress getGroupCommitBackend(Map addressToBackendID) { + // Used for Nereids planner Group commit insert BE select. + TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(), + groupCommitBackend.getBePort()); + addressToBackendID.put(execHostport, groupCommitBackend.getId()); + return execHostport; + } + // Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship // between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter private void assignRuntimeFilterAddr() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 934c221905fcb4..7b528049ed2811 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -26,6 +26,7 @@ import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprNode; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; @@ -88,6 +89,36 @@ public void syncJournal() throws Exception { waitOnReplaying(); } + public long getGroupCommitLoadBeId(long tableId, String cluster, boolean isCloud) throws Exception { + result = forward(buildGetGroupCommitLoadBeIdParmas(tableId, cluster, isCloud)); + waitOnReplaying(); + return result.groupCommitLoadBeId; + } + + public void updateLoadData(long tableId, long receiveData) throws Exception { + result = forward(buildUpdateLoadDataParams(tableId, receiveData)); + waitOnReplaying(); + } + + public void cancel() throws Exception { + TUniqueId queryId = ctx.queryId(); + if (queryId == null) { + return; + } + Preconditions.checkNotNull(masterAddr, "query with id %s is not forwarded to master", queryId); + TMasterOpRequest request = new TMasterOpRequest(); + request.setCancelQeury(true); + request.setQueryId(queryId); + request.setDb(ctx.getDatabase()); + request.setUser(ctx.getQualifiedUser()); + request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + // just make the protocol happy + request.setSql(""); + result = forward(masterAddr, request); + waitOnReplaying(); + } + private void waitOnReplaying() throws DdlException { LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs); @@ -187,6 +218,43 @@ private TMasterOpRequest buildSyncJournalParmas() { return params; } + private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String cluster, boolean isCloud) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setGetGroupCommitLoadBeId(true); + groupCommitParams.setGroupCommitLoadTableId(tableId); + groupCommitParams.setCluster(cluster); + groupCommitParams.setIsCloud(isCloud); + + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGroupCommitInfo(groupCommitParams); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + + private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setUpdateLoadData(true); + groupCommitParams.setTableId(tableId); + groupCommitParams.setReceiveData(receiveData); + + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGroupCommitInfo(groupCommitParams); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + public ByteBuffer getOutputPacket() { if (result == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index bf2adf5591bc15..9da7338e444f8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -53,6 +53,7 @@ import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; @@ -170,6 +171,7 @@ import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; import org.apache.doris.thrift.TGetTabletReplicaInfosResult; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; @@ -1008,6 +1010,28 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { result.setPacket("".getBytes()); return result; } + if (params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); + final TMasterOpResult result = new TMasterOpResult(); + try { + result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster, info.isCloud)); + } catch (LoadException | DdlException e) { + throw new TException(e.getMessage()); + } + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } + if (params.getGroupCommitInfo().isUpdateLoadData()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); + final TMasterOpResult result = new TMasterOpResult(); + Env.getCurrentEnv().getGroupCommitManager() + .updateLoadData(info.tableId, info.receiveData); + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } // add this log so that we can track this stmt if (LOG.isDebugEnabled()) { @@ -1567,6 +1591,13 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } } + if (request.groupCommit) { + try { + Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes); + } catch (Exception e) { + LOG.warn("Failed to update group commit load data, {}", e.getMessage()); + } + } // get database Env env = Env.getCurrentEnv(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f1bf6eadd60d96..d9312c196fcf01 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -492,6 +492,38 @@ struct TFeResult { 1: required FrontendServiceVersion protocolVersion 2: required Status.TStatus status } + +enum TSubTxnType { + INSERT = 0, + DELETE = 1 +} + +struct TSubTxnInfo { + 1: optional i64 sub_txn_id + 2: optional i64 table_id + 3: optional list tablet_commit_infos + 4: optional TSubTxnType sub_txn_type +} + +struct TTxnLoadInfo { + 1: optional string label + 2: optional i64 dbId + 3: optional i64 txnId + 4: optional i64 timeoutTimestamp + 5: optional i64 allSubTxnNum + 6: optional list subTxnInfos +} + +struct TGroupCommitInfo{ + 1: optional bool getGroupCommitLoadBeId + 2: optional i64 groupCommitLoadTableId + 3: optional string cluster + 4: optional bool isCloud + 5: optional bool updateLoadData + 6: optional i64 tableId + 7: optional i64 receiveData +} + struct TMasterOpRequest { 1: required string user 2: required string db @@ -523,6 +555,13 @@ struct TMasterOpRequest { 26: optional string defaultDatabase 27: optional bool cancel_qeury // if set to true, this request means to cancel one forwarded query, and query_id needs to be set 28: optional map user_variables + // transaction load + 29: optional TTxnLoadInfo txnLoadInfo + 30: optional TGroupCommitInfo groupCommitInfo + + // selectdb cloud + 1000: optional string cloud_cluster + 1001: optional bool noAuth; } struct TColumnDefinition { @@ -550,6 +589,9 @@ struct TMasterOpResult { 6: optional i32 statusCode; 7: optional string errMessage; 8: optional list queryResultBufList; + // transaction load + 9: optional TTxnLoadInfo txnLoadInfo; + 10: optional i64 groupCommitLoadBeId; } struct TUpdateExportTaskStatusRequest { @@ -750,6 +792,10 @@ struct TLoadTxnCommitRequest { 14: optional i64 db_id 15: optional list tbls 16: optional i64 table_id + 17: optional string auth_code_uuid + 18: optional bool groupCommit + 19: optional i64 receiveBytes + 20: optional i64 backendId } struct TLoadTxnCommitResult { From 92200a538731921a614a987b7f568920e915f0ba Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 22 Jul 2024 16:26:29 +0800 Subject: [PATCH 2/2] 2 --- .../apache/doris/httpv2/rest/LoadAction.java | 24 +-------- .../apache/doris/load/GroupCommitManager.java | 50 +++---------------- .../doris/planner/GroupCommitPlanner.java | 2 + .../org/apache/doris/qe/MasterOpExecutor.java | 27 ++-------- .../doris/service/FrontendServiceImpl.java | 2 +- gensrc/thrift/FrontendService.thrift | 12 ++--- 6 files changed, 17 insertions(+), 100 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 26fcd047c49946..edcbc2dd7f1343 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -330,20 +330,6 @@ private final synchronized int getLastSelectedBackendIndexAndUpdate() { return index; } - private String getCloudClusterName(HttpServletRequest request) { - String cloudClusterName = request.getHeader(SessionVariable.CLOUD_CLUSTER); - if (!Strings.isNullOrEmpty(cloudClusterName)) { - return cloudClusterName; - } - - cloudClusterName = ConnectContext.get().getCloudCluster(); - if (!Strings.isNullOrEmpty(cloudClusterName)) { - return cloudClusterName; - } - - return ""; - } - private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId) throws LoadException { long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); @@ -351,15 +337,7 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } - if (Config.isCloudMode()) { - String cloudClusterName = getCloudClusterName(request); - if (Strings.isNullOrEmpty(cloudClusterName)) { - throw new LoadException("No cloud cluster name selected."); - } - return selectCloudRedirectBackend(cloudClusterName, request, groupCommit); - } else { - return selectLocalRedirectBackend(groupCommit, request, tableId); - } + return selectLocalRedirectBackend(groupCommit, request, tableId); } private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index ac365f9166f7b6..1ec6a06179e443 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -19,11 +19,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; import org.apache.doris.common.LoadException; import org.apache.doris.common.util.SlidingWindowCounter; import org.apache.doris.mysql.privilege.Auth; @@ -36,7 +33,6 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; -import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -192,7 +188,7 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, if (!Env.getCurrentEnv().isMaster()) { try { long backendId = new MasterOpExecutor(context) - .getGroupCommitLoadBeId(tableId, context.getCloudCluster(), isCloud); + .getGroupCommitLoadBeId(tableId); return Env.getCurrentSystemInfo().getBackend(backendId); } catch (Exception e) { throw new LoadException(e.getMessage()); @@ -200,11 +196,11 @@ public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, } else { // Master FE will select BE by itself. return Env.getCurrentSystemInfo() - .getBackend(selectBackendForGroupCommitInternal(tableId, context.getCloudCluster(), isCloud)); + .getBackend(selectBackendForGroupCommitInternal(tableId)); } } - public long selectBackendForGroupCommitInternal(long tableId, String cluster, boolean isCloud) + public long selectBackendForGroupCommitInternal(long tableId) throws LoadException, DdlException { // Understanding Group Commit and Backend Selection Logic // @@ -236,39 +232,7 @@ public long selectBackendForGroupCommitInternal(long tableId, String cluster, bo // a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level. // This approach ensures that group commits can effectively batch data together // while managing the load on each BE efficiently. - return isCloud ? selectBackendForCloudGroupCommitInternal(tableId, cluster) - : selectBackendForLocalGroupCommitInternal(tableId); - } - - private long selectBackendForCloudGroupCommitInternal(long tableId, String cluster) - throws DdlException, LoadException { - LOG.debug("cloud group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), - tablePressureMap.toString()); - if (Strings.isNullOrEmpty(cluster)) { - ErrorReport.reportDdlException(ErrorCode.ERR_NO_CLUSTER_ERROR); - } - - Long cachedBackendId = getCachedBackend(tableId); - if (cachedBackendId != null) { - return cachedBackendId; - } - - List backends = new ArrayList<>( - ((CloudSystemInfoService) Env.getCurrentSystemInfo()).getCloudIdToBackend(cluster) - .values()); - if (backends.isEmpty()) { - throw new LoadException("No alive backend"); - } - // If the cached backend is not active or decommissioned, select a random new backend. - Long randomBackendId = getRandomBackend(tableId, backends); - if (randomBackendId != null) { - return randomBackendId; - } - List backendsInfo = backends.stream() - .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() - + ", decommission=" + be.isDecommissioned() + " }") - .collect(Collectors.toList()); - throw new LoadException("No suitable backend for cloud cluster=" + cluster + ", backends = " + backendsInfo); + return selectBackendForLocalGroupCommitInternal(tableId); } private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { @@ -290,7 +254,7 @@ private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadE return randomBackendId; } List backendsInfo = backends.stream() - .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", active=" + be.isActive() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + ", decommission=" + be.isDecommissioned() + " }") .collect(Collectors.toList()); throw new LoadException("No suitable backend " + ", backends = " + backendsInfo); @@ -302,7 +266,7 @@ private Long getCachedBackend(long tableId) { if (tableToBeMap.containsKey(tableId)) { if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); - if (backend.isActive() && !backend.isDecommissioned()) { + if (backend.isAlive() && !backend.isDecommissioned()) { return backend.getId(); } else { tableToBeMap.remove(tableId); @@ -319,7 +283,7 @@ private Long getRandomBackend(long tableId, List backends) { OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); Collections.shuffle(backends); for (Backend backend : backends) { - if (backend.isActive() && !backend.isDecommissioned()) { + if (backend.isAlive() && !backend.isDecommissioned()) { tableToBeMap.put(tableId, backend.getId()); tablePressureMap.put(tableId, new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 07b2bc74971939..0b051aeb88806b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -62,6 +62,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -209,6 +210,7 @@ protected void selectBackends(ConnectContext ctx) throws DdlException { } catch (LoadException e) { throw new DdlException("No suitable backend"); } + } public Backend getBackend() { return backend; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 7b528049ed2811..9ff3acecdd9fe9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -89,8 +89,8 @@ public void syncJournal() throws Exception { waitOnReplaying(); } - public long getGroupCommitLoadBeId(long tableId, String cluster, boolean isCloud) throws Exception { - result = forward(buildGetGroupCommitLoadBeIdParmas(tableId, cluster, isCloud)); + public long getGroupCommitLoadBeId(long tableId) throws Exception { + result = forward(buildGetGroupCommitLoadBeIdParmas(tableId)); waitOnReplaying(); return result.groupCommitLoadBeId; } @@ -100,25 +100,6 @@ public void updateLoadData(long tableId, long receiveData) throws Exception { waitOnReplaying(); } - public void cancel() throws Exception { - TUniqueId queryId = ctx.queryId(); - if (queryId == null) { - return; - } - Preconditions.checkNotNull(masterAddr, "query with id %s is not forwarded to master", queryId); - TMasterOpRequest request = new TMasterOpRequest(); - request.setCancelQeury(true); - request.setQueryId(queryId); - request.setDb(ctx.getDatabase()); - request.setUser(ctx.getQualifiedUser()); - request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); - request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - // just make the protocol happy - request.setSql(""); - result = forward(masterAddr, request); - waitOnReplaying(); - } - private void waitOnReplaying() throws DdlException { LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs); @@ -218,12 +199,10 @@ private TMasterOpRequest buildSyncJournalParmas() { return params; } - private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String cluster, boolean isCloud) { + private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId) { final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); groupCommitParams.setGetGroupCommitLoadBeId(true); groupCommitParams.setGroupCommitLoadTableId(tableId); - groupCommitParams.setCluster(cluster); - groupCommitParams.setIsCloud(isCloud); final TMasterOpRequest params = new TMasterOpRequest(); // node ident diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9da7338e444f8d..448b1231ddea86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1015,7 +1015,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { final TMasterOpResult result = new TMasterOpResult(); try { result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster, info.isCloud)); + .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId)); } catch (LoadException | DdlException e) { throw new TException(e.getMessage()); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d9312c196fcf01..d0b45d72647b60 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -517,11 +517,9 @@ struct TTxnLoadInfo { struct TGroupCommitInfo{ 1: optional bool getGroupCommitLoadBeId 2: optional i64 groupCommitLoadTableId - 3: optional string cluster - 4: optional bool isCloud - 5: optional bool updateLoadData - 6: optional i64 tableId - 7: optional i64 receiveData + 3: optional bool updateLoadData + 4: optional i64 tableId + 5: optional i64 receiveData } struct TMasterOpRequest { @@ -558,10 +556,6 @@ struct TMasterOpRequest { // transaction load 29: optional TTxnLoadInfo txnLoadInfo 30: optional TGroupCommitInfo groupCommitInfo - - // selectdb cloud - 1000: optional string cloud_cluster - 1001: optional bool noAuth; } struct TColumnDefinition {