Nested aggregator: Fix handling of multiple buckets being emitted for the same parent doc id.

This bug was introduced by elastic#8454, which allowed the childFilter to be consumed only once. By buffering child doc ids, multiple buckets can now be emitted for the same parent doc id. This buffering only happens in the scope of the current root document, so the number of child doc ids buffered at any time remains small.

Closes elastic#9317
Closes elastic#9346
martijnvg committed Jan 26, 2015
1 parent f829435 commit 7ca2ef9
Showing 2 changed files with 144 additions and 12 deletions.
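The heart of the change: the first time the children of a parent doc are needed, they are drained from the forward-only child iterator into a buffer, and later collects for the same parent (for example, one per terms bucket) replay that buffer; buffers are thrown away as soon as collection moves on to a new root document. Below is a minimal, self-contained sketch of that idea; it is not the Elasticsearch code, and it uses plain JDK collections and a plain int iterator in place of the Lucene DocIdSetIterator and HPPC types that appear in the diff.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;

// Simplified model of the fix: child doc ids can only be pulled once from the
// forward-only iterator, so the first lookup for a parent buffers them and
// every later lookup replays the buffer instead of re-consuming the iterator.
public class ChildDocBufferSketch {

    // Buffered child doc ids per parent doc id; only valid for the current root doc.
    private final Map<Integer, List<Integer>> childDocIdBuffers = new HashMap<>();
    private int currentRootDoc = -1;

    public List<Integer> getChildren(int parentDocId, int rootDocId, PrimitiveIterator.OfInt childDocs) {
        if (rootDocId != currentRootDoc) {
            // Moved on to a new root document: buffers for the previous root are
            // stale, so the map only ever holds the children of one root doc.
            currentRootDoc = rootDocId;
            childDocIdBuffers.clear();
        }
        List<Integer> buffer = childDocIdBuffers.get(parentDocId);
        if (buffer == null) {
            // First visit: drain the child ids that precede this parent. In
            // Lucene's block join layout, children have smaller doc ids than
            // their parent. (The real code peeks via DocIdSetIterator.docID();
            // this sketch simply stops at the first id >= parentDocId.)
            buffer = new ArrayList<>();
            while (childDocs.hasNext()) {
                int childDocId = childDocs.nextInt();
                if (childDocId >= parentDocId) {
                    break;
                }
                buffer.add(childDocId);
            }
            childDocIdBuffers.put(parentDocId, buffer);
        }
        return buffer;
    }
}

With children {0, 1, 2} preceding parent doc 3, two calls to getChildren(3, 3, it) consume the iterator once and both return [0, 1, 2], which is exactly what lets the terms agg in the test below emit several buckets for the same parent doc.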
@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.search.aggregations.bucket.nested;

+import com.carrotsearch.hppc.IntArrayList;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.DocIdSet;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -49,9 +51,12 @@ public class NestedAggregator extends SingleBucketAggregator implements ReaderCo

     private DocIdSetIterator childDocs;
     private BitSet parentDocs;

     private LeafReaderContext reader;

+    private BitSet rootDocs;
+    private int currentRootDoc = -1;
+    private final IntObjectOpenHashMap<IntArrayList> childDocIdBuffers = new IntObjectOpenHashMap<>();

     public NestedAggregator(String name, AggregatorFactories factories, ObjectMapper objectMapper, AggregationContext aggregationContext, Aggregator parentAggregator, Map<String, Object> metaData, FilterCachingPolicy filterCachingPolicy) throws IOException {
         super(name, factories, aggregationContext, parentAggregator, metaData);
         this.parentAggregator = parentAggregator;
@@ -71,6 +76,9 @@ public void setNextReader(LeafReaderContext reader) {
             } else {
                 childDocs = childDocIdSet.iterator();
             }
+            BitDocIdSetFilter rootDocsFilter = context.searchContext().bitsetFilterCache().getBitDocIdSetFilter(NonNestedDocsFilter.INSTANCE);
+            BitDocIdSet rootDocIdSet = rootDocsFilter.getDocIdSet(reader);
+            rootDocs = rootDocIdSet.bits();
         } catch (IOException ioe) {
             throw new AggregationExecutionException("Failed to aggregate [" + name + "]", ioe);
         }
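The rootDocs bitset built above marks the top-level (non-nested) documents. getChildren, added at the bottom of this file, calls rootDocs.nextSetBit(parentDocId) to resolve the root document whose block contains a given parent doc, and that root id is what scopes the buffers. A tiny hypothetical illustration, with java.util.BitSet standing in for Lucene's BitSet:

import java.util.BitSet;

public class RootDocScopeExample {
    public static void main(String[] args) {
        // Hypothetical segment with two blocks: docs 0..3 and docs 4..7,
        // where doc 3 and doc 7 are the root (non-nested) documents.
        BitSet rootDocs = new BitSet();
        rootDocs.set(3);
        rootDocs.set(7);

        // A parent doc that is itself a root doc resolves to itself...
        System.out.println(rootDocs.nextSetBit(3)); // 3
        // ...and a nested parent doc inside the second block resolves to its root.
        System.out.println(rootDocs.nextSetBit(5)); // 7
    }
}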
@@ -108,22 +116,22 @@ public void collect(int parentDoc, long bucketOrd) throws IOException {
             }
         }

-        int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
-        int childDocId;
-        if (childDocs.docID() > prevParentDoc) {
-            childDocId = childDocs.docID();
-        } else {
-            childDocId = childDocs.advance(prevParentDoc + 1);
-        }
-
         int numChildren = 0;
-        for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
+        IntArrayList iterator = getChildren(parentDoc);
+        final int[] buffer = iterator.buffer;
+        final int size = iterator.size();
+        for (int i = 0; i < size; i++) {
             numChildren++;
-            collectBucketNoCounts(childDocId, bucketOrd);
+            collectBucketNoCounts(buffer[i], bucketOrd);
         }
         incrementBucketDocCount(bucketOrd, numChildren);
     }

+    @Override
+    protected void doClose() {
+        childDocIdBuffers.clear();
+    }
+
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
         return new InternalNested(name, bucketDocCount(owningBucketOrdinal), bucketAggregations(owningBucketOrdinal), metaData());
@@ -185,5 +193,43 @@ public InternalAggregation buildEmptyAggregation() {
         }
     }

+    // The aggs framework can collect buckets for the same parent doc id more than once, and because the child docs
+    // can only be consumed once, we need to buffer the child docs. We only need to buffer child docs in the scope
+    // of the current root doc.
+
+    // Examples:
+    // 1) The nested agg is wrapped by a terms agg and multiple buckets per document are emitted.
+    // 2) Multiple nested fields are defined. A nested agg joins back to another nested agg via the reverse_nested agg.
+    //    For each child in the first nested agg, the second nested agg gets invoked with the same buckets / doc ids.
+    private IntArrayList getChildren(final int parentDocId) throws IOException {
+        int rootDocId = rootDocs.nextSetBit(parentDocId);
+        if (currentRootDoc == rootDocId) {
+            final IntArrayList childDocIdBuffer = childDocIdBuffers.get(parentDocId);
+            if (childDocIdBuffer != null) {
+                return childDocIdBuffer;
+            } else {
+                // here we translate the parent doc id to a list of its nested child doc ids
+                // and buffer them, so that later collects for the same parent can replay them
+                final IntArrayList newChildDocIdBuffer = new IntArrayList();
+                childDocIdBuffers.put(parentDocId, newChildDocIdBuffer);
+                int prevParentDoc = parentDocs.prevSetBit(parentDocId - 1);
+                int childDocId;
+                if (childDocs.docID() > prevParentDoc) {
+                    childDocId = childDocs.docID();
+                } else {
+                    childDocId = childDocs.advance(prevParentDoc + 1);
+                }
+                for (; childDocId < parentDocId; childDocId = childDocs.nextDoc()) {
+                    newChildDocIdBuffer.add(childDocId);
+                }
+                return newChildDocIdBuffer;
+            }
+        } else {
+            this.currentRootDoc = rootDocId;
+            childDocIdBuffers.clear();
+            return getChildren(parentDocId);
+        }
+    }
+

 }
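To see why the buffering matters for the test below, assume the following hypothetical doc id layout, consistent with Lucene's block join indexing nested children immediately before their root doc. The category "1" bucket matches both products, so its nested doc count is 3 + 3 = 6:

public class BlockLayoutExample {
    public static void main(String[] args) {
        // Hypothetical doc ids for the two products in the test:
        //   0..2 -> product1's "property" children (property.id 1, 2, 3)
        //   3    -> product1 root doc (categories 1, 2, 3, 4)
        //   4..6 -> product2's "property" children (property.id 1, 5, 4)
        //   7    -> product2 root doc (categories 1, 2)
        //
        // The terms agg on "categories" collects root doc 3 once per matching
        // bucket ("1" through "4"), so the nested agg sees parentDoc 3 four
        // times. Before the fix, only the first collect could pull children off
        // the forward-only iterator; with the buffer, every collect replays {0, 1, 2}.
        System.out.println("parentDoc 3 -> children {0, 1, 2}, replayed once per category bucket");
    }
}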
@@ -451,4 +451,90 @@ public void testParentFilterResolvedCorrectly() throws Exception {
         tags = nestedTags.getAggregations().get("tag");
         assertThat(tags.getBuckets().size(), equalTo(0)); // and this must be empty
     }

+    @Test
+    public void nestedSameDocIdProcessedMultipleTime() throws Exception {
+        assertAcked(
+                prepareCreate("idx4")
+                        .setSettings(ImmutableSettings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
+                        .addMapping("product", "categories", "type=string", "name", "type=string", "property", "type=nested")
+        );
+
+        client().prepareIndex("idx4", "product", "1").setSource(jsonBuilder().startObject()
+                .field("name", "product1")
+                .field("categories", "1", "2", "3", "4")
+                .startArray("property")
+                .startObject().field("id", 1).endObject()
+                .startObject().field("id", 2).endObject()
+                .startObject().field("id", 3).endObject()
+                .endArray()
+                .endObject()).get();
+        client().prepareIndex("idx4", "product", "2").setSource(jsonBuilder().startObject()
+                .field("name", "product2")
+                .field("categories", "1", "2")
+                .startArray("property")
+                .startObject().field("id", 1).endObject()
+                .startObject().field("id", 5).endObject()
+                .startObject().field("id", 4).endObject()
+                .endArray()
+                .endObject()).get();
+        refresh();
+
+        SearchResponse response = client().prepareSearch("idx4").setTypes("product")
+                .addAggregation(terms("category").field("categories").subAggregation(
+                        nested("property").path("property").subAggregation(
+                                terms("property_id").field("property.id")
+                        )
+                ))
+                .get();
+        assertNoFailures(response);
+        assertHitCount(response, 2);
+
+        Terms category = response.getAggregations().get("category");
+        assertThat(category.getBuckets().size(), equalTo(4));
+
+        Terms.Bucket bucket = category.getBucketByKey("1");
+        assertThat(bucket.getDocCount(), equalTo(2l));
+        Nested property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(6l));
+        Terms propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(5));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));
+
+        bucket = category.getBucketByKey("2");
+        assertThat(bucket.getDocCount(), equalTo(2l));
+        property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(6l));
+        propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(5));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(2l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("4").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("5").getDocCount(), equalTo(1l));
+
+        bucket = category.getBucketByKey("3");
+        assertThat(bucket.getDocCount(), equalTo(1l));
+        property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(3l));
+        propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(3));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+
+        bucket = category.getBucketByKey("4");
+        assertThat(bucket.getDocCount(), equalTo(1l));
+        property = bucket.getAggregations().get("property");
+        assertThat(property.getDocCount(), equalTo(3l));
+        propertyId = property.getAggregations().get("property_id");
+        assertThat(propertyId.getBuckets().size(), equalTo(3));
+        assertThat(propertyId.getBucketByKey("1").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("2").getDocCount(), equalTo(1l));
+        assertThat(propertyId.getBucketByKey("3").getDocCount(), equalTo(1l));
+    }
 }
