Skip to content

Commit

Permalink
[Remove] Deprecated serialization logic from pipeline aggs (#4847)
Browse files Browse the repository at this point in the history
Removes the deprecated readTo/writeFrom serialization logic from piepline aggs
that is no longer used as of Legacy version 7.8.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize authored Oct 20, 2022
1 parent 3344738 commit 515f84b
Show file tree
Hide file tree
Showing 29 changed files with 22 additions and 800 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703))
- Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704))
- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728))
- Remove deprecated serialization logic from pipeline aggs ([#4847](https://github.com/opensearch-project/OpenSearch/pull/4847))

### Fixed
- `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289))
Expand Down
74 changes: 1 addition & 73 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -621,52 +621,6 @@ class PipelineAggregationSpec extends SearchExtensionSpec<
PipelineAggregationBuilder,
ContextParser<String, ? extends PipelineAggregationBuilder>> {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. It is an error if
* {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(
ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
* {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including
* names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(
String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
Expand All @@ -677,20 +631,14 @@ public PipelineAggregationSpec(
* {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
* pipelines implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(
ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -702,20 +650,15 @@ public PipelineAggregationSpec(
* names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
* implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(
String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -727,19 +670,16 @@ public PipelineAggregationSpec(
* {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated prefer the ctor that takes a {@link ContextParser}
*/
@Deprecated
public PipelineAggregationSpec(
ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
PipelineAggregator.Parser parser
) {
super(name, builderReader, (p, n) -> parser.parse(n, p));
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -751,18 +691,15 @@ public PipelineAggregationSpec(
* names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @deprecated prefer the ctor that takes a {@link ContextParser}
*/
@Deprecated
public PipelineAggregationSpec(
String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
PipelineAggregator.Parser parser
) {
super(name, builderReader, (p, n) -> parser.parse(n, p));
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -781,15 +718,6 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R
return this;
}

/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
return aggregatorReader;
}

/**
* Get the readers that must be registered for this aggregation's results.
*/
Expand Down
36 changes: 0 additions & 36 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,14 @@
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.BucketSortPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.CumulativeSumPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregator;
import org.opensearch.search.aggregations.pipeline.EwmaModel;
import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketParser;
import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.HoltLinearModel;
import org.opensearch.search.aggregations.pipeline.HoltWintersModel;
import org.opensearch.search.aggregations.pipeline.InternalBucketMetricValue;
Expand All @@ -236,24 +229,15 @@
import org.opensearch.search.aggregations.pipeline.InternalStatsBucket;
import org.opensearch.search.aggregations.pipeline.LinearModel;
import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.MovAvgModel;
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MovFnPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.SerialDiffPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SerialDiffPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.SimpleModel;
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
Expand Down Expand Up @@ -710,15 +694,13 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder.NAME,
DerivativePipelineAggregationBuilder::new,
DerivativePipelineAggregator::new,
DerivativePipelineAggregationBuilder::parse
).addResultReader(InternalDerivative::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
MaxBucketPipelineAggregationBuilder.NAME,
MaxBucketPipelineAggregationBuilder::new,
MaxBucketPipelineAggregator::new,
MaxBucketPipelineAggregationBuilder.PARSER
)
// This bucket is used by many pipeline aggreations.
Expand All @@ -728,7 +710,6 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
MinBucketPipelineAggregationBuilder.NAME,
MinBucketPipelineAggregationBuilder::new,
MinBucketPipelineAggregator::new,
MinBucketPipelineAggregationBuilder.PARSER
)
/* Uses InternalBucketMetricValue */
Expand All @@ -737,7 +718,6 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
AvgBucketPipelineAggregationBuilder.NAME,
AvgBucketPipelineAggregationBuilder::new,
AvgBucketPipelineAggregator::new,
AvgBucketPipelineAggregationBuilder.PARSER
)
// This bucket is used by many pipeline aggreations.
Expand All @@ -747,7 +727,6 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
SumBucketPipelineAggregationBuilder.NAME,
SumBucketPipelineAggregationBuilder::new,
SumBucketPipelineAggregator::new,
SumBucketPipelineAggregationBuilder.PARSER
)
/* Uses InternalSimpleValue */
Expand All @@ -756,31 +735,27 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
StatsBucketPipelineAggregationBuilder.NAME,
StatsBucketPipelineAggregationBuilder::new,
StatsBucketPipelineAggregator::new,
StatsBucketPipelineAggregationBuilder.PARSER
).addResultReader(InternalStatsBucket::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
ExtendedStatsBucketPipelineAggregationBuilder.NAME,
ExtendedStatsBucketPipelineAggregationBuilder::new,
ExtendedStatsBucketPipelineAggregator::new,
new ExtendedStatsBucketParser()
).addResultReader(InternalExtendedStatsBucket::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
PercentilesBucketPipelineAggregationBuilder.NAME,
PercentilesBucketPipelineAggregationBuilder::new,
PercentilesBucketPipelineAggregator::new,
PercentilesBucketPipelineAggregationBuilder.PARSER
).addResultReader(InternalPercentilesBucket::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
MovAvgPipelineAggregationBuilder.NAME,
MovAvgPipelineAggregationBuilder::new,
MovAvgPipelineAggregator::new,
(XContentParser parser, String name) -> MovAvgPipelineAggregationBuilder.parse(
movingAverageModelParserRegistry,
name,
Expand All @@ -792,47 +767,41 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
CumulativeSumPipelineAggregationBuilder.NAME,
CumulativeSumPipelineAggregationBuilder::new,
CumulativeSumPipelineAggregator::new,
CumulativeSumPipelineAggregationBuilder.PARSER
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
BucketScriptPipelineAggregationBuilder.NAME,
BucketScriptPipelineAggregationBuilder::new,
BucketScriptPipelineAggregator::new,
BucketScriptPipelineAggregationBuilder.PARSER
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
BucketSelectorPipelineAggregationBuilder.NAME,
BucketSelectorPipelineAggregationBuilder::new,
BucketSelectorPipelineAggregator::new,
BucketSelectorPipelineAggregationBuilder::parse
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
BucketSortPipelineAggregationBuilder.NAME,
BucketSortPipelineAggregationBuilder::new,
BucketSortPipelineAggregator::new,
BucketSortPipelineAggregationBuilder::parse
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
SerialDiffPipelineAggregationBuilder.NAME,
SerialDiffPipelineAggregationBuilder::new,
SerialDiffPipelineAggregator::new,
SerialDiffPipelineAggregationBuilder::parse
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
MovFnPipelineAggregator::new,
MovFnPipelineAggregationBuilder.PARSER
)
);
Expand All @@ -847,11 +816,6 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) {
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader())
);
if (spec.getAggregatorReader() != null) {
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())
);
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
namedWriteables.add(
new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ public void execute(SearchContext context) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}
context.queryResult()
.aggregations(new InternalAggregations(aggregations, context.request().source().aggregations()::buildPipelineTree));
context.queryResult().aggregations(new InternalAggregations(aggregations));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
Expand Down
Loading

0 comments on commit 515f84b

Please sign in to comment.