Translog file recovery should not rely on lucene commits #25005
Conversation
retest this please
When we open a translog, we rely on the `translog.ckp` file to tell us what the maximum generation file should be and on the information stored in the last lucene commit to know the first file we need to recover. This requires coordination and is currently subject to a race condition: if a node dies after a lucene commit is made but before we remove the translog generations that were unneeded by it, the next time we open the translog we will ignore those files and never delete them (I have added tests for this). This PR changes the approach to have the translog store both of those numbers in the `translog.ckp`. This means it's more self contained and easier to control. This change also decouples the translog recovery logic from the specific commit we're opening. This prepares the ground to fully utilize the deletion policy introduced in elastic#24950, store more translog data that's needed for Lucene, keep multiple lucene commits around, and be free to recover from any of them.
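The key idea can be sketched in a few lines. This is a hypothetical illustration (the class and field names are stand-ins, not Elasticsearch's actual `Checkpoint` class): the checkpoint file becomes self-contained by recording both the newest translog generation and the oldest generation still needed for recovery, so opening the translog no longer has to consult the last Lucene commit.

```java
// Illustrative sketch only -- names are hypothetical, not the real Checkpoint class.
final class TranslogCheckpointSketch {
    final long generation;            // newest translog file generation
    final long minTranslogGeneration; // oldest generation required for recovery

    TranslogCheckpointSketch(long generation, long minTranslogGeneration) {
        if (minTranslogGeneration > generation) {
            throw new IllegalArgumentException("min generation [" + minTranslogGeneration
                + "] cannot exceed current generation [" + generation + "]");
        }
        this.generation = generation;
        this.minTranslogGeneration = minTranslogGeneration;
    }

    // every generation in [minTranslogGeneration, generation] must be recovered;
    // anything below minTranslogGeneration is unreferenced and safe to delete
    boolean needsRecovery(long fileGeneration) {
        return fileGeneration >= minTranslogGeneration && fileGeneration <= generation;
    }
}
```

Because both bounds live in one file, recovery no longer depends on which Lucene commit happens to be opened.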
4f7f33a to 2e4a617 (Compare)
Let's figure out something better than that disgusting hack. Otherwise it looks good.
@@ -1154,7 +1156,7 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException {

    public void testSyncedFlush() throws IOException {
        try (Store store = createStore();
             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
Extra space is extra.
well, it's part of the try() clause and not the body, so I think this is good? (it's also the auto formatter)
I see, that is hideous. 😦
@@ -1299,8 +1301,8 @@ public void testVersioningNewIndex() throws IOException {

    public void testForceMerge() throws IOException {
        try (Store store = createStore();
            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
                new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
Extra space is extra.
well, it's part of the try() clause and not the body, so I think this is good? (it's also the auto formatter)
I see, that is hideous. 😦
                FileChannel::open,
                TranslogConfig.DEFAULT_BUFFER_SIZE,
                () -> globalCheckpoint, () -> generation)) {
        }
It looks like it was unnecessary to touch this file?
I need to add the new parameters?
@@ -1831,6 +1834,11 @@ private void commitIndexWriter(final IndexWriter writer, final Translog translog
        }
    }

    // commit hook for testing
It's a matter of taste, but I no longer find these comments to be useful.
I don't mind, just following conventions.
And I'm doing what I can to influence abandonment of it.
well, you're the reviewer for this one and I don't mind - so gone it is.
@@ -1831,6 +1834,11 @@ private void commitIndexWriter(final IndexWriter writer, final Translog translog
        }
    }

    // commit hook for testing
    void callCommitOnWriter(IndexWriter writer) throws IOException {
I think it should just be called `commit` or `commitWriter`.
I struggled with this name. We already have a `commitIndexWriter`, so I figured to be explicit and call this exactly what this does.
I think we can achieve the same without introducing this method (that has potential to be used in the wrong place) with the following:
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 18fafb6e90..8bbf320e26 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1778,7 +1778,7 @@ public class InternalEngine extends Engine {
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @throws IOException if an I/O exception occurs committing the specfied writer
*/
- private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
+ void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
@@ -1810,7 +1810,7 @@ public class InternalEngine extends Engine {
return commitData.entrySet().iterator();
});
- callCommitOnWriter(writer);
+ writer.commit();
} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
@@ -1837,11 +1837,6 @@ public class InternalEngine extends Engine {
}
}
- // commit hook for testing
- void callCommitOnWriter(IndexWriter writer) throws IOException {
- writer.commit();
- }
-
private void ensureCanFlush() {
// translog recover happens after the engine is fully constructed
// if we are in this stage we have to prevent flushes from this
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index a767e50d4e..a10381be4a 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -1163,8 +1163,9 @@ public class InternalEngineTests extends ESTestCase {
}
public void testSyncedFlush() throws IOException {
- try (Store store = createStore();
- Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
+ try (
+ Store store = createStore();
+ Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@@ -2496,8 +2497,8 @@ public class InternalEngineTests extends ESTestCase {
final Path translogPath = createTempDir();
try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) {
@Override
- void callCommitOnWriter(IndexWriter writer) throws IOException {
- super.callCommitOnWriter(writer);
+ void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+ super.commitIndexWriter(writer, translog, syncId);
if (throwErrorOnCommit.get()) {
throw new RuntimeException("power's out");
}
What do you think?
            final long minGenerationToRecoverFrom;
            if (checkpoint.minTranslogGeneration < 0) {
                final Version indexVersionCreated = indexSettings().getIndexVersionCreated();
                assert indexVersionCreated.before(Version.V_6_0_0_alpha2) :
I think this needs to be `Version.V_6_0_0_alpha3` now.
yep changed.
        }
        final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
        foundTranslogs.add(reader);
        logger.debug("recovered local translog from checkpoint {}", checkpoint);
    }
    Collections.reverse(foundTranslogs);

    // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them
    // if we crush just at the wrong moment, it may be that we leave one unreferenced file behind. Delete it if there
`crush` -> `crash`
`. Delete it if there` -> `so we delete it if they exist`
done. thx.
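The ordering being discussed can be sketched as follows. This is an illustrative toy (a hypothetical class, not the real Translog code): the checkpoint is updated first, and only then are old translog files deleted, so a crash between the two steps leaves at most some unreferenced files behind, which the open path can detect and remove because they fall below the checkpoint's minimum generation.

```java
import java.util.TreeSet;

// Toy model of crash-safe translog trimming -- all names are illustrative.
final class TrimSketch {
    long checkpointMinGen = 1;                 // persisted min referenced generation
    final TreeSet<Long> filesOnDisk = new TreeSet<>();

    void trimTo(long newMinGen) {
        checkpointMinGen = newMinGen;          // step 1: persist (and fsync) the checkpoint
        // (a crash here leaves files < newMinGen unreferenced, but recoverable)
        filesOnDisk.removeIf(gen -> gen < checkpointMinGen); // step 2: delete the files
    }

    // on open: delete any leftover file that predates the checkpoint
    void deleteLeftoversOnOpen() {
        filesOnDisk.removeIf(gen -> gen < checkpointMinGen);
    }
}
```

Doing the deletion before the checkpoint update would invert the failure mode: a crash could then leave the checkpoint referencing files that no longer exist, which is unrecoverable, whereas a leftover unreferenced file is merely garbage to collect.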
    // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them
`them` -> `them;`
lol. done.
     */
    long getMinFileGeneration() {
        try (ReleasableLock ignored = readLock.acquire()) {
            if (readers.isEmpty() == false) {
Let's change the conditional so we can avoid the negative:
if (readers.isEmpty()) {
return current.getGeneration();
} else {
return readers.get(0).getGeneration();
}
sure
-                globalCheckpointSupplier);
+                globalCheckpointSupplier,
+                minTranslogGenerationSupplier
+            );
Can we place this on the end of the previous line?
Thx @jasontedor. I addressed all your comments.
left some nits, LGTM otherwise
        IOUtils.closeWhileHandlingException(unreferencedReader);
        IOUtils.deleteFilesIgnoringExceptions(translogPath,
            translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
        // update the checkpoint not to reference the removed file
nit: can this comment be more clear, i.e. tell us what we update to make sure we don't ref this file.
        "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation ["
            + currentFileGeneration() + "]";

    while (readers.isEmpty() == false && readers.get(0).getGeneration() < minReferencedGen) {
can we use an iterator here instead? it would be more clear to me if we'd do that.
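A sketch of what the suggested iterator-based rewrite might look like (simplified: plain generation numbers stand in for TranslogReader objects, and "removing" stands in for closing the reader and deleting its files):

```java
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for trimming unreferenced translog readers with an
// iterator instead of repeated readers.get(0) / remove(0) calls.
final class ReaderTrimSketch {
    static void trimUnreferencedReaders(List<Long> readerGenerations, long minReferencedGen) {
        Iterator<Long> it = readerGenerations.iterator();
        while (it.hasNext()) {
            if (it.next() >= minReferencedGen) {
                break; // readers are ordered oldest-first; the rest are still referenced
            }
            it.remove(); // the real code would close the reader and delete its files here
        }
    }
}
```

Iterator.remove avoids the quadratic shifting of repeated remove(0) on an ArrayList and makes the "consume from the front until the bound" intent explicit.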
        IOUtils.deleteFilesIgnoringExceptions(translogPath,
            translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
        // update the checkpoint not to reference the removed file
        current.sync();
should we try to delete in a finally block here? best effort?
If we fail to sync, I think we want to keep the file around because it's being referenced by the ckp?
@@ -494,6 +501,15 @@ public void assertFileDeleted(Translog translog, long id) {
        assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
    }

    private void assertFilesPresence(Translog translog) {
I think to be ultra-pedantic it should be `assertFilePresences`.
…in_checkpoint

# Conflicts:
#	core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
I left a few more comments.
589c735 to 440ecc9 (Compare)
LGTM.
        final Path translogPath = createTempDir();
        try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) {
            @Override
            protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
instead of making this method protected I think we should use `MockDirectoryWrapper#failOn(Failure)` and pass some failure to it that fails if we commit the indexwriter, like this:
Failure fail = new Failure() {
    @Override
    public void eval(MockDirectoryWrapper dir) throws IOException {
        for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
            if (doFail && "commit".equals(e.getMethodName())) {
                throw new FakeIOException();
            }
        }
    }
};
I'm okay with this.
@s1monw I tried this in many variants and none of them was good. The Failure as stated fails too early (before the `segments_N` file is written). I tried many other variants but none of them allow all of the commit logic to complete without triggering any failure handling in the IndexWriter, which also means later on that the new commit files are cleaned by a rollback we do when we fail the engine. Even if we do find a way to do this, I think it will be way too brittle and tend to break with changes in Lucene. Bottom line, I prefer to keep the current solution.
fair enough
…in_checkpoint

# Conflicts:
#	core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
#	core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
Thanks @jasontedor @s1monw for the thorough review.
When we open a translog, we rely on the `translog.ckp` file to tell us what the maximum generation file should be and on the information stored in the last lucene commit to know the first file we need to recover. This requires coordination and is currently subject to a race condition: if a node dies after a lucene commit is made but before we remove the translog generations that were unneeded by it, the next time we open the translog we will ignore those files and never delete them (I have added tests for this). This PR changes the approach to have the translog store both of those numbers in the `translog.ckp`. This means it's more self contained and easier to control. This change also decouples the translog recovery logic from the specific commit we're opening. This prepares the ground to fully utilize the deletion policy introduced in #24950, store more translog data that's needed for Lucene, keep multiple lucene commits around, and be free to recover from any of them.