getSchemaRegistryClientFactory();
+ /**
+ * Get the shared {@link ConnectClient} instance.
+ *
+ * The default implementation is thread-safe and can be shared across threads.
+ *
+ * @return a shared {@link ConnectClient}
+ */
+ ConnectClient getConnectClient();
+
@Override
void close();
}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java
index ce7305d0058d..263786c30dfa 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java
@@ -19,6 +19,7 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
+import io.confluent.ksql.services.DefaultConnectClient;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
import io.confluent.ksql.services.ServiceContext;
@@ -50,7 +51,8 @@ public static KsqlContext create(
clientSupplier,
adminClient,
kafkaTopicClient,
- () -> schemaRegistryClient
+ () -> schemaRegistryClient,
+ new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY))
);
final KsqlEngine engine = new KsqlEngine(
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
new file mode 100644
index 000000000000..b6aba696cf34
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"; you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.services;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.json.JsonMapper;
+import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers;
+import io.confluent.ksql.services.ConnectClient.ConnectResponse;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class DefaultConnectClientTest {
+
+ private static final ObjectMapper MAPPER = JsonMapper.INSTANCE.mapper;
+ private static final ConnectorInfo SAMPLE_INFO = new ConnectorInfo(
+ "foo",
+ ImmutableMap.of("key", "value"),
+ ImmutableList.of(new ConnectorTaskId("foo", 1)),
+ ConnectorType.SOURCE
+ );
+
+ @Rule
+ public WireMockRule wireMockRule = new WireMockRule(
+ WireMockConfiguration.wireMockConfig().dynamicPort());
+
+ private ConnectClient client;
+
+ @Before
+ public void setup() {
+ client = new DefaultConnectClient("http://localhost:" + wireMockRule.port());
+ }
+
+ @Test
+ public void testCreate() throws JsonProcessingException {
+ // Given:
+ WireMock.stubFor(
+ WireMock.post(WireMock.urlEqualTo("/connectors"))
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_CREATED)
+ .withBody(MAPPER.writeValueAsString(SAMPLE_INFO)))
+ );
+
+ // When:
+ final ConnectResponse response =
+ client.create("foo", ImmutableMap.of());
+
+ // Then:
+ assertThat(response.datum(), OptionalMatchers.of(is(SAMPLE_INFO)));
+ assertThat("Expected no error!", !response.error().isPresent());
+ }
+
+ @Test
+ public void testCreateWithError() throws JsonProcessingException {
+ // Given:
+ WireMock.stubFor(
+ WireMock.post(WireMock.urlEqualTo("/connectors"))
+ .willReturn(WireMock.aResponse()
+ .withStatus(HttpStatus.SC_BAD_REQUEST)
+ .withBody("Oh no!"))
+ );
+
+ // When:
+ final ConnectResponse response =
+ client.create("foo", ImmutableMap.of());
+
+ // Then:
+ assertThat("Expected no datum!", !response.datum().isPresent());
+ assertThat(response.error(), OptionalMatchers.of(is("Oh no!")));
+ }
+
+}
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java
new file mode 100644
index 000000000000..f43416d776c4
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxConnectClientTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"; you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.services;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+
+import com.google.common.collect.ImmutableMap;
+import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers;
+import io.confluent.ksql.services.ConnectClient.ConnectResponse;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+public class SandboxConnectClientTest {
+
+ private ConnectClient sandboxClient;
+
+ @Before
+ public void setUp() {
+ sandboxClient = SandboxConnectClient.createProxy();
+ }
+
+ @Test
+ public void shouldReturnErrorOnCreate() {
+ // When:
+ final ConnectResponse foo = sandboxClient.create("foo", ImmutableMap.of());
+
+ // Then:
+ assertThat(foo.error(), OptionalMatchers.of(is("sandbox")));
+ assertThat("expected no datum", !foo.datum().isPresent());
+ }
+
+}
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java
index cf398c105a7a..d6a3edae35b6 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java
@@ -18,6 +18,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -55,6 +56,7 @@ public static Collection> getMethodsToTest() {
.ignore("getKafkaClientSupplier")
.ignore("getSchemaRegistryClient")
.ignore("getSchemaRegistryClientFactory")
+ .ignore("getConnectClient")
.ignore("close")
.build();
}
@@ -162,6 +164,15 @@ public void shouldGetSandboxedSchemaRegistryFactory() {
assertThat(factory.get(), is(sameInstance(sandboxedServiceContext.getSchemaRegistryClient())));
}
+ @Test
+ public void shouldGetSandboxedConnectClient() {
+ // When:
+ final ConnectClient client = sandboxedServiceContext.getConnectClient();
+
+ // Then:
+ assertThat("Expected proxy class", Proxy.isProxyClass(client.getClass()));
+ }
+
@Test
public void shouldNoNothingOnClose() {
sandboxedServiceContext.close();
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java b/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java
index 991c5437ded5..fb23445631b7 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java
@@ -62,7 +62,8 @@ public static ServiceContext create(
new FakeKafkaClientSupplier(),
new FakeKafkaClientSupplier().getAdminClient(Collections.emptyMap()),
topicClient,
- srClientFactory
+ srClientFactory,
+ new DefaultConnectClient("http://localhost:8083")
);
}
@@ -78,7 +79,8 @@ public static ServiceContext create(
kafkaClientSupplier,
adminClient,
new KafkaTopicClientImpl(adminClient),
- srClientFactory
+ srClientFactory,
+ new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY))
);
}
@@ -86,8 +88,9 @@ public static ServiceContext create(
final KafkaClientSupplier kafkaClientSupplier,
final AdminClient adminClient,
final KafkaTopicClient topicClient,
- final Supplier srClientFactory
+ final Supplier srClientFactory,
+ final ConnectClient connectClient
) {
- return new DefaultServiceContext(kafkaClientSupplier, adminClient, topicClient, srClientFactory);
+ return new DefaultServiceContext(kafkaClientSupplier, adminClient, topicClient, srClientFactory, connectClient);
}
}
\ No newline at end of file
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java
index d9031d26bdac..7f4e232327e3 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java
@@ -247,7 +247,8 @@ public void shouldNotCallTopicClientForExtendedDescription() {
engine.getServiceContext().getKafkaClientSupplier(),
engine.getServiceContext().getAdminClient(),
spyTopicClient,
- engine.getServiceContext().getSchemaRegistryClientFactory()
+ engine.getServiceContext().getSchemaRegistryClientFactory(),
+ engine.getServiceContext().getConnectClient()
);
// When:
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java
index 2420d861f73f..318abeb5ea80 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java
@@ -59,7 +59,8 @@ public void shouldListKafkaTopics() {
engine.getServiceContext().getKafkaClientSupplier(),
mockAdminClient,
engine.getServiceContext().getTopicClient(),
- engine.getServiceContext().getSchemaRegistryClientFactory()
+ engine.getServiceContext().getSchemaRegistryClientFactory(),
+ engine.getServiceContext().getConnectClient()
);
// When:
diff --git a/pom.xml b/pom.xml
index 15ffc55a1144..e2b3ad259500 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
2.0.0-M22
1.0.0-M33
+ 4.5.9
http://packages.confluent.io/maven/
1.2.1
6.0.2.RELEASE
@@ -107,6 +108,7 @@
1.0.2
0.2.2
2.9.0
+ 2.24.0