Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Variable Width Histogram Aggregation #42035

Merged
merged 33 commits into from
Jun 23, 2020

Conversation

jamesdorfman
Copy link
Contributor

@jamesdorfman jamesdorfman commented May 9, 2019

Implements a new histogram aggregation called variable_width_histogram which dynamically determines bucket intervals based on document groupings. These groups are determined by running a one-pass clustering algorithm on each shard and then reducing each shard's clusters using an agglomerative clustering algorithm.

This PR addresses #9572.

The shard-level clustering is done in one pass to minimize memory overhead. The algorithm was lightly inspired by this paper. It fetches a small number of documents to sample the data and determine initial clusters. Subsequent documents are then placed into one of these clusters, or a new one if they are an outlier. This algorithm is described in more details in the aggregation's asciidoc page.

At reduce time, a hierarchical agglomerative clustering algorithm inspired by this paper continually merges the closest buckets from all shards (based on their centroids) until the target number of buckets is reached.

The final values produced by this aggregation are approximate. Buckets are merged based on their centroids and not their bounds. So it is possible that adjacent buckets will overlap after reduction. To avoid confusion, this overlap is not shown in the final histogram. Rather, when such overlap occurs we set the min of the bucket with the larger centroid to the midpoint between its minimum and the smaller bucket’s maximum: min[large] = (min[large] + max[small]) / 2. We do the same adjustment for the max of the smaller bucket. This heuristic is expected to increase the accuracy of the clustering.

Nodes are unable to share centroids during the shard-level clustering phase. In the future, resolving #50863 would let us solve this issue.

It doesn’t make sense for this aggregation to support the min_doc_count parameter, since clusters are determined dynamically. The order parameter is not supported here to keep this large PR from becoming too complex.

@jamesdorfman
Copy link
Contributor Author

I don't have permissions to add reviewers or tags. From looking at other PRs, it seems that maybe @colings86 or @polyfractal should review this?

@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo

@polyfractal
Copy link
Contributor

Very cool @jamesdorfman! I'll take a closer look tomorrow, but just an FYI I'll be traveling next week so I might be delayed in responding/reviewing thoroughly. Just as a heads up :)

@colings86 is definitely a good review candidate too, since I know he's played with streaming (non-agglomerative) k-means algos in the past.

It is difficult to produce accurate clustering results in a distributed environment. However, this problem becomes significantly easier if nodes can communicate with each other. I am not sure if the existing aggregations framework supports intra-node communication through the node that co-ordinates the aggregation. If it does, please let me know! We could eliminate bucket overlaps entirely by sharing centroids between nodes during the k-means phase.

At the moment it's not possible, each shard maps/combines in complete isolation from the rest of the shards, and the coordinator just receives the results as they arrive from the shards. There's no form of run-time message passing that might help shards make better decisions unfortunately.

We are looking to see if multi-pass aggregations might be possible (multiple map/combine/reduce iterations), but we haven't even started a proof of concept yet so it's a long way off :) And I'm not sure it'd help in this case anyway

The name auto_histogram initially seems more relevant. However, this is the first histogram aggregation where buckets will not all uniform width. The other auto histogram aggregation (for dates) selects one width for all intervals, so the name auto_histogram would indicate similar functionality. This would be confusing.

++ agreed, naming here is gonna get tricky. Just to throw out another option, we could drop "auto" and go with naming like numeric_cluster (and so the corresponding geo one would be a nice geo_cluster... instead of trying to shoehorn in "auto" and "histogram").

There's also the option of just having one "auto histogram" with different modes: cluster/variable-width and fixed-width. But that might introduce a lot of complexity too, especially if they have different sets of parameters.

Not sure, will noodle over it. :)

Anyhow very neat, I'll take a closer look tomorrow!

@polyfractal polyfractal self-requested a review May 9, 2019 21:23
@jamesdorfman
Copy link
Contributor Author

@polyfractal I definitely agree. A name like numeric_cluster makes the aggregation's functionality a lot more clear.

Copy link
Contributor

@polyfractal polyfractal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a pass over the code, mostly looking at algo and not style/logistical stuff.

Left a few comments and questions. I think the biggest challenge will be the temporary memory needed to cache the values for the final k-means, and the O(100 * n * k) nature of the k-means algo itself.

I know single-pass versions of the algo exist... although I don't know much about them. Replacing the shard-local k-means with a single pass and using the agglomerative clustering at reduction would probably satisfy the memory/runtime requirements.

Or I might have misinterpreted part of the code :)

As to naming, I was thinking it might just make sense to call this variable_width_histogram, which leaves the door open for auto_histogram to match auto_date_histogram, and accurately describes what it does. Would be simpler than a mode parameter, and internally would avoid the need for entirely different specializations depending on mode.


/** Read in object data from a stream, for internal use only. */
public AutoClusterHistogramAggregationBuilder(StreamInput in) throws IOException {
super(in, ValuesSourceType.NUMERIC, ValueType.DATE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ValueType.Date looks like a typo

AggregatorFactories.Builder subFactoriesBuilder) throws IOException {

Settings settings = context.getQueryShardContext().getIndexSettings().getNodeSettings();
int maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I like this... we should probably do something similar for the other bucketing aggs (so that we can fail fast instead of waiting for the breaker to trip at runtime).

One question/concern: since you always get less-than-or-equal to the number of requested buckets, is it a potential issue that you might need to request over the limit to get a number of buckets that is under? I don't have a good feel for how much "under" the requested value you end up getting. If it's just a few it shouldn't be a problem... but if you request 100 and get back 10 it might be an issue.

Code wise, I think you can do this to get the limit without needing the settings first:

int maxBuckets = context.aggregations().multiBucketConsumer().getLimit();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each shard should should individually return more than the numBuckets buckets. Then, the agglomerative clustering will reduce them to exactly numBuckets buckets.

If shards are returning less buckets, this can be fixed by increasing the shard_size and cache_limit parameters.

}

public AutoClusterHistogramAggregationBuilder setNumBuckets(int numBuckets){
if (numBuckets <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to consider an upper limit on size too, as a safety measure against excessive computation (when comparing centroids)

LeafBucketCollector sub;
DocInfo(int doc, double val, LeafBucketCollector sub){
this.doc = doc;
this.val = val;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think this is going to be problematic. If I'm understanding correctly, this basically caches the document value for later use in the k-means algo (so that it can run the standard k-means and move centroids around)?

Lucene shard limit is 2³¹ docs, so 12b for int + double == ~25gb of heap in worst case for just the DocInfo, ignoring overhead and all the other parts of an agg. And multiple shards per node concurrently being searched.

Still working through the code so I'm not sure if it's a deal breaker, but we might need to find a way to do single-pass K-means on each shard instead of caching these values and doing the O(100 * n * k) k-means

Copy link
Contributor Author

@jamesdorfman jamesdorfman Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a really good point, I never performed this calculation until now. I agree that this is a huge problem.

An alternative solution that still uses k-means could be to just collect some statistics about the distribution of the data while collecting and caching the first n documents (for some small n, let's just say 1000 for now). Then, during collection on the rest of the documents we could bucket extremely close documents together. We could determine which documents are close using our information about the document distribution.

I had never heard of 1-pass clustering algorithms until you mentioned it. It looks like there are many 1-pass clustering algorithms available online, so I will definitely research them and do some thinking.

}

@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is deferring collection of sub aggregations until after clustering, I think we will need to collect sub aggs in this method somewhere (similar to how the DeferringBucketCollectors work, or by implementing this as a deferred agg).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I thought that sub-aggregations aren't run on a document until we collect it into a bucket. And since we only actually call collectBucket() after we run k-means, the sub aggregations would automatically be deferred until this point.

I can definitely update this class to be a deferred buckets aggregator. But I'm curious when the sub aggregations would be called in the current code. During the collection phase?

@jamesdorfman jamesdorfman changed the title Add Auto-Cluster Histogram Aggregation Add Variable Width Histogram Aggregation Aug 21, 2019
@mrec
Copy link

mrec commented Sep 12, 2019

From the description in the first comment I think this would also satisfy #24254.

(Not the boundary-rounding that @markharwood's comment describes, but I don't think that necessarily needs to be part of the MVP.)

Copy link
Contributor

@polyfractal polyfractal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heya @jamesdorfman, apologies for the long delay in this review.

I left some comments around the document caching, with some extra info about how Lucene's segment and doc value iterators work (which aren't super obvious from the outside). The tl;dr: is that I think we'll need to convert this to a deferred agg, because segment boundaries will make cached values/docs invalid due to how Lucene works.

Happy to provide more context or code examples, lemme know if you have any questions about what I wrote. I'm liking the direction the algo is going, and the adjacent stuff looks great (docs, tests, other parts of the PR, etc)! You've just unfortunately landed in a section that our agg framework has a hard time with: partially processing some but not all of the documents :)

Copy link
Contributor

@polyfractal polyfractal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few minor comments and one important one: it's a product of poorly specified internal API and there's really no indication it needs to be called (sorry 😭 ), but you'll need to call mergeBuckets on the deferring collector as well. More details in the comment and the javadocs.

Otherwise, I looked over the aggregator logic and I think I understand how it works for the most part, didn't see any major red flags. The use of deferring collection looks a lot better than before! 👍 I haven't look too closely at the InternalVariable... reduction logic yet, and wanted to pause on comments until mergeBuckets was resolved in case it happened to change a bunch. But I think this is shaping up nicely!

Let me know if you have questions about the mergemap stuff, it honestly breaks my brain a little to think about sometimes. It might work fine, or we might need to think of a fix or alternative "merging" functionality that works for this agg.

private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;

private int numBuckets; // Final number of buckets (clusters) that should be produced by the master node after reduction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor note: technically it would be a "coordinator" node, not the master :)

* their value is distant from all existing buckets.
*/
private void collectValue(LeafBucketCollector sub, int doc, double val) throws IOException{
final BigArrays bigArrays = context.bigArrays();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the bigArrays in the constructor a class variable, that way we can reference it instead of fetching from the context here. Shouldn't practically matter much since I expect the JVM to optimize it away, but would be a bit cleaner.

}
}

mergeBuckets(mergeMap, initialNumBuckets);
Copy link
Contributor

@polyfractal polyfractal Feb 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so this is a result of not-great internal APIs, but you'll also need to call mergeBuckets() on the deferring collector as well:

if (deferringCollector != null) {
    deferringCollector.mergeBuckets(mergeMap);
}

mergeBuckets on the super class tidies up the doc counts, while mergeBuckets on the collector handles merging the ordinals and doc ID deltas (e.g. which underlying docs actually go to which bucket). There are more details on the javadoc of that method, but in summary the mergemap given to that method usually looks something like [1,1,3,-1,3], translated as:

  • Buckets 0 and 1 will be merged to bucket ordinal 1
  • Bucket 2 and 4 will be merged to ordinal 3
  • Bucket 3 will be removed entirely

This is largely a result of what it was designed for originally (merging together buckets in auto-date-histo), but I think fear it might give you problems. E.g. it assumes buckets are merged together, whereas you're essentially doing a big swap of all the buckets.

It might work fine, or not, really not sure without digging in a bit more :) We can brainstorm if it doesnt look like it will work easily

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand how swapping buckets is different from merging them.

From reading through the method, It looks like deferringCollector.mergeBuckets(mergeMap) just applies the merge map. And since I'm able to encode the buckets swaps into a merge map, I don't see why it wouldn't work?

I've added a new test which requires a complex merge map to be applied on an aggregator with sub aggregations: VariableWidthHistogramAggregatorTests.testComplexSubAggregations.

This test fails when run on the old version. However, now that I've implemented your suggested change it is passing.

this.clusterMaxes = bigArrays.newDoubleArray(0);
this.clusterCentroids = bigArrays.newDoubleArray(0);
this.clusterSizes = bigArrays.newIntArray(0);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll need to add a guard here to prevent execution if a subagg needs a score, similar to how RareTerms does it: https://github.com/elastic/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractRareTermsAggregator.java#L72-L84

That particular combination requires the agg to run non-deferred which some aggs can do (e.g. terms) but in this case we have to run deferred so we're out of luck.

Looks like auto-date-histo doesn't have that guard... i'll double check and we'll probably want to add it there (different issue :) )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added! Although... I'm not sure if the cases that force non-deferred collection are the same here as they are with RareTerms?

i.e. RareTerms checks if (scoringAgg != null && nestedAgg != null). Don't know if nested aggregations are also relevant for this aggregation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested aggregators are only relevant if you wanted to, say, sort by them. Otherwise you can defer them if they are ok with it. And these two cases aren't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I completely understand: what is it that these two cases aren't ok with?

@rjernst rjernst added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label May 4, 2020
@nik9000
Copy link
Member

nik9000 commented May 13, 2020

Since @polyfractal is so busy I'm going to try and take a look at this one. I do know that I'll have broken it with #55873, at a minimum.

@jamesdorfman
Copy link
Contributor Author

@nik9000 would you recommend that I pull in the new changes and fix this feature before any further review, or is that something that should be done at the end?

@nik9000
Copy link
Member

nik9000 commented May 13, 2020

@nik9000 would you recommend that I pull in the new changes and fix this feature before any further review, or is that something that should be done at the end?

Either way is fine with me! I do plan to read this in the next day or so but I just haven't yet.

Pulling in master will cause a compiler error because I changed

InternalAggregation buildAggregation(long owningBucketOrd)

to

InternalAggregation[] buildAggregation(long[] owningBucketOrd)

I also deprecated asMultiBucketAggregator because it is super wasteful from a Lucene standpoint and makes too easy to write slow, memory hungy aggregations. I'm in the process of moving all of the aggs that we have now away from it. But it is still early so I imagine there are some rough edges.

You'll also find runDeferredCollections has vanished. It is now automatic when you use buildSubAggsForBuckets or one of the helpers around it.

Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read it! Neat!

I'm mostly a hacker so I left hacker comments in line. I can't really comment on the papers and how well it sticks to them. One of them is walled off from me and I got bored reading the other one and wanted to look at code instead. Sorry!


public static final String NAME = "variable_width_histogram";

private static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be a little easier to read if you grouped all of the _FIELD constants together. I'm just used to reading all of the "regular" fields right above the first constructor.

private static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size");
private int shardSize = numBuckets * 50;

// Users should not worry about this field, since it is dynamically calculated from the shard size by default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its probably worth writing this in javadoc instead of line comments. Also, because it is exposed over the API I'm not surer that the comment is strictly accurate. More like users may override this, but we believe the default to be sane.


public int getCacheLimit(){ return cacheLimit; }

public static VariableWidthHistogramAggregationBuilder parse(String aggregationName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're starting to do away with these ceremonial parse methods in favor of just making the PARSER public and giving it like:

    public static final ObjectParser<VariableWidthHistogramAggregationBuilder, String> PARSER =
            ObjectParser.fromBuilder(NAME, VariableWidthHistogramAggregationBuilder::new);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had some trouble getting this to work. Are there any examples of other aggregations that do it like this, for me to refer to?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check out TopMetricsAggregationBuilder. We haven't done it to many of the aggs yet, but it is how I'm hoping we'd declare the new ones. It isn't a huge deal if you can't do it though. We have so many done the way you have it already.

}

/** Create a new builder with the given name. */
public VariableWidthHistogramAggregationBuilder(String name) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nicer if the ctor was at the top. I'm just so used to them being in that order that I get confused when they jump out at me so low down.

}

public BucketBounds(StreamInput in) throws IOException {
this(in.readDouble(), in.readDouble());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't tend to delegate these reading constructor. I find it forces you into certain wire layout which look fine now but some day might be a big pain. But I see why you did it in this case. And it is probably ok. 👍

nearest_bucket = i;
}
}
return nearest_bucket;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to keep the buckets in sorted order by centroid and binary search them? It'd certainly make inserting into the array more of a pain but that isn't something we do nearly as often as finding the nearest bucket.

Sad that we don't have a clean way to return the smallest distance. I think we'd want it at most of the call sites of this method.

Copy link
Contributor Author

@jamesdorfman jamesdorfman May 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I think that makes a lot of sense. I just implemented this fairly easily by using the existing BucketsAggregator.mergeBuckets() and MergingBucketsDeferringCollector.mergeBuckets() methods

If you like the overall change, I can make this more efficient by eliminating the need to create a merge map every time we want to insert. This would require creating very similar methods to the mergeBuckets ones, but that are specifically made for moving one bucket around.

I didn't want spend time making this implementation more efficient before getting feedback on the idea. However, if you like it I can definitely do so!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do like the idea. I'm starting to do work on auto_date_histogram lately and the whole mergeMap thing makes me sad in general. I know why we have it but it feels so wasteful! I'll read your changes and leave another comment when I'm through.


// The contract of the histogram aggregation is that shards must return
// buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that you have set a new contract in your reduce method - they have to be sorted by centroid. They'd come out that way if clusterCentroids were sorted which I think is probably a good idea.


@Override
public void doClose() {
Releasables.close(cachedValues);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to do Releasables.close(cachedValues, cluserMaxes, clusterMins, clusterCentroids, clusterSizes).

// There are enough docs in this bucket, so we should make a new one
bucketOrd += 1;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At then end of this you can call close on cachedValues and set the reference to it to null so it can be GCed. That'd free up a tiny bit of memory.

private DoubleArray clusterMaxes;
private DoubleArray clusterMins;
private DoubleArray clusterCentroids;
private IntArray clusterSizes; // Different than BucketsAggregator.docCounts: holds intermediate doc counts during clustering
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it'd be cleaner to write it like:

private Work work;

and have two kinds of Work:

private class CacheValues extends Work {
  private DoubleArray cachedValues;
  private int numCachedDocs;
  @Override
  public void (LeafBucketCollector sub, int doc, double val) throws IOException {
    cachedValues = cachedValues.grow(bigArrays, numCachedDocs + 1);
    cachedValues.set(numCachedDocs, val);
    ....

and

private class MergeBuckets extends Work {
   private DoubleArray clusterMaxes;
   private DoubleArray clusterMins;
   private DoubleArray clusterCentroids;
  @Override
  public void (LeafBucketCollector sub, int doc, double val) throws IOException {
    int bucketOrd = getNearestBucket(val);
   ....

That way at a quick glance you could tell that the aggregator operates in two "modes". I haven't really thought it through, but I think something like this would be more readable to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of all of the comments I left that you just 👍ed I didn't expect you to just 👍 this one. But, if you like the idea, great!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a really good idea, I liked it! The code is much more readable now after I did this refactor.

I used 👍to indicate I implemented it btw. I didn't want to resolve your comments myself, in case you felt there was still work to do on them :)

@jamesdorfman
Copy link
Contributor Author

@nik9000 Thanks for the detailed feedback! I left comments on a few issues I didn’t understand / had trouble with. I’ve implemented the rest of your suggestions.

Summary of key changes:

  • The clusters are now kept sorted during collection. Then, a binary search is used to find the the nearest cluster to a new one.
    • To insert into the sorted array, I used the existing mergeBuckets() method. It’s not very efficient though, because I have to create a merge map for all the clusters any time I want to insert a single cluster.
    • If you think this change is a good idea, I can look into making it more efficient by writing a new method like mergeBuckets() that would only move one element.
  • Refactored the two different collection modes into dedicated classes.
  • Refactored the bucket merging steps in InternalVariableWidthHistogram to be clearer.
  • Made merging more efficient in InternalVariableWidthHistogram.
    • Earlier, the two closest buckets were continually merged together until the target was reached.
    • Now, a plan is made and then all the buckets are merged at once.

Looking forward to hearing your thoughts :)

} else {
addToCluster(bucketOrd, val);
collectExistingBucket(sub, doc, bucketOrd);
if(numCachedDocs != cacheLimit) return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tend to prefer:

if (numCacheDocs != cacheLimit) {
  return;
}

It so much wider but it makes the return just stand out a bit better. Probably just my legacy as a java developer for so many years kicking in.

numCachedDocs += 1;
}

public void releaseCache(){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to have the superclass implement Releasable, I think. Those jump out at me real quick and I'm used to keeping a mental list of what has to be released.


public void releaseCache(){
Releasables.close(cachedValues);
cachedValues = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe you need to null this if you throw away the reference to this CollectionMode.


int left = 0;
int right = numClusters - 1;
while(left < right){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm somewhat surprised we don't have a binary search implementation that properly targets BigArrays, but it looks like we don't have one. It might be better to just make one in the same vein as org.apache.lucene.util.Sorter just so you can write unit tests for just the binary search. I certainly don't trust myself to write a correct binary search without playing with a bunch of test cases. Simple though it may be there are sneaky overflow issues I never properly remember.

return left;
}

public void doClose(){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above comment about making it Releasable.

private DoubleArray clusterCentroids;
private DoubleArray clusterSizes; // clusterSizes.get(i) will not be equal to bucketDocCount(i) when clusters are being merged
private int numClusters;
CacheValues cacheValues;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be private CollectMode collect? And you start with CacheValues and close that when it is done? Maybe collectValue could return CollectMode and CacheValues could return itself until it is time to transition and then it'd build its replacement and close itself? I'm guessing that'd be a bit easier to understand for me, mostly because I really can't keep a bunch of things in my head at once and sort of need to aggressively make abstractions.

@@ -46,21 +46,20 @@
public class VariableWidthHistogramAggregationBuilder
extends ValuesSourceAggregationBuilder<ValuesSource.Numeric, VariableWidthHistogramAggregationBuilder> {

private int numBuckets = 10;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, could you keep the non-static stuff below the static stuff? I think I said the wrong thing when I left a message earlier. Sorry!

}
to_merge.add(buckets.get(startIdx)); // Don't remove the startIdx bucket because it will be replaced by the merged bucket

reduceContext.consumeBucketsAndMaybeBreak(1); // NOCOMMIT: Is this correct?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be more correct to reduceContext.consumeBucketsAndMaybeBreak(toMerge.size() -1). You are "freeing" buckets here, not really making more.

@@ -422,7 +500,13 @@ private void orderBucketsByXContentKey(List<Bucket> buckets, ReduceContext reduc
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);

return new InternalVariableWidthHistogram(getName(), reducedBuckets, bucketInfo, numClusters,
if(reduceContext.isFinalReduce()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels ok. At least, it is, like you said, what auto_date_histogram is doing. I think in the future we'll do more with partial reduce then we do now and we might have to change it, but for now it is final. At best you can perform the merges at each pass, but I haven't thought enough about your algorithm to have any idea if that is safe.

At least, conceptually, you are only going to get more buckets on subsequent passes. I feel like auto_date_histogram might have a chance to do things like bump up to a higher level of rounding. But I only am thinking about that because I was hacking on it this morning. Your aggs, I'm not sure. Not yet, at least. Sometimes it is easy to do, like max, right? But bucketing aggs almost always require more thought.

@@ -471,6 +471,9 @@ private void mergeBucketsWithSameMin(List<Bucket> buckets, ReduceContext reduceC
BucketRange nextRange = ranges.get(i+1);
if(range.min == nextRange.min){
range.mergeWith(nextRange);
ranges.remove(i+1);
} else{
i++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are going to manipulate i outside of the for line, would you kindly replace this with a while loop? When I see while I'm automatically on guard for all kinds of sneaky stuff. But for lulls me into a false sense of security.

@nik9000
Copy link
Member

nik9000 commented May 21, 2020

I left a bunch more comments. I'm excited! This is coming along!

Please feel free to dismiss comments that you have already covered or were just informational. And good luck with your rebase. It'll be exciting. A year is a long time.

… intervals

The following commits were squashed into this commit:

* Redesigned clustering algorithm to use less memory and renamed aggregation to 'Variable Width Histogram'

* Defer subaggregations and bucket creation until clustering is complete

* Fix incorrect bucket merging by using MergingBucketsDeferringCollector, and add a test case to validate it

* Prevent VariableWidthHistogram execution in depth_first mode (i.e. throw an exception if a sub aggregation needs a score)

* Make BigArrays more efficient in the aggregator

* Use binary search to find nearest bucket; refactor the two different collection modes into dedicated classes

* Make InternalVariableWidthHistogram bucket merging logic more efficient & refactor it to be more clear

* Sort clusters more efficiently in the aggregator (use the Lucene sorter instead of a comparator)

* Implement releasable in the CollectionMode superclass

* Redesign the collection mode abstraction to make the aggregation's functionality clearer

* Fix bucket counts in InternalVariableWidthHistogram
@jamesdorfman jamesdorfman force-pushed the auto_cluster_histogram branch from 4df5d9b to f18127f Compare May 22, 2020 22:03
@jamesdorfman
Copy link
Contributor Author

Good point thanks! Turns out I can access the console.

It looks like the new issue is that the build fails for version 7.8.0, even though it works for some other versions like 7.9.0. Not really sure what would cause that, the console stack trace for the failure isn't very helpful. Any ideas?

It says: Building 7.8.0 didn't generate expected file /dev/shm/elastic+elasticsearch+pull-request-bwc/distribution/bwc/staged/build/bwc/checkout-7.8/distribution/archives/oss-linux-tar/build/distributions/elasticsearch-oss-7.8.0-SNAPSHOT-linux-x86_64.tar.gz

@nik9000
Copy link
Member

nik9000 commented Jun 19, 2020 via email

@jamesdorfman
Copy link
Contributor Author

Great suggestion, looks like that did it. All the tests are passing now 🎉

@nik9000
Copy link
Member

nik9000 commented Jun 19, 2020

Alright! I'll merge this sometime in the next few days. I have to be by the computer for a few hours after I merge just in case the build goes sideways. Randomized testing can be sneaky.

@nik9000 nik9000 merged commit e99d287 into elastic:master Jun 23, 2020
nik9000 pushed a commit to nik9000/elasticsearch that referenced this pull request Jun 23, 2020
Implements a new histogram aggregation called `variable_width_histogram` which
dynamically determines bucket intervals based on document groupings. These
groups are determined by running a one-pass clustering algorithm on each shard
and then reducing each shard's clusters using an agglomerative
clustering algorithm.

This PR addresses elastic#9572.

The shard-level clustering is done in one pass to minimize memory overhead. The
algorithm was lightly inspired by
[this paper](https://ieeexplore.ieee.org/abstract/document/1198387). It fetches
a small number of documents to sample the data and determine initial clusters.
Subsequent documents are then placed into one of these clusters, or a new one
if they are an outlier. This algorithm is described in more details in the
aggregation's docs.

At reduce time, a
[hierarchical agglomerative clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering)
algorithm inspired by [this paper](https://arxiv.org/abs/1802.00304)
continually merges the closest buckets from all shards (based on their
centroids) until the target number of buckets is reached.

The final values produced by this aggregation are approximate. Each bucket's
min value is used as its key in the histogram. Furthermore, buckets are merged
based on their centroids and not their bounds. So it is possible that adjacent
buckets will overlap after reduction. Because each bucket's key is its min,
this overlap is not shown in the final histogram. However, when such overlap
occurs, we set the key of the bucket with the larger centroid to the midpoint
between its minimum and the smaller bucket’s maximum:
`min[large] = (min[large] + max[small]) / 2`. This heuristic is expected to
increases the accuracy of the clustering.

Nodes are unable to share centroids during the shard-level clustering phase. In
the future, resolving elastic#50863
would let us solve this issue.

It doesn’t make sense for this aggregation to support the `min_doc_count`
parameter, since clusters are determined dynamically. The `order` parameter is
not supported here to keep this large PR from becoming too complex.
@polyfractal
Copy link
Contributor

Woo! Just saw that this merged! Awesome work @jamesdorfman, and cheers @nik9000 for taking over the review process :)

image

@jamesdorfman
Copy link
Contributor Author

This is really exciting! Thanks @polyfractal and @nik9000 for working with me on this. I really appreciate your effort and all the extremely detailed feedback 😀

nik9000 added a commit that referenced this pull request Jun 25, 2020
Implements a new histogram aggregation called `variable_width_histogram` which
dynamically determines bucket intervals based on document groupings. These
groups are determined by running a one-pass clustering algorithm on each shard
and then reducing each shard's clusters using an agglomerative
clustering algorithm.

This PR addresses #9572.

The shard-level clustering is done in one pass to minimize memory overhead. The
algorithm was lightly inspired by
[this paper](https://ieeexplore.ieee.org/abstract/document/1198387). It fetches
a small number of documents to sample the data and determine initial clusters.
Subsequent documents are then placed into one of these clusters, or a new one
if they are an outlier. This algorithm is described in more details in the
aggregation's docs.

At reduce time, a
[hierarchical agglomerative clustering](https://en.wikipedia.org/wiki/Hierarchical_clustering)
algorithm inspired by [this paper](https://arxiv.org/abs/1802.00304)
continually merges the closest buckets from all shards (based on their
centroids) until the target number of buckets is reached.

The final values produced by this aggregation are approximate. Each bucket's
min value is used as its key in the histogram. Furthermore, buckets are merged
based on their centroids and not their bounds. So it is possible that adjacent
buckets will overlap after reduction. Because each bucket's key is its min,
this overlap is not shown in the final histogram. However, when such overlap
occurs, we set the key of the bucket with the larger centroid to the midpoint
between its minimum and the smaller bucket’s maximum:
`min[large] = (min[large] + max[small]) / 2`. This heuristic is expected to
increases the accuracy of the clustering.

Nodes are unable to share centroids during the shard-level clustering phase. In
the future, resolving #50863
would let us solve this issue.

It doesn’t make sense for this aggregation to support the `min_doc_count`
parameter, since clusters are determined dynamically. The `order` parameter is
not supported here to keep this large PR from becoming too complex.

Co-authored-by: James Dorfman <jamesdorfman@users.noreply.github.com>
@nik9000
Copy link
Member

nik9000 commented Jun 25, 2020

This is really exciting! Thanks @polyfractal and @nik9000 for working with me on this. I really appreciate your effort and all the extremely detailed feedback.

Thanks so much for working on this and for seeing it through to the bitter end! Thanks for your patience! I'm excited to see what we can do with this!

nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Jun 26, 2020
Adds an explicit check to `variable_width_histogram` to stop it from
trying to collect from many buckets because it can't. I tried to make it
do so but that is more than an afternoon's project, sadly. So for now we
just disallow it.

Relates to elastic#42035
nik9000 added a commit that referenced this pull request Jun 30, 2020
Adds an explicit check to `variable_width_histogram` to stop it from
trying to collect from many buckets because it can't. I tried to make it
do so but that is more than an afternoon's project, sadly. So for now we
just disallow it.

Relates to #42035
nik9000 added a commit to nik9000/elasticsearch that referenced this pull request Jun 30, 2020
Adds an explicit check to `variable_width_histogram` to stop it from
trying to collect from many buckets because it can't. I tried to make it
do so but that is more than an afternoon's project, sadly. So for now we
just disallow it.

Relates to elastic#42035
nik9000 added a commit that referenced this pull request Jun 30, 2020
Adds an explicit check to `variable_width_histogram` to stop it from
trying to collect from many buckets because it can't. I tried to make it
do so but that is more than an afternoon's project, sadly. So for now we
just disallow it.

Relates to #42035
nik9000 added a commit that referenced this pull request Jul 13, 2020
@arshad171
Copy link

arshad171 commented Jun 13, 2021

Out of curiosity, can this implementation also handle multi-dimensional data (maybe a dense vector data type)?
i.e. bin data based on multi-dimensional vectors

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/Aggregations Aggregations >feature Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v7.9.0 v8.0.0-alpha1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants