Refactor to leverage BucketedSort
These changes leverage BucketedSort and add the ability to order the lines
in either ascending or descending time/sort order.
talevy committed Oct 26, 2020
1 parent 8afb5f6 commit 6d70ccd
Showing 14 changed files with 476 additions and 212 deletions.
@@ -21,6 +21,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.AggregationExecutionException;

import java.io.IOException;
@@ -66,7 +66,7 @@
* worst case. Critically, checking whether a value is competitive at all is a
* very fast {@code O(1)} operation and, so long as buckets aren't hit in
* reverse order, most values won't be. Extracting results in sorted order is still
* {@code O(n * log n)}.
* </p>
* <p>
* When we first collect a bucket we make sure that we've allocated enough
@@ -90,7 +90,7 @@ public interface ExtraData {
* <p>
* Both parameters will have previously been loaded by
* {@link Loader#loadFromDoc(long, int)} so the implementer shouldn't
* need to grow the underlying storage to implement this.
* </p>
*/
void swap(long lhs, long rhs);
@@ -128,7 +128,7 @@ public Loader loader(LeafReaderContext ctx) throws IOException {
private final SortOrder order;
private final DocValueFormat format;
private final int bucketSize;
private final ExtraData extra;
protected final ExtraData extra;
/**
* {@code true} if the bucket is in heap mode, {@code false} if
* it is still gathering.
@@ -206,9 +206,9 @@ public final List<SortValue> getValues(long bucket) {
}

/**
* Is this bucket a min heap {@code true} or in gathering mode {@code false}?
*/
private boolean inHeapMode(long bucket) {
public boolean inHeapMode(long bucket) {
return heapMode.get(bucket);
}

@@ -254,7 +254,7 @@ private boolean inHeapMode(long bucket) {
/**
* {@code true} if the entry at index {@code lhs} is "better" than
* the entry at {@code rhs}. "Better" in this context means "lower" for
* {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
protected abstract boolean betterThan(long lhs, long rhs);

@@ -283,7 +283,7 @@ protected final String debugFormat() {

/**
* Initialize the gather offsets after setting up values. Subclasses
* should call this once, after setting up their {@link #values()}.
*/
protected final void initGatherOffsets() {
setNextGatherOffsets(0);
@@ -325,12 +325,12 @@ private void setNextGatherOffsets(long startingAt) {
* case.
* </p>
* <ul>
* <li>Hayward, Ryan; McDiarmid, Colin (1991).
* <a href="https://web.archive.org/web/20160205023201/http://www.stats.ox.ac.uk/__data/assets/pdf_file/0015/4173/heapbuildjalg.pdf">
* Average Case Analysis of Heap Building by Repeated Insertion</a> J. Algorithms.
* <li>D.E. Knuth, ”The Art of Computer Programming, Vol. 3, Sorting and Searching”</li>
* </ul>
* @param rootIndex the index of the start of the bucket
*/
private void heapify(long rootIndex) {
int maxParent = bucketSize / 2 - 1;
@@ -344,7 +344,7 @@ private void heapify(long rootIndex) {
* runs in {@code O(log n)} time.
* @param rootIndex index of the start of the bucket
* @param parent Index within the bucket of the parent to check.
* For example, 0 is the "root".
*/
private void downHeap(long rootIndex, int parent) {
while (true) {
@@ -443,7 +443,7 @@ public final void collect(int doc, long bucket) throws IOException {
/**
* {@code true} if the sort value for the doc is "better" than the
* entry at {@code index}. "Better" means "lower" for
* {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
protected abstract boolean docBetterThan(long index);

@@ -545,7 +545,7 @@ public abstract static class ForFloats extends BucketedSort {
* The maximum size of buckets this can store. This is because we
* store the next offset to write to in a float and floats only have
* {@code 23} bits of mantissa so they can't accurately store values
* higher than {@code 2 ^ 24}.
*/
public static final int MAX_BUCKET_SIZE = (int) Math.pow(2, 24);

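The Javadoc above describes BucketedSort's two phases: gather values until a bucket is full, heapify once, and from then on accept a new value only when it beats the heap root, an O(1) check. Below is a minimal, self-contained sketch of that idea for a single bucket of doubles. It is not the real BucketedSort API; the class and method names are illustrative, and "better" is hard-coded to mean "higher", i.e. SortOrder.DESC.

import java.util.Arrays;

/** Sketch of the gather-then-heapify idea for a single bucket of doubles. */
class TopNSketch {
    private final double[] values;
    private int gathered = 0;
    private boolean heapMode = false;

    TopNSketch(int bucketSize) {
        values = new double[bucketSize];
    }

    /** {@code true} if {@code lhs} is "better" than {@code rhs}; "higher" here. */
    private boolean betterThan(double lhs, double rhs) {
        return lhs > rhs;
    }

    void collect(double value) {
        if (heapMode == false) {
            values[gathered++] = value;      // gathering mode: just append
            if (gathered == values.length) {
                heapify();                   // bucket is full: switch to a min-heap
                heapMode = true;
            }
            return;
        }
        if (betterThan(value, values[0]) == false) {
            return;                          // O(1): not competitive against the worst kept value
        }
        values[0] = value;                   // replace the root, i.e. the worst kept value
        downHeap(0);                         // restore the heap property in O(log n)
    }

    private void heapify() {
        for (int parent = values.length / 2 - 1; parent >= 0; parent--) {
            downHeap(parent);
        }
    }

    /** Push the entry at {@code parent} down until neither child is worse than it. */
    private void downHeap(int parent) {
        while (true) {
            int worst = parent;
            int left = 2 * parent + 1;
            int right = left + 1;
            if (left < values.length && betterThan(values[worst], values[left])) {
                worst = left;
            }
            if (right < values.length && betterThan(values[worst], values[right])) {
                worst = right;
            }
            if (worst == parent) {
                return;
            }
            double tmp = values[parent];
            values[parent] = values[worst];
            values[worst] = tmp;
            parent = worst;
        }
    }

    /** Extracting results in sorted order is still O(n log n). */
    double[] getValues() {
        double[] copy = Arrays.copyOf(values, heapMode ? values.length : gathered);
        Arrays.sort(copy);
        return copy;
    }
}

As a side note on ForFloats.MAX_BUCKET_SIZE above: a float carries 24 significant bits, so 2^24 + 1 already rounds to 2^24 when stored as a float, and since the next gather offset is kept in a float the bucket size is capped at 2^24.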
@@ -212,7 +212,7 @@ public void testTwoHitsDesc() throws IOException {
assertThat(sort.getValues(0, extra.valueBuilder()), contains(extraValue(3000, 3), extraValue(200, 2)));
}
}

public void testTwoHitsAsc() throws IOException {
try (T sort = build(SortOrder.ASC, 2, new double[] {1, 2, 3})) {
BucketedSort.Leaf leaf = sort.forLeaf(null);
@@ -28,6 +28,7 @@
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortValue;
import org.elasticsearch.xpack.core.common.search.aggregations.MissingHelper;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue;

import java.io.IOException;
@@ -427,62 +428,4 @@ public Loader loader(LeafReaderContext ctx) throws IOException {
public void close() {}
}

/**
* Helps {@link LongMetricValues} track "empty" slots. It attempts to have
* very low CPU overhead and no memory overhead when there *aren't* empty
* values.
*/
private static class MissingHelper implements Releasable {
private final BigArrays bigArrays;
private BitArray tracker;

MissingHelper(BigArrays bigArrays) {
this.bigArrays = bigArrays;
}

void markMissing(long index) {
if (tracker == null) {
tracker = new BitArray(index, bigArrays);
}
tracker.set(index);
}

void markNotMissing(long index) {
if (tracker == null) {
return;
}
tracker.clear(index);
}

void swap(long lhs, long rhs) {
if (tracker == null) {
return;
}
boolean backup = tracker.get(lhs);
if (tracker.get(rhs)) {
tracker.set(lhs);
} else {
tracker.clear(lhs);
}
if (backup) {
tracker.set(rhs);
} else {
tracker.clear(rhs);
}
}

boolean isEmpty(long index) {
if (tracker == null) {
return false;
}
return tracker.get(index);
}

@Override
public void close() {
if (tracker != null) {
tracker.close();
}
}
}
}
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.common.search.aggregations;

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;

/**
* Helps long-valued {@link org.elasticsearch.search.sort.BucketedSort.ExtraData} track "empty" slots. It attempts to have
* very low CPU overhead and no memory overhead when there *aren't* empty
* values.
*/
public class MissingHelper implements Releasable {
private final BigArrays bigArrays;
private BitArray tracker;

public MissingHelper(BigArrays bigArrays) {
this.bigArrays = bigArrays;
}

public void markMissing(long index) {
if (tracker == null) {
tracker = new BitArray(index, bigArrays);
}
tracker.set(index);
}

public void markNotMissing(long index) {
if (tracker == null) {
return;
}
tracker.clear(index);
}

public void swap(long lhs, long rhs) {
if (tracker == null) {
return;
}
boolean backup = tracker.get(lhs);
if (tracker.get(rhs)) {
tracker.set(lhs);
} else {
tracker.clear(lhs);
}
if (backup) {
tracker.set(rhs);
} else {
tracker.clear(rhs);
}
}

public boolean isEmpty(long index) {
if (tracker == null) {
return false;
}
return tracker.get(index);
}

@Override
public void close() {
if (tracker != null) {
tracker.close();
}
}
}
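With MissingHelper now promoted to a shared x-pack class, the sketch below shows how an ExtraData-style value store might drive it, keeping the "missing" flags in step with the values it swaps. The class and method names here are hypothetical; only MissingHelper, BigArrays, LongArray, and Releasables are real, and BigArrays.NON_RECYCLING_INSTANCE is used purely for the example.

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.xpack.core.common.search.aggregations.MissingHelper;

/** Hypothetical value store showing how MissingHelper is meant to be driven. */
class LongValuesSketch implements Releasable {
    private final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
    private final LongArray values = bigArrays.newLongArray(16, false);
    private final MissingHelper empty = new MissingHelper(bigArrays);

    void load(long index, long value) {
        values.set(index, value);
        empty.markNotMissing(index);
    }

    void loadMissing(long index) {
        empty.markMissing(index);   // the BitArray is only allocated on the first miss
    }

    /** Mirrors BucketedSort.ExtraData#swap: values and missing flags move together. */
    void swap(long lhs, long rhs) {
        long tmp = values.get(lhs);
        values.set(lhs, values.get(rhs));
        values.set(rhs, tmp);
        empty.swap(lhs, rhs);
    }

    Long value(long index) {
        return empty.isEmpty(index) ? null : values.get(index);
    }

    @Override
    public void close() {
        Releasables.close(values, empty);
    }
}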
@@ -116,9 +116,15 @@ public EnumCounters<Item> getStats() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
EnumCounters<Item> stats = getStats();
builder.startObject("stats");
for (Item item : Item.values()) {
builder.field(item.name().toLowerCase(Locale.ROOT) + "_usage", stats.get(item));
builder.startObject();
{
builder.startObject("stats");
{
for (Item item : Item.values()) {
builder.field(item.name().toLowerCase(Locale.ROOT) + "_usage", stats.get(item));
}
}
builder.endObject();
}
builder.endObject();
return builder;
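The extra startObject()/endObject() pair above wraps the counters in their own object instead of emitting bare fields. A small sketch of the shape this produces; the item name "geo_line_usage" and the counter value are hypothetical, the real names come from Item.values().

import java.io.IOException;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class StatsShapeSketch {
    public static void main(String[] args) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("stats");
            {
                builder.field("geo_line_usage", 7);   // hypothetical counter value
            }
            builder.endObject();
        }
        builder.endObject();
        // Prints a complete object: {"stats":{"geo_line_usage":7}}
        System.out.println(Strings.toString(builder));
    }
}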
@@ -12,11 +12,11 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory;
@@ -26,6 +26,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.Map;
@@ -34,8 +35,9 @@
public class GeoLineAggregationBuilder
extends MultiValuesSourceAggregationBuilder.LeafOnly<GeoLineAggregationBuilder> {

static final ParseField GEO_POINT_FIELD = new ParseField("geo_point");
static final ParseField POINT_FIELD = new ParseField("point");
static final ParseField SORT_FIELD = new ParseField("sort");
static final ParseField ORDER_FIELD = new ParseField("sort_order");
static final ParseField INCLUDE_SORT_FIELD = new ParseField("include_sort");

public static final String NAME = "geo_line";
@@ -44,12 +46,14 @@ public class GeoLineAggregationBuilder
ObjectParser.fromBuilder(NAME, GeoLineAggregationBuilder::new);
static {
MultiValuesSourceParseHelper.declareCommon(PARSER, true, ValueType.NUMERIC);
MultiValuesSourceParseHelper.declareField(GEO_POINT_FIELD.getPreferredName(), PARSER, true, false, false);
MultiValuesSourceParseHelper.declareField(POINT_FIELD.getPreferredName(), PARSER, true, false, false);
MultiValuesSourceParseHelper.declareField(SORT_FIELD.getPreferredName(), PARSER, true, false, false);
PARSER.declareString((builder, order) -> builder.sortOrder(SortOrder.fromString(order)), ORDER_FIELD);
PARSER.declareBoolean(GeoLineAggregationBuilder::includeSort, INCLUDE_SORT_FIELD);
}

private boolean includeSort;
private SortOrder sortOrder = SortOrder.ASC;

public static void registerUsage(ValuesSourceRegistry.Builder builder) {
builder.registerUsage(NAME, CoreValuesSourceType.GEOPOINT);
@@ -69,13 +73,20 @@ private GeoLineAggregationBuilder(GeoLineAggregationBuilder clone,
*/
public GeoLineAggregationBuilder(StreamInput in) throws IOException {
super(in);
sortOrder = SortOrder.readFromStream(in);
includeSort = in.readBoolean();
}

public GeoLineAggregationBuilder includeSort(boolean includeSort) {
this.includeSort = includeSort;
return this;
}

public GeoLineAggregationBuilder sortOrder(SortOrder sortOrder) {
this.sortOrder = sortOrder;
return this;
}

@Override
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
return new GeoLineAggregationBuilder(this, factoriesBuilder, metaData);
@@ -87,8 +98,9 @@ public BucketCardinality bucketCardinality() {
}

@Override
protected void innerWriteTo(StreamOutput out) {
// Do nothing, no extra state to write to stream
protected void innerWriteTo(StreamOutput out) throws IOException {
sortOrder.writeTo(out);
out.writeBoolean(includeSort);
}

@Override
@@ -97,18 +109,19 @@ protected ValuesSourceType defaultValueSourceType() {
}

@Override
protected MultiValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardContext,
protected MultiValuesSourceAggregatorFactory innerBuild(AggregationContext aggregationContext,
Map<String, ValuesSourceConfig> configs,
Map<String, QueryBuilder> filters,
DocValueFormat format,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
return new GeoLineAggregatorFactory(name, configs, format, queryShardContext, parent, subFactoriesBuilder, metadata, includeSort);
return new GeoLineAggregatorFactory(name, configs, format, aggregationContext, parent, subFactoriesBuilder, metadata,
includeSort, sortOrder);
}

public GeoLineAggregationBuilder value(MultiValuesSourceFieldConfig valueConfig) {
valueConfig = Objects.requireNonNull(valueConfig, "Configuration for field [" + GEO_POINT_FIELD + "] cannot be null");
field(GEO_POINT_FIELD.getPreferredName(), valueConfig);
valueConfig = Objects.requireNonNull(valueConfig, "Configuration for field [" + POINT_FIELD + "] cannot be null");
field(POINT_FIELD.getPreferredName(), valueConfig);
return this;
}

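For reference, a hedged sketch of how the new options might be used from Java, based only on the methods visible in this diff. The name-taking constructor, the aggregation name "route", and the field name "location" are assumptions, and the setter for the "sort" field is not shown in this hunk, so it is omitted here.

import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
import org.elasticsearch.search.sort.SortOrder;

// Assumes a name-taking constructor like other aggregation builders.
GeoLineAggregationBuilder geoLine = new GeoLineAggregationBuilder("route")
    .value(new MultiValuesSourceFieldConfig.Builder().setFieldName("location").build())  // the "point" field
    .sortOrder(SortOrder.DESC)   // new in this commit: descending time/sort order
    .includeSort(true);          // new in this commit: also return the sort values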