Compress and cache cluster state during validate join request #7321

Merged
Changes from 6 commits
CHANGELOG.md (3 changes: 2 additions & 1 deletion)
@@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321))

### Deprecated

@@ -117,4 +118,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.7...2.x
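
The changelog entry above summarizes the mechanism implemented in the `JoinHelper` changes further down: the elected cluster-manager serializes and compresses the full cluster state once per joining-node version and caches the resulting bytes for a configurable interval, so a burst of validate-join requests does not pay the serialization cost repeatedly. A minimal sketch of that caching idea, using the same `Cache`/`CacheBuilder` API the PR relies on (`compressFor` is a hypothetical stand-in for `CompressionHelper.serializeClusterState`):

```java
import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.unit.TimeValue;

import java.util.concurrent.ExecutionException;

class ValidateJoinStateCacheSketch {
    // One compressed blob per wire version: cluster-state serialization is version dependent,
    // so joiners on different versions cannot share the same cached bytes.
    private final Cache<Version, BytesReference> serializedStates = CacheBuilder.<Version, BytesReference>builder()
        .setExpireAfterWrite(TimeValue.timeValueMillis(30_000)) // default of the new cache-interval setting
        .build();

    BytesReference bytesFor(ClusterState state, DiscoveryNode node) throws ExecutionException {
        // Serialize and compress at most once per version within the expiry window.
        return serializedStates.computeIfAbsent(node.getVersion(), v -> compressFor(state, node));
    }

    private static BytesReference compressFor(ClusterState state, DiscoveryNode node) {
        throw new UnsupportedOperationException("stand-in for CompressionHelper.serializeClusterState");
    }
}
```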
@@ -42,6 +42,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
@@ -59,6 +60,7 @@
import java.util.concurrent.TimeoutException;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
import static org.opensearch.cluster.coordination.JoinHelper.CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.hamcrest.Matchers.containsString;
@@ -106,7 +108,9 @@ public void testNoShardRelocationsOccurWhenElectedClusterManagerNodeFails() thro
}

public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedException, ExecutionException, TimeoutException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.getKey(), TimeValue.timeValueMillis(0)).build()
);
String node1 = internalCluster().startNode();
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, clusterManagerNode);
@@ -117,7 +121,6 @@ public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedExce

final CompletableFuture<Throwable> future = new CompletableFuture<>();
DiscoveryNode node = state.nodes().getLocalNode();

coordinator.sendValidateJoinRequest(
stateWithCustomMetadata,
new JoinRequest(node, 0L, Optional.empty()),
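
The test change above pins the new cache interval to 0 ms; as the `JoinHelper` constructor below shows, a zero interval skips building the cache, so the state is re-serialized for every validate-join request. For illustration, a hedged example of configuring the setting on a cluster-manager node (the key and the 0 ms to 60 s bounds come from the setting definition in this PR; the concrete values chosen here are arbitrary):

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import static org.opensearch.cluster.coordination.JoinHelper.CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL;

class ValidateJoinCacheSettingsExample {
    // 0 ms disables caching: the cluster state is serialized and compressed per request.
    static final Settings CACHE_DISABLED = Settings.builder()
        .put(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.getKey(), TimeValue.timeValueMillis(0))
        .build();

    // Any value up to the 60 s maximum keeps the per-version compressed state for that long.
    static final Settings TEN_SECOND_CACHE = Settings.builder()
        .put(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.getKey(), TimeValue.timeValueSeconds(10))
        .build();
}
```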
@@ -222,7 +222,8 @@ public Coordinator(
this.onJoinValidators,
rerouteService,
nodeHealthService,
this::onNodeCommissionStatusChange
this::onNodeCommissionStatusChange,
namedWriteableRegistry
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
@@ -35,11 +35,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskConfig;
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.compress.CompressionHelper;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
@@ -49,15 +53,19 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.common.Priority;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.BytesTransportRequest;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
@@ -69,6 +77,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InvalidObjectException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -98,6 +107,7 @@ public class JoinHelper {

public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate";
public static final String VALIDATE_COMPRESSED_JOIN_ACTION_NAME = "internal:cluster/coordination/join" + "/validate_compressed";
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";

// the timeout for Zen1 join attempts
@@ -122,6 +132,16 @@

private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
private final Consumer<Boolean> nodeCommissioned;
private final NamedWriteableRegistry namedWriteableRegistry;
private Cache<Version, BytesReference> serializedStates;
public static final Setting<TimeValue> CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL = Setting.timeSetting(
"cluster_manager.validate_join.cache_interval",
TimeValue.timeValueMillis(30000),
TimeValue.timeValueMillis(0),
TimeValue.timeValueMillis(60000),
Setting.Property.NodeScope
);
private final long clusterStateRefreshInterval;

JoinHelper(
Settings settings,
@@ -135,13 +155,22 @@
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
RerouteService rerouteService,
NodeHealthService nodeHealthService,
Consumer<Boolean> nodeCommissioned
Consumer<Boolean> nodeCommissioned,
NamedWriteableRegistry namedWriteableRegistry
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissioned = nodeCommissioned;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterStateRefreshInterval = CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings).getMillis();
if (clusterStateRefreshInterval != 0) {
CacheBuilder<Version, BytesReference> cacheBuilder = CacheBuilder.builder();
cacheBuilder.setExpireAfterWrite(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings));
this.serializedStates = cacheBuilder.build();
}

this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {

private final long term = currentTermSupplier.getAsLong();
@@ -208,22 +237,61 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
ThreadPool.Names.GENERIC,
ValidateJoinRequest::new,
(request, channel, task) -> {
final ClusterState localState = currentStateSupplier.get();
if (localState.metadata().clusterUUIDCommitted()
&& localState.metadata().clusterUUID().equals(request.getState().metadata().clusterUUID()) == false) {
throw new CoordinationStateRejectedException(
"join validation on cluster state"
+ " with a different cluster uuid "
+ request.getState().metadata().clusterUUID()
+ " than local cluster uuid "
+ localState.metadata().clusterUUID()
+ ", rejecting"
);
}
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
runJoinValidators(currentStateSupplier, request.getState(), joinValidators);
channel.sendResponse(Empty.INSTANCE);
}
);

transportService.registerRequestHandler(
VALIDATE_COMPRESSED_JOIN_ACTION_NAME,
ThreadPool.Names.GENERIC,
BytesTransportRequest::new,
(request, channel, task) -> {
handleValidateJoinRequest(currentStateSupplier, joinValidators, request);
channel.sendResponse(Empty.INSTANCE);
}
);

}

private void runJoinValidators(
Supplier<ClusterState> currentStateSupplier,
ClusterState incomingState,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators
) {
final ClusterState localState = currentStateSupplier.get();
if (localState.metadata().clusterUUIDCommitted()
&& localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) {
throw new CoordinationStateRejectedException(
"join validation on cluster state"
+ " with a different cluster uuid "
+ incomingState.metadata().clusterUUID()
+ " than local cluster uuid "
+ localState.metadata().clusterUUID()
+ ", rejecting"
);
}
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState));
}

private void handleValidateJoinRequest(
Supplier<ClusterState> currentStateSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
BytesTransportRequest request
) throws IOException {
StreamInput in = CompressionHelper.decompressClusterState(request, namedWriteableRegistry);
ClusterState incomingState;
try {
if (in.readBoolean()) {
incomingState = CompressionHelper.deserializeFullClusterState(in, transportService.getLocalNode());
runJoinValidators(currentStateSupplier, incomingState, joinValidators);
} else {
logger.error("validate new node join request requires full cluster state");
throw new InvalidObjectException("validate new node join request requires full cluster state");
}
} finally {
IOUtils.close(in);
}
}

private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) {
@@ -407,12 +475,27 @@ public String executor() {
}

public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
transportService.sendRequest(
node,
VALIDATE_JOIN_ACTION_NAME,
new ValidateJoinRequest(state),
new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC)
);
try {
BytesReference bytes;
if (clusterStateRefreshInterval == 0) {
bytes = CompressionHelper.serializeClusterState(state, node, true);
} else {
bytes = serializedStates.computeIfAbsent(
node.getVersion(),
key -> CompressionHelper.serializeClusterState(state, node, true)
);
}
final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
transportService.sendRequest(
node,
VALIDATE_COMPRESSED_JOIN_ACTION_NAME,
request,
new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC)
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
listener.onFailure(e);
}
}

/**
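
`CompressionHelper` itself is added elsewhere in this PR and its body is not part of this excerpt; only its call sites appear above. A rough sketch of what serialize/decompress helpers with these signatures might look like, modeled on the existing full-cluster-state handling in `PublicationTransportHandler` (the bodies below are assumptions for illustration, not the PR's actual implementation):

```java
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.compress.Compressor;
import org.opensearch.common.compress.CompressorFactory;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.BytesTransportRequest;

import java.io.IOException;

final class CompressionHelperSketch {

    /** Serialize the cluster state for the joining node's wire version and compress the bytes. */
    static BytesReference serializeClusterState(ClusterState state, DiscoveryNode node, boolean isFullState)
        throws IOException {
        final BytesStreamOutput bStream = new BytesStreamOutput();
        try (StreamOutput stream = new OutputStreamStreamOutput(
            CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
            stream.setVersion(node.getVersion());
            stream.writeBoolean(isFullState); // read back on the receiver via in.readBoolean()
            state.writeTo(stream);
        }
        return bStream.bytes();
    }

    /** Undo the compression and wrap the stream so named writeables and versions resolve correctly. */
    static StreamInput decompressClusterState(BytesTransportRequest request, NamedWriteableRegistry registry)
        throws IOException {
        final Compressor compressor = CompressorFactory.compressor(request.bytes());
        StreamInput in = request.bytes().streamInput();
        if (compressor != null) {
            in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
        }
        in = new NamedWriteableAwareStreamInput(in, registry);
        in.setVersion(request.version());
        return in;
    }

    /** Rebuild the ClusterState, resolving the local node reference on the receiving side. */
    static ClusterState deserializeFullClusterState(StreamInput in, DiscoveryNode localNode) throws IOException {
        return ClusterState.readFrom(in, localNode);
    }
}
```

On the receive side, the new `VALIDATE_COMPRESSED_JOIN_ACTION_NAME` handler above checks the boolean written during serialization and rejects the request with an `InvalidObjectException` unless it carries a full cluster state, then runs the usual join validators against the deserialized state.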