Skip to content

Commit

Permalink
fix: The aggs result of NestedAggregator with sub NestedAggregator may not be accurate
Browse files Browse the repository at this point in the history

Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
kkewwei committed May 21, 2024
1 parent 3fe6674 commit 9ff7178
Show file tree
Hide file tree
Showing 4 changed files with 380 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix negative RequestStats metric issue ([#13553](https://github.com/opensearch-project/OpenSearch/pull/13553))
- Fix get field mapping API returns 404 error in mixed cluster with multiple versions ([#13624](https://github.com/opensearch-project/OpenSearch/pull/13624))
- Allow clearing `remote_store.compatibility_mode` setting ([#13646](https://github.com/opensearch-project/OpenSearch/pull/13646))
- Fix the aggs result of NestedAggregator with sub NestedAggregator being inaccurate ([#13324](https://github.com/opensearch-project/OpenSearch/pull/13324))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -61,6 +63,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* Aggregate all docs that match a nested path
*
Expand Down Expand Up @@ -88,12 +92,25 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
) throws IOException {
super(name, factories, context, parent, cardinality, metadata);

Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
Query parentFilter = isParent(parentObjectMapper, childObjectMapper, context.mapperService())
? parentObjectMapper.nestedTypeFilter()
: Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2);
}

/**
 * Returns {@code true} if {@code parentObjectMapper} is an ancestor of
 * {@code childObjectMapper} in the nested-object mapper hierarchy.
 *
 * @param parentObjectMapper candidate ancestor mapper; may be {@code null}
 * @param childObjectMapper  the nested mapper whose ancestor chain is walked
 * @param mapperService      used to resolve each mapper's parent
 * @return {@code true} when walking up from the child reaches {@code parentObjectMapper}
 *         before reaching the root; {@code false} if {@code parentObjectMapper} is
 *         {@code null} or is not on the chain
 */
private boolean isParent(ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper, MapperService mapperService) {
    if (parentObjectMapper == null) {
        return false;
    }
    ObjectMapper parent;
    do {
        parent = childObjectMapper.getParentObjectMapper(mapperService);
        // Advance up the chain. Without this reassignment the loop queried the same
        // mapper's parent forever — an infinite loop whenever the immediate parent
        // was non-null and not parentObjectMapper.
        childObjectMapper = parent;
    } while (parent != null && parent != parentObjectMapper);
    return parentObjectMapper == parent;
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
Expand All @@ -108,19 +125,16 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentDoc can be 0 when searching:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, parentDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
Expand All @@ -130,6 +144,33 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

/**
 * Resolves, for a candidate {@code parentDoc}, the enclosing parent document id and the
 * first child doc id to collect, returning them as {@code (currentParentDoc, childDocId)}.
 * When the resolved parent has no children before it, the child id is {@code NO_MORE_DOCS}
 * so the caller's {@code childDocId < currentParentDoc} loop collects nothing.
 *
 * NOTE(review): relies on Lucene's nested-doc layout where children precede their parent —
 * confirm against the index layout used by the caller.
 *
 * @param parentDocs bit set of parent documents in this segment
 * @param childDocs  iterator over child documents (advanced as a side effect)
 * @param parentDoc  the doc id being collected, not necessarily itself a parent
 * @return tuple of (parent doc id, first child doc id or NO_MORE_DOCS)
 */
static Tuple<Integer, Integer> getParentAndChildId(BitSet parentDocs, DocIdSetIterator childDocs, int parentDoc) throws IOException {
    int currentParentDoc;
    // Last parent at or before parentDoc; -1 when none exists.
    int prevParentDoc = parentDocs.prevSetBit(parentDoc);
    if (prevParentDoc == -1) {
        // No parent at or before parentDoc: snap forward to the first parent in the segment.
        currentParentDoc = parentDocs.nextSetBit(0);
    } else if (prevParentDoc == parentDoc) {
        // parentDoc is itself a parent; recompute prevParentDoc as the parent strictly
        // before it (-1 when parentDoc is doc 0), so the child scan starts after the
        // previous parent rather than after parentDoc itself.
        currentParentDoc = parentDoc;
        if (currentParentDoc == 0) {
            prevParentDoc = -1;
        } else {
            prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
        }
    } else {
        // parentDoc lies between two parents: its enclosing parent is the next set bit
        // after prevParentDoc (there is no parent in (prevParentDoc, parentDoc]).
        currentParentDoc = parentDocs.nextSetBit(prevParentDoc + 1);
    }

    // Position the child iterator past the previous parent block.
    int childDocId = childDocs.docID();
    if (childDocId <= prevParentDoc) {
        childDocId = childDocs.advance(prevParentDoc + 1);
    }

    // No child falls between prevParentDoc and currentParentDoc: signal "nothing to
    // collect" so the caller's loop terminates immediately.
    if (currentParentDoc <= childDocId) {
        childDocId = NO_MORE_DOCS;
    }
    return Tuple.tuple(currentParentDoc, childDocId);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
Expand Down Expand Up @@ -191,9 +232,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentDoc can be 0 when searching:
if (parentDocs == null || childDocs == null) {
return;
}

Expand All @@ -214,11 +254,9 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, currentParentDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
Expand Down
Loading

0 comments on commit 9ff7178

Please sign in to comment.