diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java index f7c18d74a20af..e716efc091df8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java @@ -26,28 +26,36 @@ import org.apache.kafka.connect.source.SourceConnectorContext; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; -import org.easymock.Capture; import org.apache.kafka.connect.util.Callback; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.EasyMockSupport; -import org.easymock.Mock; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import java.util.HashMap; import java.util.Map; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - -@RunWith(EasyMockRunner.class) -public class WorkerConnectorTest extends EasyMockSupport { +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class WorkerConnectorTest { private static final String VERSION = "1.1"; public static final String CONNECTOR = "connector"; @@ -60,15 +68,15 @@ public class WorkerConnectorTest extends EasyMockSupport { public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; - @Mock Plugins plugins; - @Mock SourceConnector sourceConnector; - @Mock SinkConnector sinkConnector; - @Mock Connector connector; - @Mock CloseableConnectorContext ctx; - @Mock ConnectorStatus.Listener listener; - @Mock CloseableOffsetStorageReader offsetStorageReader; - @Mock ConnectorOffsetBackingStore offsetStore; - @Mock ClassLoader classLoader; + @Mock private Plugins plugins; + @Mock private SourceConnector sourceConnector; + @Mock private SinkConnector sinkConnector; + @Mock private CloseableConnectorContext ctx; + @Mock private ConnectorStatus.Listener listener; + @Mock private CloseableOffsetStorageReader offsetStorageReader; + @Mock private ConnectorOffsetBackingStore offsetStore; + @Mock private ClassLoader classLoader; + private Connector connector; @Before public void setup() { @@ -86,31 +94,8 @@ public void testInitializeFailure() { RuntimeException exception = new RuntimeException(); connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).initialize(any()); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -120,7 +105,9 @@ public void testInitializeFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(listener).onFailure(CONNECTOR, exception); + verifyCleanShutdown(false); } @Test @@ -128,36 +115,11 @@ public void testFailureIsFinalState() { RuntimeException exception = new RuntimeException(); connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall().andThrow(exception); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - // expect no call to onStartup() after failure - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).initialize(any()); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull()); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertFailedMetric(workerConnector); @@ -167,48 +129,22 @@ public void testFailureIsFinalState() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(listener).onFailure(CONNECTOR, exception); + // expect no call to onStartup() after failure + verifyCleanShutdown(false); + + verify(onStateChange).onCompletion(any(Exception.class), isNull()); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupAndShutdown() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -219,54 +155,26 @@ public void testStartupAndShutdown() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verifyCleanShutdown(true); + + verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupAndPause() { connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); + workerConnector.doTransitionTo(TargetState.STARTED, onStateChange); assertRunningMetric(workerConnector); workerConnector.doTransitionTo(TargetState.PAUSED, onStateChange); @@ -275,52 +183,25 @@ public void testStartupAndPause() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(listener).onPause(CONNECTOR); + verifyCleanShutdown(true); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testOnResume() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - offsetStore.start(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - listener.onPause(CONNECTOR); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onResume(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - replayAll(); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -334,42 +215,25 @@ public void testOnResume() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(listener).onPause(CONNECTOR); + verify(connector).start(CONFIG); + verify(listener).onResume(CONNECTOR); + verifyCleanShutdown(true); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupPaused() { connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - - // connector never gets started - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -379,45 +243,25 @@ public void testStartupPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + // connector never gets started + verify(listener).onPause(CONNECTOR); + verifyCleanShutdown(false); + + verify(onStateChange).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testStartupFailure() { RuntimeException exception = new RuntimeException(); - connector = sinkConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - connector.initialize(EasyMock.notNull(SinkConnectorContext.class)); - expectLastCall(); - connector.start(CONFIG); - expectLastCall().andThrow(exception); + when(connector.version()).thenReturn(VERSION); + doThrow(exception).when(connector).start(CONFIG); - listener.onFailure(CONNECTOR, exception); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.anyObject(Exception.class), EasyMock.isNull()); - expectLastCall(); - - replayAll(); - - WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); + Callback onStateChange = mockCallback(); + WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader); workerConnector.initialize(); assertInitializedSinkMetric(workerConnector); @@ -427,7 +271,13 @@ public void testStartupFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onFailure(CONNECTOR, exception); + verifyCleanShutdown(false); + + verify(onStateChange).onCompletion(any(Exception.class), isNull()); + verifyNoMoreInteractions(onStateChange); } @Test @@ -435,42 +285,11 @@ public void testShutdownFailure() { RuntimeException exception = new RuntimeException(); connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall().andThrow(exception); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - - listener.onFailure(CONNECTOR, exception); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - replayAll(); + doThrow(exception).when(connector).stop(); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -481,48 +300,22 @@ public void testShutdownFailure() { workerConnector.doShutdown(); assertFailedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); + verify(listener).onFailure(CONNECTOR, exception); + verifyShutdown(false, true); } @Test public void testTransitionStartedToStarted() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - // expect only one call to onStartup() - listener.onStartup(CONNECTOR); - expectLastCall(); - connector.stop(); - expectLastCall(); + when(connector.version()).thenReturn(VERSION); - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall().times(2); - - replayAll(); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); @@ -536,53 +329,21 @@ public void testTransitionStartedToStarted() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + // expect only one call to onStartup() + verify(listener).onStartup(CONNECTOR); + verifyCleanShutdown(true); + verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STARTED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testTransitionPausedToPaused() { connector = sourceConnector; - connector.version(); - expectLastCall().andReturn(VERSION); - - offsetStore.start(); - expectLastCall(); - - connector.initialize(EasyMock.notNull(SourceConnectorContext.class)); - expectLastCall(); - - connector.start(CONFIG); - expectLastCall(); - - listener.onStartup(CONNECTOR); - expectLastCall(); - - connector.stop(); - expectLastCall(); - - listener.onPause(CONNECTOR); - expectLastCall(); - - listener.onShutdown(CONNECTOR); - expectLastCall(); - - ctx.close(); - expectLastCall(); - - offsetStorageReader.close(); - expectLastCall(); - - offsetStore.stop(); - expectLastCall(); - - Callback onStateChange = createStrictMock(Callback.class); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.STARTED)); - expectLastCall(); - onStateChange.onCompletion(EasyMock.isNull(), EasyMock.eq(TargetState.PAUSED)); - expectLastCall().times(2); - - replayAll(); + when(connector.version()).thenReturn(VERSION); + Callback onStateChange = mockCallback(); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); @@ -597,28 +358,32 @@ public void testTransitionPausedToPaused() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); - verifyAll(); + verifyInitialize(); + verify(connector).start(CONFIG); + verify(listener).onStartup(CONNECTOR); + verify(listener).onPause(CONNECTOR); + verifyCleanShutdown(true); + + InOrder inOrder = inOrder(onStateChange); + inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED)); + inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.PAUSED)); + verifyNoMoreInteractions(onStateChange); } @Test public void testFailConnectorThatIsNeitherSourceNorSink() { - connector.version(); - expectLastCall().andReturn(VERSION); - - Capture exceptionCapture = Capture.newInstance(); - listener.onFailure(EasyMock.eq(CONNECTOR), EasyMock.capture(exceptionCapture)); - expectLastCall(); - - replayAll(); - + connector = mock(Connector.class); + when(connector.version()).thenReturn(VERSION); WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader); workerConnector.initialize(); + + verify(connector).version(); + ArgumentCaptor exceptionCapture = ArgumentCaptor.forClass(Throwable.class); + verify(listener).onFailure(eq(CONNECTOR), exceptionCapture.capture()); Throwable e = exceptionCapture.getValue(); assertTrue(e instanceof ConnectException); assertTrue(e.getMessage().contains("must be a subclass of")); - - verifyAll(); } protected void assertFailedMetric(WorkerConnector workerConnector) { @@ -672,6 +437,39 @@ protected void assertInitializedMetric(WorkerConnector workerConnector, String e assertEquals(VERSION, version); } + @SuppressWarnings("unchecked") + private Callback mockCallback() { + return mock(Callback.class); + } + + private void verifyInitialize() { + verify(connector).version(); + if (connector instanceof SourceConnector) { + verify(offsetStore).start(); + verify(connector).initialize(any(SourceConnectorContext.class)); + } else { + verify(connector).initialize(any(SinkConnectorContext.class)); + } + } + + private void verifyCleanShutdown(boolean started) { + verifyShutdown(true, started); + } + + private void verifyShutdown(boolean clean, boolean started) { + verify(ctx).close(); + if (connector instanceof SourceConnector) { + verify(offsetStorageReader).close(); + verify(offsetStore).stop(); + } + if (clean) { + verify(listener).onShutdown(CONNECTOR); + } + if (started) { + verify(connector).stop(); + } + } + private static abstract class TestConnector extends Connector { } }