Add changes for graceful node decommission #4586

Merged

Commits (102) — changes from all commits
bdb6c6f
Add changes for graceful node decommission
pranikum Sep 25, 2022
afec689
Add WRRWeights class
pranikum Sep 25, 2022
3314ec3
Add logger message
pranikum Sep 25, 2022
bf622c4
Add tests
pranikum Sep 25, 2022
d933377
Remove unused code
pranikum Sep 26, 2022
bf4c217
Remove test annonation
pranikum Sep 26, 2022
c57530a
Fix spotless
pranikum Sep 26, 2022
7c43ad1
Add package info
pranikum Sep 26, 2022
ba8f500
Add weigh away status
pranikum Sep 26, 2022
8920480
PR comments
pranikum Sep 26, 2022
eeed711
Update changelog
pranikum Sep 26, 2022
ed1ccbc
Fix tests
pranikum Sep 26, 2022
7922291
Remove time check. Just we will schedule
pranikum Sep 27, 2022
bed942a
Set decommission status to In Progress on completion of timeout
pranikum Sep 27, 2022
67622a2
PR comments. Take latest changes for WRR API
pranikum Sep 27, 2022
b75b108
Add drain timeout to decommission request
pranikum Sep 27, 2022
f79b51d
Resolve merge conflicts
pranikum Sep 28, 2022
46a1b12
Merge conflict
pranikum Sep 28, 2022
afda24c
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Sep 28, 2022
509015e
Merge with latests
pranikum Sep 28, 2022
3f832f9
Fix tests and split set weights and weights population
pranikum Sep 28, 2022
b70370c
PR comments
pranikum Sep 28, 2022
5724ba8
Merging with latest
pranikum Sep 29, 2022
6d1ba1a
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Sep 29, 2022
e401908
Merge with latest.
pranikum Sep 29, 2022
289077f
Add changelog
pranikum Sep 29, 2022
0890c39
Merge with latest
pranikum Oct 3, 2022
ec0f0e1
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 3, 2022
2f10664
Merge on latest
pranikum Oct 3, 2022
daa0065
Spotless java check
pranikum Oct 3, 2022
e9e25f1
Fix compilation issue
pranikum Oct 3, 2022
4590da3
PR comments
pranikum Oct 3, 2022
abbec12
Fix logger check
pranikum Oct 3, 2022
bd6f485
PR comments
pranikum Oct 3, 2022
9c1b2aa
Fix PR comments
pranikum Oct 3, 2022
9aa4f37
Handle PR comments
pranikum Oct 6, 2022
fd9fabe
Merge with latest changes
pranikum Oct 6, 2022
79f06fc
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 6, 2022
0faa9b9
Merge graceful changes
pranikum Oct 6, 2022
adbc120
Make variable final
pranikum Oct 6, 2022
81d320b
Fix logger usage
pranikum Oct 6, 2022
de54abd
Merge changelog with latest
pranikum Oct 10, 2022
d2b0444
Update changelog Avoid joining during draining state
pranikum Oct 10, 2022
4ddaae4
Merge with latest
pranikum Oct 10, 2022
e5984f7
Add changelog
pranikum Oct 10, 2022
c600c6d
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 10, 2022
b97c83b
Handle node draining for node decommission
pranikum Oct 10, 2022
926c969
Fix changelog
pranikum Oct 10, 2022
abb9825
Changelog merge
pranikum Oct 12, 2022
26ba03e
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 12, 2022
8564208
Resolve conflict with latest
pranikum Oct 12, 2022
3e1c3b1
Merge to latest
pranikum Oct 14, 2022
01b208c
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 14, 2022
d8f0166
Merge with latest
pranikum Oct 14, 2022
f367f4f
Merge with Latest
pranikum Oct 14, 2022
e5ae1f3
Add changelog
pranikum Oct 14, 2022
d9d4904
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 14, 2022
f9ced18
Update Changelog
pranikum Oct 14, 2022
9881ef0
PR comments
pranikum Oct 15, 2022
1ed1b57
Fix validation message
pranikum Oct 15, 2022
dde8f48
Fix Tests
pranikum Oct 15, 2022
8453ca2
Add decommission State transition validation
pranikum Oct 16, 2022
3c417f5
Fix transition state
pranikum Oct 17, 2022
602ced2
Merge with latest
pranikum Oct 19, 2022
0574db5
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 19, 2022
3115772
checks valid weights and don't set it.
pranikum Oct 19, 2022
95fc853
Empty-Commit
pranikum Oct 19, 2022
165ce7d
Fix the integ tests
pranikum Oct 19, 2022
8e0f681
Merge with latest
pranikum Oct 20, 2022
460e96c
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 20, 2022
f3a7240
Resolve conflict
pranikum Oct 20, 2022
c147d15
PR comments
pranikum Oct 20, 2022
13c0c61
Spotless Fix and changes
pranikum Oct 20, 2022
56c48eb
Fix log message for logging
pranikum Oct 20, 2022
ce3cc6f
Fix spotless Apply
pranikum Oct 20, 2022
09aec9f
Fix logger usage
pranikum Oct 20, 2022
252a976
PR comments
pranikum Oct 20, 2022
60d4e87
Fix Tests
pranikum Oct 20, 2022
abdda72
Empty-Commit
pranikum Oct 20, 2022
1b8234c
Fix Tests
pranikum Oct 20, 2022
a4a2ff6
Empty-Commit
pranikum Oct 20, 2022
e410e45
Fix Integ tests
pranikum Oct 20, 2022
7d2f2df
Change dely for Draining check
pranikum Oct 20, 2022
e25e411
Lower delay to 100 mills
pranikum Oct 20, 2022
09b7cdc
Fix typo
pranikum Oct 20, 2022
878dad4
Take latest for main
pranikum Oct 20, 2022
9d28440
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 20, 2022
d2303c5
Merge with latest
pranikum Oct 20, 2022
b78b452
Fix spotless Java
pranikum Oct 20, 2022
d1bb936
Update delay to 300
pranikum Oct 20, 2022
a8a6032
Add no_Delay param. No need to use schedule when nodelay is true
pranikum Oct 21, 2022
b724464
Remove exposed param delay_timeout
pranikum Oct 21, 2022
10a6e5f
Update delay to 500 since still we see failure for cluster state
pranikum Oct 21, 2022
d2d171d
Empty-Commit
pranikum Oct 21, 2022
68e4e94
In case of no delay.. Avoid draining state
pranikum Oct 24, 2022
d6fe3c4
Merging with lates
pranikum Oct 24, 2022
ca0418f
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 24, 2022
04e4676
Resolve conflict with main
pranikum Oct 24, 2022
a6fbdb2
Fix test
pranikum Oct 24, 2022
811fe8e
Take latest file
pranikum Oct 25, 2022
97c93c1
Merge branch 'main' into graceful-node-decommission-wrr-2
pranikum Oct 25, 2022
122b16c
Resolve conflict
pranikum Oct 25, 2022
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -42,7 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add dev help in gradle check CI failures ([4872](https://github.com/opensearch-project/OpenSearch/pull/4872))
- Copy `build.sh` over from opensearch-build ([#4887](https://github.com/opensearch-project/OpenSearch/pull/4887))
- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843))

- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -120,38 +120,53 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
// Set the timeout to 0 to do immediate Decommission
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get();
assertTrue(decommissionResponse.isAcknowledged());

logger.info("--> Received decommissioning nodes in zone {}", 'c');
// Keep some delay for scheduler to invoke decommission flow
Thread.sleep(500);

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

logger.info("--> Received LANGUID event");

// assert that decommission status is successful
GetDecommissionStateResponse response = client().execute(
GetDecommissionStateResponse response = client(clusterManagerNodes.get(0)).execute(
GetDecommissionStateAction.INSTANCE,
new GetDecommissionStateRequest(decommissionAttribute.attributeName())
).get();
assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue());
assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL);
assertEquals(DecommissionStatus.SUCCESSFUL, response.getDecommissionStatus());

logger.info("--> Decommission status is successful");
ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();
assertEquals(4, clusterState.nodes().getSize());

logger.info("--> Got cluster state with 4 nodes.");
// assert status on nodes that are part of cluster currently
Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt();
DiscoveryNode clusterManagerNodeAfterDecommission = null;
while (discoveryNodeIterator.hasNext()) {
// assert no node has decommissioned attribute
DiscoveryNode node = discoveryNodeIterator.next();
assertNotEquals(node.getAttributes().get("zone"), "c");

if (node.isClusterManagerNode()) {
clusterManagerNodeAfterDecommission = node;
}
// assert all the nodes has status as SUCCESSFUL
ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName());
assertEquals(
localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.SUCCESSFUL
);
}
assertNotNull("Cluster Manager not found after decommission", clusterManagerNodeAfterDecommission);
logger.info("--> Cluster Manager node found after decommission");

// assert status on decommissioned node
// Here we will verify that until it got kicked out, it received appropriate status updates
@@ -163,16 +178,18 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.IN_PROGRESS
);
logger.info("--> Verified the decommissioned node Has in progress state.");

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

client(clusterManagerNodeAfterDecommission.getName()).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
logger.info("--> Got LANGUID event");
// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute(
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodeAfterDecommission.getName()).execute(
DeleteDecommissionStateAction.INSTANCE,
new DeleteDecommissionStateRequest()
).get();
assertTrue(deleteDecommissionStateResponse.isAcknowledged());
logger.info("--> Deleting decommission done.");

// will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes)
// as by then all nodes should have joined the cluster
@@ -201,6 +218,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception

DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
assertBusy(() -> {
DecommissioningFailedException ex = expectThrows(
DecommissioningFailedException.class,
@@ -14,6 +14,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;

@@ -28,8 +29,15 @@
*/
public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionRequest> {

public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;

private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// holder for no_delay param. To avoid draining time timeout.
private boolean noDelay = false;

public DecommissionRequest() {}

public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
@@ -39,12 +47,14 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute) {
public DecommissionRequest(StreamInput in) throws IOException {
super(in);
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
Member (review comment): We missed adding noDelay here. With this no delay would never be read from the request.
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
}
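Following up on the review comment above about the missing noDelay serialization: a minimal sketch of what the read/write pair could look like if a follow-up adds the flag (illustrative only, not part of this diff):

```java
// Hypothetical follow-up: serialize noDelay alongside delayTimeout so the flag
// survives the transport layer (the PR as shown only round-trips delayTimeout).
public DecommissionRequest(StreamInput in) throws IOException {
    super(in);
    decommissionAttribute = new DecommissionAttribute(in);
    this.delayTimeout = in.readTimeValue();
    this.noDelay = in.readBoolean();   // missing in the current diff, per the comment above
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    decommissionAttribute.writeTo(out);
    out.writeTimeValue(delayTimeout);
    out.writeBoolean(noDelay);         // reader and writer must stay in the same order
}
```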

/**
@@ -65,6 +75,19 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

public TimeValue getDelayTimeout() {
return this.delayTimeout;
}

public void setNoDelay(boolean noDelay) {
this.delayTimeout = TimeValue.ZERO;
this.noDelay = noDelay;
}

public boolean isNoDelay() {
return noDelay;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -74,6 +97,14 @@ public ActionRequestValidationException validate() {
if (decommissionAttribute.attributeValue() == null || Strings.isEmpty(decommissionAttribute.attributeValue())) {
validationException = addValidationError("attribute value is missing", validationException);
}
// This validation should not fail since we are not allowing delay timeout to be set externally.
// Still keeping it for double check.
if (noDelay && delayTimeout.getSeconds() > 0) {
final String validationMessage = "Invalid decommission request. no_delay is true and delay_timeout is set to "
+ delayTimeout.getSeconds()
+ "] Seconds";
validationException = addValidationError(validationMessage, validationException);
}
return validationException;
}
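A small illustrative usage of the request plumbing above, mirroring the integration test earlier in this diff (client-side only; the assertions simply restate what setNoDelay and validate() do in the code shown):

```java
// Immediate decommission: setNoDelay(true) also forces delayTimeout to zero,
// so the no_delay/delay_timeout validation above does not trip.
DecommissionAttribute attribute = new DecommissionAttribute("zone", "c");
DecommissionRequest request = new DecommissionRequest(attribute);
request.setNoDelay(true);
assert request.isNoDelay();
assert request.getDelayTimeout().equals(TimeValue.ZERO);
assert request.validate() == null;   // no validation errors expected
```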

@@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS
protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener<DecommissionResponse> listener)
throws Exception {
logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString());
decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener);
decommissionService.startDecommissionAction(request, listener);
}
}
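The one-line change above passes the whole request into the service so it can read the drain settings; the tiny sketch below only illustrates the branching those settings imply, inferred from the request fields and commit messages rather than from the PR's actual DecommissionService code:

```java
// Illustrative only: how no_delay vs. the 120s default drain timeout might drive the
// first status transition. The real logic lives in DecommissionService (not shown here).
final class DrainBranchSketch {
    enum Status { INIT, DRAINING, IN_PROGRESS }

    static Status firstTransition(boolean noDelay) {
        // no_delay skips DRAINING entirely; otherwise the node drains first and
        // IN_PROGRESS is scheduled once the drain timeout elapses.
        return noDelay ? Status.IN_PROGRESS : Status.DRAINING;
    }

    public static void main(String[] args) {
        System.out.println(firstTransition(true));   // IN_PROGRESS
        System.out.println(firstTransition(false));  // DRAINING
    }
}
```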
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

/**
* Contains metadata about decommission attribute
@@ -88,11 +89,14 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
// We don't expect that INIT will be new status, as it is registered only when starting the decommission action
switch (newStatus) {
case DRAINING:
validateStatus(Set.of(DecommissionStatus.INIT), newStatus);
break;
case IN_PROGRESS:
validateStatus(DecommissionStatus.INIT, newStatus);
validateStatus(Set.of(DecommissionStatus.DRAINING, DecommissionStatus.INIT), newStatus);
break;
case SUCCESSFUL:
validateStatus(DecommissionStatus.IN_PROGRESS, newStatus);
validateStatus(Set.of(DecommissionStatus.IN_PROGRESS), newStatus);
break;
default:
throw new IllegalArgumentException(
Expand All @@ -101,17 +105,17 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) {
}
}

private void validateStatus(DecommissionStatus expected, DecommissionStatus next) {
if (status.equals(expected) == false) {
private void validateStatus(Set<DecommissionStatus> expectedStatuses, DecommissionStatus next) {
if (expectedStatuses.contains(status) == false) {
assert false : "can't move decommission status to ["
+ next
+ "]. current status: ["
+ status
+ "] (expected ["
+ expected
+ "] (allowed statuses ["
+ expectedStatuses
+ "])";
throw new IllegalStateException(
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])"
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expectedStatuses + "])"
);
}
}
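The switch above is a small state machine; the sketch below simply restates the allowed transitions it enforces after this change, with DRAINING newly allowed between INIT and IN_PROGRESS (an illustrative restatement, not code from the PR):

```java
// Allowed "previous -> next" status moves encoded by validateNewStatus/validateStatus above.
import java.util.Map;
import java.util.Set;

final class DecommissionTransitions {
    static final Map<String, Set<String>> ALLOWED_PREVIOUS = Map.of(
        "DRAINING", Set.of("INIT"),
        "IN_PROGRESS", Set.of("DRAINING", "INIT"),   // INIT -> IN_PROGRESS covers the no_delay path
        "SUCCESSFUL", Set.of("IN_PROGRESS")
    );

    static boolean canMoveTo(String current, String next) {
        return ALLOWED_PREVIOUS.getOrDefault(next, Set.of()).contains(current);
    }
}
```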
@@ -18,6 +18,10 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -32,14 +36,17 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
@@ -271,4 +278,61 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}

private void logActiveConnections(NodesStatsResponse nodesStatsResponse) {
if (nodesStatsResponse == null || nodesStatsResponse.getNodes() == null) {
logger.info("Node stats response received is null/empty.");
return;
}

Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
for (int i = 0; i < responseNodes.size(); i++) {
HttpStats httpStats = responseNodes.get(i).getHttp();
DiscoveryNode node = responseNodes.get(i).getNode();
nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen());
}
logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap);
Comment on lines +288 to +295

Collaborator: Let's avoid a map just for logging in a single line. I would suggest introducing a setting that could control the minimum acceptable connections during draining.

Contributor Author (@pranikum, Oct 15, 2022): @Bukhtawar, we are just logging the active connections for now. I was thinking that once we have data points on active connection counts for the nodes, we can set the minimum connection setting. I will open a task to track that change. Let me know your thoughts.

Created an issue to track the same: #4804
}
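Regarding the suggestion in the thread above to gate draining on a minimum acceptable connection count rather than only logging it: a rough sketch of such a setting using the existing Setting API (the setting key and the helper below are hypothetical; the actual work is tracked in #4804):

```java
// Hypothetical dynamic setting suggested in the review; the key name is illustrative only.
public static final Setting<Integer> MIN_ACCEPTABLE_CONNECTIONS_SETTING = Setting.intSetting(
    "cluster.decommission.min_acceptable_active_connections",
    0,                                   // default 0 keeps today's log-only behaviour
    0,                                   // minimum allowed value
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);

// Draining could then be considered complete only when every decommissioned node
// has drained down to the configured threshold.
private static boolean connectionsDrained(Map<String, Long> nodeActiveConnections, int minAcceptable) {
    return nodeActiveConnections.values().stream().allMatch(open -> open <= minAcceptable);
}
```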

void getActiveRequestCountOnDecommissionedNodes(Set<DiscoveryNode> decommissionedNodes) {
if (decommissionedNodes == null || decommissionedNodes.isEmpty()) {
return;
}
String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new);
if (nodes.length == 0) {
return;
}

final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes);
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName());

transportService.sendRequest(
transportService.getLocalNode(),
NodesStatsAction.NAME,
nodesStatsRequest,
new TransportResponseHandler<NodesStatsResponse>() {
@Override
public void handleResponse(NodesStatsResponse response) {
logActiveConnections(response);
Member: Can response be null?

Contributor Author: Handled for null response. Added log for null response.
}

@Override
public void handleException(TransportException exp) {
logger.error("Failure occurred while dumping connection for decommission nodes - ", exp.unwrapCause());
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
Member (@imRishN, Oct 20, 2022): Generic thread? @Bukhtawar what do you think?
}
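For context on the question above: the handler currently runs on the calling transport thread (ThreadPool.Names.SAME); switching to the generic pool would look like the snippet below (only illustrating the alternative the reviewer raises, not a change made in this PR):

```java
@Override
public String executor() {
    // Alternative raised in review: hand the response off to the generic pool
    // instead of executing inline on the transport thread.
    return ThreadPool.Names.GENERIC;
}
```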

@Override
public NodesStatsResponse read(StreamInput in) throws IOException {
return new NodesStatsResponse(in);
}
}
);
}
}