Skip to content

Commit

Permalink
Create warning for approx_distinct and approx_set with low maxStandardError
Browse files Browse the repository at this point in the history
  • Loading branch information
stevechuck committed Mar 15, 2022
1 parent 40a79cd commit 2cc3635
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public final class SystemSessionProperties
public static final String VERBOSE_RUNTIME_STATS_ENABLED = "verbose_runtime_stats_enabled";
public static final String STREAMING_FOR_PARTIAL_AGGREGATION_ENABLED = "streaming_for_partial_aggregation_enabled";
public static final String MAX_STAGE_COUNT_FOR_EAGER_SCHEDULING = "max_stage_count_for_eager_scheduling";
public static final String HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD = "hyperloglog_standard_error_warning_threshold";

//TODO: Prestissimo related session properties that are temporarily put here. They will be relocated in the future
public static final String PRESTISSIMO_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -1235,6 +1236,11 @@ public SystemSessionProperties(
MAX_STAGE_COUNT_FOR_EAGER_SCHEDULING,
"Maximum stage count to use eager scheduling when using the adaptive scheduling policy",
featuresConfig.getMaxStageCountForEagerScheduling(),
false),
doubleProperty(
HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD,
"Threshold for obtaining precise results from aggregation functions",
featuresConfig.getHyperloglogStandardErrorWarningThreshold(),
false));
}

Expand Down Expand Up @@ -2075,4 +2081,9 @@ public static int getMaxStageCountForEagerScheduling(Session session)
{
return session.getSystemProperty(MAX_STAGE_COUNT_FOR_EAGER_SCHEDULING, Integer.class);
}

/**
 * Looks up the {@code hyperloglog_standard_error_warning_threshold} system property
 * on the supplied session.
 *
 * @param session session whose system properties are consulted
 * @return the property value, unboxed to a primitive {@code double}
 */
public static double getHyperloglogStandardErrorWarningThreshold(Session session)
{
    Double threshold = session.getSystemProperty(HYPERLOGLOG_STANDARD_ERROR_WARNING_THRESHOLD, Double.class);
    return threshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import com.facebook.presto.spi.WarningCode;
import com.facebook.presto.spi.WarningCollector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.spi.StandardErrorCode.WARNING_AS_ERROR;
import static com.facebook.presto.spi.StandardWarningCode.PARSER_WARNING;
Expand All @@ -35,7 +35,7 @@ public class DefaultWarningCollector
implements WarningCollector
{
@GuardedBy("this")
private final Map<WarningCode, PrestoWarning> warnings = new LinkedHashMap<>();
private final Multimap<WarningCode, PrestoWarning> warnings = LinkedHashMultimap.create();
private final WarningCollectorConfig config;
private final WarningHandlingLevel warningHandlingLevel;

Expand Down Expand Up @@ -77,7 +77,7 @@ public synchronized List<PrestoWarning> getWarnings()
private synchronized void addWarningIfNumWarningsLessThanConfig(PrestoWarning warning)
{
if (warnings.size() < config.getMaxWarnings()) {
warnings.putIfAbsent(warning.getWarningCode(), warning);
warnings.put(warning.getWarningCode(), warning);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
@AggregationFunction("approx_distinct")
public final class DefaultApproximateCountDistinctAggregation
{
private static final double DEFAULT_STANDARD_ERROR = 0.023;
public static final double DEFAULT_STANDARD_ERROR = 0.023;

private DefaultApproximateCountDistinctAggregation() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
*/
package com.facebook.presto.sql.analyzer;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.operator.aggregation.ApproximateSetAggregation;
import com.facebook.presto.operator.aggregation.DefaultApproximateCountDistinctAggregation;
import com.facebook.presto.spi.PrestoWarning;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.sql.planner.ParameterRewriter;
Expand All @@ -30,6 +34,7 @@
import com.facebook.presto.sql.tree.ComparisonExpression;
import com.facebook.presto.sql.tree.CurrentTime;
import com.facebook.presto.sql.tree.DereferenceExpression;
import com.facebook.presto.sql.tree.DoubleLiteral;
import com.facebook.presto.sql.tree.ExistsPredicate;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.ExpressionTreeRewriter;
Expand Down Expand Up @@ -114,6 +119,7 @@ class AggregationAnalyzer
private final Scope sourceScope;
private final Optional<Scope> orderByScope;
private final WarningCollector warningCollector;
private final Session session;
private final FunctionResolution functionResolution;

public static void verifySourceAggregations(
Expand All @@ -122,9 +128,10 @@ public static void verifySourceAggregations(
Expression expression,
Metadata metadata,
Analysis analysis,
WarningCollector warningCollector)
WarningCollector warningCollector,
Session session)
{
AggregationAnalyzer analyzer = new AggregationAnalyzer(groupByExpressions, sourceScope, Optional.empty(), metadata, analysis, warningCollector);
AggregationAnalyzer analyzer = new AggregationAnalyzer(groupByExpressions, sourceScope, Optional.empty(), metadata, analysis, warningCollector, session);
analyzer.analyze(expression);
}

Expand All @@ -135,26 +142,29 @@ public static void verifyOrderByAggregations(
Expression expression,
Metadata metadata,
Analysis analysis,
WarningCollector warningCollector)
WarningCollector warningCollector,
Session session)
{
AggregationAnalyzer analyzer = new AggregationAnalyzer(groupByExpressions, sourceScope, Optional.of(orderByScope), metadata, analysis, warningCollector);
AggregationAnalyzer analyzer = new AggregationAnalyzer(groupByExpressions, sourceScope, Optional.of(orderByScope), metadata, analysis, warningCollector, session);
analyzer.analyze(expression);
}

private AggregationAnalyzer(List<Expression> groupByExpressions, Scope sourceScope, Optional<Scope> orderByScope, Metadata metadata, Analysis analysis, WarningCollector warningCollector)
private AggregationAnalyzer(List<Expression> groupByExpressions, Scope sourceScope, Optional<Scope> orderByScope, Metadata metadata, Analysis analysis, WarningCollector warningCollector, Session session)
{
requireNonNull(groupByExpressions, "groupByExpressions is null");
requireNonNull(sourceScope, "sourceScope is null");
requireNonNull(orderByScope, "orderByScope is null");
requireNonNull(metadata, "metadata is null");
requireNonNull(analysis, "analysis is null");
requireNonNull(warningCollector, "warningCollector is null");
requireNonNull(session, "session is null");

this.sourceScope = sourceScope;
this.warningCollector = warningCollector;
this.orderByScope = orderByScope;
this.metadata = metadata;
this.analysis = analysis;
this.warningCollector = warningCollector;
this.session = session;
this.functionResolution = new FunctionResolution(metadata.getFunctionAndTypeManager());
this.expressions = groupByExpressions.stream()
.map(e -> ExpressionTreeRewriter.rewriteWith(new ParameterRewriter(analysis.getParameters()), e))
Expand Down Expand Up @@ -335,6 +345,34 @@ protected Boolean visitFunctionCall(FunctionCall node, Void context)
PERFORMANCE_WARNING,
"COUNT(DISTINCT xxx) can be a very expensive operation when the cardinality is high for xxx. In most scenarios, using approx_distinct instead would be enough"));
}
if (functionResolution.isApproximateCountDistinctFunction(analysis.getFunctionHandle(node))) {
double maxStandardError = DefaultApproximateCountDistinctAggregation.DEFAULT_STANDARD_ERROR;
double lowestMaxStandardError = SystemSessionProperties.getHyperloglogStandardErrorWarningThreshold(session);
// maxStandardError is supplied
if (node.getArguments().size() > 1) {
Expression maxStandardErrorExpr = node.getArguments().get(1);
if (maxStandardErrorExpr instanceof DoubleLiteral) {
maxStandardError = ((DoubleLiteral) maxStandardErrorExpr).getValue();
}
}
if (maxStandardError <= lowestMaxStandardError) {
warningCollector.add(new PrestoWarning(PERFORMANCE_WARNING, String.format("approx_distinct can produce low-precision results with the current standard error: %.4f (<=%.4f)", maxStandardError, lowestMaxStandardError)));
}
}
if (functionResolution.isApproximateSetFunction(analysis.getFunctionHandle(node))) {
double maxStandardError = ApproximateSetAggregation.DEFAULT_STANDARD_ERROR;
double lowestMaxStandardError = SystemSessionProperties.getHyperloglogStandardErrorWarningThreshold(session);
// maxStandardError is supplied
if (node.getArguments().size() > 1) {
Expression maxStandardErrorExpr = node.getArguments().get(1);
if (maxStandardErrorExpr instanceof DoubleLiteral) {
maxStandardError = ((DoubleLiteral) maxStandardErrorExpr).getValue();
}
}
if (maxStandardError <= lowestMaxStandardError) {
warningCollector.add(new PrestoWarning(PERFORMANCE_WARNING, String.format("approx_set can produce low-precision results with the current standard error: %.4f (<=%.4f)", maxStandardError, lowestMaxStandardError)));
}
}
if (!node.getWindow().isPresent()) {
List<FunctionCall> aggregateFunctions = extractAggregateFunctions(analysis.getFunctionHandles(), node.getArguments(), metadata.getFunctionAndTypeManager());
List<FunctionCall> windowFunctions = extractWindowFunctions(node.getArguments());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public class FeaturesConfig

private int maxStageCountForEagerScheduling = 25;

private double hyperloglogStandardErrorWarningThreshold = 0.004;

public enum PartitioningPrecisionStrategy
{
// Let Presto decide when to repartition
Expand Down Expand Up @@ -1985,4 +1987,17 @@ public FeaturesConfig setMaxStageCountForEagerScheduling(int maxStageCountForEag
this.maxStageCountForEagerScheduling = maxStageCountForEagerScheduling;
return this;
}

/**
 * Returns the threshold currently held by this config object. The backing field
 * is initialised to {@code 0.004} and is only changed through the matching setter.
 *
 * @return the configured HyperLogLog standard-error warning threshold
 */
public double getHyperloglogStandardErrorWarningThreshold()
{
    return this.hyperloglogStandardErrorWarningThreshold;
}

/**
 * Stores a new HyperLogLog standard-error warning threshold on this config object.
 * NOTE(review): presumably the configuration framework binds the
 * "hyperloglog-standard-error-warning-threshold" key to this setter via the
 * annotation below - confirm against the config library in use.
 *
 * @param hyperloglogStandardErrorWarningThreshold value to store; no range check is applied here
 * @return this instance, so that setter calls can be chained
 */
@Config("hyperloglog-standard-error-warning-threshold")
@ConfigDescription("aggregation functions can produce low-precision results when the max standard error lower than this value.")
public FeaturesConfig setHyperloglogStandardErrorWarningThreshold(double hyperloglogStandardErrorWarningThreshold)
{
this.hyperloglogStandardErrorWarningThreshold = hyperloglogStandardErrorWarningThreshold;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2554,11 +2554,11 @@ private void verifyAggregations(
.collect(toImmutableList());

for (Expression expression : outputExpressions) {
verifySourceAggregations(distinctGroupingColumns, sourceScope, expression, metadata, analysis, warningCollector);
verifySourceAggregations(distinctGroupingColumns, sourceScope, expression, metadata, analysis, warningCollector, session);
}

for (Expression expression : orderByExpressions) {
verifyOrderByAggregations(distinctGroupingColumns, sourceScope, orderByScope.get(), expression, metadata, analysis, warningCollector);
verifyOrderByAggregations(distinctGroupingColumns, sourceScope, orderByScope.get(), expression, metadata, analysis, warningCollector, session);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,28 @@ public FunctionHandle minFunction(Type valueType)
{
return functionAndTypeManager.lookupFunction("min", fromTypes(valueType));
}

/**
 * Tells whether the metadata behind {@code functionHandle} carries the qualified
 * name {@code approx_distinct} inside {@code DEFAULT_NAMESPACE}.
 *
 * @param functionHandle handle whose metadata name is compared
 * @return {@code true} when the names are equal
 */
@Override
public boolean isApproximateCountDistinctFunction(FunctionHandle functionHandle)
{
    QualifiedObjectName approxDistinctName = QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "approx_distinct");
    return functionAndTypeManager.getFunctionMetadata(functionHandle).getName().equals(approxDistinctName);
}

/**
 * Resolves {@code approx_distinct} for a single argument of the given type by
 * delegating to the function and type manager.
 *
 * @param valueType type of the sole argument passed to the lookup
 * @return the handle produced by the lookup
 */
@Override
public FunctionHandle approximateCountDistinctFunction(Type valueType)
{
    FunctionHandle approxDistinctHandle = functionAndTypeManager.lookupFunction("approx_distinct", fromTypes(valueType));
    return approxDistinctHandle;
}

/**
 * Tells whether the metadata behind {@code functionHandle} carries the qualified
 * name {@code approx_set} inside {@code DEFAULT_NAMESPACE}.
 *
 * @param functionHandle handle whose metadata name is compared
 * @return {@code true} when the names are equal
 */
@Override
public boolean isApproximateSetFunction(FunctionHandle functionHandle)
{
    QualifiedObjectName approxSetName = QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, "approx_set");
    return functionAndTypeManager.getFunctionMetadata(functionHandle).getName().equals(approxSetName);
}

/**
 * Resolves {@code approx_set} for a single argument of the given type by
 * delegating to the function and type manager.
 *
 * @param valueType type of the sole argument passed to the lookup
 * @return the handle produced by the lookup
 */
@Override
public FunctionHandle approximateSetFunction(Type valueType)
{
    FunctionHandle approxSetHandle = functionAndTypeManager.lookupFunction("approx_set", fromTypes(valueType));
    return approxSetHandle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import com.facebook.presto.spi.WarningCollector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.String.format;
Expand All @@ -36,7 +36,7 @@ public class TestingWarningCollector
implements WarningCollector
{
@GuardedBy("this")
private final Map<WarningCode, PrestoWarning> warnings = new LinkedHashMap<>();
private final Multimap<WarningCode, PrestoWarning> warnings = LinkedHashMultimap.create();
private final WarningCollectorConfig config;

private final boolean addWarnings;
Expand All @@ -59,7 +59,7 @@ public synchronized void add(PrestoWarning warning)
{
requireNonNull(warning, "warning is null");
if (warnings.size() < config.getMaxWarnings()) {
warnings.putIfAbsent(warning.getWarningCode(), warning);
warnings.put(warning.getWarningCode(), warning);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ public void testMaxWarnings()
assertEquals(warningCollector.getWarnings().size(), 2);
}

// Warnings that share a warning code but differ in message must each be kept.
@Test
public void testAddingSameTypeWarnings()
{
    WarningCollector collector = new DefaultWarningCollector(new WarningCollectorConfig(), WarningHandlingLevel.NORMAL);

    // Two warnings under code 1, built from separate WarningCode instances
    PrestoWarning firstWithCodeOne = new PrestoWarning(new WarningCode(1, "1"), "warning 1-1");
    collector.add(firstWithCodeOne);
    PrestoWarning secondWithCodeOne = new PrestoWarning(new WarningCode(1, "1"), "warning 1-2");
    collector.add(secondWithCodeOne);

    // One warning each under codes 2 and 3
    PrestoWarning withCodeTwo = new PrestoWarning(new WarningCode(2, "2"), "warning 2");
    collector.add(withCodeTwo);
    PrestoWarning withCodeThree = new PrestoWarning(new WarningCode(3, "3"), "warning 3");
    collector.add(withCodeThree);

    int collectedCount = collector.getWarnings().size();
    assertEquals(collectedCount, 4);
}

@Test
public void testWarningSuppress()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class TestAnalyzer
private static void assertHasWarning(WarningCollector warningCollector, StandardWarningCode code, String match)
{
List<PrestoWarning> warnings = warningCollector.getWarnings();
assertEquals(warnings.size(), 1);
assertTrue(warnings.size() > 0);
PrestoWarning warning = warnings.get(0);
assertEquals(warning.getWarningCode(), code.toWarningCode());
assertTrue(warning.getMessage().startsWith(match));
Expand Down Expand Up @@ -1115,6 +1115,66 @@ public void testCountDistinctPerformanceWarning()
assertTrue(warning.getMessage().contains("COUNT(DISTINCT xxx)"));
}

// approx_distinct: silent without an explicit error bound, one performance warning
// for a 0.0025 bound, silent again for a 0.0055 bound.
@Test
public void testApproxDistinctPerformanceWarning()
{
    // No maxStandardError argument: nothing is reported
    List<PrestoWarning> withoutErrorArgument = analyzeWithWarnings("SELECT approx_distinct(a) FROM t1 GROUP BY b").getWarnings();
    assertEquals(withoutErrorArgument.size(), 0);

    // maxStandardError of 0.0025: exactly one warning
    List<PrestoWarning> withLowError = analyzeWithWarnings("SELECT approx_distinct(a, 0.0025E0) FROM t1 GROUP BY b").getWarnings();
    assertEquals(withLowError.size(), 1);

    // That warning must be the approx_distinct performance warning
    PrestoWarning reported = withLowError.get(0);
    assertEquals(reported.getWarningCode(), PERFORMANCE_WARNING.toWarningCode());
    assertTrue(reported.getMessage().startsWith("approx_distinct"));

    // maxStandardError of 0.0055: expected to sit above the threshold, so no warning
    List<PrestoWarning> withHighError = analyzeWithWarnings("SELECT approx_distinct(a, 0.0055E0) FROM t1 GROUP BY b").getWarnings();
    assertEquals(withHighError.size(), 0);
}

// approx_set: silent without an explicit error bound, one performance warning
// for a 0.0015 bound, silent again for a 0.0055 bound.
@Test
public void testApproxSetPerformanceWarning()
{
    // No maxStandardError argument: nothing is reported
    List<PrestoWarning> withoutErrorArgument = analyzeWithWarnings("SELECT approx_set(a) FROM t1 GROUP BY b").getWarnings();
    assertEquals(withoutErrorArgument.size(), 0);

    // maxStandardError of 0.0015: exactly one warning
    List<PrestoWarning> withLowError = analyzeWithWarnings("SELECT approx_set(a, 0.0015E0) FROM t1 GROUP BY b").getWarnings();
    assertEquals(withLowError.size(), 1);

    // That warning must be the approx_set performance warning
    PrestoWarning reported = withLowError.get(0);
    assertEquals(reported.getWarningCode(), PERFORMANCE_WARNING.toWarningCode());
    assertTrue(reported.getMessage().startsWith("approx_set"));

    // maxStandardError of 0.0055: expected to sit above the threshold, so no warning
    List<PrestoWarning> withHighError = analyzeWithWarnings("SELECT approx_set(a, 0.0055E0) FROM t1 GROUP BY b").getWarnings();
    assertEquals(withHighError.size(), 0);
}

// A query using both functions with low error bounds must report two warnings,
// approx_distinct first and approx_set second.
@Test
public void testApproxDistinctAndApproxSetPerformanceWarning()
{
    List<PrestoWarning> reported = analyzeWithWarnings("SELECT approx_distinct(a, 0.0025E0), approx_set(a, 0.0013E0) FROM t1 GROUP BY b").getWarnings();
    assertEquals(reported.size(), 2);

    // First entry: approx_distinct performance warning
    PrestoWarning fromApproxDistinct = reported.get(0);
    assertEquals(fromApproxDistinct.getWarningCode(), PERFORMANCE_WARNING.toWarningCode());
    assertTrue(fromApproxDistinct.getMessage().startsWith("approx_distinct"));

    // Second entry: approx_set performance warning
    PrestoWarning fromApproxSet = reported.get(1);
    assertEquals(fromApproxSet.getWarningCode(), PERFORMANCE_WARNING.toWarningCode());
    assertTrue(fromApproxSet.getMessage().startsWith("approx_set"));
}

@Test
public void testUnionNoPerformanceWarning()
{
Expand Down
Loading

0 comments on commit 2cc3635

Please sign in to comment.