Skip to content

Commit

Permalink
Add concurrent control and test to verifier
Browse files Browse the repository at this point in the history
  • Loading branch information
aweisberg authored and highker committed Apr 9, 2022
1 parent e4131f5 commit 7193625
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.List;
import java.util.Optional;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static com.facebook.presto.verifier.event.VerifierQueryEvent.EventStatus.FAILED;
import static com.facebook.presto.verifier.event.VerifierQueryEvent.EventStatus.FAILED_RESOLVED;
Expand Down Expand Up @@ -59,6 +62,7 @@
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand All @@ -72,6 +76,7 @@ public abstract class AbstractVerification<B extends QueryBundle, R extends Matc
private final SqlExceptionClassifier exceptionClassifier;
private final VerificationContext verificationContext;
private final Optional<ResultSetConverter<V>> mainQueryResultSetConverter;
private final ListeningExecutorService executor;

private final String testId;
private final boolean smartTeardown;
Expand All @@ -81,20 +86,23 @@ public abstract class AbstractVerification<B extends QueryBundle, R extends Matc
private final boolean teardownOnMainClusters;
private final boolean skipControl;
private final boolean skipChecksum;
private final boolean concurrentControlAndTest;

public AbstractVerification(
QueryActions queryActions,
SourceQuery sourceQuery,
SqlExceptionClassifier exceptionClassifier,
VerificationContext verificationContext,
Optional<ResultSetConverter<V>> mainQueryResultSetConverter,
VerifierConfig verifierConfig)
VerifierConfig verifierConfig,
ListeningExecutorService executor)
{
this.queryActions = requireNonNull(queryActions, "queryActions is null");
this.sourceQuery = requireNonNull(sourceQuery, "sourceQuery is null");
this.exceptionClassifier = requireNonNull(exceptionClassifier, "exceptionClassifier is null");
this.verificationContext = requireNonNull(verificationContext, "verificationContext is null");
this.mainQueryResultSetConverter = requireNonNull(mainQueryResultSetConverter, "mainQueryResultSetConverter is null");
this.executor = requireNonNull(executor, "executor is null");

this.testId = requireNonNull(verifierConfig.getTestId(), "testId is null");
this.smartTeardown = verifierConfig.isSmartTeardown();
Expand All @@ -103,6 +111,7 @@ public AbstractVerification(
this.teardownOnMainClusters = verifierConfig.isTeardownOnMainClusters();
this.skipControl = verifierConfig.isSkipControl();
this.skipChecksum = verifierConfig.isSkipChecksum();
this.concurrentControlAndTest = verifierConfig.isConcurrentControlAndTest();
}

protected abstract B getQueryRewrite(ClusterType clusterType);
Expand Down Expand Up @@ -174,29 +183,41 @@ public VerificationResult run()
}
test = Optional.of(getQueryRewrite(TEST));

// Run control queries
// First run setup queries
if (!skipControl) {
QueryBundle controlQueryBundle = control.get();
QueryAction controlSetupAction = setupOnMainClusters ? queryActions.getControlAction() : queryActions.getHelperAction();
controlQueryBundle.getSetupQueries().forEach(query -> runAndConsume(
() -> controlSetupAction.execute(query, CONTROL_SETUP),
controlQueryContext::addSetupQuery,
controlQueryContext::setException));
controlQueryResult = runMainQuery(controlQueryBundle.getQuery(), CONTROL, controlQueryContext);
controlQueryContext.setState(QueryState.SUCCEEDED);
}
else {
controlQueryContext.setState(NOT_RUN);
}

// Run test queries
QueryBundle testQueryBundle = test.get();
QueryAction testSetupAction = setupOnMainClusters ? queryActions.getTestAction() : queryActions.getHelperAction();
testQueryBundle.getSetupQueries().forEach(query -> runAndConsume(
() -> testSetupAction.execute(query, TEST_SETUP),
testQueryContext::addSetupQuery,
testQueryContext::setException));
testQueryResult = runMainQuery(testQueryBundle.getQuery(), TEST, testQueryContext);

ListenableFuture<Optional<QueryResult<V>>> controlQueryFuture = immediateFuture(Optional.empty());
// Start control query
if (!skipControl) {
QueryBundle controlQueryBundle = control.get();
controlQueryFuture = executor.submit(() -> runMainQuery(controlQueryBundle.getQuery(), CONTROL, controlQueryContext));
}
else {
controlQueryContext.setState(NOT_RUN);
}

if (!concurrentControlAndTest) {
getFutureValue(controlQueryFuture);
}

// Run test queries
ListenableFuture<Optional<QueryResult<V>>> testQueryFuture = executor.submit(() -> runMainQuery(testQueryBundle.getQuery(), TEST, testQueryContext));
controlQueryResult = getFutureValue(controlQueryFuture);
controlQueryContext.setState(QueryState.SUCCEEDED);
testQueryResult = getFutureValue(testQueryFuture);
testQueryContext.setState(QueryState.SUCCEEDED);

// Verify results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.facebook.presto.verifier.rewrite.QueryRewriter;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.sql.SQLException;
import java.util.Objects;
Expand Down Expand Up @@ -52,9 +53,10 @@ public CreateTableVerification(
QueryRewriter queryRewriter,
SqlExceptionClassifier exceptionClassifier,
VerificationContext verificationContext,
VerifierConfig verifierConfig)
VerifierConfig verifierConfig,
ListeningExecutorService executor)
{
super(sqlParser, queryActions, sourceQuery, exceptionClassifier, verificationContext, verifierConfig, SHOW_CREATE_TABLE_CONVERTER);
super(sqlParser, queryActions, sourceQuery, exceptionClassifier, verificationContext, verifierConfig, SHOW_CREATE_TABLE_CONVERTER, executor);
this.queryRewriter = requireNonNull(queryRewriter, "queryRewriter is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.facebook.presto.verifier.rewrite.QueryRewriter;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.sql.SQLException;
import java.util.Objects;
Expand Down Expand Up @@ -52,9 +53,10 @@ public CreateViewVerification(
QueryRewriter queryRewriter,
SqlExceptionClassifier exceptionClassifier,
VerificationContext verificationContext,
VerifierConfig verifierConfig)
VerifierConfig verifierConfig,
ListeningExecutorService executor)
{
super(sqlParser, queryActions, sourceQuery, exceptionClassifier, verificationContext, verifierConfig, SHOW_CREATE_VIEW_CONVERTER);
super(sqlParser, queryActions, sourceQuery, exceptionClassifier, verificationContext, verifierConfig, SHOW_CREATE_VIEW_CONVERTER, executor);
this.queryRewriter = requireNonNull(queryRewriter, "queryRewriter is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.facebook.presto.verifier.resolver.FailureResolverManager;
import com.facebook.presto.verifier.rewrite.QueryRewriter;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -57,9 +58,10 @@ public DataVerification(
VerificationContext verificationContext,
VerifierConfig verifierConfig,
TypeManager typeManager,
ChecksumValidator checksumValidator)
ChecksumValidator checksumValidator,
ListeningExecutorService executor)
{
super(queryActions, sourceQuery, exceptionClassifier, verificationContext, Optional.empty(), verifierConfig);
super(queryActions, sourceQuery, exceptionClassifier, verificationContext, Optional.empty(), verifierConfig, executor);
this.queryRewriter = requireNonNull(queryRewriter, "queryRewriter is null");
this.determinismAnalyzer = requireNonNull(determinismAnalyzer, "determinismAnalyzer is null");
this.failureResolverManager = requireNonNull(failureResolverManager, "failureResolverManager is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.verifier.prestoaction.PrestoAction.ResultSetConverter;
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.Optional;

Expand Down Expand Up @@ -48,9 +49,10 @@ public DdlVerification(
SqlExceptionClassifier exceptionClassifier,
VerificationContext verificationContext,
VerifierConfig verifierConfig,
ResultSetConverter<String> checksumConverter)
ResultSetConverter<String> checksumConverter,
ListeningExecutorService executor)
{
super(queryActions, sourceQuery, exceptionClassifier, verificationContext, Optional.empty(), verifierConfig);
super(queryActions, sourceQuery, exceptionClassifier, verificationContext, Optional.empty(), verifierConfig, executor);
this.sqlParser = requireNonNull(sqlParser, "sqlParser");
this.checksumConverter = requireNonNull(checksumConverter, "checksumConverter is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -53,9 +54,10 @@ public ExplainVerification(
SqlExceptionClassifier exceptionClassifier,
VerificationContext verificationContext,
VerifierConfig verifierConfig,
SqlParser sqlParser)
SqlParser sqlParser,
ListeningExecutorService executor)
{
super(queryActions, sourceQuery, exceptionClassifier, verificationContext, Optional.of(QUERY_PLAN_RESULT_SET_CONVERTER), verifierConfig);
super(queryActions, sourceQuery, exceptionClassifier, verificationContext, Optional.of(QUERY_PLAN_RESULT_SET_CONVERTER), verifierConfig, executor);
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import com.facebook.presto.verifier.resolver.FailureResolverManagerFactory;
import com.facebook.presto.verifier.rewrite.QueryRewriter;
import com.facebook.presto.verifier.rewrite.QueryRewriterFactory;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import javax.inject.Inject;

import java.util.Optional;

import static com.facebook.presto.verifier.framework.ClusterType.CONTROL;
import static com.facebook.presto.verifier.framework.VerifierUtil.PARSING_OPTIONS;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class VerificationFactory
{
Expand All @@ -45,6 +49,7 @@ public class VerificationFactory
private final VerifierConfig verifierConfig;
private final TypeManager typeManager;
private final DeterminismAnalyzerConfig determinismAnalyzerConfig;
private final ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Control/Test query thread - %d").build()));

@Inject
public VerificationFactory(
Expand Down Expand Up @@ -83,7 +88,8 @@ public Verification get(SourceQuery sourceQuery, Optional<VerificationContext> e
exceptionClassifier,
verificationContext,
verifierConfig,
sqlParser);
sqlParser,
executor);
}

QueryRewriter queryRewriter = queryRewriterFactory.create(queryActions.getHelperAction());
Expand All @@ -109,7 +115,8 @@ public Verification get(SourceQuery sourceQuery, Optional<VerificationContext> e
verificationContext,
verifierConfig,
typeManager,
checksumValidator);
checksumValidator,
executor);
case CREATE_VIEW:
return new CreateViewVerification(
sqlParser,
Expand All @@ -118,7 +125,8 @@ public Verification get(SourceQuery sourceQuery, Optional<VerificationContext> e
queryRewriter,
exceptionClassifier,
verificationContext,
verifierConfig);
verifierConfig,
executor);
case CREATE_TABLE:
return new CreateTableVerification(
sqlParser,
Expand All @@ -127,7 +135,8 @@ public Verification get(SourceQuery sourceQuery, Optional<VerificationContext> e
queryRewriter,
exceptionClassifier,
verificationContext,
verifierConfig);
verifierConfig,
executor);
default:
throw new IllegalStateException(format("Unsupported query type: %s", queryType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class VerifierConfig
private boolean teardownOnMainClusters = true;
private boolean skipControl;
private boolean skipChecksum;
private boolean concurrentControlAndTest;

private boolean explain;

Expand Down Expand Up @@ -331,4 +332,17 @@ public VerifierConfig setExplain(boolean explain)
this.explain = explain;
return this;
}

public boolean isConcurrentControlAndTest()
{
return concurrentControlAndTest;
}

@ConfigDescription("Run control and test query concurrently")
@Config("concurrent-control-and-test")
public VerifierConfig setConcurrentControlAndTest(boolean concurrentControlAndTest)
{
this.concurrentControlAndTest = concurrentControlAndTest;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,27 @@ protected SourceQuery getSourceQuery(String controlQuery, String testQuery)

protected Optional<VerifierQueryEvent> runExplain(String controlQuery, String testQuery)
{
return verify(getSourceQuery(controlQuery, testQuery), true, Optional.empty());
return verify(getSourceQuery(controlQuery, testQuery), true, Optional.empty(), Optional.empty());
}

protected Optional<VerifierQueryEvent> runVerification(String controlQuery, String testQuery)
{
return verify(getSourceQuery(controlQuery, testQuery), false, Optional.empty());
return verify(getSourceQuery(controlQuery, testQuery), false, Optional.empty(), Optional.empty());
}

protected Optional<VerifierQueryEvent> runVerification(String controlQuery, String testQuery, VerificationSettings settings)
{
return verify(getSourceQuery(controlQuery, testQuery), false, Optional.empty(), Optional.of(settings));
}

protected Optional<VerifierQueryEvent> verify(SourceQuery sourceQuery, boolean explain)
{
return verify(sourceQuery, explain, Optional.empty());
return verify(sourceQuery, explain, Optional.empty(), Optional.empty());
}

protected Optional<VerifierQueryEvent> verify(SourceQuery sourceQuery, boolean explain, PrestoAction mockPrestoAction)
{
return verify(sourceQuery, explain, Optional.of(mockPrestoAction));
return verify(sourceQuery, explain, Optional.of(mockPrestoAction), Optional.empty());
}

protected PrestoAction getPrestoAction(Optional<QueryConfiguration> queryConfiguration)
Expand All @@ -145,9 +150,16 @@ protected PrestoAction getPrestoAction(Optional<QueryConfiguration> queryConfigu
verifierConfig);
}

private Optional<VerifierQueryEvent> verify(SourceQuery sourceQuery, boolean explain, Optional<PrestoAction> mockPrestoAction)
private Optional<VerifierQueryEvent> verify(
SourceQuery sourceQuery,
boolean explain,
Optional<PrestoAction> mockPrestoAction,
Optional<VerificationSettings> verificationSettings)
{
VerifierConfig verifierConfig = new VerifierConfig().setTestId(TEST_ID).setExplain(explain);
verificationSettings.ifPresent(settings -> {
settings.concurrentControlAndTest.ifPresent(verifierConfig::setConcurrentControlAndTest);
});
TypeManager typeManager = createTypeManager();
PrestoAction prestoAction = mockPrestoAction.orElseGet(() -> getPrestoAction(Optional.of(sourceQuery.getControlConfiguration())));
QueryRewriterFactory queryRewriterFactory = new VerificationQueryRewriterFactory(
Expand All @@ -168,4 +180,9 @@ private Optional<VerifierQueryEvent> verify(SourceQuery sourceQuery, boolean exp
determinismAnalyzerConfig);
return verificationFactory.get(sourceQuery, Optional.empty()).run().getEvent();
}

public static class VerificationSettings
{
Optional<Boolean> concurrentControlAndTest;
}
}
Loading

0 comments on commit 7193625

Please sign in to comment.