From bc1a2f859c7bc4c64890d14154ae8b86c0a45c63 Mon Sep 17 00:00:00 2001
From: Almog Gavra
Date: Mon, 19 Aug 2019 13:42:09 -0700
Subject: [PATCH] feat: some robustness improvements for Connect integration
(#3227)
---
ksql-engine/pom.xml | 5 ++
.../ksql/connect/ConnectConfigService.java | 2 +
.../ksql/connect/ConnectPollingService.java | 82 +++++++++++++++----
.../ksql/services/ConnectClient.java | 16 ++--
.../ksql/services/DefaultConnectClient.java | 54 +++++++++---
.../ksql/services/SandboxConnectClient.java | 13 ++-
.../connect/ConnectConfigServiceTest.java | 8 +-
.../connect/ConnectPollingServiceTest.java | 15 ++++
.../services/DefaultConnectClientTest.java | 23 +++++-
.../server/execution/ConnectExecutorTest.java | 5 +-
.../DescribeConnectorExecutorTest.java | 9 +-
.../execution/ListConnectorsExecutorTest.java | 12 +--
pom.xml | 7 ++
13 files changed, 202 insertions(+), 49 deletions(-)
diff --git a/ksql-engine/pom.xml b/ksql-engine/pom.xml
index 17a6b92d3526..b00d17db748b 100644
--- a/ksql-engine/pom.xml
+++ b/ksql-engine/pom.xml
@@ -73,6 +73,11 @@
guava
+
+ com.github.rholder
+ guava-retrying
+
+
io.airlift
slice
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java
index 65539f1fa326..a2fddbd31286 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectConfigService.java
@@ -185,6 +185,8 @@ private void handleConnector(final String name) {
try {
final ConnectResponse describe = connectClient.describe(name);
if (!describe.datum().isPresent()) {
+ describe.error()
+ .ifPresent(error -> LOG.warn("Failed to describe connect {} due to: {}", name, error));
return;
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
index 5baaaab059d9..7a681c24092f 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/connect/ConnectPollingService.java
@@ -15,10 +15,11 @@
package io.confluent.ksql.connect;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.QualifiedName;
@@ -32,12 +33,16 @@
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.properties.with.CreateConfigs;
import io.confluent.ksql.util.KsqlConstants;
+import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,41 +58,85 @@
* registry.
*/
@ThreadSafe
-final class ConnectPollingService extends AbstractScheduledService {
+final class ConnectPollingService extends AbstractExecutionThreadService {
private static final Logger LOG = LoggerFactory.getLogger(ConnectPollingService.class);
- private static final int INTERVAL_S = 30;
+ private static final int MAX_INTERVAL_S = 30;
+ private static final int START_INTERVAL_S = 1;
+ private static final Connector STOP_SENTINEL =
+ new Connector("_stop_", ignored -> false, Function.identity(), DataSourceType.KSTREAM, "");
private final KsqlExecutionContext executionContext;
private final Consumer sourceCallback;
+ private final int maxPollingIntervalSecs;
+ private final AtomicInteger pollingIntervalSecs;
+ // we use a blocking queue as a thread safe buffer between this class
+ // and others that may be calling #addConnector(Connector) - this also
+ // allows us to notify when a connector was added.
+ private BlockingQueue connectorQueue;
private Set connectors;
ConnectPollingService(
final KsqlExecutionContext executionContext,
final Consumer sourceCallback
+ ) {
+ this(executionContext, sourceCallback, MAX_INTERVAL_S);
+ }
+
+ @VisibleForTesting
+ ConnectPollingService(
+ final KsqlExecutionContext executionContext,
+ final Consumer sourceCallback,
+ final int maxPollingIntervalSecs
) {
this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
this.sourceCallback = Objects.requireNonNull(sourceCallback, "sourceCallback");
- this.connectors = ConcurrentHashMap.newKeySet();
+ this.connectors = new HashSet<>();
+ this.connectorQueue = new LinkedBlockingDeque<>();
+ this.maxPollingIntervalSecs = maxPollingIntervalSecs;
+ this.pollingIntervalSecs = new AtomicInteger(maxPollingIntervalSecs);
}
/**
* Add this connector to the set of connectors that are polled by this
* {@code ConnectPollingService}. Next time an iteration is scheduled in
- * {@value #INTERVAL_S} seconds, the connector will be included in the topic
+ * {@value #MAX_INTERVAL_S} seconds, the connector will be included in the topic
* scan.
*
* @param connector a connector to register
*/
void addConnector(final Connector connector) {
- connectors.add(connector);
+ connectorQueue.add(connector);
+ pollingIntervalSecs.set(START_INTERVAL_S);
}
@Override
- protected void runOneIteration() {
+ protected void run() throws Exception {
+ while (isRunning()) {
+ final Connector connector = connectorQueue.poll(
+ pollingIntervalSecs.getAndUpdate(old -> Math.min(MAX_INTERVAL_S, 2 * old)),
+ TimeUnit.SECONDS);
+ if (connector == STOP_SENTINEL || connectorQueue.removeIf(c -> c == STOP_SENTINEL)) {
+ return;
+ } else if (connector != null) {
+ connectors.add(connector);
+ }
+
+ drainQueue();
+ runOneIteration();
+ }
+ }
+
+ @VisibleForTesting
+ void drainQueue() {
+ connectorQueue.drainTo(connectors);
+ }
+
+ @VisibleForTesting
+ void runOneIteration() {
+ // avoid making external calls if unnecessary
if (connectors.isEmpty()) {
- // avoid making external calls if unnecessary
return;
}
@@ -112,10 +161,19 @@ protected void runOneIteration() {
maybeConnector.ifPresent(connector -> handleTopic(topic, subjects, connector));
}
} catch (final Exception e) {
- LOG.error("Could not resolve connect sources. Trying again in {} seconds.", INTERVAL_S, e);
+ LOG.error("Could not resolve connect sources. Trying again in at most {} seconds.",
+ pollingIntervalSecs.get(),
+ e);
}
}
+ @Override
+ protected void triggerShutdown() {
+ // add the sentinel to the queue so that any blocking operation
+ // gets resolved
+ connectorQueue.add(STOP_SENTINEL);
+ }
+
private void handleTopic(
final String topic,
final Set subjects,
@@ -151,8 +209,4 @@ private void handleTopic(
}
}
- @Override
- protected Scheduler scheduler() {
- return Scheduler.newFixedRateSchedule(0, INTERVAL_S, TimeUnit.SECONDS);
- }
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java
index 8b9dc5565e7c..ccf7dc0911be 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/ConnectClient.java
@@ -66,21 +66,23 @@ public interface ConnectClient {
class ConnectResponse {
private final Optional datum;
private final Optional error;
+ private final int httpCode;
- public static ConnectResponse of(final T datum) {
- return new ConnectResponse<>(datum, null);
+ public static ConnectResponse of(final T datum, final int code) {
+ return new ConnectResponse<>(datum, null, code);
}
- public static ConnectResponse of(final String error) {
- return new ConnectResponse<>(null, error);
+ public static ConnectResponse of(final String error, final int code) {
+ return new ConnectResponse<>(null, error, code);
}
- private ConnectResponse(final T datum, final String error) {
+ private ConnectResponse(final T datum, final String error, final int code) {
KsqlPreconditions.checkArgument(
datum != null ^ error != null,
"expected exactly one of datum or error to be null");
this.datum = Optional.ofNullable(datum);
this.error = Optional.ofNullable(error);
+ this.httpCode = code;
}
public Optional datum() {
@@ -90,6 +92,10 @@ public Optional datum() {
public Optional error() {
return error;
}
+
+ public int httpCode() {
+ return httpCode;
+ }
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java
index 88a68c149739..0083ebae7f7f 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java
@@ -16,6 +16,10 @@
package io.confluent.ksql.services;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.util.KsqlException;
@@ -25,6 +29,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.http.HttpStatus;
import org.apache.http.client.ResponseHandler;
@@ -49,6 +55,7 @@ public class DefaultConnectClient implements ConnectClient {
private static final String CONNECTORS = "/connectors";
private static final String STATUS = "/status";
private static final int DEFAULT_TIMEOUT_MS = 5_000;
+ private static final int MAX_ATTEMPTS = 3;
private final URI connectUri;
@@ -74,7 +81,7 @@ public ConnectResponse create(
connector,
config);
- final ConnectResponse connectResponse = Request
+ final ConnectResponse connectResponse = withRetries(() -> Request
.Post(connectUri.resolve(CONNECTORS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
@@ -87,7 +94,7 @@ public ConnectResponse create(
)
.execute()
.handleResponse(
- createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class, Function.identity()));
+ createHandler(HttpStatus.SC_CREATED, ConnectorInfo.class, Function.identity())));
connectResponse.error()
.ifPresent(error -> LOG.warn("Did not CREATE connector {}: {}", connector, error));
@@ -104,13 +111,13 @@ public ConnectResponse> connectors() {
try {
LOG.debug("Issuing request to Kafka Connect at URI {} to list connectors", connectUri);
- final ConnectResponse> connectResponse = Request
+ final ConnectResponse> connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(CONNECTORS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
- createHandler(HttpStatus.SC_OK, List.class, foo -> (List) foo));
+ createHandler(HttpStatus.SC_OK, List.class, foo -> (List) foo)));
connectResponse.error()
.ifPresent(error -> LOG.warn("Could not list connectors: {}.", error));
@@ -128,13 +135,13 @@ public ConnectResponse status(final String connector) {
connectUri,
connector);
- final ConnectResponse connectResponse = Request
+ final ConnectResponse connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(CONNECTORS + "/" + connector + STATUS))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
- createHandler(HttpStatus.SC_OK, ConnectorStateInfo.class, Function.identity()));
+ createHandler(HttpStatus.SC_OK, ConnectorStateInfo.class, Function.identity())));
connectResponse.error()
.ifPresent(error ->
@@ -152,13 +159,13 @@ public ConnectResponse describe(final String connector) {
LOG.debug("Issuing request to Kafka Connect at URI {} to get config for {}",
connectUri, connector);
- final ConnectResponse connectResponse = Request
+ final ConnectResponse connectResponse = withRetries(() -> Request
.Get(connectUri.resolve(String.format("%s/%s", CONNECTORS, connector)))
.socketTimeout(DEFAULT_TIMEOUT_MS)
.connectTimeout(DEFAULT_TIMEOUT_MS)
.execute()
.handleResponse(
- createHandler(HttpStatus.SC_OK, ConnectorInfo.class, Function.identity()));
+ createHandler(HttpStatus.SC_OK, ConnectorInfo.class, Function.identity())));
connectResponse.error()
.ifPresent(error -> LOG.warn("Could not list connectors: {}.", error));
@@ -169,22 +176,49 @@ public ConnectResponse describe(final String connector) {
}
}
+ @SuppressWarnings("unchecked")
+ private static ConnectResponse withRetries(final Callable> action) {
+ try {
+ return RetryerBuilder.>newBuilder()
+ .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_ATTEMPTS))
+ .withWaitStrategy(WaitStrategies.exponentialWait())
+ .retryIfResult(
+ result -> result == null || result.httpCode() >= HttpStatus.SC_INTERNAL_SERVER_ERROR)
+ .retryIfException()
+ .build()
+ .call(action);
+ } catch (ExecutionException e) {
+ // this should never happen because we retryIfException()
+ throw new KsqlServerException("Unexpected exception!", e);
+ } catch (RetryException e) {
+ LOG.warn("Failed to query connect cluster after {} attempts.", e.getNumberOfFailedAttempts());
+ if (e.getLastFailedAttempt().hasResult()) {
+ return (ConnectResponse) e.getLastFailedAttempt().getResult();
+ }
+
+ // should rarely happen - only if some IOException happens and we didn't
+ // even get to send the request to the server
+ throw new KsqlServerException(e.getCause());
+ }
+ }
+
private static ResponseHandler> createHandler(
final int expectedStatus,
final Class entityClass,
final Function cast
) {
return httpResponse -> {
+ final int code = httpResponse.getStatusLine().getStatusCode();
if (httpResponse.getStatusLine().getStatusCode() != expectedStatus) {
final String entity = EntityUtils.toString(httpResponse.getEntity());
- return ConnectResponse.of(entity);
+ return ConnectResponse.of(entity, code);
}
final T info = cast.apply(MAPPER.readValue(
httpResponse.getEntity().getContent(),
entityClass));
- return ConnectResponse.of(info);
+ return ConnectResponse.of(info, code);
};
}
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java
index da8a4822804f..234fd82fe56e 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/services/SandboxConnectClient.java
@@ -21,6 +21,7 @@
import io.confluent.ksql.services.ConnectClient.ConnectResponse;
import io.confluent.ksql.util.LimitedProxyBuilder;
import java.util.Map;
+import org.apache.http.HttpStatus;
/**
* Supplies {@link ConnectClient}s to use that do not make any
@@ -32,10 +33,14 @@ private SandboxConnectClient() { }
public static ConnectClient createProxy() {
return LimitedProxyBuilder.forClass(ConnectClient.class)
- .swallow("create", methodParams(String.class, Map.class), ConnectResponse.of("sandbox"))
- .swallow("describe", methodParams(String.class), ConnectResponse.of("sandbox"))
- .swallow("connectors", methodParams(), ConnectResponse.of(ImmutableList.of()))
- .swallow("status", methodParams(String.class), ConnectResponse.of("sandbox"))
+ .swallow("create", methodParams(String.class, Map.class),
+ ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR))
+ .swallow("describe", methodParams(String.class),
+ ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR))
+ .swallow("connectors", methodParams(),
+ ConnectResponse.of(ImmutableList.of(), HttpStatus.SC_OK))
+ .swallow("status", methodParams(String.class),
+ ConnectResponse.of("sandbox", HttpStatus.SC_INTERNAL_SERVER_ERROR))
.build();
}
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
index 599a7d780b32..0e0128072648 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectConfigServiceTest.java
@@ -16,7 +16,6 @@
package io.confluent.ksql.connect;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyFloat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
@@ -30,8 +29,8 @@
import io.confluent.ksql.util.KsqlServerException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.http.HttpStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -234,9 +233,10 @@ private void givenConnectors(final String... names){
ImmutableMap.of(),
ImmutableList.of(),
ConnectorType.SOURCE
- )));
+ ), HttpStatus.SC_CREATED));
}
- when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.copyOf(names)));
+ when(connectClient.connectors()).thenReturn(ConnectResponse.of(ImmutableList.copyOf(names),
+ HttpStatus.SC_OK));
}
private OngoingStubbing> givenConnectorRecord(OngoingStubbing> stubbing) {
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java
index 5e93bacb2821..a96e032dc2c4 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/connect/ConnectPollingServiceTest.java
@@ -93,6 +93,7 @@ public void shouldCreateSourceFromConnector() {
givenConnector("foo");
// When:
+ pollingService.drainQueue();
pollingService.runOneIteration();
// Then:
@@ -116,6 +117,7 @@ public void shouldNotCreateSourceFromConnectorWithoutTopicMatch() {
givenConnector("bar");
// When:
+ pollingService.drainQueue();
pollingService.runOneIteration();
// Then:
@@ -131,6 +133,7 @@ public void shouldNotCreateSourceFromConnectorWithoutSubjectMatch() {
givenConnector("foo");
// When:
+ pollingService.drainQueue();
pollingService.runOneIteration();
// Then:
@@ -151,6 +154,7 @@ public void shouldNotCreateSourceForAlreadyRegisteredSource() {
metaStore.putSource(source);
// When:
+ pollingService.drainQueue();
pollingService.runOneIteration();
// Then:
@@ -168,6 +172,17 @@ public void shouldNotPollIfNoRegisteredConnectors() {
verifyZeroInteractions(serviceContext);
}
+ @Test(timeout = 30_000L)
+ public void shouldImmediatelyShutdown() {
+ pollingService = new ConnectPollingService(executionContext, foo -> {}, 60);
+
+ // When:
+ pollingService.startAsync().awaitRunning();
+ pollingService.stopAsync().awaitTerminated();
+
+ // Then: (test immediately stops)
+ }
+
private void givenTopic(final String topicName) {
adminClient.addTopic(
false,
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
index d5d5c70da610..15df6d8b6726 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
@@ -94,7 +94,7 @@ public void testCreateWithError() throws JsonProcessingException {
WireMock.stubFor(
WireMock.post(WireMock.urlEqualTo("/connectors"))
.willReturn(WireMock.aResponse()
- .withStatus(HttpStatus.SC_BAD_REQUEST)
+ .withStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR)
.withBody("Oh no!"))
);
@@ -169,4 +169,25 @@ public void testStatus() throws JsonProcessingException {
assertThat("Expected no error!", !response.error().isPresent());
}
+ @Test
+ public void testListShouldRetryOnFailure() throws JsonProcessingException {
+ // Given:
+ WireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/connectors"))
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR)
+ .withBody("Encountered an error!"))
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_OK)
+ .withBody(MAPPER.writeValueAsString(ImmutableList.of("one", "two"))))
+ );
+
+ // When:
+ final ConnectResponse> response = client.connectors();
+
+ // Then:
+ assertThat(response.datum(), OptionalMatchers.of(is(ImmutableList.of("one", "two"))));
+ assertThat("Expected no error!", !response.error().isPresent());
+ }
+
}
\ No newline at end of file
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java
index 2da942a1811f..172e579f6129 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ConnectExecutorTest.java
@@ -37,6 +37,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
+import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.junit.Before;
@@ -118,12 +119,12 @@ private void givenSuccess() {
"foo",
ImmutableMap.of(),
ImmutableList.of(),
- ConnectorType.SOURCE)));
+ ConnectorType.SOURCE), HttpStatus.SC_OK));
}
private void givenError() {
when(connectClient.create(anyString(), anyMap()))
- .thenReturn(ConnectResponse.of("error!"));
+ .thenReturn(ConnectResponse.of("error!", HttpStatus.SC_BAD_REQUEST));
}
}
\ No newline at end of file
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java
index a52a0617150c..48dbd8b22b29 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java
@@ -51,6 +51,7 @@
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.function.Function;
+import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -122,8 +123,8 @@ public void setUp() {
when(source.getDataSourceType()).thenReturn(DataSourceType.KTABLE);
when(source.getKeyField()).thenReturn(KeyField.none());
when(source.getName()).thenReturn("source");
- when(connectClient.status(CONNECTOR_NAME)).thenReturn(ConnectResponse.of(STATUS));
- when(connectClient.describe("connector")).thenReturn(ConnectResponse.of(INFO));
+ when(connectClient.status(CONNECTOR_NAME)).thenReturn(ConnectResponse.of(STATUS, HttpStatus.SC_OK));
+ when(connectClient.describe("connector")).thenReturn(ConnectResponse.of(INFO, HttpStatus.SC_OK));
when(connector.matches(any())).thenReturn(false);
when(connector.matches("kafka-topic")).thenReturn(true);
@@ -159,7 +160,7 @@ public void shouldDescribeKnownConnector() {
@Test
public void shouldErrorIfConnectClientFailsStatus() {
// Given:
- when(connectClient.describe(any())).thenReturn(ConnectResponse.of("error"));
+ when(connectClient.describe(any())).thenReturn(ConnectResponse.of("error", HttpStatus.SC_INTERNAL_SERVER_ERROR));
// When:
final Optional entity = executor.execute(describeStatement, engine, serviceContext);
@@ -174,7 +175,7 @@ public void shouldErrorIfConnectClientFailsStatus() {
@Test
public void shouldErrorIfConnectClientFailsDescribe() {
// Given:
- when(connectClient.describe(any())).thenReturn(ConnectResponse.of("error"));
+ when(connectClient.describe(any())).thenReturn(ConnectResponse.of("error", HttpStatus.SC_INTERNAL_SERVER_ERROR));
// When:
final Optional entity = executor.execute(describeStatement, engine, serviceContext);
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java
index 54c162f3ecd7..9f507a2aefab 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListConnectorsExecutorTest.java
@@ -35,6 +35,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
+import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
@@ -68,16 +69,16 @@ public class ListConnectorsExecutorTest {
public void setUp() {
when(serviceContext.getConnectClient()).thenReturn(connectClient);
when(connectClient.describe("connector"))
- .thenReturn(ConnectResponse.of(INFO));
+ .thenReturn(ConnectResponse.of(INFO, HttpStatus.SC_OK));
when(connectClient.describe("connector2"))
- .thenReturn(ConnectResponse.of("DANGER WILL ROBINSON."));
+ .thenReturn(ConnectResponse.of("DANGER WILL ROBINSON.", HttpStatus.SC_NOT_FOUND));
}
@Test
public void shouldListValidConnector() {
// Given:
when(connectClient.connectors())
- .thenReturn(ConnectResponse.of(ImmutableList.of("connector")));
+ .thenReturn(ConnectResponse.of(ImmutableList.of("connector"), HttpStatus.SC_OK));
final ConfiguredStatement statement = ConfiguredStatement.of(
PreparedStatement.of("", new ListConnectors(Optional.empty(), Scope.ALL)),
ImmutableMap.of(),
@@ -105,7 +106,8 @@ public void shouldListValidConnector() {
public void shouldFilterNonMatchingConnectors() {
// Given:
when(connectClient.connectors())
- .thenReturn(ConnectResponse.of(ImmutableList.of("connector", "connector2")));
+ .thenReturn(ConnectResponse.of(ImmutableList.of("connector", "connector2"),
+ HttpStatus.SC_OK));
final ConfiguredStatement statement = ConfiguredStatement.of(
PreparedStatement.of("", new ListConnectors(Optional.empty(), Scope.SINK)),
ImmutableMap.of(),
@@ -131,7 +133,7 @@ public void shouldFilterNonMatchingConnectors() {
public void shouldListInvalidConnectorWithNoInfo() {
// Given:
when(connectClient.connectors())
- .thenReturn(ConnectResponse.of(ImmutableList.of("connector2")));
+ .thenReturn(ConnectResponse.of(ImmutableList.of("connector2"), HttpStatus.SC_OK));
final ConfiguredStatement statement = ConfiguredStatement.of(
PreparedStatement.of("", new ListConnectors(Optional.empty(), Scope.ALL)),
ImmutableMap.of(),
diff --git a/pom.xml b/pom.xml
index 8230d95cafcb..7cba15e1849f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
1.4
3.3.1
24.1.1-jre
+ 2.0.0
1
3.0.7
3.11.0
@@ -268,6 +269,12 @@
${guava.version}
+
+ com.github.rholder
+ guava-retrying
+ ${retrying.version}
+
+
io.airlift
slice