Skip to content

Commit

Permalink
Allow NetworkDisruption to reconnect to known nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Jun 26, 2018
1 parent 9ea4166 commit 8f977ff
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,16 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}

public void connectToNodes(DiscoveryNodes discoveryNodes) {
public void connectToNodes(DiscoveryNodes discoveryNodes, boolean reconnectToKnownNodes) {
CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
for (final DiscoveryNode node : discoveryNodes) {
final boolean shouldConnect;
try (Releasable ignored = nodeLocks.acquire(node)) {
// We try to connect to any new nodes (and, if reconnectToKnownNodes is set, to already-known nodes too) before returning.
// However, on the elected master the connections are established during joining, so we also check that we're not already
// connected to avoid the need to execute any background tasks in that case.
shouldConnect = nodes.putIfAbsent(node, 0) == null && transportService.nodeConnected(node) == false;
shouldConnect = (nodes.putIfAbsent(node, 0) == null || reconnectToKnownNodes)
&& transportService.nodeConnected(node) == false;
}
if (shouldConnect) {
// spawn to another thread to do in parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
}
}

nodeConnectionsService.connectToNodes(newClusterState.nodes());
nodeConnectionsService.connectToNodes(newClusterState.nodes(), false);

logger.debug("applying cluster state version {}", newClusterState.version());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testConnectAndDisconnect() {
ClusterState current = clusterStateFromNodes(Collections.emptyList());
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);

service.connectToNodes(event.state().nodes());
service.connectToNodes(event.state().nodes(), false);
assertConnected(event.state().nodes());

service.disconnectFromNodesExcept(event.state().nodes());
Expand All @@ -97,7 +97,7 @@ public void testConnectAndDisconnect() {
current = event.state();
event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);

service.connectToNodes(event.state().nodes());
service.connectToNodes(event.state().nodes(), false);
assertConnected(event.state().nodes());

service.disconnectFromNodesExcept(event.state().nodes());
Expand All @@ -114,7 +114,7 @@ public void testReconnect() {

transport.randomConnectionExceptions = true;

service.connectToNodes(event.state().nodes());
service.connectToNodes(event.state().nodes(), false);

for (int i = 0; i < 3; i++) {
// simulate disconnects
Expand All @@ -137,21 +137,23 @@ public void testDoesNotReconnectToKnownNodesOnNewClusterState() {
final ClusterState state1 = clusterStateFromNodes(randomSubsetOf(nodes));
final ClusterState state2 = clusterStateFromNodes(randomSubsetOf(nodes));

service.connectToNodes(state1.nodes());
service.connectToNodes(state1.nodes(), false);

final Set<DiscoveryNode> disconnectedNodes = new HashSet<>(randomSubsetOf(nodes));
for (final DiscoveryNode node : disconnectedNodes) {
transport.disconnectFromNode(node);
}

service.connectToNodes(state2.nodes());
boolean shouldReconnect = randomBoolean();

service.connectToNodes(state2.nodes(), shouldReconnect);

final Set<DiscoveryNode> expectedNodes = new HashSet<>(nodes.size());
state1.nodes().forEach(expectedNodes::add);
disconnectedNodes.forEach(expectedNodes::remove);
for (final DiscoveryNode discoveryNode : state2.nodes()) {
if (state1.nodes().get(discoveryNode.getId()) == null) {
// Only expect to be connected to _new_ nodes in state2.
if (shouldReconnect || state1.nodes().get(discoveryNode.getId()) == null) {
// Unless shouldReconnect is set, only expect to be connected to the nodes that are _new_ in state2; if it is set, expect
// to be connected to every node in state2.
expectedNodes.add(discoveryNode);
}
}
Expand All @@ -166,7 +168,7 @@ public void testDoesNotReconnectToKnownNodesOnNewClusterState() {
assertConnected(expectedNodes);
assertThat(transport.connectedNodes.size(), equalTo(expectedNodes.size()));
}

private void assertConnectedExactlyToNodes(ClusterState state) {
assertConnected(state.nodes());
assertThat(transport.connectedNodes.size(), equalTo(state.nodes().getSize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ TimedClusterApplierService createTimedClusterService(boolean makeMaster) throws
"ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool);
timedClusterApplierService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {

@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
public void connectToNodes(DiscoveryNodes discoveryNodes, boolean reconnectToKnownNodes) {
// skip
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove
clusterSettings, threadPool, Collections.emptyMap());
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
public void connectToNodes(DiscoveryNodes discoveryNodes, boolean reconnectToKnownNodes) {
// skip
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void ensureHealthy(InternalTestCluster cluster) {
public static void ensureFullyConnectedCluster(InternalTestCluster cluster) {
for (String node: cluster.getNodeNames()) {
ClusterState stateOnNode = cluster.getInstance(ClusterService.class, node).state();
cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes());
cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes(), true);
}
}

Expand Down

0 comments on commit 8f977ff

Please sign in to comment.