Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addressing IllegalStateException due to double free of Connection reference by the Transport #34122

Merged
merged 2 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.implementation.handler.ReactorHandler;
import com.azure.core.amqp.implementation.handler.TransportHandler;
import com.azure.core.amqp.implementation.handler.GlobalIOHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.reactor.Reactor;
Expand Down Expand Up @@ -39,7 +39,7 @@ public ReactorDispatcher getReactorDispatcher() {
* @throws IOException If the service could not create a Reactor instance.
*/
public Reactor createReactor(String connectionId, int maxFrameSize) throws IOException {
final TransportHandler transportHandler = new TransportHandler(connectionId);
final GlobalIOHandler globalIOHandler = new GlobalIOHandler(connectionId);
final ReactorHandler reactorHandler = new ReactorHandler(connectionId);

synchronized (lock) {
Expand All @@ -56,7 +56,7 @@ public Reactor createReactor(String connectionId, int maxFrameSize) throws IOExc
reactorOptions.setEnableSaslByDefault(true);

final Reactor reactor = Proton.reactor(reactorOptions, reactorHandler);
reactor.getGlobalHandler().add(transportHandler);
reactor.setGlobalHandler(globalIOHandler);

final Pipe ioSignal = Pipe.open();
final ReactorDispatcher dispatcher = new ReactorDispatcher(connectionId, reactor, ioSignal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,32 @@
package com.azure.core.amqp.implementation.handler;

import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.impl.IOHandler;

import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId;
import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY;
import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;

/**
* Handles transport related events.
*/
public class TransportHandler extends BaseHandler {
public class GlobalIOHandler extends IOHandler {
private final ClientLogger logger;

public TransportHandler(final String connectionId) {
this.logger = new ClientLogger(TransportHandler.class, createContextWithConnectionId(connectionId));
public GlobalIOHandler(final String connectionId) {
this.logger = new ClientLogger(GlobalIOHandler.class, createContextWithConnectionId(connectionId));
}

/**
* Override the transport_closed event handling behavior of base IOHandler. The base IOHandler does the Transport
* unbind even if the ConnectionHandler already did the unbind on the transport_error event. Each additional
* unbinding reduces the Connection's reference count by one. Ideally, removing Transport must lower
* the overall reference count by only one; else, the undesired reduction can lead to IllegalSateException.
* By overriding the transport_closed event handling, this method performs unbind only if it is not already done.
* Also, not doing unbind at least once will result in a memory leak.
*
* @param event the event description.
*/
@Override
public void onTransportClosed(Event event) {
final Transport transport = event.getTransport();
Expand All @@ -32,8 +39,7 @@ public void onTransportClosed(Event event) {
.addKeyValue(HOSTNAME_KEY, connection != null ? connection.getHostname() : NOT_APPLICABLE)
.log("onTransportClosed");

// connection.getTransport returns null if already unbound.
// We need to unbind the transport so that we do not leak memory.
// connection.getTransport() returns null if the unbind is already done.
if (transport != null && connection != null && connection.getTransport() != null) {
transport.unbind();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import static org.mockito.Mockito.when;

/**
* Tests transport handler.
* Tests transport unbinding behavior of the GlobalIOHandler.
*/
public class TransportHandlerTest {
public class GlobalIOHandlerTest {
@Mock
private Transport transport;
@Mock
Expand Down Expand Up @@ -55,7 +55,7 @@ public void unbindsTransport() {

when(connection.getTransport()).thenReturn(transport);

final TransportHandler handler = new TransportHandler("name");
final GlobalIOHandler handler = new GlobalIOHandler("name");

// Act
handler.onTransportClosed(event);
Expand All @@ -70,7 +70,7 @@ public void noTransport() {
when(event.getTransport()).thenReturn(transport);
when(event.getConnection()).thenReturn(connection);

final TransportHandler handler = new TransportHandler("name");
final GlobalIOHandler handler = new GlobalIOHandler("name");

// Act
handler.onTransportClosed(event);
Expand Down