Skip to content

Commit

Permalink
Make TcpTransport#openConnection fully async (#36095)
Browse files Browse the repository at this point in the history
This is a follow-up to #35144. That commit made the underlying
connection opening process in TcpTransport asynchronous. However the
method still blocked on the process being complete before returning.
This commit moves the blocking to the ConnectionManager level. This is
another step towards the top-level TransportService api being async.
  • Loading branch information
Tim-Brooks authored Nov 30, 2018
1 parent 465a65a commit ea7ea51
Show file tree
Hide file tree
Showing 15 changed files with 214 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.lease.Releasable;
Expand Down Expand Up @@ -218,7 +219,18 @@ public void close() {
}

private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
Releasable pendingConnection = transport.openConnection(node, connectionProfile, future);
Transport.Connection connection;
try {
connection = future.actionGet();
} catch (IllegalStateException e) {
// If the future was interrupted we must cancel the pending connection to avoid channels leaking
if (e.getCause() instanceof InterruptedException) {
pendingConnection.close();
}
throw e;
}
try {
connectionListener.onConnectionOpened(connection);
} finally {
Expand Down
30 changes: 10 additions & 20 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
Expand All @@ -46,6 +45,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
Expand Down Expand Up @@ -349,34 +349,24 @@ protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile con
}

@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile);
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
PlainActionFuture<NodeChannels> connectionFuture = PlainActionFuture.newFuture();
List<TcpChannel> pendingChannels = initiateConnection(node, connectionProfile, connectionFuture);

try {
return connectionFuture.actionGet();
} catch (IllegalStateException e) {
// If the future was interrupted we can close the channels to improve the shutdown of the MockTcpTransport
if (e.getCause() instanceof InterruptedException) {
CloseableChannel.closeChannels(pendingChannels, false);
}
throw e;
}
List<TcpChannel> pendingChannels = initiateConnection(node, finalProfile, listener);
return () -> CloseableChannel.closeChannels(pendingChannels, false);
} finally {
closeLock.readLock().unlock();
}
}

private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
ActionListener<NodeChannels> listener) {
ActionListener<Transport.Connection> listener) {
int numConnections = connectionProfile.getNumConnections();
assert numConnections > 0 : "A connection profile must be configured with at least one connection";

Expand Down Expand Up @@ -432,7 +422,7 @@ public List<String> getLocalAddresses() {

protected void bindServer(ProfileSettings profileSettings) {
// Bind and start to accept incoming connections.
InetAddress hostAddresses[];
InetAddress[] hostAddresses;
List<String> profileBindHosts = profileSettings.bindHosts;
try {
hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
Expand Down Expand Up @@ -1581,11 +1571,11 @@ private final class ChannelsConnectedListener implements ActionListener<Void> {
private final DiscoveryNode node;
private final ConnectionProfile connectionProfile;
private final List<TcpChannel> channels;
private final ActionListener<NodeChannels> listener;
private final ActionListener<Transport.Connection> listener;
private final CountDown countDown;

private ChannelsConnectedListener(DiscoveryNode node, ConnectionProfile connectionProfile, List<TcpChannel> channels,
ActionListener<NodeChannels> listener) {
ActionListener<Transport.Connection> listener) {
this.node = node;
this.connectionProfile = connectionProfile;
this.channels = channels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.transport.BoundTransportAddress;
Expand Down Expand Up @@ -86,10 +87,12 @@ default CircuitBreaker getInFlightRequestBreaker() {
}

/**
* Opens a new connection to the given node and returns it. The returned connection is not managed by
* the transport implementation. This connection must be closed once it's not needed anymore.
* Opens a new connection to the given node. When the connection is fully connected, the listener is
* called. A {@link Releasable} is returned representing the pending connection. If the caller of this
* method decides to move on before the listener is called with the completed connection, they should
* release the pending connection to prevent hanging connections.
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile);
Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener);

TransportStats getStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.transport;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
Expand All @@ -30,6 +31,7 @@
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -79,8 +81,8 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
protected abstract ClusterState getMockClusterState(DiscoveryNode node);

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new CloseableConnection() {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> connectionListener) {
connectionListener.onResponse(new CloseableConnection() {

@Override
public DiscoveryNode getNode() {
Expand Down Expand Up @@ -134,7 +136,9 @@ public void sendRequest(long requestId, String action, TransportRequest request,
}
}
}
};
});

return () -> {};
}

protected abstract Response newResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -167,7 +168,9 @@ public <T extends TransportResponse> void sendRequest(Transport.Connection conne
transportService.addNodeConnectedBehavior((connectionManager, discoveryNode) -> false);
transportService.addGetConnectionBehavior((connectionManager, discoveryNode) -> {
// The FailAndRetryTransport does not use the connection profile
return transport.openConnection(discoveryNode, null);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
transport.openConnection(discoveryNode, null, future);
return future.actionGet();
});
transportService.start();
transportService.acceptIncomingRequests();
Expand Down Expand Up @@ -358,11 +361,19 @@ public void testSniffNodesSamplerClosesConnections() throws Exception {
try (MockTransportService clientService = createNewService(clientSettings, Version.CURRENT, threadPool, null)) {
final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();

clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile) -> {
Transport.Connection connection = transport.openConnection(discoveryNode, profile);
establishedConnections.add(connection);
return connection;
});
clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) ->
transport.openConnection(discoveryNode, profile, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
establishedConnections.add(connection);
listener.onResponse(connection);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}));


clientService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
Expand Down Expand Up @@ -187,8 +188,6 @@ public HandshakeResponse handshake(Transport.Connection connection, long timeout
private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private volatile boolean randomConnectionExceptions = false;
private TransportMessageListener listener = new TransportMessageListener() {
};

@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
Expand All @@ -201,7 +200,6 @@ public RequestHandlerRegistry getRequestHandler(String action) {

@Override
public void addMessageListener(TransportMessageListener listener) {
this.listener = listener;
}

@Override
Expand All @@ -225,13 +223,14 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
}

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (connectionProfile == null) {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
if (profile == null) {
if (randomConnectionExceptions && randomBoolean()) {
throw new ConnectTransportException(node, "simulated");
listener.onFailure(new ConnectTransportException(node, "simulated"));
return () -> {};
}
}
Connection connection = new Connection() {
listener.onResponse(new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
Expand All @@ -257,8 +256,8 @@ public void close() {
public boolean isClosed() {
return false;
}
};
return connection;
});
return () -> {};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -35,8 +36,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ConnectionManagerTests extends ESTestCase {

Expand Down Expand Up @@ -82,7 +85,11 @@ public void onNodeDisconnected(DiscoveryNode node) {

DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(connection);
return null;
}).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class));

assertFalse(connectionManager.nodeConnected(node));

Expand Down Expand Up @@ -126,7 +133,11 @@ public void onNodeDisconnected(DiscoveryNode node) {

DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(connection);
return null;
}).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class));

assertFalse(connectionManager.nodeConnected(node));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map<String, Map

StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version
.CURRENT, threadPool));
stubbableTransport.setDefaultConnectBehavior((t, node, profile) -> {
stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> {
Map<String, DiscoveryNode> proxyMapping = nodeMap.get(node.getAddress().toString());
if (proxyMapping == null) {
throw new IllegalStateException("no proxy mapping for node: " + node);
Expand All @@ -1455,34 +1455,44 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map<String, Map
// route by seed hostname
proxyNode = proxyMapping.get(node.getHostName());
}
Transport.Connection connection = t.openConnection(proxyNode, profile);
return new Transport.Connection() {
return t.openConnection(proxyNode, profile, new ActionListener<Transport.Connection>() {
@Override
public DiscoveryNode getNode() {
return node;
}
public void onResponse(Transport.Connection connection) {
Transport.Connection proxyConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}

@Override
public boolean isClosed() {
return connection.isClosed();
@Override
public boolean isClosed() {
return connection.isClosed();
}

@Override
public void close() {
connection.close();
}
};
listener.onResponse(proxyConnection);
}

@Override
public void close() {
connection.close();
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
});
});
return stubbableTransport;
}
Expand Down
Loading

0 comments on commit ea7ea51

Please sign in to comment.