Skip to content

Commit

Permalink
Add javadoc and correct styling
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Khare <amkhar@amazon.com>
  • Loading branch information
Aman Khare committed May 9, 2023
1 parent 9a76702 commit e967c30
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class CompressionHelper {
* @param streamBooleanFlag flag used at receiver end to make intelligent decisions. For example, ClusterState
* assumes full state of diff of the states based on this flag.
* @return reference to serialized bytes
* @throws IOException
* @throws IOException if writing on the compressed stream is failed.
*/
public static BytesReference serializedWrite(Writeable writer, Version nodeVersion, boolean streamBooleanFlag) throws IOException {
final BytesStreamOutput bStream = new BytesStreamOutput();
Expand All @@ -43,11 +43,7 @@ public static BytesReference serializedWrite(Writeable writer, Version nodeVersi
writer.writeTo(stream);
}
final BytesReference serializedByteRef = bStream.bytes();
logger.trace(
"serialized writable object for node version [{}] with size [{}]",
nodeVersion,
serializedByteRef.length()
);
logger.trace("serialized writable object for node version [{}] with size [{}]", nodeVersion, serializedByteRef.length());
return serializedByteRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -239,9 +238,11 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
);
}

private void handleValidateJoinRequest(Supplier<ClusterState> currentStateSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
BytesTransportRequest request) throws IOException {
private void handleValidateJoinRequest(
Supplier<ClusterState> currentStateSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
BytesTransportRequest request
) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
final ClusterState incomingState;
Expand Down Expand Up @@ -467,16 +468,11 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti
if (bytes == null || (System.currentTimeMillis() >= lastRefreshTime + clusterStateRefreshInterval)) {
try {
// Re-getting current cluster state for validate join request
bytes = CompressionHelper.serializedWrite(state,
node.getVersion(), true);
bytes = CompressionHelper.serializedWrite(state, node.getVersion(), true);
serializedStates.put(node.getVersion(), bytes);
lastRefreshTime = System.currentTimeMillis();
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" +
" {}", node),
e
);
logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e);
listener.onFailure(e);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,15 @@ void buildDiffAndSerializeStates() {
try {
if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
if (serializedStates.containsKey(node.getVersion()) == false) {
serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState,
node.getVersion(), true));
serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, node.getVersion(), true));
}
} else {
// will send a diff
if (diff == null) {
diff = newState.diff(previousState);
}
if (serializedDiffs.containsKey(node.getVersion()) == false) {
final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState,
node.getVersion(), false);
final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, node.getVersion(), false);
serializedDiffs.put(node.getVersion(), serializedDiff);
logger.trace(
"serialized cluster state diff for version [{}] in for node version [{}] with size [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,16 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName,
);
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> {
throw new AssertionError();
}, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, null,
nodeCommissioned -> {}, namedWriteableRegistry); // registers
// request
// handler
},
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
null,
nodeCommissioned -> {},
namedWriteableRegistry
); // registers
// request
// handler
transportService.start();
transportService.acceptIncomingRequests();

Expand Down

0 comments on commit e967c30

Please sign in to comment.