From f96e00badf1411426190dd34c6dbfd341abdf024 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 12 Apr 2018 13:57:59 -0400 Subject: [PATCH] Add primary term to translog header (#29227) This change adds the current primary term to the header of the current translog file. Having a term in a translog header is a prerequisite step that allows us to trim translog operations given the max valid seq# for that term. This commit also updates tests to conform the primary term invariant which guarantees that all translog operations in a translog file have its terms at most the term stored in the translog header. --- .../elasticsearch/index/IndexSettings.java | 4 +- .../elasticsearch/index/engine/Engine.java | 15 +- .../index/engine/EngineConfig.java | 11 +- .../index/engine/InternalEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../index/shard/StoreRecovery.java | 10 +- .../index/translog/BaseTranslogReader.java | 22 +- .../index/translog/Translog.java | 62 +++-- .../index/translog/TranslogHeader.java | 195 +++++++++++++++ .../index/translog/TranslogReader.java | 98 +------- .../index/translog/TranslogSnapshot.java | 8 +- .../index/translog/TranslogWriter.java | 46 +--- .../translog/TruncateTranslogCommand.java | 11 +- .../indices/recovery/RecoveryTarget.java | 4 +- .../resync/ResyncReplicationRequestTests.java | 3 +- .../elasticsearch/index/IndexModuleTests.java | 2 +- .../index/engine/InternalEngineTests.java | 86 +++---- .../RecoveryDuringReplicationTests.java | 2 +- .../index/shard/IndexShardIT.java | 2 +- .../index/shard/IndexShardTests.java | 22 +- .../shard/IndexingOperationListenerTests.java | 4 +- .../index/shard/RefreshListenersTests.java | 7 +- .../index/translog/TestTranslog.java | 51 ++-- .../translog/TranslogDeletionPolicyTests.java | 2 +- .../index/translog/TranslogHeaderTests.java | 128 ++++++++++ .../index/translog/TranslogTests.java | 228 ++++++++++-------- .../index/translog/TranslogVersionTests.java | 96 -------- .../elasticsearch/indices/flush/FlushIT.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 2 +- .../indices/recovery/RecoveryTests.java | 3 +- .../index/engine/EngineTestCase.java | 32 +-- 31 files changed, 680 insertions(+), 482 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java create mode 100644 server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java delete mode 100644 server/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index e3c82b8abd39f..db661743d9e85 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -185,7 +185,7 @@ public final class IndexSettings { public static final Setting INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING = Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), /* - * An empty translog occupies 43 bytes on disk. If the flush threshold is below this, the flush thread + * An empty translog occupies 55 bytes on disk. If the flush threshold is below this, the flush thread * can get stuck in an infinite loop as the shouldPeriodicallyFlush can still be true after flushing. * However, small thresholds are useful for testing so we do not add a large lower bound here. */ @@ -220,7 +220,7 @@ public final class IndexSettings { "index.translog.generation_threshold_size", new ByteSizeValue(64, ByteSizeUnit.MB), /* - * An empty translog occupies 43 bytes on disk. If the generation threshold is + * An empty translog occupies 55 bytes on disk. If the generation threshold is * below this, the flush thread can get stuck in an infinite loop repeatedly * rolling the generation as every new generation will already exceed the * generation threshold. However, small thresholds are useful for testing so we diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b6c71e06c93df..fab8cba468b56 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1066,14 +1066,13 @@ public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long ve this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; } - public Index(Term uid, ParsedDocument doc) { - this(uid, doc, Versions.MATCH_ANY); + public Index(Term uid, long primaryTerm, ParsedDocument doc) { + this(uid, primaryTerm, doc, Versions.MATCH_ANY); } // TEST ONLY - Index(Term uid, ParsedDocument doc, long version) { - // use a primary term of 2 to allow tests to reduce it to a valid >0 term - this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 2, version, VersionType.INTERNAL, - Origin.PRIMARY, System.nanoTime(), -1, false); + Index(Term uid, long primaryTerm, ParsedDocument doc, long version) { + this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL, + Origin.PRIMARY, System.nanoTime(), -1, false); } // TEST ONLY public ParsedDocument parsedDoc() { @@ -1143,8 +1142,8 @@ public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, lo this.id = Objects.requireNonNull(id); } - public Delete(String type, String id, Term uid) { - this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime()); + public Delete(String type, String id, Term uid, long primaryTerm) { + this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime()); } public Delete(Delete template, VersionType versionType) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 352c3ba3e6280..b7c5a41691343 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -79,6 +79,7 @@ public final class EngineConfig { @Nullable private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; + private final LongSupplier primaryTermSupplier; /** * Index setting to change the low level lucene codec used for writing new segments. @@ -125,7 +126,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, List externalRefreshListener, List internalRefreshListener, Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService, - LongSupplier globalCheckpointSupplier) { + LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) { this.shardId = shardId; this.allocationId = allocationId; this.indexSettings = indexSettings; @@ -152,6 +153,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool, this.translogRecoveryRunner = translogRecoveryRunner; this.circuitBreakerService = circuitBreakerService; this.globalCheckpointSupplier = globalCheckpointSupplier; + this.primaryTermSupplier = primaryTermSupplier; } /** @@ -354,4 +356,11 @@ public Sort getIndexSort() { public CircuitBreakerService getCircuitBreakerService() { return this.circuitBreakerService; } + + /** + * Returns a supplier that supplies the latest primary term value of the associated shard. + */ + public LongSupplier getPrimaryTermSupplier() { + return primaryTermSupplier; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 702f247b41994..dcd1ba65d8950 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -418,7 +418,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); final String translogUUID = loadTranslogUUIDFromLastCommit(); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! - return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); + return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier()); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cab7246a4cb0f..520115dc30a46 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2136,7 +2136,7 @@ private EngineConfig newEngineConfig() { IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), - indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker); + indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm); } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 3654aeba2bf8d..54718c545a44e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -393,7 +393,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); - final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } else if (indexShouldExists) { // since we recover from local, just fill the files and size @@ -407,8 +408,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe } } else { store.createEmpty(); - final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), - SequenceNumbers.NO_OPS_PERFORMED, shardId); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId, indexShard.getPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } indexShard.openEngineAndRecoverFromTranslog(); @@ -456,7 +457,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f store.bootstrapNewHistory(); final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); - final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index 53d14f322995d..ff226ae00bef5 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -35,15 +35,15 @@ public abstract class BaseTranslogReader implements Comparable getPrimaryTerm() && getPrimaryTerm() != TranslogHeader.UNKNOWN_PRIMARY_TERM) { + throw new TranslogCorruptedException("Operation's term is newer than translog header term; " + + "operation term[" + op.primaryTerm() + "], translog header term [" + getPrimaryTerm() + "]"); + } + return op; } /** diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 9726654c386e3..ab4961892ca12 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -107,7 +107,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public static final String CHECKPOINT_FILE_NAME = "translog" + CHECKPOINT_SUFFIX; static final Pattern PARSE_STRICT_ID_PATTERN = Pattern.compile("^" + TRANSLOG_FILE_PREFIX + "(\\d+)(\\.tlog)$"); - public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogWriter.getHeaderLength(UUIDs.randomBase64UUID()); + public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID()); // the list of translog readers is guaranteed to be in order of translog generation private final List readers = new ArrayList<>(); @@ -120,6 +120,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final LongSupplier globalCheckpointSupplier; + private final LongSupplier primaryTermSupplier; private final String translogUUID; private final TranslogDeletionPolicy deletionPolicy; @@ -131,17 +132,22 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * translog file referenced by this generation. The translog creation will fail if this generation can't be opened. * * @param config the configuration of this translog - * @param translogUUID the translog uuid to open, null for a new translog + * @param translogUUID the translog uuid to open, null for a new translog * @param deletionPolicy an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely * deleted * @param globalCheckpointSupplier a supplier for the global checkpoint + * @param primaryTermSupplier a supplier for the latest value of primary term of the owning index shard. The latest term value is + * examined and stored in the header whenever a new generation is rolled. It's guaranteed from outside + * that a new generation is rolled when the term is increased. This guarantee allows to us to validate + * and reject operation whose term is higher than the primary term stored in the translog header. */ public Translog( final TranslogConfig config, final String translogUUID, TranslogDeletionPolicy deletionPolicy, - final LongSupplier globalCheckpointSupplier) throws IOException { + final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier) throws IOException { super(config.getShardId(), config.getIndexSettings()); this.config = config; this.globalCheckpointSupplier = globalCheckpointSupplier; + this.primaryTermSupplier = primaryTermSupplier; this.deletionPolicy = deletionPolicy; this.translogUUID = translogUUID; bigArrays = config.getBigArrays(); @@ -163,7 +169,7 @@ public Translog( // // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists // if not we don't even try to clean it up and wait until we fail creating it - assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]"; + assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogHeader.headerSizeInBytes(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]"; if (Files.exists(currentCheckpointFile) // current checkpoint is already copied && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName()); @@ -224,6 +230,9 @@ private ArrayList recoverFromFiles(Checkpoint checkpoint) throws minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive"); } final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)))); + assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() : + "Primary terms go backwards; current term [" + primaryTermSupplier.getAsLong() + "]" + + "translog path [ " + committedTranslogFile + ", existing term [" + reader.getPrimaryTerm() + "]"; foundTranslogs.add(reader); logger.debug("recovered local translog from checkpoint {}", checkpoint); } @@ -267,10 +276,6 @@ private ArrayList recoverFromFiles(Checkpoint checkpoint) throws } TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException { - return openReader(path, checkpoint, translogUUID); - } - - private static TranslogReader openReader(Path path, Checkpoint checkpoint, String translogUUID) throws IOException { FileChannel channel = FileChannel.open(path, StandardOpenOption.READ); try { assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: " + Translog.parseIdFromFileName(path) + " but got: " + checkpoint.generation; @@ -457,7 +462,7 @@ TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, lon getChannelFactory(), config.getBufferSize(), initialMinTranslogGen, initialGlobalCheckpoint, - globalCheckpointSupplier, this::getMinFileGeneration); + globalCheckpointSupplier, this::getMinFileGeneration, primaryTermSupplier.getAsLong()); } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -485,6 +490,10 @@ public Location add(final Operation operation) throws IOException { final ReleasablePagedBytesReference bytes = out.bytes(); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); + if (operation.primaryTerm() > current.getPrimaryTerm()) { + throw new IllegalArgumentException("Operation term is newer than the current term;" + + "current term[" + current.getPrimaryTerm() + "], operation term[" + operation + "]"); + } return current.add(bytes, operation.seqNo()); } } catch (final AlreadyClosedException | IOException ex) { @@ -991,17 +1000,17 @@ public Index(Engine.Index index, Engine.IndexResult indexResult) { this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp(); } - public Index(String type, String id, long seqNo, byte[] source) { - this(type, id, seqNo, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, -1); + public Index(String type, String id, long seqNo, long primaryTerm, byte[] source) { + this(type, id, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, source, null, -1); } - public Index(String type, String id, long seqNo, long version, VersionType versionType, byte[] source, String routing, - long autoGeneratedIdTimestamp) { + public Index(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType, + byte[] source, String routing, long autoGeneratedIdTimestamp) { this.type = type; this.id = id; this.source = new BytesArray(source); this.seqNo = seqNo; - this.primaryTerm = 0; + this.primaryTerm = primaryTerm; this.version = version; this.versionType = versionType; this.routing = routing; @@ -1159,8 +1168,8 @@ public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { } /** utility for testing */ - public Delete(String type, String id, long seqNo, Term uid) { - this(type, id, uid, seqNo, 0, Versions.MATCH_ANY, VersionType.INTERNAL); + public Delete(String type, String id, long seqNo, long primaryTerm, Term uid) { + this(type, id, uid, seqNo, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL); } public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) { @@ -1364,10 +1373,10 @@ public enum Durability { } - private static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { + static void verifyChecksum(BufferedChecksumStreamInput in) throws IOException { // This absolutely must come first, or else reading the checksum becomes part of the checksum long expectedChecksum = in.getChecksum(); - long readChecksum = in.readInt() & 0xFFFF_FFFFL; + long readChecksum = Integer.toUnsignedLong(in.readInt()); if (readChecksum != expectedChecksum) { throw new TranslogCorruptedException("translog stream is corrupted, expected: 0x" + Long.toHexString(expectedChecksum) + ", got: 0x" + Long.toHexString(readChecksum)); @@ -1651,10 +1660,10 @@ public static long readGlobalCheckpoint(final Path location, final String expect private static Checkpoint readCheckpoint(Path location, String expectedTranslogUUID) throws IOException { final Checkpoint checkpoint = readCheckpoint(location); - // We need to open at least translog reader to validate the translogUUID. + // We need to open at least one translog header to validate the translogUUID. final Path translogFile = location.resolve(getFilename(checkpoint.generation)); - try (TranslogReader reader = openReader(translogFile, checkpoint, expectedTranslogUUID)) { - + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + TranslogHeader.read(expectedTranslogUUID, translogFile, channel); } catch (TranslogCorruptedException ex) { throw ex; // just bubble up. } catch (Exception ex) { @@ -1693,13 +1702,14 @@ List getReaders() { return readers; } - public static String createEmptyTranslog(final Path location, final long initialGlobalCheckpoint, final ShardId shardId) - throws IOException { + public static String createEmptyTranslog(final Path location, final long initialGlobalCheckpoint, + final ShardId shardId, final long primaryTerm) throws IOException { final ChannelFactory channelFactory = FileChannel::open; - return createEmptyTranslog(location, initialGlobalCheckpoint, shardId, channelFactory); + return createEmptyTranslog(location, initialGlobalCheckpoint, shardId, channelFactory, primaryTerm); } - static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, ChannelFactory channelFactory) throws IOException { + static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, + ChannelFactory channelFactory, long primaryTerm) throws IOException { IOUtils.rm(location); Files.createDirectories(location); final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1); @@ -1709,7 +1719,7 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S final String translogUUID = UUIDs.randomBase64UUID(); TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory, new ByteSizeValue(10), 1, initialGlobalCheckpoint, - () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); } + () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm ); writer.close(); return translogUUID; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java new file mode 100644 index 0000000000000..0fde24d8bb4d5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.InputStreamDataInput; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.Channels; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Path; + +/** + * Each translog file is started with a translog header then followed by translog operations. + */ +final class TranslogHeader { + public static final String TRANSLOG_CODEC = "translog"; + + public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported + public static final int VERSION_CHECKPOINTS = 2; // added checkpoints + public static final int VERSION_PRIMARY_TERM = 3; // added primary term + public static final int CURRENT_VERSION = VERSION_PRIMARY_TERM; + + public static final long UNKNOWN_PRIMARY_TERM = 0L; + + private final String translogUUID; + private final long primaryTerm; + private final int headerSizeInBytes; + + /** + * Creates a new translog header with the given uuid and primary term. + * + * @param translogUUID this UUID is used to prevent accidental recovery from a transaction log that belongs to a + * different engine + * @param primaryTerm the primary term of the owning index shard when creating (eg. rolling) this translog file. + * All operations' terms in this translog file are enforced to be at most this term. + */ + TranslogHeader(String translogUUID, long primaryTerm) { + this(translogUUID, primaryTerm, headerSizeInBytes(translogUUID)); + assert primaryTerm >= 0 : "Primary term must be non-negative; term [" + primaryTerm + "]"; + } + + private TranslogHeader(String translogUUID, long primaryTerm, int headerSizeInBytes) { + this.translogUUID = translogUUID; + this.primaryTerm = primaryTerm; + this.headerSizeInBytes = headerSizeInBytes; + } + + public String getTranslogUUID() { + return translogUUID; + } + + /** + * Returns the primary term stored in this translog header. + * All operations in a translog file are expected to have their primary terms at most this term. + */ + public long getPrimaryTerm() { + return primaryTerm; + } + + /** + * Returns the header size in bytes. This value can be used as the offset of the first translog operation. + * See {@link BaseTranslogReader#getFirstOperationOffset()} + */ + public int sizeInBytes() { + return headerSizeInBytes; + } + + static int headerSizeInBytes(String translogUUID) { + return headerSizeInBytes(CURRENT_VERSION, new BytesRef(translogUUID).length); + } + + private static int headerSizeInBytes(int version, int uuidLength) { + int size = CodecUtil.headerLength(TRANSLOG_CODEC); + size += Integer.BYTES + uuidLength; // uuid + if (version >= VERSION_PRIMARY_TERM) { + size += Long.BYTES; // primary term + size += Integer.BYTES; // checksum + } + return size; + } + + /** + * Read a translog header from the given path and file channel + */ + static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException { + // This input is intentionally not closed because closing it will close the FileChannel. + final BufferedChecksumStreamInput in = + new BufferedChecksumStreamInput(new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size())); + final int version; + try { + version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM); + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { + tryReportOldVersionError(path, channel); + throw new TranslogCorruptedException("Translog header corrupted. path:" + path, e); + } + if (version == VERSION_CHECKSUMS) { + throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); + } + // Read the translogUUID + final int uuidLen = in.readInt(); + if (uuidLen > channel.size()) { + throw new TranslogCorruptedException("uuid length can't be larger than the translog"); + } + final BytesRef uuid = new BytesRef(uuidLen); + uuid.length = uuidLen; + in.read(uuid.bytes, uuid.offset, uuid.length); + final BytesRef expectedUUID = new BytesRef(translogUUID); + if (uuid.bytesEquals(expectedUUID) == false) { + throw new TranslogCorruptedException("expected shard UUID " + expectedUUID + " but got: " + uuid + + " this translog file belongs to a different translog. path:" + path); + } + // Read the primary term + final long primaryTerm; + if (version == VERSION_PRIMARY_TERM) { + primaryTerm = in.readLong(); + assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]"; + } else { + assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; + primaryTerm = UNKNOWN_PRIMARY_TERM; + } + // Verify the checksum + if (version >= VERSION_PRIMARY_TERM) { + Translog.verifyChecksum(in); + } + final int headerSizeInBytes = headerSizeInBytes(version, uuid.length); + assert channel.position() == headerSizeInBytes : + "Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]"; + return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes); + } + + private static void tryReportOldVersionError(final Path path, final FileChannel channel) throws IOException { + // Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the header, in binary this looks like: + // binary: 0011 1111 1101 0111 0110 1100 0001 0111 + // hex : 3 f d 7 6 c 1 7 + // + // With version 0 of the translog, the first byte is the Operation.Type, which will always be between 0-4, + // so we know if we grab the first byte, it can be: + // 0x3f => Lucene's magic number, so we can assume it's version 1 or later + // 0x00 => version 0 of the translog + final byte b1 = Channels.readFromFileChannel(channel, 0, 1)[0]; + if (b1 == 0x3f) { // LUCENE_CODEC_HEADER_BYTE + throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header. path:" + path); + } else if (b1 == 0x00) { // UNVERSIONED_TRANSLOG_HEADER_BYTE + throw new IllegalStateException("pre-1.4 translog found [" + path + "]"); + } + } + + /** + * Writes this header with the latest format into the file channel + */ + void write(final FileChannel channel) throws IOException { + // This output is intentionally not closed because closing it will close the FileChannel. + @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed", "resource"}) + final BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput( + new OutputStreamStreamOutput(java.nio.channels.Channels.newOutputStream(channel))); + CodecUtil.writeHeader(new OutputStreamDataOutput(out), TRANSLOG_CODEC, CURRENT_VERSION); + // Write uuid + final BytesRef uuid = new BytesRef(translogUUID); + out.writeInt(uuid.length); + out.writeBytes(uuid.bytes, uuid.offset, uuid.length); + // Write primary term + out.writeLong(primaryTerm); + // Checksum header + out.writeInt((int) out.getChecksum()); + out.flush(); + channel.force(true); + assert channel.position() == headerSizeInBytes : + "Header is not fully written; header size [" + headerSizeInBytes + "], channel position [" + channel.position() + "]"; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index b88037c32fd59..29e30bd25dd37 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -19,15 +19,8 @@ package org.elasticsearch.index.translog; -import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.InputStreamDataInput; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.Channels; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; import java.io.Closeable; import java.io.EOFException; @@ -41,10 +34,6 @@ * an immutable translog filereader */ public class TranslogReader extends BaseTranslogReader implements Closeable { - - private static final byte LUCENE_CODEC_HEADER_BYTE = 0x3f; - private static final byte UNVERSIONED_TRANSLOG_HEADER_BYTE = 0x00; - protected final long length; private final int totalOperations; private final Checkpoint checkpoint; @@ -53,13 +42,13 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { /** * Create a translog writer against the specified translog file channel. * - * @param checkpoint the translog checkpoint - * @param channel the translog file channel to open a translog reader against - * @param path the path to the translog - * @param firstOperationOffset the offset to the first operation + * @param checkpoint the translog checkpoint + * @param channel the translog file channel to open a translog reader against + * @param path the path to the translog + * @param header the header of the translog file */ - TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final long firstOperationOffset) { - super(checkpoint.generation, channel, path, firstOperationOffset); + TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final TranslogHeader header) { + super(checkpoint.generation, channel, path, header); this.length = checkpoint.offset; this.totalOperations = checkpoint.numOps; this.checkpoint = checkpoint; @@ -77,75 +66,8 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { */ public static TranslogReader open( final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException { - - try { - InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), - channel.size()); // don't close - // Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the - // header, in binary this looks like: - // - // binary: 0011 1111 1101 0111 0110 1100 0001 0111 - // hex : 3 f d 7 6 c 1 7 - // - // With version 0 of the translog, the first byte is the - // Operation.Type, which will always be between 0-4, so we know if - // we grab the first byte, it can be: - // 0x3f => Lucene's magic number, so we can assume it's version 1 or later - // 0x00 => version 0 of the translog - // - // otherwise the first byte of the translog is corrupted and we - // should bail - byte b1 = headerStream.readByte(); - if (b1 == LUCENE_CODEC_HEADER_BYTE) { - // Read 3 more bytes, meaning a whole integer has been read - byte b2 = headerStream.readByte(); - byte b3 = headerStream.readByte(); - byte b4 = headerStream.readByte(); - // Convert the 4 bytes that were read into an integer - int header = ((b1 & 0xFF) << 24) + ((b2 & 0xFF) << 16) + ((b3 & 0xFF) << 8) + ((b4 & 0xFF) << 0); - // We confirm CodecUtil's CODEC_MAGIC number (0x3FD76C17) - // ourselves here, because it allows us to read the first - // byte separately - if (header != CodecUtil.CODEC_MAGIC) { - throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header. path:" + path); - } - // Confirm the rest of the header using CodecUtil, extracting - // the translog version - int version = CodecUtil.checkHeaderNoMagic(new InputStreamDataInput(headerStream), TranslogWriter.TRANSLOG_CODEC, 1, Integer.MAX_VALUE); - switch (version) { - case TranslogWriter.VERSION_CHECKSUMS: - throw new IllegalStateException("pre-2.0 translog found [" + path + "]"); - case TranslogWriter.VERSION_CHECKPOINTS: - assert path.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX) : "new file ends with old suffix: " + path; - assert checkpoint.numOps >= 0 : "expected at least 0 operation but got: " + checkpoint.numOps; - assert checkpoint.offset <= channel.size() : "checkpoint is inconsistent with channel length: " + channel.size() + " " + checkpoint; - int len = headerStream.readInt(); - if (len > channel.size()) { - throw new TranslogCorruptedException("uuid length can't be larger than the translog"); - } - BytesRef ref = new BytesRef(len); - ref.length = len; - headerStream.read(ref.bytes, ref.offset, ref.length); - BytesRef uuidBytes = new BytesRef(translogUUID); - if (uuidBytes.bytesEquals(ref) == false) { - throw new TranslogCorruptedException("expected shard UUID " + uuidBytes + " but got: " + ref + - " this translog file belongs to a different translog. path:" + path); - } - final long firstOperationOffset; - firstOperationOffset = ref.length + CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC) + Integer.BYTES; - return new TranslogReader(checkpoint, channel, path, firstOperationOffset); - - default: - throw new TranslogCorruptedException("No known translog stream version: " + version + " path:" + path); - } - } else if (b1 == UNVERSIONED_TRANSLOG_HEADER_BYTE) { - throw new IllegalStateException("pre-1.4 translog found [" + path + "]"); - } else { - throw new TranslogCorruptedException("Invalid first byte in translog file, got: " + Long.toHexString(b1) + ", expected 0x00 or 0x3f. path:" + path); - } - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) { - throw new TranslogCorruptedException("Translog header corrupted. path:" + path, e); - } + final TranslogHeader header = TranslogHeader.read(translogUUID, path, channel); + return new TranslogReader(checkpoint, channel, path, header); } public long sizeInBytes() { @@ -168,8 +90,8 @@ protected void readBytes(ByteBuffer buffer, long position) throws IOException { if (position >= length) { throw new EOFException("read requested past EOF. pos [" + position + "] end: [" + length + "]"); } - if (position < firstOperationOffset) { - throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + firstOperationOffset + "]"); + if (position < getFirstOperationOffset()) { + throw new IOException("read requested before position of first ops. pos [" + position + "] first op on: [" + getFirstOperationOffset() + "]"); } Channels.readFromFileChannelWithEofException(channel, position, buffer); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 5f6d14e192eb8..a966720353297 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -39,14 +39,14 @@ final class TranslogSnapshot extends BaseTranslogReader { * Create a snapshot of translog file channel. */ TranslogSnapshot(final BaseTranslogReader reader, final long length) { - super(reader.generation, reader.channel, reader.path, reader.firstOperationOffset); + super(reader.generation, reader.channel, reader.path, reader.header); this.length = length; this.totalOperations = reader.totalOperations(); this.checkpoint = reader.getCheckpoint(); this.reusableBuffer = ByteBuffer.allocate(1024); - readOperations = 0; - position = firstOperationOffset; - reuse = null; + this.readOperations = 0; + this.position = reader.getFirstOperationOffset(); + this.reuse = null; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 3bf70c065313e..cae6578886534 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -19,10 +19,8 @@ package org.elasticsearch.index.translog; -import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.OutputStreamDataOutput; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Assertions; import org.elasticsearch.common.bytes.BytesArray; @@ -47,12 +45,6 @@ import java.util.function.LongSupplier; public class TranslogWriter extends BaseTranslogReader implements Closeable { - - public static final String TRANSLOG_CODEC = "translog"; - public static final int VERSION_CHECKSUMS = 1; - public static final int VERSION_CHECKPOINTS = 2; // since 2.0 we have checkpoints? - public static final int VERSION = VERSION_CHECKPOINTS; - private final ShardId shardId; private final ChannelFactory channelFactory; // the last checkpoint that was written when the translog was last synced @@ -85,10 +77,10 @@ private TranslogWriter( final FileChannel channel, final Path path, final ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { - super(initialCheckpoint.generation, channel, path, channel.position()); + final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header) throws IOException { + super(initialCheckpoint.generation, channel, path, header); assert initialCheckpoint.offset == channel.position() : - "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel poistion [" + "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position [" + channel.position() + "]"; this.shardId = shardId; this.channelFactory = channelFactory; @@ -104,34 +96,16 @@ private TranslogWriter( this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; } - static int getHeaderLength(String translogUUID) { - return getHeaderLength(new BytesRef(translogUUID).length); - } - - static int getHeaderLength(int uuidLength) { - return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + Integer.BYTES; - } - - static void writeHeader(OutputStreamDataOutput out, BytesRef ref) throws IOException { - CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); - out.writeInt(ref.length); - out.writeBytes(ref.bytes, ref.offset, ref.length); - } - public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, final long initialMinTranslogGen, long initialGlobalCheckpoint, - final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier) + final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier, + final long primaryTerm) throws IOException { - final BytesRef ref = new BytesRef(translogUUID); - final int firstOperationOffset = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { - // This OutputStreamDataOutput is intentionally not closed because - // closing it will close the FileChannel - final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); - writeHeader(out, ref); - channel.force(true); - final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(firstOperationOffset, fileGeneration, + final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); + header.write(channel); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(header.sizeInBytes(), fileGeneration, initialGlobalCheckpoint, initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); final LongSupplier writerGlobalCheckpointSupplier; @@ -146,7 +120,7 @@ public static TranslogWriter create(ShardId shardId, String translogUUID, long f writerGlobalCheckpointSupplier = globalCheckpointSupplier; } return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, - writerGlobalCheckpointSupplier, minTranslogGenerationSupplier); + writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition @@ -295,7 +269,7 @@ public TranslogReader closeIntoReader() throws IOException { throw ex; } if (closed.compareAndSet(false, true)) { - return new TranslogReader(getLastSyncedCheckpoint(), channel, path, getFirstOperationOffset()); + return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header); } else { throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index 164d8fee956dd..b8bd93e05a6f8 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -33,7 +33,6 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.OutputStreamDataOutput; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cli.EnvironmentAwareCommand; @@ -213,13 +212,11 @@ static void writeEmptyCheckpoint(Path filename, int translogLength, long translo * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written. */ public static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException { - final BytesRef translogRef = new BytesRef(translogUUID); - try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); - OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) { - TranslogWriter.writeHeader(out, translogRef); - fc.force(true); + try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) { + TranslogHeader header = new TranslogHeader(translogUUID, TranslogHeader.UNKNOWN_PRIMARY_TERM); + header.write(fc); + return header.sizeInBytes(); } - return TranslogWriter.getHeaderLength(translogRef.length); } /** Show a warning about deleting files, asking for a confirmation if {@code batchMode} is false */ diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index eb0db395a155f..244bb462df6ae 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -441,8 +441,8 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa store.ensureIndexHasHistoryUUID(); } // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 - final String translogUUID = - Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, indexShard.getPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. diff --git a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java index 8c834770b08d7..d5ad3941a5e8f 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java @@ -37,7 +37,8 @@ public class ResyncReplicationRequestTests extends ESTestCase { public void testSerialization() throws IOException { final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8")); - final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1); + final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(), + Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, -1); final ShardId shardId = new ShardId(new Index("index", "uuid"), 0); final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index}); diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 6af1a79249e26..008b05f6a1e95 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -242,7 +242,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { assertSame(listener, indexService.getIndexOperationListeners().get(1)); ParsedDocument doc = InternalEngineTests.createParsedDoc("1", null); - Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc); + Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc); ShardId shardId = new ShardId(new Index("foo", "bar"), 0); for (IndexingOperationListener l : indexService.getIndexOperationListeners()) { l.preIndex(shardId, index); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9cdc68444ea16..60913c644eadb 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -235,7 +235,7 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { assertEquals(2, searcher.reader().numDocs()); } assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); - engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid())); + engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid(), primaryTerm.get())); assertTrue("safe access should be required", engine.isSafeAccessRequired()); engine.refresh("test"); assertTrue("safe access should be required", engine.isSafeAccessRequired()); @@ -317,7 +317,7 @@ public void testSegments() throws Exception { assertThat(segments.get(1).isCompound(), equalTo(true)); - engine.delete(new Engine.Delete("test", "1", newUid(doc))); + engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get())); engine.refresh("test"); segments = engine.segments(false); @@ -890,7 +890,7 @@ public void testSimpleOperations() throws Exception { searchResult.close(); // now delete - engine.delete(new Engine.Delete("test", "1", newUid(doc))); + engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get())); // its not deleted yet searchResult = engine.acquireSearcher("test"); @@ -917,7 +917,7 @@ public void testSimpleOperations() throws Exception { document = testDocumentWithTextField(); document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); doc = testParsedDocument("1", null, document, B_1, null); - engine.index(new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED)); + engine.index(new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED)); // its not there... searchResult = engine.acquireSearcher("test"); @@ -994,7 +994,7 @@ public void testSearchResultRelease() throws Exception { // don't release the search result yet... // delete, refresh and do a new search, it should not be there - engine.delete(new Engine.Delete("test", "1", newUid(doc))); + engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get())); engine.refresh("test"); Engine.Searcher updateSearchResult = engine.acquireSearcher("test"); MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); @@ -1113,7 +1113,7 @@ public void testRenewSyncFlush() throws Exception { engine.index(doc4); assertEquals(engine.getLastWriteNanos(), doc4.startTime()); } else { - Engine.Delete delete = new Engine.Delete(doc1.type(), doc1.id(), doc1.uid()); + Engine.Delete delete = new Engine.Delete(doc1.type(), doc1.id(), doc1.uid(), primaryTerm.get()); engine.delete(delete); assertEquals(engine.getLastWriteNanos(), delete.startTime()); } @@ -1147,7 +1147,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { } if (randomBoolean()) { final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), - SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); } trimUnsafeCommits(config); @@ -1177,7 +1177,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { public void testVersioningNewCreate() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); + Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); @@ -1189,7 +1189,7 @@ public void testVersioningNewCreate() throws IOException { public void testReplicatedVersioningWithFlush() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); + Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); assertTrue(indexResult.isCreated()); @@ -1208,7 +1208,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { replicaEngine.flush(); } - Engine.Index update = new Engine.Index(newUid(doc), doc, 1); + Engine.Index update = new Engine.Index(newUid(doc), primaryTerm.get(), doc, 1); Engine.IndexResult updateResult = engine.index(update); assertThat(updateResult.getVersion(), equalTo(2L)); assertFalse(updateResult.isCreated()); @@ -1237,14 +1237,14 @@ public void testVersionedUpdate() throws IOException { final BiFunction searcherFactory = engine::acquireSearcher; ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); + Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(1, get.version()); } - Engine.Index update_1 = new Engine.Index(newUid(doc), doc, 1); + Engine.Index update_1 = new Engine.Index(newUid(doc), primaryTerm.get(), doc, 1); Engine.IndexResult update_1_result = engine.index(update_1); assertThat(update_1_result.getVersion(), equalTo(2L)); @@ -1252,7 +1252,7 @@ public void testVersionedUpdate() throws IOException { assertEquals(2, get.version()); } - Engine.Index update_2 = new Engine.Index(newUid(doc), doc, 2); + Engine.Index update_2 = new Engine.Index(newUid(doc), primaryTerm.get(), doc, 2); Engine.IndexResult update_2_result = engine.index(update_2); assertThat(update_2_result.getVersion(), equalTo(3L)); @@ -1293,7 +1293,7 @@ public void testForceMerge() throws IOException { ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); - engine.delete(new Engine.Delete(index.type(), index.id(), index.uid())); + engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get())); engine.forceMerge(true, 10, true, false, false); //expunge deletes engine.refresh("test"); @@ -1305,7 +1305,7 @@ public void testForceMerge() throws IOException { doc = testParsedDocument(Integer.toString(1), null, testDocument(), B_1, null); index = indexForDoc(doc); - engine.delete(new Engine.Delete(index.type(), index.id(), index.uid())); + engine.delete(new Engine.Delete(index.type(), index.id(), index.uid(), primaryTerm.get())); engine.forceMerge(true, 10, false, false, false); //expunge deletes engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); @@ -1892,7 +1892,7 @@ public void testBasicCreatedFlag() throws IOException { indexResult = engine.index(index); assertFalse(indexResult.isCreated()); - engine.delete(new Engine.Delete("doc", "1", newUid(doc))); + engine.delete(new Engine.Delete("doc", "1", newUid(doc), primaryTerm.get())); index = indexForDoc(doc); indexResult = engine.index(index); @@ -2368,7 +2368,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { { store.createEmpty(); final String translogUUID = - Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, @@ -2409,7 +2409,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { // open index with new tlog { final String translogUUID = - Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { @@ -2444,7 +2444,8 @@ public void testMissingTranslog() throws IOException { // test that we can force start the engine , even if the translog is missing. engine.close(); // fake a new translog, causing the engine to point to a missing one. - Translog translog = createTranslog(); + final long primaryTerm = randomNonNegativeLong(); + Translog translog = createTranslog(() -> primaryTerm); long id = translog.currentFileGeneration(); translog.close(); IOUtils.rm(translog.location().resolve(Translog.getFilename(id))); @@ -2455,7 +2456,7 @@ public void testMissingTranslog() throws IOException { // expected } // when a new translog is created it should be ok - final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm); store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null); engine = new InternalEngine(config); @@ -2521,7 +2522,7 @@ public void testTranslogCleanUpPostCommitCrash() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); store.createEmpty(); - final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId); + final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, @@ -2659,7 +2660,7 @@ public void testTranslogReplay() throws IOException { } parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); assertEquals(flush ? 1 : 2, parser.appliedOperations()); - engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc))); + engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc), primaryTerm.get())); if (randomBoolean()) { engine.refresh("test"); } else { @@ -2685,11 +2686,11 @@ public void testRecoverFromForeignTranslog() throws IOException { engine.close(); final Path badTranslogLog = createTempDir(); - final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId); + final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); Translog translog = new Translog( new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), - badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED); - translog.add(new Translog.Index("test", "SomeBogusId", 0, "{}".getBytes(Charset.forName("UTF-8")))); + badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); + translog.add(new Translog.Index("test", "SomeBogusId", 0, primaryTerm.get(), "{}".getBytes(Charset.forName("UTF-8")))); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); translog.close(); @@ -2703,7 +2704,7 @@ public void testRecoverFromForeignTranslog() throws IOException { new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO); + new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); fail("translog belongs to a different engine"); @@ -2886,11 +2887,11 @@ public void testHandleDocumentFailure() throws Exception { final Engine.DeleteResult deleteResult; if (randomBoolean()) { throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated")); - deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1))); + deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get())); assertThat(deleteResult.getFailure(), instanceOf(IOException.class)); } else { throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length")); - deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1))); + deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get())); assertThat(deleteResult.getFailure(), instanceOf(IllegalArgumentException.class)); } @@ -2923,7 +2924,7 @@ public BytesRef binaryValue() { if (randomBoolean()) { engine.index(indexForDoc(doc1)); } else { - engine.delete(new Engine.Delete("test", "", newUid(doc1))); + engine.delete(new Engine.Delete("test", "", newUid(doc1), primaryTerm.get())); } fail("engine should be closed"); } catch (Exception e) { @@ -3324,7 +3325,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { } try (Store store = createStore(newFSDirectory(storeDir))) { if (randomBoolean() || true) { - final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); + final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); } try (Engine engine = new InternalEngine(configSupplier.apply(store))) { @@ -3478,7 +3479,7 @@ public void testSequenceIDs() throws Exception { seqID = getSequenceID(engine, newGet(false, doc)); logger.info("--> got seqID: {}", seqID); assertThat(seqID.v1(), equalTo(0L)); - assertThat(seqID.v2(), equalTo(2L)); + assertThat(seqID.v2(), equalTo(primaryTerm.get())); // Index the same document again document = testDocumentWithTextField(); @@ -3490,7 +3491,7 @@ public void testSequenceIDs() throws Exception { seqID = getSequenceID(engine, newGet(false, doc)); logger.info("--> got seqID: {}", seqID); assertThat(seqID.v1(), equalTo(1L)); - assertThat(seqID.v2(), equalTo(2L)); + assertThat(seqID.v2(), equalTo(primaryTerm.get())); // Index the same document for the third time, this time changing the primary term document = testDocumentWithTextField(); @@ -3704,13 +3705,12 @@ protected long doGenerateSeqNoForOperation(Operation operation) { } }; noOpEngine.recoverFromTranslog(); - final long primaryTerm = randomNonNegativeLong(); - final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm); + final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = randomAlphaOfLength(16); noOpEngine.noOp( new Engine.NoOp( maxSeqNo + 1, - primaryTerm, + primaryTerm.get(), randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), System.nanoTime(), reason)); @@ -3728,7 +3728,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { assertThat(last, instanceOf(Translog.NoOp.class)); final Translog.NoOp noOp = (Translog.NoOp) last; assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); + assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get())); assertThat(noOp.reason(), equalTo(reason)); } finally { IOUtils.close(noOpEngine); @@ -3931,7 +3931,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { if (operation.opType() == Translog.Operation.Type.NO_OP) { assertEquals(2, operation.primaryTerm()); } else { - assertEquals(1, operation.primaryTerm()); + assertEquals(primaryTerm.get(), operation.primaryTerm()); } } @@ -4131,7 +4131,7 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { store = createStore(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); store.createEmpty(); - final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId); + final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, @@ -4201,7 +4201,7 @@ public void testConcurrentAppendUpdateAndRefresh() throws InterruptedException, Engine.Index operation = appendOnlyPrimary(doc, false, 1); engine.index(operation); if (rarely()) { - engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid())); + engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid(), primaryTerm.get())); numDeletes.incrementAndGet(); } else { doc = testParsedDocument(docID, null, testDocumentWithTextField("updated"), @@ -4340,7 +4340,7 @@ public void testShouldPeriodicallyFlush() throws Exception { engine.index(indexForDoc(doc)); } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); - long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, + long flushThreshold = RandomNumbers.randomLongBetween(random(), 120, engine.getTranslog().stats().getUncommittedSizeInBytes()- extraTranslogSizeInNewEngine); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) @@ -4382,7 +4382,7 @@ public void testShouldPeriodicallyFlush() throws Exception { } public void testStressShouldPeriodicallyFlush() throws Exception { - final long flushThreshold = randomLongBetween(100, 5000); + final long flushThreshold = randomLongBetween(120, 5000); final long generationThreshold = randomLongBetween(1000, 5000); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) @@ -4423,7 +4423,7 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false); // first index an append only document and then delete it. such that we have it in the tombstones engine.index(doc); - engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid())); + engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), primaryTerm.get())); // now index more append only docs and refresh so we re-enabel the optimization for unsafe version map ParsedDocument document1 = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); @@ -4559,7 +4559,7 @@ public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { if (randomBoolean()) { engine.index(indexForDoc(parsedDocument)); } else { - engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id()))); + engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id()), primaryTerm.get())); } } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 66e2a09750a2d..c7469f2432ad3 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -185,7 +185,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { false, SourceToParse.source("index", "type", "replica", new BytesArray("{}"), XContentType.JSON), mapping -> {}); - shards.promoteReplicaToPrimary(promotedReplica); + shards.promoteReplicaToPrimary(promotedReplica).get(); oldPrimary.close("demoted", randomBoolean()); oldPrimary.store().close(); shards.removeReplica(remainingReplica); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index cc241d4ad745d..f7ee54b32ee84 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -335,7 +335,7 @@ public void testMaybeFlush() throws Exception { assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(160 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + new ByteSizeValue(190 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get(); client().prepareIndex("test", "test", "0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertFalse(shard.shouldPeriodicallyFlush()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 10c769199669d..08f0319d6cf14 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -90,6 +90,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreStats; +import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesQueryCache; @@ -519,6 +520,7 @@ public void testPrimaryPromotionRollsGeneration() throws Exception { // promote the replica final ShardRouting replicaRouting = indexShard.routingEntry(); + final long newPrimaryTerm = indexShard.getPrimaryTerm() + between(1, 10000); final ShardRouting primaryRouting = newShardRouting( replicaRouting.shardId(), @@ -527,7 +529,7 @@ public void testPrimaryPromotionRollsGeneration() throws Exception { true, ShardRoutingState.STARTED, replicaRouting.allocationId()); - indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, + indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {}, 0L, Collections.singleton(primaryRouting.allocationId().getId()), new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet()); @@ -553,6 +555,7 @@ public void onFailure(Exception e) { latch.await(); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); + assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); closeShards(indexShard); } @@ -571,7 +574,10 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E ShardRouting replicaRouting = indexShard.routingEntry(); ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); - indexShard.updateShardState(primaryRouting, indexShard.getPrimaryTerm() + 1, (shard, listener) -> {}, 0L, + final long newPrimaryTerm = indexShard.getPrimaryTerm() + between(1, 1000); + indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { + assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); + }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(), Collections.emptySet()); @@ -739,6 +745,7 @@ public void onFailure(Exception e) { @Override public void onResponse(Releasable releasable) { assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint)); onResponse.set(true); @@ -784,15 +791,18 @@ private void finish() { assertFalse(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); + assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(primaryTerm)); Releasables.close(operation1); // our operation should still be blocked assertFalse(onResponse.get()); assertNull(onFailure.get()); assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm)); + assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(primaryTerm)); Releasables.close(operation2); barrier.await(); // now lock acquisition should have succeeded assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm)); + assertThat(TestTranslog.getCurrentTerm(indexShard.getTranslog()), equalTo(newPrimaryTerm)); if (engineClosed) { assertFalse(onResponse.get()); assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class)); @@ -1739,7 +1749,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); // Simulate resync (without rollback): Noop #1, index #2 - shard.primaryTerm++; + acquireReplicaOperationPermitBlockingly(shard, shard.primaryTerm + 1); shard.markSeqNoAsNoop(1, "test"); shard.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, SourceToParse.source(indexName, "doc", "doc-2", new BytesArray("{}"), XContentType.JSON), mapping); @@ -2052,18 +2062,18 @@ public void testRecoverFromTranslog() throws IOException { IndexMetaData metaData = IndexMetaData.builder("test") .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") .settings(settings) - .primaryTerm(0, 1).build(); + .primaryTerm(0, randomLongBetween(1, Long.MAX_VALUE)).build(); IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); List operations = new ArrayList<>(); int numTotalEntries = randomIntBetween(0, 10); int numCorruptEntries = 0; for (int i = 0; i < numTotalEntries; i++) { if (randomBoolean()) { - operations.add(new Translog.Index("test", "1", 0, 1, VersionType.INTERNAL, + operations.add(new Translog.Index("test", "1", 0, primary.getPrimaryTerm(), 1, VersionType.INTERNAL, "{\"foo\" : \"bar\"}".getBytes(Charset.forName("UTF-8")), null, -1)); } else { // corrupt entry - operations.add(new Translog.Index("test", "2", 1, 1, VersionType.INTERNAL, + operations.add(new Translog.Index("test", "2", 1, primary.getPrimaryTerm(), 1, VersionType.INTERNAL, "{\"foo\" : \"bar}".getBytes(Charset.forName("UTF-8")), null, -1)); numCorruptEntries++; } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index ed60d38dfa4fd..91e439dcda98d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -136,8 +136,8 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) { IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); ParsedDocument doc = InternalEngineTests.createParsedDoc("1", null); - Engine.Delete delete = new Engine.Delete("test", "1", new Term("_id", Uid.encodeId(doc.id()))); - Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc); + Engine.Delete delete = new Engine.Delete("test", "1", new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong()); + Engine.Index index = new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc); compositeListener.postDelete(randomShardId, delete, new Engine.DeleteResult(1, SequenceNumbers.UNASSIGNED_SEQ_NO, true)); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 1bd98cd1c9e69..5803bf263633d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -122,14 +122,15 @@ public void onFailedEngine(String reason, @Nullable Exception e) { } }; store.createEmpty(); + final long primaryTerm = randomNonNegativeLong(); final String translogUUID = - Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm); store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = new EngineConfig(shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, - (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); + (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm); engine = new InternalEngine(config); engine.recoverFromTranslog(); listeners.setTranslog(engine.getTranslog()); @@ -363,7 +364,7 @@ private Engine.IndexResult index(String id, String testFieldValue) throws IOExce BytesReference source = new BytesArray(new byte[] { 1 }); ParsedDocument doc = new ParsedDocument(versionField, seqID, id, "test", null, Arrays.asList(document), source, XContentType.JSON, null); - Engine.Index index = new Engine.Index(new Term("_id", doc.id()), doc); + Engine.Index index = new Engine.Index(new Term("_id", doc.id()), engine.config().getPrimaryTermSupplier().getAsLong(), doc); return engine.index(index); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index 4077d033da9cd..7ab9fa6733011 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -83,26 +83,7 @@ public static Set corruptTranslogFiles(Logger logger, Random random, Colle int corruptions = RandomNumbers.randomIntBetween(random, 5, 20); for (int i = 0; i < corruptions; i++) { Path fileToCorrupt = RandomPicks.randomFrom(random, candidates); - try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - // read - raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1)); - long filePointer = raf.position(); - ByteBuffer bb = ByteBuffer.wrap(new byte[1]); - raf.read(bb); - bb.flip(); - - // corrupt - byte oldValue = bb.get(0); - byte newValue = (byte) (oldValue + 1); - bb.put(0, newValue); - - // rewrite - raf.position(filePointer); - raf.write(bb); - logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}", - fileToCorrupt, filePointer, Integer.toHexString(oldValue), - Integer.toHexString(newValue), fileToCorrupt); - } + corruptFile(logger, random, fileToCorrupt); corruptedFiles.add(fileToCorrupt); } } @@ -110,6 +91,29 @@ public static Set corruptTranslogFiles(Logger logger, Random random, Colle return corruptedFiles; } + static void corruptFile(Logger logger, Random random, Path fileToCorrupt) throws IOException { + try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) { + // read + raf.position(RandomNumbers.randomLongBetween(random, 0, raf.size() - 1)); + long filePointer = raf.position(); + ByteBuffer bb = ByteBuffer.wrap(new byte[1]); + raf.read(bb); + bb.flip(); + + // corrupt + byte oldValue = bb.get(0); + byte newValue = (byte) (oldValue + 1); + bb.put(0, newValue); + + // rewrite + raf.position(filePointer); + raf.write(bb); + logger.info("--> corrupting file {} -- flipping at position {} from {} to {} file: {}", + fileToCorrupt, filePointer, Integer.toHexString(oldValue), + Integer.toHexString(newValue), fileToCorrupt); + } + } + /** * Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog. */ @@ -122,4 +126,11 @@ private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExc return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); } } + + /** + * Returns the primary term associated with the current translog writer of the given translog. + */ + public static long getCurrentTerm(Translog translog) { + return translog.getCurrent().getPrimaryTerm(); + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 2f6f4ee3178f2..9ae502fecb580 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -171,7 +171,7 @@ private Tuple, TranslogWriter> createReadersAndWriter(final } writer = TranslogWriter.create(new ShardId("index", "uuid", 0), translogUUID, gen, tempDir.resolve(Translog.getFilename(gen)), FileChannel::open, TranslogConfig.DEFAULT_BUFFER_SIZE, 1L, 1L, () -> 1L, - () -> 1L); + () -> 1L, randomNonNegativeLong()); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java new file mode 100644 index 0000000000000..0dc404767de3c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderTests.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; + +public class TranslogHeaderTests extends ESTestCase { + + public void testCurrentHeaderVersion() throws Exception { + final String translogUUID = UUIDs.randomBase64UUID(); + final TranslogHeader outHeader = new TranslogHeader(translogUUID, randomNonNegativeLong()); + final long generation = randomNonNegativeLong(); + final Path translogFile = createTempDir().resolve(Translog.getFilename(generation)); + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) { + outHeader.write(channel); + assertThat(outHeader.sizeInBytes(), equalTo((int)channel.position())); + } + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + final TranslogHeader inHeader = TranslogHeader.read(translogUUID, translogFile, channel); + assertThat(inHeader.getTranslogUUID(), equalTo(translogUUID)); + assertThat(inHeader.getPrimaryTerm(), equalTo(outHeader.getPrimaryTerm())); + assertThat(inHeader.sizeInBytes(), equalTo((int)channel.position())); + } + final TranslogCorruptedException mismatchUUID = expectThrows(TranslogCorruptedException.class, () -> { + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + TranslogHeader.read(UUIDs.randomBase64UUID(), translogFile, channel); + } + }); + assertThat(mismatchUUID.getMessage(), containsString("this translog file belongs to a different translog")); + int corruptions = between(1, 10); + for (int i = 0; i < corruptions; i++) { + TestTranslog.corruptFile(logger, random(), translogFile); + } + expectThrows(TranslogCorruptedException.class, () -> { + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + TranslogHeader.read(outHeader.getTranslogUUID(), translogFile, channel); + } + }); + } + + public void testHeaderWithoutPrimaryTerm() throws Exception { + final String translogUUID = UUIDs.randomBase64UUID(); + final long generation = randomNonNegativeLong(); + final Path translogFile = createTempDir().resolve(Translog.getFilename(generation)); + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) { + writeHeaderWithoutTerm(channel, translogUUID); + assertThat((int)channel.position(), lessThan(TranslogHeader.headerSizeInBytes(translogUUID))); + } + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + final TranslogHeader inHeader = TranslogHeader.read(translogUUID, translogFile, channel); + assertThat(inHeader.getTranslogUUID(), equalTo(translogUUID)); + assertThat(inHeader.getPrimaryTerm(), equalTo(TranslogHeader.UNKNOWN_PRIMARY_TERM)); + assertThat(inHeader.sizeInBytes(), equalTo((int)channel.position())); + } + expectThrows(TranslogCorruptedException.class, () -> { + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + TranslogHeader.read(UUIDs.randomBase64UUID(), translogFile, channel); + } + }); + } + + static void writeHeaderWithoutTerm(FileChannel channel, String translogUUID) throws IOException { + final OutputStreamStreamOutput out = new OutputStreamStreamOutput(Channels.newOutputStream(channel)); + CodecUtil.writeHeader(new OutputStreamDataOutput(out), TranslogHeader.TRANSLOG_CODEC, TranslogHeader.VERSION_CHECKPOINTS); + final BytesRef uuid = new BytesRef(translogUUID); + out.writeInt(uuid.length); + out.writeBytes(uuid.bytes, uuid.offset, uuid.length); + channel.force(true); + assertThat(channel.position(), equalTo(43L)); + } + + public void testLegacyTranslogVersions() throws Exception { + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v0.binary", IllegalStateException.class, "pre-1.4 translog"); + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1.binary", IllegalStateException.class, "pre-2.0 translog"); + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-truncated.binary", IllegalStateException.class, "pre-2.0 translog"); + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary", + TranslogCorruptedException.class, "translog looks like version 1 or later, but has corrupted header"); + checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary", + IllegalStateException.class, "pre-2.0 translog"); + } + + private void checkFailsToOpen(String file, Class expectedErrorType, String expectedMessage) { + final Path translogFile = getDataPath(file); + assertThat("test file [" + translogFile + "] should exist", Files.exists(translogFile), equalTo(true)); + final E error = expectThrows(expectedErrorType, () -> { + final Checkpoint checkpoint = new Checkpoint(Files.size(translogFile), 1, 1, + SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, 1); + try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) { + TranslogReader.open(channel, translogFile, checkpoint, null); + } + }); + assertThat(error.getMessage(), containsString(expectedMessage)); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index faff540028237..b3b9fca886e17 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -109,6 +109,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; @@ -132,6 +133,8 @@ public class TranslogTests extends ESTestCase { protected Translog translog; private AtomicLong globalCheckpoint; protected Path translogDir; + // A default primary term is used by translog instances created in this test. + private final AtomicLong primaryTerm = new AtomicLong(); @Override protected void afterIfSuccessful() throws Exception { @@ -152,14 +155,14 @@ protected void afterIfSuccessful() throws Exception { protected Translog createTranslog(TranslogConfig config) throws IOException { String translogUUID = - Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), - () -> SequenceNumbers.NO_OPS_PERFORMED); + () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); } protected Translog openTranslog(TranslogConfig config, String translogUUID) throws IOException { return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), - () -> SequenceNumbers.NO_OPS_PERFORMED); + () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); } @@ -188,6 +191,7 @@ private void commit(Translog translog, long genToRetain, long genToCommit) throw @Before public void setUp() throws Exception { super.setUp(); + primaryTerm.set(randomLongBetween(1, Integer.MAX_VALUE)); // if a previous test failed we clean up things here translogDir = createTempDir(); translog = create(translogDir); @@ -208,8 +212,8 @@ private Translog create(Path path) throws IOException { globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final TranslogConfig translogConfig = getTranslogConfig(path); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); - final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId); - return new Translog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get()); + final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + return new Translog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get); } private TranslogConfig getTranslogConfig(final Path path) { @@ -303,22 +307,22 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot, SnapshotMatchers.size(0)); } - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", 1, newUid("2"))); + addToTranslogAndList(translog, ops, new Translog.Delete("test", "2", 1, primaryTerm.get(), newUid("2"))); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(ops.size())); } final long seqNo = randomNonNegativeLong(); - final long primaryTerm = randomNonNegativeLong(); final String reason = randomAlphaOfLength(16); - addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, primaryTerm, reason)); + final long noopTerm = randomLongBetween(1, primaryTerm.get()); + addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason)); try (Translog.Snapshot snapshot = translog.newSnapshot()) { @@ -333,7 +337,7 @@ public void testSimpleOperations() throws IOException { Translog.NoOp noOp = (Translog.NoOp) snapshot.next(); assertNotNull(noOp); assertThat(noOp.seqNo(), equalTo(seqNo)); - assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); + assertThat(noOp.primaryTerm(), equalTo(noopTerm)); assertThat(noOp.reason(), equalTo(reason)); assertNull(snapshot.next()); @@ -401,35 +405,35 @@ public void testStats() throws IOException { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(0)); } - assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC))); - translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); + assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogHeader.TRANSLOG_CODEC))); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(139L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(163L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(139L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(163L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } - translog.add(new Translog.Delete("test", "2", 1, newUid("2"))); + translog.add(new Translog.Delete("test", "2", 1, primaryTerm.get(), newUid("2"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(188L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(212L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(188L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(212L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } - translog.add(new Translog.Delete("test", "3", 2, newUid("3"))); + translog.add(new Translog.Delete("test", "3", 2, primaryTerm.get(), newUid("3"))); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(237L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(261L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(237L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(261L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } @@ -437,13 +441,13 @@ public void testStats() throws IOException { { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(279L)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(303L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(279L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(303L)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); } - final long expectedSizeInBytes = 322L; + final long expectedSizeInBytes = 358L; translog.rollGeneration(); { final TranslogStats stats = stats(); @@ -494,7 +498,7 @@ public void testUncommittedOperations() throws Exception { int uncommittedOps = 0; int operationsInLastGen = 0; for (int i = 0; i < operations; i++) { - translog.add(new Translog.Index("test", Integer.toString(i), i, new byte[]{1})); + translog.add(new Translog.Index("test", Integer.toString(i), i, primaryTerm.get(), new byte[]{1})); uncommittedOps++; operationsInLastGen++; if (rarely()) { @@ -563,7 +567,7 @@ public void testSnapshot() throws IOException { assertThat(snapshot, SnapshotMatchers.size(0)); } - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); @@ -583,9 +587,9 @@ public void testSnapshot() throws IOException { public void testReadLocation() throws IOException { ArrayList ops = new ArrayList<>(); ArrayList locs = new ArrayList<>(); - locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1}))); - locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{1}))); - locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, primaryTerm.get(), new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, primaryTerm.get(), new byte[]{1}))); int i = 0; for (Translog.Operation op : ops) { assertEquals(op, translog.readOperation(locs.get(i++))); @@ -601,16 +605,16 @@ public void testSnapshotWithNewTranslog() throws IOException { toClose.add(snapshot); assertThat(snapshot, SnapshotMatchers.size(0)); - addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); Translog.Snapshot snapshot1 = translog.newSnapshot(); toClose.add(snapshot1); - addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{2})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, primaryTerm.get(), new byte[]{2})); assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); translog.rollGeneration(); - addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{3})); + addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, primaryTerm.get(), new byte[]{3})); Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); @@ -624,7 +628,7 @@ public void testSnapshotWithNewTranslog() throws IOException { public void testSnapshotOnClosedTranslog() throws IOException { assertTrue(Files.exists(translogDir.resolve(Translog.getFilename(1)))); - translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); translog.close(); try { Translog.Snapshot snapshot = translog.newSnapshot(); @@ -747,7 +751,7 @@ public void testTranslogChecksums() throws Exception { int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { String ascii = randomAlphaOfLengthBetween(1, 50); - locations.add(translog.add(new Translog.Index("test", "" + op, op, ascii.getBytes("UTF-8")))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8")))); } translog.sync(); @@ -774,7 +778,7 @@ public void testTruncatedTranslogs() throws Exception { int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { String ascii = randomAlphaOfLengthBetween(1, 50); - locations.add(translog.add(new Translog.Index("test", "" + op, op, ascii.getBytes("UTF-8")))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8")))); } translog.sync(); @@ -838,7 +842,7 @@ private Term newUid(String id) { public void testVerifyTranslogIsNotDeleted() throws IOException { assertFileIsPresent(translog, 1); - translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(1)); assertFileIsPresent(translog, 1); @@ -890,10 +894,10 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep switch (type) { case CREATE: case INDEX: - op = new Translog.Index("type", "" + id, id, new byte[]{(byte) id}); + op = new Translog.Index("type", "" + id, id, primaryTerm.get(), new byte[]{(byte) id}); break; case DELETE: - op = new Translog.Delete("test", Long.toString(id), id, newUid(Long.toString(id))); + op = new Translog.Delete("test", Long.toString(id), id, primaryTerm.get(), newUid(Long.toString(id))); break; case NO_OP: op = new Translog.NoOp(id, 1, Long.toString(id)); @@ -1052,13 +1056,13 @@ public void testSyncUpTo() throws IOException { for (int op = 0; op < translogOperations; op++) { int seqNo = ++count; final Translog.Location location = - translog.add(new Translog.Index("test", "" + op, seqNo, Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))); if (randomBoolean()) { assertTrue("at least one operation pending", translog.syncNeeded()); assertTrue("this operation has not been synced", translog.ensureSynced(location)); assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced seqNo = ++count; - translog.add(new Translog.Index("test", "" + op, seqNo, Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))); assertTrue("one pending operation", translog.syncNeeded()); assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now assertTrue("we only synced a previous operation yet", translog.syncNeeded()); @@ -1087,7 +1091,7 @@ public void testSyncUpToStream() throws IOException { rollAndCommit(translog); // do this first so that there is at least one pending tlog entry } final Translog.Location location = - translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8")))); locations.add(location); } Collections.shuffle(locations, random()); @@ -1115,7 +1119,7 @@ public void testLocationComparison() throws IOException { int count = 0; for (int op = 0; op < translogOperations; op++) { locations.add( - translog.add(new Translog.Index("test", "" + op, op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); + translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); if (rarely() && translogOperations > op + 1) { rollAndCommit(translog); } @@ -1152,7 +1156,7 @@ public void testBasicCheckpoint() throws IOException { int lastSynced = -1; long lastSyncedGlobalCheckpoint = globalCheckpoint.get(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { globalCheckpoint.set(globalCheckpoint.get() + randomIntBetween(1, 16)); } @@ -1163,8 +1167,8 @@ public void testBasicCheckpoint() throws IOException { } } assertEquals(translogOperations, translog.totalOperations()); - translog.add(new Translog.Index( - "test", "" + translogOperations, translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + translogOperations, translogOperations, primaryTerm.get(), + Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8")))); final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)); try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) { @@ -1288,7 +1292,7 @@ public void testBasicRecovery() throws IOException { int minUncommittedOp = -1; final boolean commitOften = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { rollAndCommit(translog); @@ -1309,7 +1313,7 @@ public void testBasicRecovery() throws IOException { assertNull(snapshot.next()); } } else { - translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED); + translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) { @@ -1331,7 +1335,7 @@ public void testRecoveryUncommitted() throws IOException { Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); translog.rollGeneration(); @@ -1348,7 +1352,7 @@ public void testRecoveryUncommitted() throws IOException { TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1362,7 +1366,7 @@ public void testRecoveryUncommitted() throws IOException { } } if (randomBoolean()) { // recover twice - try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); @@ -1387,7 +1391,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); translog.rollGeneration(); @@ -1408,7 +1412,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1423,7 +1427,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { } if (randomBoolean()) { // recover twice - try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); @@ -1447,7 +1451,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (op == prepareOp) { translogGeneration = translog.getGeneration(); translog.rollGeneration(); @@ -1466,15 +1470,15 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3068, " + + assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3080, " + "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0}", ex.getMessage()); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); - try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertNotNull(translogGeneration); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); @@ -1494,7 +1498,7 @@ public void testSnapshotFromStreamInput() throws IOException { List ops = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations; op++) { - Translog.Index test = new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))); + Translog.Index test = new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))); ops.add(test); } Translog.writeOperations(out, ops); @@ -1509,8 +1513,8 @@ public void testLocationHashCodeEquals() throws IOException { int translogOperations = randomIntBetween(10, 100); try (Translog translog2 = create(createTempDir())) { for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); - locations2.add(translog2.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations2.add(translog2.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); } int iters = randomIntBetween(10, 100); for (int i = 0; i < iters; i++) { @@ -1536,7 +1540,7 @@ public void testOpenForeignTranslog() throws IOException { int translogOperations = randomIntBetween(1, 10); int firstUncommitted = 0; for (int op = 0; op < translogOperations; op++) { - locations.add(translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); + locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { rollAndCommit(translog); firstUncommitted = op + 1; @@ -1551,12 +1555,12 @@ public void testOpenForeignTranslog() throws IOException { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED); + new Translog(config, foreignTranslog, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); fail("translog doesn't belong to this UUID"); } catch (TranslogCorruptedException ex) { } - this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED); + this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) { for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); @@ -1568,10 +1572,10 @@ public void testOpenForeignTranslog() throws IOException { } public void testFailOnClosedWrite() throws IOException { - translog.add(new Translog.Index("test", "1", 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8")))); translog.close(); try { - translog.add(new Translog.Index("test", "1", 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8")))); fail("closed"); } catch (AlreadyClosedException ex) { // all is well @@ -1609,7 +1613,7 @@ public void testCloseConcurrently() throws Throwable { } } - private static class TranslogThread extends Thread { + private class TranslogThread extends Thread { private final CountDownLatch downLatch; private final int opsPerThread; private final int threadId; @@ -1640,19 +1644,19 @@ public void run() { case CREATE: case INDEX: op = new Translog.Index("test", threadId + "_" + opCount, seqNoGenerator.getAndIncrement(), - randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); + primaryTerm.get(), randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); break; case DELETE: op = new Translog.Delete( "test", threadId + "_" + opCount, new Term("_uid", threadId + "_" + opCount), seqNoGenerator.getAndIncrement(), - 0, + primaryTerm.get(), 1 + randomInt(100000), randomFrom(VersionType.values())); break; case NO_OP: - op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), randomNonNegativeLong(), randomAlphaOfLength(16)); + op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), primaryTerm.get(), randomAlphaOfLength(16)); break; default: throw new AssertionError("unsupported operation type [" + type + "]"); @@ -1690,7 +1694,7 @@ public void testFailFlush() throws IOException { while (failed == false) { try { locations.add(translog.add( - new Translog.Index("test", "" + opsSynced, opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + new Translog.Index("test", "" + opsSynced, opsSynced, primaryTerm.get(), Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); translog.sync(); opsSynced++; } catch (MockDirectoryWrapper.FakeIOException ex) { @@ -1711,7 +1715,7 @@ public void testFailFlush() throws IOException { if (randomBoolean()) { try { locations.add(translog.add( - new Translog.Index("test", "" + opsSynced, opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + new Translog.Index("test", "" + opsSynced, opsSynced, primaryTerm.get(), Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); fail("we are already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); @@ -1745,7 +1749,7 @@ public void testFailFlush() throws IOException { translog.close(); // we are closed final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); @@ -1768,7 +1772,7 @@ public void testTranslogOpsCountIsCorrect() throws IOException { LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { locations.add(translog.add( - new Translog.Index("test", "" + opsAdded, opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); + new Translog.Index("test", "" + opsAdded, opsAdded, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); try (Translog.Snapshot snapshot = this.translog.newSnapshot()) { assertEquals(opsAdded + 1, snapshot.totalOperations()); for (int i = 0; i < opsAdded; i++) { @@ -1787,11 +1791,11 @@ public void testTragicEventCanBeAnyException() throws IOException { TranslogConfig config = getTranslogConfig(tempDir); Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy()); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly - translog.add(new Translog.Index("test", "1", 0, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); fail.failAlways(); try { Translog.Location location = translog.add( - new Translog.Index("test", "2", 1, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); + new Translog.Index("test", "2", 1, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); if (randomBoolean()) { translog.ensureSynced(location); } else { @@ -1881,7 +1885,7 @@ protected void afterAdd() throws IOException { } } try (Translog tlog = - new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED); + new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); Translog.Snapshot snapshot = tlog.newSnapshot()) { if (writtenOperations.size() != snapshot.totalOperations()) { for (int i = 0; i < threadCount; i++) { @@ -1908,7 +1912,7 @@ protected void afterAdd() throws IOException { public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations / 2; op++) { - translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } @@ -1916,7 +1920,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { translog.rollGeneration(); long comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); for (int op = translogOperations / 2; op < translogOperations; op++) { - translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } @@ -1927,7 +1931,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); - translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED); + translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); assertThat(translog.getMinFileGeneration(), equalTo(1L)); // no trimming done yet, just recovered for (long gen = 1; gen < translog.currentFileGeneration(); gen++) { @@ -1958,7 +1962,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { translogUUID = translog.getTranslogUUID(); int translogOperations = randomIntBetween(10, 100); for (int op = 0; op < translogOperations / 2; op++) { - translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } @@ -1966,7 +1970,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { translog.rollGeneration(); comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); for (int op = translogOperations / 2; op < translogOperations; op++) { - translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } @@ -1983,7 +1987,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); - try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { // we don't know when things broke exactly assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L)); assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration)); @@ -2047,9 +2051,9 @@ private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig }; if (translogUUID == null) { translogUUID = Translog.createEmptyTranslog( - config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory); + config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get()); } - return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED) { + return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get) { @Override ChannelFactory getChannelFactory() { return channelFactory; @@ -2157,10 +2161,10 @@ public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { Path tempDir = createTempDir(); TranslogConfig config = getTranslogConfig(tempDir); Translog translog = createTranslog(config); - translog.add(new Translog.Index("test", "boom", 0, "boom".getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "boom", 0, primaryTerm.get(), "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED) { + new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get) { @Override protected TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint) throws IOException { @@ -2175,7 +2179,7 @@ protected TranslogWriter createWriter(long fileGeneration, long initialMinTransl } public void testRecoverWithUnbackedNextGen() throws IOException { - translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8")))); translog.close(); TranslogConfig config = translog.getConfig(); @@ -2191,7 +2195,7 @@ public void testRecoverWithUnbackedNextGen() throws IOException { assertNotNull("operation 1 must be non-null", op); assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString())); - tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(2).getBytes(Charset.forName("UTF-8")))); + tlog.add(new Translog.Index("test", "" + 1, 1, primaryTerm.get(), Integer.toString(2).getBytes(Charset.forName("UTF-8")))); } try (Translog tlog = openTranslog(config, translog.getTranslogUUID()); @@ -2209,7 +2213,7 @@ public void testRecoverWithUnbackedNextGen() throws IOException { } public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { - translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(), Integer.toString(0).getBytes(Charset.forName("UTF-8")))); translog.close(); TranslogConfig config = translog.getConfig(); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); @@ -2218,7 +2222,7 @@ public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); try { - Translog tlog = new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED); + Translog tlog = new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); fail("file already exists?"); } catch (TranslogException ex) { // all is well @@ -2228,7 +2232,7 @@ public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { } public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { - translog.add(new Translog.Index("test", "" + 0, 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(), Integer.toString(0).getBytes(Charset.forName("UTF-8")))); translog.close(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); @@ -2240,7 +2244,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); - try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED)) { + try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) { assertFalse(tlog.syncNeeded()); try (Translog.Snapshot snapshot = tlog.newSnapshot()) { for (int i = 0; i < 1; i++) { @@ -2249,11 +2253,11 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString())); } } - tlog.add(new Translog.Index("test", "" + 1, 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + tlog.add(new Translog.Index("test", "" + 1, 1, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8")))); } try { - Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED); + Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); fail("file already exists?"); } catch (TranslogException ex) { // all is well @@ -2289,7 +2293,7 @@ public void testWithRandomException() throws IOException { LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { String doc = lineFileDocs.nextDoc().toString(); - failableTLog.add(new Translog.Index("test", "" + opsAdded, opsAdded, doc.getBytes(Charset.forName("UTF-8")))); + failableTLog.add(new Translog.Index("test", "" + opsAdded, opsAdded, primaryTerm.get(), doc.getBytes(Charset.forName("UTF-8")))); unsynced.add(doc); if (randomBoolean()) { failableTLog.sync(); @@ -2361,9 +2365,9 @@ public void testWithRandomException() throws IOException { deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); if (generationUUID == null) { // we never managed to successfully create a translog, make it - generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); } - try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED); + try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery)) { assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { @@ -2422,20 +2426,20 @@ public void testCheckpointOnDiskFull() throws IOException { * Tests that closing views after the translog is fine and we can reopen the translog */ public void testPendingDelete() throws IOException { - translog.add(new Translog.Index("test", "1", 0, new byte[]{1})); + translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); translog.rollGeneration(); TranslogConfig config = translog.getConfig(); final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); translog.close(); - translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED); - translog.add(new Translog.Index("test", "2", 1, new byte[]{2})); + translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); + translog.add(new Translog.Index("test", "2", 1, primaryTerm.get(), new byte[]{2})); translog.rollGeneration(); Closeable lock = translog.acquireRetentionLock(); - translog.add(new Translog.Index("test", "3", 2, new byte[]{3})); + translog.add(new Translog.Index("test", "3", 2, primaryTerm.get(), new byte[]{3})); translog.close(); IOUtils.close(lock); - translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED); + translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get); } public static Translog.Location randomTranslogLocation() { @@ -2500,20 +2504,31 @@ public void testRollGeneration() throws Exception { final int rolls = randomIntBetween(1, 16); int totalOperations = 0; int seqNo = 0; + final List primaryTerms = new ArrayList<>(); + primaryTerms.add(primaryTerm.get()); // We always create an empty translog. + primaryTerms.add(primaryTerm.get()); for (int i = 0; i < rolls; i++) { final int operations = randomIntBetween(1, 128); for (int j = 0; j < operations; j++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test")); + translog.add(new Translog.NoOp(seqNo++, primaryTerm.get(), "test")); totalOperations++; } try (ReleasableLock ignored = translog.writeLock.acquire()) { + if (randomBoolean()){ + primaryTerm.incrementAndGet(); + } translog.rollGeneration(); + primaryTerms.add(primaryTerm.get()); } assertThat(translog.currentFileGeneration(), equalTo(generation + i + 1)); + assertThat(translog.getCurrent().getPrimaryTerm(), equalTo(primaryTerm.get())); assertThat(translog.totalOperations(), equalTo(totalOperations)); } for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); + final List storedPrimaryTerms = Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .map(t -> t.getPrimaryTerm()).collect(Collectors.toList()); + assertThat(storedPrimaryTerms, equalTo(primaryTerms)); } long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); @@ -2616,8 +2631,11 @@ public void testSimpleCommit() throws IOException { final int operations = randomIntBetween(1, 4096); long seqNo = 0; for (int i = 0; i < operations; i++) { - translog.add(new Translog.NoOp(seqNo++, 0, "test'")); + translog.add(new Translog.NoOp(seqNo++, primaryTerm.get(), "test'")); if (rarely()) { + if (rarely()) { + primaryTerm.incrementAndGet(); + } translog.rollGeneration(); } } @@ -2674,7 +2692,7 @@ public void testSnapshotReadOperationInReverse() throws Exception { for (int gen = 0; gen < generations; gen++) { final int operations = randomIntBetween(1, 100); for (int i = 0; i < operations; i++) { - Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), new byte[]{1}); + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), primaryTerm.get(), new byte[]{1}); translog.add(op); views.peek().add(op); } @@ -2699,7 +2717,7 @@ public void testSnapshotDedupOperations() throws Exception { List batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList()); Randomness.shuffle(batch); for (Long seqNo : batch) { - Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, new byte[]{1}); + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1}); translog.add(op); latestOperations.put(op.seqNo(), op); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java deleted file mode 100644 index d57373ebfe349..0000000000000 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.translog; - -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; - -/** - * Tests for reading old and new translog files - */ -public class TranslogVersionTests extends ESTestCase { - - private void checkFailsToOpen(String file, String expectedMessage) throws IOException { - Path translogFile = getDataPath(file); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - try { - openReader(translogFile, 0); - fail("should be able to open an old translog"); - } catch (IllegalStateException e) { - assertThat(e.getMessage(), containsString(expectedMessage)); - } - - } - - public void testV0LegacyTranslogVersion() throws Exception { - checkFailsToOpen("/org/elasticsearch/index/translog/translog-v0.binary", "pre-1.4 translog"); - } - - public void testV1ChecksummedTranslogVersion() throws Exception { - checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1.binary", "pre-2.0 translog"); - } - - public void testCorruptedTranslogs() throws Exception { - try { - Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-v1-corrupted-magic.binary"); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - openReader(translogFile, 0); - fail("should have thrown an exception about the header being corrupt"); - } catch (TranslogCorruptedException e) { - assertThat("translog corruption from header: " + e.getMessage(), - e.getMessage().contains("translog looks like version 1 or later, but has corrupted header"), equalTo(true)); - } - - try { - Path translogFile = getDataPath("/org/elasticsearch/index/translog/translog-invalid-first-byte.binary"); - assertThat("test file should exist", Files.exists(translogFile), equalTo(true)); - openReader(translogFile, 0); - fail("should have thrown an exception about the header being corrupt"); - } catch (TranslogCorruptedException e) { - assertThat("translog corruption from header: " + e.getMessage(), - e.getMessage().contains("Invalid first byte in translog file, got: 1, expected 0x00 or 0x3f"), equalTo(true)); - } - - checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-corrupted-body.binary", "pre-2.0 translog"); - } - - public void testTruncatedTranslog() throws Exception { - checkFailsToOpen("/org/elasticsearch/index/translog/translog-v1-truncated.binary", "pre-2.0 translog"); - } - - public TranslogReader openReader(final Path path, final long id) throws IOException { - try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) { - final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - final long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - final Checkpoint checkpoint = - new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbers.UNASSIGNED_SEQ_NO, id); - return TranslogReader.open(channel, path, checkpoint, null); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index a914eb435bb7d..a2149b9d28a0b 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -241,7 +241,7 @@ public void testUnallocatedShardsDoesNotHang() throws InterruptedException { private void indexDoc(Engine engine, String id) throws IOException { final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null); - final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc)); + final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), 1L, doc)); assertThat(indexResult.getFailure(), nullValue()); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 917f5deeac113..babf8518d4492 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -268,7 +268,7 @@ private Engine.Index getIndex(final String id) { final BytesReference source = new BytesArray(new byte[] { 1 }); final ParsedDocument doc = new ParsedDocument(versionField, seqID, id, type, null, Arrays.asList(document), source, XContentType.JSON, null); - return new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc); + return new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), randomNonNegativeLong(), doc); } public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 49e557c3dde78..f46ab7ebbd603 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -200,7 +200,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { final String historyUUIDtoUse = UUIDs.randomBase64UUID(random()); if (randomBoolean()) { // create a new translog - translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, replica.shardId()); + translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, + replica.shardId(), replica.getPrimaryTerm()); translogGenToUse = 1; } else { translogUUIDtoUse = translogGeneration.translogUUID; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 667adf9d990cc..dea92c2927d86 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -84,6 +84,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.LongSupplier; import java.util.function.ToLongBiFunction; @@ -110,6 +111,8 @@ public abstract class EngineTestCase extends ESTestCase { protected String codecName; protected Path primaryTranslogDir; protected Path replicaTranslogDir; + // A default primary term is used by engine instances created in this test. + protected AtomicLong primaryTerm = new AtomicLong(); protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException { assertVisibleCount(engine, numDocs, true); @@ -130,7 +133,7 @@ protected static void assertVisibleCount(Engine engine, int numDocs, boolean ref @Before public void setUp() throws Exception { super.setUp(); - + primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE)); CodecService codecService = new CodecService(null, logger); String name = Codec.getDefault().getName(); if (Arrays.asList(codecService.availableCodecs()).contains(name)) { @@ -178,7 +181,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), - config.getCircuitBreakerService(), globalCheckpointSupplier); + config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -187,7 +190,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), - config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier()); + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier()); } @Override @@ -260,15 +263,16 @@ public Directory newDirectory() throws IOException { return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); } - protected Translog createTranslog() throws IOException { - return createTranslog(primaryTranslogDir); + protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOException { + return createTranslog(primaryTranslogDir, primaryTermSupplier); } - protected Translog createTranslog(Path translogPath) throws IOException { + protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException { TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); - String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); - return new Translog( - translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED); + String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId, + primaryTermSupplier.getAsLong()); + return new Translog(translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), + () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTermSupplier); } protected InternalEngine createEngine(Store store, Path translogPath) throws IOException { @@ -366,8 +370,8 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact final Directory directory = store.directory(); if (Lucene.indexExists(directory) == false) { store.createEmpty(); - final String translogUuid = - Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + final String translogUuid = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUuid); } @@ -449,7 +453,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) : - globalCheckpointSupplier); + globalCheckpointSupplier, primaryTerm::get); return config; } @@ -475,12 +479,12 @@ protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { } protected Engine.Index indexForDoc(ParsedDocument doc) { - return new Engine.Index(newUid(doc), doc); + return new Engine.Index(newUid(doc), primaryTerm.get(), doc); } protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); }