Skip to content

Commit

Permalink
Add primary term to translog header (#29227)
Browse files Browse the repository at this point in the history
This change adds the current primary term to the header of the current
translog file. Having a term in a translog header is a prerequisite step
that allows us to trim translog operations given the max valid seq# for
that term.

This commit also updates tests to conform the primary term invariant 
which guarantees that all translog operations in a translog file have
its terms at most the term stored in the translog header.
  • Loading branch information
dnhatn authored Apr 12, 2018
1 parent d72d3f9 commit f96e00b
Show file tree
Hide file tree
Showing 31 changed files with 680 additions and 482 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public final class IndexSettings {
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB),
/*
* An empty translog occupies 43 bytes on disk. If the flush threshold is below this, the flush thread
* An empty translog occupies 55 bytes on disk. If the flush threshold is below this, the flush thread
* can get stuck in an infinite loop as the shouldPeriodicallyFlush can still be true after flushing.
* However, small thresholds are useful for testing so we do not add a large lower bound here.
*/
Expand Down Expand Up @@ -220,7 +220,7 @@ public final class IndexSettings {
"index.translog.generation_threshold_size",
new ByteSizeValue(64, ByteSizeUnit.MB),
/*
* An empty translog occupies 43 bytes on disk. If the generation threshold is
* An empty translog occupies 55 bytes on disk. If the generation threshold is
* below this, the flush thread can get stuck in an infinite loop repeatedly
* rolling the generation as every new generation will already exceed the
* generation threshold. However, small thresholds are useful for testing so we
Expand Down
15 changes: 7 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,13 @@ public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long ve
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}

public Index(Term uid, ParsedDocument doc) {
this(uid, doc, Versions.MATCH_ANY);
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
this(uid, primaryTerm, doc, Versions.MATCH_ANY);
} // TEST ONLY

Index(Term uid, ParsedDocument doc, long version) {
// use a primary term of 2 to allow tests to reduce it to a valid >0 term
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 2, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
} // TEST ONLY

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1143,8 +1142,8 @@ public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, lo
this.id = Objects.requireNonNull(id);
}

public Delete(String type, String id, Term uid) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
public Delete(String type, String id, Term uid, long primaryTerm) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}

public Delete(Delete template, VersionType versionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class EngineConfig {
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final LongSupplier primaryTermSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -125,7 +126,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand All @@ -152,6 +153,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
}

/**
Expand Down Expand Up @@ -354,4 +356,11 @@ public Sort getIndexSort() {
public CircuitBreakerService getCircuitBreakerService() {
return this.circuitBreakerService;
}

/**
* Returns a supplier that supplies the latest primary term value of the associated shard.
*/
public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2136,7 +2136,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker);
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
// since we recover from local, just fill the files and size
Expand All @@ -407,8 +408,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
}
indexShard.openEngineAndRecoverFromTranslog();
Expand Down Expand Up @@ -456,7 +457,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
protected final long generation;
protected final FileChannel channel;
protected final Path path;
protected final long firstOperationOffset;
protected final TranslogHeader header;

public BaseTranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset) {
public BaseTranslogReader(long generation, FileChannel channel, Path path, TranslogHeader header) {
assert Translog.parseIdFromFileName(path) == generation : "generation mismatch. Path: " + Translog.parseIdFromFileName(path) + " but generation: " + generation;

this.generation = generation;
this.path = path;
this.channel = channel;
this.firstOperationOffset = firstOperationOffset;
this.header = header;
}

public long getGeneration() {
Expand All @@ -57,7 +57,14 @@ public long getGeneration() {
abstract Checkpoint getCheckpoint();

public final long getFirstOperationOffset() {
return firstOperationOffset;
return header.sizeInBytes();
}

/**
* Returns the primary term associated with this translog reader.
*/
public final long getPrimaryTerm() {
return header.getPrimaryTerm();
}

/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
Expand Down Expand Up @@ -100,7 +107,12 @@ protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusabl
}

protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
return Translog.readOperation(inStream);
final Translog.Operation op = Translog.readOperation(inStream);
if (op.primaryTerm() > getPrimaryTerm() && getPrimaryTerm() != TranslogHeader.UNKNOWN_PRIMARY_TERM) {
throw new TranslogCorruptedException("Operation's term is newer than translog header term; " +
"operation term[" + op.primaryTerm() + "], translog header term [" + getPrimaryTerm() + "]");
}
return op;
}

/**
Expand Down
Loading

0 comments on commit f96e00b

Please sign in to comment.