25
25
import com.facebook.presto.verifier.event.DeterminismAnalysisDetails;
26
26
import com.facebook.presto.verifier.event.DeterminismAnalysisRun;
27
27
import com.facebook.presto.verifier.prestoaction.PrestoAction;
28
+ import com.facebook.presto.verifier.prestoaction.QueryAction;
28
29
import com.facebook.presto.verifier.rewrite.QueryRewriter;
29
30
import com.google.common.annotations.VisibleForTesting;
30
31
import com.google.common.collect.ImmutableSet;
36
37
import java.util.Set;
37
38
import java.util.concurrent.atomic.AtomicBoolean;
38
39
39
- import static com.facebook.presto.verifier.framework.ClusterType.CONTROL;
40
40
import static com.facebook.presto.verifier.framework.DataVerificationUtil.getColumns;
41
41
import static com.facebook.presto.verifier.framework.DataVerificationUtil.match;
42
42
import static com.facebook.presto.verifier.framework.DataVerificationUtil.teardownSafely;
60
60
public class DeterminismAnalyzer
61
61
{
62
62
private final SourceQuery sourceQuery;
63
- private final PrestoAction prestoAction ;
63
+ private final PrestoAction helperAction ;
64
64
private final QueryRewriter queryRewriter;
65
65
private final ChecksumValidator checksumValidator;
66
66
private final TypeManager typeManager;
@@ -72,14 +72,14 @@ public class DeterminismAnalyzer
72
72
73
73
public DeterminismAnalyzer(
74
74
SourceQuery sourceQuery,
75
- PrestoAction prestoAction ,
75
+ PrestoAction helperAction ,
76
76
QueryRewriter queryRewriter,
77
77
ChecksumValidator checksumValidator,
78
78
TypeManager typeManager,
79
79
DeterminismAnalyzerConfig config)
80
80
{
81
81
this.sourceQuery = requireNonNull(sourceQuery, "sourceQuery is null");
82
- this.prestoAction = requireNonNull(prestoAction , "prestoAction is null");
82
+ this.helperAction = requireNonNull(helperAction , "helperAction is null");
83
83
this.queryRewriter = requireNonNull(queryRewriter, "queryRewriter is null");
84
84
this.checksumValidator = requireNonNull(checksumValidator, "checksumValidator is null");
85
85
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -90,26 +90,27 @@ public DeterminismAnalyzer(
90
90
this.handleLimitQuery = config.isHandleLimitQuery();
91
91
}
92
92
93
- protected DeterminismAnalysisDetails analyze(QueryObjectBundle control , ChecksumResult controlChecksum )
93
+ protected DeterminismAnalysisDetails analyze(QueryAction queryAction, QueryObjectBundle objectBundle , ChecksumResult referenceChecksum )
94
94
{
95
+ requireNonNull(queryAction, "queryAction is null");
95
96
DeterminismAnalysisDetails.Builder determinismAnalysisDetails = DeterminismAnalysisDetails.builder();
96
- DeterminismAnalysis analysis = analyze(control, controlChecksum , determinismAnalysisDetails);
97
+ DeterminismAnalysis analysis = analyze(queryAction, objectBundle, referenceChecksum , determinismAnalysisDetails);
97
98
return determinismAnalysisDetails.build(analysis);
98
99
}
99
100
100
- private DeterminismAnalysis analyze(QueryObjectBundle control , ChecksumResult controlChecksum , DeterminismAnalysisDetails.Builder determinismAnalysisDetails)
101
+ private DeterminismAnalysis analyze(QueryAction actionForQuery, QueryObjectBundle objectBundle , ChecksumResult referenceChecksum , DeterminismAnalysisDetails.Builder determinismAnalysisDetails)
101
102
{
102
103
// Handle mutable catalogs
103
- if (isNonDeterministicCatalogReferenced(control .getQuery())) {
104
+ if (isNonDeterministicCatalogReferenced(objectBundle .getQuery())) {
104
105
return NON_DETERMINISTIC_CATALOG;
105
106
}
106
107
107
108
// Handle limit query
108
109
LimitQueryDeterminismAnalysis limitQueryAnalysis = new LimitQueryDeterminismAnalyzer(
109
- prestoAction ,
110
+ helperAction ,
110
111
handleLimitQuery,
111
- control .getQuery(),
112
- controlChecksum .getRowCount(),
112
+ objectBundle .getQuery(),
113
+ referenceChecksum .getRowCount(),
113
114
determinismAnalysisDetails).analyze();
114
115
115
116
switch (limitQueryAnalysis) {
@@ -127,29 +128,31 @@ private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult co
127
128
}
128
129
129
130
// Rerun control query multiple times
130
- List<Column> columns = getColumns(prestoAction, typeManager, control.getObjectName());
131
+ List<Column> columns = getColumns(helperAction, typeManager, objectBundle.getObjectName());
132
+ ClusterType clusterType = objectBundle.getCluster();
131
133
Map<QueryBundle, DeterminismAnalysisRun.Builder> queryRuns = new HashMap<>();
132
134
try {
133
135
for (int i = 0; i < maxAnalysisRuns; i++) {
134
- QueryObjectBundle queryBundle = queryRewriter.rewriteQuery(sourceQuery.getQuery(CONTROL ), sourceQuery.getQueryConfiguration(CONTROL ), CONTROL , false);
136
+ QueryObjectBundle queryBundle = queryRewriter.rewriteQuery(sourceQuery.getQuery(clusterType ), sourceQuery.getQueryConfiguration(clusterType ), clusterType , false);
135
137
DeterminismAnalysisRun.Builder run = determinismAnalysisDetails.addRun().setTableName(queryBundle.getObjectName().toString());
138
+ run.setClusterType(clusterType.name());
136
139
queryRuns.put(queryBundle, run);
137
140
138
141
// Rerun setup and main query
139
142
queryBundle.getSetupQueries().forEach(query -> runAndConsume(
140
- () -> prestoAction .execute(query, DETERMINISM_ANALYSIS_SETUP),
143
+ () -> helperAction .execute(query, DETERMINISM_ANALYSIS_SETUP),
141
144
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::addSetupQueryId)));
142
145
runAndConsume(
143
- () -> prestoAction .execute(queryBundle.getQuery(), DETERMINISM_ANALYSIS_MAIN),
146
+ () -> actionForQuery .execute(queryBundle.getQuery(), DETERMINISM_ANALYSIS_MAIN),
144
147
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::setQueryId));
145
148
146
149
// Run checksum query
147
150
Query checksumQuery = checksumValidator.generateChecksumQuery(queryBundle.getObjectName(), columns, Optional.empty());
148
151
ChecksumResult testChecksum = getOnlyElement(callAndConsume(
149
- () -> prestoAction .execute(checksumQuery, DETERMINISM_ANALYSIS_CHECKSUM, ChecksumResult::fromResultSet),
152
+ () -> helperAction .execute(checksumQuery, DETERMINISM_ANALYSIS_CHECKSUM, ChecksumResult::fromResultSet),
150
153
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::setChecksumQueryId)).getResults());
151
154
152
- DeterminismAnalysis analysis = matchResultToDeterminism(match(DataMatchResult.DataType.DATA, checksumValidator, columns, columns, controlChecksum , testChecksum));
155
+ DeterminismAnalysis analysis = matchResultToDeterminism(match(DataMatchResult.DataType.DATA, checksumValidator, columns, columns, referenceChecksum , testChecksum));
153
156
if (analysis != DETERMINISTIC) {
154
157
return analysis;
155
158
}
@@ -163,7 +166,7 @@ private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult co
163
166
finally {
164
167
if (runTeardown) {
165
168
queryRuns.forEach((queryBundle, run) -> teardownSafely(
166
- prestoAction ,
169
+ helperAction ,
167
170
Optional.of(queryBundle),
168
171
queryStats -> queryStats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::addTeardownQueryId)));
169
172
}
0 commit comments