Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add primary term to translog header #29227

Merged
merged 18 commits into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public final class IndexSettings {
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB),
/*
* An empty translog occupies 43 bytes on disk. If the flush threshold is below this, the flush thread
* An empty translog occupies 55 bytes on disk. If the flush threshold is below this, the flush thread
* can get stuck in an infinite loop as the shouldPeriodicallyFlush can still be true after flushing.
* However, small thresholds are useful for testing so we do not add a large lower bound here.
*/
Expand Down Expand Up @@ -220,7 +220,7 @@ public final class IndexSettings {
"index.translog.generation_threshold_size",
new ByteSizeValue(64, ByteSizeUnit.MB),
/*
* An empty translog occupies 43 bytes on disk. If the generation threshold is
* An empty translog occupies 55 bytes on disk. If the generation threshold is
* below this, the flush thread can get stuck in an infinite loop repeatedly
* rolling the generation as every new generation will already exceed the
* generation threshold. However, small thresholds are useful for testing so we
Expand Down
15 changes: 7 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1066,14 +1066,13 @@ public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long ve
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
}

public Index(Term uid, ParsedDocument doc) {
this(uid, doc, Versions.MATCH_ANY);
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
this(uid, primaryTerm, doc, Versions.MATCH_ANY);
} // TEST ONLY

Index(Term uid, ParsedDocument doc, long version) {
// use a primary term of 2 to allow tests to reduce it to a valid >0 term
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 2, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
} // TEST ONLY

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1147,8 +1146,8 @@ public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, lo
this.id = Objects.requireNonNull(id);
}

public Delete(String type, String id, Term uid) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
public Delete(String type, String id, Term uid, long primaryTerm) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}

public Delete(Delete template, VersionType versionType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class EngineConfig {
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final LongSupplier primaryTermSupplier;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -125,7 +126,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier) {
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand All @@ -152,6 +153,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
}

/**
Expand Down Expand Up @@ -354,4 +356,11 @@ public Sort getIndexSort() {
public CircuitBreakerService getCircuitBreakerService() {
// May return null: the backing field is declared @Nullable in this class.
return this.circuitBreakerService;
}

/**
 * Exposes the primary term supplier that was handed to this config at construction time.
 * Callers query the supplier to obtain the latest primary term of the associated shard.
 *
 * @return the supplier of the associated shard's latest primary term
 */
public LongSupplier getPrimaryTermSupplier() {
    return this.primaryTermSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier);
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2150,7 +2150,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker);
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
// since we recover from local, just fill the files and size
Expand All @@ -407,8 +408,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
}
indexShard.openEngineAndRecoverFromTranslog();
Expand Down Expand Up @@ -456,7 +457,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public abstract class BaseTranslogReader implements Comparable<BaseTranslogReade
protected final long generation;
protected final FileChannel channel;
protected final Path path;
protected final long firstOperationOffset;
protected final TranslogHeader header;

public BaseTranslogReader(long generation, FileChannel channel, Path path, long firstOperationOffset) {
public BaseTranslogReader(long generation, FileChannel channel, Path path, TranslogHeader header) {
assert Translog.parseIdFromFileName(path) == generation : "generation mismatch. Path: " + Translog.parseIdFromFileName(path) + " but generation: " + generation;

this.generation = generation;
this.path = path;
this.channel = channel;
this.firstOperationOffset = firstOperationOffset;
this.header = header;
}

public long getGeneration() {
Expand All @@ -57,7 +57,14 @@ public long getGeneration() {
abstract Checkpoint getCheckpoint();

public final long getFirstOperationOffset() {
return firstOperationOffset;
return header.sizeInBytes();
}

/**
 * Reports the primary term carried by the translog header this reader was created with.
 *
 * @return the primary term stored in this reader's {@code TranslogHeader}
 */
public final long getPrimaryTerm() {
    return this.header.getPrimaryTerm();
}

/** read the size of the op (i.e., number of bytes, including the op size) written at the given position */
Expand Down Expand Up @@ -100,7 +107,12 @@ protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusabl
}

protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws IOException {
return Translog.readOperation(inStream);
final Translog.Operation op = Translog.readOperation(inStream);
if (op.primaryTerm() > getPrimaryTerm() && getPrimaryTerm() != TranslogHeader.UNKNOWN_PRIMARY_TERM) {
throw new TranslogCorruptedException("Operation's term is newer than translog header term; " +
"operation term[" + op.primaryTerm() + "], translog header term [" + getPrimaryTerm() + "]");
}
return op;
}

/**
Expand Down
Loading