Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for compression on gRPC cache #14041

Closed
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ccea6a4
[Compression] Add experimental option and verify capabilities
AlessandroPatti Sep 20, 2021
148ebc6
[Compression] Implement bytestream compressed cache downloads
AlessandroPatti Sep 20, 2021
bb6b53f
[Compression] Implement bytestream compressed cache uploads
AlessandroPatti Sep 20, 2021
ad5b0b4
[Compression][Tests] Test chunker with compression
AlessandroPatti Sep 21, 2021
c0a3b3e
[Compression][Tests] Test compressed cache uploads
AlessandroPatti Sep 21, 2021
1d643aa
[Compression][Tests] Test compressed cache downloads
AlessandroPatti Sep 21, 2021
4d1dc31
Allow calling getActualSize when uploads are not finished
AlessandroPatti Sep 26, 2021
d84a76d
Use thin jars for zstd jni
AlessandroPatti Sep 26, 2021
df571eb
[Compression] Wrap output stream with decompression
AlessandroPatti Sep 28, 2021
8c77992
[Compression][Upload] Simplify Chunker with auxiliary classes
AlessandroPatti Oct 1, 2021
ef40b33
Ensure hasNext is consistent after seek
AlessandroPatti Oct 2, 2021
054ce2d
Exhaust input and compute final size
AlessandroPatti Oct 2, 2021
55affb4
Adjust test params
AlessandroPatti Oct 2, 2021
9fc9b0a
[Compression][Tests] Add test for compressed progressive uploads
AlessandroPatti Oct 2, 2021
91ba7b8
[Compression][Upload] Seek before jumping in rpc call
AlessandroPatti Oct 2, 2021
edf0f8c
Compile zstd-jni from sources
AlessandroPatti Oct 12, 2021
43bbd5f
[Compression] Support progressive reads
AlessandroPatti Oct 14, 2021
acb7fb7
Simplify compressing stream
AlessandroPatti Oct 14, 2021
327d6b9
[Compression][Tests] Test progressive compressed download
AlessandroPatti Oct 14, 2021
dfc3068
[Compression] Rename option to remote_cache_compression
AlessandroPatti Oct 14, 2021
f6e1d2e
Only check for expected committed size if we completed the upload
AlessandroPatti Oct 15, 2021
4f7771d
Ensure minimum pipe size
AlessandroPatti Oct 17, 2021
6c13594
Add simple tests for zstd streams
AlessandroPatti Oct 17, 2021
9648105
Move native patch to patch file
AlessandroPatti Oct 20, 2021
522a07f
Add mirror
AlessandroPatti Oct 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Compression] Implement bytestream compressed cache uploads
  • Loading branch information
AlessandroPatti committed Oct 15, 2021
commit bb6b53fce6cd8614a92a4aef5d4690ee0d070c16
Original file line number Diff line number Diff line change
@@ -298,9 +298,11 @@ boolean uploadsInProgress() {
}
}

private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) {
String resourceName =
format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes());
private static String buildUploadResourceName(
String instanceName, UUID uuid, Digest digest, boolean compressed) {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d";
String resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes());
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
@@ -325,7 +327,8 @@ private ListenableFuture<Void> startAsyncUpload(
}

UUID uploadId = UUID.randomUUID();
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
String resourceName =
buildUploadResourceName(instanceName, uploadId, digest, chunker.isCompressed());
AsyncUpload newUpload =
new AsyncUpload(
context,
@@ -405,7 +408,8 @@ ListenableFuture<Void> start() {
() ->
retrier.executeAsync(
() -> {
if (committedOffset.get() < chunker.getSize()) {
if (chunker.bytesLeft() != 0
|| committedOffset.get() < chunker.getActualSize()) {
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
@@ -417,7 +421,7 @@ ListenableFuture<Void> start() {
callFuture,
(result) -> {
long committedSize = committedOffset.get();
long expected = chunker.getSize();
long expected = chunker.getActualSize();
if (committedSize != expected) {
String message =
format(
96 changes: 91 additions & 5 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.github.luben.zstd.ZstdOutputStream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
@@ -25,12 +26,14 @@
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
@@ -55,6 +58,10 @@ static int getDefaultChunkSize() {
return defaultChunkSize;
}

/** Returns whether this chunker zstd-compresses its chunks before emitting them. */
public boolean isCompressed() {
return compressed;
}

/** A piece of a byte[] blob. */
public static final class Chunk {

@@ -102,15 +109,23 @@ public int hashCode() {
private long offset;
private byte[] chunkCache;


private final boolean compressed;
private ByteArrayOutputStream baos;
private ZstdOutputStream zos;

// Set to true on the first call to next(). This is so that the Chunker can open its data source
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize) {
private AtomicLong actualSize = new AtomicLong(0);

/**
 * Creates a new chunker.
 *
 * @param dataSupplier supplies the blob's input stream; opened lazily on first use
 * @param size the uncompressed size of the blob in bytes
 * @param chunkSize the maximum number of (uncompressed) bytes read per chunk
 * @param compressed whether chunks should be zstd-compressed as they are produced
 */
Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
this.dataSupplier = checkNotNull(dataSupplier);
this.size = size;
this.chunkSize = chunkSize;
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
this.compressed = compressed;
}

public long getOffset() {
@@ -121,6 +136,13 @@ public long getSize() {
return size;
}

/**
 * Returns the number of bytes actually emitted over the wire: equal to {@link #getSize} for
 * uncompressed chunkers, and the accumulated compressed byte count otherwise.
 *
 * <p>Must only be called once all chunks have been consumed ({@code bytesLeft() == 0}),
 * since the compressed size is accumulated incrementally as chunks are produced.
 */
public long getActualSize() {
  // Validate state before reading the counter; a partially consumed chunker would
  // otherwise report a misleading partial size.
  checkState(bytesLeft() == 0, "getActualSize() called before all chunks were consumed");
  long emittedBytes = actualSize.get();
  // Without compression, the emitted byte count must match the declared blob size exactly.
  checkState(
      compressed || emittedBytes == size,
      "uncompressed chunker emitted %s bytes but expected %s",
      emittedBytes,
      size);
  return emittedBytes;
}

/**
* Reset the {@link Chunker} state to when it was newly constructed.
*
@@ -130,6 +152,14 @@ public void reset() throws IOException {
if (data != null) {
data.close();
}
if (zos != null) {
zos.close();
zos = null;
}
if (baos != null) {
baos.close();
baos = null;
}
data = null;
offset = 0;
initialized = false;
@@ -146,7 +176,35 @@ public void seek(long toOffset) throws IOException {
reset();
}
maybeInitialize();
ByteStreams.skipFully(data, toOffset - offset);
if (compressed && toOffset > 0) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZstdOutputStream zos = new ZstdOutputStream(baos);
long remaining = toOffset;

while (remaining > 0) {
int toRead = (int) Math.min(chunkSize, remaining);
byte[] chunk = new byte[toRead];
int read = 0;
while(read != toRead) {
int n = data.read(chunk, read, toRead - read);
if (n < 0) {
throw new EOFException("Reached end of stream before finishing seeking!");
}
read += n;
}
remaining -= toRead;
zos.write(chunk);
zos.flush();
if (remaining == 0 && toOffset == size) {
zos.close();
}
actualSize.addAndGet(baos.toByteArray().length);
baos.reset();
}
} else {
ByteStreams.skipFully(data, toOffset - offset);
actualSize.addAndGet(toOffset);
}
offset = toOffset;
}

@@ -201,10 +259,25 @@ public Chunk next() throws IOException {
throw new IllegalStateException("Reached EOF, but expected "
+ bytesToRead + " bytes.", e);
}
offset += bytesToRead;

ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
ByteString blob;
if (compressed) {
zos.write(chunkCache, 0, bytesToRead);
zos.flush();
if (size - offsetBefore - bytesToRead == 0) {
zos.close();
}
byte[] compressed = baos.toByteArray();
baos.reset();
blob = ByteString.copyFrom(compressed, 0, compressed.length);
} else {
blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
}
actualSize.addAndGet(blob.size());

// This has to happen after actualSize has been updated
// or the guard in getActualSize won't work.
offset += bytesToRead;
if (bytesLeft() == 0) {
data.close();
data = null;
@@ -225,12 +298,19 @@ private void maybeInitialize() throws IOException {
checkState(data == null);
checkState(offset == 0);
checkState(chunkCache == null);
checkState(zos == null);
checkState(baos == null);
try {
data = dataSupplier.get();
} catch (RuntimeException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw e;
}
if (compressed) {
baos = new ByteArrayOutputStream();
zos = new ZstdOutputStream(baos);
}
actualSize = new AtomicLong(0);
initialized = true;
}

@@ -242,6 +322,7 @@ public static Builder builder() {
public static class Builder {
private int chunkSize = getDefaultChunkSize();
private long size;
private boolean compressed;
private Supplier<InputStream> inputStream;

public Builder setInput(byte[] data) {
@@ -251,6 +332,11 @@ public Builder setInput(byte[] data) {
return this;
}

/**
 * Sets whether the built chunker should zstd-compress the data as it is chunked.
 *
 * @return this builder, for chaining
 */
public Builder setCompressed(boolean compressed) {
this.compressed = compressed;
return this;
}

public Builder setInput(long size, InputStream in) {
checkState(inputStream == null);
checkNotNull(in);
@@ -305,7 +391,7 @@ public Builder setChunkSize(int chunkSize) {

/** Builds the chunker; an input must have been set via one of the {@code setInput} overloads. */
public Chunker build() {
  checkNotNull(inputStream);
  // Propagate the compression flag so the chunker emits zstd-compressed chunks when requested.
  // (The scrape had left the stale pre-compression return statement in place, which made the
  // second return unreachable; only the four-argument constructor call is kept.)
  return new Chunker(inputStream, size, chunkSize, compressed);
}
}
}
Original file line number Diff line number Diff line change
@@ -438,7 +438,10 @@ public ListenableFuture<Void> uploadFile(
return uploader.uploadBlobAsync(
context,
digest,
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
Chunker.builder()
.setInput(digest.getSizeBytes(), path)
.setCompressed(options.cacheByteStreamCompression)
.build(),
/* forceUpload= */ true);
}

@@ -448,7 +451,10 @@ public ListenableFuture<Void> uploadBlob(
return uploader.uploadBlobAsync(
context,
digest,
Chunker.builder().setInput(data.toByteArray()).build(),
Chunker.builder()
.setInput(data.toByteArray())
.setCompressed(options.cacheByteStreamCompression)
.build(),
/* forceUpload= */ true);
}
}
Original file line number Diff line number Diff line change
@@ -130,7 +130,7 @@ public void resourcesShouldBeReleased() throws IOException {
return in.get();
};

Chunker chunker = new Chunker(supplier, data.length, 1);
Chunker chunker = new Chunker(supplier, data.length, 1, false);
assertThat(in.get()).isNull();
assertNextEquals(chunker, (byte) 1);
Mockito.verify(in.get(), Mockito.never()).close();