Skip to content

Commit

Permalink
OAK-11365 Incremental index store: ability to set a timeout (#1962)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasmueller authored Jan 13, 2025
1 parent bad4936 commit f08aa28
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ public IndexStore buildStore() throws IOException, CommitFailedException {
}

public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) throws IOException, CommitFailedException {
return buildStore(initialCheckpoint, finalCheckpoint, Long.MAX_VALUE);
}

public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint, long maxDurationSeconds) throws IOException, CommitFailedException {
IncrementalStoreBuilder builder;
IndexStore incrementalStore;
Set<IndexDefinition> indexDefinitions = indexerSupport.getIndexDefinitions();
Expand Down Expand Up @@ -308,6 +312,7 @@ public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) t
try {
builder = new IncrementalStoreBuilder(indexHelper.getWorkDir(), indexHelper, initialCheckpoint, finalCheckpoint)
.withPreferredPathElements(preferredPathElements)
.withMaxDurationSeconds(maxDurationSeconds)
.withPathPredicate(predicate)
.withBlobStore(indexHelper.getGCBlobStore());
incrementalStore = builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.BufferedWriter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class IncrementalFlatFileStoreEditor implements Editor {
Expand All @@ -38,18 +39,29 @@ public class IncrementalFlatFileStoreEditor implements Editor {
private final IncrementalFlatFileStoreNodeStateEntryWriter entryWriter;
private final Predicate<String> predicate;
private final IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy;
// if not 0, timeout if System.nanoTime() exceeds this value
private final long timeoutAtNanos;
private static final int LINE_SEP_LENGTH = System.getProperty("line.separator").length();

public IncrementalFlatFileStoreEditor(BufferedWriter bufferedWriter, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, Predicate<String> predicate,
IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy) {
IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy, long maxDurationSeconds) {
this.bufferedWriter = bufferedWriter;
this.entryWriter = entryWriter;
this.predicate = predicate;
this.incrementalFlatFileStoreStrategy = incrementalFlatFileStoreStrategy;
long timeout;
if (maxDurationSeconds == Long.MAX_VALUE) {
timeout = 0;
} else {
timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(maxDurationSeconds, TimeUnit.SECONDS);
log.info("Max duration: " + maxDurationSeconds + " timeout: " + timeout + " now: " + System.nanoTime());
}
this.timeoutAtNanos = timeout;
}

@Override
public void enter(NodeState before, NodeState after) {
checkTimeout();
}

@Override
Expand Down Expand Up @@ -112,4 +124,13 @@ private void writeToFile(NodeState e, IncrementalStoreOperand action) {
throw new RuntimeException("Error while creating incremental store", ex);
}
}

private void checkTimeout() {
if (timeoutAtNanos != 0) {
long now = System.nanoTime();
if (now > timeoutAtNanos) {
throw new RuntimeException("Timeout");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo
private long textSize = 0;
private long entryCount = 0;
private final Set<String> preferredPathElements;
private final long maxDurationSeconds;

public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String beforeCheckpoint, @NotNull String afterCheckpoint, File storeDir,
Set<String> preferredPathElements, @NotNull Compression algorithm,
Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter) {
Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, long maxDurationSeconds) {
this.nodeStore = nodeStore;
this.beforeCheckpoint = beforeCheckpoint;
this.afterCheckpoint = afterCheckpoint;
Expand All @@ -76,6 +77,7 @@ public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String bef
this.entryWriter = entryWriter;
this.preferredPathElements = preferredPathElements;
this.comparator = new PathElementComparator(preferredPathElements);
this.maxDurationSeconds = maxDurationSeconds;
}

@Override
Expand All @@ -85,7 +87,7 @@ public File createSortedStoreFile() throws IOException {
try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, algorithm)) {
NodeState before = Objects.requireNonNull(nodeStore.retrieve(beforeCheckpoint));
NodeState after = Objects.requireNonNull(nodeStore.retrieve(afterCheckpoint));
Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this)), before, after);
Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this, maxDurationSeconds)), before, after);
if (e != null) {
log.error("Exception while building incremental store for checkpoint before {}, after {}", beforeCheckpoint, afterCheckpoint, e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class IncrementalStoreBuilder {
private final IndexHelper indexHelper;
private final String initialCheckpoint;
private final String finalCheckpoint;
private long maxDurationSeconds = Long.MAX_VALUE;
private Predicate<String> pathPredicate = path -> true;
private Set<String> preferredPathElements = Collections.emptySet();
private BlobStore blobStore;
Expand Down Expand Up @@ -107,6 +108,10 @@ public IncrementalStoreBuilder withBlobStore(BlobStore blobStore) {
return this;
}

public IncrementalStoreBuilder withMaxDurationSeconds(long maxDurationSeconds) {
this.maxDurationSeconds = maxDurationSeconds;
return this;
}

public IndexStore build() throws IOException, CompositeException {
logFlags();
Expand All @@ -115,11 +120,12 @@ public IndexStore build() throws IOException, CompositeException {
if (sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_FFS_STORE ||
sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_TREE_STORE) {
IncrementalFlatFileStoreNodeStateEntryWriter entryWriter = new IncrementalFlatFileStoreNodeStateEntryWriter(blobStore);

IncrementalIndexStoreSortStrategy strategy = new IncrementalFlatFileStoreStrategy(
indexHelper.getNodeStore(),
initialCheckpoint,
finalCheckpoint,
dir, preferredPathElements, algorithm, pathPredicate, entryWriter);
dir, preferredPathElements, algorithm, pathPredicate, entryWriter, maxDurationSeconds);
File metadataFile = strategy.createMetadataFile();
File incrementalStoreFile = strategy.createSortedStoreFile();
long entryCount = strategy.getEntryCount();
Expand Down Expand Up @@ -147,4 +153,5 @@ private void logFlags() {
log.info("Compression enabled while sorting : {} ({})", IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP);
log.info("LZ4 enabled for compression algorithm : {} ({})", IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private IncrementalFlatFileStoreStrategy createIncrementalStrategy(Backend backe
readOnlyNodeStore.retrieve(finalCheckpoint);
return new IncrementalFlatFileStoreStrategy(
readOnlyNodeStore, initialCheckpoint, finalCheckpoint, sortFolder.getRoot(), preferredPathElements,
algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore));
algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore), Long.MAX_VALUE);
}

private void createBaseContent(NodeStore rwNodeStore) throws CommitFailedException {
Expand Down

0 comments on commit f08aa28

Please sign in to comment.