[Compression] ZstdChunker subclass
AlessandroPatti committed Sep 29, 2021
1 parent db1c4b8 commit a9c8861
Showing 2 changed files with 114 additions and 98 deletions.
202 changes: 106 additions & 96 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -44,7 +44,7 @@
* case of error or when a data source does not get fully consumed, a user must call
* {@link #reset()} manually.
*/
public final class Chunker {
public abstract class Chunker {

private static int defaultChunkSize = 1024 * 16;

@@ -58,10 +58,6 @@ static int getDefaultChunkSize() {
return defaultChunkSize;
}

public boolean isCompressed() {
return compressed;
}

/** A piece of a byte[] blob. */
public static final class Chunk {

@@ -109,26 +105,25 @@ public int hashCode() {
private long offset;
private byte[] chunkCache;


private final boolean compressed;
private ByteArrayOutputStream baos;
private ZstdOutputStream zos;

// Set to true on the first call to next(). This is so that the Chunker can open its data source
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

private AtomicLong processedBytes = new AtomicLong(0);
private long actualSize = -1;

Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize) {
this.dataSupplier = checkNotNull(dataSupplier);
this.size = size;
this.chunkSize = chunkSize;
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
this.compressed = compressed;
}

public abstract boolean isCompressed();

public abstract long getActualSize() throws IOException;

protected abstract void initialise() throws IOException;

protected abstract ByteString parseData(byte[] data, int len) throws IOException;

public long getOffset() {
return offset;
}
@@ -137,27 +132,6 @@ public long getSize() {
return size;
}

public long getActualSize() throws IOException {
if (compressed) {
if (actualSize == -1) {
if (bytesLeft() != 0) {
// If there are bytes left, compute the remaining size
long currentOffset = offset;
while (hasNext()) {
next();
}
actualSize = processedBytes.get();
seek(currentOffset);
} else {
actualSize = processedBytes.get();
}
}
return actualSize;
} else {
return size;
}
}

/**
* Reset the {@link Chunker} state to when it was newly constructed.
*
@@ -167,14 +141,6 @@ public void reset() throws IOException {
if (data != null) {
data.close();
}
if (zos != null) {
zos.close();
zos = null;
}
if (baos != null) {
baos.close();
baos = null;
}
data = null;
offset = 0;
initialized = false;
@@ -191,34 +157,14 @@ public void seek(long toOffset) throws IOException {
reset();
}
maybeInitialize();
if (compressed && toOffset > 0) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZstdOutputStream zos = new ZstdOutputStream(baos);
long remaining = toOffset;
if (toOffset > offset) {
long remaining = toOffset - offset;

while (remaining > 0) {
int toRead = (int) Math.min(chunkSize, remaining);
byte[] chunk = new byte[toRead];
int read = 0;
while(read != toRead) {
int n = data.read(chunk, read, toRead - read);
if (n < 0) {
throw new EOFException("Reached end of stream before finishing seeking!");
}
read += n;
}
next(toRead);
remaining -= toRead;
zos.write(chunk);
zos.flush();
if (remaining == 0 && toOffset == size) {
zos.close();
}
processedBytes.addAndGet(baos.toByteArray().length);
baos.reset();
}
} else {
ByteStreams.skipFully(data, toOffset - offset);
processedBytes.addAndGet(toOffset);
}
offset = toOffset;
}
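
In the hunk above, the lines that still reference `compressed`, `baos`, and `zos` belong to the removed implementation; the replacement simply replays skipped bytes through `next(int)`, so a compressed chunker keeps compressing (and counting) the data it seeks past. A minimal sketch of the observable effect, assuming only the Builder methods and byte values that appear elsewhere in this commit (illustrative, not part of the diff):

```java
import com.google.devtools.build.lib.remote.Chunker;
import java.io.IOException;

class SeekSketch {
  static long compressedSizeAfterSeek() throws IOException {
    byte[] data = {72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33}; // "Hello World!"
    Chunker chunker =
        Chunker.builder().setInput(data).setChunkSize(data.length * 2).setCompressed(true).build();
    chunker.seek(5);          // the skipped prefix is replayed through next(int) and compressed
    while (chunker.hasNext()) {
      chunker.next();         // compresses the remaining bytes
    }
    // Reflects the whole compressed blob, because seek() compressed the prefix too.
    return chunker.getActualSize();
  }
}
```
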
@@ -240,6 +186,10 @@ public boolean hasNext() {
* returned.
*/
public Chunk next() throws IOException {
return next((int) Math.min(bytesLeft(), chunkSize));
}

private Chunk next(int bytesToRead) throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
@@ -252,19 +202,19 @@ public Chunk next() throws IOException {
}

// The cast to int is safe, because the return value is capped at chunkSize.
int bytesToRead = (int) Math.min(bytesLeft(), chunkSize);
if (bytesToRead == 0) {
chunkCache = null;
data = null;
throw new NoSuchElementException();
}

if (chunkCache == null) {
int cacheSize = (int) Math.min(bytesLeft(), chunkSize);
// Lazily allocate it in order to save memory on small data.
// 1) bytesToRead < chunkSize: There will only ever be one next() call.
// 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
// 3) bytestoRead > chunkSize: Not possible, due to Math.min above.
chunkCache = new byte[bytesToRead];
// 1) cacheSize < chunkSize: There will only ever be one next() call.
// 2) cacheSize == chunkSize: chunkCache will be set to its biggest possible value.
// 3) cacheSize > chunkSize: Not possible, due to Math.min above.
chunkCache = new byte[cacheSize];
}

long offsetBefore = offset;
@@ -275,23 +225,8 @@ public Chunk next() throws IOException {
+ bytesToRead + " bytes.", e);
}

ByteString blob;
if (compressed) {
zos.write(chunkCache, 0, bytesToRead);
zos.flush();
if (size - offsetBefore - bytesToRead == 0) {
zos.close();
}
byte[] compressed = baos.toByteArray();
baos.reset();
blob = ByteString.copyFrom(compressed, 0, compressed.length);
} else {
blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
}
processedBytes.addAndGet(blob.size());
ByteString blob = parseData(chunkCache, bytesToRead);

// This has to happen after actualSize has been updated
// or the guard in getActualSize won't work.
offset += bytesToRead;
if (bytesLeft() == 0) {
data.close();
@@ -313,19 +248,13 @@ private void maybeInitialize() throws IOException {
checkState(data == null);
checkState(offset == 0);
checkState(chunkCache == null);
checkState(zos == null);
checkState(baos == null);
try {
data = dataSupplier.get();
} catch (RuntimeException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw e;
}
if (compressed) {
baos = new ByteArrayOutputStream();
zos = new ZstdOutputStream(baos);
}
processedBytes = new AtomicLong(0);
initialise();
initialized = true;
}

@@ -406,7 +335,88 @@ public Builder setChunkSize(int chunkSize) {

public Chunker build() {
checkNotNull(inputStream);
return new Chunker(inputStream, size, chunkSize, compressed);
return compressed
? new ZstdChunker(inputStream, size, chunkSize)
: new PlainChunker(inputStream, size, chunkSize);
}
}
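
For reference, a short usage sketch of the dispatch in build() above (not part of the commit; it assumes only the Builder methods and the Chunk type visible in this diff, plus a hypothetical consume() sink):

```java
import com.google.devtools.build.lib.remote.Chunker;
import com.google.devtools.build.lib.remote.Chunker.Chunk;
import java.io.IOException;

class BuilderSketch {
  static void upload(byte[] blob, boolean compress) throws IOException {
    // build() now selects the subclass: ZstdChunker when compressed, PlainChunker otherwise.
    Chunker chunker =
        Chunker.builder().setInput(blob).setChunkSize(16 * 1024).setCompressed(compress).build();
    while (chunker.hasNext()) {
      consume(chunker.next()); // hypothetical sink, stands in for the remote uploader
    }
    // Equals blob.length for a PlainChunker; the compressed byte count for a ZstdChunker.
    long wireSize = chunker.getActualSize();
  }

  private static void consume(Chunk chunk) {}
}
```
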

public static class PlainChunker extends Chunker {
public PlainChunker(Supplier<InputStream> dataSupplier, long size, int chunkSize) {
super(dataSupplier, size, chunkSize);
}

@Override
public boolean isCompressed() {
return false;
}

@Override
public long getActualSize() throws IOException {
return getSize();
}

@Override
protected void initialise() {}

@Override
protected ByteString parseData(byte[] data, int len) throws IOException {
return ByteString.copyFrom(data, 0, len);
}
}

public static class ZstdChunker extends Chunker {
private long actualSize = -1;
private final AtomicLong processedBytes;
private ByteArrayOutputStream baos;
private ZstdOutputStream zos;

public ZstdChunker(Supplier<InputStream> inputStream, long size, int chunkSize) {
super(inputStream, size, chunkSize);
processedBytes = new AtomicLong(0);
}

@Override
public boolean isCompressed() {
return true;
}

@Override
public long getActualSize() throws IOException {
if (actualSize == -1) {
if (bytesLeft() != 0) {
// If there are bytes left, compute the remaining size
long currentOffset = getOffset();
while (hasNext()) {
next();
}
actualSize = processedBytes.get();
seek(currentOffset);
} else {
actualSize = processedBytes.get();
}
}
return actualSize;
}

@Override
protected void initialise() throws IOException {
baos = new ByteArrayOutputStream();
zos = new ZstdOutputStream(baos);
processedBytes.set(0);
}

@Override
protected ByteString parseData(byte[] data, int len) throws IOException {
zos.write(data, 0, len);
zos.flush();
if (getSize() - getOffset() - len == 0) {
zos.close();
}
byte[] compressed = baos.toByteArray();
processedBytes.addAndGet(baos.size());
baos.reset();
return ByteString.copyFrom(compressed, 0, compressed.length);
}
}
}
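
The parseData override above funnels every uncompressed chunk through one shared ZstdOutputStream, flushing after each write and closing the stream on the last chunk, so the per-chunk payloads concatenate into a single decodable zstd stream. A standalone sketch of that pattern, assuming ZstdOutputStream here is zstd-jni's com.github.luben.zstd.ZstdOutputStream and using its companion ZstdInputStream to check the round trip (none of this is part of the commit):

```java
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Round-trip sketch of the write/flush/close pattern used by ZstdChunker.parseData. */
class ZstdChunkRoundTrip {
  public static void main(String[] args) throws IOException {
    byte[] input = "Hello World!".getBytes(StandardCharsets.UTF_8);
    int chunkSize = 5;

    ByteArrayOutputStream wire = new ByteArrayOutputStream(); // concatenation of chunk payloads
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ZstdOutputStream zos = new ZstdOutputStream(baos);
    for (int off = 0; off < input.length; off += chunkSize) {
      int len = Math.min(chunkSize, input.length - off);
      zos.write(input, off, len);  // like parseData: compress this chunk
      zos.flush();                 // emit a complete block for it
      if (off + len == input.length) {
        zos.close();               // last chunk: finalize the zstd frame
      }
      wire.write(baos.toByteArray());
      baos.reset();
    }

    // A plain zstd decoder accepts the concatenated payloads as a single stream.
    try (ZstdInputStream zis = new ZstdInputStream(new ByteArrayInputStream(wire.toByteArray()))) {
      System.out.println(new String(zis.readAllBytes(), StandardCharsets.UTF_8)); // Hello World!
    }
  }
}
```
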
10 changes: 8 additions & 2 deletions src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
@@ -132,7 +132,7 @@ public void resourcesShouldBeReleased() throws IOException {
return in.get();
};

Chunker chunker = new Chunker(supplier, data.length, 1, false);
Chunker chunker = new Chunker.PlainChunker(supplier, data.length, 1);
assertThat(in.get()).isNull();
assertNextEquals(chunker, (byte) 1);
Mockito.verify(in.get(), Mockito.never()).close();
@@ -204,15 +204,21 @@ public void testMultiChunkCompressed() throws IOException {
@Test
public void testActualSizeIsCorrectAfterSeek() throws IOException {
byte[] data = {72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33};
int[] expectedSizes = {12, 33};
int[] expectedSizes = {12, 24};
for (int expected : expectedSizes) {
Chunker chunker =
Chunker.builder()
.setInput(data)
.setChunkSize(data.length * 2)
.setCompressed(expected != data.length)
.build();
chunker.next();
assertThat(chunker.hasNext()).isFalse();
assertThat(chunker.getActualSize()).isEqualTo(expected);

chunker.reset();
chunker.seek(5);
assertThat(chunker.hasNext()).isTrue();
chunker.next();
assertThat(chunker.hasNext()).isFalse();
assertThat(chunker.getActualSize()).isEqualTo(expected);
