Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-11365 Incremental index store: ability to set a timeout #1962

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ public IndexStore buildStore() throws IOException, CommitFailedException {
}

public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) throws IOException, CommitFailedException {
return buildStore(initialCheckpoint, finalCheckpoint, Long.MAX_VALUE);
}

public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint, long maxDurationSeconds) throws IOException, CommitFailedException {
IncrementalStoreBuilder builder;
IndexStore incrementalStore;
Set<IndexDefinition> indexDefinitions = indexerSupport.getIndexDefinitions();
Expand Down Expand Up @@ -308,6 +312,7 @@ public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) t
try {
builder = new IncrementalStoreBuilder(indexHelper.getWorkDir(), indexHelper, initialCheckpoint, finalCheckpoint)
.withPreferredPathElements(preferredPathElements)
.withMaxDurationSeconds(maxDurationSeconds)
.withPathPredicate(predicate)
.withBlobStore(indexHelper.getGCBlobStore());
incrementalStore = builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.BufferedWriter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class IncrementalFlatFileStoreEditor implements Editor {
Expand All @@ -38,18 +39,29 @@ public class IncrementalFlatFileStoreEditor implements Editor {
private final IncrementalFlatFileStoreNodeStateEntryWriter entryWriter;
private final Predicate<String> predicate;
private final IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy;
// if not 0, timeout if System.nanoTime() exceeds this value
private final long timeoutAtNanos;
private static final int LINE_SEP_LENGTH = System.getProperty("line.separator").length();

public IncrementalFlatFileStoreEditor(BufferedWriter bufferedWriter, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, Predicate<String> predicate,
IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy) {
IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy, long maxDurationSeconds) {
this.bufferedWriter = bufferedWriter;
this.entryWriter = entryWriter;
this.predicate = predicate;
this.incrementalFlatFileStoreStrategy = incrementalFlatFileStoreStrategy;
long timeout;
if (maxDurationSeconds == Long.MAX_VALUE) {
timeout = 0;
} else {
timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(maxDurationSeconds, TimeUnit.SECONDS);
log.info("Max duration: " + maxDurationSeconds + " timeout: " + timeout + " now: " + System.nanoTime());
}
this.timeoutAtNanos = timeout;
}

@Override
public void enter(NodeState before, NodeState after) {
checkTimeout();
}

@Override
Expand Down Expand Up @@ -112,4 +124,13 @@ private void writeToFile(NodeState e, IncrementalStoreOperand action) {
throw new RuntimeException("Error while creating incremental store", ex);
}
}

private void checkTimeout() {
if (timeoutAtNanos != 0) {
long now = System.nanoTime();
if (now > timeoutAtNanos) {
throw new RuntimeException("Timeout");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo
private long textSize = 0;
private long entryCount = 0;
private final Set<String> preferredPathElements;
private final long maxDurationSeconds;

public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String beforeCheckpoint, @NotNull String afterCheckpoint, File storeDir,
Set<String> preferredPathElements, @NotNull Compression algorithm,
Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter) {
Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, long maxDurationSeconds) {
this.nodeStore = nodeStore;
this.beforeCheckpoint = beforeCheckpoint;
this.afterCheckpoint = afterCheckpoint;
Expand All @@ -76,6 +77,7 @@ public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String bef
this.entryWriter = entryWriter;
this.preferredPathElements = preferredPathElements;
this.comparator = new PathElementComparator(preferredPathElements);
this.maxDurationSeconds = maxDurationSeconds;
}

@Override
Expand All @@ -85,7 +87,7 @@ public File createSortedStoreFile() throws IOException {
try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, algorithm)) {
NodeState before = Objects.requireNonNull(nodeStore.retrieve(beforeCheckpoint));
NodeState after = Objects.requireNonNull(nodeStore.retrieve(afterCheckpoint));
Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this)), before, after);
Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this, maxDurationSeconds)), before, after);
if (e != null) {
log.error("Exception while building incremental store for checkpoint before {}, after {}", beforeCheckpoint, afterCheckpoint, e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class IncrementalStoreBuilder {
private final IndexHelper indexHelper;
private final String initialCheckpoint;
private final String finalCheckpoint;
private long maxDurationSeconds = Long.MAX_VALUE;
private Predicate<String> pathPredicate = path -> true;
private Set<String> preferredPathElements = Collections.emptySet();
private BlobStore blobStore;
Expand Down Expand Up @@ -107,6 +108,10 @@ public IncrementalStoreBuilder withBlobStore(BlobStore blobStore) {
return this;
}

public IncrementalStoreBuilder withMaxDurationSeconds(long maxDurationSeconds) {
this.maxDurationSeconds = maxDurationSeconds;
return this;
}

public IndexStore build() throws IOException, CompositeException {
logFlags();
Expand All @@ -115,11 +120,12 @@ public IndexStore build() throws IOException, CompositeException {
if (sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_FFS_STORE ||
sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_TREE_STORE) {
IncrementalFlatFileStoreNodeStateEntryWriter entryWriter = new IncrementalFlatFileStoreNodeStateEntryWriter(blobStore);

IncrementalIndexStoreSortStrategy strategy = new IncrementalFlatFileStoreStrategy(
indexHelper.getNodeStore(),
initialCheckpoint,
finalCheckpoint,
dir, preferredPathElements, algorithm, pathPredicate, entryWriter);
dir, preferredPathElements, algorithm, pathPredicate, entryWriter, maxDurationSeconds);
File metadataFile = strategy.createMetadataFile();
File incrementalStoreFile = strategy.createSortedStoreFile();
long entryCount = strategy.getEntryCount();
Expand Down Expand Up @@ -147,4 +153,5 @@ private void logFlags() {
log.info("Compression enabled while sorting : {} ({})", IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP);
log.info("LZ4 enabled for compression algorithm : {} ({})", IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private IncrementalFlatFileStoreStrategy createIncrementalStrategy(Backend backe
readOnlyNodeStore.retrieve(finalCheckpoint);
return new IncrementalFlatFileStoreStrategy(
readOnlyNodeStore, initialCheckpoint, finalCheckpoint, sortFolder.getRoot(), preferredPathElements,
algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore));
algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore), Long.MAX_VALUE);
}

private void createBaseContent(NodeStore rwNodeStore) throws CommitFailedException {
Expand Down
Loading