Skip to content

Commit

Permalink
Fixing a memory leak in qpid layer. (Azure#193)
Browse files Browse the repository at this point in the history
* Fixing a memory leak in qpid layer. When service force closes an idle connetion, it is not properly freed in qpid layer.
Includes some minor changes to make code simpler.

* Fixing a code review comment.
  • Loading branch information
yvgopal authored Mar 3, 2018
1 parent 68d114b commit e7fcc0b
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
abstract class InitializableEntity extends ClientEntity {

//TODO Init and close semantics are primitive now. Fix them with support for other states like Initializing, Closing, and concurrency.
protected InitializableEntity(String clientId, ClientEntity parent) {
super(clientId, parent);
protected InitializableEntity(String clientId) {
super(clientId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe
private int prefetchCount;

public MessageAndSessionPump(MessagingFactory factory, String entityPath, ReceiveMode receiveMode) {
super(StringUtil.getShortRandomString(), null);
super(StringUtil.getShortRandomString());
this.factory = factory;
this.entityPath = entityPath;
this.receiveMode = receiveMode;
Expand Down Expand Up @@ -92,7 +92,7 @@ public void registerSessionHandler(ISessionHandler handler, SessionHandlerOption
this.sessionHandlerOptions = handlerOptions;

for (int i = 0; i < handlerOptions.getMaxConcurrentSessions(); i++) {
this.acceptSessionsAndPumpMessage();
this.acceptSessionAndPumpMessages();
}
}

Expand Down Expand Up @@ -142,6 +142,12 @@ private void receiveAndPumpMessage() {
onMessageFuture = new CompletableFuture<Void>();
onMessageFuture.completeExceptionally(onMessageSyncEx);
}

// Some clients are returning null from the call
if(onMessageFuture == null)
{
onMessageFuture = CompletableFuture.completedFuture(null);
}

onMessageFuture.handleAsync((v, onMessageEx) -> {
if (onMessageEx != null) {
Expand Down Expand Up @@ -203,7 +209,7 @@ private void receiveAndPumpMessage() {
}
}

private void acceptSessionsAndPumpMessage() {
private void acceptSessionAndPumpMessages() {
if (!this.getIsClosingOrClosed()) {
TRACE_LOGGER.debug("Accepting a session from entity '{}'", this.entityPath);
CompletableFuture<IMessageSession> acceptSessionFuture = ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, null, this.receiveMode);
Expand All @@ -222,7 +228,7 @@ private void acceptSessionsAndPumpMessage() {
// In case of any other exception, sleep and retry
TRACE_LOGGER.debug("AcceptSession from entity '{}' will be retried after '{}'.", this.entityPath, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
Timer.schedule(() -> {
MessageAndSessionPump.this.acceptSessionsAndPumpMessage();
MessageAndSessionPump.this.acceptSessionAndPumpMessages();
}, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION, TimerType.OneTimeRun);
}
} else {
Expand Down Expand Up @@ -293,6 +299,12 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
onMessageFuture.completeExceptionally(onMessageSyncEx);
}

// Some clients are returning null from the call
if(onMessageFuture == null)
{
onMessageFuture = CompletableFuture.completedFuture(null);
}

onMessageFuture.handleAsync((v, onMessageEx) -> {
renewCancelTimer.cancel(true);
if (onMessageEx != null) {
Expand Down Expand Up @@ -419,6 +431,12 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
onCloseFuture = new CompletableFuture<Void>();
onCloseFuture.completeExceptionally(onCloseSyncEx);
}

// Some clients are returning null from the call
if(onCloseFuture == null)
{
onCloseFuture = CompletableFuture.completedFuture(null);
}

onCloseFuture.handleAsync((v, onCloseEx) -> {
renewCancelTimer.cancel(true);
Expand All @@ -441,7 +459,7 @@ synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
}

this.messageAndSessionPump.openSessions.remove(this.session.getSessionId());
this.messageAndSessionPump.acceptSessionsAndPumpMessage();
this.messageAndSessionPump.acceptSessionAndPumpMessages();
return null;
});
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class MessageReceiver extends InitializableEntity implements IMessageReceiver, I
private final ConcurrentHashMap<UUID, Instant> requestResponseLockTokensToLockTimesMap;

private MessageReceiver(ReceiveMode receiveMode) {
super(StringUtil.getShortRandomString(), null);
super(StringUtil.getShortRandomString());
this.receiveMode = receiveMode;
this.requestResponseLockTokensToLockTimesMap = new ConcurrentHashMap<>();
if (receiveMode == ReceiveMode.PEEKLOCK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class MessageSender extends InitializableEntity implements IMessageSender
private ClientSettings clientSettings;

private MessageSender() {
super(StringUtil.getShortRandomString(), null);
super(StringUtil.getShortRandomString());
}

MessageSender(URI namespaceEndpointURI, String entityPath, ClientSettings clientSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class QueueClient extends InitializableEntity implements IQueueClie
private MiscRequestResponseOperationHandler miscRequestResponseHandler;

private QueueClient(ReceiveMode receiveMode, String queuePath) {
super(StringUtil.getShortRandomString(), null);
super(StringUtil.getShortRandomString());
this.receiveMode = receiveMode;
this.queuePath = queuePath;
this.senderCreationLock = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class SubscriptionClient extends InitializableEntity implements ISu

private SubscriptionClient(ReceiveMode receiveMode, String subscriptionPath)
{
super(StringUtil.getShortRandomString(), null);
super(StringUtil.getShortRandomString());
this.receiveMode = receiveMode;
this.subscriptionPath = subscriptionPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public final class TopicClient extends InitializableEntity implements ITopicClie
private MessageBrowser browser;

private TopicClient() {
super(StringUtil.getShortRandomString(), null);
super(StringUtil.getShortRandomString());
}

public TopicClient(ConnectionStringBuilder amqpConnectionStringBuilder) throws InterruptedException, ServiceBusException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ public void onConnectionBound(Event event)
sasl.setMechanisms("ANONYMOUS");
}

@Override
public void onConnectionUnbound(Event event)
{
TRACE_LOGGER.debug("Connection.onConnectionUnbound: hostname:{}", event.getConnection().getHostname());
}

@Override
public void onTransportError(Event event)
{
Expand All @@ -89,9 +83,10 @@ public void onTransportError(Event event)

this.messagingFactory.onConnectionError(condition);
Connection connection = event.getConnection();
if (connection != null) {
connection.free();
}
if(connection != null)
{
connection.free();
}
}

@Override
Expand All @@ -108,38 +103,33 @@ public void onConnectionRemoteClose(Event event)
final ErrorCondition error = connection.getRemoteCondition();

TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null);

if (connection.getRemoteState() != EndpointState.CLOSED)
boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED;
this.messagingFactory.onConnectionError(error);
if(shouldFreeConnection)
{
connection.close();
connection.free();
}

this.messagingFactory.onConnectionError(error);
this.freeOnCloseResponse(connection);
}

@Override
public void onConnectionFinal(Event event) {
final Transport transport = event.getTransport();
if (transport != null) {
transport.unbind();
transport.free();
}
TRACE_LOGGER.debug("onConnectionFinal: hostname:{}", event.getConnection().getHostname());
}

@Override
public void onConnectionLocalClose(Event event) {
Connection connection = event.getConnection();
TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname());
this.freeOnCloseResponse(connection);
}

private void freeOnCloseResponse(Connection connection) {
if (connection != null &&
connection.getLocalState() == EndpointState.CLOSED &&
(connection.getRemoteState() == EndpointState.CLOSED)) {
connection.free();
}
if(connection.getRemoteState() == EndpointState.CLOSED)
{
// Service closed it first. In some such cases transport is not unbound and causing a leak.
if(connection.getTransport() != null)
{
connection.getTransport().unbind();
}

connection.free();
}
}

private static SslDomain makeDomain(SslDomain.Mode mode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,13 @@ public void onConnectionLocalOpen(Event event)
transport.setEmitFlowEventOnSend(false);
transport.bind(connection);
}

@Override
public void onTransportClosed(Event event)
{
if(event.getTransport() != null)
{
event.getTransport().unbind();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@ public abstract class ClientEntity
{
private final String clientId;
private final Object syncClose;
private final ClientEntity parent;

private boolean isClosing;
private boolean isClosed;

protected ClientEntity(final String clientId, final ClientEntity parent)
protected ClientEntity(final String clientId)
{
this.clientId = clientId;
this.parent = parent;

this.syncClose = new Object();
}

Expand All @@ -39,21 +36,19 @@ public String getClientId()
}

protected boolean getIsClosed()
{
final boolean isParentClosed = this.parent != null && this.parent.getIsClosed();
{
synchronized (this.syncClose)
{
return isParentClosed || this.isClosed;
return this.isClosed;
}
}

// returns true even if the Parent is (being) Closed

protected boolean getIsClosingOrClosed()
{
final boolean isParentClosingOrClosed = this.parent != null && this.parent.getIsClosingOrClosed();

synchronized (this.syncClose)
{
return isParentClosingOrClosed || this.isClosing || this.isClosed;
return this.isClosing || this.isClosed;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private CoreMessageReceiver(final MessagingFactory factory,
final int prefetchCount,
final SettleModePair settleModePair)
{
super(name, factory);
super(name);

this.underlyingFactory = factory;
this.operationTimeout = factory.getOperationTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private void closeRequestResponseLink()

private CoreMessageSender(final MessagingFactory factory, final String sendLinkName, final String senderPath)
{
super(sendLinkName, factory);
super(sendLinkName);

this.sendPath = senderPath;
this.sasTokenAudienceURI = String.format(ClientConstants.SAS_TOKEN_AUDIENCE_FORMAT, factory.getHostName(), senderPath);
Expand Down Expand Up @@ -595,7 +595,7 @@ private void createSendLink()

SendLinkHandler handler = new SendLinkHandler(CoreMessageSender.this);
BaseHandler.setHandler(sender, handler);
sender.open();
sender.open();
this.sendLink = sender;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection

private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings)
{
super("MessagingFactory".concat(StringUtil.getShortRandomString()), null);
super("MessagingFactory".concat(StringUtil.getShortRandomString()));
this.namespaceEndpointUri = namespaceEndpointUri;
this.clientSettings = clientSettings;

Expand Down Expand Up @@ -303,7 +303,7 @@ public void onConnectionError(ErrorCondition error)
TRACE_LOGGER.info("Connection to host closed.");
this.connetionCloseFuture.complete(null);
Timer.unregister(this.getClientId());
}
}
}

private void onReactorError(Exception cause)
Expand Down Expand Up @@ -341,28 +341,25 @@ private void onReactorError(Exception cause)
// One of the parameters must be null
private void closeConnection(ErrorCondition error, Exception cause)
{
TRACE_LOGGER.info("Closing connection to host");
// Important to copy the reference of the connection as a call to getConnection might create a new connection while we are still in this method
Connection currentConnection = this.connection;
if(currentConnection != null)
{
Link[] links = this.registeredLinks.toArray(new Link[0]);
this.registeredLinks.clear();

TRACE_LOGGER.debug("Closing all links on the connection. Number of links '{}'", links.length);
for(Link link : links)
{
if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED)
{
link.close();
}
link.close();
}

TRACE_LOGGER.debug("Closed all links on the connection. Number of links '{}'", links.length);

if (currentConnection.getLocalState() != EndpointState.CLOSED && currentConnection.getRemoteState() != EndpointState.CLOSED)
if (currentConnection.getLocalState() != EndpointState.CLOSED)
{
TRACE_LOGGER.info("Closing connection to host");
currentConnection.close();
currentConnection.free();
}

for(Link link : links)
Expand Down Expand Up @@ -399,7 +396,7 @@ protected CompletableFuture<Void> onClose()
{
TRACE_LOGGER.info("Closing CBS link");
cbsLinkCloseFuture = this.cbsLink.closeAsync();
}
}

cbsLinkCloseFuture.thenRun(() -> this.managementLinksCache.freeAsync()).thenRun(() -> {
if(this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public final class MiscRequestResponseOperationHandler extends ClientEntity

private MiscRequestResponseOperationHandler(MessagingFactory factory, String linkName, String entityPath)
{
super(linkName, factory);
super(linkName);

this.underlyingFactory = factory;
this.entityPath = entityPath;
Expand Down
Loading

0 comments on commit e7fcc0b

Please sign in to comment.