From bbddbcd76d91b6219824b70bcdfb060b4c01e2dd Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 2 Mar 2020 10:34:30 -0800 Subject: [PATCH] insilico --- .../support/MultiValuesSource.java | 36 +++++ .../GeoLineAggregationBuilder.java | 106 ++++++++++++ .../aggregations/GeoLineAggregator.java | 151 ++++++++++++++++++ .../GeoLineAggregatorFactory.java | 48 ++++++ .../search/aggregations/InternalGeoLine.java | 129 +++++++++++++++ .../search/aggregations/PathArraySorter.java | 47 ++++++ .../aggregations/GeoLineAggregatorTests.java | 133 +++++++++++++++ 7 files changed, 650 insertions(+) create mode 100644 x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java create mode 100644 x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java create mode 100644 x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java create mode 100644 x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java create mode 100644 x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java create mode 100644 x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java index 3a8bd9e12feca..5c14649a5e3ec 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/MultiValuesSource.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.support; import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.index.fielddata.MultiGeoPointValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.index.query.QueryShardContext; @@ -51,6 +52,41 @@ public SortedNumericDoubleValues getField(String fieldName, LeafReaderContext ct } } + public static class AnyMultiValuesSource extends MultiValuesSource { + public AnyMultiValuesSource(Map> valuesSourceConfigs, + QueryShardContext context) { + values = new HashMap<>(valuesSourceConfigs.size()); + for (Map.Entry> entry : valuesSourceConfigs.entrySet()) { + values.put(entry.getKey(), entry.getValue().toValuesSource(context)); + } + } + + private ValuesSource getField(String fieldName) { + ValuesSource valuesSource = values.get(fieldName); + if (valuesSource == null) { + throw new IllegalArgumentException("Could not find field name [" + fieldName + "] in multiValuesSource"); + } + return valuesSource; + } + + public SortedNumericDoubleValues getNumericField(String fieldName, LeafReaderContext ctx) throws IOException { + ValuesSource valuesSource = getField(fieldName); + if (valuesSource instanceof ValuesSource.Numeric) { + return ((ValuesSource.Numeric) valuesSource).doubleValues(ctx); + } + throw new IllegalArgumentException("field [" + fieldName + "] is not a numeric type"); + } + + public MultiGeoPointValues getGeoPointField(String fieldName, LeafReaderContext ctx) { + ValuesSource valuesSource = getField(fieldName); + if (valuesSource instanceof ValuesSource.GeoPoint) { + return ((ValuesSource.GeoPoint) valuesSource).geoPointValues(ctx); + } + throw new IllegalArgumentException("field [" + fieldName + "] is not a geo_point type"); + } + + } + public boolean needsScores() { return values.values().stream().anyMatch(ValuesSource::needsScores); } diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java new file mode 100644 index 0000000000000..bc352eed08431 --- /dev/null +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.spatial.search.aggregations; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceParseHelper; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class GeoLineAggregationBuilder + extends MultiValuesSourceAggregationBuilder { + + static final ParseField GEO_POINT_FIELD = new ParseField("geo_point"); + static final ParseField SORT_FIELD = new ParseField("sort"); + + static final String NAME = "geo_line"; + + private static final ObjectParser PARSER; + static { + PARSER = new ObjectParser<>(NAME); + MultiValuesSourceParseHelper.declareCommon(PARSER, true, ValueType.NUMERIC); + MultiValuesSourceParseHelper.declareField(GEO_POINT_FIELD.getPreferredName(), PARSER, true, false); + MultiValuesSourceParseHelper.declareField(SORT_FIELD.getPreferredName(), PARSER, true, false); + } + + GeoLineAggregationBuilder(String name) { + super(name, null); + } + + private GeoLineAggregationBuilder(GeoLineAggregationBuilder clone, + AggregatorFactories.Builder factoriesBuilder, Map metaData) { + super(clone, factoriesBuilder, metaData); + } + + /** + * Read from a stream. + */ + GeoLineAggregationBuilder(StreamInput in) throws IOException { + super(in, null); + } + + static AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException { + return PARSER.parse(parser, new GeoLineAggregationBuilder(aggregationName), null); + } + + @Override + protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metaData) { + return new GeoLineAggregationBuilder(this, factoriesBuilder, metaData); + } + + @Override + protected void innerWriteTo(StreamOutput out) { + // Do nothing, no extra state to write to stream + } + + @Override + protected MultiValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardContext, Map> configs, DocValueFormat format, AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder) throws IOException { + return new GeoLineAggregatorFactory(name, configs, format, queryShardContext, parent, subFactoriesBuilder, metaData); + } + + public GeoLineAggregationBuilder value(MultiValuesSourceFieldConfig valueConfig) { + valueConfig = Objects.requireNonNull(valueConfig, "Configuration for field [" + GEO_POINT_FIELD + "] cannot be null"); + field(GEO_POINT_FIELD.getPreferredName(), valueConfig); + return this; + } + + public GeoLineAggregationBuilder sort(MultiValuesSourceFieldConfig sortConfig) { + sortConfig = Objects.requireNonNull(sortConfig, "Configuration for field [" + SORT_FIELD + "] cannot be null"); + field(SORT_FIELD.getPreferredName(), sortConfig); + return this; + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, ToXContent.Params params) { + return builder; + } + + @Override + public String getType() { + return NAME; + } +} diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java new file mode 100644 index 0000000000000..a50e8997cd139 --- /dev/null +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.spatial.search.aggregations; + +import org.apache.lucene.geo.GeoEncodingUtils; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.index.fielddata.MultiGeoPointValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.MultiValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.spatial.search.aggregations.GeoLineAggregationBuilder.GEO_POINT_FIELD; +import static org.elasticsearch.xpack.spatial.search.aggregations.GeoLineAggregationBuilder.SORT_FIELD; + +/** + * Metric Aggregation for computing the pearson product correlation coefficient between multiple fields + **/ +final class GeoLineAggregator extends MetricsAggregator { + /** Multiple ValuesSource with field names */ + private final MultiValuesSource.AnyMultiValuesSource valuesSources; + + private ObjectArray paths; + private ObjectArray sortValues; + private IntArray idxs; + + GeoLineAggregator(String name, MultiValuesSource.AnyMultiValuesSource valuesSources, SearchContext context, + Aggregator parent, List pipelineAggregators, + Map metaData) throws IOException { + super(name, context, parent, pipelineAggregators, metaData); + this.valuesSources = valuesSources; + if (valuesSources != null) { + paths = context.bigArrays().newObjectArray(1); + sortValues = context.bigArrays().newObjectArray(1); + idxs = context.bigArrays().newIntArray(1); + } + } + + @Override + public ScoreMode scoreMode() { + if (valuesSources != null && valuesSources.needsScores()) { + return ScoreMode.COMPLETE; + } + return super.scoreMode(); + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (valuesSources == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final BigArrays bigArrays = context.bigArrays(); + MultiGeoPointValues docGeoPointValues = valuesSources.getGeoPointField(GEO_POINT_FIELD.getPreferredName(), ctx); + SortedNumericDoubleValues docSortValues = valuesSources.getNumericField(SORT_FIELD.getPreferredName(), ctx); + + return new LeafBucketCollectorBase(sub, docGeoPointValues) { + @Override + public void collect(int doc, long bucket) throws IOException { + paths = bigArrays.grow(paths, bucket + 1); + if (docGeoPointValues.advanceExact(doc) && docSortValues.advanceExact(doc)) { + if (docSortValues.docValueCount() > 1) { + throw new AggregationExecutionException("Encountered more than one sort value for a " + + "single document. Use a script to combine multiple sort-values-per-doc into a single value."); + } + if (docGeoPointValues.docValueCount() > 1) { + throw new AggregationExecutionException("Encountered more than one geo_point value for a " + + "single document. Use a script to combine multiple geo_point-values-per-doc into a single value."); + } + + // There should always be one weight if advanceExact lands us here, either + // a real weight or a `missing` weight + assert docSortValues.docValueCount() == 1; + assert docGeoPointValues.docValueCount() == 1; + final double sort = docSortValues.nextValue(); + final GeoPoint point = docGeoPointValues.nextValue(); + + int idx = idxs.get(bucket); + long[] bucketLine = paths.get(bucket); + double[] sortVals = sortValues.get(bucket); + if (bucketLine == null) { + bucketLine = new long[10]; + } else { + bucketLine = ArrayUtil.grow(bucketLine, idx + 1); + } + + + if (sortVals == null) { + sortVals = new double[10]; + } else { + sortVals = ArrayUtil.grow(sortVals, idx + 1); + } + + int encodedLat = GeoEncodingUtils.encodeLatitude(point.lat()); + int encodedLon = GeoEncodingUtils.encodeLongitude(point.lon()); + long lonLat = (((long) encodedLon) << 32) | (encodedLat & 0xffffffffL); + + sortVals[idx] = sort; + bucketLine[idx] = lonLat; + + paths.set(bucket, bucketLine); + sortValues.set(bucket, sortVals); + idxs.set(bucket, idx + 1); + } + } + }; + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSources == null) { + return buildEmptyAggregation(); + } + long[] bucketLine = paths.get(bucket); + double[] sortVals = sortValues.get(bucket); + int length = idxs.get(bucket); + new PathArraySorter(bucketLine, sortVals, length).sort(); + return new InternalGeoLine(name, bucketLine, sortVals, length, pipelineAggregators(), metaData()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalGeoLine(name, null, null, 0, pipelineAggregators(), metaData()); + } + + @Override + public void doClose() { + Releasables.close(paths, idxs, sortValues); + } +} diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java new file mode 100644 index 0000000000000..b934e147882c6 --- /dev/null +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.spatial.search.aggregations; + +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.MultiValuesSource; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +final class GeoLineAggregatorFactory extends MultiValuesSourceAggregatorFactory { + + GeoLineAggregatorFactory(String name, + Map> configs, + DocValueFormat format, QueryShardContext queryShardContext, AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { + super(name, configs, format, queryShardContext, parent, subFactoriesBuilder, metaData); + } + + @Override + protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, List pipelineAggregators, + Map metaData) throws IOException { + return new GeoLineAggregator(name, null, searchContext, parent, pipelineAggregators, metaData); + } + + @Override + protected Aggregator doCreateInternal(SearchContext searchContext, Map> configs, + DocValueFormat format, Aggregator parent, boolean collectsFromSingleBucket, + List pipelineAggregators, Map metaData) throws IOException { + MultiValuesSource.AnyMultiValuesSource valuesSources = new MultiValuesSource + .AnyMultiValuesSource(configs, searchContext.getQueryShardContext()); + return new GeoLineAggregator(name, valuesSources, searchContext, parent, pipelineAggregators, metaData); + } +} diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java new file mode 100644 index 0000000000000..b8b787335e93d --- /dev/null +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.spatial.search.aggregations; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.geo.GeoEncodingUtils; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.geometry.Line; +import org.elasticsearch.geometry.utils.WellKnownText; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.lucene.util.ArrayUtil.grow; + +/** + * A single line string representing a sorted sequence of geo-points + */ +public class InternalGeoLine extends InternalAggregation { + private static final Logger logger = LogManager.getLogger(InternalGeoLine.class); + + private long[] line; + private double[] sortVals; + private int length; + + InternalGeoLine(String name, long[] line, double[] sortVals, int length, List pipelineAggregators, Map metaData) { + super(name, pipelineAggregators, metaData); + this.line = line; + this.sortVals = sortVals; + this.length = length; + } + + /** + * Read from a stream. + */ + public InternalGeoLine(StreamInput in) throws IOException { + super(in); + this.line = in.readLongArray(); + this.length = in.readVInt(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeLongArray(line); + out.writeVInt(length); + } + + @Override + public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { + int mergedSize = 0; + for (InternalAggregation aggregation : aggregations) { + InternalGeoLine geoLine = (InternalGeoLine) aggregation; + mergedSize += geoLine.length; + } + + long[] finalList = new long[mergedSize]; + double[] finalSortVals = new double[mergedSize]; + int idx = 0; + for (InternalAggregation aggregation : aggregations) { + InternalGeoLine geoLine = (InternalGeoLine) aggregation; + for (int i = 0; i < geoLine.length; i++) { + finalSortVals[idx] = geoLine.sortVals[i]; + finalList[idx++] = geoLine.line[i]; + } + } + + new PathArraySorter(finalList, finalSortVals, length).sort(); + + // sort the final list + return new InternalGeoLine(name, finalList, finalSortVals, mergedSize, pipelineAggregators(), getMetaData()); + } + + @Override + public String getWriteableName() { + return GeoLineAggregationBuilder.NAME; + } + + public long[] line() { + return line; + } + + public int length() { + return length; + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field("type", "LineString"); + final List coordinates = new ArrayList<>(); + for (int i = 0; i < length; i++) { + int x = (int) line[i] >> 32; + int y = (int) line[i]; + coordinates.add(new double[] { GeoEncodingUtils.decodeLongitude(x), GeoEncodingUtils.decodeLatitude(y) }); + } + + builder.array("coordinates", coordinates.toArray()); + builder.array("sorts", sortVals); + return builder; + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public Object getProperty(List path) { + logger.error("what in the world"); + if (path.isEmpty()) { + return this; + } else if (path.size() == 1 && "value".equals(path.get(0))) { + return line; + } else { + throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path); + } + } +} diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java new file mode 100644 index 0000000000000..d7ed0bda2b970 --- /dev/null +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.spatial.search.aggregations; + +import org.apache.lucene.util.IntroSorter; + +final class PathArraySorter extends IntroSorter { + + private final long[] points; + private final double[] sortValues; + private double sortValuePivot; + private int length; + + public PathArraySorter(long[] points, double[] sortValues, int length) { + this.points = points; + this.sortValues = sortValues; + this.sortValuePivot = 0; + this.length = length; + } + + public final void sort() { + sort(0, length); + } + + @Override + protected void swap(int i, int j) { + final long tmpPoint = points[i]; + points[i] = points[j]; + points[j] = tmpPoint; + final double tmpSortValue = sortValues[i]; + sortValues[i] = sortValues[j]; + sortValues[j] = tmpSortValue; + } + + @Override + protected void setPivot(int i) { + sortValuePivot = sortValues[i]; + } + + @Override + protected int comparePivot(int j) { + return Double.compare(sortValuePivot, sortValues[j]); + } +} diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java new file mode 100644 index 0000000000000..9db71d4400c4c --- /dev/null +++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.spatial.search.aggregations; + +import org.apache.lucene.document.LatLonDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.geo.GeoEncodingUtils; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.geo.GeometryTestUtils; +import org.elasticsearch.geometry.Point; +import org.elasticsearch.index.mapper.GeoPointFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig; +import org.mockito.internal.matchers.ArrayEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class GeoLineAggregatorTests extends AggregatorTestCase { + + public void testSomething() throws IOException { + MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder() + .setFieldName("value_field") + .build(); + MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build(); + GeoLineAggregationBuilder aggregationBuilder = new GeoLineAggregationBuilder("_name") + .value(valueConfig) + .sort(sortConfig); + + int numPoints = randomIntBetween(1, 10000); + int arrayLength = randomIntBetween(numPoints, numPoints + 1000); + long[] points = new long[arrayLength]; + double[] sortValues = new double[arrayLength]; + for (int i = 0; i < numPoints; i++) { + Point point = GeometryTestUtils.randomPoint(false); + int encodedLat = GeoEncodingUtils.encodeLatitude(point.getLat()); + int encodedLon = GeoEncodingUtils.encodeLongitude(point.getLon()); + long lonLat = (((long) encodedLon) << 32) | (encodedLat & 0xffffffffL); + points[i] = lonLat; + sortValues[i] = i; + } + + InternalGeoLine geoLine = new InternalGeoLine("_name", + Arrays.copyOf(points, arrayLength), Arrays.copyOf(sortValues, arrayLength), + numPoints, null, null); + + for (int i = 0; i < randomIntBetween(1, numPoints); i++) { + int idx1 = randomIntBetween(0, numPoints); + int idx2 = randomIntBetween(0, numPoints); + final long tmpPoint = points[idx1]; + points[idx1] = points[idx2]; + points[idx2] = tmpPoint; + final double tmpSortValue = sortValues[idx1]; + sortValues[idx1] = sortValues[idx2]; + sortValues[idx2] = tmpSortValue; + } + + + testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> { + for (int i = 0; i < numPoints; i++) { + int x = (int) points[i] >> 32; + int y = (int) points[i]; + iw.addDocument(Arrays.asList(new LatLonDocValuesField("value_field", + GeoEncodingUtils.decodeLatitude(y), + GeoEncodingUtils.decodeLongitude(x)), + new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortValues[i])))); + } + }, actualGeoLine -> { + assertThat(actualGeoLine.length(), equalTo(geoLine.length())); + for (int i = 0; i < geoLine.length(); i++) { + assertThat(GeoEncodingUtils.decodeLongitude((int) actualGeoLine.line()[i]), + equalTo(GeoEncodingUtils.decodeLongitude((int) geoLine.line()[i]))); + assertThat(GeoEncodingUtils.decodeLatitude((int) actualGeoLine.line()[i] << 32), + equalTo(GeoEncodingUtils.decodeLatitude((int) geoLine.line()[i] << 32))); + } + }); + } + + private void testCase(Query query, GeoLineAggregationBuilder aggregationBuilder, + CheckedConsumer buildIndex, + Consumer verify) throws IOException { + testCase(query, aggregationBuilder, buildIndex, verify, NumberFieldMapper.NumberType.LONG); + } + + private void testCase(Query query, GeoLineAggregationBuilder aggregationBuilder, + CheckedConsumer buildIndex, + Consumer verify, + NumberFieldMapper.NumberType fieldNumberType) throws IOException { + + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + buildIndex.accept(indexWriter); + indexWriter.close(); + IndexReader indexReader = DirectoryReader.open(directory); + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + try { + MappedFieldType fieldType = new GeoPointFieldMapper.GeoPointFieldType(); + fieldType.setName("value_field"); + fieldType.setHasDocValues(true); + + MappedFieldType fieldType2 = new NumberFieldMapper.NumberFieldType(fieldNumberType); + fieldType2.setName("sort_field"); + fieldType2.setHasDocValues(true); + + GeoLineAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType, fieldType2); + aggregator.preCollection(); + indexSearcher.search(query, aggregator); + aggregator.postCollection(); + verify.accept((InternalGeoLine) aggregator.buildAggregation(0L)); + } finally { + indexReader.close(); + directory.close(); + } + } +}