From b3593232fbb11b502469de823fba815540412b49 Mon Sep 17 00:00:00 2001 From: nanxiang xia <162968176+XNX02@users.noreply.github.com> Date: Fri, 14 Feb 2025 17:57:02 +0800 Subject: [PATCH] Pipe: Default parameters adjustment for pipe threads and pipeStuckRestartMinIntervalMs (#14819) (#14838) --- .../conf/iotdb-system.properties.template | 10 ++++---- .../iotdb/commons/conf/CommonConfig.java | 15 ++++++------ .../iotdb/commons/conf/CommonDescriptor.java | 24 +++++++++++-------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 50b619ec3350..13f488b0e6f5 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1715,10 +1715,10 @@ continuous_query_min_every_interval_in_ms=1000 pipe_lib_dir=ext/pipe # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. -# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). +# When <= 0, use max(5, CPU core number). # effectiveMode: restart # Datatype: int -pipe_subtask_executor_max_thread_num=5 +pipe_subtask_executor_max_thread_num=0 # The connection timeout (in milliseconds) for the thrift client. # effectiveMode: restart @@ -1727,14 +1727,16 @@ pipe_sink_timeout_ms=900000 # The maximum number of selectors that can be used in the sink. # Recommend to set this value to less than or equal to pipe_sink_max_client_number. +# When <= 0, use max(4, CPU core number). # effectiveMode: restart # Datatype: int -pipe_sink_selector_number=4 +pipe_sink_selector_number=0 # The maximum number of clients that can be used in the sink. +# When <= 0, use max(16, CPU core number). # effectiveMode: restart # Datatype: int -pipe_sink_max_client_number=16 +pipe_sink_max_client_number=0 # Whether to enable receiving pipe data through air gap. # The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index b2eeaf25f2cb..566a37e82ebc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -207,7 +207,7 @@ public class CommonConfig { /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */ private int pipeSubtaskExecutorMaxThreadNum = - Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); + Math.max(5, Runtime.getRuntime().availableProcessors()); private int pipeNonForwardingEventsProgressReportInterval = 100; @@ -232,8 +232,10 @@ public class CommonConfig { private long pipeConnectorRetryIntervalMs = 1000L; private boolean pipeConnectorRPCThriftCompressionEnabled = false; - private int pipeAsyncConnectorSelectorNumber = 4; - private int pipeAsyncConnectorMaxClientNumber = 16; + private int pipeAsyncConnectorSelectorNumber = + Math.max(4, Runtime.getRuntime().availableProcessors()); + private int pipeAsyncConnectorMaxClientNumber = + Math.max(16, Runtime.getRuntime().availableProcessors()); private double pipeAllSinksRateLimitBytesPerSecond = -1; private int rateLimiterHotReloadCheckIntervalMs = 1000; @@ -259,7 +261,7 @@ public class CommonConfig { private long pipeMaxAllowedLinkedTsFileCount = 100; private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F; private long pipeStuckRestartIntervalSeconds = 120; - private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes + private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes private int pipeMetaReportMaxLogNumPerRound = 10; private int pipeMetaReportMaxLogIntervalRounds = 36; @@ -940,10 +942,7 @@ public int getPipeSubtaskExecutorMaxThreadNum() { } public void setPipeSubtaskExecutorMaxThreadNum(int pipeSubtaskExecutorMaxThreadNum) { - this.pipeSubtaskExecutorMaxThreadNum = - Math.min( - pipeSubtaskExecutorMaxThreadNum, - Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); + this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum; } public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 529d1053c422..3661feefa6ee 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -306,13 +306,13 @@ private void loadPipeProps(TrimProperties properties) { "pipe_realtime_queue_poll_history_threshold", Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold())))); - config.setPipeSubtaskExecutorMaxThreadNum( + int pipeSubtaskExecutorMaxThreadNum = Integer.parseInt( properties.getProperty( "pipe_subtask_executor_max_thread_num", - Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum())))); - if (config.getPipeSubtaskExecutorMaxThreadNum() <= 0) { - config.setPipeSubtaskExecutorMaxThreadNum(5); + Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum()))); + if (pipeSubtaskExecutorMaxThreadNum > 0) { + config.setPipeSubtaskExecutorMaxThreadNum(pipeSubtaskExecutorMaxThreadNum); } config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount( Integer.parseInt( @@ -405,22 +405,26 @@ private void loadPipeProps(TrimProperties properties) { properties.getProperty( "pipe_connector_rpc_thrift_compression_enabled", String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled()))))); - - config.setPipeAsyncConnectorSelectorNumber( + int pipeAsyncConnectorSelectorNumber = Integer.parseInt( Optional.ofNullable(properties.getProperty("pipe_sink_selector_number")) .orElse( properties.getProperty( "pipe_async_connector_selector_number", - String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))))); - config.setPipeAsyncConnectorMaxClientNumber( + String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))); + if (pipeAsyncConnectorSelectorNumber > 0) { + config.setPipeAsyncConnectorSelectorNumber(pipeAsyncConnectorSelectorNumber); + } + int pipeAsyncConnectorMaxClientNumber = Integer.parseInt( Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number")) .orElse( properties.getProperty( "pipe_async_connector_max_client_number", - String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))))); - + String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))); + if (pipeAsyncConnectorMaxClientNumber > 0) { + config.setPipeAsyncConnectorMaxClientNumber(pipeAsyncConnectorMaxClientNumber); + } config.setPipeAllSinksRateLimitBytesPerSecond( Double.parseDouble( properties.getProperty(