diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index fd9b96b0c1fe9b..6bbbe88d028b04 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -397,6 +397,13 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ request.__set_db_id(db_id); request.__set_table_id(table_id); request.__set_txnId(txn_id); + request.__set_groupCommit(true); + request.__set_receiveBytes(state->num_bytes_load_total()); + if (_exec_env->master_info()->__isset.backend_id) { + request.__set_backendId(_exec_env->master_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } if (state) { request.__set_commitInfos(state->tablet_commit_infos()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java new file mode 100644 index 00000000000000..787fbb06a2f7bb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SlidingWindowCounter.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.concurrent.atomic.AtomicLongArray; + +public class SlidingWindowCounter { + private final int windowSizeInSeconds; + private final int numberOfBuckets; + private final AtomicLongArray buckets; + private final AtomicLongArray bucketTimestamps; + + public SlidingWindowCounter(int windowSizeInSeconds) { + this.windowSizeInSeconds = windowSizeInSeconds; + this.numberOfBuckets = windowSizeInSeconds; // Each bucket represents 1 second + this.buckets = new AtomicLongArray(numberOfBuckets); + this.bucketTimestamps = new AtomicLongArray(numberOfBuckets); + } + + private int getCurrentBucketIndex() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + return (int) (currentTime % numberOfBuckets); + } + + private void updateCurrentBucket() { + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + int currentBucketIndex = getCurrentBucketIndex(); + long bucketTimestamp = bucketTimestamps.get(currentBucketIndex); + + if (currentTime - bucketTimestamp >= 1) { + buckets.set(currentBucketIndex, 0); + bucketTimestamps.set(currentBucketIndex, currentTime); + } + } + + public void add(long value) { + updateCurrentBucket(); + int bucketIndex = getCurrentBucketIndex(); + buckets.addAndGet(bucketIndex, value); + } + + public long get() { + updateCurrentBucket(); + long currentTime = System.currentTimeMillis() / 1000; // Current time in seconds + long count = 0; + + for (int i = 0; i < numberOfBuckets; i++) { + if (currentTime - bucketTimestamps.get(i) < windowSizeInSeconds) { + count += buckets.get(i); + } + } + return count; + } + + public String toString() { + return String.valueOf(get()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 259062a828b8c6..edcbc2dd7f1343 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -128,11 +129,16 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse String sql = request.getHeader("sql"); LOG.info("streaming load sql={}", sql); boolean groupCommit = false; + long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { groupCommit = true; try { String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); if (isGroupCommitBlock(pair[0], pair[1])) { String msg = "insert table " + pair[1] + " is blocked on schema change"; return new RestBaseResult(msg); @@ -150,8 +156,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse } String label = request.getHeader(LABEL_KEY); - TNetworkAddress redirectAddr; - redirectAddr = selectRedirectBackend(groupCommit); + TNetworkAddress redirectAddr = selectRedirectBackend(request, groupCommit, tableId); LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label); @@ -274,7 +279,9 @@ private Object executeWithoutPassword(HttpServletRequest request, return new RestBaseResult(e.getMessage()); } } else { - redirectAddr = selectRedirectBackend(groupCommit); + long tableId = ((OlapTable) ((Database) Env.getCurrentEnv().getCurrentCatalog().getDb(dbName) + .get()).getTable(tableName).get()).getId(); + redirectAddr = selectRedirectBackend(request, groupCommit, tableId); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -305,7 +312,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -323,12 +330,18 @@ private final synchronized int getLastSelectedBackendIndexAndUpdate() { return index; } - private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException { + private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId) + throws LoadException { long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); if (debugBackendId != -1L) { Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } + return selectLocalRedirectBackend(groupCommit, request, tableId); + } + + private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId) + throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); @@ -348,12 +361,17 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - for (Long backendId : backendIds) { - Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); - if (!candidateBe.isDecommissioned()) { - backend = candidateBe; - break; - } + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(request.getRemoteAddr()); + ctx.setThreadLocalInfo(); + + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, false); + } catch (DdlException e) { + throw new RuntimeException(e); } } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); @@ -416,10 +434,10 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, return new RestBaseResult("No label selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(false); + TNetworkAddress redirectAddr = selectRedirectBackend(request, false, -1); LOG.info("Redirect load action with auth token to destination={}," - + "stream: {}, db: {}, tbl: {}, label: {}", + + "stream: {}, db: {}, tbl: {}, label: {}", redirectAddr.toString(), isStreamLoad, dbName, tableName, label); URI urlObj = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index c4bf1e03c9cf58..1ec6a06179e443 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -18,9 +18,16 @@ package org.apache.doris.load; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.SlidingWindowCounter; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.MasterOpExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; @@ -28,11 +35,17 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.stream.Collectors; public class GroupCommitManager { @@ -40,6 +53,11 @@ public class GroupCommitManager { private Set blockedTableIds = new HashSet<>(); + // Table id to BE id map. Only for group commit. + private Map tableToBeMap = new ConcurrentHashMap<>(); + // BE id to pressure map. Only for group commit. + private Map tablePressureMap = new ConcurrentHashMap<>(); + public boolean isBlock(long tableId) { return blockedTableIds.contains(tableId); } @@ -163,4 +181,146 @@ public long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) + throws LoadException, DdlException { + // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE + // can select a BE and return this BE id to follower FE. + if (!Env.getCurrentEnv().isMaster()) { + try { + long backendId = new MasterOpExecutor(context) + .getGroupCommitLoadBeId(tableId); + return Env.getCurrentSystemInfo().getBackend(backendId); + } catch (Exception e) { + throw new LoadException(e.getMessage()); + } + } else { + // Master FE will select BE by itself. + return Env.getCurrentSystemInfo() + .getBackend(selectBackendForGroupCommitInternal(tableId)); + } + } + + public long selectBackendForGroupCommitInternal(long tableId) + throws LoadException, DdlException { + // Understanding Group Commit and Backend Selection Logic + // + // Group commit is a server-side technique used for batching data imports. + // The primary purpose of group commit is to enhance import performance by + // reducing the number of versions created for high-frequency, small-batch imports. + // Without batching, each import operation creates a separate version, similar to a rowset in an LSM Tree, + // which can consume significant compaction resources and degrade system performance. + // By batching data, fewer versions are generated from the same amount of data, + // thus minimizing compaction and improving performance. For detailed usage, + // you can refer to the Group Commit Manual + // (https://doris.incubator.apache.org/docs/data-operate/import/group-commit-manual/) . + // + // The specific backend (BE) selection logic for group commits aims to + // direct data belonging to the same table to the same BE for batching. + // This is because group commit batches data imported to the same table + // on the same BE into a single version, which is then flushed periodically. + // For example, if data for the same table is distributed across three BEs, + // it will result in three versions. + // Conversely, if data for four different tables is directed to the same BE, + // it will create four versions. However, + // directing all data for the same table to a single BE will only produce one version. + // + // To optimize performance and avoid overloading a single BE, the strategy for selecting a BE works as follows: + // + // If a BE is already handling imports for table A and is not under significant load, + // the data is sent to this BE. + // If the BE is overloaded or if there is no existing record of a BE handling imports for table A, + // a BE is chosen at random. This BE is then recorded along with the mapping of table A and its load level. + // This approach ensures that group commits can effectively batch data together + // while managing the load on each BE efficiently. + return selectBackendForLocalGroupCommitInternal(tableId); + } + + private long selectBackendForLocalGroupCommitInternal(long tableId) throws LoadException { + LOG.debug("group commit select be info, tableToBeMap {}, tablePressureMap {}", tableToBeMap.toString(), + tablePressureMap.toString()); + Long cachedBackendId = getCachedBackend(tableId); + if (cachedBackendId != null) { + return cachedBackendId; + } + + List backends = new ArrayList<>((Env.getCurrentSystemInfo()).getAllBackends()); + if (backends.isEmpty()) { + throw new LoadException("No alive backend"); + } + + // If the cached backend is not active or decommissioned, select a random new backend. + Long randomBackendId = getRandomBackend(tableId, backends); + if (randomBackendId != null) { + return randomBackendId; + } + List backendsInfo = backends.stream() + .map(be -> "{ beId=" + be.getId() + ", alive=" + be.isAlive() + + ", decommission=" + be.isDecommissioned() + " }") + .collect(Collectors.toList()); + throw new LoadException("No suitable backend " + ", backends = " + backendsInfo); + } + + @Nullable + private Long getCachedBackend(long tableId) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + if (tableToBeMap.containsKey(tableId)) { + if (tablePressureMap.get(tableId).get() < table.getGroupCommitDataBytes()) { + Backend backend = Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId)); + if (backend.isAlive() && !backend.isDecommissioned()) { + return backend.getId(); + } else { + tableToBeMap.remove(tableId); + } + } else { + tableToBeMap.remove(tableId); + } + } + return null; + } + + @Nullable + private Long getRandomBackend(long tableId, List backends) { + OlapTable table = (OlapTable) Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId); + Collections.shuffle(backends); + for (Backend backend : backends) { + if (backend.isAlive() && !backend.isDecommissioned()) { + tableToBeMap.put(tableId, backend.getId()); + tablePressureMap.put(tableId, + new SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1)); + return backend.getId(); + } + } + return null; + } + + public void updateLoadData(long tableId, long receiveData) { + if (tableId == -1) { + LOG.warn("invalid table id: " + tableId); + } + if (!Env.getCurrentEnv().isMaster()) { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + // set user to ADMIN_USER, so that we can get the proper resource tag + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + try { + new MasterOpExecutor(ctx).updateLoadData(tableId, receiveData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + updateLoadDataInternal(tableId, receiveData); + } + } + + public void updateLoadDataInternal(long tableId, long receiveData) { + if (tablePressureMap.containsKey(tableId)) { + tablePressureMap.get(tableId).add(receiveData); + LOG.info("Update load data for table{}, receiveData {}, tablePressureMap {}", tableId, receiveData, + tablePressureMap.toString()); + } else { + LOG.warn("can not find backend id: {}", tableId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 6e235443bc3a4d..0b051aeb88806b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.FormatOptions; +import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; @@ -202,6 +203,15 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r } } + protected void selectBackends(ConnectContext ctx) throws DdlException { + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(this.table.getId(), ctx, false); + } catch (LoadException e) { + throw new DdlException("No suitable backend"); + } + } + public Backend getBackend() { return backend; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0f0c0b799f61d1..2904cfdb5e47f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -273,6 +273,8 @@ public class Coordinator implements CoordInterface { private boolean enablePipelineXEngine = false; private boolean useNereids = false; + private Backend groupCommitBackend; + // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -298,6 +300,10 @@ public class Coordinator implements CoordInterface { // fragmentid -> backendid private MarkedCountDownLatch fragmentsDoneLatch = null; + public void setGroupCommitBe(Backend backend) { + this.groupCommitBackend = backend; + } + public void setTWorkloadGroups(List tWorkloadGroups) { this.tWorkloadGroups = tWorkloadGroups; } @@ -1955,8 +1961,11 @@ private void computeFragmentHosts() throws Exception { if (fragment.getDataPartition() == DataPartition.UNPARTITIONED) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || (isAllExternalScan - && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || ( + isAllExternalScan + && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { // 2 cases: // case 1: user set resource tag, we need to use the BE with the specified resource tags. // case 2: All scan nodes are external scan node, @@ -2148,7 +2157,9 @@ private void computeFragmentHosts() throws Exception { if (params.instanceExecParams.isEmpty()) { Reference backendIdRef = new Reference(); TNetworkAddress execHostport; - if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() + if (groupCommitBackend != null) { + execHostport = getGroupCommitBackend(addressToBackendID); + } else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() && !addressToBackendID.isEmpty()) { // In this case, we only use the BE where the replica selected by the tag is located to // execute this query. Otherwise, except for the scan node, the rest of the execution nodes @@ -2172,6 +2183,14 @@ private void computeFragmentHosts() throws Exception { } } + private TNetworkAddress getGroupCommitBackend(Map addressToBackendID) { + // Used for Nereids planner Group commit insert BE select. + TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(), + groupCommitBackend.getBePort()); + addressToBackendID.put(execHostport, groupCommitBackend.getId()); + return execHostport; + } + // Traverse the expected runtimeFilterID in each fragment, and establish the corresponding relationship // between runtimeFilterID and fragment instance addr and select the merge instance of runtimeFilter private void assignRuntimeFilterAddr() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 934c221905fcb4..9ff3acecdd9fe9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -26,6 +26,7 @@ import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TExprNode; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; @@ -88,6 +89,17 @@ public void syncJournal() throws Exception { waitOnReplaying(); } + public long getGroupCommitLoadBeId(long tableId) throws Exception { + result = forward(buildGetGroupCommitLoadBeIdParmas(tableId)); + waitOnReplaying(); + return result.groupCommitLoadBeId; + } + + public void updateLoadData(long tableId, long receiveData) throws Exception { + result = forward(buildUpdateLoadDataParams(tableId, receiveData)); + waitOnReplaying(); + } + private void waitOnReplaying() throws DdlException { LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs); @@ -187,6 +199,41 @@ private TMasterOpRequest buildSyncJournalParmas() { return params; } + private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setGetGroupCommitLoadBeId(true); + groupCommitParams.setGroupCommitLoadTableId(tableId); + + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGroupCommitInfo(groupCommitParams); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + + private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) { + final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo(); + groupCommitParams.setUpdateLoadData(true); + groupCommitParams.setTableId(tableId); + groupCommitParams.setReceiveData(receiveData); + + final TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setGroupCommitInfo(groupCommitParams); + params.setDb(ctx.getDatabase()); + params.setUser(ctx.getQualifiedUser()); + // just make the protocol happy + params.setSql(""); + return params; + } + public ByteBuffer getOutputPacket() { if (result == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index bf2adf5591bc15..448b1231ddea86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -53,6 +53,7 @@ import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; @@ -170,6 +171,7 @@ import org.apache.doris.thrift.TGetTablesResult; import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; import org.apache.doris.thrift.TGetTabletReplicaInfosResult; +import org.apache.doris.thrift.TGroupCommitInfo; import org.apache.doris.thrift.TInitExternalCtlMetaRequest; import org.apache.doris.thrift.TInitExternalCtlMetaResult; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; @@ -1008,6 +1010,28 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { result.setPacket("".getBytes()); return result; } + if (params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); + final TMasterOpResult result = new TMasterOpResult(); + try { + result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommitInternal(info.groupCommitLoadTableId)); + } catch (LoadException | DdlException e) { + throw new TException(e.getMessage()); + } + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } + if (params.getGroupCommitInfo().isUpdateLoadData()) { + final TGroupCommitInfo info = params.getGroupCommitInfo(); + final TMasterOpResult result = new TMasterOpResult(); + Env.getCurrentEnv().getGroupCommitManager() + .updateLoadData(info.tableId, info.receiveData); + // just make the protocol happy + result.setPacket("".getBytes()); + return result; + } // add this log so that we can track this stmt if (LOG.isDebugEnabled()) { @@ -1567,6 +1591,13 @@ private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserExce request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } } + if (request.groupCommit) { + try { + Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes); + } catch (Exception e) { + LOG.warn("Failed to update group commit load data, {}", e.getMessage()); + } + } // get database Env env = Env.getCurrentEnv(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f1bf6eadd60d96..d0b45d72647b60 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -492,6 +492,36 @@ struct TFeResult { 1: required FrontendServiceVersion protocolVersion 2: required Status.TStatus status } + +enum TSubTxnType { + INSERT = 0, + DELETE = 1 +} + +struct TSubTxnInfo { + 1: optional i64 sub_txn_id + 2: optional i64 table_id + 3: optional list tablet_commit_infos + 4: optional TSubTxnType sub_txn_type +} + +struct TTxnLoadInfo { + 1: optional string label + 2: optional i64 dbId + 3: optional i64 txnId + 4: optional i64 timeoutTimestamp + 5: optional i64 allSubTxnNum + 6: optional list subTxnInfos +} + +struct TGroupCommitInfo{ + 1: optional bool getGroupCommitLoadBeId + 2: optional i64 groupCommitLoadTableId + 3: optional bool updateLoadData + 4: optional i64 tableId + 5: optional i64 receiveData +} + struct TMasterOpRequest { 1: required string user 2: required string db @@ -523,6 +553,9 @@ struct TMasterOpRequest { 26: optional string defaultDatabase 27: optional bool cancel_qeury // if set to true, this request means to cancel one forwarded query, and query_id needs to be set 28: optional map user_variables + // transaction load + 29: optional TTxnLoadInfo txnLoadInfo + 30: optional TGroupCommitInfo groupCommitInfo } struct TColumnDefinition { @@ -550,6 +583,9 @@ struct TMasterOpResult { 6: optional i32 statusCode; 7: optional string errMessage; 8: optional list queryResultBufList; + // transaction load + 9: optional TTxnLoadInfo txnLoadInfo; + 10: optional i64 groupCommitLoadBeId; } struct TUpdateExportTaskStatusRequest { @@ -750,6 +786,10 @@ struct TLoadTxnCommitRequest { 14: optional i64 db_id 15: optional list tbls 16: optional i64 table_id + 17: optional string auth_code_uuid + 18: optional bool groupCommit + 19: optional i64 receiveBytes + 20: optional i64 backendId } struct TLoadTxnCommitResult {