Skip to content

Commit

Permalink
Pipe: Default parameters adjustment for pipe threads and pipeStuckRes…
Browse files Browse the repository at this point in the history
…tartMinIntervalMs (#14819) (#14838)
  • Loading branch information
XNX02 authored Feb 14, 2025
1 parent 84b2fd9 commit b359323
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1715,10 +1715,10 @@ continuous_query_min_every_interval_in_ms=1000
pipe_lib_dir=ext/pipe

# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# When <= 0, use max(5, CPU core number).
# effectiveMode: restart
# Datatype: int
pipe_subtask_executor_max_thread_num=5
pipe_subtask_executor_max_thread_num=0

# The connection timeout (in milliseconds) for the thrift client.
# effectiveMode: restart
Expand All @@ -1727,14 +1727,16 @@ pipe_sink_timeout_ms=900000

# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# When <= 0, use max(4, CPU core number).
# effectiveMode: restart
# Datatype: int
pipe_sink_selector_number=4
pipe_sink_selector_number=0

# The maximum number of clients that can be used in the sink.
# When <= 0, use max(16, CPU core number).
# effectiveMode: restart
# Datatype: int
pipe_sink_max_client_number=16
pipe_sink_max_client_number=0

# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public class CommonConfig {

/** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */
private int pipeSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
Math.max(5, Runtime.getRuntime().availableProcessors());

private int pipeNonForwardingEventsProgressReportInterval = 100;

Expand All @@ -232,8 +232,10 @@ public class CommonConfig {
private long pipeConnectorRetryIntervalMs = 1000L;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;

private int pipeAsyncConnectorSelectorNumber = 4;
private int pipeAsyncConnectorMaxClientNumber = 16;
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors());
private int pipeAsyncConnectorMaxClientNumber =
Math.max(16, Runtime.getRuntime().availableProcessors());

private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
Expand All @@ -259,7 +261,7 @@ public class CommonConfig {
private long pipeMaxAllowedLinkedTsFileCount = 100;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes

private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
Expand Down Expand Up @@ -940,10 +942,7 @@ public int getPipeSubtaskExecutorMaxThreadNum() {
}

public void setPipeSubtaskExecutorMaxThreadNum(int pipeSubtaskExecutorMaxThreadNum) {
this.pipeSubtaskExecutorMaxThreadNum =
Math.min(
pipeSubtaskExecutorMaxThreadNum,
Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum;
}

public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,13 @@ private void loadPipeProps(TrimProperties properties) {
"pipe_realtime_queue_poll_history_threshold",
Integer.toString(config.getPipeRealTimeQueuePollHistoryThreshold()))));

config.setPipeSubtaskExecutorMaxThreadNum(
int pipeSubtaskExecutorMaxThreadNum =
Integer.parseInt(
properties.getProperty(
"pipe_subtask_executor_max_thread_num",
Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum()))));
if (config.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
config.setPipeSubtaskExecutorMaxThreadNum(5);
Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum())));
if (pipeSubtaskExecutorMaxThreadNum > 0) {
config.setPipeSubtaskExecutorMaxThreadNum(pipeSubtaskExecutorMaxThreadNum);
}
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
Expand Down Expand Up @@ -405,22 +405,26 @@ private void loadPipeProps(TrimProperties properties) {
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));

config.setPipeAsyncConnectorSelectorNumber(
int pipeAsyncConnectorSelectorNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_selector_number",
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
config.setPipeAsyncConnectorMaxClientNumber(
String.valueOf(config.getPipeAsyncConnectorSelectorNumber()))));
if (pipeAsyncConnectorSelectorNumber > 0) {
config.setPipeAsyncConnectorSelectorNumber(pipeAsyncConnectorSelectorNumber);
}
int pipeAsyncConnectorMaxClientNumber =
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
.orElse(
properties.getProperty(
"pipe_async_connector_max_client_number",
String.valueOf(config.getPipeAsyncConnectorMaxClientNumber())))));

String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))));
if (pipeAsyncConnectorMaxClientNumber > 0) {
config.setPipeAsyncConnectorMaxClientNumber(pipeAsyncConnectorMaxClientNumber);
}
config.setPipeAllSinksRateLimitBytesPerSecond(
Double.parseDouble(
properties.getProperty(
Expand Down

0 comments on commit b359323

Please sign in to comment.