From afd328868511d352ad9a45296afb7c76a69fce7b Mon Sep 17 00:00:00 2001 From: Murphy Date: Fri, 21 Feb 2025 16:50:56 +0800 Subject: [PATCH 1/4] use StmtExecutor::execute to forward statement on follower Signed-off-by: Murphy --- .../load/pipe/filelist/RepoExecutor.java | 16 +++++++++++----- .../com/starrocks/statistic/StatisticUtils.java | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoExecutor.java b/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoExecutor.java index 1ac1b3e21f018e..7ba846a46de7c7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/pipe/filelist/RepoExecutor.java @@ -20,6 +20,7 @@ import com.starrocks.common.Status; import com.starrocks.common.util.DebugUtil; import com.starrocks.common.util.UUIDUtil; +import com.starrocks.http.HttpConnectContext; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.StmtExecutor; import com.starrocks.server.GlobalStateMgr; @@ -60,18 +61,15 @@ private RepoExecutor() { public void executeDML(String sql) { ConnectContext prev = ConnectContext.get(); try { - ConnectContext context = createConnectContext(); - + ConnectContext context = createHttpConnectContext(); StatementBase parsedStmt = SqlParser.parseOneWithStarRocksDialect(sql, context.getSessionVariable()); Preconditions.checkState(parsedStmt instanceof DmlStmt, "the statement should be dml"); - DmlStmt dmlStmt = (DmlStmt) parsedStmt; - ExecPlan execPlan = StatementPlanner.plan(parsedStmt, context, TResultSinkType.HTTP_PROTOCAL); StmtExecutor executor = StmtExecutor.newInternalExecutor(context, parsedStmt); context.setExecutor(executor); context.setQueryId(UUIDUtil.genUUID()); AuditLog.getInternalAudit().info("RepoExecutor execute SQL | Query_id {} | SQL {}", DebugUtil.printId(context.getQueryId()), sql); - executor.handleDMLStmt(execPlan, dmlStmt); + executor.execute(); } catch (Exception e) { LOG.error("RepoExecutor execute SQL {} failed: {}", sql, e.getMessage(), e); throw new SemanticException(String.format("execute sql failed: %s", e.getMessage()), e); @@ -136,4 +134,12 @@ private static ConnectContext createConnectContext() { return context; } + private static HttpConnectContext createHttpConnectContext() { + HttpConnectContext context = + (HttpConnectContext) StatisticUtils.buildConnectContext(TResultSinkType.HTTP_PROTOCAL); + context.setThreadLocalInfo(); + context.setNeedQueued(false); + return context; + } + } diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index deb2f563b3e03f..9775dd421f68f3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -47,15 +47,18 @@ import com.starrocks.common.util.DateUtils; import com.starrocks.common.util.UUIDUtil; import com.starrocks.connector.ConnectorPartitionTraits; +import com.starrocks.http.HttpConnectContext; import com.starrocks.load.pipe.filelist.RepoExecutor; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.WarehouseManager; +import com.starrocks.service.arrow.flight.sql.ArrowFlightSqlConnectContext; import com.starrocks.sql.ast.ColumnDef; import com.starrocks.sql.ast.UserIdentity; import com.starrocks.sql.common.ErrorType; import com.starrocks.sql.common.StarRocksPlannerException; import com.starrocks.sql.optimizer.statistics.StatisticsEstimateCoefficient; +import com.starrocks.thrift.TResultSinkType; import com.starrocks.transaction.InsertOverwriteJobStats; import com.starrocks.transaction.TransactionState; import com.starrocks.warehouse.Warehouse; @@ -89,7 +92,17 @@ public class StatisticUtils { .add("information_schema").build(); public static ConnectContext buildConnectContext() { - ConnectContext context = ConnectContext.buildInner(); + return buildConnectContext(TResultSinkType.MYSQL_PROTOCAL); + } + + public static ConnectContext buildConnectContext(TResultSinkType connectType) { + ConnectContext context = + switch (connectType) { + case MYSQL_PROTOCAL -> ConnectContext.buildInner(); + case HTTP_PROTOCAL -> HttpConnectContext.build(); + case ARROW_FLIGHT_PROTOCAL -> new ArrowFlightSqlConnectContext(); + default -> throw new IllegalStateException("Unexpected value: " + connectType); + }; // Note: statistics query does not register query id to QeProcessorImpl::coordinatorMap, // but QeProcessorImpl::reportExecStatus will check query id, // So we must disable report query status from BE to FE From e0b86109d3952c18834d3266c0a6f509dfb980d9 Mon Sep 17 00:00:00 2001 From: Murphy Date: Fri, 21 Feb 2025 17:04:57 +0800 Subject: [PATCH 2/4] fallback to old style Signed-off-by: Murphy --- .../starrocks/statistic/StatisticUtils.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index 9775dd421f68f3..966ee69cb2cad1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -96,13 +96,20 @@ public static ConnectContext buildConnectContext() { } public static ConnectContext buildConnectContext(TResultSinkType connectType) { - ConnectContext context = - switch (connectType) { - case MYSQL_PROTOCAL -> ConnectContext.buildInner(); - case HTTP_PROTOCAL -> HttpConnectContext.build(); - case ARROW_FLIGHT_PROTOCAL -> new ArrowFlightSqlConnectContext(); - default -> throw new IllegalStateException("Unexpected value: " + connectType); - }; + ConnectContext context; + switch (connectType) { + case MYSQL_PROTOCAL: + context = ConnectContext.buildInner(); + break; + case HTTP_PROTOCAL: + context = HttpConnectContext.build(); + break; + case ARROW_FLIGHT_PROTOCAL: + context = new ArrowFlightSqlConnectContext(); + break; + default: + throw new IllegalStateException("Unexpected value: " + connectType); + } // Note: statistics query does not register query id to QeProcessorImpl::coordinatorMap, // but QeProcessorImpl::reportExecStatus will check query id, // So we must disable report query status from BE to FE From c5c153c2b7ae59a5c2d5b74ccc904f8c2edeca2d Mon Sep 17 00:00:00 2001 From: Murphy Date: Mon, 24 Feb 2025 10:42:16 +0800 Subject: [PATCH 3/4] fix Signed-off-by: Murphy --- .../src/main/java/com/starrocks/statistic/StatisticUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java index 966ee69cb2cad1..0deb6cc574c1db 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/StatisticUtils.java @@ -102,7 +102,7 @@ public static ConnectContext buildConnectContext(TResultSinkType connectType) { context = ConnectContext.buildInner(); break; case HTTP_PROTOCAL: - context = HttpConnectContext.build(); + context = new HttpConnectContext(); break; case ARROW_FLIGHT_PROTOCAL: context = new ArrowFlightSqlConnectContext(); From 255132c130fdf286e396a3f871df7916ef0c2742 Mon Sep 17 00:00:00 2001 From: Murphy Date: Mon, 24 Feb 2025 11:33:41 +0800 Subject: [PATCH 4/4] fix test Signed-off-by: Murphy --- .../com/starrocks/load/pipe/filelist/FileListRepoTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java b/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java index 6f0745358d218e..1e06fb45ee794f 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/pipe/filelist/FileListRepoTest.java @@ -24,7 +24,6 @@ import com.starrocks.load.pipe.PipeId; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.StmtExecutor; -import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.ast.DmlStmt; import com.starrocks.sql.plan.ExecPlan; import com.starrocks.system.SystemInfoService; @@ -414,8 +413,6 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { Assert.assertTrue(executor.executeDQL("select now()").isEmpty()); - Assert.assertThrows(SemanticException.class, () -> executor.executeDML("insert into a.b values (1) ")); - Assert.assertThrows(RuntimeException.class, () -> executor.executeDDL("create table a (id int) ")); }