diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 49d52ec4622b..6197bb24a433 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Addressing IllegalStateException due to double free of Connection reference by the Transport. + ### Other Changes ## 2.8.3 (2023-03-02) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorProvider.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorProvider.java index ed77b733e243..a3717b9f170d 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorProvider.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorProvider.java @@ -4,7 +4,7 @@ package com.azure.core.amqp.implementation; import com.azure.core.amqp.implementation.handler.ReactorHandler; -import com.azure.core.amqp.implementation.handler.TransportHandler; +import com.azure.core.amqp.implementation.handler.GlobalIOHandler; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.reactor.Reactor; @@ -39,7 +39,7 @@ public ReactorDispatcher getReactorDispatcher() { * @throws IOException If the service could not create a Reactor instance. */ public Reactor createReactor(String connectionId, int maxFrameSize) throws IOException { - final TransportHandler transportHandler = new TransportHandler(connectionId); + final GlobalIOHandler globalIOHandler = new GlobalIOHandler(connectionId); final ReactorHandler reactorHandler = new ReactorHandler(connectionId); synchronized (lock) { @@ -56,7 +56,7 @@ public Reactor createReactor(String connectionId, int maxFrameSize) throws IOExc reactorOptions.setEnableSaslByDefault(true); final Reactor reactor = Proton.reactor(reactorOptions, reactorHandler); - reactor.getGlobalHandler().add(transportHandler); + reactor.setGlobalHandler(globalIOHandler); final Pipe ioSignal = Pipe.open(); final ReactorDispatcher dispatcher = new ReactorDispatcher(connectionId, reactor, ioSignal); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/TransportHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/GlobalIOHandler.java similarity index 51% rename from sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/TransportHandler.java rename to sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/GlobalIOHandler.java index 4a13d5f6a781..c4a7fa7c9df1 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/TransportHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/GlobalIOHandler.java @@ -4,25 +4,32 @@ package com.azure.core.amqp.implementation.handler; import com.azure.core.util.logging.ClientLogger; -import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.reactor.impl.IOHandler; import static com.azure.core.amqp.implementation.AmqpLoggingUtils.createContextWithConnectionId; import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY; import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; -/** - * Handles transport related events. - */ -public class TransportHandler extends BaseHandler { +public class GlobalIOHandler extends IOHandler { private final ClientLogger logger; - public TransportHandler(final String connectionId) { - this.logger = new ClientLogger(TransportHandler.class, createContextWithConnectionId(connectionId)); + public GlobalIOHandler(final String connectionId) { + this.logger = new ClientLogger(GlobalIOHandler.class, createContextWithConnectionId(connectionId)); } + /** + * Override the transport_closed event handling behavior of base IOHandler. The base IOHandler does the Transport + * unbind even if the ConnectionHandler already did the unbind on the transport_error event. Each additional + * unbinding reduces the Connection's reference count by one. Ideally, removing Transport must lower + * the overall reference count by only one; else, the undesired reduction can lead to IllegalSateException. + * By overriding the transport_closed event handling, this method performs unbind only if it is not already done. + * Also, not doing unbind at least once will result in a memory leak. + * + * @param event the event description. + */ @Override public void onTransportClosed(Event event) { final Transport transport = event.getTransport(); @@ -32,8 +39,7 @@ public void onTransportClosed(Event event) { .addKeyValue(HOSTNAME_KEY, connection != null ? connection.getHostname() : NOT_APPLICABLE) .log("onTransportClosed"); - // connection.getTransport returns null if already unbound. - // We need to unbind the transport so that we do not leak memory. + // connection.getTransport() returns null if the unbind is already done. if (transport != null && connection != null && connection.getTransport() != null) { transport.unbind(); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/TransportHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/GlobalIOHandlerTest.java similarity index 88% rename from sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/TransportHandlerTest.java rename to sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/GlobalIOHandlerTest.java index 9cb4591b36e1..62a0bbbfd975 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/TransportHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/GlobalIOHandlerTest.java @@ -18,9 +18,9 @@ import static org.mockito.Mockito.when; /** - * Tests transport handler. + * Tests transport unbinding behavior of the GlobalIOHandler. */ -public class TransportHandlerTest { +public class GlobalIOHandlerTest { @Mock private Transport transport; @Mock @@ -55,7 +55,7 @@ public void unbindsTransport() { when(connection.getTransport()).thenReturn(transport); - final TransportHandler handler = new TransportHandler("name"); + final GlobalIOHandler handler = new GlobalIOHandler("name"); // Act handler.onTransportClosed(event); @@ -70,7 +70,7 @@ public void noTransport() { when(event.getTransport()).thenReturn(transport); when(event.getConnection()).thenReturn(connection); - final TransportHandler handler = new TransportHandler("name"); + final GlobalIOHandler handler = new GlobalIOHandler("name"); // Act handler.onTransportClosed(event);