diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java
index 66fc680beb62c..5868698416c30 100644
--- a/server/src/main/java/org/opensearch/index/engine/Engine.java
+++ b/server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -81,7 +81,9 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
-import org.opensearch.index.translog.TranslogStats;
+import org.opensearch.index.translog.TranslogManager;
+import org.opensearch.index.translog.TranslogDeletionPolicy;
+import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.search.suggest.completion.CompletionStats;
import java.io.Closeable;
@@ -107,7 +109,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.stream.Stream;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -167,6 +168,8 @@ public final EngineConfig config() {
return engineConfig;
}
+ /**
+  * Returns the {@link TranslogManager} through which this engine's translog is operated
+  * (for example syncing the translog or trimming unreferenced translog files).
+  */
+ public abstract TranslogManager translogManager();
+
protected abstract SegmentInfos getLastCommittedSegmentInfos();
/**
@@ -346,12 +349,6 @@ boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and te
*/
public abstract boolean isThrottled();
- /**
- * Trims translog for terms below belowTerm and seq# above aboveSeqNo
- * @see Translog#trimOperations(long, long)
- */
- public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;
-
/**
* A Lock implementation that always allows the lock to be acquired
*
@@ -784,18 +781,6 @@ public enum SearcherScope {
INTERNAL
}
- /**
- * Checks if the underlying storage sync is required.
- */
- public abstract boolean isTranslogSyncNeeded();
-
- /**
- * Ensures that all locations in the given stream have been written to the underlying storage.
- */
- public abstract boolean ensureTranslogSynced(Stream locations) throws IOException;
-
- public abstract void syncTranslog() throws IOException;
-
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
@@ -831,13 +816,6 @@ public abstract Translog.Snapshot newChangesSnapshot(
*/
public abstract long getMinRetainedSeqNo();
- public abstract TranslogStats getTranslogStats();
-
- /**
- * Returns the last location that the translog of this engine has written into.
- */
- public abstract Translog.Location getTranslogLastWriteLocation();
-
protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
@@ -905,6 +883,22 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
return stats;
}
+ /**
+  * Chooses the translog deletion policy for the supplied engine configuration.
+  * <p>
+  * If the configuration carries a custom deletion policy factory and that factory produces a non-null policy,
+  * that policy is returned. In every other case a {@link DefaultTranslogDeletionPolicy} is built from the
+  * translog retention size, retention age and retention total-files values of the index settings.
+  *
+  * @param engineConfig configuration supplying the optional factory, the index settings and the retention leases supplier
+  * @return the factory-created policy when available, otherwise a settings-driven default; never {@code null}
+  */
+ protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
+     if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
+         final TranslogDeletionPolicy fromFactory = engineConfig.getCustomTranslogDeletionPolicyFactory()
+             .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
+         if (fromFactory != null) {
+             return fromFactory;
+         }
+     }
+     // No factory configured, or the factory returned null: fall back to the default policy.
+     return new DefaultTranslogDeletionPolicy(
+         engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
+         engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
+         engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
+     );
+ }
+
protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) {
stats.add(1);
if (includeSegmentFileSizes) {
@@ -1152,25 +1146,6 @@ public final void flush() throws EngineException {
flush(false, false);
}
- /**
- * checks and removes translog files that no longer need to be retained. See
- * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
- */
- public abstract void trimUnreferencedTranslogFiles() throws EngineException;
-
- /**
- * Tests whether or not the translog generation should be rolled to a new generation.
- * This test is based on the size of the current generation compared to the configured generation threshold size.
- *
- * @return {@code true} if the current generation should be rolled to a new generation
- */
- public abstract boolean shouldRollTranslogGeneration();
-
- /**
- * Rolls the translog generation and cleans unneeded.
- */
- public abstract void rollTranslogGeneration() throws EngineException;
-
/**
* Triggers a forced merge on this engine
*/
@@ -1982,14 +1957,6 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();
- /**
- * This method replays translog to restore the Lucene index which might be reverted previously.
- * This ensures that all acknowledged writes are restored correctly when this engine is promoted.
- *
- * @return the number of translog operations have been recovered
- */
- public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;
-
/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
@@ -1998,20 +1965,6 @@ public interface Warmer {
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
- /**
- * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
- * This operation will close the engine if the recovery fails.
- *
- * @param translogRecoveryRunner the translog recovery runner
- * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
- */
- public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;
-
- /**
- * Do not replay translog operations, but make the engine be ready.
- */
- public abstract void skipTranslogRecovery();
-
/**
* Tries to prune buffered deletes from the version map.
*/
@@ -2032,16 +1985,6 @@ public long getMaxSeenAutoIdTimestamp() {
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);
- /**
- * The runner for translog recovery
- *
- * @opensearch.internal
- */
- @FunctionalInterface
- public interface TranslogRecoveryRunner {
- int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
- }
-
/**
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index d2d688a90353e..d5c7a2f7cd0dc 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -33,7 +33,6 @@
package org.opensearch.index.engine;
import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
@@ -105,12 +104,15 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
-import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogDeletionPolicy;
-import org.opensearch.index.translog.TranslogStats;
+import org.opensearch.index.translog.TranslogManager;
+import org.opensearch.index.translog.TranslogException;
+import org.opensearch.index.translog.InternalTranslogManager;
+import org.opensearch.index.translog.listener.TranslogEventListener;
+import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;
@@ -149,7 +151,7 @@ public class InternalEngine extends Engine {
*/
private volatile long lastDeleteVersionPruneTimeMSec;
- private final Translog translog;
+ private final TranslogManager translogManager;
private final OpenSearchConcurrentMergeScheduler mergeScheduler;
private final IndexWriter indexWriter;
@@ -176,7 +178,6 @@ public class InternalEngine extends Engine {
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
- private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
@@ -221,36 +222,36 @@ public class InternalEngine extends Engine {
private volatile String forceMergeUUID;
public InternalEngine(EngineConfig engineConfig) {
- this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
+ this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER);
}
- InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction localCheckpointTrackerSupplier) {
+ /**
+  * Creates an engine with {@link IndexWriter#MAX_DOCS} as the document limit and {@code LocalCheckpointTracker::new}
+  * as the checkpoint tracker supplier, forwarding the given listener to the main constructor.
+  *
+  * @param engineConfig the engine configuration
+  * @param translogEventListener listener handed to the main constructor, which combines it with the engine's own
+  *                              translog event listener
+  */
+ public InternalEngine(EngineConfig engineConfig, TranslogEventListener translogEventListener) {
+ this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, translogEventListener);
+ }
+
+ /**
+  * Returns the translog manager that was created in this engine's constructor.
+  */
+ @Override
+ public TranslogManager translogManager() {
+ return translogManager;
+ }
+
+ InternalEngine(
+ EngineConfig engineConfig,
+ int maxDocs,
+ BiFunction localCheckpointTrackerSupplier,
+ TranslogEventListener translogEventListener
+ ) {
super(engineConfig);
this.maxDocs = maxDocs;
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
- final TranslogDeletionPolicy translogDeletionPolicy;
- TranslogDeletionPolicy customTranslogDeletionPolicy = null;
- if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
- customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
- .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
- }
- if (customTranslogDeletionPolicy != null) {
- translogDeletionPolicy = customTranslogDeletionPolicy;
- } else {
- translogDeletionPolicy = new DefaultTranslogDeletionPolicy(
- engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
- engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
- engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
- );
- }
+ final TranslogDeletionPolicy translogDeletionPolicy = getTranslogDeletionPolicy(engineConfig);
store.incRef();
IndexWriter writer = null;
- Translog translog = null;
ExternalReaderManager externalReaderManager = null;
OpenSearchReaderManager internalReaderManager = null;
EngineMergeScheduler scheduler = null;
+ TranslogManager translogManagerRef = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
@@ -258,21 +259,48 @@ public InternalEngine(EngineConfig engineConfig) {
throttle = new IndexThrottle();
try {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
- translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
- final LocalCheckpointTracker tracker = getLocalCheckpointTracker();
- assert tracker != null || getTranslog().isOpen() == false;
- if (tracker != null) {
- tracker.markSeqNoAsPersisted(seqNo);
+ final Map userData = store.readLastCommittedSegmentsInfo().getUserData();
+ final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
+ // Engine-side callbacks for translog events; handed to the translog manager below together with the
+ // caller-supplied listener.
+ TranslogEventListener internalTranslogEventListener = new TranslogEventListener() {
+ @Override
+ public void onAfterTranslogSync() {
+ // deletes unused index files if commits became unreferenced and trims unreferenced translog readers
+ revisitIndexDeletionPolicyOnTranslogSynced();
}
- });
- assert translog.getGeneration() != null;
- this.translog = translog;
+
+ @Override
+ public void onAfterTranslogRecovery() {
+ // flush with force=false, waitIfOngoing=true, then drop translog files that are no longer referenced
+ flush(false, true);
+ // NOTE(review): translogManager is the engine field, which is assigned only after this listener has
+ // been constructed; this callback must therefore not run before that assignment - confirm.
+ translogManager.trimUnreferencedTranslogFiles();
+ }
+
+ @Override
+ public void onFailure(String reason, Exception ex) {
+ // an AlreadyClosedException is routed through the tragic-event check; anything else fails the engine
+ // directly with the supplied reason
+ if (ex instanceof AlreadyClosedException) {
+ failOnTragicEvent((AlreadyClosedException) ex);
+ } else {
+ failEngine(reason, ex);
+ }
+ }
+ };
+ translogManagerRef = new InternalTranslogManager(
+ engineConfig.getTranslogConfig(),
+ engineConfig.getPrimaryTermSupplier(),
+ engineConfig.getGlobalCheckpointSupplier(),
+ translogDeletionPolicy,
+ shardId,
+ readLock,
+ () -> getLocalCheckpointTracker(),
+ translogUUID,
+ new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
+ this::ensureOpen
+ );
+ this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy = new CombinedDeletionPolicy(
logger,
translogDeletionPolicy,
softDeletesPolicy,
- translog::getLastSyncedGlobalCheckpoint
+ translogManager.getTranslog()::getLastSyncedGlobalCheckpoint
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
@@ -297,9 +325,6 @@ public InternalEngine(EngineConfig engineConfig) {
this.internalReaderManager = internalReaderManager;
this.externalReaderManager = externalReaderManager;
internalReaderManager.addListener(versionMap);
- assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
- // don't allow commits until we are done with recovering
- pendingTranslogRecovery.set(true);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.externalReaderManager.addListener(listener);
}
@@ -308,7 +333,9 @@ public InternalEngine(EngineConfig engineConfig) {
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
- maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
+ maxSeqNoOfUpdatesOrDeletes = new AtomicLong(
+ SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo())
+ );
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
@@ -325,6 +352,10 @@ public InternalEngine(EngineConfig engineConfig) {
success = true;
} finally {
if (success == false) {
+ Translog translog = null;
+ if (translogManagerRef != null) {
+ translog = translogManagerRef.getTranslog();
+ }
IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
@@ -358,7 +389,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(
- translog::getLastSyncedGlobalCheckpoint,
+ translogManager.getTranslog()::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier()
@@ -457,17 +488,6 @@ final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
return true;
}
- @Override
- public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
- try (ReleasableLock ignored = readLock.acquire()) {
- ensureOpen();
- final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
- try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
- return translogRecoveryRunner.run(this, snapshot);
- }
- }
- }
-
@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
@@ -486,7 +506,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
+ "]";
}
- syncTranslog(); // to persist noops associated with the advancement of the local checkpoint
+ translogManager.syncTranslog(); // to persist noops associated with the advancement of the local checkpoint
assert localCheckpointTracker.getPersistedCheckpoint() == maxSeqNo
: "persisted local checkpoint did not advance to max seq no; is ["
+ localCheckpointTracker.getPersistedCheckpoint()
@@ -508,61 +528,6 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
}
}
- @Override
- public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- if (pendingTranslogRecovery.get() == false) {
- throw new IllegalStateException("Engine has already been recovered");
- }
- try {
- recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
- } catch (Exception e) {
- try {
- pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
- failEngine("failed to recover from translog", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw e;
- }
- }
- return this;
- }
-
- @Override
- public void skipTranslogRecovery() {
- assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
- pendingTranslogRecovery.set(false); // we are good - now we can commit
- }
-
- private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
- final int opsRecovered;
- final long localCheckpoint = getProcessedLocalCheckpoint();
- if (localCheckpoint < recoverUpToSeqNo) {
- try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
- opsRecovered = translogRecoveryRunner.run(this, snapshot);
- } catch (Exception e) {
- throw new EngineException(shardId, "failed to recover from translog", e);
- }
- } else {
- opsRecovered = 0;
- }
- // flush if we recovered something or if we have references to older translogs
- // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
- assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
- pendingTranslogRecovery.set(false); // we are good - now we can commit
- logger.trace(
- () -> new ParameterizedMessage(
- "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
- opsRecovered,
- translog.currentFileGeneration()
- )
- );
- flush(false, true);
- translog.trimUnreferencedReaders();
- }
-
private Translog openTranslog(
EngineConfig engineConfig,
TranslogDeletionPolicy translogDeletionPolicy,
@@ -584,52 +549,20 @@ private Translog openTranslog(
);
}
- // Package private for testing purposes only
- Translog getTranslog() {
- ensureOpen();
- return translog;
- }
-
// Package private for testing purposes only
boolean hasSnapshottedCommits() {
return combinedDeletionPolicy.hasSnapshottedCommits();
}
- @Override
- public boolean isTranslogSyncNeeded() {
- return getTranslog().syncNeeded();
- }
-
- @Override
- public boolean ensureTranslogSynced(Stream locations) throws IOException {
- final boolean synced = translog.ensureSynced(locations);
- if (synced) {
- revisitIndexDeletionPolicyOnTranslogSynced();
- }
- return synced;
- }
-
- @Override
- public void syncTranslog() throws IOException {
- translog.sync();
- revisitIndexDeletionPolicyOnTranslogSynced();
- }
-
- @Override
- public TranslogStats getTranslogStats() {
- return getTranslog().stats();
- }
-
- @Override
- public Translog.Location getTranslogLastWriteLocation() {
- return getTranslog().getLastWriteLocation();
- }
-
- private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
- if (combinedDeletionPolicy.hasUnreferencedCommits()) {
- indexWriter.deleteUnusedFiles();
+ /**
+  * Housekeeping performed once the translog has been synced: if the combined deletion policy reports
+  * unreferenced commits, the index writer deletes its unused files; afterwards unreferenced translog readers
+  * are trimmed.
+  *
+  * @throws TranslogException wrapping any {@link IOException} raised by either step
+  */
+ private void revisitIndexDeletionPolicyOnTranslogSynced() {
+ try {
+ if (combinedDeletionPolicy.hasUnreferencedCommits()) {
+ indexWriter.deleteUnusedFiles();
+ }
+ translogManager.getTranslog().trimUnreferencedReaders();
+ } catch (IOException ex) {
+ // this method declares no checked exceptions, so the I/O failure is rethrown wrapped
+ throw new TranslogException(shardId, "Failed to execute index deletion policy on translog synced", ex);
}
- translog.trimUnreferencedReaders();
}
@Override
@@ -718,7 +651,7 @@ public GetResult get(Get get, BiFunction
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
- Translog.Operation operation = translog.readOperation(versionValue.getLocation());
+ Translog.Operation operation = translogManager.getTranslog().readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
final Translog.Index index = (Translog.Index) operation;
@@ -1023,7 +956,7 @@ public IndexResult index(Index index) throws IOException {
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
- location = translog.add(new Translog.Index(index, indexResult));
+ location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
final NoOp noOp = new NoOp(
@@ -1463,7 +1396,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
}
if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) {
- final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
+ final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
}
localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo());
@@ -1790,7 +1723,8 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
- final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
+ final Translog.Location location = translogManager.getTranslog()
+ .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
}
@@ -1891,11 +1825,10 @@ public boolean shouldPeriodicallyFlush() {
final long localCheckpointOfLastCommit = Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
);
- final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
- localCheckpointOfLastCommit + 1
- ).translogFileGeneration;
+ final long translogGenerationOfLastCommit = translogManager.getTranslog()
+ .getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
- if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
+ if (translogManager.getTranslog().sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
}
/*
@@ -1913,9 +1846,8 @@ public boolean shouldPeriodicallyFlush() {
*
* This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered.
*/
- final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
- localCheckpointTracker.getProcessedCheckpoint() + 1
- ).translogFileGeneration;
+ final long translogGenerationOfNewCommit = translogManager.getTranslog()
+ .getMinGenerationForSeqNo(localCheckpointTracker.getProcessedCheckpoint() + 1).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
|| localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo();
}
@@ -1954,11 +1886,11 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
|| getProcessedLocalCheckpoint() > Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
)) {
- ensureCanFlush();
+ translogManager.ensureCanFlush();
try {
- translog.rollGeneration();
+ translogManager.getTranslog().rollGeneration();
logger.trace("starting commit for flush; commitTranslog=true");
- commitIndexWriter(indexWriter, translog);
+ commitIndexWriter(indexWriter, translogManager.getTranslog());
logger.trace("finished commit for flush");
// a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved
@@ -1971,7 +1903,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL, true);
- translog.trimUnreferencedReaders();
+ translogManager.getTranslog().trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
@@ -2022,66 +1954,6 @@ private void refreshLastCommittedSegmentInfos() {
}
}
- @Override
- public void rollTranslogGeneration() throws EngineException {
- try (ReleasableLock ignored = readLock.acquire()) {
- ensureOpen();
- translog.rollGeneration();
- translog.trimUnreferencedReaders();
- } catch (AlreadyClosedException e) {
- failOnTragicEvent(e);
- throw e;
- } catch (Exception e) {
- try {
- failEngine("translog trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to roll translog", e);
- }
- }
-
- @Override
- public void trimUnreferencedTranslogFiles() throws EngineException {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- translog.trimUnreferencedReaders();
- } catch (AlreadyClosedException e) {
- failOnTragicEvent(e);
- throw e;
- } catch (Exception e) {
- try {
- failEngine("translog trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to trim translog", e);
- }
- }
-
- @Override
- public boolean shouldRollTranslogGeneration() {
- return getTranslog().shouldRollGeneration();
- }
-
- @Override
- public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- translog.trimOperations(belowTerm, aboveSeqNo);
- } catch (AlreadyClosedException e) {
- failOnTragicEvent(e);
- throw e;
- } catch (Exception e) {
- try {
- failEngine("translog operations trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to trim translog operations", e);
- }
- }
-
private void pruneDeletedTombstones() {
/*
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
@@ -2248,8 +2120,8 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) {
}
failEngine("already closed by tragic event on the index writer", tragicException);
engineFailed = true;
- } else if (translog.isOpen() == false && translog.getTragicException() != null) {
- failEngine("already closed by tragic event on the translog", translog.getTragicException());
+ } else if (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() != null) {
+ failEngine("already closed by tragic event on the translog", translogManager.getTranslog().getTragicException());
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
@@ -2274,7 +2146,7 @@ protected boolean maybeFailEngine(String source, Exception e) {
return failOnTragicEvent((AlreadyClosedException) e);
} else if (e != null
&& ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
- || (translog.isOpen() == false && translog.getTragicException() == e))) {
+ || (translogManager.getTranslog().isOpen() == false && translogManager.getTranslog().getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
@@ -2376,7 +2248,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
logger.warn("Failed to close ReaderManager", e);
}
try {
- IOUtils.close(translog);
+ IOUtils.close(translogManager.getTranslog());
} catch (Exception e) {
logger.warn("Failed to close translog", e);
}
@@ -2645,7 +2517,7 @@ protected void doRun() throws Exception {
* @param translog the translog
*/
protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
- ensureCanFlush();
+ translogManager.ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
writer.setLiveCommitData(() -> {
@@ -2700,16 +2572,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
}
}
- final void ensureCanFlush() {
- // translog recovery happens after the engine is fully constructed.
- // If we are in this stage we have to prevent flushes from this
- // engine otherwise we might loose documents if the flush succeeds
- // and the translog recovery fails when we "commit" the translog on flush.
- if (pendingTranslogRecovery.get()) {
- throw new IllegalStateException(shardId.toString() + " flushes are disabled - pending translog recovery");
- }
- }
-
@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
mergeScheduler.refreshConfig();
@@ -2721,7 +2583,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
// the setting will be re-interpreted if it's set to true
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
- final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
+ final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
@@ -2737,7 +2599,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() {
@Override
public long getLastSyncedGlobalCheckpoint() {
- return getTranslog().getLastSyncedGlobalCheckpoint();
+ return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}
@Override
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index e4f4bbbba8f16..b16e550baeacb 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -18,16 +18,17 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
-import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogManager;
+import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
-import org.opensearch.index.translog.TranslogStats;
+import org.opensearch.index.translog.TranslogException;
+import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
import java.io.Closeable;
@@ -40,7 +41,6 @@
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
-import java.util.stream.Stream;
/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
@@ -55,12 +55,13 @@ public class NRTReplicationEngine extends Engine {
private final NRTReplicationReaderManager readerManager;
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
- private final Translog translog;
+ private final TranslogManager translogManager;
public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
store.incRef();
NRTReplicationReaderManager readerManager = null;
+ TranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
@@ -71,18 +72,50 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
- this.translog = openTranslog(
- engineConfig,
- getTranslogDeletionPolicy(engineConfig),
+ final Map userData = store.readLastCommittedSegmentsInfo().getUserData();
+ final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
+ translogManagerRef = new WriteOnlyTranslogManager(
+ engineConfig.getTranslogConfig(),
+ engineConfig.getPrimaryTermSupplier(),
engineConfig.getGlobalCheckpointSupplier(),
- localCheckpointTracker::markSeqNoAsPersisted
+ getTranslogDeletionPolicy(engineConfig),
+ shardId,
+ readLock,
+ this::getLocalCheckpointTracker,
+ translogUUID,
+ new TranslogEventListener() {
+ @Override
+ public void onFailure(String reason, Exception ex) {
+ failEngine(reason, ex);
+ }
+
+ @Override
+ public void onAfterTranslogSync() {
+ try {
+ translogManager.getTranslog().trimUnreferencedReaders();
+ } catch (IOException ex) {
+ throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
+ }
+ }
+ },
+ this
);
+ this.translogManager = translogManagerRef;
} catch (IOException e) {
- IOUtils.closeWhileHandlingException(store::decRef, readerManager);
+ Translog translog = null;
+ if (translogManagerRef != null) {
+ translog = translogManagerRef.getTranslog();
+ }
+ IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}
+ @Override
+ public TranslogManager translogManager() {
+ return translogManager;
+ }
+
public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
readerManager.updateSegments(infos);
@@ -91,7 +124,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
- rollTranslogGeneration();
+ translogManager.rollTranslogGeneration();
}
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
@@ -121,26 +154,11 @@ public boolean isThrottled() {
return false;
}
- @Override
- public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- translog.trimOperations(belowTerm, aboveSeqNo);
- } catch (Exception e) {
- try {
- failEngine("translog operations trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to trim translog operations", e);
- }
- }
-
@Override
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
- final Translog.Location location = translog.add(new Translog.Index(index, indexResult));
+ final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
@@ -152,7 +170,7 @@ public IndexResult index(Index index) throws IOException {
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
- final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
+ final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
@@ -164,7 +182,8 @@ public DeleteResult delete(Delete delete) throws IOException {
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
- final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
+ final Translog.Location location = translogManager.getTranslog()
+ .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
@@ -182,26 +201,6 @@ protected ReferenceManager getReferenceManager(Search
return readerManager;
}
- @Override
- public boolean isTranslogSyncNeeded() {
- return translog.syncNeeded();
- }
-
- @Override
- public boolean ensureTranslogSynced(Stream locations) throws IOException {
- boolean synced = translog.ensureSynced(locations);
- if (synced) {
- translog.trimUnreferencedReaders();
- }
- return synced;
- }
-
- @Override
- public void syncTranslog() throws IOException {
- translog.sync();
- translog.trimUnreferencedReaders();
- }
-
@Override
public Closeable acquireHistoryRetentionLock() {
throw new UnsupportedOperationException("Not implemented");
@@ -233,16 +232,6 @@ public long getMinRetainedSeqNo() {
return localCheckpointTracker.getProcessedCheckpoint();
}
- @Override
- public TranslogStats getTranslogStats() {
- return translog.stats();
- }
-
- @Override
- public Translog.Location getTranslogLastWriteLocation() {
- return translog.getLastWriteLocation();
- }
-
@Override
public long getPersistedLocalCheckpoint() {
return localCheckpointTracker.getPersistedCheckpoint();
@@ -260,7 +249,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {
@Override
public long getLastSyncedGlobalCheckpoint() {
- return translog.getLastSyncedGlobalCheckpoint();
+ return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}
@Override
@@ -292,42 +281,6 @@ public boolean shouldPeriodicallyFlush() {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
- @Override
- public void trimUnreferencedTranslogFiles() throws EngineException {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- translog.trimUnreferencedReaders();
- } catch (Exception e) {
- try {
- failEngine("translog trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to trim translog", e);
- }
- }
-
- @Override
- public boolean shouldRollTranslogGeneration() {
- return translog.shouldRollGeneration();
- }
-
- @Override
- public void rollTranslogGeneration() throws EngineException {
- try (ReleasableLock ignored = readLock.acquire()) {
- ensureOpen();
- translog.rollGeneration();
- translog.trimUnreferencedReaders();
- } catch (Exception e) {
- try {
- failEngine("translog trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to roll translog", e);
- }
- }
-
@Override
public void forceMerge(
boolean flush,
@@ -364,7 +317,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
- IOUtils.close(readerManager, translog, store::decRef);
+ IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
@@ -380,26 +333,11 @@ public void activateThrottling() {}
@Override
public void deactivateThrottling() {}
- @Override
- public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
- return 0;
- }
-
@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
return 0;
}
- @Override
- public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
- throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
- }
-
- @Override
- public void skipTranslogRecovery() {
- // Do nothing.
- }
-
@Override
public void maybePruneDeletes() {}
@@ -414,13 +352,9 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}
- public Translog getTranslog() {
- return translog;
- }
-
@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
- final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
+ final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
}
@@ -463,21 +397,4 @@ private Translog openTranslog(
persistedSequenceNumberConsumer
);
}
-
- private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
- TranslogDeletionPolicy customTranslogDeletionPolicy = null;
- if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
- customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
- .create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
- }
- return Objects.requireNonNullElseGet(
- customTranslogDeletionPolicy,
- () -> new DefaultTranslogDeletionPolicy(
- engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
- engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
- engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
- )
- );
- }
-
}
diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
index 3a16d884fd899..4bc37084675ea 100644
--- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
@@ -45,7 +45,10 @@
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogConfig;
+import org.opensearch.index.translog.TranslogException;
+import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
@@ -60,7 +63,7 @@
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine
* allows to trim any existing translog files through the usage of the
- * {{@link #trimUnreferencedTranslogFiles()}} method.
+ * {@link TranslogManager#trimUnreferencedTranslogFiles()} method.
*
* @opensearch.internal
*/
@@ -153,52 +156,81 @@ public DocsStats docStats() {
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
*/
@Override
- public void trimUnreferencedTranslogFiles() {
- final Store store = this.engineConfig.getStore();
- store.incRef();
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- final List commits = DirectoryReader.listCommits(store.directory());
- if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
- final Map commitUserData = getLastCommittedSegmentInfos().getUserData();
- final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
- if (translogUuid == null) {
- throw new IllegalStateException("commit doesn't contain translog unique id");
+ public TranslogManager translogManager() {
+ try {
+ // A new manager is built on every call; it receives the translogStats value current at that moment.
+ return new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() {
+ @Override
+ public void close() {}
+
+ @Override
+ public int totalOperations() {
+ return 0;
}
- final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
- final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
- final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
- translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
- try (
- Translog translog = new Translog(
- translogConfig,
- translogUuid,
- translogDeletionPolicy,
- engineConfig.getGlobalCheckpointSupplier(),
- engineConfig.getPrimaryTermSupplier(),
- seqNo -> {}
- )
- ) {
- translog.trimUnreferencedReaders();
- // refresh the translog stats
- this.translogStats = translog.stats();
- assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
- + " current gen "
- + translog.currentFileGeneration()
- + " != min gen "
- + translog.getMinFileGeneration();
+
+ @Override
+ public Translog.Operation next() {
+ return null;
}
- }
- } catch (final Exception e) {
- try {
- failEngine("translog trimming failed", e);
- } catch (Exception inner) {
- e.addSuppressed(inner);
- }
- throw new EngineException(shardId, "failed to trim translog", e);
- } finally {
- store.decRef();
+
+ }) {
+ /**
+ * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
+ * that retains nothing but the last translog generation from safe commit.
+ */
+ @Override
+ public void trimUnreferencedTranslogFiles() throws TranslogException {
+ final Store store = engineConfig.getStore();
+ store.incRef();
+ try (ReleasableLock ignored = readLock.acquire()) {
+ ensureOpen();
+ final List commits = DirectoryReader.listCommits(store.directory());
+ // Trim only when a single commit exists and the translog is larger than its uncommitted part.
+ if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
+ final Map commitUserData = getLastCommittedSegmentInfos().getUserData();
+ final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
+ if (translogUuid == null) {
+ throw new IllegalStateException("commit doesn't contain translog unique id");
+ }
+ final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
+ final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
+ final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
+ translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
+ try (
+ Translog translog = new Translog(
+ translogConfig,
+ translogUuid,
+ translogDeletionPolicy,
+ engineConfig.getGlobalCheckpointSupplier(),
+ engineConfig.getPrimaryTermSupplier(),
+ seqNo -> {}
+ )
+ ) {
+ translog.trimUnreferencedReaders();
+ // refresh the translog stats
+ translogStats = translog.stats();
+ assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ + " current gen "
+ + translog.currentFileGeneration()
+ + " != min gen "
+ + translog.getMinFileGeneration();
+ }
+ }
+ } catch (final Exception e) {
+ try {
+ failEngine("translog trimming failed", e);
+ } catch (Exception inner) {
+ e.addSuppressed(inner);
+ }
+ throw new EngineException(shardId, "failed to trim translog", e);
+ } finally {
+ store.decRef();
+ }
+ }
+ };
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
}
}
}
diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
index 6262a9269c01c..cebe262fee5d1 100644
--- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java
@@ -44,13 +44,14 @@
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
-import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogManager;
+import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
@@ -65,7 +66,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
-import java.util.stream.Stream;
/**
* A basic read-only engine that allows switching a shard to be true read-only temporarily or permanently.
@@ -90,6 +90,7 @@ public class ReadOnlyEngine extends Engine {
private final SafeCommitInfo safeCommitInfo;
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;
+ private final TranslogManager translogManager;
protected volatile TranslogStats translogStats;
@@ -143,6 +144,21 @@ public ReadOnlyEngine(
completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
+ translogManager = new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() {
+ @Override
+ public void close() {}
+
+ @Override
+ public int totalOperations() {
+ return 0;
+ }
+
+ @Override
+ public Translog.Operation next() {
+ return null;
+ }
+ });
+
success = true;
} finally {
if (success == false) {
@@ -265,6 +281,11 @@ protected ReferenceManager getReferenceManager(Search
return readerManager;
}
+ @Override
+ public TranslogManager translogManager() {
+ return translogManager;
+ }
+
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
@@ -313,19 +334,6 @@ public NoOpResult noOp(NoOp noOp) {
throw new UnsupportedOperationException("no-ops are not supported on a read-only engine");
}
- @Override
- public boolean isTranslogSyncNeeded() {
- return false;
- }
-
- @Override
- public boolean ensureTranslogSynced(Stream locations) {
- return false;
- }
-
- @Override
- public void syncTranslog() {}
-
@Override
public Closeable acquireHistoryRetentionLock() {
return () -> {};
@@ -359,16 +367,6 @@ public long getMinRetainedSeqNo() {
throw new UnsupportedOperationException();
}
- @Override
- public TranslogStats getTranslogStats() {
- return translogStats;
- }
-
- @Override
- public Translog.Location getTranslogLastWriteLocation() {
- return new Translog.Location(0, 0, 0);
- }
-
@Override
public long getPersistedLocalCheckpoint() {
return seqNoStats.getLocalCheckpoint();
@@ -455,46 +453,11 @@ public void activateThrottling() {}
@Override
public void deactivateThrottling() {}
- @Override
- public void trimUnreferencedTranslogFiles() {}
-
- @Override
- public boolean shouldRollTranslogGeneration() {
- return false;
- }
-
- @Override
- public void rollTranslogGeneration() {}
-
- @Override
- public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) {
- return 0;
- }
-
@Override
public int fillSeqNoGaps(long primaryTerm) {
return 0;
}
- @Override
- public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
- try (ReleasableLock lock = readLock.acquire()) {
- ensureOpen();
- try (Translog.Snapshot snapshot = newEmptySnapshot()) {
- translogRecoveryRunner.run(this, snapshot);
- } catch (final Exception e) {
- throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
- }
- }
- return this;
- }
-
- @Override
- public void skipTranslogRecovery() {}
-
- @Override
- public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {}
-
@Override
public void maybePruneDeletes() {}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index d25847dde235c..dff5fcdba4239 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -149,6 +149,7 @@
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
+import org.opensearch.index.translog.TranslogRecoveryRunner;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.warmer.ShardIndexWarmerService;
import org.opensearch.index.warmer.WarmerStats;
@@ -163,7 +164,6 @@
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
-import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -628,20 +628,17 @@ public void updateShardState(
* the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
*/
final Engine engine = getEngine();
- engine.restoreLocalHistoryFromTranslog(
- (resettingEngine, snapshot) -> runTranslogRecovery(
- resettingEngine,
- snapshot,
- Engine.Operation.Origin.LOCAL_RESET,
- () -> {}
- )
- );
+ engine.translogManager()
+ .restoreLocalHistoryFromTranslog(
+ engine.getProcessedLocalCheckpoint(),
+ (snapshot) -> runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})
+ );
/* Rolling the translog generation is not strictly needed here (as we will never have collisions between
* sequence numbers in a translog generation in a new primary as it takes the last known sequence number
* as a starting point), but it simplifies reasoning about the relationship between primary terms and
* translog generations.
*/
- engine.rollTranslogGeneration();
+ engine.translogManager().rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener() {
@@ -1249,7 +1246,7 @@ public FieldDataStats fieldDataStats(String... fields) {
}
public TranslogStats translogStats() {
- return getEngine().getTranslogStats();
+ return getEngine().translogManager().getTranslogStats();
}
public CompletionStats completionStats(String... fields) {
@@ -1284,7 +1281,7 @@ public void flush(FlushRequest request) {
public void trimTranslog() {
verifyNotClosed();
final Engine engine = getEngine();
- engine.trimUnreferencedTranslogFiles();
+ engine.translogManager().trimUnreferencedTranslogFiles();
}
/**
@@ -1292,7 +1289,7 @@ public void trimTranslog() {
*/
public void rollTranslogGeneration() {
final Engine engine = getEngine();
- engine.rollTranslogGeneration();
+ engine.translogManager().rollTranslogGeneration();
}
public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
@@ -1742,10 +1739,10 @@ public long recoverLocallyUpToGlobalCheckpoint() {
return safeCommit.get().localCheckpoint + 1;
}
try {
- final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
+ final TranslogRecoveryRunner translogRecoveryRunner = (snapshot) -> {
recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
final int recoveredOps = runTranslogRecovery(
- engine,
+ getEngine(),
snapshot,
Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
recoveryState.getTranslog()::incrementRecoveredOperations
@@ -1754,7 +1751,8 @@ public long recoverLocallyUpToGlobalCheckpoint() {
return recoveredOps;
};
innerOpenEngineAndTranslog(() -> globalCheckpoint);
- getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
+ getEngine().translogManager()
+ .recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
synchronized (engineMutex) {
@@ -1783,7 +1781,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
}
public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
- getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
+ getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
}
/**
@@ -1923,11 +1921,11 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
- final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
+ final TranslogRecoveryRunner translogRecoveryRunner = (snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations());
return runTranslogRecovery(
- engine,
+ getEngine(),
snapshot,
Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
translogRecoveryStats::incrementRecoveredOperations
@@ -1935,7 +1933,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
- getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
+ getEngine().translogManager()
+ .recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
}
/**
@@ -1947,7 +1946,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
- getEngine().skipTranslogRecovery();
+ getEngine().translogManager().skipTranslogRecovery();
}
private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
@@ -2002,7 +2001,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
- refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
+ refreshListeners.setCurrentRefreshLocationSupplier(newEngine.translogManager()::getTranslogLastWriteLocation);
}
/**
@@ -2280,7 +2279,7 @@ boolean shouldRollTranslogGeneration() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
- return engine.shouldRollTranslogGeneration();
+ return engine.translogManager().shouldRollTranslogGeneration();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
@@ -3535,7 +3534,7 @@ private void innerAcquireReplicaOperationPermit(
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
- getEngine().rollTranslogGeneration();
+ getEngine().translogManager().rollTranslogGeneration();
}
}, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);
@@ -3589,7 +3588,7 @@ private static AsyncIOProcessor createTranslogSyncProcessor(
@Override
protected void write(List>> candidates) throws IOException {
try {
- engineSupplier.get().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
+ engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
@@ -3617,14 +3616,14 @@ public final void sync(Translog.Location location, Consumer syncListe
public void sync() throws IOException {
verifyNotClosed();
- getEngine().syncTranslog();
+ getEngine().translogManager().syncTranslog();
}
/**
* Checks if the underlying storage sync is required.
*/
public boolean isSyncNeeded() {
- return getEngine().isTranslogSyncNeeded();
+ return getEngine().translogManager().isTranslogSyncNeeded();
}
/**
@@ -3804,7 +3803,7 @@ public final boolean hasRefreshPending() {
}
private void setRefreshPending(Engine engine) {
- final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation();
+ final Translog.Location lastWriteLocation = engine.translogManager().getTranslogLastWriteLocation();
pendingRefreshLocation.updateAndGet(curr -> {
if (curr == null || curr.compareTo(lastWriteLocation) <= 0) {
return lastWriteLocation;
@@ -3820,7 +3819,7 @@ private class RefreshPendingLocationListener implements ReferenceManager.Refresh
@Override
public void beforeRefresh() {
try {
- lastWriteLocation = getEngine().getTranslogLastWriteLocation();
+ lastWriteLocation = getEngine().translogManager().getTranslogLastWriteLocation();
} catch (AlreadyClosedException exc) {
// shard is closed - no location is fine
lastWriteLocation = null;
@@ -4013,15 +4012,17 @@ public void close() throws IOException {
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
- final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
- engine,
+ final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery(
+ newEngineReference.get(),
snapshot,
Engine.Operation.Origin.LOCAL_RESET,
() -> {
// TODO: add a dedicate recovery stats for the reset translog
}
);
- newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
+ newEngineReference.get()
+ .translogManager()
+ .recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), globalCheckpoint);
newEngineReference.get().refresh("reset_engine");
synchronized (engineMutex) {
verifyNotClosed();
diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java
index 22f72cc3d9acd..1307dcec90828 100644
--- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java
@@ -81,11 +81,11 @@ public void rollTranslogGeneration() throws TranslogException {
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
- translogEventListener.onTragicFailure(e);
+ translogEventListener.onFailure("translog roll generation failed", e);
throw e;
} catch (Exception e) {
try {
- translogEventListener.onFailure("translog trimming failed", e);
+ translogEventListener.onFailure("translog roll generation failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
@@ -204,15 +204,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
engineLifeCycleAware.ensureOpen();
translog.trimUnreferencedReaders();
} catch (AlreadyClosedException e) {
- translogEventListener.onTragicFailure(e);
+ translogEventListener.onFailure("translog trimming unreferenced translog failed", e);
throw e;
} catch (Exception e) {
try {
- translogEventListener.onFailure("translog trimming failed", e);
+ translogEventListener.onFailure("translog trimming unreferenced translog failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
- throw new TranslogException(shardId, "failed to trim translog", e);
+ throw new TranslogException(shardId, "failed to trim unreferenced translog", e);
}
}
@@ -237,7 +237,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T
engineLifeCycleAware.ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (AlreadyClosedException e) {
- translogEventListener.onTragicFailure(e);
+ translogEventListener.onFailure("translog operations trimming failed", e);
throw e;
} catch (Exception e) {
try {
@@ -309,11 +309,14 @@ private Translog openTranslog(
/**
* Returns the the translog instance
- * @param ensureOpen check if the engine is open
* @return the {@link Translog} instance
*/
@Override
- public Translog getTranslog(boolean ensureOpen) {
+ public Translog getTranslog() {
+ return translog;
+ }
+
+ private Translog getTranslog(boolean ensureOpen) {
if (ensureOpen) {
this.engineLifeCycleAware.ensureOpen();
}
diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java
index 07cae808ce071..88e6ce97b2784 100644
--- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java
@@ -93,7 +93,7 @@ public boolean shouldRollTranslogGeneration() {
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {}
@Override
- public Translog getTranslog(boolean ensureOpen) {
+ public Translog getTranslog() {
return null;
}
diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
index 988a88c5d2ae5..dc2c2e20015b0 100644
--- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java
@@ -96,10 +96,9 @@ public interface TranslogManager {
/**
* Returns the instance of the translog with a precondition
- * @param ensureOpen check if the engine is open
* @return the translog instance
*/
- Translog getTranslog(boolean ensureOpen);
+ Translog getTranslog();
/**
* Checks if the translog has a pending recovery
diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java
index 731b069ab0c74..b738fa0feea59 100644
--- a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java
+++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java
@@ -11,8 +11,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
+import org.opensearch.index.shard.ShardId;
+import org.opensearch.index.translog.TranslogException;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,9 +28,11 @@
public final class CompositeTranslogEventListener implements TranslogEventListener {
private final List listeners;
+ private final ShardId shardId;
private final Logger logger = LogManager.getLogger(CompositeTranslogEventListener.class);
- public CompositeTranslogEventListener(Collection listeners) {
+ public CompositeTranslogEventListener(Collection listeners, ShardId shardId) {
+ this.shardId = shardId;
for (TranslogEventListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listeners must be non-null");
@@ -49,7 +52,7 @@ public void onAfterTranslogSync() {
exceptionList.add(ex);
}
}
- ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
+ maybeThrowTranslogExceptionAndSuppress(exceptionList);
}
@Override
@@ -63,7 +66,7 @@ public void onAfterTranslogRecovery() {
exceptionList.add(ex);
}
}
- ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
+ maybeThrowTranslogExceptionAndSuppress(exceptionList);
}
@Override
@@ -77,7 +80,7 @@ public void onBeginTranslogRecovery() {
exceptionList.add(ex);
}
}
- ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
+ maybeThrowTranslogExceptionAndSuppress(exceptionList);
}
@Override
@@ -91,20 +94,16 @@ public void onFailure(String reason, Exception e) {
exceptionList.add(ex);
}
}
- ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
+ maybeThrowTranslogExceptionAndSuppress(exceptionList);
}
- @Override
- public void onTragicFailure(AlreadyClosedException e) {
- List exceptionList = new ArrayList<>(listeners.size());
- for (TranslogEventListener listener : listeners) {
- try {
- listener.onTragicFailure(e);
- } catch (Exception ex) {
- logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex);
- exceptionList.add(ex);
- }
+ private void maybeThrowTranslogExceptionAndSuppress(List exceptions) {
+ T main = null;
+ for (T ex : exceptions) {
+ main = ExceptionsHelper.useOrSuppress(main, ex);
+ }
+ if (main != null) {
+ throw new TranslogException(shardId, "Error while executing translog event listener", main);
}
- ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
}
}
diff --git a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java
index 1862b4b9a62b7..664cdd6c60985 100644
--- a/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java
+++ b/server/src/main/java/org/opensearch/index/translog/listener/TranslogEventListener.java
@@ -8,8 +8,6 @@
package org.opensearch.index.translog.listener;
-import org.apache.lucene.store.AlreadyClosedException;
-
/**
* The listener that gets fired on events related to {@link org.opensearch.index.translog.TranslogManager}
*
@@ -35,12 +33,6 @@ default void onAfterTranslogRecovery() {}
*/
default void onBeginTranslogRecovery() {}
- /**
- * Invoked when translog operations run into accessing an already closed resource
- * @param ex the exception thrown when accessing a closed resource
- */
- default void onTragicFailure(AlreadyClosedException ex) {}
-
/**
* Invoked when translog operations run into any other failure
* @param reason the failure reason
diff --git a/server/src/test/java/org/opensearch/index/IndexServiceTests.java b/server/src/test/java/org/opensearch/index/IndexServiceTests.java
index be38b707b77b4..2d6907ba8348f 100644
--- a/server/src/test/java/org/opensearch/index/IndexServiceTests.java
+++ b/server/src/test/java/org/opensearch/index/IndexServiceTests.java
@@ -448,7 +448,7 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
final Engine readOnlyEngine = getEngine(indexService.getShard(0));
assertBusy(
() -> assertThat(
- readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(),
+ readOnlyEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(),
equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES)
)
);
diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
index b14ad15070118..3d40273328c6f 100644
--- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java
@@ -145,7 +145,9 @@
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
+import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
+import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.VersionUtils;
@@ -759,20 +761,20 @@ public long getProcessedCheckpoint() {
}
public void testFlushIsDisabledDuringTranslogRecovery() throws IOException {
- engine.ensureCanFlush(); // recovered already
+ engine.translogManager().ensureCanFlush(); // recovered already
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
engine.close();
engine = new InternalEngine(engine.config());
- expectThrows(IllegalStateException.class, engine::ensureCanFlush);
+ expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush);
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
if (randomBoolean()) {
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
} else {
- engine.skipTranslogRecovery();
+ engine.translogManager().skipTranslogRecovery();
}
- engine.ensureCanFlush(); // ready
+ engine.translogManager().ensureCanFlush(); // ready
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
engine.flush();
@@ -824,7 +826,9 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException {
IOUtils.close(engine);
}
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
- recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(engine.config().getIndexSettings(), recoveringEngine);
+ recoveringEngine.translogManager()
+ .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
recoveringEngine.refresh("test");
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
@@ -860,7 +864,9 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
- recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(initialEngine.config().getIndexSettings(), recoveringEngine);
+ recoveringEngine.translogManager()
+ .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
IOUtils.close(recoveringEngine);
@@ -894,7 +900,9 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
}
initialEngine.close();
recoveringEngine = new InternalEngine(initialEngine.config());
- recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(initialEngine.config().getIndexSettings(), recoveringEngine);
+ recoveringEngine.translogManager()
+ .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
recoveringEngine.refresh("test");
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), docs);
@@ -917,23 +925,25 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException {
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
if (rarely()) {
- engine.rollTranslogGeneration();
+ engine.translogManager().rollTranslogGeneration();
} else if (rarely()) {
engine.flush(randomBoolean(), true);
}
}
maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getProcessedLocalCheckpoint()));
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
}
try (InternalEngine engine = new InternalEngine(config)) {
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertThat(engine.getProcessedLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
}
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
- engine.recoverFromTranslog(translogHandler, upToSeqNo);
+ TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), upToSeqNo);
assertThat(engine.getProcessedLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
}
@@ -1248,26 +1258,30 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException {
engine.index(indexForDoc(doc));
boolean inSync = randomBoolean();
if (inSync) {
- engine.syncTranslog(); // to advance persisted local checkpoint
+ engine.translogManager().syncTranslog(); // to advance persisted local checkpoint
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
}
engine.flush();
- assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
- assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L));
+ assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
engine.flush();
- assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
- assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L));
+ assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
engine.flush(true, true);
- assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
- assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L));
+ assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.flush(true, true);
- assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
- assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(3L));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L));
+ assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(3L));
}
public void testSyncTranslogConcurrently() throws Exception {
@@ -1280,7 +1294,7 @@ public void testSyncTranslogConcurrently() throws Exception {
applyOperations(engine, ops);
engine.flush(true, true);
final CheckedRunnable checker = () -> {
- assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0));
+ assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get()));
try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
@@ -1296,7 +1310,7 @@ public void testSyncTranslogConcurrently() throws Exception {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
try {
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
checker.run();
} catch (IOException e) {
throw new AssertionError(e);
@@ -1351,7 +1365,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {
store.associateIndexWithNewTranslog(translogUUID);
}
engine = new InternalEngine(config);
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
@@ -1385,7 +1399,8 @@ public void testSyncedFlushVanishesOnReplay() throws IOException {
EngineConfig config = engine.config();
engine.close();
engine = new InternalEngine(config);
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertNull(
"Sync ID must be gone since we have a document to replay",
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)
@@ -1697,7 +1712,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception {
long localCheckpoint = engine.getProcessedLocalCheckpoint();
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
final long safeCommitCheckpoint;
try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
safeCommitCheckpoint = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
@@ -1728,7 +1743,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception {
indexSettings.getSoftDeleteRetentionOperations()
);
globalCheckpoint.set(localCheckpoint);
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
@@ -1786,7 +1801,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
}
engine.flush();
globalCheckpoint.set(randomLongBetween(0, engine.getPersistedLocalCheckpoint()));
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
final long minSeqNoToRetain;
try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) {
long safeCommitLocalCheckpoint = Long.parseLong(
@@ -1834,12 +1849,12 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc
if (useRecoverySource == false) {
liveDocsWithSource.add(doc.id());
}
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.flush(randomBoolean(), true);
} else {
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
}
engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID());
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
@@ -2197,7 +2212,7 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
if (rarely()) {
currentTerm.set(currentTerm.get() + 1L);
- engine.rollTranslogGeneration();
+ engine.translogManager().rollTranslogGeneration();
}
final long correctVersion = docDeleted ? Versions.MATCH_DELETED : lastOpVersion;
logger.info(
@@ -2743,7 +2758,7 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
}
}
- initialEngine.syncTranslog(); // to advance persisted local checkpoint
+ initialEngine.translogManager().syncTranslog(); // to advance persisted local checkpoint
if (randomInt(10) < 3) {
// only update rarely as we do it every doc
@@ -2770,8 +2785,11 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(localCheckpoint)
);
- initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
- assertThat(initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint));
+ initialEngine.ensureOpen();
+ initialEngine.translogManager().getTranslog().sync(); // to guarantee the global checkpoint is written to the translog
+ // checkpoint
+ initialEngine.ensureOpen();
+ assertThat(initialEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint));
assertThat(Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo));
} finally {
@@ -2779,14 +2797,17 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException {
}
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
- recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(initialEngine.config().getIndexSettings(), recoveringEngine);
+ recoveringEngine.translogManager()
+ .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo)
);
- assertThat(recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint));
+ recoveringEngine.ensureOpen();
+ assertThat(recoveringEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert
@@ -3190,24 +3211,32 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException {
try (InternalEngine engine = createEngine(config)) {
engine.index(firstIndexRequest);
- engine.syncTranslog(); // to advance persisted local checkpoint
+ engine.translogManager().syncTranslog(); // to advance persisted local checkpoint
assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint());
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
- expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE));
+ expectThrows(
+ IllegalStateException.class,
+ () -> engine.translogManager()
+ .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE)
+ );
Map userData = engine.getLastCommittedSegmentInfos().getUserData();
- assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
+ engine.ensureOpen();
+ assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
- expectThrows(IllegalStateException.class, engine::ensureCanFlush);
+ expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush);
Map userData = engine.getLastCommittedSegmentInfos().getUserData();
- assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.ensureOpen();
+ assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
+ TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
- assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
+ engine.ensureOpen();
+ assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
@@ -3222,10 +3251,12 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException {
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine = new InternalEngine(config)) {
Map userData = engine.getLastCommittedSegmentInfos().getUserData();
- assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
- assertEquals(2, engine.getTranslog().currentFileGeneration());
- assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
+ engine.ensureOpen();
+ assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
+ engine.ensureOpen();
+ assertEquals(2, engine.translogManager().getTranslog().currentFileGeneration());
+ assertEquals(0L, engine.translogManager().getTranslog().stats().getUncommittedOperations());
}
}
@@ -3234,10 +3265,12 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException {
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
Map userData = engine.getLastCommittedSegmentInfos().getUserData();
- assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.ensureOpen();
+ assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
- assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
+ engine.ensureOpen();
+ assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
@@ -3305,7 +3338,7 @@ public void testTranslogReplayWithFailure() throws IOException {
try {
engine = createEngine(store, translogPath);
started = true;
- } catch (EngineException | IOException e) {
+ } catch (EngineException | TranslogException | IOException e) {
logger.trace("exception on open", e);
}
directory.setRandomIOExceptionRateOnOpen(0.0);
@@ -3358,10 +3391,10 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I
}
}
) {
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc1));
- engine.syncTranslog(); // to advance local checkpoint
+ engine.translogManager().syncTranslog(); // to advance local checkpoint
assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint());
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
throwErrorOnCommit.set(true);
@@ -3373,12 +3406,15 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I
config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)
)
) {
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long localCheckpoint = Long.parseLong(
engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
);
- final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
+ engine.ensureOpen();
+ final long committedGen = engine.translogManager()
+ .getTranslog()
+ .getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
for (int gen = 1; gen < committedGen; gen++) {
final Path genFile = translogPath.resolve(Translog.getFilename(gen));
assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile));
@@ -3412,7 +3448,7 @@ public void testSkipTranslogReplay() throws IOException {
assertVisibleCount(engine, numDocs);
engine.close();
try (InternalEngine engine = new InternalEngine(config)) {
- engine.skipTranslogRecovery();
+ engine.translogManager().skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits.value, equalTo(0L));
@@ -3454,19 +3490,20 @@ public void testTranslogReplay() throws IOException {
assertThat(indexResult.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
- translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
+ translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine);
engine.close();
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
engine.refresh("warm_up");
assertVisibleCount(engine, numDocs, false);
assertEquals(numDocs, translogHandler.appliedOperations());
engine.close();
- translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
+ translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
engine.refresh("warm_up");
assertVisibleCount(engine, numDocs, false);
@@ -3520,7 +3557,7 @@ public void testTranslogReplay() throws IOException {
}
engine.close();
- translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
+ translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
engine.refresh("warm_up");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
@@ -3562,7 +3599,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
- Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
+ engine.ensureOpen();
+ Translog.TranslogGeneration generation = engine.translogManager().getTranslog().getGeneration();
engine.close();
final Path badTranslogLog = createTempDir();
@@ -3658,7 +3696,8 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier commits = DirectoryReader.listCommits(store.directory());
assertThat(
Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
@@ -5858,9 +5915,10 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
}
// Global checkpoint advanced enough - only the last commit is kept.
globalCheckpoint.set(randomLongBetween(engine.getPersistedLocalCheckpoint(), Long.MAX_VALUE));
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
- assertThat(engine.getTranslog().totalOperations(), equalTo(0));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().totalOperations(), equalTo(0));
}
}
@@ -5883,7 +5941,7 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit.
}
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
final List commits = DirectoryReader.listCommits(store.directory());
for (int i = 0; i < numSnapshots - 1; i++) {
snapshots.get(i).close();
@@ -5898,12 +5956,13 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception {
public void testShouldPeriodicallyFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
// A new engine may have more than one empty translog files - the test should account this extra.
- final Translog translog = engine.getTranslog();
+ engine.ensureOpen();
+ final Translog translog = engine.translogManager().getTranslog();
final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> {
long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration);
};
- final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes()
+ final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes()
- Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
int numDocs = between(10, 100);
for (int id = 0; id < numDocs; id++) {
@@ -5911,10 +5970,11 @@ public void testShouldPeriodicallyFlush() throws Exception {
engine.index(indexForDoc(doc));
}
assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false));
+ engine.ensureOpen();
long flushThreshold = RandomNumbers.randomLongBetween(
random(),
120,
- engine.getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine
+ engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine
);
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
@@ -5930,7 +5990,8 @@ public void testShouldPeriodicallyFlush() throws Exception {
indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);
- assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush();
assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0));
@@ -5986,11 +6047,13 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception {
indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);
- assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(1));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
- assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2));
+ engine.ensureOpen();
+ assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(2));
engine.refresh("test");
engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
assertBusy(() -> {
@@ -6025,8 +6088,9 @@ public void testStressShouldPeriodicallyFlush() throws Exception {
final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5);
final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1L, seqno, false));
- if (rarely() && engine.getTranslog().shouldRollGeneration()) {
- engine.rollTranslogGeneration();
+ engine.ensureOpen();
+ if (rarely() && engine.translogManager().getTranslog().shouldRollGeneration()) {
+ engine.translogManager().rollTranslogGeneration();
}
if (rarely() || engine.shouldPeriodicallyFlush()) {
engine.flush();
@@ -6247,8 +6311,9 @@ public void testTrimUnsafeCommits() throws Exception {
}
}
globalCheckpoint.set(randomInt(maxSeqNo));
- engine.syncTranslog();
- minTranslogGen = engine.getTranslog().getMinFileGeneration();
+ engine.translogManager().syncTranslog();
+ engine.ensureOpen();
+ minTranslogGen = engine.translogManager().getTranslog().getMinFileGeneration();
}
store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath());
@@ -6429,7 +6494,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
}
existingSeqNos.add(result.getSeqNo());
if (randomBoolean()) {
- engine.syncTranslog(); // advance persisted local checkpoint
+ engine.translogManager().syncTranslog(); // advance persisted local checkpoint
assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint());
globalCheckpoint.set(
randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getPersistedCheckpoint())
@@ -6665,7 +6730,7 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception {
flushedOperations.add(op);
applyOperation(engine, op);
if (randomBoolean()) {
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint()));
}
if (randomInt(100) < 10) {
@@ -6725,7 +6790,8 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception {
equalTo(seqNosInSafeCommit.contains(op.seqNo()))
);
}
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
}
@@ -6779,8 +6845,9 @@ public void testStoreHonorsLuceneVersion() throws IOException {
public void testMaxSeqNoInCommitUserData() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
- while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
- engine.rollTranslogGeneration(); // make adding operations to translog slower
+ engine.ensureOpen();
+ while (running.get() && engine.translogManager().getTranslog().currentFileGeneration() < 500) {
+ engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();
@@ -6919,7 +6986,7 @@ public void testRecoverFromLocalTranslog() throws Exception {
for (Engine.Operation op : operations) {
applyOperation(engine, op);
if (randomBoolean()) {
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint()));
}
if (randomInt(100) < 10) {
@@ -6934,21 +7001,23 @@ public void testRecoverFromLocalTranslog() throws Exception {
}
if (randomBoolean()) {
// engine is flushed properly before shutting down.
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.flush();
}
docs = getDocIds(engine, true);
}
try (InternalEngine engine = new InternalEngine(config)) {
+ translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings(), engine);
engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, 0);
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertThat(getDocIds(engine, randomBoolean()), equalTo(docs));
if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) {
+ engine.ensureOpen();
assertThat(
"engine should trim all unreferenced translog after recovery",
- engine.getTranslog().getMinFileGeneration(),
- equalTo(engine.getTranslog().currentFileGeneration())
+ engine.translogManager().getTranslog().getMinFileGeneration(),
+ equalTo(engine.translogManager().getTranslog().currentFileGeneration())
);
}
}
@@ -7020,8 +7089,8 @@ public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws E
);
}
primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE));
- engine.rollTranslogGeneration();
- engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog
+ engine.translogManager().rollTranslogGeneration();
+ engine.translogManager().trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
diff --git a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java
index e3117e179e7fa..d1a3097005e6c 100644
--- a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java
@@ -226,7 +226,7 @@ public void testSkipNonRootOfNestedDocuments() throws Exception {
engine.refresh("test");
}
if (rarely()) {
- engine.rollTranslogGeneration();
+ engine.translogManager().rollTranslogGeneration();
}
if (rarely()) {
engine.flush();
@@ -300,11 +300,12 @@ class Follower extends Thread {
this.leader = leader;
this.isDone = isDone;
this.readLatch = readLatch;
+ this.engine = createEngine(createStore(), createTempDir());
this.translogHandler = new TranslogHandler(
xContentRegistry(),
- IndexSettingsModule.newIndexSettings(shardId.getIndexName(), leader.engineConfig.getIndexSettings().getSettings())
+ IndexSettingsModule.newIndexSettings(shardId.getIndexName(), leader.engineConfig.getIndexSettings().getSettings()),
+ engine
);
- this.engine = createEngine(createStore(), createTempDir());
}
void pullOperations(InternalEngine follower) throws IOException {
@@ -315,7 +316,7 @@ void pullOperations(InternalEngine follower) throws IOException {
long batchSize = randomLongBetween(0, 100);
long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint);
try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) {
- translogHandler.run(follower, snapshot);
+ translogHandler.run(snapshot);
}
}
}
diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
index d3496fcb5d13a..0a590cc0e286f 100644
--- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
@@ -75,19 +75,23 @@ public void testEngineWritesOpsToTranslog() throws Exception {
applyOperation(nrtEngine, op);
}
- assertEquals(nrtEngine.getTranslogLastWriteLocation(), engine.getTranslogLastWriteLocation());
+ assertEquals(
+ nrtEngine.translogManager().getTranslogLastWriteLocation(),
+ engine.translogManager().getTranslogLastWriteLocation()
+ );
assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint());
// we don't index into nrtEngine, so get the doc ids from the regular engine.
final List docs = getDocIds(engine, true);
// recover a new engine from the nrtEngine's xlog.
- nrtEngine.syncTranslog();
+ nrtEngine.translogManager().syncTranslog();
try (InternalEngine engine = new InternalEngine(nrtEngine.config())) {
- engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(nrtEngine.config().getIndexSettings(), engine);
+ engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
assertEquals(getDocIds(engine, true), docs);
}
- assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
+ assertEngineCleanedUp(nrtEngine, nrtEngine.translogManager().getTranslog());
}
}
@@ -128,11 +132,12 @@ public void testUpdateSegments() throws Exception {
// Flush the primary and update the NRTEngine with the latest committed infos.
engine.flush();
- nrtEngine.syncTranslog(); // to advance persisted checkpoint
+ nrtEngine.translogManager().syncTranslog(); // to advance persisted checkpoint
Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
- try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
+ nrtEngine.ensureOpen();
+ try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
@@ -145,11 +150,11 @@ public void testUpdateSegments() throws Exception {
assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos);
assertEquals(
- nrtEngine.getTranslog().getGeneration().translogFileGeneration,
- engine.getTranslog().getGeneration().translogFileGeneration
+ nrtEngine.translogManager().getTranslog().getGeneration().translogFileGeneration,
+ engine.translogManager().getTranslog().getGeneration().translogFileGeneration
);
- try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
+ try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
@@ -163,7 +168,7 @@ public void testUpdateSegments() throws Exception {
expectedDocCount = test.count(Queries.newMatchAllQuery());
assertSearcherHits(nrtEngine, expectedDocCount);
}
- assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
+ assertEngineCleanedUp(nrtEngine, nrtEngine.translogManager().getTranslog());
}
}
@@ -182,15 +187,16 @@ public void testTrimTranslogOps() throws Exception {
);
applyOperations(nrtEngine, operations);
Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
- try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
+ nrtEngine.ensureOpen();
+ try (Translog.Snapshot snapshot = nrtEngine.translogManager().getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
- nrtEngine.rollTranslogGeneration();
- nrtEngine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED);
+ nrtEngine.translogManager().rollTranslogGeneration();
+ nrtEngine.translogManager().trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED);
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java
index a015443979527..db3e705c4d765 100644
--- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java
@@ -145,7 +145,7 @@ public void testNoOpEngineStats() throws Exception {
if (rarely()) {
engine.flush();
}
- engine.syncTranslog(); // advance persisted local checkpoint
+ engine.translogManager().syncTranslog(); // advance persisted local checkpoint
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
}
@@ -154,7 +154,7 @@ public void testNoOpEngineStats() throws Exception {
String delId = Integer.toString(i);
Engine.DeleteResult result = engine.delete(new Engine.Delete(delId, newUid(delId), primaryTerm.get()));
assertTrue(result.isFound());
- engine.syncTranslog(); // advance persisted local checkpoint
+ engine.translogManager().syncTranslog(); // advance persisted local checkpoint
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
deletions += 1;
}
@@ -217,20 +217,24 @@ public void testTrimUnreferencedTranslogFiles() throws Exception {
engine.flush();
}
if (randomBoolean()) {
- engine.rollTranslogGeneration();
+ engine.translogManager().rollTranslogGeneration();
}
}
// prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine.
- final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot();
+ engine.ensureOpen();
+ final Translog.Snapshot snapshot = engine.translogManager().getTranslog().newSnapshot();
engine.flush(true, true);
engine.close();
final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
- assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps));
- noOpEngine.trimUnreferencedTranslogFiles();
- assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0));
- assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
- assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES));
+ assertThat(noOpEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps));
+ noOpEngine.translogManager().trimUnreferencedTranslogFiles();
+ assertThat(noOpEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(0));
+ assertThat(noOpEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0));
+ assertThat(
+ noOpEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(),
+ equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES)
+ );
snapshot.close();
noOpEngine.close();
}
diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java
index da0db02ac402e..7a3bd0ef6c302 100644
--- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java
@@ -93,13 +93,13 @@ public void testReadOnlyEngine() throws Exception {
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint()));
}
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint()));
engine.flush();
readOnlyEngine = new ReadOnlyEngine(
engine.engineConfig,
engine.getSeqNoStats(globalCheckpoint.get()),
- engine.getTranslogStats(),
+ engine.translogManager().getTranslogStats(),
false,
Function.identity(),
true
@@ -141,7 +141,9 @@ public void testReadOnlyEngine() throws Exception {
}
// Close and reopen the main engine
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
- recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), recoveringEngine);
+ recoveringEngine.translogManager()
+ .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint()));
@@ -181,7 +183,7 @@ public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
);
maxSeqNo = engine.getProcessedLocalCheckpoint();
}
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
globalCheckpoint.set(engine.getPersistedLocalCheckpoint() - 1);
engine.flushAndClose();
@@ -277,12 +279,13 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException {
}
globalCheckpoint.set(i);
}
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
engine.flushAndClose();
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) {
- final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
- readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong());
+ final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings(), readOnlyEngine);
+ readOnlyEngine.translogManager()
+ .recoverFromTranslog(translogHandler, readOnlyEngine.getProcessedLocalCheckpoint(), randomNonNegativeLong());
assertThat(translogHandler.appliedOperations(), equalTo(0L));
}
@@ -327,23 +330,26 @@ public void testTranslogStats() throws IOException {
}
assertThat(
- engine.getTranslogStats().estimatedNumberOfOperations(),
+ engine.translogManager().getTranslogStats().estimatedNumberOfOperations(),
equalTo(softDeletesEnabled ? uncommittedDocs : numDocs)
);
- assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
- assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
- assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
- assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
+ assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs));
+ assertThat(engine.translogManager().getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
+ assertThat(engine.translogManager().getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
+ assertThat(engine.translogManager().getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
engine.flush(true, true);
}
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) {
- assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
- assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
- assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
- assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
- assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
+ assertThat(
+ readOnlyEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(),
+ equalTo(softDeletesEnabled ? 0 : numDocs)
+ );
+ assertThat(readOnlyEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0));
+ assertThat(readOnlyEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L));
+ assertThat(readOnlyEngine.translogManager().getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L));
+ assertThat(readOnlyEngine.translogManager().getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L));
}
}
}
diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
index 49d0c089f072b..733c157e1df72 100644
--- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java
@@ -131,6 +131,7 @@
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
+import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -2192,7 +2193,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
long primaryTerm = shard.getOperationPrimaryTerm();
shard.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete
shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id");
- shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
+ shard.getEngine().translogManager().rollTranslogGeneration(); // isolate the delete in its own generation
shard.applyIndexOperationOnReplica(
0,
primaryTerm,
@@ -2240,7 +2241,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
replayedOps = 3;
} else {
if (randomBoolean()) {
- shard.getEngine().rollTranslogGeneration();
+ shard.getEngine().translogManager().rollTranslogGeneration();
}
translogOps = 5;
replayedOps = 5;
@@ -2513,7 +2514,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
);
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
- shard.getEngine().rollTranslogGeneration();
+ shard.getEngine().translogManager().rollTranslogGeneration();
shard.markSeqNoAsNoop(1, primaryTerm, "test");
shard.applyIndexOperationOnReplica(
2,
@@ -4107,19 +4108,17 @@ public void testResetEngine() throws Exception {
public void testCloseShardWhileResettingEngine() throws Exception {
CountDownLatch readyToCloseLatch = new CountDownLatch(1);
CountDownLatch closeDoneLatch = new CountDownLatch(1);
- IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
+ IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config, new TranslogEventListener() {
@Override
- public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo)
- throws IOException {
+ public void onBeginTranslogRecovery() {
readyToCloseLatch.countDown();
try {
closeDoneLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
- return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}
- });
+ }));
Thread closeShardThread = new Thread(() -> {
try {
@@ -4166,20 +4165,17 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
public void testSnapshotWhileResettingEngine() throws Exception {
CountDownLatch readyToSnapshotLatch = new CountDownLatch(1);
CountDownLatch snapshotDoneLatch = new CountDownLatch(1);
- IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
+ IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config, new TranslogEventListener() {
@Override
- public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo)
- throws IOException {
- InternalEngine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+ public void onAfterTranslogRecovery() {
readyToSnapshotLatch.countDown();
try {
snapshotDoneLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
- return engine;
}
- });
+ }));
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
final long globalCheckpoint = randomLongBetween(shard.getLastKnownGlobalCheckpoint(), shard.getLocalCheckpoint());
diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java
index eea316d9a9370..3ed944cc39c51 100644
--- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java
@@ -173,8 +173,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
EngineTestCase.tombstoneDocSupplier()
);
engine = new InternalEngine(config);
- engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
- listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
+ engine.translogManager().recoverFromTranslog((s) -> 0, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
+ listeners.setCurrentRefreshLocationSupplier(engine.translogManager()::getTranslogLastWriteLocation);
}
@After
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java
index 9a2a0dd7e070c..32ded73c8e9a6 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java
@@ -59,12 +59,12 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.engine.EngineConfigFactory;
-import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.TranslogCorruptedException;
+import org.opensearch.index.translog.TranslogException;
import org.opensearch.test.CorruptionUtils;
import org.opensearch.test.DummyShardLock;
import org.junit.Before;
@@ -290,7 +290,7 @@ public void testCorruptedTranslog() throws Exception {
allowShardFailures();
// it has to fail on start up due to index.shard.check_on_startup = checksum
final Exception exception = expectThrows(Exception.class, () -> newStartedShard(p -> corruptedShard, true));
- final Throwable cause = exception.getCause() instanceof EngineException ? exception.getCause().getCause() : exception.getCause();
+ final Throwable cause = exception.getCause() instanceof TranslogException ? exception.getCause().getCause() : exception.getCause();
assertThat(cause, instanceOf(TranslogCorruptedException.class));
closeShard(corruptedShard, false); // translog is corrupted already - do not check consistency
diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java
index 4db792b4a3fc2..9cc58e250d74e 100644
--- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java
@@ -56,7 +56,7 @@ public void testRecoveryFromTranslog() throws IOException {
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true);
tracker.markSeqNoAsProcessed(i);
- translogManager.getTranslog(false).add(new Translog.Index(index, indexResult));
+ translogManager.getTranslog().add(new Translog.Index(index, indexResult));
translogManager.rollTranslogGeneration();
}
long maxSeqNo = tracker.getMaxSeqNo();
@@ -64,7 +64,7 @@ public void testRecoveryFromTranslog() throws IOException {
assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations());
translogManager.syncTranslog();
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
primaryTerm,
@@ -103,7 +103,7 @@ public void onBeginTranslogRecovery() {
assertTrue(onTranslogRecoveryInvoked.get());
} finally {
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
}
}
@@ -131,7 +131,7 @@ public void testTranslogRollsGeneration() throws IOException {
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true);
tracker.markSeqNoAsProcessed(i);
- translogManager.getTranslog(false).add(new Translog.Index(index, indexResult));
+ translogManager.getTranslog().add(new Translog.Index(index, indexResult));
translogManager.rollTranslogGeneration();
}
long maxSeqNo = tracker.getMaxSeqNo();
@@ -139,7 +139,7 @@ public void testTranslogRollsGeneration() throws IOException {
assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations());
translogManager.syncTranslog();
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
primaryTerm,
@@ -164,7 +164,7 @@ public void testTranslogRollsGeneration() throws IOException {
assertEquals(maxSeqNo + 1, opsRecovered.get());
assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog);
} finally {
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
}
}
@@ -192,7 +192,7 @@ public void testTrimOperationsFromTranslog() throws IOException {
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true);
tracker.markSeqNoAsProcessed(i);
- translogManager.getTranslog(false).add(new Translog.Index(index, indexResult));
+ translogManager.getTranslog().add(new Translog.Index(index, indexResult));
}
long maxSeqNo = tracker.getMaxSeqNo();
assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations());
@@ -202,7 +202,7 @@ public void testTrimOperationsFromTranslog() throws IOException {
translogManager.rollTranslogGeneration();
translogManager.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
primaryTerm,
@@ -227,7 +227,7 @@ public void testTrimOperationsFromTranslog() throws IOException {
assertEquals(0, opsRecovered.get());
assertEquals(0, opsRecoveredFromTranslog);
} finally {
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
}
}
@@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException {
@Override
public void onAfterTranslogSync() {
try {
- translogManagerAtomicReference.get().getTranslog(false).trimUnreferencedReaders();
+ translogManagerAtomicReference.get().getTranslog().trimUnreferencedReaders();
syncListenerInvoked.set(true);
} catch (IOException ex) {
fail("Failed due to " + ex);
@@ -265,15 +265,15 @@ public void onAfterTranslogSync() {
translogManagerAtomicReference.set(translogManager);
Engine.Index index = indexForDoc(doc);
Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), 1, false);
- translogManager.getTranslog(false).add(new Translog.Index(index, indexResult));
+ translogManager.getTranslog().add(new Translog.Index(index, indexResult));
translogManager.syncTranslog();
- assertThat(translogManager.getTranslog(true).currentFileGeneration(), equalTo(2L));
- assertThat(translogManager.getTranslog(true).getMinFileGeneration(), equalTo(2L));
+ assertThat(translogManager.getTranslog().currentFileGeneration(), equalTo(2L));
+ assertThat(translogManager.getTranslog().getMinFileGeneration(), equalTo(2L));
assertTrue(syncListenerInvoked.get());
} finally {
- translogManager.getTranslog(false).close();
+ translogManager.getTranslog().close();
}
}
}
diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java
index 79c243772b252..062801fc43d2f 100644
--- a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java
@@ -8,7 +8,8 @@
package org.opensearch.index.translog.listener;
-import org.apache.lucene.store.AlreadyClosedException;
+import org.opensearch.index.Index;
+import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;
import java.lang.reflect.Proxy;
@@ -22,7 +23,6 @@ public void testCompositeTranslogEventListener() {
AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onFailureInvoked = new AtomicInteger();
- AtomicInteger onTragicFailureInvoked = new AtomicInteger();
TranslogEventListener listener = new TranslogEventListener() {
@Override
@@ -44,27 +44,23 @@ public void onBeginTranslogRecovery() {
public void onFailure(String reason, Exception ex) {
onFailureInvoked.incrementAndGet();
}
-
- @Override
- public void onTragicFailure(AlreadyClosedException ex) {
- onTragicFailureInvoked.incrementAndGet();
- }
};
final List translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener));
Collections.shuffle(translogEventListeners, random());
- TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners);
+ TranslogEventListener compositeListener = new CompositeTranslogEventListener(
+ translogEventListeners,
+ new ShardId(new Index("indexName", "indexUuid"), 123)
+ );
compositeListener.onAfterTranslogRecovery();
compositeListener.onAfterTranslogSync();
compositeListener.onBeginTranslogRecovery();
compositeListener.onFailure("reason", new RuntimeException("reason"));
- compositeListener.onTragicFailure(new AlreadyClosedException("reason"));
assertEquals(2, onBeginTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogSyncInvoked.get());
assertEquals(2, onFailureInvoked.get());
- assertEquals(2, onTragicFailureInvoked.get());
}
public void testCompositeTranslogEventListenerOnExceptions() {
@@ -72,7 +68,6 @@ public void testCompositeTranslogEventListenerOnExceptions() {
AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger();
AtomicInteger onFailureInvoked = new AtomicInteger();
- AtomicInteger onTragicFailureInvoked = new AtomicInteger();
TranslogEventListener listener = new TranslogEventListener() {
@Override
@@ -94,11 +89,6 @@ public void onBeginTranslogRecovery() {
public void onFailure(String reason, Exception ex) {
onFailureInvoked.incrementAndGet();
}
-
- @Override
- public void onTragicFailure(AlreadyClosedException ex) {
- onTragicFailureInvoked.incrementAndGet();
- }
};
TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance(
@@ -109,18 +99,18 @@ public void onTragicFailure(AlreadyClosedException ex) {
final List translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener));
Collections.shuffle(translogEventListeners, random());
- TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners);
+ TranslogEventListener compositeListener = new CompositeTranslogEventListener(
+ translogEventListeners,
+ new ShardId(new Index("indexName", "indexUuid"), 123)
+ );
expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery());
expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync());
expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery());
expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason")));
- expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason")));
assertEquals(2, onBeginTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogRecoveryInvoked.get());
assertEquals(2, onTranslogSyncInvoked.get());
assertEquals(2, onFailureInvoked.get());
- assertEquals(2, onTragicFailureInvoked.get());
-
}
}
diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
index 66c697d83510b..aa0689b58baf9 100644
--- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java
@@ -113,6 +113,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
+import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.test.DummyShardLock;
@@ -221,7 +222,6 @@ public void setUp() throws Exception {
Lucene.cleanLuceneIndex(store.directory());
Lucene.cleanLuceneIndex(storeReplica.directory());
primaryTranslogDir = createTempDir("translog-primary");
- translogHandler = createTranslogHandler(defaultSettings);
engine = createEngine(store, primaryTranslogDir);
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
@@ -328,10 +328,12 @@ public void tearDown() throws Exception {
super.tearDown();
try {
if (engine != null && engine.isClosed.get() == false) {
- assertEngineCleanedUp(engine, engine.getTranslog());
+ engine.ensureOpen();
+ assertEngineCleanedUp(engine, engine.translogManager().getTranslog());
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
- assertEngineCleanedUp(replicaEngine, replicaEngine.getTranslog());
+ replicaEngine.ensureOpen();
+ assertEngineCleanedUp(replicaEngine, replicaEngine.translogManager().getTranslog());
}
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
@@ -524,8 +526,8 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup
);
}
- protected TranslogHandler createTranslogHandler(IndexSettings indexSettings) {
- return new TranslogHandler(xContentRegistry(), indexSettings);
+ protected TranslogHandler createTranslogHandler(IndexSettings indexSettings, Engine engine) {
+ return new TranslogHandler(xContentRegistry(), indexSettings, engine);
}
protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
@@ -662,12 +664,13 @@ protected InternalEngine createEngine(
}
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
- internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+ translogHandler = createTranslogHandler(config.getIndexSettings(), internalEngine);
+ internalEngine.translogManager().recoverFromTranslog(translogHandler, internalEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE);
return internalEngine;
}
public static InternalEngine createEngine(EngineConfig engineConfig, int maxDocs) {
- return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new);
+ return new InternalEngine(engineConfig, maxDocs, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER);
}
@FunctionalInterface
@@ -1479,7 +1482,8 @@ public static MapperService createMapperService() throws IOException {
public static Translog getTranslog(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
- return internalEngine.getTranslog();
+ internalEngine.ensureOpen();
+ return internalEngine.translogManager().getTranslog();
}
/**
diff --git a/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java b/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java
index 2c22c391ea3a3..20767c3b029ae 100644
--- a/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java
+++ b/test/framework/src/main/java/org/opensearch/index/engine/InternalTestEngine.java
@@ -35,6 +35,7 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SequenceNumbers;
+import org.opensearch.index.translog.listener.TranslogEventListener;
import java.io.IOException;
import java.util.Map;
@@ -55,7 +56,7 @@ class InternalTestEngine extends InternalEngine {
int maxDocs,
BiFunction localCheckpointTrackerSupplier
) {
- super(engineConfig, maxDocs, localCheckpointTrackerSupplier);
+ super(engineConfig, maxDocs, localCheckpointTrackerSupplier, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER);
}
@Override
diff --git a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java
index e1f2357aa2400..602f344470e16 100644
--- a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java
+++ b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java
@@ -50,6 +50,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogRecoveryRunner;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.mapper.MapperRegistry;
@@ -61,17 +62,20 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
-public class TranslogHandler implements Engine.TranslogRecoveryRunner {
+public class TranslogHandler implements TranslogRecoveryRunner {
private final MapperService mapperService;
private final AtomicLong appliedOperations = new AtomicLong();
+ private final Engine engine;
+
long appliedOperations() {
return appliedOperations.get();
}
- public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
+ public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings, Engine engine) {
+ this.engine = engine;
Map analyzers = new HashMap<>();
analyzers.put(AnalysisRegistry.DEFAULT_ANALYZER_NAME, new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()));
IndexAnalyzers indexAnalyzers = new IndexAnalyzers(analyzers, emptyMap(), emptyMap());
@@ -112,7 +116,7 @@ private void applyOperation(Engine engine, Engine.Operation operation) throws IO
}
@Override
- public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
+ public int run(Translog.Snapshot snapshot) throws IOException {
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
@@ -120,7 +124,7 @@ public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
opsRecovered++;
appliedOperations.incrementAndGet();
}
- engine.syncTranslog();
+ engine.translogManager().syncTranslog();
return opsRecovered;
}