Skip to content

Commit

Permalink
fix: make endpoints available while waiting for precondition (#5069)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Apr 15, 2020
1 parent d988722 commit 1136162
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,14 @@ public void startAsync() {

final KsqlConfig ksqlConfigWithPort = buildConfigWithPort();
configurables.forEach(c -> c.configure(ksqlConfigWithPort));

startKsql(ksqlConfigWithPort);
final Properties metricsProperties = new Properties();
metricsProperties.putAll(getConfiguration().getOriginals());
if (versionCheckerAgent != null) {
versionCheckerAgent.start(KsqlModuleType.SERVER, metricsProperties);
}

apiServer.setJettyPort(getJettyPort());

log.info("KSQL RESTful API listening on {}", StringUtils.join(getListeners(), ", "));
displayWelcomeMessage();
} catch (AbortApplicationStartException e) {
Expand Down Expand Up @@ -410,6 +409,7 @@ void startApiServer(final KsqlConfig ksqlConfigWithPort) {
apiServerConfig = new ApiServerConfig(ksqlConfigWithPort.originals());
apiServer = new Server(vertx, apiServerConfig, endpoints, true, securityExtension,
authenticationPlugin, serverState);
apiServer.setJettyPort(getJettyPort());
apiServer.start();
log.info("KSQL New API Server started");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.integration;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.KsqlServerPrecondition;
import io.confluent.ksql.rest.server.TestKsqlRestAppWaitingOnPrecondition;
import io.confluent.ksql.services.ServiceContext;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.zookeeper.ZooKeeperClientException;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

@Category({IntegrationTest.class})
public class PreconditionFunctionalTest {

private static final String SERVER_PRECONDITIONS_CONFIG = "ksql.server.preconditions";

private static final int CUSTOM_ERROR_CODE = 50370;
private static final CountDownLatch latch = new CountDownLatch(1);

private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

private static final TestKsqlRestAppWaitingOnPrecondition REST_APP = TestKsqlRestAppWaitingOnPrecondition
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.withProperty(
SERVER_PRECONDITIONS_CONFIG,
"io.confluent.ksql.rest.integration.PreconditionFunctionalTest$TestFailedPrecondition")
.buildWaitingOnPrecondition(latch);

@ClassRule
public static final RuleChain CHAIN = RuleChain
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
.around(TEST_HARNESS)
.around(REST_APP);

@BeforeClass
public static void setUpClass() {
REST_APP.startAndWaitForPrecondition();
}

@Test
public void shouldServeRequestsWhileWaitingForPrecondition() {
// When:
final KsqlErrorMessage error =
RestIntegrationTestUtil.makeKsqlRequestWithError(REST_APP, "SHOW STREAMS;");

// Then:
assertThat(error.getErrorCode(), is(CUSTOM_ERROR_CODE));
}

public static class TestFailedPrecondition implements KsqlServerPrecondition {

private boolean first = true;

@Override
public Optional<KsqlErrorMessage> checkPrecondition(
final KsqlRestConfig restConfig,
final ServiceContext serviceContext) {
return fail();
}

private Optional<KsqlErrorMessage> fail() {
if (first) {
latch.countDown();
first = false;
}
return Optional.of(new KsqlErrorMessage(CUSTOM_ERROR_CODE, "purposefully failed precondition"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;
Expand Down Expand Up @@ -98,26 +99,26 @@
*/
public class TestKsqlRestApp extends ExternalResource {

private static final AtomicInteger COUNTER = new AtomicInteger();
protected static final AtomicInteger COUNTER = new AtomicInteger();

private final String metricsPrefix = "app-" + COUNTER.getAndIncrement() + "-";
private final Map<String, ?> baseConfig;
private final Supplier<String> bootstrapServers;
private final Supplier<ServiceContext> serviceContext;
private final BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> serviceContextBinderFactory;
private final KsqlSecurityContextProvider securityContextProvider;
private final List<URL> listeners = new ArrayList<>();
private final Optional<BasicCredentials> credentials;
private ExecutableServer<KsqlRestConfig> restServer;
private KsqlExecutionContext ksqlEngine;
private KsqlRestApplication ksqlRestApplication;
protected final String metricsPrefix = "app-" + COUNTER.getAndIncrement() + "-";
protected final Map<String, ?> baseConfig;
protected final Supplier<String> bootstrapServers;
protected final Supplier<ServiceContext> serviceContext;
protected final BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> serviceContextBinderFactory;
protected final KsqlSecurityContextProvider securityContextProvider;
protected final List<URL> listeners = new ArrayList<>();
protected final Optional<BasicCredentials> credentials;
protected ExecutableServer<KsqlRestConfig> restServer;
protected KsqlExecutionContext ksqlEngine;
protected KsqlRestApplication ksqlRestApplication;

static {
// Increase the default - it's low (100)
System.setProperty("sun.net.maxDatagramSockets", "1024");
}

private TestKsqlRestApp(
protected TestKsqlRestApp(
final Supplier<String> bootstrapServers,
final Map<String, Object> additionalProps,
final Supplier<ServiceContext> serviceContext,
Expand Down Expand Up @@ -252,29 +253,7 @@ public ServiceContext getServiceContext() {

@Override
protected void before() {
if (restServer != null) {
after();
}

final KsqlRestConfig config = buildConfig(bootstrapServers, baseConfig);

try {
ksqlRestApplication = KsqlRestApplication.buildApplication(
metricsPrefix,
convertToApiServerConfig(config),
(booleanSupplier) -> niceMock(VersionCheckerAgent.class),
3,
serviceContext.get(),
serviceContextBinderFactory,
ksqlSecurityExtension -> securityContextProvider);

restServer = new ExecutableServer<>(
new ApplicationServer<>(convertToLocalListener(config)),
ImmutableList.of(ksqlRestApplication)
);
} catch (final Exception e) {
throw new RuntimeException("Failed to initialise", e);
}
initialize();

try {
restServer.startAsync();
Expand All @@ -301,6 +280,32 @@ protected void after() {
restServer = null;
}

protected void initialize() {
if (restServer != null) {
after();
}

final KsqlRestConfig config = buildConfig(bootstrapServers, baseConfig);

try {
ksqlRestApplication = KsqlRestApplication.buildApplication(
metricsPrefix,
convertToApiServerConfig(config),
(booleanSupplier) -> niceMock(VersionCheckerAgent.class),
3,
serviceContext.get(),
serviceContextBinderFactory,
ksqlSecurityExtension -> securityContextProvider);

restServer = new ExecutableServer<>(
new ApplicationServer<>(convertToLocalListener(config)),
ImmutableList.of(ksqlRestApplication)
);
} catch (final Exception e) {
throw new RuntimeException("Failed to initialise", e);
}
}

public static Builder builder(final Supplier<String> bootstrapServers) {
return new Builder(bootstrapServers);
}
Expand Down Expand Up @@ -538,5 +543,17 @@ public TestKsqlRestApp build() {
credentials
);
}

public TestKsqlRestAppWaitingOnPrecondition buildWaitingOnPrecondition(final CountDownLatch latch) {
return new TestKsqlRestAppWaitingOnPrecondition(
bootstrapServers,
additionalProps,
serviceContext,
securityContextBinder,
securityContextProvider,
credentials,
latch
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import io.confluent.ksql.api.endpoints.KsqlSecurityContextProvider;
import io.confluent.ksql.rest.client.BasicCredentials;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.glassfish.hk2.utilities.Binder;

/**
* A {@link TestKsqlRestApp} for testing behavior of a server stuck waiting on a precondition.
*
* The server is not started automatically. Rather, {@code startAndWaitForPrecondition()} should
* be called by the test suite. A {@code CountDownLatch} provided in the constructor counts down
* when a precondition check is called, in order to finish configuring the app once the server
* is waiting for preconditions.
*/
public class TestKsqlRestAppWaitingOnPrecondition extends TestKsqlRestApp {

private CountDownLatch latch;

TestKsqlRestAppWaitingOnPrecondition(
final Supplier<String> bootstrapServers,
final Map<String, Object> additionalProps,
final Supplier<ServiceContext> serviceContext,
final BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> serviceContextBinderFactory,
final KsqlSecurityContextProvider securityContextProvider,
final Optional<BasicCredentials> credentials,
final CountDownLatch latch
) {
super(bootstrapServers, additionalProps, serviceContext, serviceContextBinderFactory, securityContextProvider, credentials);
this.latch = latch;
}

@Override
protected void before() {
initialize();
}

public void startAndWaitForPrecondition() {
try {
new Thread(() -> {
try {
restServer.startAsync();
} catch (Exception e) {
throw new RuntimeException("Error starting server", e);
}
}).start();
latch.await();
} catch (final Exception var2) {
throw new RuntimeException("Failed to start Ksql rest server", var2);
}

listeners.addAll(ksqlRestApplication.getListeners());
ksqlEngine = ksqlRestApplication.getEngine();
}
}

0 comments on commit 1136162

Please sign in to comment.