diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 233644a5a8fc2..dd35d7c7c390c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1509,7 +1509,8 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) { + long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); @@ -1531,19 +1532,30 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); + final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - if (uncommittedSizeOfCurrentCommit < flushThreshold) { + if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. - * This condition will change if the `uncommittedSize` of the new commit is smaller than - * the `uncommittedSize` of the current commit. This method is to maintain translog only, - * thus the IndexWriter#hasUncommittedChanges condition is not considered. + * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be + * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the + * periodically flush condition if this condition is disabled after a flush. The condition will change if the new + * commit points to the later generation the last commit's(eg. gen-of-last-commit < gen-of-new-commit)[1]. + * + * When the local checkpoint equals to max_seqno, and translog-gen of the last commit equals to translog-gen of + * the new commit, we know that the last generation must contain operations because its size is above the flush + * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only + * happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled + * generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied. + * + * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. */ - final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); - return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit; + final long translogGenerationOfNewCommit = + translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration; + return translogGenerationOfLastCommit < translogGenerationOfNewCommit + || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 4a643fb993bcf..ed7c6a4d33f9b 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -356,26 +356,11 @@ public long getMinFileGeneration() { } } - - /** - * Returns the number of operations in the translog files that aren't committed to lucene. - */ - public int uncommittedOperations() { - return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - - /** - * Returns the size in bytes of the translog files that aren't committed to lucene. - */ - public long uncommittedSizeInBytes() { - return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - /** * Returns the number of operations in the translog files */ public int totalOperations() { - return totalOperations(-1); + return totalOperationsByMinGen(-1); } /** @@ -406,9 +391,9 @@ static long findEarliestLastModifiedAge(long currentTime, Iterable engine.recoverFromTranslog()); assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().uncommittedOperations()); + assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); } } @@ -3839,7 +3838,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { System.nanoTime(), reason)); assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled)); + assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -4040,7 +4039,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); recoveringEngine = new InternalEngine(copy( replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); @@ -4075,7 +4074,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine = new InternalEngine( copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); if (flushed) { - assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); + assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4505,7 +4504,8 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. - final long extraTranslogSizeInNewEngine = engine.getTranslog().uncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + final Translog translog = engine.getTranslog(); + final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4513,17 +4513,17 @@ public void testShouldPeriodicallyFlush() throws Exception { } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, - engine.getTranslog().uncommittedSizeInBytes() - extraTranslogSizeInNewEngine); + engine.getTranslog().stats().getUncommittedSizeInBytes()- extraTranslogSizeInNewEngine); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4531,13 +4531,53 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + // If the new index commit still points to the same translog generation as the current index commit, + // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. + engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here + for (int id = 0; id < numDocs; id++) { + if (randomBoolean()) { + translog.rollGeneration(); + } + final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false)); + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } } + public void testStressShouldPeriodicallyFlush() throws Exception { + final long flushThreshold = randomLongBetween(100, 5000); + final long generationThreshold = randomLongBetween(1000, 5000); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b") + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + final int numOps = scaledRandomIntBetween(100, 10_000); + for (int i = 0; i < numOps; i++) { + final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); + final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); + final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); + if (rarely() && engine.getTranslog().shouldRollGeneration()) { + engine.rollTranslogGeneration(); + } + if (rarely() || engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } + } public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException { final int iters = randomIntBetween(1, 15); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 622a9b1acc363..2eccc0d45bbf4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -359,29 +359,29 @@ public void testMaybeFlush() throws Exception { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, translog.uncommittedOperations()); + assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); - long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 5407dbb911b69..3583356e0780c 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -502,10 +502,10 @@ public void testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } @@ -2517,7 +2517,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.uncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index a496664c0260b..49e557c3dde78 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -306,7 +306,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); int numDocs = shards.indexDocs(between(10, 100)); - final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + final long translogSizeOnPrimary = shards.getPrimary().translogStats().getUncommittedSizeInBytes(); shards.flush(); final IndexShard replica = shards.addReplica();