diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1bcf2687ad8ca..941151aae9acb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1479,7 +1479,8 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) { + long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); @@ -1501,26 +1502,30 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); + final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - if (uncommittedSizeOfCurrentCommit < flushThreshold) { + if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. - * This condition will change if the `uncommittedSize` of the new commit is smaller than - * the `uncommittedSize` of the current commit. This method is to maintain translog only, - * thus the IndexWriter#hasUncommittedChanges condition is not considered. - */ - final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); - /* - * If flushThreshold is too small, we may repeatedly flush even there is no uncommitted operation - * as #sizeOfGensAboveSeqNoInByte and #uncommittedSizeInBytes can return different values. - * An empty translog file has non-zero `uncommittedSize` (the translog header), and method #sizeOfGensAboveSeqNoInBytes can - * return 0 now(no translog gen contains ops above local checkpoint) but method #uncommittedSizeInBytes will return an actual - * non-zero value after rolling a new translog generation. This can be avoided by checking the actual uncommitted operations. + * We flush to reduce the size of the uncommitted translog, but strictly speaking the uncommitted size won't always be + * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the + * periodically flush condition if this condition will be disabled after a flush. The condition will change if the new + * commit points to a later generation than the last commit's (i.e. gen-of-last-commit < gen-of-new-commit) [1]. + * + * When the local checkpoint equals max_seqno and the translog-gen of the last commit equals the translog-gen of + * the new commit, we know that the last generation must contain operations because its size is above the flush + * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only + * happens when the generation-threshold is close to or above the flush-threshold; otherwise we would already have rolled + * generations when the generation-threshold was reached, and the first condition (i.e. [1]) would already be satisfied. + * + * This method is to maintain the translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered. */ - return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit && translog.uncommittedOperations() > 0; + final long translogGenerationOfNewCommit = + translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration; + return translogGenerationOfLastCommit < translogGenerationOfNewCommit + || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @Override
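The rewritten shouldPeriodicallyFlush() above boils down to a two-part predicate. Below is a minimal, self-contained restatement of that decision, not the engine's API: the method signature and parameter names are mine, and in the real method the values come from lastCommittedSegmentInfos, the translog, and the local checkpoint tracker.

```java
// Hedged sketch of the flush decision encoded by the new shouldPeriodicallyFlush(); hypothetical signature.
final class PeriodicFlushDecision {
    static boolean shouldPeriodicallyFlush(long sizeOfTranslogSinceLastCommit, // translog.sizeInBytesByMinGen(genOfLastCommit)
                                           long flushThresholdBytes,
                                           long genOfLastCommit,               // translog generation recorded in the last commit
                                           long genOfNewCommit,                // translog.getMinGenerationForSeqNo(localCheckpoint + 1)
                                           long localCheckpoint,
                                           long maxSeqNo) {
        // Cheap exit: the translog retained since the last commit is still below the threshold.
        if (sizeOfTranslogSinceLastCommit < flushThresholdBytes) {
            return false;
        }
        // Only flush if flushing can switch this condition off again: either the new commit would
        // reference a strictly later translog generation than the last one, or all operations are
        // processed (checkpoint == max_seqno), in which case the flush rolls the generation and the
        // resulting commit points past the current, oversized one.
        return genOfLastCommit < genOfNewCommit || localCheckpoint == maxSeqNo;
    }

    public static void main(String[] args) {
        System.out.println(shouldPeriodicallyFlush(100, 512, 3, 3, 7, 7));  // false: below threshold
        System.out.println(shouldPeriodicallyFlush(1024, 512, 3, 4, 7, 9)); // true: new commit advances the generation
        System.out.println(shouldPeriodicallyFlush(1024, 512, 3, 3, 9, 9)); // true: fully processed, flush will roll
        System.out.println(shouldPeriodicallyFlush(1024, 512, 3, 3, 7, 9)); // false: flushing would not help yet
    }
}
```

Compared to the previous size comparison, this makes the anti-livelock argument explicit: a flush fires only when it is guaranteed to move the commit past the translog generations that tripped the threshold.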
diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index ffdfad13c515d..d46a6e00a639b 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -106,6 +106,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public static final String TRANSLOG_FILE_SUFFIX = ".tlog"; public static final String CHECKPOINT_SUFFIX = ".ckp"; public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX; + public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogWriter.getHeaderLength(UUIDs.randomBase64UUID()); static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$"); @@ -372,26 +373,11 @@ public long getMinFileGeneration() { } } - - /** - * Returns the number of operations in the translog files that aren't committed to lucene. - */ - public int uncommittedOperations() { - return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - - /** - * Returns the size in bytes of the translog files that aren't committed to lucene. - */ - public long uncommittedSizeInBytes() { - return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - /** * Returns the number of operations in the translog files */ public int totalOperations() { - return totalOperations(-1); + return totalOperationsByMinGen(-1); } /**
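The hunks that follow rename the generation-based accounting helpers and make them public: totalOperationsByMinGen and sizeInBytesByMinGen count only the translog files whose generation is at least the given one. Here is a self-contained sketch of that accounting, using a hypothetical GenerationFile as a stand-in for the readers plus the current writer rather than the real Translog classes.

```java
import java.util.Arrays;
import java.util.List;

// Hedged sketch of the "by minimum generation" accounting; GenerationFile is a made-up stand-in.
final class TranslogAccountingSketch {
    static final class GenerationFile {
        final long generation; final int operations; final long sizeInBytes;
        GenerationFile(long generation, int operations, long sizeInBytes) {
            this.generation = generation; this.operations = operations; this.sizeInBytes = sizeInBytes;
        }
    }

    static int totalOperationsByMinGen(List<GenerationFile> files, long minGeneration) {
        return files.stream().filter(f -> f.generation >= minGeneration).mapToInt(f -> f.operations).sum();
    }

    static long sizeInBytesByMinGen(List<GenerationFile> files, long minGeneration) {
        return files.stream().filter(f -> f.generation >= minGeneration).mapToLong(f -> f.sizeInBytes).sum();
    }

    public static void main(String[] args) {
        List<GenerationFile> files = Arrays.asList(
            new GenerationFile(3, 10, 550), // fully covered by the last commit
            new GenerationFile(4, 0, 55),   // empty generation: header only
            new GenerationFile(5, 2, 200)); // current writer
        // With the last commit referencing generation 4, only generations >= 4 count as uncommitted,
        // which is what stats() now reports as uncommitted operations and size.
        System.out.println(totalOperationsByMinGen(files, 4)); // 2
        System.out.println(sizeInBytesByMinGen(files, 4));     // 255
        // A minimum generation of -1 counts everything, matching totalOperations() and sizeInBytes().
        System.out.println(totalOperationsByMinGen(files, -1)); // 12
    }
}
```

With uncommittedOperations() and uncommittedSizeInBytes() removed, callers (including the tests further down) go through stats(), which feeds getTranslogGenerationOfLastCommit() from the deletion policy into these two helpers.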
@@ -402,9 +388,9 @@ public long sizeInBytes() { } /** - * Returns the number of operations in the transaction files that aren't committed to lucene.. + * Returns the number of operations in the translog files whose generation is at least the given generation */ - private int totalOperations(long minGeneration) { + public int totalOperationsByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) @@ -425,9 +411,9 @@ public int estimateTotalOperationsFromMinSeq(long minSeqNo) { } /** - * Returns the size in bytes of the translog files above the given generation + * Returns the size in bytes of the translog files whose generation is at least the given generation */ - private long sizeInBytesByMinGen(long minGeneration) { + public long sizeInBytesByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) @@ -437,16 +423,6 @@ private long sizeInBytesByMinGen(long minGeneration) { } } - /** - * Returns the size in bytes of the translog files with ops above the given seqNo - */ - public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum(); - } - } - /** * Creates a new translog for the specified generation. * @@ -751,7 +727,8 @@ private void closeOnTragicEvent(Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes()); + final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit(); + return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen)); } } @@ -1508,7 +1485,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl * @return the minimum generation for the sequence number */ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { - try (ReleasableLock ignored = writeLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { /* * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the * local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 5eba198378a1d..eb23a415d3e34 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -211,7 +211,6 @@ public synchronized long getMinTranslogGenerationForRecovery() { /** * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
- * See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()} */ public synchronized long getTranslogGenerationOfLastCommit() { return translogGenerationOfLastCommit; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ee3eb8d69c853..9885b3d0a0daf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -723,14 +723,13 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { @Override public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { - assertThat(getTranslog().uncommittedOperations(), equalTo(docs)); + assertThat(getTranslog().stats().getUncommittedOperations(), equalTo(docs)); final CommitId commitId = super.flush(force, waitIfOngoing); flushed.set(true); return commitId; } }; - - assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs)); + assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(flushed.get()); } finally { @@ -2884,7 +2883,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); assertEquals(1, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().uncommittedOperations()); + assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); } } @@ -3840,7 +3839,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { System.nanoTime(), reason)); assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled)); + assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -4041,7 +4040,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); recoveringEngine = new InternalEngine(copy( replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); @@ -4076,7 +4075,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { recoveringEngine = new InternalEngine( copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get)); if (flushed) { - assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); + assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); } 
recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4451,17 +4450,17 @@ public void testShouldPeriodicallyFlush() throws Exception { engine.index(indexForDoc(doc)); } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); - long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes()); + long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().stats().getUncommittedSizeInBytes()); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4469,13 +4468,53 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + // If the new index commit still points to the same translog generation as the current index commit, + // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. 
+ engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here + for (int id = 0; id < numDocs; id++) { + if (randomBoolean()) { + engine.getTranslog().rollGeneration(); + } + final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false)); + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } } + public void testStressShouldPeriodicallyFlush() throws Exception { + final long flushThreshold = randomLongBetween(100, 5000); + final long generationThreshold = randomLongBetween(1000, 5000); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b") + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + final int numOps = scaledRandomIntBetween(100, 10_000); + for (int i = 0; i < numOps; i++) { + final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); + final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); + final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); + if (rarely() && engine.getTranslog().shouldRollGeneration()) { + engine.rollTranslogGeneration(); + } + if (rarely() || engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } + } public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException { final int iters = randomIntBetween(1, 15); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 8e35dd06e0baa..6f6f76bbc456e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -359,29 +359,29 @@ public void testMaybeFlush() throws Exception { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, translog.uncommittedOperations()); + assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); - long size = translog.uncommittedSizeInBytes(); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.stats().getUncommittedSizeInBytes(), + translog.stats().getUncommittedOperations(), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.stats().getUncommittedSizeInBytes(), + translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.stats().getTranslogSizeInBytes(), + translog.stats().getUncommittedOperations(), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 46f889050306b..9b03027bd0ca8 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -460,10 +460,10 @@ public void testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } @@ -2455,7 +2455,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.uncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); diff --git 
a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 69176b03942f6..ebeb6725ef1c8 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -315,7 +315,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); int numDocs = shards.indexDocs(between(10, 100)); - final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + final long translogSizeOnPrimary = shards.getPrimary().translogStats().getUncommittedSizeInBytes(); shards.flush(); final IndexShard replica = shards.addReplica();
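One more note on the IndexShardIT#testMaybeFlush change above: the dynamic flush threshold is now clamped with Math.max(..., Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1) so that an empty translog generation, which still carries a header, can never satisfy the periodic-flush condition by itself. This mirrors the production setting validation referenced in the InternalEngine comment. A small standalone sketch of that sizing rule follows; the method name and the 55-byte header value are illustrative only, not Elasticsearch code.

```java
// Hedged sketch of the flush-threshold clamp used by the updated test.
final class FlushThresholdSizing {
    static long flushThresholdBytes(long uncommittedSizeInBytes, long emptyTranslogHeaderSizeInBytes) {
        // Keep the threshold strictly larger than a header-only translog file (the
        // DEFAULT_HEADER_SIZE_IN_BYTES constant added above), so a freshly rolled, empty
        // generation never re-triggers a flush on its own.
        return Math.max(uncommittedSizeInBytes, emptyTranslogHeaderSizeInBytes + 1);
    }

    public static void main(String[] args) {
        System.out.println(flushThresholdBytes(0, 55));   // 56: nothing uncommitted, header-only floor applies
        System.out.println(flushThresholdBytes(300, 55)); // 300: the current uncommitted size wins
    }
}
```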