Skip to content

Commit

Permalink
Add new handler for sending compressed cluster state in validate join…
Browse files Browse the repository at this point in the history
… flow and refactor code

Signed-off-by: Aman Khare <amkhar@amazon.com>
  • Loading branch information
Aman Khare committed May 9, 2023
1 parent e967c30 commit f3ee750
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -59,6 +60,7 @@
import java.util.concurrent.TimeoutException;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
import static org.opensearch.cluster.coordination.JoinHelper.CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -106,7 +108,9 @@ public void testNoShardRelocationsOccurWhenElectedClusterManagerNodeFails() thro
}

public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedException, ExecutionException, TimeoutException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.getKey(), TimeValue.timeValueMillis(0)).build()
);
String node1 = internalCluster().startNode();
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, clusterManagerNode);
Expand All @@ -117,7 +121,6 @@ public void testHandleNodeJoin_incompatibleClusterState() throws InterruptedExce

final CompletableFuture<Throwable> future = new CompletableFuture<>();
DiscoveryNode node = state.nodes().getLocalNode();

coordinator.sendValidateJoinRequest(
stateWithCustomMetadata,
new JoinRequest(node, 0L, Optional.empty()),
Expand Down
49 changes: 0 additions & 49 deletions server/src/main/java/org/opensearch/cluster/CompressionHelper.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskConfig;
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.cluster.CompressionHelper;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.compress.CompressionHelper;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
Expand All @@ -53,10 +55,6 @@
import org.opensearch.common.Priority;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.compress.Compressor;
import org.opensearch.common.compress.CompressorFactory;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.Setting;
Expand All @@ -79,6 +77,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.InvalidObjectException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -108,6 +107,7 @@ public class JoinHelper {

public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate";
public static final String VALIDATE_COMPRESSED_JOIN_ACTION_NAME = "internal:cluster/coordination/join" + "/validate_compressed";
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";

// the timeout for Zen1 join attempts
Expand All @@ -133,10 +133,9 @@ public class JoinHelper {
private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
private final Consumer<Boolean> nodeCommissioned;
private final NamedWriteableRegistry namedWriteableRegistry;
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private long lastRefreshTime = 0L;
public static final Setting<TimeValue> CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL = Setting.timeSetting(
"cluster_manager.join.state.refresh_interval",
private Cache<Version, BytesReference> serializedStates;
public static final Setting<TimeValue> CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL = Setting.timeSetting(
"cluster_manager.validate_join.cache_interval",
TimeValue.timeValueMillis(30000),
TimeValue.timeValueMillis(0),
TimeValue.timeValueMillis(60000),
Expand Down Expand Up @@ -165,7 +164,13 @@ public class JoinHelper {
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissioned = nodeCommissioned;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterStateRefreshInterval = CLUSTER_MANAGER_JOIN_STATE_REFRESH_INTERVAL.get(settings).getMillis();
this.clusterStateRefreshInterval = CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings).getMillis();
if (clusterStateRefreshInterval != 0) {
CacheBuilder<Version, BytesReference> cacheBuilder = CacheBuilder.builder();
cacheBuilder.setExpireAfterWrite(CLUSTER_MANAGER_VALIDATE_JOIN_CACHE_INTERVAL.get(settings));
this.serializedStates = cacheBuilder.build();
}

this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {

private final long term = currentTermSupplier.getAsLong();
Expand Down Expand Up @@ -230,51 +235,59 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
transportService.registerRequestHandler(
VALIDATE_JOIN_ACTION_NAME,
ThreadPool.Names.GENERIC,
ValidateJoinRequest::new,
(request, channel, task) -> {
runJoinValidators(currentStateSupplier, request.getState(), joinValidators);
channel.sendResponse(Empty.INSTANCE);
}
);

transportService.registerRequestHandler(
VALIDATE_COMPRESSED_JOIN_ACTION_NAME,
ThreadPool.Names.GENERIC,
BytesTransportRequest::new,
(request, channel, task) -> {
handleValidateJoinRequest(currentStateSupplier, joinValidators, request);
channel.sendResponse(Empty.INSTANCE);
}
);

}

private void runJoinValidators(
Supplier<ClusterState> currentStateSupplier,
ClusterState incomingState,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators
) {
final ClusterState localState = currentStateSupplier.get();
if (localState.metadata().clusterUUIDCommitted()
&& localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) {
throw new CoordinationStateRejectedException(
"join validation on cluster state"
+ " with a different cluster uuid "
+ incomingState.metadata().clusterUUID()
+ " than local cluster uuid "
+ localState.metadata().clusterUUID()
+ ", rejecting"
);
}
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState));
}

private void handleValidateJoinRequest(
Supplier<ClusterState> currentStateSupplier,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
BytesTransportRequest request
) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
final ClusterState incomingState;
StreamInput in = CompressionHelper.decompressClusterState(request, namedWriteableRegistry);
ClusterState incomingState;
try {
if (compressor != null) {
in = new InputStreamStreamInput(compressor.threadLocalInputStream(in));
}
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
// Close early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
incomingState = ClusterState.readFrom(input, transportService.getLocalNode());
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
throw e;
}

final ClusterState localState = currentStateSupplier.get();
if (localState.metadata().clusterUUIDCommitted()
&& localState.metadata().clusterUUID().equals(incomingState.metadata().clusterUUID()) == false) {
throw new CoordinationStateRejectedException(
"join validation on cluster state"
+ " with a different cluster uuid "
+ incomingState.metadata().clusterUUID()
+ " than local cluster uuid "
+ localState.metadata().clusterUUID()
+ ", rejecting"
);
}
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), incomingState));
incomingState = CompressionHelper.deserializeFullClusterState(in, transportService);
runJoinValidators(currentStateSupplier, incomingState, joinValidators);
} else {
logger.error("validate new node join request requires full cluster state");
throw new InvalidObjectException("validate new node join request requires full cluster state");
}
} finally {
IOUtils.close(in);
Expand Down Expand Up @@ -463,24 +476,19 @@ public String executor() {

public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
try {
BytesReference bytes = serializedStates.get(node.getVersion());
// Refresh serializedStates map every time if clusterStateRefreshInterval is 0
if (bytes == null || (System.currentTimeMillis() >= lastRefreshTime + clusterStateRefreshInterval)) {
try {
// Re-getting current cluster state for validate join request
bytes = CompressionHelper.serializedWrite(state, node.getVersion(), true);
serializedStates.put(node.getVersion(), bytes);
lastRefreshTime = System.currentTimeMillis();
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to serialize cluster state during validateJoin" + " {}", node), e);
listener.onFailure(e);
return;
}
BytesReference bytes;
if (clusterStateRefreshInterval == 0) {
bytes = CompressionHelper.serializeClusterState(state, node, true);
} else {
bytes = serializedStates.computeIfAbsent(
node.getVersion(),
key -> CompressionHelper.serializeClusterState(state, node, true)
);
}
final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
transportService.sendRequest(
node,
VALIDATE_JOIN_ACTION_NAME,
VALIDATE_COMPRESSED_JOIN_ACTION_NAME,
request,
new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC)
);
Expand Down
Loading

0 comments on commit f3ee750

Please sign in to comment.