From 892b7d55c59f2adf9c549cc89b7daf6af6dbafd7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Mar 2018 14:31:15 -0400 Subject: [PATCH] Harden periodically check to avoid endless flush loop (#29125) In #28350, we fixed an endless flushing loop which may happen on replicas by tightening the relation between the flush action and the periodically flush condition. 1. The periodically flush condition is enabled only if it is disabled after a flush. 2. If the periodically flush condition is enabled then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee that a flushing loop will be terminated. Sadly, condition 1 can be violated in edge cases as we used two different algorithms to evaluate the current and future uncommitted translog size. - We use method `uncommittedSizeInBytes` to calculate the current uncommitted size. It is the sum of translogs whose generation is at least the minGen (determined by a given seqno). We pick a continuous range of translogs from the minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of translogs whose maxSeqNo is at least the given seqNo. Here we don't pick a range but select translogs one by one. Suppose we have 3 translogs `gen1={#1,#2}, gen2={}, gen3={#3} and seqno=#1`, `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3 while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3. Gen2 is excluded because its maxSeqNo is still -1. This commit removes both `sizeOfGensAboveSeqNoInBytes` and `uncommittedSizeInBytes` methods, then enforces an engine to use only `sizeInBytesByMinGen` method to evaluate the periodically flush condition. 
Closes #29097 Relates ##28350 --- .../index/engine/InternalEngine.java | 30 ++++++--- .../index/translog/Translog.java | 38 ++--------- .../translog/TranslogDeletionPolicy.java | 1 - .../index/engine/InternalEngineTests.java | 66 +++++++++++++++---- .../index/shard/IndexShardIT.java | 20 +++--- .../index/translog/TranslogTests.java | 6 +- .../indices/recovery/RecoveryTests.java | 2 +- 7 files changed, 95 insertions(+), 68 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 233644a5a8fc2..dd35d7c7c390c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1509,7 +1509,8 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) { + long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. 
sync id [{}].", syncId); @@ -1531,19 +1532,30 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); + final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - if (uncommittedSizeOfCurrentCommit < flushThreshold) { + if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. - * This condition will change if the `uncommittedSize` of the new commit is smaller than - * the `uncommittedSize` of the current commit. This method is to maintain translog only, - * thus the IndexWriter#hasUncommittedChanges condition is not considered. + * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be + * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the + * periodically flush condition if this condition is disabled after a flush. The condition will change if the new + * commit points to the later generation the last commit's(eg. gen-of-last-commit < gen-of-new-commit)[1]. + * + * When the local checkpoint equals to max_seqno, and translog-gen of the last commit equals to translog-gen of + * the new commit, we know that the last generation must contain operations because its size is above the flush + * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. 
In fact, this scenario only + * happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled + * generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied. + * + * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. */ - final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); - return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit; + final long translogGenerationOfNewCommit = + translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration; + return translogGenerationOfLastCommit < translogGenerationOfNewCommit + || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 4a643fb993bcf..ed7c6a4d33f9b 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -356,26 +356,11 @@ public long getMinFileGeneration() { } } - - /** - * Returns the number of operations in the translog files that aren't committed to lucene. - */ - public int uncommittedOperations() { - return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - - /** - * Returns the size in bytes of the translog files that aren't committed to lucene. 
- */ - public long uncommittedSizeInBytes() { - return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - /** * Returns the number of operations in the translog files */ public int totalOperations() { - return totalOperations(-1); + return totalOperationsByMinGen(-1); } /** @@ -406,9 +391,9 @@ static long findEarliestLastModifiedAge(long currentTime, Iterable engine.recoverFromTranslog()); assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().uncommittedOperations()); + assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); } } @@ -3839,7 +3838,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { System.nanoTime(), reason)); assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled)); + assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -4040,7 +4039,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); recoveringEngine = new InternalEngine(copy( replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); @@ -4075,7 +4074,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine = 
new InternalEngine( copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); if (flushed) { - assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); + assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4505,7 +4504,8 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. - final long extraTranslogSizeInNewEngine = engine.getTranslog().uncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + final Translog translog = engine.getTranslog(); + final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4513,17 +4513,17 @@ public void testShouldPeriodicallyFlush() throws Exception { } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, - engine.getTranslog().uncommittedSizeInBytes() - extraTranslogSizeInNewEngine); + engine.getTranslog().stats().getUncommittedSizeInBytes()- extraTranslogSizeInNewEngine); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) .settings(Settings.builder().put(indexSettings.getSettings()) 
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4531,13 +4531,53 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + // If the new index commit still points to the same translog generation as the current index commit, + // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. 
+ engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here + for (int id = 0; id < numDocs; id++) { + if (randomBoolean()) { + translog.rollGeneration(); + } + final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false)); + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } } + public void testStressShouldPeriodicallyFlush() throws Exception { + final long flushThreshold = randomLongBetween(100, 5000); + final long generationThreshold = randomLongBetween(1000, 5000); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b") + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + final int numOps = scaledRandomIntBetween(100, 10_000); + for (int i = 0; i < numOps; i++) { + final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); + final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); + final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); + if (rarely() && engine.getTranslog().shouldRollGeneration()) { + engine.rollTranslogGeneration(); + } + if (rarely() || engine.shouldPeriodicallyFlush()) { + engine.flush(); + 
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } + } public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException { final int iters = randomIntBetween(1, 15); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 622a9b1acc363..2eccc0d45bbf4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -359,29 +359,29 @@ public void testMaybeFlush() throws Exception { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, translog.uncommittedOperations()); + assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); - long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, 
translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 5407dbb911b69..3583356e0780c 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -502,10 +502,10 @@ public void testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } @@ -2517,7 +2517,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.uncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index a496664c0260b..49e557c3dde78 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -306,7 +306,7 @@ public 
void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); int numDocs = shards.indexDocs(between(10, 100)); - final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + final long translogSizeOnPrimary = shards.getPrimary().translogStats().getUncommittedSizeInBytes(); shards.flush(); final IndexShard replica = shards.addReplica();