Skip to content

Commit

Permalink
Add the ability for ClientManager to periodically clean up idle objects
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna authored Jan 22, 2024
1 parent 8885df3 commit 9e05b26
Show file tree
Hide file tree
Showing 21 changed files with 66 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ cn_seed_config_node=127.0.0.1:10710
# Datatype: int
# cn_selector_thread_nums_of_client_manager=1

# The maximum number of clients that can be idle for a node in a clientManager.
# When the number of idle clients on a node exceeds this number, newly returned clients will be released
# Datatype: int
# cn_core_client_count_for_each_node_in_client_manager=200

# The maximum number of clients that can be allocated for a node in a clientManager.
# when the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked
# for a while, then ClientManager will throw ClientManagerException if there are no clients after the block time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ public class ConfigNodeConfig {
/** just for test wait for 60 second by default. */
private int thriftServerAwaitTimeForStopService = 60;

/**
* The maximum number of clients that can be idle for a node in a clientManager. When the number
* of idle clients on a node exceeds this number, newly returned clients will be released.
*/
private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;

/**
* The maximum number of clients that can be allocated for a node in a clientManager. When the
* number of the client to a single node exceeds this number, the thread for applying for a client
Expand Down Expand Up @@ -451,15 +445,6 @@ public void setCnThriftDefaultBufferSize(int thriftDefaultBufferSize) {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
}

public int getCoreClientNumForEachNode() {
return coreClientNumForEachNode;
}

public ConfigNodeConfig setCoreClientNumForEachNode(int coreClientNumForEachNode) {
this.coreClientNumForEachNode = coreClientNumForEachNode;
return this;
}

public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,6 @@ private void loadProperties(Properties properties) throws BadNodeUrlException, I
"cn_thrift_max_frame_size", String.valueOf(conf.getCnThriftMaxFrameSize()))
.trim()));

conf.setCoreClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
"cn_core_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getCoreClientNumForEachNode()))
.trim()));

conf.setMaxClientNumForEachNode(
Integer.parseInt(
properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ private void setConsensusLayer(ConfigRegionStateMachine stateMachine) throws IOE
CONF.getConfigNodeRatisInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getConfigNodeRatisMaxSleepTimeMs())
.setCoreClientNumForEachNode(
CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public static class RPC {

private final boolean printLogWhenThriftClientEncounterException;
private final int thriftMaxFrameSize;
private final int coreClientNumForEachNode;
private final int maxClientNumForEachNode;

private RPC(
Expand All @@ -93,7 +92,6 @@ private RPC(
int connectionTimeoutInMs,
boolean printLogWhenThriftClientEncounterException,
int thriftMaxFrameSize,
int coreClientNumForEachNode,
int maxClientNumForEachNode) {
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
Expand All @@ -104,7 +102,6 @@ private RPC(
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.printLogWhenThriftClientEncounterException = printLogWhenThriftClientEncounterException;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.coreClientNumForEachNode = coreClientNumForEachNode;
this.maxClientNumForEachNode = maxClientNumForEachNode;
}

Expand Down Expand Up @@ -144,10 +141,6 @@ public int getThriftMaxFrameSize() {
return thriftMaxFrameSize;
}

public int getCoreClientNumForEachNode() {
return coreClientNumForEachNode;
}

public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}
Expand All @@ -168,9 +161,6 @@ public static class Builder {

private boolean printLogWhenThriftClientEncounterException = true;
private int thriftMaxFrameSize = 536870912;

private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;

private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;

public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
Expand Down Expand Up @@ -221,11 +211,6 @@ public RPC.Builder setThriftMaxFrameSize(int thriftMaxFrameSize) {
return this;
}

public RPC.Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) {
this.coreClientNumForEachNode = coreClientNumForEachNode;
return this;
}

public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
Expand All @@ -242,7 +227,6 @@ public RPC build() {
connectionTimeoutInMs,
printLogWhenThriftClientEncounterException,
thriftMaxFrameSize,
coreClientNumForEachNode,
maxClientNumForEachNode);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,21 +821,18 @@ public static class Client {
private final int clientMaxRetryAttempt;
private final long clientRetryInitialSleepTimeMs;
private final long clientRetryMaxSleepTimeMs;
private final int coreClientNumForEachNode;
private final int maxClientNumForEachNode;

public Client(
long clientRequestTimeoutMillis,
int clientMaxRetryAttempt,
long clientRetryInitialSleepTimeMs,
long clientRetryMaxSleepTimeMs,
int coreClientNumForEachNode,
int maxClientNumForEachNode) {
this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
this.clientMaxRetryAttempt = clientMaxRetryAttempt;
this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
this.coreClientNumForEachNode = coreClientNumForEachNode;
this.maxClientNumForEachNode = maxClientNumForEachNode;
}

Expand All @@ -855,10 +852,6 @@ public long getClientRetryMaxSleepTimeMs() {
return clientRetryMaxSleepTimeMs;
}

public int getCoreClientNumForEachNode() {
return coreClientNumForEachNode;
}

public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}
Expand All @@ -873,9 +866,6 @@ public static class Builder {
private int clientMaxRetryAttempt = 10;
private long clientRetryInitialSleepTimeMs = 100;
private long clientRetryMaxSleepTimeMs = 10000;

private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;

private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;

public Client build() {
Expand All @@ -884,7 +874,6 @@ public Client build() {
clientMaxRetryAttempt,
clientRetryInitialSleepTimeMs,
clientRetryMaxSleepTimeMs,
coreClientNumForEachNode,
maxClientNumForEachNode);
}

Expand All @@ -908,11 +897,6 @@ public Builder setClientRetryMaxSleepTimeMs(long clientRetryMaxSleepTimeMs) {
return this;
}

public Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) {
this.coreClientNumForEachNode = coreClientNumForEachNode;
return this;
}

public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public KeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> createClientPoo
config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build()),
new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
Expand Down Expand Up @@ -105,7 +104,6 @@ public KeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> createClientPo
.build(),
ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,6 @@ public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
new GenericKeyedObjectPool<>(
new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
new ClientPoolProperty.Builder<RatisClient>()
.setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
.build()
.getConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,6 @@ dn_seed_config_node=127.0.0.1:10710
# Datatype: int
# dn_selector_thread_count_of_client_manager=1

# The maximum number of clients that can be idle for a node in a clientManager.
# When the number of idle clients on a node exceeds this number, newly returned clients will be released
# Datatype: int
# dn_core_client_count_for_each_node_in_client_manager=200

# The maximum number of clients that can be allocated for a node in a clientManager.
# When the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked
# for a while, then ClientManager will throw ClientManagerException if there are no clients after the block time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,12 +932,6 @@ public class IoTDBConfig {
? Runtime.getRuntime().availableProcessors() / 4
: 1;

/**
* The maximum number of clients that can be idle for a node in a clientManager. When the number
* of idle clients on a node exceeds this number, newly returned clients will be released
*/
private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;

/**
* The maximum number of clients that can be allocated for a node in a clientManager. When the
* number of the client to a single node exceeds this number, the thread for applying for a client
Expand Down Expand Up @@ -3043,14 +3037,6 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}

public int getCoreClientNumForEachNode() {
return coreClientNumForEachNode;
}

public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
this.coreClientNumForEachNode = coreClientNumForEachNode;
}

public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
"dn_connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))
.trim()));

if (properties.getProperty("dn_core_connection_for_internal_service", null) != null) {
conf.setCoreClientNumForEachNode(
Integer.parseInt(
properties.getProperty("dn_core_connection_for_internal_service").trim()));
LOGGER.warn(
"The parameter dn_core_connection_for_internal_service is out of date. Please rename it to dn_core_client_count_for_each_node_in_client_manager.");
}
conf.setCoreClientNumForEachNode(
Integer.parseInt(
properties
.getProperty(
"dn_core_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getCoreClientNumForEachNode()))
.trim()));

if (properties.getProperty("dn_max_connection_for_internal_service", null) != null) {
conf.setMaxClientNumForEachNode(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ private static class DataRegionConsensusImplHolder {
.setThriftServerAwaitTimeForStopService(
CONF.getThriftServerAwaitTimeForStopService())
.setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setReplication(
Expand Down Expand Up @@ -167,7 +166,6 @@ private static class DataRegionConsensusImplHolder {
CONF.getDataRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getDataRatisConsensusMaxSleepTimeMs())
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ private static class SchemaRegionConsensusImplHolder {
CONF.getDataRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getDataRatisConsensusMaxSleepTimeMs())
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
Expand Down Expand Up @@ -83,7 +82,6 @@ public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
: 1)
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,9 +955,6 @@ data_replication_factor=1
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# pipe_sink_selector_number=4

# The core number of clients that can be used in the sink.
# pipe_sink_core_client_number=8

# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16

Expand Down
Loading

0 comments on commit 9e05b26

Please sign in to comment.