Skip to content

Commit

Permalink
Add 'use-test-checksum-in-determinism-analyzer' config property to Ve…
Browse files Browse the repository at this point in the history
…rifier.
  • Loading branch information
spershin committed Feb 6, 2025
1 parent a8a6ffd commit 402108b
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,22 @@
public class DeterminismAnalysisRun
{
private final String tableName;
private final String clusterType;
private final String queryId;
private final List<String> setupQueryIds;
private final List<String> teardownQueryIds;
private final String checksumQueryId;

private DeterminismAnalysisRun(
Optional<String> tableName,
Optional<String> clusterType,
Optional<String> queryId,
List<String> setupQueryIds,
List<String> teardownQueryIds,
Optional<String> checksumQueryId)
{
this.tableName = tableName.orElse(null);
this.clusterType = clusterType.orElse(null);
this.queryId = queryId.orElse(null);
this.setupQueryIds = ImmutableList.copyOf(setupQueryIds);
this.teardownQueryIds = ImmutableList.copyOf(teardownQueryIds);
Expand All @@ -54,6 +57,12 @@ public String getTableName()
return tableName;
}

/**
 * Returns the cluster type recorded for this determinism analysis run.
 *
 * @return the cluster type, or {@code null} when the builder never set one
 *         (the constructor stores {@code clusterType.orElse(null)})
 */
@EventField
public String getClusterType()
{
    return this.clusterType;
}

@EventField
public String getQueryId()
{
Expand Down Expand Up @@ -86,6 +95,7 @@ public static Builder builder()
public static class Builder
{
private Optional<String> tableName = Optional.empty();
private Optional<String> clusterType = Optional.empty();
private Optional<String> queryId = Optional.empty();
private ImmutableList.Builder<String> setupQueryIds = ImmutableList.builder();
private ImmutableList.Builder<String> teardownQueryIds = ImmutableList.builder();
Expand All @@ -100,6 +110,13 @@ public Builder setTableName(String tableName)
return this;
}

/**
 * Records the cluster type for the run being built. May be called at most once per builder.
 *
 * @param clusterType the cluster type name; wrapped with {@code Optional.of}, so it must not be null
 * @return this builder, to allow call chaining
 */
public Builder setClusterType(String clusterType)
{
    // The precondition is evaluated before the value is wrapped, so a repeated call
    // fails on the "already set" check first.
    boolean alreadySet = this.clusterType.isPresent();
    checkState(!alreadySet, "clusterType is already set");
    this.clusterType = Optional.of(clusterType);
    return this;
}

public Builder setQueryId(String queryId)
{
checkState(!this.queryId.isPresent(), "queryId is already set");
Expand Down Expand Up @@ -128,7 +145,7 @@ public Builder setChecksumQueryId(String checksumQueryId)

/**
 * Creates a {@code DeterminismAnalysisRun} from the values collected by this builder:
 * the optional table name, cluster type and query id, the accumulated setup and
 * teardown query id lists, and the optional checksum query id.
 */
// NOTE(review): this view is a rendered diff, so both the previous and the updated
// return line appear below; only the six-argument call (with clusterType) matches the
// constructor signature visible in this class.
public DeterminismAnalysisRun build()
{
return new DeterminismAnalysisRun(tableName, queryId, setupQueryIds.build(), teardownQueryIds.build(), checksumQueryId);
return new DeterminismAnalysisRun(tableName, clusterType, queryId, setupQueryIds.build(), teardownQueryIds.build(), checksumQueryId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public abstract class AbstractVerification<B extends QueryBundle, R extends Matc
protected final String runningMode;
protected final boolean saveSnapshot;
protected final boolean isExplain;
protected final boolean isRunDeterminismAnalysisOnTest;
private final boolean concurrentControlAndTest;
protected final SnapshotQueryConsumer snapshotQueryConsumer;
protected final Map<String, SnapshotQuery> snapshotQueries;
Expand Down Expand Up @@ -131,6 +132,7 @@ public AbstractVerification(
this.runningMode = verifierConfig.getRunningMode();
this.saveSnapshot = verifierConfig.isSaveSnapshot();
this.isExplain = verifierConfig.isExplain();
this.isRunDeterminismAnalysisOnTest = verifierConfig.isRunDeterminismAnalysisOnTest();
}

protected abstract B getQueryRewrite(ClusterType clusterType);
Expand All @@ -143,7 +145,7 @@ protected abstract R verify(
ChecksumQueryContext controlChecksumQueryContext,
ChecksumQueryContext testChecksumQueryContext);

/**
 * Produces determinism analysis details for a verification whose results did not match.
 * The visible caller invokes this when the match result reports that the mismatch is
 * possibly caused by non-determinism, passing the control bundle, the test bundle and
 * the match result.
 */
// NOTE(review): rendered diff - the first declaration is the earlier two-argument form,
// the second is the form that also receives the test bundle.
protected abstract DeterminismAnalysisDetails analyzeDeterminism(B control, R matchResult);
protected abstract DeterminismAnalysisDetails analyzeDeterminism(B controlObject, B testObject, R matchResult);

protected abstract Optional<String> resolveFailure(
Optional<B> control,
Expand All @@ -166,6 +168,16 @@ protected PrestoAction getHelperAction()
return queryActions.getHelperAction();
}

/**
 * Returns the test query action held by {@code queryActions}.
 *
 * @return the result of {@code queryActions.getTestAction()}
 */
protected QueryAction getTestAction()
{
    return this.queryActions.getTestAction();
}

/**
 * Returns the control query action held by {@code queryActions}.
 *
 * @return the result of {@code queryActions.getControlAction()}
 */
protected QueryAction getControlAction()
{
    return this.queryActions.getControlAction();
}

protected boolean isControlEnabled()
{
return !skipControl || saveSnapshot;
Expand Down Expand Up @@ -291,7 +303,7 @@ else if ((isControlEnabled()) && !skipChecksum) {
return new VerificationResult(this, true, Optional.empty());
}
else if (matchResult.get().isMismatchPossiblyCausedByNonDeterminism()) {
determinismAnalysisDetails = Optional.of(analyzeDeterminism(control.get(), matchResult.get()));
determinismAnalysisDetails = Optional.of(analyzeDeterminism(control.get(), test.get(), matchResult.get()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum MatchType
private final DataType dataType;
private final MatchType matchType;
private final Optional<ChecksumResult> controlChecksum;
private final Optional<ChecksumResult> testChecksum;
private final OptionalLong controlRowCount;
private final OptionalLong testRowCount;
private final List<ColumnMatchResult<?>> mismatchedColumns;
Expand All @@ -61,13 +62,15 @@ public DataMatchResult(
DataType dataType,
MatchType matchType,
Optional<ChecksumResult> controlChecksum,
Optional<ChecksumResult> testChecksum,
OptionalLong controlRowCount,
OptionalLong testRowCount,
List<ColumnMatchResult<?>> mismatchedColumns)
{
this.dataType = requireNonNull(dataType, "data type is null");
this.matchType = requireNonNull(matchType, "match type is null");
this.controlChecksum = requireNonNull(controlChecksum, "controlChecksum is null");
this.testChecksum = requireNonNull(testChecksum, "testChecksum is null");
this.controlRowCount = requireNonNull(controlRowCount, "controlRowCount is null");
this.testRowCount = requireNonNull(testRowCount, "testRowCount is null");
this.mismatchedColumns = ImmutableList.copyOf(mismatchedColumns);
Expand Down Expand Up @@ -114,6 +117,12 @@ public ChecksumResult getControlChecksum()
return controlChecksum.get();
}

/**
 * Returns the checksum computed for the test side of the comparison.
 *
 * The checksum is optional on this result; callers must only ask for it when it was
 * supplied at construction, otherwise the {@code checkState} precondition fails with
 * "testChecksum is missing".
 *
 * @return the test checksum
 */
public ChecksumResult getTestChecksum()
{
    boolean hasTestChecksum = testChecksum.isPresent();
    checkState(hasTestChecksum, "testChecksum is missing");
    return testChecksum.get();
}

public List<ColumnMatchResult<?>> getMismatchedColumns()
{
return mismatchedColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.verifier.checksum.ChecksumResult;
import com.facebook.presto.verifier.checksum.ChecksumValidator;
import com.facebook.presto.verifier.event.DeterminismAnalysisDetails;
import com.facebook.presto.verifier.event.DeterminismAnalysisRun;
import com.facebook.presto.verifier.event.QueryInfo;
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
Expand All @@ -34,6 +35,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.verifier.framework.DataMatchResult.DataType.DATA;
import static com.facebook.presto.verifier.framework.DataMatchResult.MatchType.MATCH;
Expand Down Expand Up @@ -132,6 +135,7 @@ public DataMatchResult verify(
DATA,
MATCH,
Optional.empty(),
Optional.empty(),
OptionalLong.empty(),
OptionalLong.empty(),
ImmutableList.of());
Expand All @@ -146,7 +150,14 @@ else if (QUERY_BANK_MODE.equals(runningMode)) {
controlChecksumResult = ChecksumResult.fromJson(snapshotJson);
}
else {
return new DataMatchResult(DATA, SNAPSHOT_DOES_NOT_EXIST, Optional.empty(), OptionalLong.empty(), OptionalLong.empty(), Collections.emptyList());
return new DataMatchResult(
DATA,
SNAPSHOT_DOES_NOT_EXIST,
Optional.empty(),
Optional.empty(),
OptionalLong.empty(),
OptionalLong.empty(),
Collections.emptyList());
}
}

Expand All @@ -159,9 +170,23 @@ else if (QUERY_BANK_MODE.equals(runningMode)) {
}

/**
 * Runs determinism analysis after a data mismatch between control and test.
 *
 * When {@code isRunDeterminismAnalysisOnTest} is set, the test side is analyzed first
 * (test action, test bundle, test checksum). If that result is not flagged as
 * non-deterministic it is returned as is. Otherwise the analysis is repeated on the
 * control side, and the returned details carry the control outcome (determinism
 * analysis, limit query analysis and its query id) together with the test runs followed
 * by the control runs. When the flag is not set, only the control side is analyzed.
 */
// NOTE(review): rendered diff - the two-argument signature and the single-line
// "analyze(control, ...)" return below are the earlier form of this method; the
// three-argument signature and the remaining body are the updated form.
@Override
protected DeterminismAnalysisDetails analyzeDeterminism(QueryObjectBundle control, DataMatchResult matchResult)
protected DeterminismAnalysisDetails analyzeDeterminism(QueryObjectBundle controlObject, QueryObjectBundle testObject, DataMatchResult matchResult)
{
return determinismAnalyzer.analyze(control, matchResult.getControlChecksum());
if (isRunDeterminismAnalysisOnTest) {
// getTestChecksum() requires the match result to carry a test checksum.
DeterminismAnalysisDetails analysis = determinismAnalyzer.analyze(getTestAction(), testObject, matchResult.getTestChecksum());
// Anything other than a non-deterministic verdict on the test side is final.
if (!analysis.getDeterminismAnalysis().isNonDeterministic()) {
return analysis;
}
// In case we rerun determinism analysis on control, we keep the test runs for stats.
List<DeterminismAnalysisRun> runs = analysis.getRuns();
analysis = determinismAnalyzer.analyze(getControlAction(), controlObject, matchResult.getControlChecksum());
// Rebuild the details from the control analysis, with the test runs listed before the control runs.
return new DeterminismAnalysisDetails(
analysis.getDeterminismAnalysis(),
Stream.concat(runs.stream(), analysis.getRuns().stream()).collect(Collectors.toList()),
LimitQueryDeterminismAnalysis.valueOf(analysis.getLimitQueryAnalysis()),
Optional.ofNullable(analysis.getLimitQueryAnalysisQueryId()));
}
// Flag not set: analyze the control side only.
return determinismAnalyzer.analyze(getControlAction(), controlObject, matchResult.getControlChecksum());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public static DataMatchResult match(
dataType,
SCHEMA_MISMATCH,
Optional.empty(),
Optional.empty(),
OptionalLong.empty(),
OptionalLong.empty(),
ImmutableList.of());
Expand All @@ -107,6 +108,7 @@ public static DataMatchResult match(
dataType,
matchType,
Optional.of(controlChecksum),
Optional.of(testChecksum),
controlRowCount,
testRowCount,
mismatchedColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ else if (QUERY_BANK_MODE.equals(runningMode)) {
}

/**
 * Determinism analysis is not supported for DDL verification; this method always throws
 * {@code UnsupportedOperationException}.
 */
// NOTE(review): rendered diff - the two-argument signature is the earlier form, the
// three-argument signature is the updated form.
@Override
protected DeterminismAnalysisDetails analyzeDeterminism(QueryObjectBundle control, DdlMatchResult matchResult)
protected DeterminismAnalysisDetails analyzeDeterminism(QueryObjectBundle controlObject, QueryObjectBundle testObject, DdlMatchResult matchResult)
{
throw new UnsupportedOperationException("analyzeDeterminism is not supported for DdlVerification");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.verifier.event.DeterminismAnalysisDetails;
import com.facebook.presto.verifier.event.DeterminismAnalysisRun;
import com.facebook.presto.verifier.prestoaction.PrestoAction;
import com.facebook.presto.verifier.prestoaction.QueryAction;
import com.facebook.presto.verifier.rewrite.QueryRewriter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
Expand All @@ -36,7 +37,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.verifier.framework.ClusterType.CONTROL;
import static com.facebook.presto.verifier.framework.DataVerificationUtil.getColumns;
import static com.facebook.presto.verifier.framework.DataVerificationUtil.match;
import static com.facebook.presto.verifier.framework.DataVerificationUtil.teardownSafely;
Expand All @@ -60,7 +60,7 @@
public class DeterminismAnalyzer
{
private final SourceQuery sourceQuery;
private final PrestoAction prestoAction;
private final PrestoAction helperAction;
private final QueryRewriter queryRewriter;
private final ChecksumValidator checksumValidator;
private final TypeManager typeManager;
Expand All @@ -72,14 +72,14 @@ public class DeterminismAnalyzer

public DeterminismAnalyzer(
SourceQuery sourceQuery,
PrestoAction prestoAction,
PrestoAction helperAction,
QueryRewriter queryRewriter,
ChecksumValidator checksumValidator,
TypeManager typeManager,
DeterminismAnalyzerConfig config)
{
this.sourceQuery = requireNonNull(sourceQuery, "sourceQuery is null");
this.prestoAction = requireNonNull(prestoAction, "prestoAction is null");
this.helperAction = requireNonNull(helperAction, "helperAction is null");
this.queryRewriter = requireNonNull(queryRewriter, "queryRewriter is null");
this.checksumValidator = requireNonNull(checksumValidator, "checksumValidator is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -90,26 +90,27 @@ public DeterminismAnalyzer(
this.handleLimitQuery = config.isHandleLimitQuery();
}

/**
 * Analyzes the determinism of the query in the given bundle and returns the collected
 * details.
 *
 * A fresh details builder is passed to the private overload, which records per-run
 * information into it and returns the overall verdict; the builder is then built with
 * that verdict. In the private overload the supplied query action executes the rerun
 * main query, and each rerun's checksum is matched against {@code referenceChecksum}.
 */
// NOTE(review): rendered diff - the two-argument signature and the three-argument
// delegate call are the earlier form; the signature taking a QueryAction and the
// four-argument delegate call are the updated form.
protected DeterminismAnalysisDetails analyze(QueryObjectBundle control, ChecksumResult controlChecksum)
protected DeterminismAnalysisDetails analyze(QueryAction queryAction, QueryObjectBundle objectBundle, ChecksumResult referenceChecksum)
{
requireNonNull(queryAction, "queryAction is null");
DeterminismAnalysisDetails.Builder determinismAnalysisDetails = DeterminismAnalysisDetails.builder();
DeterminismAnalysis analysis = analyze(control, controlChecksum, determinismAnalysisDetails);
DeterminismAnalysis analysis = analyze(queryAction, objectBundle, referenceChecksum, determinismAnalysisDetails);
return determinismAnalysisDetails.build(analysis);
}

private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult controlChecksum, DeterminismAnalysisDetails.Builder determinismAnalysisDetails)
private DeterminismAnalysis analyze(QueryAction actionForQuery, QueryObjectBundle objectBundle, ChecksumResult referenceChecksum, DeterminismAnalysisDetails.Builder determinismAnalysisDetails)
{
// Handle mutable catalogs
if (isNonDeterministicCatalogReferenced(control.getQuery())) {
if (isNonDeterministicCatalogReferenced(objectBundle.getQuery())) {
return NON_DETERMINISTIC_CATALOG;
}

// Handle limit query
LimitQueryDeterminismAnalysis limitQueryAnalysis = new LimitQueryDeterminismAnalyzer(
prestoAction,
helperAction,
handleLimitQuery,
control.getQuery(),
controlChecksum.getRowCount(),
objectBundle.getQuery(),
referenceChecksum.getRowCount(),
determinismAnalysisDetails).analyze();

switch (limitQueryAnalysis) {
Expand All @@ -127,29 +128,31 @@ private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult co
}

// Rerun the query multiple times for the cluster the bundle belongs to (control or test)
List<Column> columns = getColumns(prestoAction, typeManager, control.getObjectName());
List<Column> columns = getColumns(helperAction, typeManager, objectBundle.getObjectName());
ClusterType clusterType = objectBundle.getCluster();
Map<QueryBundle, DeterminismAnalysisRun.Builder> queryRuns = new HashMap<>();
try {
for (int i = 0; i < maxAnalysisRuns; i++) {
QueryObjectBundle queryBundle = queryRewriter.rewriteQuery(sourceQuery.getQuery(CONTROL), sourceQuery.getQueryConfiguration(CONTROL), CONTROL, false);
QueryObjectBundle queryBundle = queryRewriter.rewriteQuery(sourceQuery.getQuery(clusterType), sourceQuery.getQueryConfiguration(clusterType), clusterType, false);
DeterminismAnalysisRun.Builder run = determinismAnalysisDetails.addRun().setTableName(queryBundle.getObjectName().toString());
run.setClusterType(clusterType.name());
queryRuns.put(queryBundle, run);

// Rerun setup and main query
queryBundle.getSetupQueries().forEach(query -> runAndConsume(
() -> prestoAction.execute(query, DETERMINISM_ANALYSIS_SETUP),
() -> helperAction.execute(query, DETERMINISM_ANALYSIS_SETUP),
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::addSetupQueryId)));
runAndConsume(
() -> prestoAction.execute(queryBundle.getQuery(), DETERMINISM_ANALYSIS_MAIN),
() -> actionForQuery.execute(queryBundle.getQuery(), DETERMINISM_ANALYSIS_MAIN),
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::setQueryId));

// Run checksum query
Query checksumQuery = checksumValidator.generateChecksumQuery(queryBundle.getObjectName(), columns, Optional.empty());
ChecksumResult testChecksum = getOnlyElement(callAndConsume(
() -> prestoAction.execute(checksumQuery, DETERMINISM_ANALYSIS_CHECKSUM, ChecksumResult::fromResultSet),
() -> helperAction.execute(checksumQuery, DETERMINISM_ANALYSIS_CHECKSUM, ChecksumResult::fromResultSet),
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::setChecksumQueryId)).getResults());

DeterminismAnalysis analysis = matchResultToDeterminism(match(DataMatchResult.DataType.DATA, checksumValidator, columns, columns, controlChecksum, testChecksum));
DeterminismAnalysis analysis = matchResultToDeterminism(match(DataMatchResult.DataType.DATA, checksumValidator, columns, columns, referenceChecksum, testChecksum));
if (analysis != DETERMINISTIC) {
return analysis;
}
Expand All @@ -163,7 +166,7 @@ private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult co
finally {
if (runTeardown) {
queryRuns.forEach((queryBundle, run) -> teardownSafely(
prestoAction,
helperAction,
Optional.of(queryBundle),
queryStats -> queryStats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::addTeardownQueryId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected ExplainMatchResult verify(
}

/**
 * Determinism analysis is not supported for explain verification; this method always
 * throws {@code UnsupportedOperationException}.
 */
// NOTE(review): rendered diff - the two-argument signature is the earlier form, the
// three-argument signature is the updated form.
@Override
protected DeterminismAnalysisDetails analyzeDeterminism(QueryBundle control, ExplainMatchResult matchResult)
protected DeterminismAnalysisDetails analyzeDeterminism(QueryBundle controlObject, QueryBundle testObject, ExplainMatchResult matchResult)
{
throw new UnsupportedOperationException("analyzeDeterminism is not supported for ExplainVerification");
}
Expand Down
Loading

0 comments on commit 402108b

Please sign in to comment.