From e967c308a0606111963fae69e51ae254b71a1608 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 2 May 2023 15:57:39 +0530 Subject: [PATCH] Add javadoc and correct styling Signed-off-by: Aman Khare --- .../opensearch/cluster/CompressionHelper.java | 8 ++------ .../cluster/coordination/JoinHelper.java | 18 +++++++----------- .../PublicationTransportHandler.java | 6 ++---- .../cluster/coordination/JoinHelperTests.java | 14 ++++++++++---- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java index dc3292e1d1bb4..d2206a77063bd 100644 --- a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java @@ -33,7 +33,7 @@ public class CompressionHelper { * @param streamBooleanFlag flag used at receiver end to make intelligent decisions. For example, ClusterState * assumes full state of diff of the states based on this flag. * @return reference to serialized bytes - * @throws IOException + * @throws IOException if writing on the compressed stream is failed. */ public static BytesReference serializedWrite(Writeable writer, Version nodeVersion, boolean streamBooleanFlag) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); @@ -43,11 +43,7 @@ public static BytesReference serializedWrite(Writeable writer, Version nodeVersi writer.writeTo(stream); } final BytesReference serializedByteRef = bStream.bytes(); - logger.trace( - "serialized writable object for node version [{}] with size [{}]", - nodeVersion, - serializedByteRef.length() - ); + logger.trace("serialized writable object for node version [{}] with size [{}]", nodeVersion, serializedByteRef.length()); return serializedByteRef; } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 18b62778e0a75..aa64e096ceb72 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -88,7 +88,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -239,9 +238,11 @@ public ClusterTasksResult execute(ClusterState currentSta ); } - private void handleValidateJoinRequest(Supplier currentStateSupplier, - Collection> joinValidators, - BytesTransportRequest request) throws IOException { + private void handleValidateJoinRequest( + Supplier currentStateSupplier, + Collection> joinValidators, + BytesTransportRequest request + ) throws IOException { final Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in = request.bytes().streamInput(); final ClusterState incomingState; @@ -467,16 +468,11 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti if (bytes == null || (System.currentTimeMillis() >= lastRefreshTime + clusterStateRefreshInterval)) { try { // Re-getting current cluster state for validate join request - bytes = CompressionHelper.serializedWrite(state, - node.getVersion(), true); + bytes = CompressionHelper.serializedWrite(state, node.getVersion(), true); serializedStates.put(node.getVersion(), bytes); lastRefreshTime = System.currentTimeMillis(); } catch (Exception e) { - logger.warn( - () -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + - " {}", node), - e - ); + logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e); listener.onFailure(e); return; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index adf95c7288ea0..aab91bc72ae83 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -310,8 +310,7 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, - node.getVersion(), true)); + serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, node.getVersion(), true)); } } else { // will send a diff @@ -319,8 +318,7 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, - node.getVersion(), false); + final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, node.getVersion(), false); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index 64905d29df732..0ee2fbd7e1dd5 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -231,10 +231,16 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, ); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); - }, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, null, - nodeCommissioned -> {}, namedWriteableRegistry); // registers - // request - // handler + }, + startJoinRequest -> { throw new AssertionError(); }, + Collections.emptyList(), + (s, p, r) -> {}, + null, + nodeCommissioned -> {}, + namedWriteableRegistry + ); // registers + // request + // handler transportService.start(); transportService.acceptIncomingRequests();