Skip to content

Commit

Permalink
Merge branch 'feature/track-active-range-requests' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
SailReal committed Dec 21, 2021
2 parents c4dc917 + 7cfc566 commit da44e8b
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 75 deletions.
115 changes: 71 additions & 44 deletions src/main/java/org/cryptomator/fusecloudaccess/OpenFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeMap;
import com.google.common.collect.TreeRangeSet;
import jnr.ffi.Pointer;
import org.cryptomator.cloudaccess.api.CloudPath;
import org.cryptomator.cloudaccess.api.CloudProvider;
import org.cryptomator.cloudaccess.api.ProgressListener;
import org.cryptomator.cloudaccess.api.exceptions.CloudTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
Expand All @@ -26,13 +28,13 @@
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.nio.file.StandardOpenOption.*;

Expand All @@ -44,6 +46,7 @@ class OpenFile implements Closeable {
private final CompletableAsynchronousFileChannel fc;
private final CloudProvider provider;
private final RangeSet<Long> populatedRanges;
private final RangeMap<Long, CompletionStage<Void>> activeRequests;
private final AtomicInteger openFileHandleCount;
private final AtomicReference<OpenFile.State> state;
private volatile CloudPath path;
Expand All @@ -52,11 +55,12 @@ class OpenFile implements Closeable {
public enum State {UNMODIFIED, NEEDS_UPLOAD, UPLOADING, NEEDS_REUPLOAD}

// visible for testing
OpenFile(CloudPath path, CompletableAsynchronousFileChannel fc, CloudProvider provider, RangeSet<Long> populatedRanges, Instant initialLastModified) {
OpenFile(CloudPath path, CompletableAsynchronousFileChannel fc, CloudProvider provider, RangeSet<Long> populatedRanges, RangeMap<Long, CompletionStage<Void>> activeRequests, Instant initialLastModified) {
this.path = path;
this.fc = fc;
this.provider = provider;
this.populatedRanges = populatedRanges;
this.activeRequests = activeRequests;
this.openFileHandleCount = new AtomicInteger();
this.state = new AtomicReference<>(State.UNMODIFIED);
this.lastModified = initialLastModified;
Expand All @@ -65,10 +69,10 @@ public enum State {UNMODIFIED, NEEDS_UPLOAD, UPLOADING, NEEDS_REUPLOAD}
/**
* Creates a cached representation of a file. File contents are loaded on demand from the provided cloud provider.
*
* @param path The path of this file in the cloud
* @param tmpFilePath Where to store the volatile cache
* @param provider The cloud provider used to load and persist file contents
* @param initialSize Must be 0 for newly created files. (Use {@link #truncate(long)} if you want to grow it)
* @param path The path of this file in the cloud
* @param tmpFilePath Where to store the volatile cache
* @param provider The cloud provider used to load and persist file contents
* @param initialSize Must be 0 for newly created files. (Use {@link #truncate(long)} if you want to grow it)
* @return The created file
* @throws IOException I/O errors during creation of the cache file located at <code>tmpFilePath</code>
*/
Expand All @@ -84,7 +88,7 @@ public static OpenFile create(CloudPath path, Path tmpFilePath, CloudProvider pr
throw new IOException("Failed to create file", e);
}
}
return new OpenFile(path, new CompletableAsynchronousFileChannel(fc), provider, TreeRangeSet.create(), Instant.now());
return new OpenFile(path, new CompletableAsynchronousFileChannel(fc), provider, TreeRangeSet.create(), TreeRangeMap.create(), Instant.now());
}

public AtomicInteger getOpenFileHandleCount() {
Expand Down Expand Up @@ -196,7 +200,7 @@ public CompletableFuture<Integer> write(Pointer buf, long offset, long count) {
setLastModified(Instant.now().truncatedTo(ChronoUnit.SECONDS));
markPopulatedIfGrowing(offset);
return fc.writeFromPointer(buf, offset, count).thenApply(written -> {
synchronized (populatedRanges) {
synchronized (this) {
populatedRanges.add(Range.closedOpen(offset, offset + written));
}
return written;
Expand Down Expand Up @@ -225,25 +229,25 @@ CompletionStage<Void> load(long offset, long count) {
}
var requiredLastByte = Math.min(size, offset + count); // reads not behind eof (lastByte is exclusive!)
var requiredRange = Range.closedOpen(offset, requiredLastByte);
synchronized (populatedRanges) {
synchronized (this) {
if (requiredRange.isEmpty() || populatedRanges.encloses(requiredRange)) {
return CompletableFuture.completedFuture(null);
} else {
var desiredCount = Math.max(count, READAHEAD_SIZE); // reads at least the readahead
var desiredLastByte = Math.min(size, offset + desiredCount); // reads not behind eof (lastByte is exclusive!)
var desiredRange = Range.closedOpen(offset, desiredLastByte);
var missingRanges = ImmutableRangeSet.of(desiredRange).difference(populatedRanges);
return CompletableFuture.allOf(missingRanges.asRanges().stream().map(this::loadMissing).toArray(CompletableFuture[]::new)) //
.handle((v, e) -> {
if (e != null && e.getCause() instanceof InterruptedIOException) {
return CompletableFuture.<Void>failedFuture(new CloudTimeoutException(e.getCause()));
} else if (e != null) {
return CompletableFuture.<Void>failedFuture(e);
} else {
return CompletableFuture.<Void>completedFuture(null);
}
}) //
.thenCompose(Function.identity());

var activeRanges = ImmutableRangeSet.copyOf(activeRequests.asMapOfRanges().keySet());
var missingRanges = ImmutableRangeSet.of(desiredRange).difference(populatedRanges).difference(activeRanges);

var relevantRequests = new HashSet<>(activeRequests.subRangeMap(desiredRange).asMapOfRanges().values());

for (var range : missingRanges.asRanges()) {
var request = loadMissing(range);
relevantRequests.add(request);
}

return CompletableFuture.allOf(relevantRequests.stream().map(CompletionStage::toCompletableFuture).toArray(CompletableFuture[]::new));
}
}
} catch (IOException e) {
Expand All @@ -253,12 +257,29 @@ CompletionStage<Void> load(long offset, long count) {

private CompletionStage<Void> loadMissing(Range<Long> requestedRange) {
assert !populatedRanges.intersects(requestedRange); // synchronized by caller
assert activeRequests.subRangeMap(requestedRange).asMapOfRanges().isEmpty(); // synchronized by caller
long offset = requestedRange.lowerEndpoint();
long size = requestedRange.upperEndpoint() - requestedRange.lowerEndpoint();
return provider.read(path, offset, size, ProgressListener.NO_PROGRESS_AWARE).thenCompose(in -> {

var read = provider.read(path, offset, size, ProgressListener.NO_PROGRESS_AWARE).thenCompose(in -> {
var mergeTask = mergeData(requestedRange, in);
return mergeTask.whenComplete((result, exception) -> closeQuietly(in));
});

activeRequests.put(requestedRange, read);

read.whenComplete((result, error) -> completedRequest(requestedRange, read));

return read;
}

// visible for testing
synchronized void completedRequest(Range<Long> requestedRange, CompletionStage<Void> request) {
var entry = activeRequests.getEntry(requestedRange.lowerEndpoint());
// only remove active request if it hasn't been replaced by a broader request
if (entry.getKey().equals(requestedRange) && entry.getValue().equals(request)) {
activeRequests.remove(requestedRange);
}
}

/**
Expand All @@ -269,39 +290,45 @@ private CompletionStage<Void> loadMissing(Range<Long> requestedRange) {
* unless hitting EOF on <code>source</code>.
*
* @param range Where to place the data within the file channel
* @param source The data source
* @param source An input stream beginning at the first byte of the requested range
* @return
*/
// visible for testing
CompletableFuture<Void> mergeData(Range<Long> range, InputStream source) {
synchronized (populatedRanges) {
var missingRanges = ImmutableRangeSet.of(range).difference(populatedRanges).asRanges().iterator();
return mergeDataInternal(missingRanges, source, range.lowerEndpoint());
}
synchronized CompletableFuture<Void> mergeData(Range<Long> range, InputStream source) {
var missingRanges = ImmutableRangeSet.of(range).difference(populatedRanges).asRanges().iterator();
return mergeDataInternal(missingRanges, source, range.lowerEndpoint());
}

private CompletableFuture<Void> mergeDataInternal(Iterator<Range<Long>> missingRanges, InputStream source, final long pos) {
private CompletableFuture<Void> mergeDataInternal(Iterator<Range<Long>> missingRanges, InputStream source, final long sourceOffset) {
if (!missingRanges.hasNext()) {
return CompletableFuture.completedFuture(null);
}
var range = missingRanges.next();
Preconditions.checkArgument(pos <= range.lowerEndpoint());
final long position;
if (pos < range.lowerEndpoint()) {
try {
long skipped = source.skip(range.lowerEndpoint() - pos);
position = pos + skipped;
} catch (IOException e) {
return CompletableFuture.failedFuture(e);
Preconditions.checkArgument(sourceOffset <= range.lowerEndpoint());

// inputstream may contain regions that aren't "missing".
// therefore we need to "skip" till the begin of our range:
try {
long p = sourceOffset;
while (p < range.lowerEndpoint()) {
var skipped = source.skip(range.lowerEndpoint() - p);
if (skipped == 0) {
throw new EOFException("failed to skip to begin of desired range");
}
p += skipped;
}
} else {
position = pos;
assert p == range.lowerEndpoint();
} catch (IOException e) {
return CompletableFuture.failedFuture(e);
}
assert position == range.lowerEndpoint();

// now transfer contents from inputstream to our file. repeat process for next range, when finished
long position = range.lowerEndpoint();
var count = range.upperEndpoint() - range.lowerEndpoint();
return fc.transferFrom(source, position, count).thenCompose(transferred -> {
synchronized (populatedRanges) {
populatedRanges.add(Range.closedOpen(position, position + transferred));
synchronized (this) {
var populatedRange = Range.closedOpen(position, position + transferred);
populatedRanges.add(populatedRange);
}
return mergeDataInternal(missingRanges, source, position + transferred);
});
Expand Down Expand Up @@ -340,7 +367,7 @@ public void truncate(long size) throws IOException {
private void markPopulatedIfGrowing(long newSize) {
long oldSize = getSize();
if (newSize > oldSize) {
synchronized (populatedRanges) {
synchronized (this) {
populatedRanges.add(Range.closedOpen(oldSize, newSize));
}
}
Expand Down
Loading

0 comments on commit da44e8b

Please sign in to comment.