Skip to content

Commit

Permalink
bootstrap primary term for an empty translog
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Mar 28, 2018
1 parent d50d7ed commit ad5a212
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
// since we recover from local, just fill the files and size
Expand All @@ -407,8 +408,8 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
}
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
}
indexShard.openEngineAndRecoverFromTranslog();
Expand Down Expand Up @@ -456,7 +457,8 @@ private void restore(final IndexShard indexShard, final Repository repository, f
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1716,13 +1716,14 @@ List<TranslogReader> getReaders() {
return readers;
}

public static String createEmptyTranslog(final Path location, final long initialGlobalCheckpoint, final ShardId shardId)
throws IOException {
public static String createEmptyTranslog(final Path location, final long initialGlobalCheckpoint,
final ShardId shardId, final long primaryTerm) throws IOException {
final ChannelFactory channelFactory = FileChannel::open;
return createEmptyTranslog(location, initialGlobalCheckpoint, shardId, channelFactory);
return createEmptyTranslog(location, initialGlobalCheckpoint, shardId, channelFactory, primaryTerm);
}

static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, ChannelFactory channelFactory) throws IOException {
static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId,
ChannelFactory channelFactory, long primaryTerm) throws IOException {
IOUtils.rm(location);
Files.createDirectories(location);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1);
Expand All @@ -1732,7 +1733,7 @@ static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, S
final String translogUUID = UUIDs.randomBase64UUID();
TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory,
new ByteSizeValue(10), 1, initialGlobalCheckpoint,
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, 0L
() -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm
);
writer.close();
return translogUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
store.ensureIndexHasHistoryUUID();
}
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
final String translogUUID =
Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, indexShard.getPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {
}
if (randomBoolean()) {
final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
}
engine = new InternalEngine(config);
Expand Down Expand Up @@ -2369,7 +2369,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
{
store.createEmpty();
final String translogUUID =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Expand Down Expand Up @@ -2409,7 +2409,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
// open index with new tlog
{
final String translogUUID =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
Expand Down Expand Up @@ -2454,7 +2454,7 @@ public void testMissingTranslog() throws IOException {
// expected
}
// when a new translog is created it should be ok
final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm);
store.associateIndexWithNewTranslog(translogUUID);
EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null);
engine = new InternalEngine(config);
Expand Down Expand Up @@ -2519,7 +2519,7 @@ public void testTranslogCleanUpPostCommitCrash() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get();
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId);
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
Expand Down Expand Up @@ -2681,7 +2681,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
engine.close();

final Path badTranslogLog = createTempDir();
final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
Translog translog = new Translog(
new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
Expand Down Expand Up @@ -3311,7 +3311,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException {
}
try (Store store = createStore(newFSDirectory(storeDir))) {
if (randomBoolean() || true) {
final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
}
try (Engine engine = new InternalEngine(configSupplier.apply(store))) {
Expand Down Expand Up @@ -4114,7 +4114,7 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception {
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId);
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);

final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
}
};
store.createEmpty();
final long primaryTerm = randomNonNegativeLong();
final String translogUUID =
Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm);
store.associateIndexWithNewTranslog(translogUUID);
final long primaryTerm = randomNonNegativeLong();
EngineConfig config = new EngineConfig(shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -155,7 +154,7 @@ protected void afterIfSuccessful() throws Exception {

protected Translog createTranslog(TranslogConfig config) throws IOException {
String translogUUID =
Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
return new Translog(config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()),
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
}
Expand Down Expand Up @@ -212,7 +211,7 @@ private Translog create(Path path) throws IOException {
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
return new Translog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get);
}

Expand Down Expand Up @@ -2032,7 +2031,7 @@ private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig
};
if (translogUUID == null) {
translogUUID = Translog.createEmptyTranslog(
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory);
config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, channelFactory, primaryTerm.get());
}
return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get) {
@Override
Expand Down Expand Up @@ -2346,7 +2345,7 @@ public void testWithRandomException() throws IOException {
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
if (generationUUID == null) {
// we never managed to successfully create a translog, make it
generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
}
try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
final String historyUUIDtoUse = UUIDs.randomBase64UUID(random());
if (randomBoolean()) {
// create a new translog
translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, replica.shardId());
translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs,
replica.shardId(), replica.getPrimaryTerm());
translogGenToUse = 1;
} else {
translogUUIDtoUse = translogGeneration.translogUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc

protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
String translogUUID = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId,
primaryTermSupplier.getAsLong());
return new Translog(translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS),
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTermSupplier);
}
Expand Down Expand Up @@ -369,8 +370,8 @@ private InternalEngine createEngine(@Nullable IndexWriterFactory indexWriterFact
final Directory directory = store.directory();
if (Lucene.indexExists(directory) == false) {
store.createEmpty();
final String translogUuid =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
final String translogUuid = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUuid);

}
Expand Down

0 comments on commit ad5a212

Please sign in to comment.