Skip to content

Commit

Permalink
fix: The aggs result of NestedAggregator with sub NestedAggregator ma…
Browse files Browse the repository at this point in the history
…y be not accurately
  • Loading branch information
kkewwei committed Apr 21, 2024
1 parent b4692c8 commit 1b76a6a
Showing 1 changed file with 41 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.ObjectMapper;
Expand All @@ -60,6 +61,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Aggregate all docs that match a nested path
Expand Down Expand Up @@ -106,21 +108,18 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
final DocIdSetIterator childDocs = childDocsScorer != null ? childDocsScorer.iterator() : null;
if (collectsFromSingleBucket) {
return new LeafBucketCollectorBase(sub, null) {
AtomicBoolean parentIdBeforeChild = new AtomicBoolean();
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// search parentDoc can be 0:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
Tuple<Integer, Integer> res = getChildAndNextParent(parentDocs, childDocs, parentDoc, parentIdBeforeChild);
int childDocId = res.v1();
int nextParentDoc = res.v2();
for (; childDocId < nextParentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
Expand All @@ -130,6 +129,31 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

private Tuple<Integer, Integer> getChildAndNextParent(BitSet parentDocs, DocIdSetIterator childDocs, int parentDoc,
AtomicBoolean parentIdBeforeChild) throws IOException {
int childDocId = childDocs.docID();
if (childDocId == -1) {
childDocId = childDocs.advance(-1);
if (childDocId > parentDoc) {
parentIdBeforeChild.set(true);
}
}

int nextPossibleChild;
int nextParentDoc = parentDoc;
// parent is behind child
if (parentIdBeforeChild.get()) {
nextPossibleChild = parentDoc;
nextParentDoc = parentDocs.nextSetBit(parentDoc + 1);
} else {
nextPossibleChild = parentDocs.prevSetBit(parentDoc - 1);
}
if (childDocId <= nextPossibleChild) {
childDocId = childDocs.advance(nextPossibleChild + 1);
}
return Tuple.tuple(childDocId, nextParentDoc);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
Expand Down Expand Up @@ -174,6 +198,7 @@ class BufferingNestedLeafBucketCollector extends LeafBucketCollectorBase {

Scorable scorer;
int currentParentDoc = -1;
AtomicBoolean parentIdBeforeChild = new AtomicBoolean();
final CachedScorable cachedScorer = new CachedScorable();

BufferingNestedLeafBucketCollector(LeafBucketCollector sub, BitSet parentDocs, DocIdSetIterator childDocs) {
Expand All @@ -191,9 +216,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// search parentDoc can be 0:
if (parentDocs == null || childDocs == null) {
return;
}

Expand All @@ -214,13 +238,11 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getChildAndNextParent(parentDocs, childDocs, currentParentDoc, parentIdBeforeChild);
int childDocId = res.v1();
int nextParentDoc = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < nextParentDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
for (var bucket : bucketBuffer) {
collectBucket(sub, childDocId, bucket);
Expand Down

0 comments on commit 1b76a6a

Please sign in to comment.