From 9481716ae32c5c8be95acfc7eac1c445907d937d Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 28 Apr 2023 17:36:46 +0530 Subject: [PATCH 01/16] Compress and cache cluster state during validate join request Signed-off-by: Aman Khare --- .../opensearch/cluster/CompressionHelper.java | 36 ++++++ .../cluster/coordination/Coordinator.java | 3 +- .../cluster/coordination/JoinHelper.java | 105 +++++++++++++++--- .../PublicationTransportHandler.java | 9 +- .../common/settings/ClusterSettings.java | 5 +- .../cluster/coordination/JoinHelperTests.java | 23 +++- 6 files changed, 157 insertions(+), 24 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/CompressionHelper.java diff --git a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java new file mode 100644 index 0000000000000..41cd4b94be7a4 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java @@ -0,0 +1,36 @@ +package org.opensearch.cluster; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.compress.CompressorFactory; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * A helper class to utilize the compressed stream. + */ +public class CompressionHelper { + private static final Logger logger = LogManager.getLogger(CompressionHelper.class); + + public static BytesReference serializedWrite(Writeable writer, Version nodeVersion, boolean streamBooleanFlag) throws IOException { + final BytesStreamOutput bStream = new BytesStreamOutput(); + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { + stream.setVersion(nodeVersion); + stream.writeBoolean(streamBooleanFlag); + writer.writeTo(stream); + } + final BytesReference serializedByteRef = bStream.bytes(); + logger.trace( + "serialized writable object for node version [{}] with size [{}]", + nodeVersion, + serializedByteRef.length() + ); + return serializedByteRef; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index dd7363426af19..7c1e4244a75a6 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -222,7 +222,8 @@ public Coordinator( this.onJoinValidators, rerouteService, nodeHealthService, - this::onNodeCommissionStatusChange + this::onNodeCommissionStatusChange, + namedWriteableRegistry ); this.persistedStateSupplier = persistedStateSupplier; this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 010e9f47ed39b..18b62778e0a75 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -35,11 +35,13 @@ import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.ClusterStateTaskListener; +import org.opensearch.cluster.CompressionHelper; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.decommission.NodeDecommissionedException; @@ -49,15 +51,23 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.Priority; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.compress.Compressor; +import org.opensearch.common.compress.CompressorFactory; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; +import org.opensearch.transport.BytesTransportRequest; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportException; @@ -78,6 +88,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -122,6 +133,17 @@ public class JoinHelper { private final Supplier joinTaskExecutorGenerator; private final Consumer nodeCommissioned; + private final NamedWriteableRegistry namedWriteableRegistry; + private final Map serializedStates = new HashMap<>(); + private long lastRefreshTime = 0L; + public static final Setting CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL = Setting.timeSetting( + "cluster_manager.join.state.refresh_interval", + TimeValue.timeValueMillis(30000), + TimeValue.timeValueMillis(0), + TimeValue.timeValueMillis(60000), + Setting.Property.NodeScope + ); + private final long clusterStateRefreshInterval; JoinHelper( Settings settings, @@ -135,13 +157,16 @@ public class JoinHelper { Collection> joinValidators, RerouteService rerouteService, NodeHealthService nodeHealthService, - Consumer nodeCommissioned + Consumer nodeCommissioned, + NamedWriteableRegistry namedWriteableRegistry ) { this.clusterManagerService = clusterManagerService; this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.nodeCommissioned = nodeCommissioned; + this.namedWriteableRegistry = namedWriteableRegistry; + this.clusterStateRefreshInterval = CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL.get(settings).getMillis(); this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { 
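/*
 * Aside (illustrative only): the wire format that serializedWrite() above and the
 * validate-join handler agree on is "compressed stream -> boolean full-state/diff
 * flag -> payload". Below is a minimal JDK-only round-trip sketch of that framing,
 * with GZIP standing in for OpenSearch's CompressorFactory and a String standing
 * in for the cluster state; every name in the sketch is hypothetical.
 */
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class FramingSketch {
    // Sender: compress, then write the flag ahead of the payload.
    static byte[] write(String payload, boolean isFullState) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            out.writeBoolean(isFullState); // receiver branches on this flag
            out.writeUTF(payload);         // stand-in for writer.writeTo(stream)
        }
        return bytes.toByteArray();
    }

    // Receiver: decompress, read the flag first, then deserialize accordingly.
    static String read(byte[] wire) throws IOException {
        try (DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(wire)))) {
            boolean isFullState = in.readBoolean();
            if (isFullState == false) {
                throw new IOException("expected a full cluster state, got a diff");
            }
            return in.readUTF();
        }
    }
}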
private final long term = currentTermSupplier.getAsLong(); @@ -206,24 +231,53 @@ public ClusterTasksResult execute(ClusterState currentSta transportService.registerRequestHandler( VALIDATE_JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, - ValidateJoinRequest::new, + BytesTransportRequest::new, (request, channel, task) -> { + handleValidateJoinRequest(currentStateSupplier, joinValidators, request); + channel.sendResponse(Empty.INSTANCE); + } + ); + } + + private void handleValidateJoinRequest(Supplier currentStateSupplier, + Collection> joinValidators, + BytesTransportRequest request) throws IOException { + final Compressor compressor = CompressorFactory.compressor(request.bytes()); + StreamInput in = request.bytes().streamInput(); + final ClusterState incomingState; + try { + if (compressor != null) { + in = new InputStreamStreamInput(compressor.threadLocalInputStream(in)); + } + in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); + in.setVersion(request.version()); + // If true we received full cluster state - otherwise diffs + if (in.readBoolean()) { + // Close early to release resources used by the de-compression as early as possible + try (StreamInput input = in) { + incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); + } catch (Exception e) { + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } + final ClusterState localState = currentStateSupplier.get(); if (localState.metadata().clusterUUIDCommitted() - && localState.metadata().clusterUUID().equals(request.getState().metadata().clusterUUID()) == false) { + && localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) { throw new CoordinationStateRejectedException( "join validation on cluster state" + " with a different cluster uuid " - + request.getState().metadata().clusterUUID() + + incomingState.metadata().clusterUUID() + " than local cluster uuid " + localState.metadata().clusterUUID() + ", rejecting" ); } - joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState())); - channel.sendResponse(Empty.INSTANCE); + joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState)); } - ); + } finally { + IOUtils.close(in); + } } private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) { @@ -407,12 +461,37 @@ public String executor() { } public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { - transportService.sendRequest( - node, - VALIDATE_JOIN_ACTION_NAME, - new ValidateJoinRequest(state), - new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC) - ); + try { + BytesReference bytes = serializedStates.get(node.getVersion()); + // Refresh serializedStates map every time if clusterStateRefreshInterval is 0 + if (bytes == null || (System.currentTimeMillis() >= lastRefreshTime + clusterStateRefreshInterval)) { + try { + // Re-getting current cluster state for validate join request + bytes = CompressionHelper.serializedWrite(state, + node.getVersion(), true); + serializedStates.put(node.getVersion(), bytes); + lastRefreshTime = System.currentTimeMillis(); + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + + " {}", node), + e + ); + listener.onFailure(e); + return; + } + } + final BytesTransportRequest request = new BytesTransportRequest(bytes, 
node.getVersion()); + transportService.sendRequest( + node, + VALIDATE_JOIN_ACTION_NAME, + request, + new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC) + ); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e); + listener.onFailure(e); + } } /** diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 77f3b89f950ce..adf95c7288ea0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -39,6 +39,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.CompressionHelper; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; import org.opensearch.cluster.node.DiscoveryNode; @@ -309,7 +310,8 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion())); + serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, + node.getVersion(), true)); } } else { // will send a diff @@ -317,7 +319,8 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion()); + final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, + node.getVersion(), false); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", @@ -413,7 +416,7 @@ private void sendFullClusterState(DiscoveryNode destination, ActionListener {}, () -> new StatusInfo(HEALTHY, "info"), - nodeCommissioned -> {} + nodeCommissioned -> {}, + namedWriteableRegistry ); transportService.start(); @@ -195,14 +202,14 @@ public void testFailedJoinAttemptLogLevel() { ); } - public void testJoinValidationRejectsMismatchedClusterUUID() { + public void testJoinValidationRejectsMismatchedClusterUUID() throws IOException { assertJoinValidationRejectsMismatchedClusterUUID( JoinHelper.VALIDATE_JOIN_ACTION_NAME, "join validation on cluster state with a different cluster uuid" ); } - private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, String expectedMessage) { + private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, String expectedMessage) throws IOException { DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build(), random() @@ -224,7 +231,8 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, ); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); - }, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, null, nodeCommissioned -> {}); // registers + }, 
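/*
 * Aside (illustrative only): the caching added to sendValidateJoinRequest() above
 * amounts to "reuse the serialized bytes per node version until the refresh
 * interval elapses, re-serializing on the first request after expiry". A
 * self-contained JDK sketch of that pattern; byte[] stands in for the serialized
 * cluster state and all names below are hypothetical.
 */
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class RefreshingCacheSketch {
    private final Map<String, byte[]> serializedByVersion = new ConcurrentHashMap<>();
    private final long refreshIntervalMillis;
    private volatile long lastRefreshTime = 0L;

    RefreshingCacheSketch(long refreshIntervalMillis) {
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    // Serialize at most once per interval per version; an interval of 0 disables caching.
    byte[] get(String nodeVersion, Supplier<byte[]> serializeCurrentState) {
        byte[] bytes = serializedByVersion.get(nodeVersion);
        long now = System.currentTimeMillis();
        if (bytes == null || now >= lastRefreshTime + refreshIntervalMillis) {
            bytes = serializeCurrentState.get(); // re-read the current cluster state
            serializedByVersion.put(nodeVersion, bytes);
            lastRefreshTime = now;
        }
        return bytes;
    }
}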
startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, null, + nodeCommissioned -> {}, namedWriteableRegistry); // registers // request // handler transportService.start(); @@ -235,10 +243,12 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, .build(); final PlainActionFuture future = new PlainActionFuture<>(); + BytesReference bytes = CompressionHelper.serializedWrite(otherClusterState, localNode.getVersion(), true); + final BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); transportService.sendRequest( localNode, actionName, - new ValidateJoinRequest(otherClusterState), + request, new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) ); deterministicTaskQueue.runAllTasks(); @@ -282,7 +292,8 @@ public void testJoinFailureOnUnhealthyNodes() { Collections.emptyList(), (s, p, r) -> {}, () -> nodeHealthServiceStatus.get(), - nodeCommissioned -> {} + nodeCommissioned -> {}, + namedWriteableRegistry ); transportService.start(); From 94d16ece6f8123ea0a36d327f4296699f37cab0f Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 2 May 2023 15:42:24 +0530 Subject: [PATCH 02/16] Add changelog and license Signed-off-by: Aman Khare --- CHANGELOG.md | 3 ++- .../opensearch/cluster/CompressionHelper.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f533bdd0c6be..e53f751ac4edc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) +- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) ### Deprecated @@ -117,4 +118,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x diff --git a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java index 41cd4b94be7a4..dc3292e1d1bb4 100644 --- a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java @@ -1,3 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + package org.opensearch.cluster; import org.apache.logging.log4j.LogManager; @@ -18,6 +26,15 @@ public class CompressionHelper { private static final Logger logger = LogManager.getLogger(CompressionHelper.class); + /** + * It'll always use compression before writing on a newly created output stream. 
+ * @param writer Object which is going to write the content + * @param nodeVersion version of cluster node + * @param streamBooleanFlag flag used at receiver end to make intelligent decisions. For example, ClusterState + * assumes full state or diff of the states based on this flag. + * @return reference to serialized bytes + * @throws IOException + */ public static BytesReference serializedWrite(Writeable writer, Version nodeVersion, boolean streamBooleanFlag) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { From 1d29ab7f207f8be60ebf54b3e8c9193e3fea616f Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 2 May 2023 15:57:39 +0530 Subject: [PATCH 03/16] Add javadoc and correct styling Signed-off-by: Aman Khare --- .../opensearch/cluster/CompressionHelper.java | 8 ++------ .../cluster/coordination/JoinHelper.java | 18 +++++++----------- .../PublicationTransportHandler.java | 6 ++---- .../cluster/coordination/JoinHelperTests.java | 14 ++++++++++---- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java index dc3292e1d1bb4..d2206a77063bd 100644 --- a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java @@ -33,7 +33,7 @@ public class CompressionHelper { * @param streamBooleanFlag flag used at receiver end to make intelligent decisions. For example, ClusterState * assumes full state or diff of the states based on this flag. * @return reference to serialized bytes - * @throws IOException + * @throws IOException if writing to the compressed stream fails.
*/ public static BytesReference serializedWrite(Writeable writer, Version nodeVersion, boolean streamBooleanFlag) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { stream.setVersion(nodeVersion); stream.writeBoolean(streamBooleanFlag); writer.writeTo(stream); } final BytesReference serializedByteRef = bStream.bytes(); - logger.trace( - "serialized writable object for node version [{}] with size [{}]", - nodeVersion, - serializedByteRef.length() - ); + logger.trace("serialized writable object for node version [{}] with size [{}]", nodeVersion, serializedByteRef.length()); return serializedByteRef; } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 18b62778e0a75..aa64e096ceb72 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -88,7 +88,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -239,9 +238,11 @@ public ClusterTasksResult execute(ClusterState currentSta ); } - private void handleValidateJoinRequest(Supplier currentStateSupplier, - Collection> joinValidators, - BytesTransportRequest request) throws IOException { + private void handleValidateJoinRequest( + Supplier currentStateSupplier, + Collection> joinValidators, + BytesTransportRequest request + ) throws IOException { final Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in = request.bytes().streamInput(); final ClusterState incomingState; @@ -467,16 +468,11 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti if (bytes == null || (System.currentTimeMillis() >= lastRefreshTime + clusterStateRefreshInterval)) { try { // Re-getting current cluster state for validate join request - bytes = CompressionHelper.serializedWrite(state, - node.getVersion(), true); + bytes = CompressionHelper.serializedWrite(state, node.getVersion(), true); serializedStates.put(node.getVersion(), bytes); lastRefreshTime = System.currentTimeMillis(); } catch (Exception e) { - logger.warn( - () -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + - " {}", node), - e - ); + logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e); listener.onFailure(e); return; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index adf95c7288ea0..aab91bc72ae83 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -310,8 +310,7 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, - node.getVersion(), true)); + serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, node.getVersion(), true)); } } else { //
will send a diff @@ -319,8 +318,7 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, - node.getVersion(), false); + final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, node.getVersion(), false); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index 64905d29df732..0ee2fbd7e1dd5 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -231,10 +231,16 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, ); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); - }, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, null, - nodeCommissioned -> {}, namedWriteableRegistry); // registers - // request - // handler + }, + startJoinRequest -> { throw new AssertionError(); }, + Collections.emptyList(), + (s, p, r) -> {}, + null, + nodeCommissioned -> {}, + namedWriteableRegistry + ); // registers + // request + // handler transportService.start(); transportService.acceptIncomingRequests(); From 78db6b0318e33a4af468a9944bb9392941490fa5 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Mon, 8 May 2023 20:52:08 +0530 Subject: [PATCH 04/16] Add new handler for sending compressed cluster state in validate join flow and refactor code Signed-off-by: Aman Khare --- .../cluster/coordination/ZenDiscoveryIT.java | 7 +- .../opensearch/cluster/CompressionHelper.java | 49 ---- .../cluster/coordination/JoinHelper.java | 116 ++++----- .../PublicationTransportHandler.java | 71 +----- .../common/compress/CompressionHelper.java | 100 ++++++++ .../common/settings/ClusterSettings.java | 2 +- .../cluster/coordination/JoinHelperTests.java | 227 ++++++++++++++---- .../cluster/coordination/NodeJoinTests.java | 2 +- 8 files changed, 359 insertions(+), 215 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/cluster/CompressionHelper.java create mode 100644 server/src/main/java/org/opensearch/common/compress/CompressionHelper.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java index aaba53dcb2b07..dc9034ab53e73 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; @@ -59,6 +60,7 @@ import java.util.concurrent.TimeoutException; import static 
org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; +import static org.opensearch.cluster.coordination.JoinHelper.CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL; import static org.opensearch.test.NodeRoles.dataNode; import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode; import static org.hamcrest.Matchers.containsString; @@ -106,7 +108,9 @@ public void testNoShardRelocationsOccurWhenElectedClusterManagerNodeFails() thro } public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedException, ExecutionException, TimeoutException { - String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.getKey(), TimeValue.timeValueMillis(0)).build() + ); String node1 = internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, clusterManagerNode); @@ -117,7 +121,6 @@ public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedExce final CompletableFuture future = new CompletableFuture<>(); DiscoveryNode node = state.nodes().getLocalNode(); - coordinator.sendValidateJoinRequest( stateWithCustomMetadata, new JoinRequest(node, 0L, Optional.empty()), diff --git a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java b/server/src/main/java/org/opensearch/cluster/CompressionHelper.java deleted file mode 100644 index d2206a77063bd..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/CompressionHelper.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.Version; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.io.stream.OutputStreamStreamOutput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; - -import java.io.IOException; - -/** - * A helper class to utilize the compressed stream. - */ -public class CompressionHelper { - private static final Logger logger = LogManager.getLogger(CompressionHelper.class); - - /** - * It'll always use compression before writing on a newly created output stream. - * @param writer Object which is going to write the content - * @param nodeVersion version of cluster node - * @param streamBooleanFlag flag used at receiver end to make intelligent decisions. For example, ClusterState - * assumes full state of diff of the states based on this flag. - * @return reference to serialized bytes - * @throws IOException if writing on the compressed stream is failed. 
- */ - public static BytesReference serializedWrite(Writeable writer, Version nodeVersion, boolean streamBooleanFlag) throws IOException { - final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { - stream.setVersion(nodeVersion); - stream.writeBoolean(streamBooleanFlag); - writer.writeTo(stream); - } - final BytesReference serializedByteRef = bStream.bytes(); - logger.trace("serialized writable object for node version [{}] with size [{}]", nodeVersion, serializedByteRef.length()); - return serializedByteRef; - } -} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index aa64e096ceb72..37371be2fb659 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -41,7 +41,9 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.ClusterStateTaskListener; -import org.opensearch.cluster.CompressionHelper; +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.compress.CompressionHelper; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.decommission.NodeDecommissionedException; @@ -53,10 +55,6 @@ import org.opensearch.common.Priority; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.io.stream.InputStreamStreamInput; -import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Setting; @@ -79,6 +77,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.io.InvalidObjectException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -108,6 +107,7 @@ public class JoinHelper { public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate"; + public static final String VALIDATE_COMPRESSED_JOIN_ACTION_NAME = "internal:cluster/coordination/join" + "/validate_compressed"; public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join"; // the timeout for Zen1 join attempts @@ -133,10 +133,9 @@ public class JoinHelper { private final Supplier joinTaskExecutorGenerator; private final Consumer nodeCommissioned; private final NamedWriteableRegistry namedWriteableRegistry; - private final Map serializedStates = new HashMap<>(); - private long lastRefreshTime = 0L; - public static final Setting CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL = Setting.timeSetting( - "cluster_manager.join.state.refresh_interval", + private Cache serializedStates; + public static final Setting CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL = Setting.timeSetting( + "cluster_manager.validate_join.cache_interval", TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(0), 
TimeValue.timeValueMillis(60000), @@ -165,7 +164,13 @@ public class JoinHelper { this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.nodeCommissioned = nodeCommissioned; this.namedWriteableRegistry = namedWriteableRegistry; - this.clusterStateRefreshInterval = CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL.get(settings).getMillis(); + this.clusterStateRefreshInterval = CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings).getMillis(); + if (clusterStateRefreshInterval != 0) { + CacheBuilder cacheBuilder = CacheBuilder.builder(); + cacheBuilder.setExpireAfterWrite(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings)); + this.serializedStates = cacheBuilder.build(); + } + this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { private final long term = currentTermSupplier.getAsLong(); @@ -230,12 +235,43 @@ public ClusterTasksResult execute(ClusterState currentSta transportService.registerRequestHandler( VALIDATE_JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, + ValidateJoinRequest::new, + (request, channel, task) -> { + runJoinValidators(currentStateSupplier, request.getState(), joinValidators); + channel.sendResponse(Empty.INSTANCE); + } + ); + + transportService.registerRequestHandler( + VALIDATE_COMPRESSED_JOIN_ACTION_NAME, + ThreadPool.Names.GENERIC, BytesTransportRequest::new, (request, channel, task) -> { handleValidateJoinRequest(currentStateSupplier, joinValidators, request); channel.sendResponse(Empty.INSTANCE); } ); + + } + + private void runJoinValidators( + Supplier currentStateSupplier, + ClusterState incomingState, + Collection> joinValidators + ) { + final ClusterState localState = currentStateSupplier.get(); + if (localState.metadata().clusterUUIDCommitted() + && localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) { + throw new CoordinationStateRejectedException( + "join validation on cluster state" + + " with a different cluster uuid " + + incomingState.metadata().clusterUUID() + + " than local cluster uuid " + + localState.metadata().clusterUUID() + + ", rejecting" + ); + } + joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState)); } private void handleValidateJoinRequest( @@ -243,38 +279,15 @@ private void handleValidateJoinRequest( Collection> joinValidators, BytesTransportRequest request ) throws IOException { - final Compressor compressor = CompressorFactory.compressor(request.bytes()); - StreamInput in = request.bytes().streamInput(); - final ClusterState incomingState; + StreamInput in = CompressionHelper.decompressClusterState(request, namedWriteableRegistry); + ClusterState incomingState; try { - if (compressor != null) { - in = new InputStreamStreamInput(compressor.threadLocalInputStream(in)); - } - in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); - in.setVersion(request.version()); - // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - // Close early to release resources used by the de-compression as early as possible - try (StreamInput input = in) { - incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); - } catch (Exception e) { - logger.warn("unexpected error while deserializing an incoming cluster state", e); - throw e; - } - - final ClusterState localState = currentStateSupplier.get(); - if (localState.metadata().clusterUUIDCommitted() - && localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) { 
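/*
 * Aside (illustrative only): this patch replaces the manual lastRefreshTime check
 * with a cache whose entries expire a fixed time after being written (see the
 * CacheBuilder.setExpireAfterWrite call above). A minimal JDK sketch of
 * expire-after-write semantics; all names below are hypothetical.
 */
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class ExpireAfterWriteSketch<K, V> {
    private static final class Entry<V> {
        final V value;
        final long writtenAtMillis;
        Entry(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long expireAfterWriteMillis;

    ExpireAfterWriteSketch(long expireAfterWriteMillis) {
        this.expireAfterWriteMillis = expireAfterWriteMillis;
    }

    // Reload when the entry is missing or was written longer ago than the TTL.
    V computeIfAbsent(K key, Supplier<V> loader) {
        long now = System.currentTimeMillis();
        return entries.compute(key, (k, existing) -> {
            if (existing == null || now - existing.writtenAtMillis >= expireAfterWriteMillis) {
                return new Entry<>(loader.get(), now);
            }
            return existing;
        }).value;
    }
}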
- throw new CoordinationStateRejectedException( - "join validation on cluster state" - + " with a different cluster uuid " - + incomingState.metadata().clusterUUID() - + " than local cluster uuid " - + localState.metadata().clusterUUID() - + ", rejecting" - ); - } - joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState)); + incomingState = CompressionHelper.deserializeFullClusterState(in, transportService); + runJoinValidators(currentStateSupplier, incomingState, joinValidators); + } else { + logger.error("validate new node join request requires full cluster state"); + throw new InvalidObjectException("validate new node join request requires full cluster state"); } } finally { IOUtils.close(in); @@ -463,24 +476,19 @@ public String executor() { public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { try { - BytesReference bytes = serializedStates.get(node.getVersion()); - // Refresh serializedStates map every time if clusterStateRefreshInterval is 0 - if (bytes == null || (System.currentTimeMillis() >= lastRefreshTime + clusterStateRefreshInterval)) { - try { - // Re-getting current cluster state for validate join request - bytes = CompressionHelper.serializedWrite(state, node.getVersion(), true); - serializedStates.put(node.getVersion(), bytes); - lastRefreshTime = System.currentTimeMillis(); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e); - listener.onFailure(e); - return; - } + BytesReference bytes; + if (clusterStateRefreshInterval == 0) { + bytes = CompressionHelper.serializeClusterState(state, node, true); + } else { + bytes = serializedStates.computeIfAbsent( + node.getVersion(), + key -> CompressionHelper.serializeClusterState(state, node, true) + ); } final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion()); transportService.sendRequest( node, - VALIDATE_JOIN_ACTION_NAME, + VALIDATE_COMPRESSED_JOIN_ACTION_NAME, request, new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC) ); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index aab91bc72ae83..00c1976619c2d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -39,21 +39,14 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.CompressionHelper; +import org.opensearch.common.compress.CompressionHelper; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.io.stream.InputStreamStreamInput; -import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.common.io.stream.NamedWriteableRegistry; -import 
org.opensearch.common.io.stream.OutputStreamStreamOutput; import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.util.io.IOUtils; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.BytesTransportRequest; @@ -169,24 +162,11 @@ public PublishClusterStateStats stats() { } private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { - final Compressor compressor = CompressorFactory.compressor(request.bytes()); - StreamInput in = request.bytes().streamInput(); + StreamInput in = CompressionHelper.decompressClusterState(request, namedWriteableRegistry); + ClusterState incomingState; try { - if (compressor != null) { - in = new InputStreamStreamInput(compressor.threadLocalInputStream(in)); - } - in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); - in.setVersion(request.version()); - // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - final ClusterState incomingState; - // Close early to release resources used by the de-compression as early as possible - try (StreamInput input = in) { - incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); - } catch (Exception e) { - logger.warn("unexpected error while deserializing an incoming cluster state", e); - throw e; - } + incomingState = CompressionHelper.deserializeFullClusterState(in, transportService); fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); final PublishWithJoinResponse response = acceptState(incomingState); @@ -199,20 +179,12 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque incompatibleClusterStateDiffReceivedCount.incrementAndGet(); throw new IncompatibleClusterStateVersionException("have no local cluster state"); } else { - ClusterState incomingState; try { - final Diff diff; - // Close stream early to release resources used by the de-compression as early as possible - try (StreamInput input = in) { - diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode()); - } + Diff diff = CompressionHelper.deserializeClusterStateDiff(in, lastSeen.getNodes().getLocalNode()); incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException } catch (IncompatibleClusterStateVersionException e) { incompatibleClusterStateDiffReceivedCount.incrementAndGet(); throw e; - } catch (Exception e) { - logger.warn("unexpected error while deserializing an incoming cluster state", e); - throw e; } compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug( @@ -254,33 +226,6 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang return publicationContext; } - private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException { - final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { - stream.setVersion(nodeVersion); - stream.writeBoolean(true); - clusterState.writeTo(stream); - } - final BytesReference serializedState = bStream.bytes(); - logger.trace( - "serialized full cluster state version [{}] for node version [{}] with size [{}]", - clusterState.version(), - nodeVersion, - serializedState.length() - ); - return 
serializedState; - } - - private static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException { - final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { - stream.setVersion(nodeVersion); - stream.writeBoolean(false); - diff.writeTo(stream); - } - return bStream.bytes(); - } - /** * Publishing a cluster state typically involves sending the same cluster state (or diff) to every node, so the work of diffing, * serializing, and compressing the state can be done once and the results shared across publish requests. The @@ -310,7 +255,7 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), CompressionHelper.serializedWrite(newState, node.getVersion(), true)); + serializedStates.put(node.getVersion(), CompressionHelper.serializeClusterState(newState, node, true)); } } else { // will send a diff @@ -318,7 +263,7 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = CompressionHelper.serializedWrite(newState, node.getVersion(), false); + final BytesReference serializedDiff = CompressionHelper.serializeClusterState(diff, node, false); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", @@ -414,7 +359,7 @@ private void sendFullClusterState(DiscoveryNode destination, ActionListener new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e); + throw e; + } + } + + public static StreamInput decompressClusterState(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) + throws IOException { + final Compressor compressor = CompressorFactory.compressor(request.bytes()); + StreamInput in = request.bytes().streamInput(); + if (compressor != null) { + in = new InputStreamStreamInput(compressor.threadLocalInputStream(in)); + } + in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); + in.setVersion(request.version()); + return in; + } + + public static ClusterState deserializeFullClusterState(StreamInput in, TransportService transportService) throws IOException { + final ClusterState incomingState; + try (StreamInput input = in) { + incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); + } catch (Exception e) { + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } + return incomingState; + } + + public static Diff deserializeClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException { + final Diff diff; + // Close stream early to release resources used by the de-compression as early as possible + try (StreamInput input = in) { + diff = ClusterState.readDiffFrom(input, localNode); + } catch (Exception e) { + logger.warn("unexpected error while deserializing an incoming cluster state diff", e); + throw e; + } + return diff; + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5248a75f1b34d..f75980fbab2e3 100644 --- 
a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -652,7 +652,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, // Settings related to node join - JoinHelper.CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL + JoinHelper.CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL ) ) ); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index 0ee2fbd7e1dd5..55b529bb99f99 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -33,17 +33,19 @@ import org.apache.logging.log4j.Level; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.CompressionHelper; +import org.opensearch.common.compress.CompressionHelper; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.UncategorizedExecutionException; import org.opensearch.monitor.StatusInfo; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; @@ -52,14 +54,20 @@ import org.opensearch.transport.BytesTransportRequest; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static org.opensearch.cluster.coordination.JoinHelper.VALIDATE_COMPRESSED_JOIN_ACTION_NAME; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; @@ -203,6 +211,10 @@ public void testFailedJoinAttemptLogLevel() { } public void testJoinValidationRejectsMismatchedClusterUUID() throws IOException { + assertJoinValidationRejectsMismatchedClusterUUID( + VALIDATE_COMPRESSED_JOIN_ACTION_NAME, + "join validation on cluster state with a different cluster uuid" + ); assertJoinValidationRejectsMismatchedClusterUUID( JoinHelper.VALIDATE_JOIN_ACTION_NAME, "join validation on cluster state with a different cluster uuid" @@ -210,61 +222,40 @@ public void testJoinValidationRejectsMismatchedClusterUUID() throws IOException } private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, String expectedMessage) throws IOException { - DeterministicTaskQueue deterministicTaskQueue = new 
DeterministicTaskQueue( - Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build(), - random() - ); - MockTransport mockTransport = new MockTransport(); - DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - - final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) - .build(); - - TransportService transportService = mockTransport.createTransportService( - Settings.EMPTY, - deterministicTaskQueue.getThreadPool(), - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> localNode, - null, - Collections.emptySet() - ); - new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { - throw new AssertionError(); - }, - startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), - (s, p, r) -> {}, - null, - nodeCommissioned -> {}, - namedWriteableRegistry - ); // registers - // request - // handler - transportService.start(); - transportService.acceptIncomingRequests(); + TestClusterSetup testCluster = getTestClusterSetup(); final ClusterState otherClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder().generateClusterUuidIfNeeded()) .build(); - + TransportRequest request; final PlainActionFuture future = new PlainActionFuture<>(); - BytesReference bytes = CompressionHelper.serializedWrite(otherClusterState, localNode.getVersion(), true); - final BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); - transportService.sendRequest( - localNode, - actionName, - request, - new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) - ); - deterministicTaskQueue.runAllTasks(); + if (actionName.equals(VALIDATE_COMPRESSED_JOIN_ACTION_NAME)) { + BytesReference bytes = CompressionHelper.serializeClusterState(otherClusterState, testCluster.localNode, true); + request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); + testCluster.transportService.sendRequest( + testCluster.localNode, + actionName, + request, + new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) + ); + } else if (actionName.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) { + request = new ValidateJoinRequest(otherClusterState); + testCluster.transportService.sendRequest( + testCluster.localNode, + actionName, + request, + new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) + ); + } + + testCluster.deterministicTaskQueue.runAllTasks(); final CoordinationStateRejectedException coordinationStateRejectedException = expectThrows( CoordinationStateRejectedException.class, future::actionGet ); assertThat(coordinationStateRejectedException.getMessage(), containsString(expectedMessage)); - assertThat(coordinationStateRejectedException.getMessage(), containsString(localClusterState.metadata().clusterUUID())); + assertThat(coordinationStateRejectedException.getMessage(), containsString(testCluster.localClusterState.metadata().clusterUUID())); assertThat(coordinationStateRejectedException.getMessage(), containsString(otherClusterState.metadata().clusterUUID())); } @@ -339,4 +330,150 @@ public void testJoinFailureOnUnhealthyNodes() { CapturedRequest capturedRequest1a = capturedRequests1a[0]; assertEquals(node1, capturedRequest1a.node); } + + public void testSendValidateJoinFailsOnCompressionHelperException() throws 
IOException, ExecutionException, InterruptedException, + TimeoutException { + TestClusterSetup testCluster = getTestClusterSetup(); + final CompletableFuture future = new CompletableFuture<>(); + testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, null, new ActionListener<>() { + @Override + public void onResponse(TransportResponse.Empty empty) { + future.completeExceptionally(new AssertionError("validate join should have failed")); + } + + @Override + public void onFailure(Exception e) { + future.complete(e); + } + }); + Throwable t = future.get(10, TimeUnit.SECONDS); + assertTrue(t instanceof ExecutionException); + assertTrue(t.getCause() instanceof NullPointerException); + } + + public void testJoinValidationFailsOnCompressionHelperException() throws IOException { + TestClusterSetup testCluster = getTestClusterSetup(); + final ClusterState otherClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().generateClusterUuidIfNeeded()) + .build(); + TransportRequest request; + final PlainActionFuture future = new PlainActionFuture<>(); + BytesReference bytes = CompressionHelper.serializeClusterState(otherClusterState, testCluster.localNode, false); + request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); + testCluster.transportService.sendRequest( + testCluster.localNode, + VALIDATE_COMPRESSED_JOIN_ACTION_NAME, + request, + new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) + ); + testCluster.deterministicTaskQueue.runAllTasks(); + final UncategorizedExecutionException invalidStateException = expectThrows( + UncategorizedExecutionException.class, + future::actionGet + ); + assertTrue(invalidStateException.getCause().getMessage().contains("requires full cluster state")); + } + + public void testJoinHelperCachingOnClusterState() throws IOException, ExecutionException, InterruptedException, TimeoutException { + TestClusterSetup testCluster = getTestClusterSetup(); + final CompletableFuture future = new CompletableFuture<>(); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(TransportResponse.Empty empty) { + logger.info("validation successful"); + } + + @Override + public void onFailure(Exception e) { + future.completeExceptionally(new AssertionError("validate join should not fail here")); + } + }; + testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, testCluster.localClusterState, listener); + // validation will pass due to cached cluster state + testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, null, listener); + final CompletableFuture future2 = new CompletableFuture<>(); + ActionListener listener2 = new ActionListener<>() { + @Override + public void onResponse(TransportResponse.Empty empty) { + future2.completeExceptionally(new AssertionError("validation should fail now")); + } + + @Override + public void onFailure(Exception e) { + future2.complete(e); + } + }; + Thread.sleep(30 * 1000); + // now sending the validate join request will fail due to null cluster state + testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, null, listener2); + Throwable t = future2.get(10, TimeUnit.SECONDS); + assertTrue(t instanceof ExecutionException); + assertTrue(t.getCause() instanceof NullPointerException); + } + + private TestClusterSetup getTestClusterSetup() { + DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( + Settings.builder().put(NODE_NAME_SETTING.getKey(), 
"node0").build(), + random() + ); + MockTransport mockTransport = new MockTransport(); + DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + + final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) + .build(); + + TransportService transportService = mockTransport.createTransportService( + Settings.EMPTY, + deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> localNode, + null, + Collections.emptySet() + ); + JoinHelper joinHelper = new JoinHelper( + Settings.EMPTY, + null, + null, + transportService, + () -> 0L, + () -> localClusterState, + (joinRequest, joinCallback) -> { + throw new AssertionError(); + }, + startJoinRequest -> { throw new AssertionError(); }, + Collections.emptyList(), + (s, p, r) -> {}, + null, + nodeCommissioned -> {}, + namedWriteableRegistry + ); // registers + // request + // handler + transportService.start(); + transportService.acceptIncomingRequests(); + return new TestClusterSetup(deterministicTaskQueue, localNode, transportService, localClusterState, joinHelper); + } + + private static class TestClusterSetup { + public final DeterministicTaskQueue deterministicTaskQueue; + public final DiscoveryNode localNode; + public final TransportService transportService; + public final ClusterState localClusterState; + public final JoinHelper joinHelper; + + public TestClusterSetup( + DeterministicTaskQueue deterministicTaskQueue, + DiscoveryNode localNode, + TransportService transportService, + ClusterState localClusterState, + JoinHelper joinHelper + ) { + this.deterministicTaskQueue = deterministicTaskQueue; + this.localNode = localNode; + this.transportService = transportService; + this.localClusterState = localClusterState; + this.joinHelper = joinHelper; + } + } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 2752f57b499b3..8f32f6166ce6a 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -213,7 +213,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(), destination.getVersion()) ); - } else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) { + } else if (action.equals(JoinHelper.VALIDATE_COMPRESSED_JOIN_ACTION_NAME)) { handleResponse(requestId, new TransportResponse.Empty()); } else { super.onSendRequest(requestId, action, request, destination); From 41c475ed922eb27bfc9925ba5bb008be89bb46ef Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 9 May 2023 16:04:11 +0530 Subject: [PATCH 05/16] Refactor util method Signed-off-by: Aman Khare --- .../java/org/opensearch/cluster/coordination/JoinHelper.java | 2 +- .../cluster/coordination/PublicationTransportHandler.java | 2 +- .../org/opensearch/common/compress/CompressionHelper.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 37371be2fb659..c44b798645abf 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ 
b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -283,7 +283,7 @@ private void handleValidateJoinRequest( ClusterState incomingState; try { if (in.readBoolean()) { - incomingState = CompressionHelper.deserializeFullClusterState(in, transportService); + incomingState = CompressionHelper.deserializeFullClusterState(in, transportService.getLocalNode()); runJoinValidators(currentStateSupplier, incomingState, joinValidators); } else { logger.error("validate new node join request requires full cluster state"); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 00c1976619c2d..9705c698dcb94 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -166,7 +166,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque ClusterState incomingState; try { if (in.readBoolean()) { - incomingState = CompressionHelper.deserializeFullClusterState(in, transportService); + incomingState = CompressionHelper.deserializeFullClusterState(in, transportService.getLocalNode()); fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); final PublishWithJoinResponse response = acceptState(incomingState); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java b/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java index cd5adc1f99030..e7f756d49c00a 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java @@ -75,10 +75,10 @@ public static StreamInput decompressClusterState(BytesTransportRequest request, return in; } - public static ClusterState deserializeFullClusterState(StreamInput in, TransportService transportService) throws IOException { + public static ClusterState deserializeFullClusterState(StreamInput in, DiscoveryNode localNode) throws IOException { final ClusterState incomingState; try (StreamInput input = in) { - incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); + incomingState = ClusterState.readFrom(input, localNode); } catch (Exception e) { logger.warn("unexpected error while deserializing an incoming cluster state", e); throw e; From 8ff8cbc725b884139814b354bb8fd9b1031f214a Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 9 May 2023 16:13:35 +0530 Subject: [PATCH 06/16] optimize imports Signed-off-by: Aman Khare --- .../java/org/opensearch/common/compress/CompressionHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java b/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java index e7f756d49c00a..149c22c9db733 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java @@ -24,7 +24,6 @@ import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.transport.BytesTransportRequest; -import org.opensearch.transport.TransportService; import java.io.IOException; From 
7003af63e7999497be649b67b548082aabfce164 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Thu, 25 May 2023 08:52:28 +0530 Subject: [PATCH 07/16] Use cluster state version based cache instead of time based cache Signed-off-by: Aman Khare --- .../cluster/coordination/ZenDiscoveryIT.java | 6 +- .../coordination/ClusterStateUtils.java} | 49 +++--- .../cluster/coordination/JoinHelper.java | 71 +++++---- .../PublicationTransportHandler.java | 16 +- .../common/settings/ClusterSettings.java | 5 +- .../cluster/coordination/JoinHelperTests.java | 140 +++++++++++++----- .../cluster/coordination/NodeJoinTests.java | 23 +-- 7 files changed, 182 insertions(+), 128 deletions(-) rename server/src/main/java/org/opensearch/{common/compress/CompressionHelper.java => cluster/coordination/ClusterStateUtils.java} (67%) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java index dc9034ab53e73..0d4425aedc815 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java @@ -42,7 +42,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; @@ -60,7 +59,6 @@ import java.util.concurrent.TimeoutException; import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; -import static org.opensearch.cluster.coordination.JoinHelper.CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL; import static org.opensearch.test.NodeRoles.dataNode; import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode; import static org.hamcrest.Matchers.containsString; @@ -108,9 +106,7 @@ public void testNoShardRelocationsOccurWhenElectedClusterManagerNodeFails() thro } public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedException, ExecutionException, TimeoutException { - String clusterManagerNode = internalCluster().startClusterManagerOnlyNode( - Settings.builder().put(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.getKey(), TimeValue.timeValueMillis(0)).build() - ); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); String node1 = internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1); Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, clusterManagerNode); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java similarity index 67% rename from server/src/main/java/org/opensearch/common/compress/CompressionHelper.java rename to server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java index 149c22c9db733..390da6b29004c 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java @@ -6,15 +6,16 @@ * compatible open source license. 
*/ -package org.opensearch.common.compress; +package org.opensearch.cluster.coordination; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.compress.Compressor; +import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.InputStreamStreamInput; import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; @@ -30,8 +31,8 @@ /** * A helper class to utilize the compressed stream. */ -public class CompressionHelper { - private static final Logger logger = LogManager.getLogger(CompressionHelper.class); +public final class ClusterStateUtils { + private static final Logger logger = LogManager.getLogger(ClusterStateUtils.class); /** * Serialize the given cluster state or diff. It'll always use compression before writing on a newly created output @@ -46,22 +47,24 @@ public class CompressionHelper { */ public static BytesReference serializeClusterState(Writeable writer, DiscoveryNode node, boolean isFullClusterState) throws IOException { - try { - final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { - stream.setVersion(node.getVersion()); - stream.writeBoolean(isFullClusterState); - writer.writeTo(stream); - } - final BytesReference serializedByteRef = bStream.bytes(); - logger.trace("serialized writable object for node version [{}] with size [{}]", node.getVersion(), serializedByteRef.length()); - return serializedByteRef; - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e); - throw e; + final BytesStreamOutput bStream = new BytesStreamOutput(); + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { + stream.setVersion(node.getVersion()); + stream.writeBoolean(isFullClusterState); + writer.writeTo(stream); } + final BytesReference serializedByteRef = bStream.bytes(); + logger.trace("serialized writable object for node version [{}] with size [{}]", node.getVersion(), serializedByteRef.length()); + return serializedByteRef; } + /** + * Decompress the incoming compressed BytesTransportRequest into StreamInput which can be deserialized. 
+ * @param request incoming compressed request in bytes form + * @param namedWriteableRegistry existing registry of readers which contains ClusterState writable + * @return StreamInput object containing uncompressed request sent by sender + * @throws IOException if creating StreamInput object fails due to EOF + */ public static StreamInput decompressClusterState(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { final Compressor compressor = CompressorFactory.compressor(request.bytes()); @@ -78,22 +81,16 @@ public static ClusterState deserializeFullClusterState(StreamInput in, Discovery final ClusterState incomingState; try (StreamInput input = in) { incomingState = ClusterState.readFrom(input, localNode); - } catch (Exception e) { - logger.warn("unexpected error while deserializing an incoming cluster state", e); - throw e; } return incomingState; } public static Diff deserializeClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException { - final Diff diff; + final Diff incomingStateDiff; // Close stream early to release resources used by the de-compression as early as possible try (StreamInput input = in) { - diff = ClusterState.readDiffFrom(input, localNode); - } catch (Exception e) { - logger.warn("unexpected error while deserializing an incoming cluster state diff", e); - throw e; + incomingStateDiff = ClusterState.readDiffFrom(input, localNode); } - return diff; + return incomingStateDiff; } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index c44b798645abf..0e74020929f84 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.common.cache.Cache; import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.compress.CompressionHelper; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.decommission.NodeDecommissionedException; @@ -107,7 +106,7 @@ public class JoinHelper { public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join"; public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate"; - public static final String VALIDATE_COMPRESSED_JOIN_ACTION_NAME = "internal:cluster/coordination/join" + "/validate_compressed"; + public static final String VALIDATE_COMPRESSED_JOIN_ACTION_NAME = JOIN_ACTION_NAME + "/validate_compressed"; public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join"; // the timeout for Zen1 join attempts @@ -133,15 +132,7 @@ public class JoinHelper { private final Supplier joinTaskExecutorGenerator; private final Consumer nodeCommissioned; private final NamedWriteableRegistry namedWriteableRegistry; - private Cache serializedStates; - public static final Setting CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL = Setting.timeSetting( - "cluster_manager.validate_join.cache_interval", - TimeValue.timeValueMillis(30000), - TimeValue.timeValueMillis(0), - TimeValue.timeValueMillis(60000), - Setting.Property.NodeScope - ); - private final long clusterStateRefreshInterval; + private final Cache serializedStates; JoinHelper( Settings settings, @@ -164,12 +155,8 @@ public class 
JoinHelper { this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.nodeCommissioned = nodeCommissioned; this.namedWriteableRegistry = namedWriteableRegistry; - this.clusterStateRefreshInterval = CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings).getMillis(); - if (clusterStateRefreshInterval != 0) { - CacheBuilder cacheBuilder = CacheBuilder.builder(); - cacheBuilder.setExpireAfterWrite(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings)); - this.serializedStates = cacheBuilder.build(); - } + CacheBuilder cacheBuilder = CacheBuilder.builder(); + this.serializedStates = cacheBuilder.build(); this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { @@ -247,7 +234,7 @@ public ClusterTasksResult execute(ClusterState currentSta ThreadPool.Names.GENERIC, BytesTransportRequest::new, (request, channel, task) -> { - handleValidateJoinRequest(currentStateSupplier, joinValidators, request); + handleCompressedValidateJoinRequest(currentStateSupplier, joinValidators, request); channel.sendResponse(Empty.INSTANCE); } ); @@ -274,16 +261,16 @@ private void runJoinValidators( joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState)); } - private void handleValidateJoinRequest( + private void handleCompressedValidateJoinRequest( Supplier currentStateSupplier, Collection> joinValidators, BytesTransportRequest request ) throws IOException { - StreamInput in = CompressionHelper.decompressClusterState(request, namedWriteableRegistry); - ClusterState incomingState; + StreamInput in = null; try { + in = ClusterStateUtils.decompressClusterState(request, namedWriteableRegistry); if (in.readBoolean()) { - incomingState = CompressionHelper.deserializeFullClusterState(in, transportService.getLocalNode()); + ClusterState incomingState = ClusterStateUtils.deserializeFullClusterState(in, transportService.getLocalNode()); runJoinValidators(currentStateSupplier, incomingState, joinValidators); } else { logger.error("validate new node join request requires full cluster state"); @@ -475,26 +462,34 @@ public String executor() { } public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { - try { - BytesReference bytes; - if (clusterStateRefreshInterval == 0) { - bytes = CompressionHelper.serializeClusterState(state, node, true); - } else { - bytes = serializedStates.computeIfAbsent( - node.getVersion(), - key -> CompressionHelper.serializeClusterState(state, node, true) - ); - } - final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion()); + if (node.getVersion().before(Version.V_2_8_0)) { transportService.sendRequest( node, - VALIDATE_COMPRESSED_JOIN_ACTION_NAME, - request, + VALIDATE_JOIN_ACTION_NAME, + new ValidateJoinRequest(state), new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC) ); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e); - listener.onFailure(e); + } else { + try { + BytesReference bytes; + if (serializedStates.get(state.getVersion()) == null){ + serializedStates.invalidateAll(); + } + bytes = serializedStates.computeIfAbsent( + state.version(), + key -> ClusterStateUtils.serializeClusterState(state, node, true) + ); + final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion()); + transportService.sendRequest( + node, + VALIDATE_COMPRESSED_JOIN_ACTION_NAME, + request, + new 
ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC) + ); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e); + listener.onFailure(e); + } } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 9705c698dcb94..e304eb15c19f0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -39,7 +39,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; -import org.opensearch.common.compress.CompressionHelper; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; import org.opensearch.cluster.node.DiscoveryNode; @@ -162,11 +161,12 @@ public PublishClusterStateStats stats() { } private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { - StreamInput in = CompressionHelper.decompressClusterState(request, namedWriteableRegistry); - ClusterState incomingState; + StreamInput in = null; try { + in = ClusterStateUtils.decompressClusterState(request, namedWriteableRegistry); + ClusterState incomingState; if (in.readBoolean()) { - incomingState = CompressionHelper.deserializeFullClusterState(in, transportService.getLocalNode()); + incomingState = ClusterStateUtils.deserializeFullClusterState(in, transportService.getLocalNode()); fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); final PublishWithJoinResponse response = acceptState(incomingState); @@ -180,7 +180,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque throw new IncompatibleClusterStateVersionException("have no local cluster state"); } else { try { - Diff diff = CompressionHelper.deserializeClusterStateDiff(in, lastSeen.getNodes().getLocalNode()); + Diff diff = ClusterStateUtils.deserializeClusterStateDiff(in, lastSeen.getNodes().getLocalNode()); incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException } catch (IncompatibleClusterStateVersionException e) { incompatibleClusterStateDiffReceivedCount.incrementAndGet(); @@ -255,7 +255,7 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), CompressionHelper.serializeClusterState(newState, node, true)); + serializedStates.put(node.getVersion(), ClusterStateUtils.serializeClusterState(newState, node, true)); } } else { // will send a diff @@ -263,7 +263,7 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = CompressionHelper.serializeClusterState(diff, node, false); + final BytesReference serializedDiff = ClusterStateUtils.serializeClusterState(diff, node, false); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", @@ -359,7 
+359,7 @@ private void sendFullClusterState(DiscoveryNode destination, ActionListener future = new PlainActionFuture<>(); if (actionName.equals(VALIDATE_COMPRESSED_JOIN_ACTION_NAME)) { - BytesReference bytes = CompressionHelper.serializeClusterState(otherClusterState, testCluster.localNode, true); + BytesReference bytes = ClusterStateUtils.serializeClusterState(otherClusterState, testCluster.localNode, true); request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); testCluster.transportService.sendRequest( testCluster.localNode, @@ -238,7 +238,7 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, request, new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) ); - } else if (actionName.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) { + } else if (actionName.equals(VALIDATE_JOIN_ACTION_NAME)) { request = new ValidateJoinRequest(otherClusterState); testCluster.transportService.sendRequest( testCluster.localNode, @@ -331,9 +331,8 @@ public void testJoinFailureOnUnhealthyNodes() { assertEquals(node1, capturedRequest1a.node); } - public void testSendValidateJoinFailsOnCompressionHelperException() throws IOException, ExecutionException, InterruptedException, - TimeoutException { - TestClusterSetup testCluster = getTestClusterSetup(); + public void testSendCompressedValidateJoinFailOnSerializeFailure() throws ExecutionException, InterruptedException, TimeoutException { + TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); final CompletableFuture future = new CompletableFuture<>(); testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, null, new ActionListener<>() { @Override @@ -347,18 +346,42 @@ public void onFailure(Exception e) { } }); Throwable t = future.get(10, TimeUnit.SECONDS); - assertTrue(t instanceof ExecutionException); - assertTrue(t.getCause() instanceof NullPointerException); + assertTrue(t instanceof NullPointerException); } - public void testJoinValidationFailsOnCompressionHelperException() throws IOException { - TestClusterSetup testCluster = getTestClusterSetup(); - final ClusterState otherClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().generateClusterUuidIfNeeded()) - .build(); + public void testValidateJoinSentWithCorrectActionForVersions() { + verifyValidateJoinActionSent(VALIDATE_JOIN_ACTION_NAME, Version.V_2_1_0); + verifyValidateJoinActionSent(VALIDATE_JOIN_ACTION_NAME, Version.V_2_7_0); + verifyValidateJoinActionSent(VALIDATE_COMPRESSED_JOIN_ACTION_NAME, Version.V_2_8_0); + verifyValidateJoinActionSent(VALIDATE_COMPRESSED_JOIN_ACTION_NAME, Version.CURRENT); + } + + private void verifyValidateJoinActionSent(String expectedActionName, Version version) { + TestClusterSetup testCluster = getTestClusterSetup(version, true); + final CompletableFuture future = new CompletableFuture<>(); + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), version); + testCluster.joinHelper.sendValidateJoinRequest(node1, testCluster.localClusterState, new ActionListener<>() { + @Override + public void onResponse(TransportResponse.Empty empty) { + throw new AssertionError("capturing transport shouldn't run"); + } + + @Override + public void onFailure(Exception e) { + future.complete(e); + } + }); + + CapturedRequest[] validateRequests = testCluster.capturingTransport.getCapturedRequestsAndClear(); + assertEquals(1, validateRequests.length); + assertEquals(expectedActionName, validateRequests[0].action); + } 
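+
+    // For context, a minimal sketch (handler and request construction elided) of the dispatch the
+    // assertions above pin down: at this point in the series, sendValidateJoinRequest selects the
+    // wire action from the joining node's version, keeping the uncompressed request for older nodes:
+    //
+    //     if (node.getVersion().before(Version.V_2_8_0)) {
+    //         transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME, new ValidateJoinRequest(state), handler);
+    //     } else {
+    //         transportService.sendRequest(node, VALIDATE_COMPRESSED_JOIN_ACTION_NAME, bytesRequest, handler);
+    //     }
+    //
+    // where bytesRequest is the cached, compressed serialization of the full cluster state.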
+ + public void testJoinValidationFailsOnSendingCompressedDiffClusterState() throws IOException { + TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); TransportRequest request; final PlainActionFuture future = new PlainActionFuture<>(); - BytesReference bytes = CompressionHelper.serializeClusterState(otherClusterState, testCluster.localNode, false); + BytesReference bytes = ClusterStateUtils.serializeClusterState(testCluster.localClusterState, testCluster.localNode, false); request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); testCluster.transportService.sendRequest( testCluster.localNode, @@ -374,8 +397,23 @@ public void testJoinValidationFailsOnCompressionHelperException() throws IOExcep assertTrue(invalidStateException.getCause().getMessage().contains("requires full cluster state")); } - public void testJoinHelperCachingOnClusterState() throws IOException, ExecutionException, InterruptedException, TimeoutException { - TestClusterSetup testCluster = getTestClusterSetup(); + public void testJoinValidationFailsOnDecompressionFailure() { + TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); + TransportRequest request; + final PlainActionFuture future = new PlainActionFuture<>(); + request = new BytesTransportRequest(null, testCluster.localNode.getVersion()); + testCluster.transportService.sendRequest( + testCluster.localNode, + VALIDATE_COMPRESSED_JOIN_ACTION_NAME, + request, + new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) + ); + testCluster.deterministicTaskQueue.runAllTasks(); + expectThrows(NullPointerException.class, future::actionGet); + } + + public void testJoinHelperCachingOnClusterState() throws ExecutionException, InterruptedException, TimeoutException { + TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); final CompletableFuture future = new CompletableFuture<>(); ActionListener listener = new ActionListener<>() { @Override @@ -390,7 +428,9 @@ public void onFailure(Exception e) { }; testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, testCluster.localClusterState, listener); // validation will pass due to cached cluster state - testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, null, listener); + ClusterState randomState = ClusterState.builder(new ClusterName("random")).version(testCluster.localClusterState.version()).build(); + testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, randomState, listener); + final CompletableFuture future2 = new CompletableFuture<>(); ActionListener listener2 = new ActionListener<>() { @Override @@ -403,34 +443,55 @@ public void onFailure(Exception e) { future2.complete(e); } }; - Thread.sleep(30 * 1000); - // now sending the validate join request will fail due to null cluster state - testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, null, listener2); + DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + ClusterState randomState2 = ClusterState.builder(new ClusterName("random")) + .stateUUID("random2") + .version(testCluster.localClusterState.version() + 1) + .build(); + // now sending the validate join request will fail due to random cluster name because version is changed + // and cache will be invalidated + testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, randomState2, listener2); + testCluster.deterministicTaskQueue.runAllTasks(); + Throwable t = future2.get(10, 
TimeUnit.SECONDS); - assertTrue(t instanceof ExecutionException); - assertTrue(t.getCause() instanceof NullPointerException); + assertTrue(t instanceof RemoteTransportException); + assertTrue(t.getCause() instanceof CoordinationStateRejectedException); + assertTrue(t.getCause().getMessage().contains("different cluster uuid")); } - private TestClusterSetup getTestClusterSetup() { + private TestClusterSetup getTestClusterSetup(Version version, boolean isCapturingTransport) { + version = version == null ? Version.CURRENT : version; DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( Settings.builder().put(NODE_NAME_SETTING.getKey(), "node0").build(), random() ); MockTransport mockTransport = new MockTransport(); - DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + CapturingTransport capturingTransport = new CapturingTransport(); + DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), version); final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) .build(); - - TransportService transportService = mockTransport.createTransportService( - Settings.EMPTY, - deterministicTaskQueue.getThreadPool(), - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> localNode, - null, - Collections.emptySet() - ); + TransportService transportService; + if (isCapturingTransport) { + transportService = capturingTransport.createTransportService( + Settings.EMPTY, + deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> localNode, + null, + Collections.emptySet() + ); + } else { + transportService = mockTransport.createTransportService( + Settings.EMPTY, + deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> localNode, + null, + Collections.emptySet() + ); + } JoinHelper joinHelper = new JoinHelper( Settings.EMPTY, null, @@ -452,28 +513,31 @@ private TestClusterSetup getTestClusterSetup() { // handler transportService.start(); transportService.acceptIncomingRequests(); - return new TestClusterSetup(deterministicTaskQueue, localNode, transportService, localClusterState, joinHelper); + return new TestClusterSetup(deterministicTaskQueue, localNode, transportService, localClusterState, joinHelper, capturingTransport); } private static class TestClusterSetup { public final DeterministicTaskQueue deterministicTaskQueue; public final DiscoveryNode localNode; - public final TransportService transportService; + public TransportService transportService; public final ClusterState localClusterState; public final JoinHelper joinHelper; + public final CapturingTransport capturingTransport; public TestClusterSetup( DeterministicTaskQueue deterministicTaskQueue, DiscoveryNode localNode, TransportService transportService, ClusterState localClusterState, - JoinHelper joinHelper + JoinHelper joinHelper, + CapturingTransport capturingTransport ) { this.deterministicTaskQueue = deterministicTaskQueue; this.localNode = localNode; this.transportService = transportService; this.localClusterState = localClusterState; this.joinHelper = joinHelper; + this.capturingTransport = capturingTransport; } } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 8f32f6166ce6a..fb2e7cd73d3bf 100644 --- 
a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -208,15 +208,20 @@ private void setupClusterManagerServiceAndCoordinator( CapturingTransport capturingTransport = new CapturingTransport() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { - if (action.equals(HANDSHAKE_ACTION_NAME)) { - handleResponse( - requestId, - new TransportService.HandshakeResponse(destination, initialState.getClusterName(), destination.getVersion()) - ); - } else if (action.equals(JoinHelper.VALIDATE_COMPRESSED_JOIN_ACTION_NAME)) { - handleResponse(requestId, new TransportResponse.Empty()); - } else { - super.onSendRequest(requestId, action, request, destination); + switch (action) { + case HANDSHAKE_ACTION_NAME: + handleResponse( + requestId, + new TransportService.HandshakeResponse(destination, initialState.getClusterName(), destination.getVersion()) + ); + break; + case JoinHelper.VALIDATE_JOIN_ACTION_NAME: + case JoinHelper.VALIDATE_COMPRESSED_JOIN_ACTION_NAME: + handleResponse(requestId, new TransportResponse.Empty()); + break; + default: + super.onSendRequest(requestId, action, request, destination); + break; } } From 772d7d32c05d502089467de62a5fea977129c129 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Thu, 25 May 2023 09:33:42 +0530 Subject: [PATCH 08/16] style fix Signed-off-by: Aman Khare --- .../org/opensearch/cluster/coordination/JoinHelper.java | 2 +- .../opensearch/cluster/coordination/JoinHelperTests.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 0e74020929f84..36f6bf222feda 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -472,7 +472,7 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti } else { try { BytesReference bytes; - if (serializedStates.get(state.getVersion()) == null){ + if (serializedStates.get(state.getVersion()) == null) { serializedStates.invalidateAll(); } bytes = serializedStates.computeIfAbsent( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index 9c439162a8150..2ac3634958097 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -428,7 +428,8 @@ public void onFailure(Exception e) { }; testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, testCluster.localClusterState, listener); // validation will pass due to cached cluster state - ClusterState randomState = ClusterState.builder(new ClusterName("random")).version(testCluster.localClusterState.version()).build(); + ClusterState randomState = ClusterState.builder(new ClusterName("random")).stateUUID("random2"). 
+ version(testCluster.localClusterState.version()).build(); testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, randomState, listener); final CompletableFuture future2 = new CompletableFuture<>(); @@ -443,12 +444,11 @@ public void onFailure(Exception e) { future2.complete(e); } }; - DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); ClusterState randomState2 = ClusterState.builder(new ClusterName("random")) .stateUUID("random2") .version(testCluster.localClusterState.version() + 1) .build(); - // now sending the validate join request will fail due to random cluster name because version is changed + // now sending the validate join request will fail due to random cluster uuid because version is changed // and cache will be invalidated testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, randomState2, listener2); testCluster.deterministicTaskQueue.runAllTasks(); From ac7bdd62f945f1147107fe5e1d2181e8c47aff15 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Thu, 25 May 2023 09:58:26 +0530 Subject: [PATCH 09/16] fix styling 2 Signed-off-by: Aman Khare --- .../opensearch/cluster/coordination/JoinHelperTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index 2ac3634958097..95dbea894b79e 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -428,8 +428,10 @@ public void onFailure(Exception e) { }; testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, testCluster.localClusterState, listener); // validation will pass due to cached cluster state - ClusterState randomState = ClusterState.builder(new ClusterName("random")).stateUUID("random2"). 
- version(testCluster.localClusterState.version()).build(); + ClusterState randomState = ClusterState.builder(new ClusterName("random")) + .stateUUID("random2") + .version(testCluster.localClusterState.version()) + .build(); testCluster.joinHelper.sendValidateJoinRequest(testCluster.localNode, randomState, listener); final CompletableFuture future2 = new CompletableFuture<>(); From 0f3aaa863c1582153da0156bf44e80761248af56 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 30 May 2023 12:42:32 +0530 Subject: [PATCH 10/16] Use concurrent hashmap instead of cache, add UT class for ClusterStateUtils Signed-off-by: Aman Khare --- .../cluster/coordination/ZenDiscoveryIT.java | 1 + .../coordination/ClusterStateUtils.java | 2 + .../cluster/coordination/JoinHelper.java | 29 ++--- .../bootstrap/test-framework.policy | 4 + .../coordination/ClusterStateUtilsTests.java | 109 ++++++++++++++++++ .../cluster/coordination/JoinHelperTests.java | 2 +- 6 files changed, 132 insertions(+), 15 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java index 0d4425aedc815..aaba53dcb2b07 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java @@ -117,6 +117,7 @@ public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedExce final CompletableFuture future = new CompletableFuture<>(); DiscoveryNode node = state.nodes().getLocalNode(); + coordinator.sendValidateJoinRequest( stateWithCustomMetadata, new JoinRequest(node, 0L, Optional.empty()), diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java index 390da6b29004c..dd3d6fe29fe55 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java @@ -30,6 +30,8 @@ /** * A helper class to utilize the compressed stream. 
+ * + * @opensearch.internal */ public final class ClusterStateUtils { private static final Logger logger = LogManager.getLogger(ClusterStateUtils.class); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 36f6bf222feda..2622c30f71d11 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -41,8 +41,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.ClusterStateTaskListener; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.decommission.NodeDecommissionedException; @@ -86,6 +84,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -132,7 +131,7 @@ public class JoinHelper { private final Supplier joinTaskExecutorGenerator; private final Consumer nodeCommissioned; private final NamedWriteableRegistry namedWriteableRegistry; - private final Cache serializedStates; + private final ConcurrentHashMap serializedStates = new ConcurrentHashMap<>(); JoinHelper( Settings settings, @@ -155,8 +154,6 @@ public class JoinHelper { this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.nodeCommissioned = nodeCommissioned; this.namedWriteableRegistry = namedWriteableRegistry; - CacheBuilder cacheBuilder = CacheBuilder.builder(); - this.serializedStates = cacheBuilder.build(); this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { @@ -462,7 +459,7 @@ public String executor() { } public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener listener) { - if (node.getVersion().before(Version.V_2_8_0)) { + if (node.getVersion().before(Version.V_3_0_0)) { transportService.sendRequest( node, VALIDATE_JOIN_ACTION_NAME, @@ -471,14 +468,18 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti ); } else { try { - BytesReference bytes; - if (serializedStates.get(state.getVersion()) == null) { - serializedStates.invalidateAll(); + if (!serializedStates.containsKey(state.version())){ + serializedStates.clear(); } - bytes = serializedStates.computeIfAbsent( - state.version(), - key -> ClusterStateUtils.serializeClusterState(state, node, true) - ); + BytesReference bytes = serializedStates.computeIfAbsent(state.version(), + version -> { + try { + return ClusterStateUtils.serializeClusterState(state, node, true); + } catch (IOException e) { + // mandatory as ConcurrentHashMap doesn't rethrow IOException. 
+ throw new RuntimeException(e); + } + }); final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion()); transportService.sendRequest( node, @@ -487,7 +488,7 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC) ); } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e); + logger.warn("error sending cluster state to {}", node); listener.onFailure(e); } } diff --git a/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy b/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy index 41076569a3c50..0abfd7ef22ae7 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy @@ -153,4 +153,8 @@ grant codeBase "file:${gradle.worker.jar}" { grant { // since the gradle test worker jar is on the test classpath, our tests should be able to read it permission java.io.FilePermission "${gradle.worker.jar}", "read"; + permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.RuntimePermission "reflectionFactoryAccess"; + permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; }; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java b/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java new file mode 100644 index 0000000000000..975ce43c77e05 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.cluster.coordination;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.transport.BytesTransportRequest;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Contains tests for {@link ClusterStateUtils}
+ */
+public class ClusterStateUtilsTests extends OpenSearchTestCase {
+
+    public void testSerializeClusterState() throws IOException {
+        // serialization success with normal state
+        final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true))
+            .build();
+        DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
+        BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, false);
+        assertNotNull(bytes);
+
+        // fail when the mocked cluster state's writeTo throws an exception
+        ClusterState mockedState = mock(ClusterState.class);
+        doThrow(IOException.class).when(mockedState).writeTo(any());
+        assertThrows(IOException.class, () -> ClusterStateUtils.serializeClusterState(mockedState, localNode, false));
+    }
+
+    public void testDecompressClusterState() throws IOException {
+        // Decompression works fine
+        final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true))
+            .build();
+        DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
+        BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, false);
+        BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion());
+        StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY);
+        assertEquals(request.version(), in.getVersion());
+
+        // Decompression fails with AssertionError on non-compressed request
+        BytesTransportRequest mockedRequest = mock(BytesTransportRequest.class, RETURNS_DEEP_STUBS);
+        when(mockedRequest.bytes().streamInput()).thenThrow(IOException.class);
+        assertThrows(AssertionError.class, () -> ClusterStateUtils.decompressClusterState(mockedRequest,
+            DEFAULT_NAMED_WRITABLE_REGISTRY));
+    }
+
+    public void testDeserializeFullClusterState() throws IOException {
+        final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true))
+            .build();
+        DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
+        BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, true);
+        BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion());
+        StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY);
+        ClusterState decompressedState = null;
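+        // Note: the stream begins with the boolean flag written by serializeClusterState, so the
+        // reader consumes it first; true marks a full cluster state, false a diff.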
+        // success when state data is correct
+        if (in.readBoolean()) {
+            decompressedState = ClusterStateUtils.deserializeFullClusterState(in, localNode);
+        }
+        assertEquals(localClusterState.getClusterName(), decompressedState.getClusterName());
+        assertEquals(localClusterState.metadata().clusterUUID(), decompressedState.metadata().clusterUUID());
+
+        // failure when mocked stream or null
+        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(null, localNode));
+        StreamInput mockedStreamInput = mock(StreamInput.class, RETURNS_DEEP_STUBS);
+        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(mockedStreamInput,
+            localNode));
+    }
+
+    public void testDeserializeDiffClusterState() throws IOException {
+        final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true))
+            .build();
+        DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
+        // fail with NPE if mocked stream is passed
+        StreamInput mockedStreamInput = mock(StreamInput.class, RETURNS_DEEP_STUBS);
+        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(mockedStreamInput,
+            localNode));
+
+        // fail with EOF if full cluster state is passed
+        BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, true);
+        BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion());
+        try (StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY)) {
+            assertThrows(EOFException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(in,
+                localNode));
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
index 9c439162a8150..a62962f684e71 100644
--- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
+++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
@@ -352,7 +352,7 @@ public void onFailure(Exception e) {
     public void testValidateJoinSentWithCorrectActionForVersions() {
         verifyValidateJoinActionSent(VALIDATE_JOIN_ACTION_NAME, Version.V_2_1_0);
         verifyValidateJoinActionSent(VALIDATE_JOIN_ACTION_NAME, Version.V_2_7_0);
-        verifyValidateJoinActionSent(VALIDATE_COMPRESSED_JOIN_ACTION_NAME, Version.V_2_8_0);
+        verifyValidateJoinActionSent(VALIDATE_JOIN_ACTION_NAME, Version.V_2_8_0);
         verifyValidateJoinActionSent(VALIDATE_COMPRESSED_JOIN_ACTION_NAME, Version.CURRENT);
     }

From e2ab91e364a321dc5343196e8fa209dc85296fe8 Mon Sep 17 00:00:00 2001
From: Aman Khare
Date: Tue, 30 May 2023 14:09:05 +0530
Subject: [PATCH 11/16] style fix

Signed-off-by: Aman Khare
---
 .../cluster/coordination/JoinHelper.java | 19 +++++++++----------
 .../coordination/ClusterStateUtilsTests.java | 12 ++++--------
 2 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
index 2622c30f71d11..f1686cce0867f 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
@@ -468,18 +468,17 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener
             );
         } else {
             try {
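                // cache note: serializedStates effectively caches only the latest full state; a miss
                // on the incoming state version clears the map before computeIfAbsent re-serializes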
-                if (!serializedStates.containsKey(state.version())){
+                if (!serializedStates.containsKey(state.version())) {
                     serializedStates.clear();
                 }
-                BytesReference bytes = serializedStates.computeIfAbsent(state.version(),
-                    version -> {
-                        try {
-                            return ClusterStateUtils.serializeClusterState(state, node, true);
-                        } catch (IOException e) {
-                            // mandatory as ConcurrentHashMap doesn't rethrow IOException.
-                            throw new RuntimeException(e);
-                        }
-                    });
+                BytesReference bytes = serializedStates.computeIfAbsent(state.version(), version -> {
+                    try {
+                        return ClusterStateUtils.serializeClusterState(state, node, true);
+                    } catch (IOException e) {
+                        // mandatory as ConcurrentHashMap doesn't rethrow IOException.
+                        throw new RuntimeException(e);
+                    }
+                });
                 final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
                 transportService.sendRequest(
                     node,
diff --git a/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java b/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java
index 975ce43c77e05..ba2c36dc669e1 100644
--- a/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java
+++ b/server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java
@@ -61,8 +61,7 @@ public void testDecompressClusterState() throws IOException {
         // Decompression fails with AssertionError on non-compressed request
         BytesTransportRequest mockedRequest = mock(BytesTransportRequest.class, RETURNS_DEEP_STUBS);
         when(mockedRequest.bytes().streamInput()).thenThrow(IOException.class);
-        assertThrows(AssertionError.class, () -> ClusterStateUtils.decompressClusterState(mockedRequest,
-            DEFAULT_NAMED_WRITABLE_REGISTRY));
+        assertThrows(AssertionError.class, () -> ClusterStateUtils.decompressClusterState(mockedRequest, DEFAULT_NAMED_WRITABLE_REGISTRY));
     }

     public void testDeserializeFullClusterState() throws IOException {
@@ -84,8 +83,7 @@ public void testDeserializeFullClusterState() throws IOException {
         // failure when mocked stream or null
         assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(null, localNode));
         StreamInput mockedStreamInput = mock(StreamInput.class, RETURNS_DEEP_STUBS);
-        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(mockedStreamInput,
-            localNode));
+        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(mockedStreamInput, localNode));
     }

     public void testDeserializeDiffClusterState() throws IOException {
@@ -95,15 +93,13 @@ public void testDeserializeDiffClusterState() throws IOException {
         DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
         // fail with NPE if mocked stream is passed
         StreamInput mockedStreamInput = mock(StreamInput.class, RETURNS_DEEP_STUBS);
-        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(mockedStreamInput,
-            localNode));
+        assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(mockedStreamInput, localNode));

         // fail with EOF if full cluster state is passed
         BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, true);
         BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion());
         try (StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY)) {
-            assertThrows(EOFException.class, () ->
ClusterStateUtils.deserializeClusterStateDiff(in, - localNode)); + assertThrows(EOFException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(in, localNode)); } } } From 8b3197f2e85efd8b77bf377d8e39d44092d995e6 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 31 May 2023 15:54:32 +0530 Subject: [PATCH 12/16] Use AtomicReference instead of ConcurrentHashMap Signed-off-by: Aman Khare --- .../cluster/coordination/JoinHelper.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index f1686cce0867f..5480f96be54fe 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -84,7 +84,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -131,7 +130,7 @@ public class JoinHelper { private final Supplier joinTaskExecutorGenerator; private final Consumer nodeCommissioned; private final NamedWriteableRegistry namedWriteableRegistry; - private final ConcurrentHashMap<Long, BytesReference> serializedStates = new ConcurrentHashMap<>(); + private final AtomicReference<Tuple<Long, BytesReference>> serializedState = new AtomicReference<>(); JoinHelper( Settings settings, @@ -468,17 +467,18 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti ); } else { try { - if (!serializedStates.containsKey(state.version())) { - serializedStates.clear(); - } - BytesReference bytes = serializedStates.computeIfAbsent(state.version(), version -> { - try { - return ClusterStateUtils.serializeClusterState(state, node, true); - } catch (IOException e) { - // mandatory as ConcurrentHashMap doesn't rethrow IOException. - throw new RuntimeException(e); + final BytesReference bytes = serializedState.updateAndGet(cachedState -> { + if (cachedState == null || cachedState.v1() != state.version()) { + try { + return new Tuple<>(state.version(), ClusterStateUtils.serializeClusterState(state, node, true)); + } catch (IOException e) { + // mandatory as AtomicReference doesn't rethrow IOException.
+ throw new RuntimeException(e); + } + } else { + return cachedState; } - }); + }).v2(); final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion()); transportService.sendRequest( node, From 4767c5a59434c5bf94ae2ecaa41e0c79e28ef8b9 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Thu, 1 Jun 2023 13:07:29 +0530 Subject: [PATCH 13/16] Use method overloading to simplify the caller code Signed-off-by: Aman Khare --- .../coordination/ClusterStateUtils.java | 31 ++++++++++++++++--- .../cluster/coordination/JoinHelper.java | 2 +- .../PublicationTransportHandler.java | 6 ++-- .../coordination/ClusterStateUtilsTests.java | 10 +++--- .../cluster/coordination/JoinHelperTests.java | 6 ++-- 5 files changed, 40 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java index dd3d6fe29fe55..0a5b45274986e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java @@ -36,18 +36,41 @@ public final class ClusterStateUtils { private static final Logger logger = LogManager.getLogger(ClusterStateUtils.class); + /** + * Serialize the given cluster state. It'll always use compression before writing to a newly created output stream. + * + * @param clusterState full cluster state of the cluster + * @param node the cluster node to which the state is sent + * @return reference to serialized bytes + * @throws IOException if writing to the compressed stream fails. + */ + public static BytesReference serializeClusterState(ClusterState clusterState, DiscoveryNode node) throws IOException { + return serializeClusterStateOrDiff(clusterState, node, true); + } + + /** + * Serialize the given cluster state diff. It'll always use compression before writing to a newly created output + * stream. + * + * @param clusterStateDiff diff of two cluster states + * @param node the cluster node to which the diff is sent + * @return reference to serialized bytes + * @throws IOException if writing to the compressed stream fails. + */ + public static BytesReference serializeClusterState(Diff<ClusterState> clusterStateDiff, DiscoveryNode node) throws IOException { + return serializeClusterStateOrDiff(clusterStateDiff, node, false); + } + + /** + * Serialize the given cluster state or diff. It'll always use compression before writing to a newly created output + * stream. + * + * @param writer Object which is going to write the content - * @param node version of cluster node - * @param isFullClusterState flag used at receiver end to make intelligent decisions. For example, ClusterState - * assumes full state of diff of the states based on this flag. + * @param node the cluster node to which the cluster state or diff is sent * @return reference to serialized bytes * @throws IOException if writing to the compressed stream fails.
*/ - public static BytesReference serializeClusterState(Writeable writer, DiscoveryNode node, boolean isFullClusterState) + private static BytesReference serializeClusterStateOrDiff(Writeable writer, DiscoveryNode node, boolean isFullClusterState) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 5480f96be54fe..9a113adc7290f 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -470,7 +470,7 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti final BytesReference bytes = serializedState.updateAndGet(cachedState -> { if (cachedState == null || cachedState.v1() != state.version()) { try { - return new Tuple<>(state.version(), ClusterStateUtils.serializeClusterState(state, node, true)); + return new Tuple<>(state.version(), ClusterStateUtils.serializeClusterState(state, node)); } catch (IOException e) { // mandatory as AtomicReference doesn't rethrow IOException. throw new RuntimeException(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index e304eb15c19f0..37286022bc0bf 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -255,7 +255,7 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), ClusterStateUtils.serializeClusterState(newState, node, true)); + serializedStates.put(node.getVersion(), ClusterStateUtils.serializeClusterState(newState, node)); } } else { // will send a diff @@ -263,7 +263,7 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = ClusterStateUtils.serializeClusterState(diff, node, false); + final BytesReference serializedDiff = ClusterStateUtils.serializeClusterState(diff, node); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", @@ -359,7 +359,7 @@ private void sendFullClusterState(DiscoveryNode destination, ActionListener ClusterStateUtils.serializeClusterState(mockedState, localNode, false)); + assertThrows(IOException.class, () -> ClusterStateUtils.serializeClusterState(mockedState, localNode)); } public void testDecompressClusterState() throws IOException { @@ -53,7 +53,7 @@ public void testDecompressClusterState() throws IOException { .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) .build(); DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, false); + BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, 
localNode); BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY); assertEquals(request.version(), in.getVersion()); @@ -69,7 +69,7 @@ public void testDeserializeFullClusterState() throws IOException { .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) .build(); DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, true); + BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode); BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY); ClusterState decompressedState = null; @@ -96,7 +96,7 @@ public void testDeserializeDiffClusterState() throws IOException { assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(mockedStreamInput, localNode)); // fail with EOF if a full cluster state is passed - BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode, true); + BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode); BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); try (StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY)) { assertThrows(EOFException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(in, localNode)); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index a62962f684e71..eae7b7093b220 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -38,6 +38,7 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -230,7 +231,7 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, TransportRequest request; final PlainActionFuture future = new PlainActionFuture<>(); if (actionName.equals(VALIDATE_COMPRESSED_JOIN_ACTION_NAME)) { - BytesReference bytes = ClusterStateUtils.serializeClusterState(otherClusterState, testCluster.localNode, true); + BytesReference bytes = ClusterStateUtils.serializeClusterState(otherClusterState, testCluster.localNode); request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); testCluster.transportService.sendRequest( testCluster.localNode, @@ -379,9 +380,10 @@ public void onFailure(Exception e) { public void testJoinValidationFailsOnSendingCompressedDiffClusterState() throws IOException { TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); + Diff clusterStateDiff = testCluster.localClusterState.diff(ClusterState.EMPTY_STATE); TransportRequest request; final PlainActionFuture future = new PlainActionFuture<>(); - BytesReference bytes =
ClusterStateUtils.serializeClusterState(testCluster.localClusterState, testCluster.localNode, false); + BytesReference bytes = ClusterStateUtils.serializeClusterState(clusterStateDiff, testCluster.localNode); request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); testCluster.transportService.sendRequest( testCluster.localNode, From d346a716d5fea8f8fafa2481ef4447afeb19e652 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Thu, 1 Jun 2023 13:12:28 +0530 Subject: [PATCH 14/16] Resolve conflicts Signed-off-by: Aman Khare --- CHANGELOG.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb26829ff38a7..5f6e33079849c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,11 +49,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) -<<<<<<< HEAD -- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) -======= - Reduce memory copy in zstd compression ([#7681](https://github.com/opensearch-project/OpenSearch/pull/7681)) ->>>>>>> upstream/main +- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) ### Deprecated From d9092d57174bfca52719d5028ca45199e125d7e0 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 6 Jun 2023 14:11:06 +0530 Subject: [PATCH 15/16] Change code structure to separate the flow for JoinHelper and PublicationTransportHelper Signed-off-by: Aman Khare --- .../coordination/ClusterStateUtils.java | 121 ------------------ .../coordination/CompressedStreamUtils.java | 61 +++++++++ .../cluster/coordination/JoinHelper.java | 23 ++-- .../PublicationTransportHandler.java | 51 ++++++-- .../coordination/ClusterStateUtilsTests.java | 105 --------------- .../CompressedStreamUtilsTests.java | 65 ++++++++++ .../cluster/coordination/JoinHelperTests.java | 28 +--- 7 files changed, 178 insertions(+), 276 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java delete mode 100644 server/src/test/java/org/opensearch/cluster/coordination/ClusterStateUtilsTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/coordination/CompressedStreamUtilsTests.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java deleted file mode 100644 index 0a5b45274986e..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/coordination/ClusterStateUtils.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.cluster.coordination; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.Diff; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.common.io.stream.InputStreamStreamInput; -import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.common.io.stream.OutputStreamStreamOutput; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; -import org.opensearch.transport.BytesTransportRequest; - -import java.io.IOException; - -/** - * A helper class to utilize the compressed stream. - * - * @opensearch.internal - */ -public final class ClusterStateUtils { - private static final Logger logger = LogManager.getLogger(ClusterStateUtils.class); - - /** - * Serialize the given cluster state. It'll always use compression before writing to a newly created output stream. - * - * @param clusterState full cluster state of the cluster - * @param node the cluster node to which the state is sent - * @return reference to serialized bytes - * @throws IOException if writing to the compressed stream fails. - */ - public static BytesReference serializeClusterState(ClusterState clusterState, DiscoveryNode node) throws IOException { - return serializeClusterStateOrDiff(clusterState, node, true); - } - - /** - * Serialize the given cluster state diff. It'll always use compression before writing to a newly created output - * stream. - * - * @param clusterStateDiff diff of two cluster states - * @param node the cluster node to which the diff is sent - * @return reference to serialized bytes - * @throws IOException if writing to the compressed stream fails. - */ - public static BytesReference serializeClusterState(Diff<ClusterState> clusterStateDiff, DiscoveryNode node) throws IOException { - return serializeClusterStateOrDiff(clusterStateDiff, node, false); - } - - /** - * Serialize the given cluster state or diff. It'll always use compression before writing to a newly created output - * stream. - * - * @param writer Object which is going to write the content - * @param node the cluster node to which the cluster state or diff is sent - * @return reference to serialized bytes - * @throws IOException if writing to the compressed stream fails. - */ - private static BytesReference serializeClusterStateOrDiff(Writeable writer, DiscoveryNode node, boolean isFullClusterState) - throws IOException { - final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { - stream.setVersion(node.getVersion()); - stream.writeBoolean(isFullClusterState); - writer.writeTo(stream); - } - final BytesReference serializedByteRef = bStream.bytes(); - logger.trace("serialized writable object for node version [{}] with size [{}]", node.getVersion(), serializedByteRef.length()); - return serializedByteRef; - } - - /** - * Decompress the incoming compressed BytesTransportRequest into StreamInput which can be deserialized. - * @param request incoming compressed request in bytes form - * @param namedWriteableRegistry existing registry of readers which contains ClusterState writable - * @return StreamInput object containing uncompressed request sent by sender - * @throws IOException if creating StreamInput object fails due to EOF - */ - public static StreamInput decompressClusterState(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) - throws IOException { - final Compressor compressor = CompressorFactory.compressor(request.bytes()); - StreamInput in = request.bytes().streamInput(); - if (compressor != null) { - in = new InputStreamStreamInput(compressor.threadLocalInputStream(in)); - } - in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); - in.setVersion(request.version()); - return in; - } - - public static ClusterState deserializeFullClusterState(StreamInput in, DiscoveryNode localNode) throws IOException { - final ClusterState incomingState; - try (StreamInput input = in) { - incomingState = ClusterState.readFrom(input, localNode); - } - return incomingState; - } - - public static Diff<ClusterState> deserializeClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException { - final Diff<ClusterState> incomingStateDiff; - // Close stream early to release resources used by the de-compression as early as possible - try (StreamInput input = in) { - incomingStateDiff = ClusterState.readDiffFrom(input, localNode); - } - return incomingStateDiff; - } -} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java new file mode 100644 index 0000000000000..57359f553b5a5 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.compress.Compressor; +import org.opensearch.common.compress.CompressorFactory; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.BytesTransportRequest; + +import java.io.IOException; + +/** + * A helper class to utilize the compressed stream.
+ * + * @opensearch.internal + */ +public final class CompressedStreamUtils { + private static final Logger logger = LogManager.getLogger(CompressedStreamUtils.class); + + public static BytesReference createCompressedStream(Version version, CheckedConsumer<StreamOutput, IOException> outputConsumer) + throws IOException { + final BytesStreamOutput bStream = new BytesStreamOutput(); + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) { + stream.setVersion(version); + outputConsumer.accept(stream); + } + final BytesReference serializedByteRef = bStream.bytes(); + logger.trace("serialized writable object for node version [{}] with size [{}]", version, serializedByteRef.length()); + return serializedByteRef; + } + + public static StreamInput decompressBytes(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) + throws IOException { + final Compressor compressor = CompressorFactory.compressor(request.bytes()); + final StreamInput in; + if (compressor != null) { + in = new InputStreamStreamInput(compressor.threadLocalInputStream(request.bytes().streamInput())); + } else { + in = request.bytes().streamInput(); + } + in.setVersion(request.version()); + return new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 9a113adc7290f..8a7eda64347c4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -57,7 +57,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.io.IOUtils; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.threadpool.ThreadPool; @@ -74,7 +73,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InvalidObjectException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -262,18 +260,10 @@ private void handleCompressedValidateJoinRequest( Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BytesTransportRequest request ) throws IOException { - StreamInput in = null; - try { - in = ClusterStateUtils.decompressClusterState(request, namedWriteableRegistry); - if (in.readBoolean()) { - ClusterState incomingState = ClusterStateUtils.deserializeFullClusterState(in, transportService.getLocalNode()); - runJoinValidators(currentStateSupplier, incomingState, joinValidators); - } else { - logger.error("validate new node join request requires full cluster state"); - throw new InvalidObjectException("validate new node join request requires full cluster state"); - } - } finally { - IOUtils.close(in); + try (StreamInput input = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { + input.setVersion(request.version()); + ClusterState incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); + runJoinValidators(currentStateSupplier, incomingState, joinValidators); } } @@ -470,7 +460,10 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti final BytesReference bytes = serializedState.updateAndGet(cachedState -> { if (cachedState == null || cachedState.v1() != state.version()) { try { - return new Tuple<>(state.version(),
ClusterStateUtils.serializeClusterState(state, node)); + return new Tuple<>( + state.version(), + CompressedStreamUtils.createCompressedStream(node.getVersion(), state::writeTo) + ); } catch (IOException e) { // mandatory as AtomicReference doesn't rethrow IOException. throw new RuntimeException(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 37286022bc0bf..21ef89e9d5790 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -46,7 +46,6 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.util.io.IOUtils; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.BytesTransportRequest; import org.opensearch.transport.TransportChannel; @@ -161,12 +160,16 @@ public PublishClusterStateStats stats() { } private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { - StreamInput in = null; - try { - in = ClusterStateUtils.decompressClusterState(request, namedWriteableRegistry); + try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { ClusterState incomingState; if (in.readBoolean()) { - incomingState = ClusterStateUtils.deserializeFullClusterState(in, transportService.getLocalNode()); + // Close early to release resources used by the de-compression as early as possible + try (StreamInput input = in) { + incomingState = ClusterState.readFrom(input, transportService.getLocalNode()); + } catch (Exception e) { + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); final PublishWithJoinResponse response = acceptState(incomingState); @@ -180,11 +183,18 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque throw new IncompatibleClusterStateVersionException("have no local cluster state"); } else { try { - Diff<ClusterState> diff = ClusterStateUtils.deserializeClusterStateDiff(in, lastSeen.getNodes().getLocalNode()); + final Diff<ClusterState> diff; + // Close stream early to release resources used by the de-compression as early as possible + try (StreamInput input = in) { + diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode()); + } incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException } catch (IncompatibleClusterStateVersionException e) { incompatibleClusterStateDiffReceivedCount.incrementAndGet(); throw e; + } catch (Exception e) { + logger.warn("unexpected error while deserializing an incoming cluster state", e); + throw e; + } compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug( @@ -198,8 +208,6 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque return response; } } - } finally { - IOUtils.close(in); } } @@ -226,6 +234,27 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang return publicationContext; } + private static BytesReference serializeFullClusterState(ClusterState clusterState, Version
nodeVersion) throws IOException { + final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> { + stream.writeBoolean(true); + clusterState.writeTo(stream); + }); + logger.trace( + "serialized full cluster state version [{}] for node version [{}] with size [{}]", + clusterState.version(), + nodeVersion, + serializedState.length() + ); + return serializedState; + } + + private static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException { + return CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> { + stream.writeBoolean(false); + diff.writeTo(stream); + }); + } + /** * Publishing a cluster state typically involves sending the same cluster state (or diff) to every node, so the work of diffing, * serializing, and compressing the state can be done once and the results shared across publish requests. The @@ -255,7 +284,7 @@ void buildDiffAndSerializeStates() { try { if (sendFullVersion || previousState.nodes().nodeExists(node) == false) { if (serializedStates.containsKey(node.getVersion()) == false) { - serializedStates.put(node.getVersion(), ClusterStateUtils.serializeClusterState(newState, node)); + serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion())); } } else { // will send a diff @@ -263,7 +292,7 @@ void buildDiffAndSerializeStates() { diff = newState.diff(previousState); } if (serializedDiffs.containsKey(node.getVersion()) == false) { - final BytesReference serializedDiff = ClusterStateUtils.serializeClusterState(diff, node); + final BytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion()); serializedDiffs.put(node.getVersion(), serializedDiff); logger.trace( "serialized cluster state diff for version [{}] in for node version [{}] with size [{}]", @@ -359,7 +388,7 @@ private void sendFullClusterState(DiscoveryNode destination, ActionListener ClusterStateUtils.serializeClusterState(mockedState, localNode)); - } - - public void testDecompressClusterState() throws IOException { - // Decompression works fine - final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) - .build(); - DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode); - BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); - StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY); - assertEquals(request.version(), in.getVersion()); - - // Decompression fails with AssertionError on non-compressed request - BytesTransportRequest mockedRequest = mock(BytesTransportRequest.class, RETURNS_DEEP_STUBS); - when(mockedRequest.bytes().streamInput()).thenThrow(IOException.class); - assertThrows(AssertionError.class, () -> ClusterStateUtils.decompressClusterState(mockedRequest, DEFAULT_NAMED_WRITABLE_REGISTRY)); - } - - public void testDeserializeFullClusterState() throws IOException { - final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) - .build(); - DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - BytesReference bytes = 
ClusterStateUtils.serializeClusterState(localClusterState, localNode); - BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); - StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY); - ClusterState decompressedState = null; - // success when state data is correct - if (in.readBoolean()) { - decompressedState = ClusterStateUtils.deserializeFullClusterState(in, localNode); - } - assertEquals(localClusterState.getClusterName(), decompressedState.getClusterName()); - assertEquals(localClusterState.metadata().clusterUUID(), decompressedState.metadata().clusterUUID()); - - // failure when mocked stream or null - assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(null, localNode)); - StreamInput mockedStreamInput = mock(StreamInput.class, RETURNS_DEEP_STUBS); - assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeFullClusterState(mockedStreamInput, localNode)); - } - - public void testDeserializeDiffClusterState() throws IOException { - final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) - .build(); - DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - // fail with NPE if mocked stream is passed - StreamInput mockedStreamInput = mock(StreamInput.class, RETURNS_DEEP_STUBS); - assertThrows(NullPointerException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(mockedStreamInput, localNode)); - - // fail with EOF is full cluster state is passed - BytesReference bytes = ClusterStateUtils.serializeClusterState(localClusterState, localNode); - BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); - try (StreamInput in = ClusterStateUtils.decompressClusterState(request, DEFAULT_NAMED_WRITABLE_REGISTRY)) { - assertThrows(EOFException.class, () -> ClusterStateUtils.deserializeClusterStateDiff(in, localNode)); - } - } -} diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CompressedStreamUtilsTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CompressedStreamUtilsTests.java new file mode 100644 index 0000000000000..e8faa73315e85 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/coordination/CompressedStreamUtilsTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.transport.BytesTransportRequest; + +import java.io.IOException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Contains tests for {@link CompressedStreamUtils} + */ +public class CompressedStreamUtilsTests extends OpenSearchTestCase { + + public void testCreateCompressedStream() throws IOException { + // serialization success with normal state + final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) + .build(); + DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + BytesReference bytes = CompressedStreamUtils.createCompressedStream(localNode.getVersion(), localClusterState::writeTo); + assertNotNull(bytes); + + // Fail on write failure on mocked cluster state's writeTo exception + ClusterState mockedState = mock(ClusterState.class); + doThrow(IOException.class).when(mockedState).writeTo(any()); + assertThrows(IOException.class, () -> CompressedStreamUtils.createCompressedStream(localNode.getVersion(), mockedState::writeTo)); + } + + public void testDecompressBytes() throws IOException { + // Decompression works fine + final ClusterState localClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().generateClusterUuidIfNeeded().clusterUUIDCommitted(true)) + .build(); + DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + BytesReference bytes = CompressedStreamUtils.createCompressedStream(localNode.getVersion(), localClusterState::writeTo); + BytesTransportRequest request = new BytesTransportRequest(bytes, localNode.getVersion()); + StreamInput input = CompressedStreamUtils.decompressBytes(request, DEFAULT_NAMED_WRITABLE_REGISTRY); + assertEquals(request.version(), input.getVersion()); + + // Decompression fails with AssertionError on non-compressed request + BytesTransportRequest mockedRequest = mock(BytesTransportRequest.class, RETURNS_DEEP_STUBS); + when(mockedRequest.bytes().streamInput()).thenThrow(IOException.class); + assertThrows(AssertionError.class, () -> CompressedStreamUtils.decompressBytes(mockedRequest, DEFAULT_NAMED_WRITABLE_REGISTRY)); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index eae7b7093b220..ad9dfd564d648 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -38,14 +38,12 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.Diff; import org.opensearch.cluster.NotClusterManagerException; import 
org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.UncategorizedExecutionException; import org.opensearch.monitor.StatusInfo; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; @@ -231,7 +229,10 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, TransportRequest request; final PlainActionFuture future = new PlainActionFuture<>(); if (actionName.equals(VALIDATE_COMPRESSED_JOIN_ACTION_NAME)) { - BytesReference bytes = ClusterStateUtils.serializeClusterState(otherClusterState, testCluster.localNode); + BytesReference bytes = CompressedStreamUtils.createCompressedStream( + testCluster.localNode.getVersion(), + otherClusterState::writeTo + ); request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); testCluster.transportService.sendRequest( testCluster.localNode, @@ -378,27 +379,6 @@ public void onFailure(Exception e) { assertEquals(expectedActionName, validateRequests[0].action); } - public void testJoinValidationFailsOnSendingCompressedDiffClusterState() throws IOException { - TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); - Diff clusterStateDiff = testCluster.localClusterState.diff(ClusterState.EMPTY_STATE); - TransportRequest request; - final PlainActionFuture future = new PlainActionFuture<>(); - BytesReference bytes = ClusterStateUtils.serializeClusterState(clusterStateDiff, testCluster.localNode); - request = new BytesTransportRequest(bytes, testCluster.localNode.getVersion()); - testCluster.transportService.sendRequest( - testCluster.localNode, - VALIDATE_COMPRESSED_JOIN_ACTION_NAME, - request, - new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE) - ); - testCluster.deterministicTaskQueue.runAllTasks(); - final UncategorizedExecutionException invalidStateException = expectThrows( - UncategorizedExecutionException.class, - future::actionGet - ); - assertTrue(invalidStateException.getCause().getMessage().contains("requires full cluster state")); - } - public void testJoinValidationFailsOnDecompressionFailure() { TestClusterSetup testCluster = getTestClusterSetup(Version.CURRENT, false); TransportRequest request; From 5605928f469cb6cda0421e5d0c5917f232eeb9d5 Mon Sep 17 00:00:00 2001 From: Aman Khare <85096200+amkhar@users.noreply.github.com> Date: Tue, 6 Jun 2023 22:04:58 +0530 Subject: [PATCH 16/16] Remove unnecessary input.setVersion line Co-authored-by: Andrew Ross Signed-off-by: Aman Khare <85096200+amkhar@users.noreply.github.com> --- .../java/org/opensearch/cluster/coordination/JoinHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 8a7eda64347c4..08cfea1abf270 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -261,7 +261,6 @@ private void handleCompressedValidateJoinRequest( BytesTransportRequest request ) throws IOException { try (StreamInput input = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) { - input.setVersion(request.version()); ClusterState incomingState = 
ClusterState.readFrom(input, transportService.getLocalNode()); runJoinValidators(currentStateSupplier, incomingState, joinValidators); }
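
The caching strategy PATCH 12 settles on is worth seeing in isolation: a single-entry cache keyed by cluster state version, swapped atomically, so that concurrent validate-join requests for an unchanged version reuse one serialized copy instead of re-serializing per joining node. Below is a minimal, self-contained sketch of that pattern; the Entry record and the byte[] payload are stand-ins for OpenSearch's Tuple<Long, BytesReference>, not OpenSearch APIs.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongFunction;

public class SerializedStateCache {
    // Stand-in for Tuple<Long, BytesReference>: the cached version plus its serialized form.
    private record Entry(long version, byte[] bytes) {}

    private final AtomicReference<Entry> cached = new AtomicReference<>();

    public byte[] get(long version, LongFunction<byte[]> serializer) {
        // updateAndGet may re-apply the function under contention, so the
        // serializer must be side-effect free; at worst a version is serialized twice.
        return cached.updateAndGet(entry -> {
            if (entry == null || entry.version() != version) {
                return new Entry(version, serializer.apply(version)); // miss: serialize the new version
            }
            return entry; // hit: reuse the previously serialized bytes
        }).bytes();
    }

    public static void main(String[] args) {
        SerializedStateCache cache = new SerializedStateCache();
        byte[] first = cache.get(7, v -> ("state-" + v).getBytes());
        byte[] second = cache.get(7, v -> ("state-" + v).getBytes());
        System.out.println(first == second); // true: same version, cached bytes returned
        System.out.println(cache.get(8, v -> ("state-" + v).getBytes()) == first); // false: a new version replaces the entry
    }
}

Holding only the latest version also bounds memory to a single serialized state, which is what the containsKey/clear dance in PATCH 11 was approximating with a map.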
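PATCH 15's CompressedStreamUtils reduces to a write-compressed / sniff-and-decompress protocol, with a leading boolean telling the receiver whether the payload carries a full state or a diff. A rough equivalent built only on java.util.zip follows; the gzip framing and the StreamWriter interface are illustrative assumptions, since the real helpers use OpenSearch's CompressorFactory and a CheckedConsumer<StreamOutput, IOException> callback.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedStateRoundTrip {
    // Writer callback that may throw, mirroring CheckedConsumer<StreamOutput, IOException>.
    interface StreamWriter { void writeTo(DataOutputStream out) throws IOException; }

    static byte[] createCompressedStream(StreamWriter writer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bytes))) {
            writer.writeTo(out); // closing the stream flushes the compressor
        }
        return bytes.toByteArray();
    }

    static DataInputStream decompressBytes(byte[] payload) throws IOException {
        // Sniff the header to decide whether the payload is compressed (gzip magic 0x1f 0x8b).
        boolean compressed = payload.length > 2 && (payload[0] & 0xff) == 0x1f && (payload[1] & 0xff) == 0x8b;
        InputStream in = new ByteArrayInputStream(payload);
        return new DataInputStream(compressed ? new GZIPInputStream(in) : in);
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = createCompressedStream(out -> {
            out.writeBoolean(true); // marker: true = full state, false = diff
            out.writeUTF("full-cluster-state-bytes");
        });
        try (DataInputStream in = decompressBytes(payload)) {
            String kind = in.readBoolean() ? "full" : "diff"; // receiver branches on the marker
            System.out.println(kind + ": " + in.readUTF());
        }
    }
}

The sniffing step plays the same role as CompressorFactory.compressor(request.bytes()) in the patch: the receiver never trusts a flag to tell it whether the bytes are compressed, it inspects the header.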
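Finally, the V_2_8_0 line changed in JoinHelperTests at the top of this section pins down the mixed-version rule: only peers new enough to understand the compressed request may receive it, while older peers keep getting the plain validate-join action. The sketch below shows the shape of that gate with hypothetical integer wire versions standing in for org.opensearch.Version comparisons; the real check lives in JoinHelper and the cutoff value here is an assumption for illustration.

public class ValidateJoinActionChoice {
    // Hypothetical numeric wire versions; OpenSearch compares Version objects instead.
    static final int V_2_8_0 = 2_08_00_99;
    static final int CURRENT = 3_00_00_99;

    static String actionForPeer(int peerVersion, int compressionCutoff) {
        // Peers below the cutoff cannot decompress the new request type,
        // so they must be sent the uncompressed variant.
        return peerVersion >= compressionCutoff ? "VALIDATE_COMPRESSED_JOIN_ACTION_NAME" : "VALIDATE_JOIN_ACTION_NAME";
    }

    public static void main(String[] args) {
        System.out.println(actionForPeer(V_2_8_0, CURRENT)); // plain validate-join for the older peer
        System.out.println(actionForPeer(CURRENT, CURRENT)); // compressed variant for an up-to-date peer
    }
}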