Skip to content

Commit

Permalink
Change Remote state read thread pool to Fixed type (#16850)
Browse files Browse the repository at this point in the history
* Change Remote state read thread pool to Fixed type

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha authored Dec 18, 2024
1 parent 57a6605 commit 8aa3185
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes

if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) {
try {
logger.info(
() -> new ParameterizedMessage(
"Term version checker downloading full cluster state for term {}, version {}",
termVersion.getTerm(),
termVersion.getVersion()
)
);
ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion();
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByTermVersion(
Expand All @@ -454,7 +461,7 @@ private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionRes
return clusterStateFromRemote;
}
} catch (Exception e) {
logger.trace("Error while fetching from remote cluster state", e);
logger.error("Error while fetching from remote cluster state", e);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}

if (applyFullState == true) {
logger.debug(
logger.info(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1494,8 +1494,22 @@ public ClusterState getClusterStateForManifest(
try {
ClusterState stateFromCache = remoteClusterStateCache.getState(clusterName, manifest);
if (stateFromCache != null) {
logger.trace(
() -> new ParameterizedMessage(
"Found cluster state in cache for term {} and version {}",
manifest.getClusterTerm(),
manifest.getStateVersion()
)
);
return stateFromCache;
}
logger.info(
() -> new ParameterizedMessage(
"Cluster state not found in cache for term {} and version {}",
manifest.getClusterTerm(),
manifest.getStateVersion()
)
);

final ClusterState clusterState;
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING);
map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING);
map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING);
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING);
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED);
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
Expand Down Expand Up @@ -306,7 +306,7 @@ public ThreadPool(
);
builders.put(
Names.REMOTE_STATE_READ,
new ScalingExecutorBuilder(Names.REMOTE_STATE_READ, 1, boundedBy(4 * allocatedProcessors, 4, 32), TimeValue.timeValueMinutes(5))
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_READ, boundedBy(4 * allocatedProcessors, 4, 32), 120000)
);
builders.put(
Names.INDEX_SEARCHER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2354,6 +2354,14 @@ public void testReadLatestClusterStateFromCache() throws IOException {
.getState(clusterState.getClusterName().value(), expectedManifest);
assertEquals(stateFromCache.getMetadata(), state.getMetadata());

ClusterState stateFromCache2 = remoteClusterStateService.getClusterStateForManifest(
clusterState.getClusterName().value(),
expectedManifest,
"nodeA",
true
);
assertEquals(stateFromCache2.getMetadata(), state.getMetadata());

final ClusterMetadataManifest notExistMetadata = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.REMOTE_STATE_READ, n -> ThreadPool.boundedBy(4 * n, 4, 32));
return sizes.get(threadPoolName).apply(numberOfProcessors);
}

Expand Down

0 comments on commit 8aa3185

Please sign in to comment.