diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index f9aaf7333fa9c..3ab54ae1c7dca 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -178,10 +178,11 @@ protected RemoteClusterAware(Settings settings) { * (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to * {@link TransportAddress#META_ADDRESS} and their configured address will be used as the hostname for the generated discovery node. */ - protected static Map>>> buildRemoteClustersDynamicConfig(Settings settings) { - final Map>>> remoteSeeds = + protected static Map>>>> buildRemoteClustersDynamicConfig( + final Settings settings) { + final Map>>>> remoteSeeds = buildRemoteClustersDynamicConfig(settings, REMOTE_CLUSTERS_SEEDS); - final Map>>> searchRemoteSeeds = + final Map>>>> searchRemoteSeeds = buildRemoteClustersDynamicConfig(settings, SEARCH_REMOTE_CLUSTERS_SEEDS); // sort the intersection for predictable output order final NavigableSet intersection = @@ -200,7 +201,7 @@ protected static Map>>> build .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - private static Map>>> buildRemoteClustersDynamicConfig( + private static Map>>>> buildRemoteClustersDynamicConfig( final Settings settings, final Setting.AffixSetting> seedsSetting) { final Stream>> allConcreteSettings = seedsSetting.getAllConcreteSettings(settings); return allConcreteSettings.collect( @@ -209,9 +210,9 @@ private static Map>>> buildRe List addresses = concreteSetting.get(settings); final boolean proxyMode = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).existsOrFallbackExists(settings); - List> nodes = new ArrayList<>(addresses.size()); + List>> nodes = new ArrayList<>(addresses.size()); for (String address : addresses) { - nodes.add(() -> buildSeedNode(clusterName, address, proxyMode)); + nodes.add(Tuple.tuple(address, () -> buildSeedNode(clusterName, address, proxyMode))); } return new Tuple<>(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterName).get(settings), nodes); })); @@ -299,16 +300,24 @@ public void listenForUpdates(ClusterSettings clusterSettings) { (namespace, value) -> {}); } - - protected static InetSocketAddress parseSeedAddress(String remoteHost) { - String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); + static InetSocketAddress parseSeedAddress(String remoteHost) { + final Tuple hostPort = parseHostPort(remoteHost); + final String host = hostPort.v1(); + assert hostPort.v2() != null : remoteHost; + final int port = hostPort.v2(); InetAddress hostAddress; try { hostAddress = InetAddress.getByName(host); } catch (UnknownHostException e) { throw new IllegalArgumentException("unknown host [" + host + "]", e); } - return new InetSocketAddress(hostAddress, parsePort(remoteHost)); + return new InetSocketAddress(hostAddress, port); + } + + public static Tuple parseHostPort(final String remoteHost) { + final String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost)); + final int port = parsePort(remoteHost); + return Tuple.tuple(host, port); } private static int parsePort(String remoteHost) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 39b732059885d..36026f85bbbff 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -100,7 +101,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos private final Predicate nodePredicate; private final ThreadPool threadPool; private volatile String proxyAddress; - private volatile List> seedNodes; + private volatile List>> seedNodes; private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private final TimeValue initialConnectionTimeout; @@ -116,7 +117,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos * @param nodePredicate a predicate to filter eligible remote nodes to connect to * @param proxyAddress the proxy address */ - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, + RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, String proxyAddress) { this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, @@ -124,7 +125,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos } // Public for tests to pass a StubbableConnectionManager - RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, + RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, String proxyAddress, ConnectionManager connectionManager) { this.transportService = transportService; @@ -160,7 +161,10 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery /** * Updates the list of seed nodes for this cluster connection */ - synchronized void updateSeedNodes(String proxyAddress, List> seedNodes, ActionListener connectListener) { + synchronized void updateSeedNodes( + final String proxyAddress, + final List>> seedNodes, + final ActionListener connectListener) { this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes)); this.proxyAddress = proxyAddress; connectHandler.connect(connectListener); @@ -470,7 +474,7 @@ protected void doRun() { maybeConnect(); } }); - collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener); + collectRemoteNodes(seedNodes.stream().map(Tuple::v2).iterator(), transportService, connectionManager, listener); } }); } @@ -719,10 +723,10 @@ public void handleResponse(NodesInfoResponse response) { } } RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias, - seedNodes.stream().map(sup -> sup.get().getAddress()).collect(Collectors.toList()), - new ArrayList<>(httpAddresses), - maxNumRemoteConnections, connectedNodes.size(), - initialConnectionTimeout, skipUnavailable); + seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()), + new ArrayList<>(httpAddresses), + maxNumRemoteConnections, connectedNodes.size(), + initialConnectionTimeout, skipUnavailable); listener.onResponse(remoteConnectionInfo); } @@ -740,10 +744,9 @@ public String executor() { } RemoteConnectionInfo getLocalConnectionInfo() { // for tests - List seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect - (Collectors.toList()); - return new RemoteConnectionInfo(clusterAlias, Collections.emptyList(), - seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(), initialConnectionTimeout, skipUnavailable); + List seedNodeAddresses = seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()); + return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, + Collections.emptyList(), maxNumRemoteConnections, connectedNodes.size(), initialConnectionTimeout, skipUnavailable); } int getNumNodesConnected() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 84826b4d1e0db..908ecaa9db5f5 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -195,7 +195,7 @@ public String getKey(final String key) { * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes * @param connectionListener a listener invoked once every configured cluster has been connected to */ - private synchronized void updateRemoteClusters(Map>>> seeds, + private synchronized void updateRemoteClusters(Map>>>> seeds, ActionListener connectionListener) { if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); @@ -206,8 +206,8 @@ private synchronized void updateRemoteClusters(Map>>> entry : seeds.entrySet()) { - List> seedList = entry.getValue().v2(); + for (Map.Entry>>>> entry : seeds.entrySet()) { + List>> seedList = entry.getValue().v2(); String proxyAddress = entry.getValue().v1(); RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey()); @@ -402,9 +402,10 @@ void updateRemoteCluster( final List addresses, final String proxyAddress, final ActionListener connectionListener) { - final List> nodes = addresses.stream().>map(address -> () -> - buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)) - ).collect(Collectors.toList()); + final List>> nodes = + addresses.stream().>>map(address -> Tuple.tuple(address, () -> + buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) + ).collect(Collectors.toList()); updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); } @@ -415,7 +416,8 @@ void updateRemoteCluster( void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); final PlainActionFuture future = new PlainActionFuture<>(); - Map>>> seeds = RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); + Map>>>> seeds = + RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); updateRemoteClusters(seeds, future); try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 7995f23ad1f49..a8d4328b29a85 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -16,9 +16,11 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -28,15 +30,19 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * This class encapsulates all remote cluster information to be rendered on * {@code _remote/info} requests. */ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable { - final List seedNodes; + final List seedNodes; final List httpAddresses; final int connectionsPerCluster; final TimeValue initialConnectionTimeout; @@ -44,7 +50,7 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable final String clusterAlias; final boolean skipUnavailable; - RemoteConnectionInfo(String clusterAlias, List seedNodes, + RemoteConnectionInfo(String clusterAlias, List seedNodes, List httpAddresses, int connectionsPerCluster, int numNodesConnected, TimeValue initialConnectionTimeout, boolean skipUnavailable) { @@ -58,7 +64,17 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable } public RemoteConnectionInfo(StreamInput input) throws IOException { - seedNodes = input.readList(TransportAddress::new); + if (input.getVersion().onOrAfter(Version.V_6_6_0)) { + seedNodes = Arrays.asList(input.readStringArray()); + } else { + // versions prior to 7.0.0 sent the resolved transport address of the seed nodes + final List transportAddresses = input.readList(TransportAddress::new); + seedNodes = + transportAddresses + .stream() + .map(a -> a.address().getHostString() + ":" + a.address().getPort()) + .collect(Collectors.toList()); + } httpAddresses = input.readList(TransportAddress::new); connectionsPerCluster = input.readVInt(); initialConnectionTimeout = input.readTimeValue(); @@ -76,8 +92,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(clusterAlias); { builder.startArray("seeds"); - for (TransportAddress addr : seedNodes) { - builder.value(addr.toString()); + for (String addr : seedNodes) { + builder.value(addr); } builder.endArray(); builder.startArray("http_addresses"); @@ -97,7 +113,26 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - out.writeList(seedNodes); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeStringArray(seedNodes.toArray(new String[0])); + } else { + // versions prior to 7.0.0 received the resolved transport address of the seed nodes + out.writeList(seedNodes + .stream() + .map( + s -> { + final Tuple hostPort = RemoteClusterAware.parseHostPort(s); + assert hostPort.v2() != null : s; + try { + return new TransportAddress( + InetAddress.getByAddress(hostPort.v1(), TransportAddress.META_ADDRESS.getAddress()), + hostPort.v2()); + } catch (final UnknownHostException e) { + throw new AssertionError(e); + } + }) + .collect(Collectors.toList())); + } out.writeList(httpAddresses); out.writeVInt(connectionsPerCluster); out.writeTimeValue(initialConnectionTimeout); @@ -127,4 +162,5 @@ public int hashCode() { return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout, numNodesConnected, clusterAlias, skipUnavailable); } + } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index da77e2e06abf2..18c58f1f41a95 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -87,6 +88,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -171,9 +173,9 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -213,9 +215,9 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -266,9 +268,9 @@ public void testDiscoverSingleNode() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); @@ -289,7 +291,9 @@ public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(incompatibleTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List> seedNodes = Arrays.asList(() -> incompatibleSeedNode, () -> seedNode); + List>> seedNodes = Arrays.asList( + Tuple.tuple(incompatibleSeedNode.toString(), () -> incompatibleSeedNode), + Tuple.tuple(seedNode.toString(), () -> seedNode)); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -324,9 +328,9 @@ public void testNodeDisconnected() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); assertTrue(connectionManager.nodeConnected(discoverableNode)); assertFalse(connectionManager.nodeConnected(spareNode)); @@ -374,9 +378,9 @@ public void testFilterDiscoveredNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> n.equals(rejectedNode) == false, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); if (rejectedNode.equals(seedNode)) { assertFalse(connectionManager.nodeConnected(seedNode)); assertTrue(connectionManager.nodeConnected(discoverableNode)); @@ -389,11 +393,15 @@ public void testFilterDiscoveredNodes() throws Exception { } } } - private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes) throws Exception { + private void updateSeedNodes( + final RemoteClusterConnection connection, final List>> seedNodes) throws Exception { updateSeedNodes(connection, seedNodes, null); } - private void updateSeedNodes(RemoteClusterConnection connection, List> seedNodes, String proxyAddress) + private void updateSeedNodes( + final RemoteClusterConnection connection, + final List>> seedNodes, + final String proxyAddress) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); @@ -418,9 +426,11 @@ public void testConnectWithIncompatibleTransports() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - expectThrows(Exception.class, () -> updateSeedNodes(connection, Arrays.asList(() -> seedNode))); + expectThrows( + Exception.class, + () -> updateSeedNodes(connection, Arrays.asList(Tuple.tuple(seedNode.toString(), () -> seedNode)))); assertFalse(connectionManager.nodeConnected(seedNode)); assertTrue(connection.assertNoRunningConnections()); } @@ -471,7 +481,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { connection.addConnectedNode(seedNode); for (DiscoveryNode node : knownNodes) { final Transport.Connection transportConnection = connection.getConnection(node); @@ -514,7 +524,7 @@ public void run() { CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -522,7 +532,7 @@ public void run() { exceptionReference.set(x); listenerCalled.countDown(); }); - connection.updateSeedNodes(null, Arrays.asList(() -> seedNode), listener); + connection.updateSeedNodes(null, seedNodes(seedNode), listener); acceptedLatch.await(); connection.close(); // now close it, this should trigger an interrupt on the socket and we can move on assertTrue(connection.assertNoRunningConnections()); @@ -538,6 +548,18 @@ public void run() { } } + private List>> seedNodes(final DiscoveryNode... seedNodes) { + if (seedNodes.length == 0) { + return Collections.emptyList(); + } else if (seedNodes.length == 1) { + return Collections.singletonList(Tuple.tuple(seedNodes[0].toString(), () -> seedNodes[0])); + } else { + return Arrays.stream(seedNodes) + .map(s -> Tuple.tuple(s.toString(), (Supplier)() -> s)) + .collect(Collectors.toList()); + } + } + public void testFetchShards() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -549,11 +571,11 @@ public void testFetchShards() throws Exception { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - List> nodes = Collections.singletonList(() -> seedNode); + final List>> seedNodes = seedNodes(seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { - updateSeedNodes(connection, nodes); + updateSeedNodes(connection, seedNodes); } if (randomBoolean()) { connection.updateSkipUnavailable(randomBoolean()); @@ -589,9 +611,9 @@ public void testFetchShardsThreadContextHeader() throws Exception { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - List> nodes = Collections.singletonList(() -> seedNode); + final List>> seedNodes = seedNodes(seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - nodes, service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { SearchRequest request = new SearchRequest("test-index"); Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) { @@ -645,7 +667,7 @@ public void testFetchShardsSkipUnavailable() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); SearchRequest request = new SearchRequest("test-index"); @@ -749,7 +771,7 @@ public void testTriggerUpdatesConcurrently() throws IOException, InterruptedExce knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(seedTransport1.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode); + List>> seedNodes = seedNodes(seedNode1, seedNode); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -829,7 +851,7 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(seedTransport1.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List> seedNodes = Arrays.asList(() -> seedNode1, () -> seedNode); + List>> seedNodes = seedNodes(seedNode1, seedNode); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -932,7 +954,7 @@ public void testGetConnectionInfo() throws Exception { knownNodes.add(transport3.getLocalDiscoNode()); knownNodes.add(transport2.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List> seedNodes = Arrays.asList(() -> node3, () -> node1, () -> node2); + List>> seedNodes = seedNodes(node3, node1, node2); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -999,50 +1021,50 @@ public void onFailure(Exception e) { public void testRemoteConnectionInfo() throws IOException { RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(30), false); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats); RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 4, TimeValue.timeValueMinutes(30), true); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 4, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster_1", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(30), false); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(30), false); + Arrays.asList("seed:15"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)), - 4, 3, TimeValue.timeValueMinutes(30), true); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)), + 4, 3, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(325), true); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 3, TimeValue.timeValueMinutes(325), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 5, 3, TimeValue.timeValueMinutes(30), false); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 5, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); } @@ -1063,7 +1085,7 @@ private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo inf public void testRemoteConnectionInfoBwComp() throws IOException { final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_5, Version.V_6_0_0); RemoteConnectionInfo expected = new RemoteConnectionInfo("test_cluster", - Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), + Collections.singletonList("0.0.0.0:1"), Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), 4, 4, new TimeValue(30, TimeUnit.MINUTES), false); @@ -1095,20 +1117,20 @@ public void testRemoteConnectionInfoBwComp() throws IOException { public void testRenderConnectionInfoXContent() throws IOException { RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)), - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)), - 4, 3, TimeValue.timeValueMinutes(30), true); + Arrays.asList("seed:1"), + Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 3, TimeValue.timeValueMinutes(30), true); stats = assertSerialization(stats); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); stats.toXContent(builder, null); builder.endObject(); - assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," + + assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," + "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + "\"skip_unavailable\":true}}", Strings.toString(builder)); stats = new RemoteConnectionInfo("some_other_cluster", - Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)), + Arrays.asList("seed:1", "seed:2"), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)), 2, 0, TimeValue.timeValueSeconds(30), false); stats = assertSerialization(stats); @@ -1116,7 +1138,7 @@ public void testRenderConnectionInfoXContent() throws IOException { builder.startObject(); stats.toXContent(builder, null); builder.endObject(); - assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"]," + assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"]," + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + "\"skip_unavailable\":false}}", Strings.toString(builder)); } @@ -1135,7 +1157,7 @@ public void testEnsureConnected() throws IOException, InterruptedException { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); assertFalse(connectionManager.nodeConnected(seedNode)); assertFalse(connectionManager.nodeConnected(discoverableNode)); @@ -1185,9 +1207,9 @@ public void testCollectNodes() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(() -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { if (randomBoolean()) { - updateSeedNodes(connection, Arrays.asList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); } CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference> reference = new AtomicReference<>(); @@ -1219,14 +1241,14 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted List discoverableTransports = new CopyOnWriteArrayList<>(); try { final int numDiscoverableNodes = randomIntBetween(5, 20); - List> discoverableNodes = new ArrayList<>(numDiscoverableNodes); + List>> discoverableNodes = new ArrayList<>(numDiscoverableNodes); for (int i = 0; i < numDiscoverableNodes; i++ ) { MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT); - discoverableNodes.add(transportService::getLocalDiscoNode); + discoverableNodes.add(Tuple.tuple("discoverable_node" + i, transportService::getLocalDiscoNode)); discoverableTransports.add(transportService); } - List> seedNodes = randomSubsetOf(discoverableNodes); + List>> seedNodes = randomSubsetOf(discoverableNodes); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { @@ -1275,7 +1297,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted discoverableTransports.add(transportService); connection.addConnectedNode(transportService.getLocalDiscoNode()); } else { - DiscoveryNode node = randomFrom(discoverableNodes).get(); + DiscoveryNode node = randomFrom(discoverableNodes).v2().get(); connection.onNodeDisconnected(node); } } @@ -1323,14 +1345,16 @@ public void testClusterNameIsChecked() throws Exception { service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList( () -> seedNode), service, Integer.MAX_VALUE, n -> true, null)) { + seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { ConnectionManager connectionManager = connection.getConnectionManager(); - updateSeedNodes(connection, Collections.singletonList(() -> seedNode)); + updateSeedNodes(connection, seedNodes(seedNode)); assertTrue(connectionManager.nodeConnected(seedNode)); assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); - List> discoveryNodes = - Arrays.asList(otherClusterTransport::getLocalDiscoNode, () -> seedNode); + List>> discoveryNodes = + Arrays.asList( + Tuple.tuple("other", otherClusterTransport::getLocalDiscoNode), + Tuple.tuple(seedNode.toString(), () -> seedNode)); Collections.shuffle(discoveryNodes, random()); updateSeedNodes(connection, discoveryNodes); assertTrue(connectionManager.nodeConnected(seedNode)); @@ -1341,7 +1365,7 @@ public void testClusterNameIsChecked() throws Exception { assertTrue(connectionManager.nodeConnected(discoverableNode)); assertTrue(connection.assertNoRunningConnections()); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> - updateSeedNodes(connection, Arrays.asList(() -> otherClusterTransport.getLocalDiscoNode()))); + updateSeedNodes(connection, Arrays.asList(Tuple.tuple("other", otherClusterTransport::getLocalDiscoNode)))); assertThat(illegalStateException.getMessage(), startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + " - {other_cluster_discoverable_node}")); @@ -1393,7 +1417,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(() -> connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { + seedNodes(connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -1430,10 +1454,10 @@ public void testLazyResolveTransportAddress() throws Exception { service.start(); service.acceptIncomingRequests(); CountDownLatch multipleResolveLatch = new CountDownLatch(2); - Supplier seedSupplier = () -> { + Tuple> seedSupplier = Tuple.tuple(seedNode.toString(), () -> { multipleResolveLatch.countDown(); return seedNode; - }; + }); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null)) { updateSeedNodes(connection, Arrays.asList(seedSupplier)); @@ -1463,9 +1487,9 @@ public void testProxyMode() throws Exception { threadPool, null, Collections.emptySet())) { service.start(); service.acceptIncomingRequests(); - Supplier seedSupplier = () -> - RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true); - assertEquals("node_0", seedSupplier.get().getAttributes().get("server_name")); + Tuple> seedSupplier = Tuple.tuple("node_0", () -> + RemoteClusterAware.buildSeedNode("some-remote-cluster", "node_0:" + randomIntBetween(1, 10000), true)); + assertEquals("node_0", seedSupplier.v2().get().getAttributes().get("server_name")); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, proxyAddress)) { updateSeedNodes(connection, Arrays.asList(seedSupplier), proxyAddress); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index d41faee81bdfd..4845b0f23ff6e 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -124,41 +124,42 @@ public void testRemoteClusterSeedSetting() { } public void testBuildRemoteClustersDynamicConfig() throws Exception { - Map>>> map = RemoteClusterService.buildRemoteClustersDynamicConfig( - Settings.builder() - .put("cluster.remote.foo.seeds", "192.168.0.1:8080") - .put("cluster.remote.bar.seeds", "[::1]:9090") - .put("cluster.remote.boom.seeds", "boom-node1.internal:1000") - .put("cluster.remote.boom.proxy", "foo.bar.com:1234") - .put("search.remote.quux.seeds", "quux:9300") - .put("search.remote.quux.proxy", "quux-proxy:19300") - .build()); + Map>>>> map = + RemoteClusterService.buildRemoteClustersDynamicConfig( + Settings.builder() + .put("cluster.remote.foo.seeds", "192.168.0.1:8080") + .put("cluster.remote.bar.seeds", "[::1]:9090") + .put("cluster.remote.boom.seeds", "boom-node1.internal:1000") + .put("cluster.remote.boom.proxy", "foo.bar.com:1234") + .put("search.remote.quux.seeds", "quux:9300") + .put("search.remote.quux.proxy", "quux-proxy:19300") + .build()); assertThat(map.keySet(), containsInAnyOrder(equalTo("foo"), equalTo("bar"), equalTo("boom"), equalTo("quux"))); assertThat(map.get("foo").v2(), hasSize(1)); assertThat(map.get("bar").v2(), hasSize(1)); assertThat(map.get("boom").v2(), hasSize(1)); assertThat(map.get("quux").v2(), hasSize(1)); - DiscoveryNode foo = map.get("foo").v2().get(0).get(); + DiscoveryNode foo = map.get("foo").v2().get(0).v2().get(); assertEquals("", map.get("foo").v1()); assertEquals(foo.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 8080))); assertEquals(foo.getId(), "foo#192.168.0.1:8080"); assertEquals(foo.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode bar = map.get("bar").v2().get(0).get(); + DiscoveryNode bar = map.get("bar").v2().get(0).v2().get(); assertEquals(bar.getAddress(), new TransportAddress(new InetSocketAddress(InetAddress.getByName("[::1]"), 9090))); assertEquals(bar.getId(), "bar#[::1]:9090"); assertEquals("", map.get("bar").v1()); assertEquals(bar.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode boom = map.get("boom").v2().get(0).get(); + DiscoveryNode boom = map.get("boom").v2().get(0).v2().get(); assertEquals(boom.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0)); assertEquals("boom-node1.internal", boom.getHostName()); assertEquals(boom.getId(), "boom#boom-node1.internal:1000"); assertEquals("foo.bar.com:1234", map.get("boom").v1()); assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode quux = map.get("quux").v2().get(0).get(); + DiscoveryNode quux = map.get("quux").v2().get(0).v2().get(); assertEquals(quux.getAddress(), new TransportAddress(TransportAddress.META_ADDRESS, 0)); assertEquals("quux", quux.getHostName()); assertEquals(quux.getId(), "quux#quux:9300");