Skip to content

Commit

Permalink
Unset discovery nodes in all transport node actions request.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
  • Loading branch information
Swetha Guptha committed Aug 6, 2024
1 parent 49b7cd4 commit 596132b
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -127,14 +120,6 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -56,7 +57,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -202,11 +202,22 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {

/**
* resolve node ids to concrete nodes of the incoming request
**/
protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
*
* @return
*/
protected DiscoveryNode[] resolveRequest(NodesRequest request, ClusterState clusterState) {
if (request.concreteNodes() != null) {
return request.concreteNodes();
}
assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
List<DiscoveryNode> list = new ArrayList<>();
DiscoveryNodes discoveryNodes = clusterState.nodes();
for (String nodesId : nodesIds) {
DiscoveryNode discoveryNode = discoveryNodes.get(nodesId);
list.add(discoveryNode);
}
return list.toArray(new DiscoveryNode[0]);
}

/**
Expand Down Expand Up @@ -234,23 +245,16 @@ class AsyncAction {
this.task = task;
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
this.concreteNodes = resolveRequest(request, clusterService.state());
this.responses = new AtomicReferenceArray<>(this.concreteNodes.length);
}

void start() {
final DiscoveryNode[] nodes = this.concreteNodes;
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);
nodesInfoRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.setIncludeDiscoveryNodes(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -138,7 +137,6 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,12 @@

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand All @@ -88,7 +52,6 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ private static class DataNodesOnlyTransportNodesAction extends TestTransportNode
}

@Override
protected void resolveRequest(TestNodesRequest request, ClusterState clusterState) {
request.setConcreteNodes(clusterState.nodes().getDataNodes().values().toArray(new DiscoveryNode[0]));
protected DiscoveryNode[] resolveRequest(TestNodesRequest request, ClusterState clusterState) {
return clusterState.nodes().getDataNodes().values().toArray(new DiscoveryNode[0]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,12 @@

public class TransportNodesInfoActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() {
NodesInfoRequest request = new NodesInfoRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockNodesInfoRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testNodesInfoActionWithoutRetentionOfDiscoveryNodesList() {
NodesInfoRequest request = new NodesInfoRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockNodesInfoRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,11 @@
public class TransportNodesStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() {
NodesStatsRequest request = new NodesStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockNodeStatsRequest>> combinedSentRequest = performNodesStatsAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
* We don't want to send discovery nodes list to each request that is sent across from the coordinator node.
* This behavior is asserted in this test.
*/
public void testNodesStatsActionWithoutRetentionOfDiscoveryNodesList() {
NodesStatsRequest request = new NodesStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockNodeStatsRequest>> combinedSentRequest = performNodesStatsAction(request);

assertNotNull(combinedSentRequest);
Expand Down

0 comments on commit 596132b

Please sign in to comment.