From f2d064a1d560737e3626eea016fad10d104c7b20 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Tue, 2 Aug 2022 18:50:10 +0530 Subject: [PATCH 1/3] KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest --- .../connect/runtime/WorkerConnectorTest.java | 555 ++++++------------ 1 file changed, 188 insertions(+), 367 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index f7c18d74a20af..cadb679954ab4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -26,28 +26,32 @@ import org.apache.kafka.connect.source.SourceConnectorContext; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; -import org.easymock.Capture; import org.apache.kafka.connect.util.Callback; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.EasyMockSupport; -import org.easymock.Mock; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import java.util.HashMap; import java.util.Map; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - -@RunWith(EasyMockRunner.class) -public class WorkerConnectorTest extends EasyMockSupport { +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class WorkerConnectorTest { private static final String VERSION = "1.1"; public static final String CONNECTOR = "connector"; @@ -60,15 +64,15 @@ public class WorkerConnectorTest extends EasyMockSupport { public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; - @Mock Plugins plugins; - @Mock SourceConnector sourceConnector; - @Mock SinkConnector sinkConnector; - @Mock Connector connector; - @Mock CloseableConnectorContext ctx; - @Mock ConnectorStatus.Listener listener; - @Mock CloseableOffsetStorageReader offsetStorageReader; - @Mock ConnectorOffsetBackingStore offsetStore; - @Mock ClassLoader classLoader; + private final Plugins plugins = mock(Plugins.class); + private final SourceConnector sourceConnector = mock(SourceConnector.class); + private final SinkConnector sinkConnector = mock(SinkConnector.class); + private final CloseableConnectorContext ctx = mock(CloseableConnectorContext.class); + private final ConnectorStatus.Listener listener = mock(ConnectorStatus.Listener.class); + private final CloseableOffsetStorageReader offsetStorageReader = mock(CloseableOffsetStorageReader.class); + private final ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); + private final ClassLoader classLoader = mock(ClassLoader.class); + private Connector connector; @Before public void setup() { @@ -86,31 +90,8 @@ public void testInitializeFailure() { RuntimeException exception = new RuntimeException(); connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).initialize(any()); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -120,7 +101,14 @@ public void testInitializeFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + verify(listener).onFailure(CONNECTOR, exception); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); } @Test @@ -128,35 +116,11 @@ public void testFailureIsFinalState() { RuntimeException exception = new RuntimeException(); connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - // expect no call to onStartup() after failure - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull()); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).initialize(any()); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -167,48 +131,27 @@ public void testFailureIsFinalState() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(connector).initialize(any(SinkConnectorContext.class)); + verify(listener).onFailure(CONNECTOR, exception); + // expect no call to onStartup() after failure + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + verify(onStateChange).onCompletion(any(Exception.class), isNull()); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupAndShutdown() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -219,54 +162,33 @@ public void testStartupAndShutdown() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(connector).stop(); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupAndPause() { connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); + workerConnector.doTransitionTo(TargetState.STARTED, onStateChange); assertRunningMetric(workerConnector); workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange); @@ -275,52 +197,31 @@ public void testStartupAndPause() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(connector).initialize(any(SinkConnectorContext.class)); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(connector).stop(); + verify(listener).onPause(CONNECTOR); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testOnResume() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - offsetStore.start(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onResume(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - listener.onShutdown(CONNECTOR); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - replayAll(); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -334,41 +235,31 @@ public void testOnResume() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(connector).initialize(any(SourceConnectorContext.class)); + verify(offsetStore).start(); + verify(listener).onPause(CONNECTOR); + verify(connector).start(CONFIG); + verify(listener).onResume(CONNECTOR); + verify(connector).stop(); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupPaused() { connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - // connector never gets started - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -379,44 +270,29 @@ public void testStartupPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(connector).initialize(any(SinkConnectorContext.class)); + // connector never gets started + verify(listener).onPause(CONNECTOR); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupFailure() { RuntimeException exception = new RuntimeException(); - connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull()); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).start(CONFIG); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -427,7 +303,17 @@ public void testStartupFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(connector).initialize(any(SinkConnectorContext.class)); + verify(connector).start(CONFIG); + verify(listener).onFailure(CONNECTOR, exception); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + verify(onStateChange).onCompletion(any(Exception.class), isNull()); + verifyNoMoreInteractions(onStateChange); } @Test @@ -435,42 +321,12 @@ public void testShutdownFailure() { RuntimeException exception = new RuntimeException(); connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - connector.stop(); - expectLastCall().andThrow(exception); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - replayAll(); + doThrow(exception).when(connector).stop(); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -481,48 +337,28 @@ public void testShutdownFailure() { workerConnector.doShutdown(); assertFailedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(connector).stop(); + verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); + verify(listener).onFailure(CONNECTOR, exception); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); } @Test public void testTransitionStartedToStarted() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - connector.start(CONFIG); - expectLastCall(); - - // expect only one call to onStartup() - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall().times(2); - - replayAll(); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -536,53 +372,28 @@ public void testTransitionStartedToStarted() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + verify(connector).start(CONFIG); + // expect only one call to onStartup() + verify(listener).onStartup(CONNECTOR); + verify(connector).stop(); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testTransitionPausedToPaused() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall().times(2); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + @SuppressWarnings("unchecked") + Callback onStateChange = mock(Callback.class); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -597,28 +408,38 @@ public void testTransitionPausedToPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verify(connector).version(); + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(connector).stop(); + verify(listener).onPause(CONNECTOR); + verify(listener).onShutdown(CONNECTOR); + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testFailConnectorThatIsNeitherSourceNorSink() { - connector.version(); - expectLastCall().andReturn(VERSION); - - Capture exceptionCapture = Capture.newInstance(); - listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture)); - expectLastCall(); - - replayAll(); - + connector = mock(Connector.class); + when(connector.version()).thenReturn(VERSION); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); + + verify(connector).version(); + ArgumentCaptor exceptionCapture = ArgumentCaptor.forClass(Throwable.class); + verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture()); Throwable e = exceptionCapture.getValue(); assertTrue(e instanceof ConnectException); assertTrue(e.getMessage().contains("must be a subclass of")); - - verifyAll(); } protected void assertFailedMetric(WorkerConnector workerConnector) { From f72ee69ec40ed4e9362fd3012fbf8d663a07395d Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Wed, 3 Aug 2022 12:46:02 +0530 Subject: [PATCH 2/3] Use Mock annotations; use MockitoJUnitRunner.StrictStubs; group some verify calls into separate methods; reduce number of SuppressWarning annotations --- .../connect/runtime/WorkerConnectorTest.java | 144 ++++++++---------- 1 file changed, 62 insertions(+), 82 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index cadb679954ab4..1049721c20d54 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -33,8 +33,11 @@ import java.util.HashMap; import java.util.Map; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -51,6 +54,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class WorkerConnectorTest { private static final String VERSION = "1.1"; @@ -64,14 +68,14 @@ public class WorkerConnectorTest { public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; - private final Plugins plugins = mock(Plugins.class); - private final SourceConnector sourceConnector = mock(SourceConnector.class); - private final SinkConnector sinkConnector = mock(SinkConnector.class); - private final CloseableConnectorContext ctx = mock(CloseableConnectorContext.class); - private final ConnectorStatus.Listener listener = mock(ConnectorStatus.Listener.class); - private final CloseableOffsetStorageReader offsetStorageReader = mock(CloseableOffsetStorageReader.class); - private final ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); - private final ClassLoader classLoader = mock(ClassLoader.class); + @Mock private Plugins plugins; + @Mock private SourceConnector sourceConnector; + @Mock private SinkConnector sinkConnector; + @Mock private CloseableConnectorContext ctx; + @Mock private ConnectorStatus.Listener listener; + @Mock private CloseableOffsetStorageReader offsetStorageReader; + @Mock private ConnectorOffsetBackingStore offsetStore; + @Mock private ClassLoader classLoader; private Connector connector; @Before @@ -101,14 +105,10 @@ public void testInitializeFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(offsetStore).start(); - verify(connector).initialize(any(SourceConnectorContext.class)); + verifyCleanInitialize(); verify(listener).onFailure(CONNECTOR, exception); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); } @Test @@ -119,8 +119,7 @@ public void testFailureIsFinalState() { when(connector.version()).thenReturn(VERSION); doThrow(exception).when(connector).initialize(any()); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -131,14 +130,11 @@ public void testFailureIsFinalState() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(connector).initialize(any(SinkConnectorContext.class)); + verifyCleanInitialize(); verify(listener).onFailure(CONNECTOR, exception); // expect no call to onStartup() after failure verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); verify(onStateChange).onCompletion(any(Exception.class), isNull()); verifyNoMoreInteractions(onStateChange); @@ -150,8 +146,7 @@ public void testStartupAndShutdown() { when(connector.version()).thenReturn(VERSION); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -162,16 +157,12 @@ public void testStartupAndShutdown() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(offsetStore).start(); - verify(connector).initialize(any(SourceConnectorContext.class)); + verifyCleanInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); verify(connector).stop(); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); verifyNoMoreInteractions(onStateChange); @@ -182,8 +173,7 @@ public void testStartupAndPause() { connector = sinkConnector; when(connector.version()).thenReturn(VERSION); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -197,16 +187,13 @@ public void testStartupAndPause() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(connector).initialize(any(SinkConnectorContext.class)); + verifyCleanInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); verify(connector).stop(); verify(listener).onPause(CONNECTOR); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); InOrder inOrder = inOrder(onStateChange); inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); @@ -220,8 +207,7 @@ public void testOnResume() { when(connector.version()).thenReturn(VERSION); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -235,17 +221,13 @@ public void testOnResume() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(connector).initialize(any(SourceConnectorContext.class)); - verify(offsetStore).start(); + verifyCleanInitialize(); verify(listener).onPause(CONNECTOR); verify(connector).start(CONFIG); verify(listener).onResume(CONNECTOR); verify(connector).stop(); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); InOrder inOrder = inOrder(onStateChange); inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); @@ -258,8 +240,7 @@ public void testStartupPaused() { connector = sinkConnector; when(connector.version()).thenReturn(VERSION); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -270,14 +251,11 @@ public void testStartupPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(connector).initialize(any(SinkConnectorContext.class)); + verifyCleanInitialize(); // connector never gets started verify(listener).onPause(CONNECTOR); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); verifyNoMoreInteractions(onStateChange); @@ -291,8 +269,7 @@ public void testStartupFailure() { when(connector.version()).thenReturn(VERSION); doThrow(exception).when(connector).start(CONFIG); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -303,14 +280,11 @@ public void testStartupFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(connector).initialize(any(SinkConnectorContext.class)); + verifyCleanInitialize(); verify(connector).start(CONFIG); verify(listener).onFailure(CONNECTOR, exception); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); verify(onStateChange).onCompletion(any(Exception.class), isNull()); verifyNoMoreInteractions(onStateChange); @@ -325,8 +299,7 @@ public void testShutdownFailure() { doThrow(exception).when(connector).stop(); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -337,18 +310,14 @@ public void testShutdownFailure() { workerConnector.doShutdown(); assertFailedMetric(workerConnector); - verify(connector).version(); - verify(offsetStore).start(); - verify(connector).initialize(any(SourceConnectorContext.class)); + verifyCleanInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); verify(connector).stop(); verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); verifyNoMoreInteractions(onStateChange); verify(listener).onFailure(CONNECTOR, exception); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); } @Test @@ -357,8 +326,7 @@ public void testTransitionStartedToStarted() { when(connector.version()).thenReturn(VERSION); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -372,17 +340,13 @@ public void testTransitionStartedToStarted() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(offsetStore).start(); - verify(connector).initialize(any(SourceConnectorContext.class)); + verifyCleanInitialize(); verify(connector).start(CONFIG); // expect only one call to onStartup() verify(listener).onStartup(CONNECTOR); verify(connector).stop(); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED)); verifyNoMoreInteractions(onStateChange); } @@ -392,8 +356,7 @@ public void testTransitionPausedToPaused() { connector = sourceConnector; when(connector.version()).thenReturn(VERSION); - @SuppressWarnings("unchecked") - Callback onStateChange = mock(Callback.class); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -408,17 +371,13 @@ public void testTransitionPausedToPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verify(connector).version(); - verify(offsetStore).start(); - verify(connector).initialize(any(SourceConnectorContext.class)); + verifyCleanInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); verify(connector).stop(); verify(listener).onPause(CONNECTOR); verify(listener).onShutdown(CONNECTOR); - verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + verifyCleanShutdown(); InOrder inOrder = inOrder(onStateChange); inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); @@ -493,6 +452,27 @@ protected void assertInitializedMetric(WorkerConnector workerConnector, String e assertEquals(VERSION, version); } + @SuppressWarnings("unchecked") + private Callback mockCallback() { + return mock(Callback.class); + } + + private void verifyCleanInitialize() { + verify(connector).version(); + if (connector instanceof SourceConnector) { + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + } else { + verify(connector).initialize(any(SinkConnectorContext.class)); + } + } + + private void verifyCleanShutdown() { + verify(ctx).close(); + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + } + private static abstract class TestConnector extends Connector { } } From d062da6cb5bf7f7e4fe2aef8727140f1f7bd8af7 Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Tue, 9 Aug 2022 15:45:18 +0530 Subject: [PATCH 3/3] Address review comments --- .../connect/runtime/WorkerConnectorTest.java | 83 +++++++++---------- 1 file changed, 40 insertions(+), 43 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index 1049721c20d54..e716efc091df8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -105,10 +105,9 @@ public void testInitializeFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(listener).onFailure(CONNECTOR, exception); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(false); } @Test @@ -120,7 +119,7 @@ public void testFailureIsFinalState() { doThrow(exception).when(connector).initialize(any()); Callback onStateChange = mockCallback(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertFailedMetric(workerConnector); @@ -130,11 +129,10 @@ public void testFailureIsFinalState() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(listener).onFailure(CONNECTOR, exception); // expect no call to onStartup() after failure - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(false); verify(onStateChange).onCompletion(any(Exception.class), isNull()); verifyNoMoreInteractions(onStateChange); @@ -157,12 +155,10 @@ public void testStartupAndShutdown() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); - verify(connector).stop(); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(true); verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); verifyNoMoreInteractions(onStateChange); @@ -174,7 +170,7 @@ public void testStartupAndPause() { when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -187,13 +183,11 @@ public void testStartupAndPause() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); - verify(connector).stop(); verify(listener).onPause(CONNECTOR); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(true); InOrder inOrder = inOrder(onStateChange); inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); @@ -221,13 +215,11 @@ public void testOnResume() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(listener).onPause(CONNECTOR); verify(connector).start(CONFIG); verify(listener).onResume(CONNECTOR); - verify(connector).stop(); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(true); InOrder inOrder = inOrder(onStateChange); inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); @@ -241,7 +233,7 @@ public void testStartupPaused() { when(connector.version()).thenReturn(VERSION); Callback onStateChange = mockCallback(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -251,11 +243,10 @@ public void testStartupPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); // connector never gets started verify(listener).onPause(CONNECTOR); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(false); verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); verifyNoMoreInteractions(onStateChange); @@ -270,7 +261,7 @@ public void testStartupFailure() { doThrow(exception).when(connector).start(CONFIG); Callback onStateChange = mockCallback(); - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -280,11 +271,10 @@ public void testStartupFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(connector).start(CONFIG); verify(listener).onFailure(CONNECTOR, exception); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(false); verify(onStateChange).onCompletion(any(Exception.class), isNull()); verifyNoMoreInteractions(onStateChange); @@ -310,14 +300,13 @@ public void testShutdownFailure() { workerConnector.doShutdown(); assertFailedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); - verify(connector).stop(); verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); verifyNoMoreInteractions(onStateChange); verify(listener).onFailure(CONNECTOR, exception); - verifyCleanShutdown(); + verifyShutdown(false, true); } @Test @@ -340,13 +329,11 @@ public void testTransitionStartedToStarted() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(connector).start(CONFIG); // expect only one call to onStartup() verify(listener).onStartup(CONNECTOR); - verify(connector).stop(); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(true); verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED)); verifyNoMoreInteractions(onStateChange); } @@ -371,13 +358,11 @@ public void testTransitionPausedToPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyCleanInitialize(); + verifyInitialize(); verify(connector).start(CONFIG); verify(listener).onStartup(CONNECTOR); - verify(connector).stop(); verify(listener).onPause(CONNECTOR); - verify(listener).onShutdown(CONNECTOR); - verifyCleanShutdown(); + verifyCleanShutdown(true); InOrder inOrder = inOrder(onStateChange); inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); @@ -457,7 +442,7 @@ private Callback mockCallback() { return mock(Callback.class); } - private void verifyCleanInitialize() { + private void verifyInitialize() { verify(connector).version(); if (connector instanceof SourceConnector) { verify(offsetStore).start(); @@ -467,10 +452,22 @@ private void verifyCleanInitialize() { } } - private void verifyCleanShutdown() { + private void verifyCleanShutdown(boolean started) { + verifyShutdown(true, started); + } + + private void verifyShutdown(boolean clean, boolean started) { verify(ctx).close(); - verify(offsetStorageReader).close(); - verify(offsetStore).stop(); + if (connector instanceof SourceConnector) { + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + } + if (clean) { + verify(listener).onShutdown(CONNECTOR); + } + if (started) { + verify(connector).stop(); + } } private static abstract class TestConnector extends Connector {