Sequence numbers commit data for Lucene uses Iterable interface #20793
Conversation
Sequence number related data (maximum sequence number, local checkpoint, and global checkpoint) gets stored in Lucene on each commit. The logical place to store this data is on each Lucene commit's user commit data structure (see IndexWriter#setCommitData and the new version IndexWriter#setLiveCommitData). However, previously we did not store the maximum sequence number in the commit data because the commit data got copied over before the Lucene IndexWriter flushed the documents to segments in the commit. This meant that between the time the commit data was set on the IndexWriter and the time the IndexWriter completed the commit, documents with higher sequence numbers could have entered the commit. Hence, we would use FieldStats on the _seq_no field in the documents to get the maximum sequence number value, but this suffers the drawback that if the last sequence number in the commit corresponded to a delete document action, that sequence number would not show up in FieldStats as there would be no corresponding document in Lucene.

In Lucene 6.2, the commit data was changed to take an Iterable interface, so that the commit data can be calculated and retrieved *after* all documents have been flushed, while the commit data itself is being set on the Lucene commit. This commit changes max_seq_no so it is stored in the commit data instead of being calculated from FieldStats, taking advantage of the deferred calculation of the max_seq_no through passing an Iterable that dynamically sets the iterator data.
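The deferred-computation idea can be sketched with plain JDK types (no Lucene needed). This is a minimal illustration, not the actual engine code: `maxSeqNo` is a stand-in for what SequenceNumbersService would report, and the Iterable plays the role of the argument to IndexWriter#setLiveCommitData, whose entries are only materialized when iterator() is called.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeferredCommitData {
    // Stand-in for SequenceNumbersService: in the real engine this value
    // keeps advancing while the IndexWriter flushes documents.
    static long maxSeqNo = 3;

    public static void main(String[] args) {
        Map<String, String> commitData = new HashMap<>();
        commitData.put("translog_uuid", "abc");

        // The Iterable defers reading maxSeqNo until iterator() is invoked,
        // i.e. after all documents have been flushed to the commit.
        Iterable<Map.Entry<String, String>> liveCommitData = () -> {
            List<Map.Entry<String, String>> entries = new ArrayList<>(commitData.entrySet());
            entries.add(new SimpleImmutableEntry<>("max_seq_no", Long.toString(maxSeqNo)));
            return entries.iterator();
        };

        // Documents with higher sequence numbers enter before the commit completes.
        maxSeqNo = 7;

        for (Map.Entry<String, String> e : liveCommitData) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
        // prints max_seq_no=7, not 3: the value is read at commit time
    }
}
```

If the map were snapshotted eagerly instead, the commit would record max_seq_no=3 and miss the later operations, which is exactly the problem the PR description outlines.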
Thx @abeyad . I left some minor comments. Can we increase the test to cover delete operations?
 * all documents, we defer computation of the max_seq_no to the time of invocation of the commit
 * data iterator (which occurs after all documents have been flushed to Lucene).
 */
final Map<String, String> deferredCommitData = new HashMap<>(commitData.size() + 1);
I think it will be simpler to just capture the local and global checkpoints before we start and build one map here. This way we don't need two maps.
Done
@@ -1336,10 +1321,26 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
         }

         if (logger.isTraceEnabled()) {
-            logger.trace("committing writer with commit data [{}]", commitData);
+            logger.trace("committing writer with commit data (max_seq_no excluded) [{}]", commitData);
can we log this after the commit so we have everything?
Will do. In doing this, I noticed there is an issue in how the commit data is set: anytime iterator() is called, it recomputes the maxSeqNo based on the current value returned by the SequenceNumbersService. We only want this done once, so that each subsequent call to writer.getLiveCommitData() gets the same value that went into the commit. I'm fixing this and will push up a new commit.
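The compute-once behavior described above can be sketched with a stdlib-only example; this is illustrative, not the PR's actual code. The anonymous Iterable caches the value on its first iteration so that later iterations (as writer.getLiveCommitData() would trigger) return the value that actually went into the commit, even if the underlying counter has since advanced.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ComputeOnceCommitData {
    // Stand-in for the live max sequence number, which keeps changing.
    static long counter = 5;

    public static void main(String[] args) {
        Iterable<Map.Entry<String, String>> liveCommitData =
            new Iterable<Map.Entry<String, String>>() {
                // cache the value computed on the first iteration so
                // subsequent iterations don't recompute a different value
                private String cached = null;

                @Override
                public Iterator<Map.Entry<String, String>> iterator() {
                    if (cached == null) {
                        cached = Long.toString(counter);
                    }
                    List<Map.Entry<String, String>> entries = new ArrayList<>();
                    entries.add(new SimpleImmutableEntry<>("max_seq_no", cached));
                    return entries.iterator();
                }
            };

        String first = liveCommitData.iterator().next().getValue();  // computes and caches "5"
        counter = 9;                                                 // value advances after the commit
        String second = liveCommitData.iterator().next().getValue(); // still the cached "5"
        System.out.println(first.equals(second)); // true: same value on every iteration
    }
}
```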
I left a minor suggestion. Also, I think it would be great to have a test that concurrently indexes and commits and makes sure that for every commit point, all ops below the local checkpoint are present and no ops above the max_seq_no are present. Feel free to add this test here or in a follow-up change.
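The concurrency scenario this review asks to test can be sketched with JDK threads alone; everything here is a simplified stand-in (AtomicLong counters in place of the SequenceNumbersService, a list of snapshots in place of real commit points). The key ordering detail is that the committer reads the local checkpoint before the max sequence number, so the invariant localCheckpoint <= maxSeqNo holds for every snapshot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentCommitInvariant {
    public static void main(String[] args) throws InterruptedException {
        AtomicLong maxSeqNo = new AtomicLong(-1);
        AtomicLong localCheckpoint = new AtomicLong(-1);
        List<long[]> commits = new ArrayList<>();

        Thread indexer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                long seq = maxSeqNo.incrementAndGet(); // assign a sequence number
                localCheckpoint.set(seq);              // simplified: ops complete in order
            }
        });

        Thread committer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                // read the local checkpoint first; since maxSeqNo only advances
                // and is incremented before the checkpoint, max >= lcp always holds
                long lcp = localCheckpoint.get();
                long max = maxSeqNo.get();
                commits.add(new long[] {lcp, max});
            }
        });

        indexer.start();
        committer.start();
        indexer.join();
        committer.join();

        for (long[] c : commits) {
            if (c[0] > c[1]) {
                throw new AssertionError("local checkpoint above max_seq_no");
            }
        }
        System.out.println("all " + commits.size() + " commit points satisfy lcp <= max_seq_no");
    }
}
```

The real engine test additionally has to verify document presence against the commit's Lucene segments, which this sketch does not model.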
writer.setLiveCommitData(new Iterable<Map.Entry<String, String>>() {
    // save the max seq no the first time it's computed, so subsequent iterations don't recompute,
    // potentially getting a different value
    private String computedMaxSeqNoEntry = null;
It seems like we need this caching because we load user data from the index writer. Maybe we can use lastCommittedSegmentInfos, which can be read earlier when opening the engine?
@bleskes I agree with using lastCommittedSegmentInfos from the engine as a solution, but my concern here was that we basically have to document and ensure that no one uses IndexWriter#getLiveCommitData or depends on it for accurate information; otherwise the max_seq_no could be different from what is actually stored in the commit. That's why I added the caching part, which does increase complexity. Do you prefer I remove it and just document that we should never call IndexWriter#getLiveCommitData inside ES code?
I think it's OK to not use IndexWriter#getLiveCommitData, as it may not return what's in the last commit. I suspect this is why it was renamed to say live in the name. Is there anything specific you are concerned about?
to Lucene, ensuring the sequence number related commit data in each Lucene commit point matches the invariants of localCheckpoint <= highest sequence number in commit <= maxSeqNo
Thx @abeyad . This looks great. I left some minor comments around testing.
}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
    logger.trace("committed writer with commit data [{}]", commitData);
nit: should be "commit_ting_ writer with commit data"
done
try {
    initialEngine.index(index);
    final String id;
    boolean versionConflict = false;
++
initialEngine.seqNoService().getMaxSeqNo(),
// it's possible we haven't indexed any documents yet, or it's possible that right after a commit, a version conflict
// exception happened so the max seq no was not updated, so here we check greater than or equal to
initialEngine.seqNoService().getMaxSeqNo() != SequenceNumbersService.NO_OPS_PERFORMED || versionConflict ?
I'm not sure we need this extra check? what were you aiming at testing ? feels like it's for the time we had caching?
It isn't asserting anything relevant about the saved commit data any longer, so I'm removing it
assertThat(
    Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
    equalTo(primarySeqNo));
assertThat(
    Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
    equalTo(globalCheckpoint));
assertThat(
    Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
    // after recovering from translog, all docs have been flushed to Lucene segments, so check against primarySeqNo
not sure I follow this comment? can you elaborate?
@@ -1695,6 +1766,118 @@ public void testSeqNoAndCheckpoints() throws IOException {
     }
 }

// this test writes documents to the engine while concurrently flushing/committing
// and ensures that the commit points contain the correct sequence number data
++.. thx!
} catch (Exception e) {
    throw new RuntimeException(e);
} finally {
    threadStatuses.get(threadIdx).set(true); // signal that this thread is done indexing
I think we can just use thread.alive() here?
makes sense, done
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {

final int numIndexingThreads = randomIntBetween(4, 7);
final int numDocsPerThread = randomIntBetween(500, 1000);
how long does this test run? wondering if we should make it lighter.
Average run is about 2.2 seconds on my Mac. I agree it could be lighter, but dropping to randomIntBetween(100, 500) only reduces the execution time to about 1.95 seconds on average. Not sure it's worth going lower than that, as we likely won't often get useful commits to check against (all docs would go in a single commit).
long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
    Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
    SequenceNumbersService.UNASSIGNED_SEQ_NO;
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint)); // local checkpoint shouldn't go backwards
should we have the same for maxSeqNo?
done
final Bits bits = leaf.getLiveDocs();
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
    if (bits == null || bits.get(docID)) {
        bitSet.set((int) values.get(docID));
can we check this is not set already?
done
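The duplicate check the reviewer asked for can be sketched with java.util.BitSet in place of Lucene's bit set; the sample sequence numbers are made up for illustration (seq# 3 standing in for a delete with no live document). Each sequence number should map to at most one live document, so seeing a bit already set would indicate a bug.

```java
import java.util.BitSet;

public class SeqNoBitSetCheck {
    public static void main(String[] args) {
        // sequence numbers read from the _seq_no doc values of live docs;
        // 3 is missing because that operation was a delete
        long[] seqNosInCommit = {0, 1, 2, 4, 5};

        BitSet bitSet = new BitSet();
        for (long seqNo : seqNosInCommit) {
            int bit = (int) seqNo;
            // fail fast if two live documents carry the same sequence number
            if (bitSet.get(bit)) {
                throw new AssertionError("sequence number " + seqNo + " seen twice");
            }
            bitSet.set(bit);
        }
        System.out.println("no duplicate sequence numbers; cardinality=" + bitSet.cardinality());
    }
}
```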
thanks for the review @bleskes ! |
Relates #10708