Skip to content

Commit

Permalink
Refactor to move implementation logic out of TranslogDeletionPolicy
Browse files Browse the repository at this point in the history
Signed-off-by: Rabi Panda <adnapibar@gmail.com>
  • Loading branch information
adnapibar committed Oct 29, 2021
1 parent 219475f commit ecc278c
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,35 @@

package org.opensearch.index.translog;

import org.opensearch.index.seqno.RetentionLeases;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

/**
* Default implementation for the {@link TranslogDeletionPolicy}. Plugins can override the default behaviour
* via the {@link org.opensearch.plugins.EnginePlugin#getCustomTranslogDeletionPolicyFactory()}.
*
* The default policy uses total number, size in bytes and maximum age for files.
*/

public class DefaultTranslogDeletionPolicy extends TranslogDeletionPolicy {
public DefaultTranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
super(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles);
}
private long retentionSizeInBytes;

private long retentionAgeInMillis;

public DefaultTranslogDeletionPolicy(
long retentionSizeInBytes,
long retentionAgeInMillis,
int retentionTotalFiles,
Supplier<RetentionLeases> retentionLeasesSupplier
) {
super(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles, retentionLeasesSupplier);
private int retentionTotalFiles;

public DefaultTranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
super();
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
}

@Override
public synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
long minByRetentionLeasesAndSize = Long.MAX_VALUE;
if (shouldPruneTranslogByRetentionLease) {
// If retention size is specified, size takes precedence.
long minByRetentionLeases = getMinTranslogGenByRetentionLease(readers, writer, retentionLeasesSupplier);
minByRetentionLeasesAndSize = Math.max(minBySize, minByRetentionLeases);
}
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
Expand All @@ -47,11 +45,21 @@ public synchronized long minTranslogGenRequired(List<TranslogReader> readers, Tr
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
long minByTranslogGenSettings = Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
return Math.min(minByTranslogGenSettings, minByRetentionLeasesAndSize);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
}

@Override
public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}

private long getMinTranslogGenRequiredByLocks() {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
@Override
public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}

@Override
protected synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class TranslogDeletionPolicy {
private final Map<Object, RuntimeException> openTranslogRef;

public void assertNoOpenTranslogRefs() {
if (openTranslogRef.isEmpty() == false) {
if (openTranslogRef != null && openTranslogRef.isEmpty() == false) {
AssertionError e = new AssertionError("not all translog generations have been released");
openTranslogRef.values().forEach(e::addSuppressed);
throw e;
Expand All @@ -63,16 +63,7 @@ public void assertNoOpenTranslogRefs() {
private final Map<Long, Counter> translogRefCounts = new HashMap<>();
private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;

private long retentionSizeInBytes;

private long retentionAgeInMillis;

private int retentionTotalFiles;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
public TranslogDeletionPolicy() {
if (Assertions.ENABLED) {
openTranslogRef = new ConcurrentHashMap<>();
} else {
Expand All @@ -94,17 +85,11 @@ public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
this.localCheckpointOfSafeCommit = newCheckpoint;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}
public abstract void setRetentionSizeInBytes(long bytes);

public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}
public abstract void setRetentionAgeInMillis(long ageInMillis);

synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}
protected abstract void setRetentionTotalFiles(int retentionTotalFiles);

/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
Expand Down Expand Up @@ -161,7 +146,7 @@ private synchronized void releaseTranslogGen(long translogGen) {
*/
public abstract long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException;

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
public static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
if (retentionSizeInBytes >= 0) {
long totalSize = writer.sizeInBytes();
long minGen = writer.getGeneration();
Expand All @@ -176,7 +161,7 @@ static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter
}
}

static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
public static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
throws IOException {
if (maxRetentionAgeInMillis >= 0) {
for (TranslogReader reader : readers) {
Expand All @@ -190,7 +175,7 @@ static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter
}
}

static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
public static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
long minGen = writer.generation;
int totalFiles = 1; // for the current writer
for (int i = readers.size() - 1; i >= 0 && totalFiles < maxTotalFiles; i--) {
Expand All @@ -204,6 +189,10 @@ protected long currentTime() {
return System.currentTimeMillis();
}

protected long getMinTranslogGenRequiredByLocks() {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
}

/**
* Returns the local checkpoint of the safe commit. This value is used to calculate the min required generation for recovery.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,12 @@ protected void ensureOpen() {
throw new AlreadyClosedException(toString() + " is already closed");
}
}

public long getMinSeqNo() {
return checkpoint.minSeqNo;
}

public long getMaxSeqNo() {
return checkpoint.maxSeqNo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(
final TranslogDeletionPolicy retainAllTranslogPolicy = new DefaultTranslogDeletionPolicy(
Long.MAX_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,22 @@ public Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFa

private static class CustomTranslogDeletionPolicy extends TranslogDeletionPolicy {
public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(
indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis(),
indexSettings.getTranslogRetentionTotalFiles()
);
super();
}

@Override
public void setRetentionSizeInBytes(long bytes) {

}

@Override
public void setRetentionAgeInMillis(long ageInMillis) {

}

@Override
protected void setRetentionTotalFiles(int retentionTotalFiles) {

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testRetentionHierarchy() throws IOException {
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
DefaultTranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationByAge = allGens.get(selectedReader).generation;
long maxAge = now - allGens.get(selectedReader).getLastModifiedTime();
Expand Down

0 comments on commit ecc278c

Please sign in to comment.