
[backport] Add Normalize Pipeline Aggregation (#56399) #56792

Merged (1 commit) on May 15, 2020
1 change: 1 addition & 0 deletions docs/reference/aggregations/pipeline.asciidoc
@@ -288,3 +288,4 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/moving-percentiles-aggregation.asciidoc[]
include::pipeline/normalize-aggregation.asciidoc[]
182 changes: 182 additions & 0 deletions docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
@@ -0,0 +1,182 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-normalize-aggregation]]
=== Normalize Aggregation

A parent pipeline aggregation which calculates a normalized/rescaled value for each bucket of a parent aggregation.
Values that cannot be normalized are skipped using the <<gap-policy, skip gap policy>>.

==== Syntax

A `normalize` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"normalize": {
"buckets_path": "normalized",
"method": "percent_of_sum"
}
}
--------------------------------------------------
// NOTCONSOLE

[[normalize_pipeline-params]]
.`normalize_pipeline` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax, `buckets_path` syntax>> for more details) |Required |
|`method` | The specific <<normalize_pipeline-method, method>> to apply | Required |
|`format` |Format to apply to the output value of this aggregation |Optional |`null`
|===

[[normalize_pipeline-method]]
==== Methods

The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition below uses
the following set of bucket values as its example: `[5, 5, 10, 50, 10, 20]`.

_rescale_0_1_::
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized
linearly in-between.

x' = (x - min_x) / (max_x - min_x)

[0, 0, .1111, 1, .1111, .3333]

_rescale_0_100_::
This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized
linearly in-between.

x' = 100 * (x - min_x) / (max_x - min_x)

[0, 0, 11.11, 100, 11.11, 33.33]

_percent_of_sum_::
This method normalizes each value so that it represents its percentage of the total sum of the bucket values.

x' = x / sum_x

[5%, 5%, 10%, 50%, 10%, 20%]


_mean_::
This method normalizes each value by how much it differs from the average, rescaled by the range of the original values.

x' = (x - mean_x) / (max_x - min_x)

[-0.26, -0.26, -0.15, 0.74, -0.15, 0.07]

_zscore_::
This method normalizes such that each value represents how far it is from the mean, relative to the standard deviation.

x' = (x - mean_x) / stdev_x

[-0.68, -0.68, -0.39, 1.94, -0.39, 0.19]

_softmax_::
This method normalizes such that each value is exponentiated and divided by the sum of the exponentials of all the original values.

x' = e^x / sum_e_x

[2.862E-20, 2.862E-20, 4.248E-18, 0.999, 4.248E-18, 9.357E-14]
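The six methods above can be sketched in plain Python. This is an illustrative sketch, not the Elasticsearch implementation; note that the z-score values listed above correspond to the sample standard deviation (divisor `n - 1`):

```python
import math

values = [5, 5, 10, 50, 10, 20]

def rescale_0_1(xs):
    lo, hi = min(xs), max(xs)
    return [(x - lo) / (hi - lo) for x in xs]

def rescale_0_100(xs):
    return [100 * v for v in rescale_0_1(xs)]

def percent_of_sum(xs):
    total = sum(xs)
    return [x / total for x in xs]

def mean_norm(xs):
    m = sum(xs) / len(xs)
    lo, hi = min(xs), max(xs)
    return [(x - m) / (hi - lo) for x in xs]

def zscore(xs):
    m = sum(xs) / len(xs)
    # sample standard deviation (n - 1), which reproduces the listed values
    stdev = math.sqrt(sum((x - m) ** 2 for x in xs) / (len(xs) - 1))
    return [(x - m) / stdev for x in xs]

def softmax(xs):
    exps = [math.exp(x) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]

print([round(v, 4) for v in rescale_0_1(values)])  # [0.0, 0.0, 0.1111, 1.0, 0.1111, 0.3333]
print([round(v, 2) for v in zscore(values)])       # [-0.68, -0.68, -0.39, 1.94, -0.39, 0.19]
```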


==== Example

The following snippet calculates the percent of total sales for each month:

[source,console]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
},
"percent_of_total_sales": {
"normalize": {
"buckets_path": "sales", <1>
"method": "percent_of_sum", <2>
"format": "00.00%" <3>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]

<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling.
<2> `method` sets which rescaling to apply. In this case, `percent_of_sum` calculates each sales value as a percent of all sales
in the parent bucket.
<3> `format` specifies how to render the metric as a string, using Java's `DecimalFormat` pattern. In this case, multiplying by 100
and appending a '%'.
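For intuition, the effect of the `00.00%` pattern can be approximated in Python. This is a rough stand-in only; the actual formatting is performed by Java's `DecimalFormat`:

```python
# Rough Python stand-in for Java's DecimalFormat pattern "00.00%":
# multiply by 100, keep two decimal places, zero-pad the integer part to two digits.
def format_percent(value: float) -> str:
    return f"{value:06.2%}"

print(format_percent(0.5583756345177665))   # 55.84%
print(format_percent(0.06091370558375635))  # 06.09%
```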

And the following may be the response:

[source,console-result]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
},
"percent_of_total_sales": {
"value": 0.5583756345177665,
"value_as_string": "55.84%"
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
},
"percent_of_total_sales": {
"value": 0.06091370558375635,
"value_as_string": "06.09%"
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
},
"percent_of_total_sales": {
"value": 0.38071065989847713,
"value_as_string": "38.07%"
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
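As a sanity check, the `percent_of_total_sales` values in the response are simply each month's `sales` sum divided by the grand total. A quick illustrative computation, not part of the PR:

```python
# Monthly sums taken from the response above
sales = [550.0, 60.0, 375.0]
total = sum(sales)                      # 985.0

percents = [s / total for s in sales]
print([round(p, 4) for p in percents])  # [0.5584, 0.0609, 0.3807]
```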
@@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
public static final String NAME = "simple_value";
protected final double value;

InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.format = formatter;
this.value = value;
@@ -32,6 +32,7 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
@@ -84,6 +85,11 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
MovingPercentilesPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
pipelineAggs.add(new PipelineAggregationSpec(
NormalizePipelineAggregationBuilder.NAME,
NormalizePipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.NORMALIZE,
checkLicense(NormalizePipelineAggregationBuilder.PARSER))));
return pipelineAggs;
}

@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.normalize;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore;

public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> {
public static final String NAME = "normalize";
static final ParseField METHOD_FIELD = new ParseField("method");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0],
(String) args[1], (List<String>) args[2]));

static {
PARSER.declareString(optionalConstructorArg(), FORMAT);
PARSER.declareString(constructorArg(), METHOD_FIELD);
PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD);
}

static final Map<String, Function<double[], DoubleUnaryOperator>> NAME_MAP;

static {
NAME_MAP = new HashMap<>();
NAME_MAP.put(RescaleZeroToOne.NAME, RescaleZeroToOne::new);
NAME_MAP.put(RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new);
NAME_MAP.put(Mean.NAME, Mean::new);
NAME_MAP.put(ZScore.NAME, ZScore::new);
NAME_MAP.put(Percent.NAME, Percent::new);
NAME_MAP.put(Softmax.NAME, Softmax::new);
}

static String validateMethodName(String name) {
if (NAME_MAP.containsKey(name)) {
return name;
}
throw new IllegalArgumentException("invalid method [" + name + "]");
}

private final String format;
private final String method;

public NormalizePipelineAggregationBuilder(String name, String format, String method, List<String> bucketsPath) {
super(name, NAME, bucketsPath.toArray(new String[0]));
this.format = format;
this.method = validateMethodName(method);
}

/**
* Read from a stream.
*/
public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
format = in.readOptionalString();
method = in.readString();
}

@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(format);
out.writeString(method);
}

/**
* Gets the format to use on the output of this aggregation.
*/
public String format() {
return format;
}

protected DocValueFormat formatter() {
if (format != null) {
return new DocValueFormat.Decimal(format);
} else {
return DocValueFormat.RAW;
}
}

@Override
protected PipelineAggregator createInternal(Map<String, Object> metadata) {
return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata);
}

@Override
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
}
context.validateHasParent(NAME, name);
}

@Override
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
}
builder.field(METHOD_FIELD.getPreferredName(), method);
return builder;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), format, method);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj;
return Objects.equals(format, other.format) && Objects.equals(method, other.method);
}

@Override
public String getWriteableName() {
return NAME;
}
}